当市场不是一次性完成交易时,均衡还存在吗?

观点来自:Roy Radner, Existence of Equilibrium of Plans, Prices, and Price Expectations in a Sequence of Markets;
经济学里最经典的一类均衡模型,通常来自阿罗—德布鲁传统。它有一个非常强的设定:所有商品、所有未来日期、所有可能状态下的交易,都可以在一开始一次性签完。只要市场足够完备,价格体系就能把整个经济协调起来。
但现实显然不是这样。
现实中的市场,是一段一段开的。今天先交易现货和一部分远期合约,明天在新的信息下继续交易,后天再继续。很多未来状态下的合同,今天根本买不到。于是,一个很自然的问题出现了:
如果市场是沿时间依次展开的,而且每一期市场都不完备,那么均衡还存在吗?
Roy Radner 在 1972 年的经典论文 Existence of Equilibrium of Plans, Prices, and Price Expectations in a Sequence of Markets,就是对这个问题的正面回答。它的重要性在于:这篇论文把“未来价格预期”正式纳入了一般均衡理论的核心结构之中。
一、这篇论文到底在研究什么?
Radner 研究的是:
在一个由许多不完备市场按时间顺序组成的经济中,是否存在一种由“当前价格 + 对未来价格的共同预期 + 各主体最优计划”所构成的均衡。
这里有三个关键词:第一,计划;第二,价格;第三,价格预期。
在静态一般均衡里,价格本身往往就足够了。但在序贯市场里,今天的决策不仅取决于今天的价格,还取决于人们对未来价格的看法。因为今天签不全所有未来合同,人们就必须靠对未来市场条件的预期来做今天的选择。
于是,均衡不再只是“供求相等”,而变成了一个更复杂的东西:它要求现在的价格、未来价格的预期、以及每个人今天做出的跨期计划,彼此之间能够相互支持。
二、Radner 最关键的创新:共同预期
这篇论文里最有启发性的概念,是 共同价格预期。
注意,它不是说所有人对未来世界的看法完全一样。Radner 没有要求所有人拥有相同的主观概率分布。不同的人仍然可以对同一个事件发生的可能性有不同判断。
他要求的一致性更弱,也更关键:
只要是同一个未来事件,大家都把它对应到同一组未来价格上。
也就是说,大家可以不同意“这个事件有多大概率发生”,但必须同意“如果它发生,价格会是多少”。
这个处理非常高明。因为序贯市场中的价格协调,并不一定要求主观信念完全一致;真正必须一致的,是“事件—价格”之间的映射。只有这样,今天围绕未来展开的交易才有共同的参照系。
三、模型的世界:时间、事件树与不完备合同
为了把问题说清楚,Radner 建了一个有限期模型。
经济运行在有限多个日期上。外部环境可能沿着不同路径发展,这些可能路径构成有限个“状态”。在每一个日期,人们只能观察到关于状态的一部分信息,因此每个日期对应一个事件划分。随着时间推移,信息逐步揭示,这些事件划分自然形成一棵树。
这就是后来很多动态不确定性模型都会用到的 事件树结构。
每个日期有若干商品。交易不是抽象的,而是通过合约实现。一个合约大体上说的是:
在日期 t、事件 A 发生时签约,约定在未来日期 u、事件 B 下交割某种商品若干单位。
如果
这一步很重要。因为论文不是把“不完备”理解为一个模糊说法,而是明确地把它写进合约集合本身:有些未来状态的交割合约就是不存在。
四、先看最简单的情形:纯交换经济
为了让核心思想先显出来,Radner 先不谈生产,也不谈股票市场,只讨论消费者之间的商品交换。
在这个纯交换模型里,每个交易者要同时选择两类东西:
一类是 消费计划,即在每个日期、每个事件节点上消费多少;另一类是 贸易计划,即在每个节点签哪些现货和远期合同。
一个计划要被认为是可行的,必须满足几组约束。
第一,消费本身必须落在这个人的可行消费集合中。第二,签订的合同必须是制度允许的合同。第三,在每个日期—事件节点上,计划中的交付不能超过自己在扣除消费之后还能拿得出的资源。第四,在每个节点上,按该节点的价格计算,预算必须可行。也就是说,他不能在那个节点凭空融资。
这时,每个交易者面对给定价格体系,会从自己的可行计划里选一个最优方案。
于是,一个纯交换均衡就可以定义为:
给定一组归一化的非负价格,每个交易者都选到自己的最优计划;并且把所有人的计划加总后,在每个市场上总超额供给都等于零。这里说的“每个市场”,不仅包括现货市场,也包括每个允许存在的远期和或有合同市场。
五、纯交换部分的主结论:均衡存在
这篇论文的第一个主要结果是:
在与经典一般均衡理论相近的标准假设下,纯交换经济中的这种均衡存在。
这些假设包括:消费集合闭且凸,效用函数连续且凹,以及一个非常重要的条件——每个日期—事件节点上都非饱和。
非饱和的意思很朴素:在任何一个节点上,只要给消费者单独多一点,他都会更喜欢。这个条件的经济含义很强。它排除了“某些节点上的额外消费完全没价值”的情形,也意味着消费者不会把任何可观察事件赋予零主观概率。
证明方法总体上沿用了 Debreu 的存在性技术路线:先把经济转成一个有界的等价问题,再构造一个合适的对应,最后用角谷不动点定理找到不动点,并把这个不动点解释成均衡。
从思想上说,这个结果说明:
即使市场不是一次性完备地开出来,只要人们围绕未来价格形成共同预期,仍然可以把各自的跨期计划协调成一个一致的整体。
六、为什么还要引入生产和股票市场?
如果只讨论交换,这个理论已经很漂亮了。但现实中,未来收入并不只是来自禀赋交换,还来自企业生产和股权持有。于是,Radner 把模型进一步扩展到一个更复杂、也更接近现实的环境:
有生产者,有股票市场,消费者既是商品购买者,也是股东。
在这个扩展模型里,生产者不仅要选择生产计划,还要选择贸易计划。生产计划记录的是每个日期—事件节点上各种商品的净产出;负值表示投入。企业的合同交割与生产活动必须严格接轨:在每个节点上,之前签订并要求在此交割的合同,总量必须与该节点的净产出相一致。
这时,企业的行为不再是简单的“看到一组静态价格就最大化利润”,而是要在整个事件树上安排收入流。
七、企业收入为什么要通过股票市场传递?
Radner 的处理是:每个生产者对应一种股份。消费者在每个日期都可以交易这些股份;如果不卖掉,就把股份持有到下一个时期。
企业在某个节点产生的收入,会按照前一节点持有的股份比例,分配给股东。这样一来,消费者的预算约束就不只取决于自己的商品交易和原始禀赋,还取决于:
第一,企业分红;第二,买卖股票带来的组合收入变化。
为了处理最初的股权结构,论文还额外设了一个“日期 0”的初始股票市场,它发生在真正的生产和消费开始之前。这个设定看上去有点技术化,但它解决了一个重要问题:初始股份禀赋必须先有一个市场化重组的机会,然后后续的分红归属才有统一定义。
八、复杂性的核心:消费者的预算约束不再是独立给定的
在这个扩展模型中,消费者要同时选择三类计划:
消费计划、商品贸易计划、投资组合计划。
难点在于,消费者的可行集合不再只由价格决定,还依赖于企业未来收入如何形成与分配。换句话说,消费者的问题要以“企业行为的结果”为前提,而企业行为又取决于价格。整个系统比纯交换模型紧密得多。
这也是为什么,加入生产和股票市场后,直接证明“真正均衡”存在,会变得困难很多。
九、这时 Radner 引入了一个关键中间概念:伪均衡
这篇论文最容易被忽略、但其实最重要的技术贡献之一,就是 伪均衡 这个概念。
为什么要搞一个“伪”均衡?因为在有生产和股票市场时,直接要求每个日期、每个事件、每类商品和股份市场都严格出清,证明起来太难。
于是,Radner 先退一步。
在伪均衡中,个体最优性还在:生产者在给定价格下做的是自己的最优计划;消费者在给定价格和收入分配下做的也是自己的最优计划。
但对市场清算条件,他先不直接要求“所有市场都等号成立”,而是要求:
在每个日期—事件节点上,给定价格使得“计划消费者储蓄总价值超过计划新投资总价值的部分”达到最小。
这个条件听上去抽象,直观上可以理解为:价格已经调到了一个不能再进一步压缩可见失衡的水平。它是一个比真正市场出清更弱、但足够有力的条件。
十、扩展模型中的主结论:伪均衡存在,真均衡有条件成立
在包含生产和股票市场的模型里,Radner 得到的是两步式结论。
第一步,伪均衡存在。第二步,如果额外满足一些条件,那么伪均衡其实就是真均衡。
这些额外条件里最重要的有两个:
第一,初始股票市场是清算的;第二,所有日期—事件节点上的股份价格都严格为正。此外,还需要消费者在每个节点上都非饱和。
在这些条件下,论文证明:伪均衡可以升级为真正均衡。也就是说,商品市场和股份市场最终都会在每个节点上清算。
这个结果的含义是:
在序贯市场里,即使每一期市场都不完备,只要价格、预期、计划和股权收入传递机制搭得足够严密,均衡依然可以建立起来。
十一、为什么还要区分伪均衡和真均衡?
这是整篇论文最深的地方之一。
Radner 非常清楚地意识到:有生产者时,最大的理论难题不只是谁最优,而是 如果某些企业“没有人愿意持有”,会发生什么。
如果一个企业的股份在某个节点出现持续的超额供给,那么到后继节点时,就可能有一部分企业收入无人承接。这意味着企业可能在某些未来节点上出现未被覆盖的负收入。用更直白的话说,就是企业可能会 破产,或者需要 退出。
但这套模型并没有把“破产”和“退出”完整地制度化。因此,它能无条件证明的是伪均衡,而不是真均衡。
这不是论文的缺点,恰恰是它的诚实之处。Radner 并没有为了形式上的完美,强行遮掉最棘手的问题。他反而清楚指出:序贯市场的一般均衡理论,要走得更远,就必须认真处理企业破产与退出机制。
十二、这篇论文真正留下了什么?
从今天回看,这篇论文的意义远不只是“证明了一个存在性定理”。
它真正推动的是一般均衡理论的视角转移:
从“一次性完备市场的价格协调”,转向“在不完备的序贯市场中,由价格预期支持的动态协调”。
在这个框架下,均衡不再只是某一时点的静态价格向量,而是一个更完整的系统:
-
当前价格 -
对未来价格的共同预期 -
消费者与生产者沿事件树制定的计划 -
股票市场对企业收入的跨期分配 -
以及这些计划最终是否彼此一致
这就是 Radner 模型的真正野心。
十三、总结这篇论文
Radner 证明了:在市场按时间顺序展开、且每一期都不完备的经济中,只要主体围绕未来价格形成共同预期,并在此基础上做出最优的跨期计划,那么这样的价格—预期—计划体系可以达到均衡;在纯交换下是真均衡,在有生产和股票市场时至少能得到伪均衡,而真均衡是否成立,进一步取决于股份市场清算以及企业破产、退出等问题如何处理。
这也是为什么,这篇 1972 年的论文至今仍然值得重读。它不仅是在讨论“均衡是否存在”,更是在提醒我们:
一旦离开完备市场的理想世界,经济协调问题就不再只是价格问题,而是价格与预期共同作用的问题。
README
# Radner 序贯市场纯交换模型(v3)一个用于数值实验的 Python 脚本,展示 **Radner (1972)** 第 2–4 节中的一个**可计算特例**。相较于前一版,**v3 的重点增强**是:在求解均衡后,除了打印表格,还会**自动导出 CSV、摘要文本和多张可视化图片**,更适合做课程展示、研究记录和结果汇报。---## 项目简介这个项目实现的是一个 **纯交换(pure exchange)**、**单商品(H=1)**、**有限事件树** 下的序贯市场模型。脚本会:1. 自动构造一个二叉事件树2. 为每个市场节点生成可交易合约集合3. 让每个交易者在约束下解自己的最优化问题4. 对统一价格系统做数值搜索5. 检查合约是否逐项市场出清6. 输出表格、摘要与图形化结果它不是 Radner 定理在最一般情形下的完整实现,而是一个**结构忠实、但便于计算与演示的特例**。---## v3 的主要新增内容相较于前面版本,v3 新增了更完整的**结果导出与可视化能力**:- 节点标签更适合展示,例如:- 根节点显示为 `t1`- `down` / `up` 路径会显示成 `t2:D`、`t2:U`、`t3:DU` 这样的形式- 新增 **事件树价格图**- 新增 **合约价格矩阵热力图**- 新增 **交易流向图**- 新增 **禀赋 vs 均衡消费柱状图**- 新增 **可视化结果打包导出**- 每个 demo 运行后会自动把结果保存到 `radner_outputs/` 目录这使得 v3 不只是“能算”,还更适合“展示结果”。---## 模型结构### 1. 事件树模型自动生成一个**二叉事件树**:- 根节点:`()`,展示时通常标为 `t1`- 每走一步都有两个可能分支:- `0`:down- `1`:up例如当 `T=3` 时,树大致对应:```textt1├── t2:D│ ├── t3:DD│ └── t3:DU└── t2:U├── t3:UD└── t3:UU```代码里节点仍然用元组保存历史路径,例如:- `()`:根节点- `(0,)`:第一步 down- `(1,)`:第一步 up- `(0,1)`:先 down 再 up---### 2. 合约每个合约表示为:```pythonContract = (market_node, delivery_node)```含义是:- 在 `market_node` 所在市场签订合约- 在 `delivery_node` 对应状态交割允许交割的节点必须是市场节点的后代节点。如果设置了 `max_forward_horizon`,还会限制最远可交易的前向期限。---### 3. 交易者每个交易者由 `AgentSpec` 定义,核心参数包括:- `name`:交易者名称- `p_up`:主观上行概率- `beta`:贴现因子- `utility_scale`:效用权重- `x_lower`:消费下界- `contract_limit`:合约持仓上下界- `expectation_tilt`:异质预期模式下,对未来价格的主观偏移强度---### 4. 效用与约束#### 效用函数脚本采用的是**对数效用**,并按:- 节点主观概率- 贴现因子- 效用权重进行加总。#### 约束条件个体问题中主要包含:- 消费下界约束- 合约持仓边界约束- 资源 / 自由处置约束- 每个市场节点的预算约束---## 共同预期与异质预期v3 支持两种设定:### 共同预期(`common_expectations=True`)所有交易者按照同一个未来价格系统理解后续市场。在这种情况下,如果市场逐项出清,可以把结果视为脚本意义下的 **Radner equilibrium**。### 异质预期(`common_expectations=False`)不同交易者会对未来价格做主观扭曲。脚本通过 `expectation_tilt` 来实现这种偏离:- 正值:更偏向“上行较多”的未来节点- 负值:更偏向“下行较多”的未来节点这部分的目的不是复现论文中的均衡存在定理,而是为了**故意打破共同预期条件**,观察:- 虽然暂时出清,但不构成 Radner 均衡- 或者数值上更难找到均衡---## 数值方法### 个体最优化每个交易者的问题使用:- `scipy.optimize.minimize`- 方法:`SLSQP`同时优化:- 各节点消费 `x`- 各合约头寸 `z`### 均衡搜索外层对价格系统做搜索:- 当价格维度较低时,使用 `differential_evolution`- 维度较高时,切换到多起点 `L-BFGS-B`价格系统被参数化为各市场节点上的**单纯形概率向量**,保证:- 每个价格非负- 同一市场节点内价格和为 1---## 文件中的三个 demo脚本自带三个示例模型:### 1. `baseline_model()`- 共同预期- 用于演示在标准设定下寻找市场出清价格系统### 2. `heterogeneous_expectations_model()`- 异质预期- 用于展示“找到一个临时清算价格系统”并不等于找到 Radner 均衡### 3. `failure_model()`- 更强烈冲突的主观预期- 更紧的合约界- 用于演示“更可能找不到均衡”或“数值上难以清算”的情形---## 安装依赖建议 Python 3.10+。安装依赖:```bashpip install numpy pandas scipy matplotlib```---## 运行方法假设脚本文件名为:```bashradner_sequential_market_sim_v3.py```直接运行:```bashpython radner_sequential_market_sim_v3.py```运行后会自动依次执行:- `DEMO_1_common_expectations`- `DEMO_2_heterogeneous_expectations`- `DEMO_3_incompatible_expectations`---## 运行后会得到什么### 1. 终端输出每个 demo 会在控制台打印:- 是否找到数值出清解:`exists`- 是否满足脚本定义下的 Radner equilibrium:`radner_equilibrium`- 解释性消息 `message`- 优化目标值 `objective`- 合约市场表- 消费表- 在异质预期下还会打印实际价格与主观价格对照表### 2. 导出文件v3 会把结果保存到:```textradner_outputs/```每个 demo 对应一组文件,通常包含:- `*_summary.txt`- `*_market_table.csv`- `*_consumption_table.csv`- `*_expectation_table.csv`- `*_event_tree_prices.png`- `*_price_matrix.png`- `*_trade_flows.png`- `*_consumption_vs_endowment.png`---## 可视化说明### 事件树价格图展示每个事件树节点,以及该节点对应市场上各可交易合约的价格。适合回答:- “在哪些状态节点可以交易?”- “某个市场节点上,各交割合约价格如何分布?”---### 合约价格矩阵热力图横轴是交割节点 `n`,纵轴是市场节点 `m`,格子里显示 `p(m,n)`。适合快速查看:- 哪些市场—交割组合有定义- 价格高低如何分布- 哪些位置因为不可交易而为空---### 交易流向图按交易者分别画图,用箭头表示远期交易方向和规模:- 绿色:向市场交付- 红色:从市场接收并且会在节点旁标出显著的 spot 交易量。这是 v3 很实用的一部分,因为它把原本只在表格里的 `z_{mn}` 变成了更直观的图。---### 禀赋 vs 消费图按节点比较每个交易者的:- 初始禀赋 `w`- 均衡消费 `x`方便观察交易前后资源配置的变化。---## 代码结构### `AgentSpec`定义交易者参数。### `PureExchangeConfig`定义模型配置,包括:- 树深度 `T`- 是否共同预期- 前向期限限制- 优化器参数- 导出图片 DPI- 交易者列表- 外生禀赋### `RadnerPureExchangeSpecialCase`核心模型类,负责:- 构造事件树- 构造可交易合约- 计算主观概率- 生成默认禀赋- 解个体优化- 搜索均衡- 生成表格- 绘制图形- 导出可视化结果包---## 适合的使用场景这个版本特别适合:- 课堂展示 Radner 序贯市场框架- 做小规模计算经济学实验- 对比共同预期与异质预期- 生成论文附录或讲义中的示意图- 做参数敏感性测试后的批量结果导出---## 局限性请注意,这仍然是一个**数值特例**,而不是完整一般性的理论实现:1. 只处理单商品(`H=1`)2. 使用参数化的正消费集合与对数效用3. 异质预期模式是实验性扩展,不是原论文定理的一部分4. 维度增大后,搜索会明显更重5. `exists=False` 只能说明“在当前设定和数值搜索下未找到均衡”,不等价于严格数学意义上的不存在性证明---## 最推荐的阅读顺序如果你第一次看这份代码,建议按这个顺序读:1. `AgentSpec`2. `PureExchangeConfig`3. `RadnerPureExchangeSpecialCase` 类文档字符串4. `_build_binary_tree()`5. `solve_agent()`6. `solve_equilibrium()`7. `plot_*` 系列可视化函数8. `export_visual_bundle()`9. `baseline_model()` / `heterogeneous_expectations_model()` / `failure_model()`10. `main()`---## 一句话总结**v3 = 一个能计算、能导表、还能自动出图的 Radner 序贯市场数值实验脚本。**
代码:
from __future__ import annotationsfrom dataclasses import dataclass, fieldfrom typing import Dict, List, Tuple, Any, Optionalimport mathimport osimport refrom pathlib import Pathimport matplotlibmatplotlib.use("Agg")import matplotlib.pyplot as pltfrom matplotlib.patches import FancyArrowPatchimport numpy as npimport pandas as pdfrom scipy.optimize import minimize, differential_evolutionNode = Tuple[int, ...]Contract = Tuple[Node, Node] # (market node m, delivery node n)@dataclassclass AgentSpec:name: strp_up: float = 0.5beta: float = 0.96utility_scale: float = 1.0x_lower: float = 1e-3contract_limit: float = 3.0# If common_expectations is False, the agent evaluates future markets through a# tilted relative-price system. Positive tilt overweights delivery nodes with# more "up" shocks, negative tilt overweights "down" shocks.expectation_tilt: float = 0.0@dataclassclass PureExchangeConfig:T: int = 3 # dates 1,...,T in the paper; here depths 0,...,T-1common_expectations: bool = Truemax_forward_horizon: Optional[int] = None # None = all descendants, 1 = one-step onlyoptimizer_tol: float = 1e-6price_search_starts: int = 4heavy_dimension_limit: int = 6agent_solver_maxiter: int = 500equilibrium_tol: float = 5e-4figure_dpi: int = 170agents: List[AgentSpec] = field(default_factory=lambda: [AgentSpec("A", p_up=0.40, beta=0.97, utility_scale=1.0, contract_limit=2.5, expectation_tilt=-0.0),AgentSpec("B", p_up=0.62, beta=0.97, utility_scale=1.0, contract_limit=2.5, expectation_tilt=0.0),])# Endowments by agent by node. If omitted, they are generated automatically.endowments: Optional[Dict[str, Dict[Node, float]]] = Noneclass RadnerPureExchangeSpecialCase:"""A numerically solvable *special case* of Sections 2-4 of Radner (1972).Faithful elements:- finite event tree M of date-event pairs- one commodity (H=1), finite traders, bounded allowable contracts (2.1)- trade plans z_{mn} over allowed contracts (market m, delivery n)- budget constraints p_m z_m >= 0 for every market m (3.4)- resource/free-disposal constraints sum_{m<=n} z_{mn} <= w_n - x_n (3.3)- each trader solves a concave optimization problem yi(p) (3.5)- equilibrium requires a common price system p and contract-by-contract marketclearing sum_i z_i = 0 (4.3)Honest limitation:- this is not the theorem's full generality. Xi and Ui are parameterized to acomputable family (positive-consumption sets + log utility), and H=1.- heterogeneous expectations mode is included only to *break* Radner's commonexpectations axiom and explore nonexistence/failure; it is not a theorem fromthe paper."""def __init__(self, cfg: PureExchangeConfig):self.cfg = cfgif cfg.T < 2:raise ValueError("T must be at least 2.")self.root: Node = tuple()self.nodes_by_depth: Dict[int, List[Node]] = {d: [] for d in range(cfg.T)}self.nodes: List[Node] = []self.children: Dict[Node, List[Node]] = {}self.parents: Dict[Node, Optional[Node]] = {self.root: None}self._build_binary_tree()self.node_index = {n: i for i, n in enumerate(self.nodes)}self.terminal_nodes = [n for n in self.nodes if len(n) == self.cfg.T - 1]self.branch_prob_cache: Dict[str, Dict[Node, float]] = {}self.contracts_by_market: Dict[Node, List[Node]] = self._build_allowable_deliveries()self.contracts: List[Contract] = []for m in self.nodes:for n in self.contracts_by_market[m]:self.contracts.append((m, n))self.contract_index = {c: i for i, c in enumerate(self.contracts)}self.endowments = self._build_endowments()self._check_feasible_lower_bounds()# ------------------------------------------------------------------# Tree, probabilities, endowments# ------------------------------------------------------------------def _build_binary_tree(self) -> None:self.nodes = [self.root]self.nodes_by_depth[0] = [self.root]frontier = [self.root]for d in range(self.cfg.T - 1):nxt: List[Node] = []for node in frontier:dn = node + (0,)up = node + (1,)self.children[node] = [dn, up]self.parents[dn] = nodeself.parents[up] = nodeself.nodes.append(dn)self.nodes.append(up)self.nodes_by_depth[d + 1].extend([dn, up])nxt.extend([dn, up])frontier = nxtdef _descendants(self, node: Node) -> List[Node]:return [n for n in self.nodes if len(n) >= len(node) and n[:len(node)] == node]def _build_allowable_deliveries(self) -> Dict[Node, List[Node]]:"""Implements a configurable family d_tu satisfying spot completeness andmonotonicity. For H=1, a delivery node n is allowable from market node m if:- n is a descendant of m- and the forward horizon does not exceed max_forward_horizon, if imposed."""deliveries: Dict[Node, List[Node]] = {}for m in self.nodes:nodes = []for n in self._descendants(m):horizon = len(n) - len(m)if self.cfg.max_forward_horizon is not None and horizon > self.cfg.max_forward_horizon:continuenodes.append(n)deliveries[m] = nodesreturn deliveriesdef subjective_event_prob(self, agent: AgentSpec, node: Node) -> float:key = agent.nameif key not in self.branch_prob_cache:cache: Dict[Node, float] = {}for n in self.nodes:up = sum(n)dn = len(n) - upcache[n] = (agent.p_up ** up) * ((1.0 - agent.p_up) ** dn)self.branch_prob_cache[key] = cachereturn self.branch_prob_cache[key][node]def _build_endowments(self) -> Dict[str, Dict[Node, float]]:if self.cfg.endowments is not None:return self.cfg.endowments# Smooth positive endowments differing by agent and node.out: Dict[str, Dict[Node, float]] = {}for k, ag in enumerate(self.cfg.agents):agent_endowment: Dict[Node, float] = {}for n in self.nodes:depth = len(n)avg_shock = 0.0 if depth == 0 else (2 * sum(n) - depth) / depthbase = 1.2 + 0.15 * depthtilt = (k - 0.5) * 0.18 * avg_shockagent_endowment[n] = max(ag.x_lower + 0.2, base + tilt)out[ag.name] = agent_endowmentreturn outdef _check_feasible_lower_bounds(self) -> None:for ag in self.cfg.agents:for n in self.nodes:if self.endowments[ag.name][n] <= ag.x_lower:raise ValueError(f"Agent {ag.name} violates assumption analogous to (3.8): "f"endowment at node {n} must exceed x_lower.")# ------------------------------------------------------------------# Price systems# ------------------------------------------------------------------def price_dims(self) -> List[int]:return [max(0, len(self.contracts_by_market[m]) - 1) for m in self.nodes]def _softmax(self, x: np.ndarray) -> np.ndarray:x = x - np.max(x)ex = np.exp(x)return ex / np.sum(ex)def theta_to_price_system(self, theta: np.ndarray) -> Dict[Node, np.ndarray]:"""For each market m, prices live on the unit simplex P_m. We use K_m-1 freeparameters and append a zero baseline logit, so every coordinate isnonnegative and sums to 1, matching Radner's normalization in Section 4."""prices: Dict[Node, np.ndarray] = {}pos = 0for m in self.nodes:k = len(self.contracts_by_market[m])if k == 1:prices[m] = np.array([1.0], dtype=float)else:logits = np.concatenate([theta[pos:pos + k - 1], np.array([0.0])])prices[m] = self._softmax(logits)pos += k - 1return pricesdef price_system_to_flat_contract_dict(self, prices: Dict[Node, np.ndarray]) -> Dict[Contract, float]:flat: Dict[Contract, float] = {}for m in self.nodes:for idx, n in enumerate(self.contracts_by_market[m]):flat[(m, n)] = float(prices[m][idx])return flatdef tilted_belief_prices(self, actual_prices: Dict[Node, np.ndarray], agent: AgentSpec) -> Dict[Node, np.ndarray]:if self.cfg.common_expectations:return actual_pricesout: Dict[Node, np.ndarray] = {}for m in self.nodes:vec = np.array(actual_prices[m], dtype=float)deliveries = self.contracts_by_market[m]weights = []for n in deliveries:depth = len(n)score = 0.0 if depth == 0 else (2 * sum(n) - depth) / max(depth, 1)weights.append(math.exp(agent.expectation_tilt * score))w = np.array(weights, dtype=float)distorted = vec * wdistorted = distorted / distorted.sum()out[m] = distortedreturn out# ------------------------------------------------------------------# Consumer problem yi(p)# ------------------------------------------------------------------def utility(self, agent: AgentSpec, x: np.ndarray) -> float:total = 0.0for idx, n in enumerate(self.nodes):prob = self.subjective_event_prob(agent, n)total += agent.utility_scale * (agent.beta ** len(n)) * prob * math.log(max(x[idx], agent.x_lower))return totaldef solve_agent(self, agent: AgentSpec, actual_prices: Dict[Node, np.ndarray]) -> Dict[str, Any]:believed_prices = self.tilted_belief_prices(actual_prices, agent)n_x = len(self.nodes)n_z = len(self.contracts)x0 = np.array([0.85 * self.endowments[agent.name][n] for n in self.nodes], dtype=float)z0 = np.zeros(n_z, dtype=float)v0 = np.concatenate([x0, z0])bounds = []for _n in self.nodes:bounds.append((agent.x_lower, None))for _ in self.contracts:bounds.append((-agent.contract_limit, agent.contract_limit))def unpack(v: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:return v[:n_x], v[n_x:]constraints = []# (3.3): deliveries to node n cannot exceed endowment minus consumption.for n in self.nodes:n_idx = self.node_index[n]relevant = [self.contract_index[(m, n)] for m in self.nodes if (m, n) in self.contract_index]def con_factory(node_idx=n_idx, rel=tuple(relevant), node=n):return {"type": "ineq","fun": lambda v, node_idx=node_idx, rel=rel, node=node:self.endowments[agent.name][node] - v[node_idx] - np.sum(v[n_x + np.array(rel, dtype=int)]),}constraints.append(con_factory())# (3.4): at each market, the value of signed trades at that market is nonnegative.for m in self.nodes:rel = [self.contract_index[(m, n)] for n in self.contracts_by_market[m]]price_vec = np.array(believed_prices[m], dtype=float)def budget_factory(rel=tuple(rel), price_vec=price_vec):rel_arr = np.array(rel, dtype=int)return {"type": "ineq","fun": lambda v, rel_arr=rel_arr, price_vec=price_vec:float(price_vec @ v[n_x + rel_arr])}constraints.append(budget_factory())objective = lambda v: -self.utility(agent, unpack(v)[0])res = minimize(objective,v0,method="SLSQP",bounds=bounds,constraints=constraints,options={"ftol": 1e-9, "maxiter": self.cfg.agent_solver_maxiter, "disp": False},)if not res.success:raise RuntimeError(f"Agent {agent.name} optimization failed: {res.message}")x, z = unpack(res.x)trade_dict = {c: float(z[self.contract_index[c]]) for c in self.contracts}market_values = {m: float(np.array(believed_prices[m]) @ np.array([trade_dict[(m, n)] for n in self.contracts_by_market[m]]))for m in self.nodes}deliveries = {n: float(sum(trade_dict[(m, n)] for m in self.nodes if (m, n) in self.contract_index))for n in self.nodes}return {"agent": agent.name,"x": {n: float(x[self.node_index[n]]) for n in self.nodes},"z": trade_dict,"utility": self.utility(agent, x),"market_values": market_values,"deliveries": deliveries,"believed_prices": believed_prices,"solve_result": res,}# ------------------------------------------------------------------# Equilibrium search# ------------------------------------------------------------------def aggregate_excess(self, plans: List[Dict[str, Any]]) -> np.ndarray:e = np.zeros(len(self.contracts), dtype=float)for plan in plans:for c, val in plan["z"].items():e[self.contract_index[c]] += valreturn edef _objective_given_theta(self, theta: np.ndarray) -> Tuple[float, Dict[str, Any]]:prices = self.theta_to_price_system(theta)plans: List[Dict[str, Any]] = []failures: List[str] = []for ag in self.cfg.agents:try:plans.append(self.solve_agent(ag, prices))except Exception as exc:failures.append(str(exc))if failures:return 1e6, {"failures": failures}excess = self.aggregate_excess(plans)value = float(np.sum(excess ** 2))return value, {"prices": prices, "plans": plans, "excess": excess}def solve_equilibrium(self) -> Dict[str, Any]:dim = sum(self.price_dims())if dim == 0:val, payload = self._objective_given_theta(np.zeros(0, dtype=float))best = {"opt": None, "objective": val, "payload": payload, "theta": np.zeros(0, dtype=float)}elif dim <= self.cfg.heavy_dimension_limit:bounds = [(-4.0, 4.0)] * dimobj = lambda th: self._objective_given_theta(np.array(th, dtype=float))[0]de = differential_evolution(obj,bounds=bounds,maxiter=35,popsize=10,polish=True,disp=False,seed=1,)val, payload = self._objective_given_theta(np.array(de.x, dtype=float))best = {"opt": de, "objective": val, "payload": payload, "theta": np.array(de.x, dtype=float)}else:best = Nonerng = np.random.default_rng(42)starts = [np.zeros(dim)]for _ in range(max(0, self.cfg.price_search_starts - 1)):starts.append(rng.normal(scale=0.25, size=dim))for start in starts:obj = lambda th: self._objective_given_theta(th)[0]res = minimize(obj,start,method="L-BFGS-B",options={"maxiter": 120, "ftol": 1e-9},)val, payload = self._objective_given_theta(res.x)record = {"opt": res, "objective": val, "payload": payload, "theta": res.x}if best is None or val < best["objective"]:best = recordpayload = best["payload"]if "failures" in payload:return {"exists": False,"radner_equilibrium": False,"message": "; ".join(payload["failures"]),"optimizer": best,}prices = payload["prices"]plans = payload["plans"]excess = payload["excess"]exists = float(np.max(np.abs(excess))) <= self.cfg.equilibrium_tolradner = exists and self.cfg.common_expectationsif exists and not self.cfg.common_expectations:message = ("A temporary clearing price system was found under heterogeneous believed future prices, ""but it is not a Radner equilibrium because common expectations are violated.")elif exists:message = "Contract-by-contract market clearing found under a common price system."else:message = ("No market-clearing common price system was found within tolerance. In this special case, ""that means the numerical search did not find an equilibrium under the chosen assumptions.")if dim > self.cfg.heavy_dimension_limit:message += " The model dimension exceeded the recommended exact-search range for this script."return {"exists": exists,"radner_equilibrium": radner,"message": message,"prices": prices,"plans": plans,"excess": excess,"optimizer": best,"flat_prices": self.price_system_to_flat_contract_dict(prices),}# ------------------------------------------------------------------# Diagnostics and tabular reports# ------------------------------------------------------------------def market_table(self, solution: Dict[str, Any]) -> pd.DataFrame:rows = []prices = solution["prices"]excess = solution["excess"]for m in self.nodes:for j, n in enumerate(self.contracts_by_market[m]):c = (m, n)row: Dict[str, Any] = {"market_m": self.node_label(m),"delivery_n": self.node_label(n),"depth_m": len(m),"depth_n": len(n),"price": prices[m][j],"aggregate_excess": excess[self.contract_index[c]],}for plan in solution["plans"]:row[f"z_{plan['agent']}"] = plan["z"][c]rows.append(row)return pd.DataFrame(rows)def consumption_table(self, solution: Dict[str, Any]) -> pd.DataFrame:rows = []for n in self.nodes:row: Dict[str, Any] = {"node": self.node_label(n), "depth": len(n)}for plan in solution["plans"]:ag = plan["agent"]row[f"x_{ag}"] = plan["x"][n]row[f"w_{ag}"] = self.endowments[ag][n]row[f"deliveries_{ag}"] = plan["deliveries"][n]rows.append(row)return pd.DataFrame(rows)def expectation_table(self, solution: Dict[str, Any]) -> pd.DataFrame:rows = []prices = solution["prices"]for plan in solution["plans"]:ag = next(a for a in self.cfg.agents if a.name == plan["agent"])believed = self.tilted_belief_prices(prices, ag)for m in self.nodes:for idx, n in enumerate(self.contracts_by_market[m]):rows.append({"agent": ag.name,"market_m": self.node_label(m),"delivery_n": self.node_label(n),"actual_price": prices[m][idx],"believed_price": believed[m][idx],})return pd.DataFrame(rows)def summary_text(self, solution: Dict[str, Any]) -> str:parts = []parts.append("=== Radner (1972) pure-exchange special case ===")parts.append(f"exists = {solution['exists']}")parts.append(f"radner_equilibrium = {solution['radner_equilibrium']}")parts.append(solution["message"])if "optimizer" in solution and solution["optimizer"] is not None:parts.append(f"objective = {solution['optimizer']['objective']:.8f}")return "\n".join(parts)# ------------------------------------------------------------------# Visualization helpers# ------------------------------------------------------------------def node_label(self, node: Node) -> str:if node == self.root:return "t1"shocks = "".join("U" if x == 1 else "D" for x in node)return f"t{len(node) + 1}:{shocks}"def node_positions(self) -> Dict[Node, Tuple[float, float]]:pos: Dict[Node, Tuple[float, float]] = {}for depth in range(self.cfg.T):level_nodes = sorted(self.nodes_by_depth[depth])count = len(level_nodes)if count == 1:ys = np.array([0.0])else:ys = np.linspace((count - 1) / 2.0, -(count - 1) / 2.0, count)for node, y in zip(level_nodes, ys):pos[node] = (float(depth), float(y))return posdef _draw_tree_skeleton(self, ax: plt.Axes, pos: Dict[Node, Tuple[float, float]]) -> None:for parent, kids in self.children.items():x1, y1 = pos[parent]for child in kids:x2, y2 = pos[child]ax.plot([x1, x2], [y1, y2], linewidth=1.0, alpha=0.35)for node in self.nodes:x, y = pos[node]ax.scatter([x], [y], s=420, zorder=3)ax.text(x,y,self.node_label(node),ha="center",va="center",fontsize=8,color="white",zorder=4,fontweight="bold",)ax.set_xticks(range(self.cfg.T))ax.set_xticklabels([f"date {t}" for t in range(1, self.cfg.T + 1)])ax.set_yticks([])ax.spines["top"].set_visible(False)ax.spines["right"].set_visible(False)ax.spines["left"].set_visible(False)ax.set_xlim(-0.35, self.cfg.T - 1 + 1.8)span = max(abs(y) for _, y in pos.values()) if pos else 1.0ax.set_ylim(-span - 0.85, span + 0.85)def plot_event_tree_prices(self, solution: Dict[str, Any], filepath: str) -> None:pos = self.node_positions()fig_h = max(4.8, 1.6 * max(len(v) for v in self.nodes_by_depth.values()) + 2.2)fig_w = max(8.0, 3.2 * self.cfg.T)fig, ax = plt.subplots(figsize=(fig_w, fig_h))self._draw_tree_skeleton(ax, pos)prices = solution["prices"]for m in self.nodes:x, y = pos[m]lines = []for idx, n in enumerate(self.contracts_by_market[m]):prefix = "spot" if n == m else self.node_label(n)lines.append(f"{prefix}: {prices[m][idx]:.3f}")text = "p_m contracts\n" + "\n".join(lines)ax.text(x + 0.25,y,text,fontsize=8,va="center",ha="left",bbox={"boxstyle": "round,pad=0.28", "facecolor": "white", "alpha": 0.86},)title = "Event tree and market-node price system"if not self.cfg.common_expectations:title += "\n(actual prices; agents may believe tilted prices)"ax.set_title(title)fig.tight_layout()fig.savefig(filepath, dpi=self.cfg.figure_dpi, bbox_inches="tight")plt.close(fig)def plot_price_heatmap(self, solution: Dict[str, Any], filepath: str) -> None:market_labels = [self.node_label(m) for m in self.nodes]delivery_labels = [self.node_label(n) for n in self.nodes]mat = np.full((len(self.nodes), len(self.nodes)), np.nan)prices = solution["prices"]for i, m in enumerate(self.nodes):for idx, n in enumerate(self.contracts_by_market[m]):j = self.node_index[n]mat[i, j] = prices[m][idx]fig, ax = plt.subplots(figsize=(max(6.5, 1.15 * len(self.nodes) + 2.5), max(4.8, 0.8 * len(self.nodes) + 2.0)))masked = np.ma.masked_invalid(mat)im = ax.imshow(masked, aspect="auto")cbar = fig.colorbar(im, ax=ax)cbar.set_label("price")ax.set_xticks(range(len(delivery_labels)))ax.set_xticklabels(delivery_labels, rotation=45, ha="right")ax.set_yticks(range(len(market_labels)))ax.set_yticklabels(market_labels)ax.set_xlabel("delivery node n")ax.set_ylabel("market node m")ax.set_title("Contract price matrix p(m,n)")for i in range(mat.shape[0]):for j in range(mat.shape[1]):if not np.isnan(mat[i, j]):ax.text(j, i, f"{mat[i, j]:.2f}", ha="center", va="center", fontsize=8, color="white")else:ax.text(j, i, "—", ha="center", va="center", fontsize=8)fig.tight_layout()fig.savefig(filepath, dpi=self.cfg.figure_dpi, bbox_inches="tight")plt.close(fig)def _spot_trade_by_agent(self, plan: Dict[str, Any], node: Node) -> float:return plan["z"].get((node, node), 0.0)def plot_trade_flows(self, solution: Dict[str, Any], filepath: str) -> None:pos = self.node_positions()plans = solution["plans"]n_agents = len(plans)ncols = 2 if n_agents > 1 else 1nrows = math.ceil(n_agents / ncols)fig_w = 7.0 * ncolsfig_h = max(4.8, 4.6 * nrows)fig, axes = plt.subplots(nrows, ncols, figsize=(fig_w, fig_h), squeeze=False)all_axes = axes.ravel()max_abs = 0.0for plan in plans:for c, val in plan["z"].items():if c[0] != c[1]:max_abs = max(max_abs, abs(val))max_abs = max(max_abs, 1e-6)for ax, plan in zip(all_axes, plans):self._draw_tree_skeleton(ax, pos)spot_lines = []for node in self.nodes:spot = self._spot_trade_by_agent(plan, node)if abs(spot) > 1e-5:x, y = pos[node]ax.text(x - 0.05,y - 0.42,f"spot {spot:+.2f}",ha="center",va="center",fontsize=8,bbox={"boxstyle": "round,pad=0.18", "facecolor": "white", "alpha": 0.80},)spot_lines.append(f"{self.node_label(node)}{spot:+.2f}")for (m, n), val in plan["z"].items():if m == n or abs(val) <= 1e-5:continuex1, y1 = pos[m]x2, y2 = pos[n]rad = 0.14 if y2 >= y1 else -0.14width = 0.8 + 3.8 * abs(val) / max_absarrow = FancyArrowPatch((x1 + 0.06, y1),(x2 - 0.08, y2),arrowstyle="-|>",mutation_scale=11 + 4 * abs(val) / max_abs,linewidth=width,alpha=0.78,color=("tab:green" if val > 0 else "tab:red"),connectionstyle=f"arc3,rad={rad}",)ax.add_patch(arrow)mx = (x1 + x2) / 2 + 0.08my = (y1 + y2) / 2 + 0.18 * (1 if rad > 0 else -1)ax.text(mx,my,f"{val:+.2f}",fontsize=8,ha="center",va="center",bbox={"boxstyle": "round,pad=0.14", "facecolor": "white", "alpha": 0.72},)ax.set_title(f"Agent {plan['agent']} trade flows\n"f"green = delivers to market; red = receives from market")for ax in all_axes[n_agents:]:ax.axis("off")fig.tight_layout()fig.savefig(filepath, dpi=self.cfg.figure_dpi, bbox_inches="tight")plt.close(fig)def plot_consumption_vs_endowment(self, solution: Dict[str, Any], filepath: str) -> None:labels = [self.node_label(n) for n in self.nodes]x = np.arange(len(labels))width = 0.35 / max(1, len(solution["plans"]))fig, ax = plt.subplots(figsize=(max(7.5, 1.1 * len(labels) + 2.5), 5.0))for k, plan in enumerate(solution["plans"]):agent = plan["agent"]cons = np.array([plan["x"][n] for n in self.nodes])endow = np.array([self.endowments[agent][n] for n in self.nodes])offset = (k - (len(solution["plans"]) - 1) / 2) * 2 * widthax.bar(x + offset - width / 2, endow, width=width, alpha=0.55, label=f"w_{agent}")ax.bar(x + offset + width / 2, cons, width=width, alpha=0.85, label=f"x_{agent}")ax.set_xticks(x)ax.set_xticklabels(labels, rotation=35, ha="right")ax.set_ylabel("quantity")ax.set_title("Node-by-node endowment and equilibrium consumption")handles, labels_ = ax.get_legend_handles_labels()by_label = dict(zip(labels_, handles))ax.legend(by_label.values(), by_label.keys(), ncol=2)fig.tight_layout()fig.savefig(filepath, dpi=self.cfg.figure_dpi, bbox_inches="tight")plt.close(fig)def export_visual_bundle(self, solution: Dict[str, Any], output_dir: str, stem: str) -> Dict[str, str]:out = Path(output_dir)out.mkdir(parents=True, exist_ok=True)safe = re.sub(r"[^A-Za-z0-9_-]+", "_", stem.strip()).strip("_") or "radner_run"files = {"summary_txt": str(out / f"{safe}_summary.txt"),"market_csv": str(out / f"{safe}_market_table.csv"),"consumption_csv": str(out / f"{safe}_consumption_table.csv"),"expectation_csv": str(out / f"{safe}_expectation_table.csv"),"event_tree_png": str(out / f"{safe}_event_tree_prices.png"),"price_matrix_png": str(out / f"{safe}_price_matrix.png"),"trade_flows_png": str(out / f"{safe}_trade_flows.png"),"consumption_png": str(out / f"{safe}_consumption_vs_endowment.png"),}with open(files["summary_txt"], "w", encoding="utf-8") as f:f.write(self.summary_text(solution) + "\n")self.market_table(solution).to_csv(files["market_csv"], index=False)self.consumption_table(solution).to_csv(files["consumption_csv"], index=False)self.expectation_table(solution).to_csv(files["expectation_csv"], index=False)if solution.get("prices") is not None:self.plot_event_tree_prices(solution, files["event_tree_png"])self.plot_price_heatmap(solution, files["price_matrix_png"])self.plot_trade_flows(solution, files["trade_flows_png"])self.plot_consumption_vs_endowment(solution, files["consumption_png"])return filesdef baseline_model() -> RadnerPureExchangeSpecialCase:cfg = PureExchangeConfig(T=2,common_expectations=True,max_forward_horizon=None,price_search_starts=8,agents=[AgentSpec("A", p_up=0.35, beta=0.97, utility_scale=1.0, contract_limit=2.5),AgentSpec("B", p_up=0.65, beta=0.97, utility_scale=1.0, contract_limit=2.5),],)return RadnerPureExchangeSpecialCase(cfg)def heterogeneous_expectations_model() -> RadnerPureExchangeSpecialCase:cfg = PureExchangeConfig(T=2,common_expectations=False,max_forward_horizon=None,price_search_starts=8,agents=[AgentSpec("A", p_up=0.35, beta=0.97, utility_scale=1.0, contract_limit=2.5, expectation_tilt=-1.1),AgentSpec("B", p_up=0.65, beta=0.97, utility_scale=1.0, contract_limit=2.5, expectation_tilt=+1.1),],)return RadnerPureExchangeSpecialCase(cfg)def failure_model() -> RadnerPureExchangeSpecialCase:# Strongly incompatible future-price beliefs and tight contract bounds.cfg = PureExchangeConfig(T=2,common_expectations=False,max_forward_horizon=None,price_search_starts=10,equilibrium_tol=1e-4,agents=[AgentSpec("A", p_up=0.15, beta=0.95, utility_scale=1.0, contract_limit=0.35, expectation_tilt=-2.6),AgentSpec("B", p_up=0.85, beta=0.95, utility_scale=1.0, contract_limit=0.35, expectation_tilt=+2.6),],)return RadnerPureExchangeSpecialCase(cfg)def run_demo(model: RadnerPureExchangeSpecialCase, label: str, output_dir: str = "radner_outputs") -> Dict[str, Any]:print(f"\n### {label} ###\n")sol = model.solve_equilibrium()print(model.summary_text(sol))if sol.get("prices") is not None:print("\n-- contract market table --")print(model.market_table(sol).to_string(index=False, float_format=lambda x: f"{x:0.6f}"))print("\n-- consumption table --")print(model.consumption_table(sol).to_string(index=False, float_format=lambda x: f"{x:0.6f}"))if not model.cfg.common_expectations:print("\n-- actual vs believed prices --")print(model.expectation_table(sol).to_string(index=False, float_format=lambda x: f"{x:0.6f}"))bundle = model.export_visual_bundle(sol, output_dir=output_dir, stem=label)print("\nSaved visual bundle:")for k, v in bundle.items():print(f" {k}: {v}")return soldef main() -> None:base_output = os.path.join(os.getcwd(), "radner_outputs")run_demo(baseline_model(), "DEMO_1_common_expectations", output_dir=base_output)run_demo(heterogeneous_expectations_model(), "DEMO_2_heterogeneous_expectations", output_dir=base_output)run_demo(failure_model(), "DEMO_3_incompatible_expectations", output_dir=base_output)if __name__ == "__main__":main()

-
CLEANdata是一个关于电能配送服务数字化的试验项目
-
我们团队将尝试发掘电气设备行业实际项目中的数字化技术应用场景
-
验证数字化技术对该类业务的适用性和实施效果
-
优选出该类业务中的数字化技术推荐应用
-
小刘@CLEANdata
-
Mobile&Wechat: 15801000649
-
se_switchgear@163.com


评论