摸鱼量化ETF轮动:根据市场动量周期调整策略因子周期,夏普提升0.3


摸鱼量化ETF轮动:根据市场动量周期调整策略因子周期,夏普提升0.3

将深度学习应用于ETF轮动策略的择时与仓位管理,是一个系统性的工程问题,需要在模型设计、特征工程、风险控制等多个环节进行创新,在过往的量化研究中,我们多将神经网络用于截面选股,得到未来一段时间股票横截面收益率差异的预测。但在择时层面,始终未能出现收益稳定性较好的策略。仅有一个优秀的截面因子,显然也仅能获取相对市场基准的超额收益,策略本身无法实现绝对收益。 本期我们使用和原本截面策略一样的神经网络模型AGRU,但使用一种全新的训练方式,使得模型起到了很好的大盘择时效果。

相较于传统的ETF轮动策略,通常基于动量、估值、宏观经济等因子进行线性或规则化判断。基于深度学习的优势在于:

捕捉非线性关系:市场状态、因子与未来收益间的关系复杂,深度学习可以拟合这种非线性。

处理高维异构数据:可同时处理量价、基本面、宏观、另类数据(如新闻情绪、资金流)等多维度信息。

端到端优化:可以直接以夏普比率或最大回撤为优化目标进行训练,而不仅仅是预测涨跌。

在资产配置领域,收益与风险的非对称性上升规律已被广泛验证 — — 当组合年化收益率突破 6%阈值后,风险溢价往往呈现指数级增长, 能够同时满足年化收益≥8%且夏普比率>1的多资产 ETF 绝对收益策略仍属稀缺资源。本期展示摸鱼量化构建的具备实战价值的高收益、高夏普的绝对收益实战策略 — — ETF轮动中频

01
策略设计框架

本期策略目标是在N个候选ETF(如沪深300ETF、创业板ETF、行业ETF、债券ETF等)中进行择时与仓位分配。我们已经构建出了相对稳定的择时策略。若希望能进一步增厚其收益,我们使用过往在截面的研究成果,此处,在使用模型训练用于ETF轮动时,我们使用一种特定的数据处理方案,使得该 策略用于ETF轮动时相较于原报告有更优的表现。

不同持仓数量下神经网络截面ETF轮动策略表现如下:

输出

    • 择时信号:每个ETF的看涨/看跌概率或预期收益率。

    • 仓位向量:一个时间点上所有ETF的权重分配,可包含空仓(现金)或杠杆限制。


02
深度学习择时+增强型风险平价模型

利用深度学习进行ETF轮动择时与仓位管理,其核心优势在于从高维数据中自动挖掘有效的非线性择时规律,并直接以风险调整后收益为目标进行优化。在A股市场成功的关键在于:

(1)构建能反映A股特有驱动因素的特征集

(2)将严格的风险控制理念嵌入模型架构和损失函数

(3)采用极其严谨的过拟合防范措施

一个可行的路径是从相对简单的LSTM模型开始,结合风险平价等经典资产配置模型,逐步迭代到更复杂的端到端架构。最终目标是构建一个能够适应不同市场状态、在牛市能跟上收益、在熊市能有效守住的稳健轮动系统

风险平价模型在上一节的测算中取得了可控风险下最高的收益率,且其对资产的配置权重集中度也低于其他策略,风险平价模型的初衷在于消除预期收益预测的不确定性,减少由参数估计不确定性带来的负面影响,同时防范因投资组合过度集 中而引发的潜在损失。风险平价模型的实质是通过增长因子(股票) 与通胀因子(债 券) 的平衡应对宏观波动,当股债呈现稳定负相关(如国内 2020 年后股债相关性 转负),股票下跌时债券上涨的“自动稳定器”效果凸显,可以使组合波动率系统性 降低,也为风险平价模型提供了合适的应用土壤。权益&债券滚动 3 年收益率相关性如下:
采用多元化投资组合来防范下行风险会限制上行收益,导致在稳定增长的市场中的表现不佳。若能够开发一个模型,允许风险平价原则被违背,同时引导求解器趋向于风险平价,来放松风险平价优化,这将改善模型的保守性,但不会进行大幅调整。
不同增强乘数的策略结果详情如下:

频率:日频或周频调仓较为适合深度学习模型。

特征质量直接决定模型上限。应构建多层次特征:

  • 价格与量能特征:各ETF的收益率序列、波动率、成交量、价量关系技术指标(如RSI、MACD,但需注意避免过度拟合)、价量相关性。

  • 轮动因子特征

    • 动量类:不同时间窗口的动量(1个月、3个月)、动量加速度、相对强弱。

    • 估值类:ETF对应指数的PE、PB分位数。

    • 市场情绪:VIX类指数、换手率、创新高个股比例、融资余额变化。

    • 宏观经济与政策:利率变化、信用利差、PMI、货币供应量,可通过宏观数据或另类数据(政策文本分析)构建。

    • 资金流特征:北向资金、ETF申赎数据、行业资金流入流出。

    • 市场结构特征:截面波动率、市场宽度、行业间相关性。

  • 特征处理:标准化、中性化(去除市场影响)、处理缺失值。可以使用滑动窗口计算特征,防止未来数据泄露。

"""基于深度学习的ETF轮动策略实现目标:择时 + 仓位管理 + 控制回撤 + 提高夏普比率"""import numpy as npimport pandas as pdimport matplotlib.pyplot as pltimport seaborn as snsfrom datetime import datetime, timedeltaimport warningswarnings.filterwarnings('ignore')# 深度学习相关库import torchimport torch.nn as nnimport torch.optim as optimfrom torch.utils.data import DataLoader, TensorDataset, Datasetimport torch.nn.functional as Ffrom sklearn.preprocessing import StandardScaler, MinMaxScalerfrom sklearn.model_selection import TimeSeriesSplit# 金融相关库import tushare as tsimport talib# 设置中文显示plt.rcParams['font.sans-serif'] = ['SimHei']plt.rcParams['axes.unicode_minus'] = False# 设置随机种子np.random.seed(42)torch.manual_seed(42)# ==================== 第一部分:数据获取与预处理 ====================class ETFDataLoader:    """ETF数据加载与预处理类"""    def __init__(self, token=None, start_date='2010-01-01', end_date=None):        """        初始化数据加载器        Args:            token: tushare token            start_date: 开始日期            end_date: 结束日期,默认为今天        """        if token:            self.pro = ts.pro_api(token)        else:            # 如果没有token,使用在线数据            self.pro = None        self.start_date = start_date        self.end_date = end_date or datetime.now().strftime('%Y-%m-%d')        # A股主要ETF列表        self.etf_list = {            '510300''沪深300ETF',            '510500''中证500ETF'            '159915''创业板ETF',            '510050''上证50ETF',            '512000''券商ETF',            '512100''1000ETF',            '512760''半导体ETF',            '159919''300ETF',            '588000''科创50ETF',            '511010''国债ETF'  # 防御性资产        }        self.features = {}    def fetch_etf_data(self, etf_code, save_local=True):        """        获取ETF数据        Args:            etf_code: ETF代码            save_local: 是否保存到本地        Returns:            DataFrame: ETF数据        """        print(f"正在获取{etf_code}数据...")        # 如果是tushare pro用户        if self.pro:            try:                df = self.pro.fund_daily(ts_code=f'{etf_code}.SH' if etf_code.startswith('51'else f'{etf_code}.SZ',                                         start_date=self.start_date,                                         end_date=self.end_date)                df = df.sort_values('trade_date')                df.index = pd.to_datetime(df['trade_date'])                df = df[['open''high''low''close''vol''amount']]                df.columns = ['open''high''low''close''volume''amount']            except:                # 回退到通用方法                df = self._fetch_from_web(etf_code)        else:            df = self._fetch_from_web(etf_code)        # 计算收益率        df['returns'] = df['close'].pct_change()        # 填充缺失值        df = df.fillna(method='ffill').fillna(method='bfill')        if save_local:            df.to_csv(f'etf_data_{etf_code}.csv')        return df    def _fetch_from_web(self, etf_code):        """从网络获取数据(备用方法)"""        try:            # 使用tushare通用接口            df = ts.get_k_data(etf_code, start=self.start_date, end=self.end_date)            df.index = pd.to_datetime(df['date'])            df = df[['open''high''low''close''volume']]            df.columns = ['open''high''low''close''volume']            return df        except:            # 模拟数据(用于测试)            print(f"无法获取{etf_code}真实数据,使用模拟数据...")            return self._generate_mock_data(etf_code)    def _generate_mock_data(self, etf_code):        """生成模拟数据用于测试"""        dates = pd.date_range(start=self.start_date, end=self.end_date, freq='D')        # 排除周末        dates = dates[dates.dayofweek < 5]        # 基础价格(不同ETF有不同的波动特性)        base_price = {            '510300'5.0,  # 沪深300ETF            '510500'6.0,  # 中证500ETF            '159915'2.5,  # 创业板ETF            '510050'3.0,  # 上证50ETF            '511010'100.0# 国债ETF        }.get(etf_code, 5.0)        n = len(dates)        np.random.seed(hash(etf_code) % 10000)        # 生成随机游走价格序列        returns = np.random.normal(0.00010.02, n)  # 日均收益0.01%,波动2%        prices = base_price * np.exp(np.cumsum(returns))        # 生成OHLCV数据        df = pd.DataFrame(index=dates[:len(prices)])        df['close'] = prices        # 生成开盘价、最高价、最低价        noise = np.random.normal(00.005len(prices))        df['open'] = df['close'].shift(1) * (1 + noise)        df['high'] = df[['open''close']].max(axis=1) * (1 + abs(np.random.normal(00.01len(prices))))        df['low'] = df[['open''close']].min(axis=1) * (1 - abs(np.random.normal(00.01len(prices))))        # 成交量        df['volume'] = np.random.lognormal(101len(prices))        # 确保价格合理性        df['high'] = df[['open''close''high']].max(axis=1)        df['low'] = df[['open''close''low']].min(axis=1)        return df    def compute_technical_features(self, df, window_sizes=[5102060]):        """        计算技术特征        Args:            df: 包含价格数据的DataFrame            window_sizes: 计算指标的窗口大小列表        Returns:            DataFrame: 包含特征的DataFrame        """        features_df = pd.DataFrame(index=df.index)        # 价格特征        features_df['close'] = df['close']        features_df['returns'] = df['returns']        # 波动率特征        for window in window_sizes:            # 滚动波动率            features_df[f'volatility_{window}'] = df['returns'].rolling(window).std()            # 滚动收益率            features_df[f'return_{window}'] = df['close'].pct_change(window)            # 滚动最大回撤            features_df[f'max_dd_{window}'] = -df['close'].rolling(window).apply(                lambda x: (x.max() - x[-1]) / x.max() if x.max() > 0 else 0            )        # 移动平均线        for window in [510203060]:            features_df[f'ma_{window}'] = df['close'].rolling(window).mean()            # 价格与均线的距离            features_df[f'price_ma_ratio_{window}'] = df['close'] / features_df[f'ma_{window}'] - 1        # 动量指标        features_df['rsi_14'] = talib.RSI(df['close'], timeperiod=14)        features_df['macd'], features_df['macd_signal'], features_df['macd_hist'] = talib.MACD(            df['close'], fastperiod=12, slowperiod=26, signalperiod=9        )        # 布林带        features_df['bb_upper'], features_df['bb_middle'], features_df['bb_lower'] = talib.BBANDS(            df['close'], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0        )        features_df['bb_width'] = (features_df['bb_upper'] - features_df['bb_lower']) / features_df['bb_middle']        features_df['bb_position'] = (df['close'] - features_df['bb_lower']) / (features_df['bb_upper'] - features_df['bb_lower'])        # 成交量特征        features_df['volume'] = df['volume']        features_df['volume_ma_ratio'] = df['volume'] / df['volume'].rolling(20).mean()        features_df['volume_obv'] = talib.OBV(df['close'], df['volume'])        # 价格形态特征        features_df['atr'] = talib.ATR(df['high'], df['low'], df['close'], timeperiod=14)        features_df['adx'] = talib.ADX(df['high'], df['low'], df['close'], timeperiod=14)        # 相关性特征(需要多个ETF)        # 这里简化处理,实际应用中需要多个ETF的价格计算相关性        # 市场状态特征        features_df['day_of_week'] = df.index.dayofweek        features_df['month'] = df.index.month        features_df['is_month_start'] = (df.index.day <= 3).astype(int)        features_df['is_month_end'] = (df.index.day >= 25).astype(int)        return features_df.fillna(method='ffill').fillna(0)    def create_dataset(self, features_list, target_etf='510300', lookback=60, forward=5):        """        创建深度学习数据集        Args:            features_list: 特征DataFrame列表(每个ETF一个)            target_etf: 目标ETF代码            lookback: 回看窗口            forward: 预测未来窗口        Returns:            X, y: 特征和标签        """        # 对齐所有ETF的数据        aligned_data = []        min_len = min(len(f) for f in features_list)        for f in features_list:            aligned_data.append(f.iloc[-min_len:].values)        # 创建3D特征矩阵 [样本数, 时间步长, 特征数]        n_samples = min_len - lookback - forward        n_features = sum(f.shape[1for f in features_list)        X = np.zeros((n_samples, lookback, n_features))        y = np.zeros((n_samples, 2))  # 二元分类:涨/跌        # 填充特征矩阵        for i in range(n_samples):            feature_idx = 0            for etf_idx, etf_features in enumerate(aligned_data):                n_etf_features = features_list[etf_idx].shape[1]                X[i, :, feature_idx:feature_idx + n_etf_features] = etf_features[i:i+lookback]                feature_idx += n_etf_features        # 创建标签:未来forward天的收益率和方向        target_idx = list(self.etf_list.keys()).index(target_etf) if target_etf in self.etf_list else 0        target_features = features_list[target_idx]        for i in range(n_samples):            future_price = target_features.iloc[i+lookback+forward-1]['close']            current_price = target_features.iloc[i+lookback-1]['close']            # 收益率            future_return = (future_price - current_price) / current_price            # 分类标签:1表示上涨,0表示下跌            y[i, 0] = future_return  # 连续值用于回归            y[i, 1] = 1 if future_return > 0 else 0  # 分类标签        return X, y    def create_multi_etf_dataset(self, etf_data_dict, lookback=60, forward=5):        """        创建多ETF轮动数据集        Args:            etf_data_dict: ETF数据字典 {代码: DataFrame}            lookback: 回看窗口            forward: 预测未来窗口        Returns:            X, y_multi: 特征和多ETF标签        """        # 获取所有ETF的特征数据        features_list = []        etf_codes = []        for code, df in etf_data_dict.items():            features = self.compute_technical_features(df)            features_list.append(features)            etf_codes.append(code)        # 对齐数据        aligned_data = []        min_len = min(len(f) for f in features_list)        for f in features_list:            aligned_data.append(f.iloc[-min_len:].values)        # 创建3D特征矩阵        n_samples = min_len - lookback - forward        n_features = sum(f.shape[1for f in features_list)        X = np.zeros((n_samples, lookback, n_features))        # 填充特征矩阵        for i in range(n_samples):            feature_idx = 0            for etf_idx, etf_features in enumerate(aligned_data):                n_etf_features = features_list[etf_idx].shape[1]                X[i, :, feature_idx:feature_idx + n_etf_features] = etf_features[i:i+lookback]                feature_idx += n_etf_features        # 创建多ETF标签:每个ETF未来forward天的收益率        y_multi = np.zeros((n_samples, len(etf_codes)))        for etf_idx, features in enumerate(features_list):            for i in range(n_samples):                future_price = features.iloc[i+lookback+forward-1]['close']                current_price = features.iloc[i+lookback-1]['close']                y_multi[i, etf_idx] = (future_price - current_price) / current_price        return X, y_multi, etf_codes# ==================== 第二部分:深度学习模型 ====================class AttentionLayer(nn.Module):    """注意力机制层"""    def __init__(self, input_dim):        super(AttentionLayer, self).__init__()        self.attention = nn.Sequential(            nn.Linear(input_dim, 64),            nn.Tanh(),            nn.Linear(641)        )    def forward(self, x):        # x形状: [batch_size, seq_len, features]        weights = self.attention(x)  # [batch_size, seq_len, 1]        weights = torch.softmax(weights, dim=1)        weighted = torch.sum(x * weights, dim=1)  # [batch_size, features]        return weighted, weightsclass LSTMModel(nn.Module):    """LSTM时序预测模型"""    def __init__(self, input_dim, hidden_dim=128, num_layers=2, output_dim=1, dropout=0.3):        super(LSTMModel, self).__init__()        self.lstm = nn.LSTM(            input_dim, hidden_dim, num_layers,             batch_first=True, dropout=dropout if num_layers > 1 else 0        )        self.attention = AttentionLayer(hidden_dim)        self.fc = nn.Sequential(            nn.Linear(hidden_dim, 64),            nn.BatchNorm1d(64),            nn.ReLU(),            nn.Dropout(dropout),            nn.Linear(6432),            nn.BatchNorm1d(32),            nn.ReLU(),            nn.Dropout(dropout),            nn.Linear(32, output_dim)        )    def forward(self, x):        # LSTM层        lstm_out, (h_n, c_n) = self.lstm(x)  # lstm_out: [batch, seq_len, hidden_dim]        # 注意力层        attn_out, attn_weights = self.attention(lstm_out)        # 全连接层        output = self.fc(attn_out)        return output, attn_weightsclass MultiTaskETFModel(nn.Module):    """多任务学习模型:同时预测收益率、波动率和方向"""    def __init__(self, input_dim, hidden_dim=128, num_layers=2, dropout=0.3):        super(MultiTaskETFModel, self).__init__()        # 共享的LSTM层        self.lstm = nn.LSTM(            input_dim, hidden_dim, num_layers,             batch_first=True, dropout=dropout if num_layers > 1 else 0        )        self.attention = AttentionLayer(hidden_dim)        # 共享的特征提取层        self.shared_fc = nn.Sequential(            nn.Linear(hidden_dim, 64),            nn.BatchNorm1d(64),            nn.ReLU(),            nn.Dropout(dropout)        )        # 任务特定的输出层        # 任务1: 收益率预测(回归)        self.return_head = nn.Sequential(            nn.Linear(6432),            nn.ReLU(),            nn.Linear(321)  # 预测收益率        )        # 任务2: 波动率预测(回归)        self.volatility_head = nn.Sequential(            nn.Linear(6432),            nn.ReLU(),            nn.Linear(321)  # 预测波动率        )        # 任务3: 方向预测(分类)        self.direction_head = nn.Sequential(            nn.Linear(6432),            nn.ReLU(),            nn.Linear(322)  # 二分类:涨/跌        )    def forward(self, x):        # LSTM层        lstm_out, _ = self.lstm(x)        # 注意力层        attn_out, attn_weights = self.attention(lstm_out)        # 共享特征        shared_features = self.shared_fc(attn_out)        # 各个任务的输出        return_pred = self.return_head(shared_features)        volatility_pred = self.volatility_head(shared_features)        direction_pred = self.direction_head(shared_features)        return {            'return': return_pred,            'volatility': volatility_pred,            'direction': direction_pred,            'attention': attn_weights        }class TransformerETFModel(nn.Module):    """Transformer模型用于时序预测"""    def __init__(self, input_dim, hidden_dim=128, num_heads=4, num_layers=3, output_dim=1, dropout=0.1):        super(TransformerETFModel, self).__init__()        self.input_projection = nn.Linear(input_dim, hidden_dim)        # Transformer编码器        encoder_layer = nn.TransformerEncoderLayer(            d_model=hidden_dim,            nhead=num_heads,            dim_feedforward=hidden_dim*4,            dropout=dropout,            batch_first=True        )        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)        # 注意力池化        self.attention_pool = AttentionLayer(hidden_dim)        # 输出层        self.output_layer = nn.Sequential(            nn.Linear(hidden_dim, 64),            nn.ReLU(),            nn.Dropout(dropout),            nn.Linear(64, output_dim)        )    def forward(self, x):        # 输入投影        x = self.input_projection(x)        # Transformer编码        transformer_out = self.transformer_encoder(x)        # 注意力池化        pooled, attn_weights = self.attention_pool(transformer_out)        # 输出        output = self.output_layer(pooled)        return output, attn_weights# ==================== 第三部分:仓位管理与风险控制 ====================class RiskAwarePositionManager:    """风险感知的仓位管理器"""    def __init__(self, initial_capital=1000000, max_position_ratio=0.2                 stop_loss_pct=0.05, max_drawdown_limit=0.15, volatility_scaling=True):        """        初始化仓位管理器        Args:            initial_capital: 初始资金            max_position_ratio: 单个ETF最大仓位比例            stop_loss_pct: 止损比例            max_drawdown_limit: 最大回撤限制            volatility_scaling: 是否根据波动率调整仓位        """        self.initial_capital = initial_capital        self.current_capital = initial_capital        self.max_position_ratio = max_position_ratio        self.stop_loss_pct = stop_loss_pct        self.max_drawdown_limit = max_drawdown_limit        self.volatility_scaling = volatility_scaling        # 持仓记录        self.positions = {}        self.trade_history = []        self.equity_curve = [initial_capital]        self.max_equity = initial_capital        # 风险指标        self.current_drawdown = 0        self.peak_equity = initial_capital    def calculate_position_size(self, predictions, volatilities=None, market_state='normal'):        """        计算仓位大小        Args:            predictions: 模型预测的收益率 [n_etfs]            volatilities: 波动率预测 [n_etfs]            market_state: 市场状态 ('bull', 'bear', 'normal')        Returns:            dict: 每个ETF的权重        """        n_etfs = len(predictions)        # 基础权重:基于预测收益率        raw_weights = np.array(predictions)        # 1. 负预测值归零(不做空)        raw_weights[raw_weights < 0] = 0        # 2. 波动率调整(如果启用)        if self.volatility_scaling and volatilities is not None:            # 波动率倒数作为调整因子(波动率越高,仓位越小)            vol_adjustment = 1.0 / (volatilities + 1e-8)            raw_weights = raw_weights * vol_adjustment        # 3. 市场状态调整        market_adjustment = self._get_market_adjustment(market_state)        raw_weights = raw_weights * market_adjustment        # 4. 应用最大仓位限制        max_weight = self.max_position_ratio        if market_state == 'bear':            max_weight *= 0.5  # 熊市减半仓位限制        raw_weights = np.clip(raw_weights, 0, max_weight)        # 5. 归一化确保权重和为1        if raw_weights.sum() > 0:            weights = raw_weights / raw_weights.sum()        else:            # 如果没有正预测,全部持有现金            weights = np.zeros(n_etfs)        # 确保没有超限        weights = np.clip(weights, 0, max_weight)        return weights    def _get_market_adjustment(self, market_state):        """根据市场状态获取调整因子"""        if market_state == 'bull':            return 1.2  # 牛市增加仓位        elif market_state == 'bear':            return 0.5  # 熊市大幅降低仓位        elif market_state == 'high_volatility':            return 0.7  # 高波动市场降低仓位        else:  # 'normal'            return 1.0    def assess_market_state(self, recent_returns, recent_volatility):        """评估市场状态"""        avg_return = np.mean(recent_returns)        avg_vol = np.mean(recent_volatility)        if avg_return > 0.001 and avg_vol < 0.02:  # 正收益且低波动            return 'bull'        elif avg_return < -0.001:  # 负收益            return 'bear'        elif avg_vol > 0.025:  # 高波动            return 'high_volatility'        else:            return 'normal'    def apply_stop_loss(self, positions, current_prices, entry_prices):        """应用止损规则"""        for etf, position in positions.items():            if position['shares'] > 0:                current_value = current_prices[etf] * position['shares']                entry_value = entry_prices[etf] * position['shares']                # 计算盈亏比例                pnl_pct = (current_value - entry_value) / entry_value                # 如果亏损超过止损比例,平仓                if pnl_pct < -self.stop_loss_pct:                    print(f"止损触发: {etf}, 亏损: {pnl_pct:.2%}")                    return True, etf        return FalseNone    def update_drawdown(self, current_equity):        """更新回撤指标"""        if current_equity > self.peak_equity:            self.peak_equity = current_equity        self.current_drawdown = (self.peak_equity - current_equity) / self.peak_equity        # 如果回撤超过限制,强制减仓        if self.current_drawdown > self.max_drawdown_limit:            return 'reduce_position'        return 'normal'    def risk_parity_allocation(self, volatilities, correlations=None):        """        风险平价分配        Args:            volatilities: 各ETF的波动率            correlations: 相关系数矩阵        Returns:            array: 风险平价权重        """        n = len(volatilities)        # 如果没有相关矩阵,假设独立        if correlations is None:            correlations = np.eye(n)        # 计算协方差矩阵        cov_matrix = np.outer(volatilities, volatilities) * correlations        try:            # 计算风险贡献的逆            inv_vol = 1.0 / volatilities            risk_parity_weights = inv_vol / inv_vol.sum()        except:            # 如果波动率有零值,使用等权重            risk_parity_weights = np.ones(n) / n        return risk_parity_weights    def execute_rebalancing(self, target_weights, current_weights, prices,                             transaction_cost=0.001, min_trade_ratio=0.01):        """        执行再平衡        Args:            target_weights: 目标权重            current_weights: 当前权重            prices: 当前价格            transaction_cost: 交易成本比例            min_trade_ratio: 最小交易比例(避免小额交易)        Returns:            dict: 交易指令        """        trades = {}        total_value = self.current_capital        for i, etf in enumerate(prices.keys()):            target_value = total_value * target_weights[i]            current_value = total_value * current_weights[i]            value_diff = target_value - current_value            # 只有变化超过阈值时才交易            if abs(value_diff) > total_value * min_trade_ratio:                shares_to_trade = value_diff / prices[etf]                # 考虑交易成本                cost = abs(shares_to_trade * prices[etf]) * transaction_cost                trades[etf] = {                    'shares': shares_to_trade,                    'value': value_diff,                    'cost': cost                }        return trades# ==================== 第四部分:ETF轮动策略主类 ====================class DeepLearningETFRotationStrategy:    """深度学习ETF轮动策略主类"""    def __init__(self, config=None):        """        初始化策略        Args:            config: 配置字典        """        self.config = config or self._default_config()        self.data_loader = ETFDataLoader(            token=self.config.get('tushare_token'),            start_date=self.config.get('start_date''2015-01-01'),            end_date=self.config.get('end_date')        )        # 模型        self.model = None        self.scaler = StandardScaler()        # 仓位管理        self.position_manager = RiskAwarePositionManager(            initial_capital=self.config.get('initial_capital'1000000),            max_position_ratio=self.config.get('max_position_ratio'0.3),            stop_loss_pct=self.config.get('stop_loss_pct'0.05),            max_drawdown_limit=self.config.get('max_drawdown_limit'0.15),            volatility_scaling=self.config.get('volatility_scaling'True)        )        # 数据缓存        self.etf_data = {}        self.features = {}        # 回测结果        self.backtest_results = {}    def _default_config(self):        """默认配置"""        return {            'initial_capital'1000000,            'lookback_window'60,            'forward_window'5,            'train_test_split'0.8,            'batch_size'32,            'epochs'50,            'learning_rate'0.001,            'max_position_ratio'0.3,            'stop_loss_pct'0.05,            'max_drawdown_limit'0.15,            'transaction_cost'0.001,            'etf_codes': ['510300''510500''159915''511010'],  # 包含防御性资产            'model_type''multitask',  # 'lstm', 'transformer', 'multitask'            'use_attention'True        }    def load_and_prepare_data(self):        """加载和准备数据"""        print("加载ETF数据...")        # 加载每个ETF的数据        for code in self.config['etf_codes']:            df = self.data_loader.fetch_etf_data(code, save_local=True)            self.etf_data[code] = df            # 计算特征            features = self.data_loader.compute_technical_features(df)            self.features[code] = features            print(f"{code}{len(df)} 个交易日数据,{features.shape[1]} 个特征")        # 创建多ETF数据集        X, y_multi, etf_codes = self.data_loader.create_multi_etf_dataset(            self.etf_data,            lookback=self.config['lookback_window'],            forward=self.config['forward_window']        )        # 特征标准化        n_samples, n_timesteps, n_features = X.shape        X_reshaped = X.reshape(-1, n_features)        X_scaled = self.scaler.fit_transform(X_reshaped)        X_scaled = X_scaled.reshape(n_samples, n_timesteps, n_features)        # 划分训练集和测试集(时间序列划分)        split_idx = int(len(X_scaled) * self.config['train_test_split'])        self.X_train = X_scaled[:split_idx]        self.X_test = X_scaled[split_idx:]        self.y_train = y_multi[:split_idx]        self.y_test = y_multi[split_idx:]        self.train_dates = list(self.etf_data[self.config['etf_codes'][0]].index)[            self.config['lookback_window'] + self.config['forward_window']:split_idx +             self.config['lookback_window'] + self.config['forward_window']        ]        self.test_dates = list(self.etf_data[self.config['etf_codes'][0]].index)[            split_idx + self.config['lookback_window'] + self.config['forward_window']:        ]        print(f"训练集: {len(self.X_train)} 个样本, 测试集: {len(self.X_test)} 个样本")        return X_scaled, y_multi    def build_model(self):        """构建深度学习模型"""        n_features = self.X_train.shape[2]        n_etfs = len(self.config['etf_codes'])        if self.config['model_type'] == 'lstm':            self.model = LSTMModel(                input_dim=n_features,                hidden_dim=128,                num_layers=2,                output_dim=n_etfs,                dropout=0.3            )        elif self.config['model_type'] == 'transformer':            self.model = TransformerETFModel(                input_dim=n_features,                hidden_dim=128,                num_heads=4,                num_layers=3,                output_dim=n_etfs,                dropout=0.1            )        elif self.config['model_type'] == 'multitask':            self.model = MultiTaskETFModel(                input_dim=n_features,                hidden_dim=128,                num_layers=2,                dropout=0.3            )        else:            raise ValueError(f"未知模型类型: {self.config['model_type']}")        print(f"构建 {self.config['model_type']} 模型, 输入特征: {n_features}, 输出ETF数: {n_etfs}")        return self.model    def train_model(self):        """训练模型"""        if self.model is None:            self.build_model()        # 准备数据加载器        train_dataset = TensorDataset(            torch.FloatTensor(self.X_train),            torch.FloatTensor(self.y_train)        )        train_loader = DataLoader(            train_dataset,            batch_size=self.config['batch_size'],            shuffle=True,            drop_last=True        )        # 损失函数和优化器        if self.config['model_type'] == 'multitask':            criterion = {                'return': nn.MSELoss(),                'volatility': nn.MSELoss(),                'direction': nn.CrossEntropyLoss()            }            # 多任务损失权重            loss_weights = {                'return'1.0,                'volatility'0.5,                'direction'0.8            }        else:            criterion = nn.MSELoss()        optimizer = optim.Adam(self.model.parameters(), lr=self.config['learning_rate'])        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', patience=5, factor=0.5)        # 训练循环        self.model.train()        train_losses = []        for epoch in range(self.config['epochs']):            epoch_loss = 0            for batch_X, batch_y in train_loader:                optimizer.zero_grad()                if self.config['model_type'] == 'multitask':                    outputs = self.model(batch_X)                    # 计算多任务损失                    loss = 0                    return_loss = criterion['return'](outputs['return'], batch_y.mean(dim=1, keepdim=True))                    volatility_target = torch.abs(batch_y).mean(dim=1, keepdim=True)                    volatility_loss = criterion['volatility'](outputs['volatility'], volatility_target)                    # 方向标签(平均收益率为正为1,否则为0)                    direction_target = (batch_y.mean(dim=1) > 0).long()                    direction_loss = criterion['direction'](outputs['direction'], direction_target)                    loss = (loss_weights['return'] * return_loss +                            loss_weights['volatility'] * volatility_loss +                            loss_weights['direction'] * direction_loss)                else:                    outputs, _ = self.model(batch_X)                    loss = criterion(outputs, batch_y)                loss.backward()                torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)                optimizer.step()                epoch_loss += loss.item()            avg_loss = epoch_loss / len(train_loader)            train_losses.append(avg_loss)            scheduler.step(avg_loss)            if (epoch + 1) % 10 == 0:                print(f"Epoch [{epoch+1}/{self.config['epochs']}], Loss: {avg_loss:.6f}")        print("模型训练完成")        # 绘制训练损失曲线        plt.figure(figsize=(105))        plt.plot(train_losses)        plt.title('训练损失曲线')        plt.xlabel('Epoch')        plt.ylabel('Loss')        plt.grid(True)        plt.show()        return train_losses    def predict(self, X):        """使用模型进行预测"""        self.model.eval()        with torch.no_grad():            if self.config['model_type'] == 'multitask':                outputs = self.model(torch.FloatTensor(X))                predictions = outputs['return'].numpy()                volatilities = outputs['volatility'].numpy()                direction_probs = F.softmax(outputs['direction'], dim=1).numpy()                return predictions, volatilities, direction_probs            else:                outputs, attn_weights = self.model(torch.FloatTensor(X))                return outputs.numpy(), attn_weights.numpy()    def backtest(self):        """运行回测"""        print("开始回测...")        # 初始化回测变量        dates = self.test_dates        n_dates = len(dates)        n_etfs = len(self.config['etf_codes'])        # 存储回测结果        portfolio_values = [self.position_manager.initial_capital]        positions_history = []        weights_history = []        returns_history = []        # 当前权重(开始时全部现金)        current_weights = np.zeros(n_etfs)        # 回测循环        for i in range(n_dates - self.config['forward_window']):            current_date = dates[i]            # 获取当前特征            start_idx = i + len(self.X_train)            lookback_slice = slice(start_idx, start_idx + self.config['lookback_window'])            # 准备输入数据            X_batch = self.X_test[i:i+1]  # 保持3D形状            # 模型预测            if self.config['model_type'] == 'multitask':                predictions, volatilities, direction_probs = self.predict(X_batch)                predictions = predictions.flatten()                volatilities = volatilities.flatten()            else:                predictions, _ = self.predict(X_batch)                predictions = predictions.flatten()                volatilities = None            # 获取当前价格            current_prices = {}            for j, code in enumerate(self.config['etf_codes']):                if current_date in self.etf_data[code].index:                    current_prices[code] = self.etf_data[code].loc[current_date, 'close']                else:                    # 如果当天没有数据,使用最近的价格                    current_prices[code] = self.etf_data[code].iloc[                        self.etf_data[code].index.get_loc(current_date, method='nearest')                    ]['close']            # 评估市场状态            recent_returns = []            recent_volatility = []            for code in self.config['etf_codes']:                if i >= 20:  # 至少有20天数据                    recent_data = self.etf_data[code].iloc[max(0, i-20):i]                    recent_returns.extend(recent_data['returns'].dropna().tolist())                    recent_volatility.append(recent_data['returns'].std())            if recent_returns:                market_state = self.position_manager.assess_market_state(                    recent_returns, recent_volatility                )            else:                market_state = 'normal'            # 计算目标权重            target_weights = self.position_manager.calculate_position_size(                predictions, volatilities, market_state            )            # 应用风险平价调整            if volatilities is not None:                risk_parity_weights = self.position_manager.risk_parity_allocation(                    np.abs(volatilities) + 0.01  # 避免零波动率                )                # 结合模型预测和风险平价                target_weights = 0.7 * target_weights + 0.3 * risk_parity_weights            # 执行再平衡            trades = self.position_manager.execute_rebalancing(                target_weights, current_weights, current_prices,                transaction_cost=self.config.get('transaction_cost'0.001),                min_trade_ratio=0.005            )            # 更新当前权重            current_weights = target_weights            # 计算当日组合收益            daily_return = 0            for j, code in enumerate(self.config['etf_codes']):                if code in self.etf_data and i > 0:                    # 获取当日收益率                    if current_date in self.etf_data[code].index:                        etf_return = self.etf_data[code].loc[current_date, 'returns']                    else:                        etf_return = 0                    daily_return += current_weights[j] * etf_return            # 更新组合价值            prev_value = portfolio_values[-1]            current_value = prev_value * (1 + daily_return)            # 扣除交易成本            if trades:                total_cost = sum(trade['cost'for trade in trades.values())                current_value -= total_cost            portfolio_values.append(current_value)            returns_history.append(daily_return)            weights_history.append(current_weights.copy())            # 更新仓位管理器状态            self.position_manager.current_capital = current_value            self.position_manager.equity_curve.append(current_value)            # 更新回撤并检查是否需要减仓            risk_status = self.position_manager.update_drawdown(current_value)            if risk_status == 'reduce_position' and np.sum(current_weights) > 0.5:                # 强制减仓到50%                reduction_factor = 0.5 / np.sum(current_weights)                current_weights = current_weights * reduction_factor            # 每30天打印一次进度            if i % 30 == 0:                print(f"日期: {current_date}, 组合价值: {current_value:.2f}, 收益率: {daily_return:.4%}")        # 存储回测结果        self.backtest_results = {            'dates': dates[:len(portfolio_values)-1],            'portfolio_values': portfolio_values[1:],            'returns': returns_history,            'weights': weights_history,            'etf_codes'self.config['etf_codes']        }        print("回测完成")        return self.backtest_results    def calculate_performance_metrics(self):        """计算绩效指标"""        if not self.backtest_results:            print("请先运行回测")            return None        returns = np.array(self.backtest_results['returns'])        portfolio_values = np.array(self.backtest_results['portfolio_values'])        # 计算累计收益        cumulative_returns = (portfolio_values[-1] / portfolio_values[0]) - 1        # 计算年化收益率        n_years = len(returns) / 252        annualized_return = (1 + cumulative_returns) ** (1 / n_years) - 1        # 计算年化波动率        annualized_volatility = np.std(returns) * np.sqrt(252)        # 计算夏普比率(假设无风险利率3%)        risk_free_rate = 0.03        excess_returns = returns - risk_free_rate/252        sharpe_ratio = np.mean(excess_returns) / np.std(returns) * np.sqrt(252)        # 计算最大回撤        peak = np.maximum.accumulate(portfolio_values)        drawdown = (portfolio_values - peak) / peak        max_drawdown = np.min(drawdown)        # 计算Sortino比率(只考虑下行风险)        downside_returns = returns[returns < 0]        downside_std = np.std(downside_returns) if len(downside_returns) > 0 else 0        sortino_ratio = (annualized_return - risk_free_rate) / (downside_std * np.sqrt(252)) if downside_std > 0 else 0        # 计算Calmar比率        calmar_ratio = annualized_return / abs(max_drawdown) if max_drawdown != 0 else 0        # 计算胜率        winning_days = np.sum(returns > 0) / len(returns)        # 计算盈亏比        avg_win = np.mean(returns[returns > 0]) if np.sum(returns > 0) > 0 else 0        avg_loss = np.mean(returns[returns < 0]) if np.sum(returns < 0) > 0 else 0        profit_factor = abs(avg_win / avg_loss) if avg_loss != 0 else float('inf')        metrics = {            '累计收益': cumulative_returns,            '年化收益率': annualized_return,            '年化波动率': annualized_volatility,            '夏普比率': sharpe_ratio,            '最大回撤': max_drawdown,            'Sortino比率': sortino_ratio,            'Calmar比率': calmar_ratio,            '胜率': winning_days,            '盈亏比': profit_factor,            '总交易日数'len(returns)        }        return metrics    def plot_results(self):        """绘制回测结果"""        if not self.backtest_results:            print("请先运行回测")            return        dates = self.backtest_results['dates']        portfolio_values = self.backtest_results['portfolio_values']        returns = self.backtest_results['returns']        weights = np.array(self.backtest_results['weights'])        etf_codes = self.backtest_results['etf_codes']        fig, axes = plt.subplots(32, figsize=(1512))        # 1. 组合净值曲线        axes[00].plot(dates, portfolio_values)        axes[00].set_title('组合净值曲线')        axes[00].set_xlabel('日期')        axes[00].set_ylabel('组合价值')        axes[00].grid(True)        # 2. 累计收益率曲线        cumulative_returns = (np.array(portfolio_values) / portfolio_values[0]) - 1        axes[01].plot(dates, cumulative_returns)        axes[01].set_title('累计收益率曲线')        axes[01].set_xlabel('日期')        axes[01].set_ylabel('累计收益率')        axes[01].grid(True)        # 3. 回撤曲线        peak = np.maximum.accumulate(portfolio_values)        drawdown = (portfolio_values - peak) / peak        axes[10].fill_between(dates, drawdown, 0, alpha=0.3)        axes[10].set_title('回撤曲线')        axes[10].set_xlabel('日期')        axes[10].set_ylabel('回撤')        axes[10].grid(True)        # 4. 月度收益率热力图        # 创建月度收益率数据        monthly_returns = []        monthly_dates = []        df_returns = pd.Series(returns, index=dates[:len(returns)])        monthly_series = df_returns.resample('M').apply(lambda x: (1 + x).prod() - 1)        # 重塑为矩阵形式        monthly_matrix = []        for year in sorted(set(d.year for d in monthly_series.index)):            year_data = monthly_series[monthly_series.index.year == year]            if len(year_data) == 12:                monthly_matrix.append(year_data.values)        if monthly_matrix:            monthly_matrix = np.array(monthly_matrix).T  # 转置使月份为行            im = axes[11].imshow(monthly_matrix, aspect='auto', cmap='RdYlGn')            axes[11].set_title('月度收益率热力图')            axes[11].set_xlabel('年份')            axes[11].set_ylabel('月份')            plt.colorbar(im, ax=axes[11])        # 5. 仓位权重变化        for i in range(len(etf_codes)):            axes[20].plot(dates[:len(weights)], weights[:, i], label=etf_codes[i])        axes[20].set_title('仓位权重变化')        axes[20].set_xlabel('日期')        axes[20].set_ylabel('权重')        axes[20].legend()        axes[20].grid(True)        # 6. 收益率分布直方图        axes[21].hist(returns, bins=50, alpha=0.7, edgecolor='black')        axes[21].axvline(x=np.mean(returns), color='red', linestyle='--', label=f'均值: {np.mean(returns):.4%}')        axes[21].set_title('日收益率分布')        axes[21].set_xlabel('日收益率')        axes[21].set_ylabel('频数')        axes[21].legend()        axes[21].grid(True)        plt.tight_layout()        plt.show()        # 打印绩效指标        metrics = self.calculate_performance_metrics()        if metrics:            print("\n" + "="*50)            print("策略绩效指标:")            print("="*50)            for key, value in metrics.items():                if isinstance(value, float):                    if '率' in key or '比' in key:                        print(f"{key}{value:.4%}" if '回撤' not in key else f"{key}{value:.4%}")                    elif '收益' in key:                        print(f"{key}{value:.4%}")                    else:                        print(f"{key}{value:.4f}")                else:                    print(f"{key}{value}")    def run_optimization(self):        """运行参数优化(简化版)"""        print("开始参数优化...")        # 定义参数网格        param_grid = {            'lookback_window': [306090],            'forward_window': [3510],            'max_position_ratio': [0.20.30.4],            'stop_loss_pct': [0.030.050.08]        }        best_params = None        best_sharpe = -np.inf        # 简化优化:只测试少数组合        import itertools        keys, values = zip(*param_grid.items())        param_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]        # 只测试前几种组合(实际应用中可以更多)        for i, params in enumerate(param_combinations[:4]):            print(f"\n测试参数组合 {i+1}/{len(param_combinations[:4])}{params}")            # 更新配置            for key, value in params.items():                self.config[key] = value            # 重新准备数据            self.load_and_prepare_data()            # 重新训练模型            self.build_model()            self.train_model()            # 运行回测            self.backtest()            # 计算夏普比率            metrics = self.calculate_performance_metrics()            sharpe = metrics['夏普比率']            print(f"夏普比率: {sharpe:.4f}")            if sharpe > best_sharpe:                best_sharpe = sharpe                best_params = params.copy()        print(f"\n最优参数: {best_params}")        print(f"最优夏普比率: {best_sharpe:.4f}")        return best_params, best_sharpe# ==================== 第五部分:主程序执行 ====================def main():    """主函数"""    print("="*60)    print("深度学习ETF轮动策略")    print("目标:择时 + 仓位管理 + 控制回撤 + 提高夏普比率")    print("="*60)    # 配置策略    config = {        'initial_capital'1000000,        'lookback_window'60,  # 60天历史数据        'forward_window'5,    # 预测未来5天        'train_test_split'0.8,        'batch_size'32,        'epochs'50,           # 训练轮数        'learning_rate'0.001,        'max_position_ratio'0.3,      # 单个ETF最大仓位30%        'stop_loss_pct'0.05,          # 止损比例5%        'max_drawdown_limit'0.15,     # 最大回撤限制15%        'transaction_cost'0.001,      # 交易成本0.1%        'etf_codes': ['510300''510500''159915''511010'],  # ETF列表        'model_type''multitask',      # 使用多任务学习模型        'volatility_scaling'True,     # 启用波动率调整        'start_date''2016-01-01',     # 数据开始日期        'end_date''2023-12-31'        # 数据结束日期    }    # 初始化策略    strategy = DeepLearningETFRotationStrategy(config)    # 1. 加载和准备数据    print("\n[阶段1] 加载和准备数据...")    strategy.load_and_prepare_data()    # 2. 构建和训练模型    print("\n[阶段2] 构建和训练深度学习模型...")    strategy.build_model()    strategy.train_model()    # 3. 运行回测    print("\n[阶段3] 运行策略回测...")    backtest_results = strategy.backtest()    # 4. 分析结果    print("\n[阶段4] 分析回测结果...")    strategy.plot_results()    # 5. 参数优化(可选)    print("\n[阶段5] 运行参数优化...")    optimize = input("是否运行参数优化?(y/n): ").lower() == 'y'    if optimize:        best_params, best_sharpe = strategy.run_optimization()        # 使用最优参数重新运行        print("\n使用最优参数重新运行策略...")        for key, value in best_params.items():            strategy.config[key] = value        strategy.load_and_prepare_data()        strategy.build_model()        strategy.train_model()        strategy.backtest()        strategy.plot_results()    print("\n策略执行完成!")if __name__ == "__main__":    main()
具体咨询模型代码和细节,可以扫描二维码免费加入量化学习群!

03

下期我们再见!
我们会在学习群每日发布量化信号,让摸鱼学员每天及时获得量化信号进行跟踪和测试,欢迎加入我们的学习群,联系客服就可以免费加入!   

风险提示 :模型所有统计结果均基于历史数据,未来市场可能发生重大变化;单因子的收益可能存在较大波动,实际应用需结合资金管理、风险控制等方法。