diff --git a/cta_1d/__init__.py b/cta_1d/__init__.py index f7c6774..c3975e5 100644 --- a/cta_1d/__init__.py +++ b/cta_1d/__init__.py @@ -47,6 +47,7 @@ Backtesting: # Re-export all public APIs from src submodules from .src import ( CTA1DLoader, + CTA1DLoaderParquet, get_blend_weights, describe_blend_config, BLEND_CONFIGS, @@ -57,6 +58,7 @@ try: from .src import run_backtest, BacktestConfig __all__ = [ 'CTA1DLoader', + 'CTA1DLoaderParquet', 'get_blend_weights', 'describe_blend_config', 'BLEND_CONFIGS', @@ -69,6 +71,7 @@ except ImportError: # xgboost or sklearn not installed __all__ = [ 'CTA1DLoader', + 'CTA1DLoaderParquet', 'get_blend_weights', 'describe_blend_config', 'BLEND_CONFIGS', diff --git a/cta_1d/config_parquet.yaml b/cta_1d/config_parquet.yaml new file mode 100644 index 0000000..1126d8b --- /dev/null +++ b/cta_1d/config_parquet.yaml @@ -0,0 +1,41 @@ +# CTA 1D Configuration for Parquet Loading +# Use this instead of config.yaml when using CTA1DLoaderParquet + +data: + dt_range: ['2020-01-01', '2023-12-31'] + + # Parquet paths (populated by data_ops_research) + feature_path: /data/parquet/dataset/cta_alpha158_1d + hffactor_path: /data/parquet/dataset/cta_hffactor_1d + dom_path: /data/parquet/dataset/cta_dom_1d + label_path: /data/parquet/dataset/cta_labels_1d + + feature_sets: + - alpha158 + - hffactor + normalization: dual + blend_weights: equal + +segments: + train: ['2020-01-01', '2022-06-30'] + valid: ['2022-07-01', '2022-12-31'] + test: ['2023-01-01', '2023-12-31'] + +model: + type: xgb + params: + objective: reg:squarederror + eval_metric: rmse + eta: 0.05 + max_depth: 6 + subsample: 0.8 + colsample_bytree: 0.8 + seed: 42 + num_boost_round: 500 + early_stopping_rounds: 50 + +training: + return_type: o2c_twap1min + weight_factors: + positive: 1.0 + negative: 2.0 diff --git a/cta_1d/src/__init__.py b/cta_1d/src/__init__.py index 3aa7baf..8679965 100644 --- a/cta_1d/src/__init__.py +++ b/cta_1d/src/__init__.py @@ -1,6 +1,7 @@ """CTA 1-day task-specific utilities.""" from .loader import CTA1DLoader +from .loader_parquet import CTA1DLoaderParquet from .labels import get_blend_weights, describe_blend_config, BLEND_CONFIGS try: @@ -8,6 +9,7 @@ try: from .backtest import run_backtest, BacktestConfig __all__ = [ 'CTA1DLoader', + 'CTA1DLoaderParquet', 'train_model', 'TrainConfig', 'run_backtest', @@ -20,6 +22,7 @@ except ImportError: # xgboost or sklearn not installed __all__ = [ 'CTA1DLoader', + 'CTA1DLoaderParquet', 'get_blend_weights', 'describe_blend_config', 'BLEND_CONFIGS', diff --git a/cta_1d/src/loader.py b/cta_1d/src/loader.py index 2c0dc46..6b3f3ae 100644 --- a/cta_1d/src/loader.py +++ b/cta_1d/src/loader.py @@ -15,10 +15,21 @@ import polars as pl from qshare.data import pl_Dataset, pl_pipe, pl_clip, pl_cs_zscore from qshare.data.universal import DataSpec from qshare.io.ddb import get_ddb_sess, reset_index_from_ddb -from qshare.config.research.cta.features import HFFACTOR_COLS from .labels import get_blend_weights +# HFFACTOR columns (defined inline - qshare.config.research not available) +HFFACTOR_COLS = [ + 'vol_1min', + 'skew_1min', + 'volp_1min', + 'volp_ratio_1min', + 'voln_ratio_1min', + 'trend_strength_1min', + 'pv_corr_1min', + 'flowin_ratio_1min', +] + @dataclass class CTA1DLoader: @@ -97,9 +108,13 @@ class CTA1DLoader: df = df_features.join(df_label, on=['datetime', 'instrument'], how='inner') # Filter to requested date range + # Convert string dates to date objects for proper comparison + from datetime import datetime + start_dt = datetime.strptime(start_date, 
'%Y-%m-%d').date() + end_dt = datetime.strptime(end_date, '%Y-%m-%d').date() df = df.filter( - (pl.col('datetime') >= start_date) & - (pl.col('datetime') <= end_date) + (pl.col('datetime') >= start_dt) & + (pl.col('datetime') <= end_dt) ) # Calculate weights @@ -175,12 +190,11 @@ class CTA1DLoader: since_ddb = pd.to_datetime(start_date).strftime('%Y.%m.%d') # Load from factor table - df = sess.run(f""" - select code, m_nDate, factor_name, value - from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hffactor') - where m_nDate >= {since_ddb} - and factor_name in [{','.join([f"'{c}'" for c in HFFACTOR_COLS])}] - """) + factor_list = ','.join([f"'{c}'" for c in HFFACTOR_COLS]) + query = f"""select code, m_nDate, factor_name, value +from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hffactor') +where m_nDate >= {since_ddb} and factor_name in [{factor_list}]""" + df = sess.run(query) # Pivot to wide format df = df.pivot_table( @@ -263,20 +277,16 @@ class CTA1DLoader: """Apply specified normalization to label.""" fit_start, fit_end = fit_range - # Ensure datetime column is string for comparison - if pl_df['datetime'].dtype == pl.Date: - pl_df = pl_df.with_columns( - pl.col('datetime').dt.strftime('%Y-%m-%d').alias('datetime_str') - ) - date_col = 'datetime_str' - else: - date_col = 'datetime' + # Convert fit_range strings to date objects for comparison + from datetime import datetime + fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date() + fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date() if self.normalization == 'zscore': # Calculate mean/std on fit range fit_data = pl_df.filter( - (pl.col(date_col) >= fit_start) & - (pl.col(date_col) <= fit_end) + (pl.col('datetime') >= fit_start_date) & + (pl.col('datetime') <= fit_end_date) ) mean = fit_data['ret'].mean() std = fit_data['ret'].std() @@ -334,17 +344,15 @@ class CTA1DLoader: """Create z-score normalized label.""" fit_start, fit_end = fit_range - # Handle date type conversion for comparison - if pl_df['datetime'].dtype == pl.Date: - fit_data = pl_df.filter( - (pl.col('datetime').dt.strftime('%Y-%m-%d') >= fit_start) & - (pl.col('datetime').dt.strftime('%Y-%m-%d') <= fit_end) - ) - else: - fit_data = pl_df.filter( - (pl.col('datetime') >= fit_start) & - (pl.col('datetime') <= fit_end) - ) + # Convert fit_range strings to date objects for comparison + from datetime import datetime + fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date() + fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date() + + fit_data = pl_df.filter( + (pl.col('datetime') >= fit_start_date) & + (pl.col('datetime') <= fit_end_date) + ) mean = fit_data['ret'].mean() std = fit_data['ret'].std() diff --git a/cta_1d/src/loader_parquet.py b/cta_1d/src/loader_parquet.py new file mode 100644 index 0000000..6f927dc --- /dev/null +++ b/cta_1d/src/loader_parquet.py @@ -0,0 +1,427 @@ +"""CTA 1D loader using Parquet/Polars (parallel to CTA1DLoader).""" + +from dataclasses import dataclass +from datetime import date, datetime +from typing import List, Optional + +import numpy as np +import pandas as pd +import polars as pl + +from qshare.data import pl_Dataset +from qshare.io.polars import load_from_pq + +from .labels import get_blend_weights + +# HFFACTOR columns (defined inline to avoid qshare.config.research dependency) +HFFACTOR_COLS = [ + 'vol_1min', + 'skew_1min', + 'volp_1min', + 'volp_ratio_1min', + 'voln_ratio_1min', + 'trend_strength_1min', + 'pv_corr_1min', + 'flowin_ratio_1min', +] + + +@dataclass +class 
CTA1DLoaderParquet: + """CTA 1D loader using Parquet files instead of DolphinDB. + + This is a parallel implementation to CTA1DLoader that reads from + pre-exported Parquet tables instead of querying DolphinDB directly. + + Example: + >>> loader = CTA1DLoaderParquet( + ... return_type='o2c_twap1min', + ... normalization='dual', + ... feature_sets=['alpha158', 'hffactor'] + ... ) + >>> dataset = loader.load(dt_range=['2020-01-01', '2023-12-31']) + >>> dataset = dataset.with_segments({ + ... 'train': ('2020-01-01', '2022-12-31'), + ... 'test': ('2023-01-01', '2023-12-31') + ... }) + >>> X_train, y_train, w_train = dataset.split('train').to_numpy() + """ + + return_type: str = 'o2c_twap1min' + normalization: str = 'dual' + feature_sets: List[str] = None + weight_factors: dict = None + blend_weights: str | List[float] | None = None + + # Parquet paths (populated by data_ops_research) + feature_path: str = '/data/parquet/dataset/cta_alpha158_1d' + hffactor_path: str = '/data/parquet/dataset/cta_hffactor_1d' + dom_path: str = '/data/parquet/dataset/cta_dom_1d' + label_path: str = '/data/parquet/dataset/cta_labels_1d' + + label_cap_upper: float = 0.5 + label_cap_lower: float = -0.5 + + def __post_init__(self): + if self.feature_sets is None: + self.feature_sets = ['alpha158', 'hffactor'] + if self.weight_factors is None: + self.weight_factors = {'positive': 1.0, 'negative': 2.0} + + def load( + self, + dt_range: List[str], + fit_range: Optional[List[str]] = None + ) -> pl_Dataset: + """Load and prepare CTA 1-day training dataset from Parquet files. + + Args: + dt_range: Date range [start_date, end_date] for dataset + fit_range: Date range [start, end] for fitting normalization params. + If None, uses first 60% of dt_range. + + Returns: + pl_Dataset with features, label, and weight columns + """ + start_date, end_date = dt_range + + if fit_range is None: + # Default: use first 60% for fit + all_dates = pd.date_range(start_date, end_date) + split_idx = int(len(all_dates) * 0.6) + fit_range = [ + all_dates[0].strftime('%Y-%m-%d'), + all_dates[split_idx].strftime('%Y-%m-%d') + ] + + # Load extended history for rolling normalization + load_start = (pd.to_datetime(start_date) - pd.Timedelta(days=120)).strftime('%Y-%m-%d') + + # Load features + df_features = self._load_features(load_start, end_date) + + # Load and normalize labels + df_label = self._load_labels(load_start, end_date, fit_range) + + # Combine + df = df_features.join(df_label, on=['datetime', 'instrument'], how='inner') + + # Filter to requested date range + start_dt = datetime.strptime(start_date, '%Y-%m-%d').date() + end_dt = datetime.strptime(end_date, '%Y-%m-%d').date() + df = df.filter( + (pl.col('datetime') >= start_dt) & + (pl.col('datetime') <= end_dt) + ) + + # Calculate weights + df = self._calculate_weights(df) + + # Clean data + df = self._clean_data(df) + + # Get feature columns + feature_cols = [c for c in df.columns + if any(c.startswith(prefix) for prefix in ['f_a158_', 'f_hf_'])] + + return pl_Dataset( + data=df, + features=feature_cols, + label='label', + weight='weight' if self.weight_factors else None + ) + + def _load_features(self, start_date: str, end_date: str) -> pl.DataFrame: + """Load feature data from Parquet files.""" + feature_dfs = [] + + if 'alpha158' in self.feature_sets: + df_alpha = self._load_alpha158(start_date, end_date) + feature_dfs.append(df_alpha) + + if 'hffactor' in self.feature_sets: + df_hf = self._load_hffactor(start_date, end_date) + feature_dfs.append(df_hf) + + # Join all feature sets + 
result = feature_dfs[0] + for df in feature_dfs[1:]: + result = result.join(df, on=['datetime', 'instrument'], how='inner') + + return result + + def _load_alpha158(self, start_date: str, end_date: str) -> pl.DataFrame: + """Load alpha158 features from Parquet.""" + # Load using qshare's parquet loader + df = load_from_pq( + self.feature_path, + start_date=start_date, + end_date=end_date + ) + + # Drop non-numeric columns if present + if 'code_init' in df.columns: + df = df.drop('code_init') + + # Rename columns with prefix + rename_map = { + c: f'f_a158_{c}' + for c in df.columns + if c not in ['datetime', 'instrument'] + } + df = df.rename(rename_map) + + return df + + def _load_hffactor(self, start_date: str, end_date: str) -> pl.DataFrame: + """Load hffactor features from Parquet (already pivoted).""" + # Load using qshare's parquet loader + df = load_from_pq( + self.hffactor_path, + start_date=start_date, + end_date=end_date + ) + + # Rename columns with prefix + rename_map = { + c: f'f_hf_{c}' + for c in df.columns + if c not in ['datetime', 'instrument'] + } + df = df.rename(rename_map) + + return df + + def _load_labels( + self, + start_date: str, + end_date: str, + fit_range: List[str] + ) -> pl.DataFrame: + """Load and normalize labels from Parquet files.""" + # Map return type to indicator name + indicator_map = { + 'o2c_twap1min': 'twap_open1m@1_twap_close1m@1', + 'o2o_twap1min': 'twap_open1m@1_twap_open1m@2', + } + indicator = indicator_map.get(self.return_type, self.return_type) + + # Load dominant contract mapping from Parquet + df_contract = load_from_pq( + self.dom_path, + start_date=start_date, + end_date=end_date + ) + + # Load returns from Parquet + df_return = load_from_pq( + self.label_path, + start_date=start_date, + end_date=end_date + ) + + # Filter for the specific indicator + df_return = df_return.filter(pl.col('indicator') == indicator) + + # Merge with dominant contract mapping + df_return = df_return.join( + df_contract, + on=['datetime', 'instrument'], + how='inner' + ) + + # Use code_init as the instrument (continuous contract) + df_return = df_return.with_columns( + (pl.col('code_init') + 'Ind').alias('instrument') + ) + + # Select and rename return column + df_return = df_return.select([ + 'datetime', + 'instrument', + pl.col('value').alias('ret') + ]) + + # Apply normalization + df_return = self._normalize_label(df_return, fit_range) + + return df_return + + def _normalize_label(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame: + """Apply specified normalization to label.""" + fit_start, fit_end = fit_range + + # Convert fit_range strings to date objects for comparison + fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date() + fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date() + + if self.normalization == 'zscore': + # Calculate mean/std on fit range + fit_data = pl_df.filter( + (pl.col('datetime') >= fit_start_date) & + (pl.col('datetime') <= fit_end_date) + ) + mean = fit_data['ret'].mean() + std = fit_data['ret'].std() + + result = pl_df.with_columns( + ((pl.col('ret') - mean) / std).clip( + self.label_cap_lower, self.label_cap_upper + ).alias('label') + ).select(['datetime', 'instrument', 'label']) + return result + + elif self.normalization == 'cs_zscore': + # Cross-sectional z-score per datetime + return pl_df.with_columns( + ((pl.col('ret') - pl.col('ret').mean().over('datetime')) / + pl.col('ret').std().over('datetime')).clip( + self.label_cap_lower, self.label_cap_upper + ).alias('label') + 
).select(['datetime', 'instrument', 'label'])
+
+        elif self.normalization == 'rolling_20':
+            return self._apply_rolling_norm(pl_df, window=20, fit_range=fit_range)
+
+        elif self.normalization == 'rolling_60':
+            return self._apply_rolling_norm(pl_df, window=60, fit_range=fit_range)
+
+        elif self.normalization == 'dual':
+            # Create all normalization variants
+            label_zscore = self._normalize_zscore(pl_df, fit_range)
+            label_cszscore = self._normalize_cs_zscore(pl_df)
+            label_roll20 = self._normalize_rolling(pl_df, window=20, fit_range=fit_range)
+            label_roll60 = self._normalize_rolling(pl_df, window=60, fit_range=fit_range)
+
+            # Get blend weights
+            weights = get_blend_weights(self.blend_weights)
+
+            # Join and blend
+            pl_df = label_zscore.join(label_cszscore, on=['datetime', 'instrument'])
+            pl_df = pl_df.join(label_roll20, on=['datetime', 'instrument'])
+            pl_df = pl_df.join(label_roll60, on=['datetime', 'instrument'])
+
+            return pl_df.with_columns(
+                (weights[0] * pl.col('label_zscore') +
+                 weights[1] * pl.col('label_cszscore') +
+                 weights[2] * pl.col('label_roll20') +
+                 weights[3] * pl.col('label_roll60')).clip(
+                    self.label_cap_lower, self.label_cap_upper
+                ).alias('label')
+            ).select(['datetime', 'instrument', 'label'])
+
+        else:
+            raise ValueError(f"Unknown normalization: {self.normalization}")
+
+    def _normalize_zscore(self, pl_df: pl.DataFrame, fit_range: List[str]) -> pl.DataFrame:
+        """Create z-score normalized label."""
+        fit_start, fit_end = fit_range
+
+        fit_start_date = datetime.strptime(fit_start, '%Y-%m-%d').date()
+        fit_end_date = datetime.strptime(fit_end, '%Y-%m-%d').date()
+
+        fit_data = pl_df.filter(
+            (pl.col('datetime') >= fit_start_date) &
+            (pl.col('datetime') <= fit_end_date)
+        )
+
+        mean = fit_data['ret'].mean()
+        std = fit_data['ret'].std()
+
+        return pl_df.with_columns(
+            ((pl.col('ret') - mean) / std).alias('label_zscore')
+        ).select(['datetime', 'instrument', 'label_zscore'])
+
+    def _normalize_cs_zscore(self, pl_df: pl.DataFrame) -> pl.DataFrame:
+        """Create cross-sectional z-score normalized label."""
+        return pl_df.with_columns(
+            ((pl.col('ret') - pl.col('ret').mean().over('datetime')) /
+             pl.col('ret').std().over('datetime')).alias('label_cszscore')
+        ).select(['datetime', 'instrument', 'label_cszscore'])
+
+    def _normalize_rolling(
+        self,
+        pl_df: pl.DataFrame,
+        window: int,
+        fit_range: List[str]
+    ) -> pl.DataFrame:
+        """Create rolling window normalized label.
+
+        Note: fit_range is unused here (rolling stats need no fit window);
+        it is kept for signature parity with the other normalizers.
+        """
+        # Convert to pandas for rolling calculation
+        pd_df = pl_df.to_pandas().set_index(['datetime', 'instrument'])
+
+        # Unstack to wide format (one column per instrument)
+        df_wide = pd_df['ret'].unstack('instrument')
+
+        # Calculate rolling mean and std
+        rolling_mean = df_wide.rolling(window=window, min_periods=window//2).mean()
+        rolling_std = df_wide.rolling(window=window, min_periods=window//2).std()
+
+        # Normalize
+        df_normalized = (df_wide - rolling_mean) / rolling_std
+
+        # Restack
+        rolling_label = df_normalized.stack().reset_index()
+        rolling_label.columns = ['datetime', 'instrument', f'label_roll{window}']
+
+        # Cast datetime back to pl.Date: the pandas round trip can yield
+        # datetime64[ns], which would not match the Date join keys used by
+        # the 'dual' branch. The cast is a no-op if it is already Date.
+        return pl.from_pandas(rolling_label).with_columns(
+            pl.col('datetime').cast(pl.Date)
+        )
+
+    def _apply_rolling_norm(
+        self,
+        pl_df: pl.DataFrame,
+        window: int,
+        fit_range: List[str]
+    ) -> pl.DataFrame:
+        """Apply rolling normalization and cap."""
+        result = self._normalize_rolling(pl_df, window, fit_range)
+        return result.with_columns(
+            pl.col(f'label_roll{window}').clip(
+                self.label_cap_lower, self.label_cap_upper
+            ).alias('label')
+        ).select(['datetime', 'instrument', 'label'])
+
+    def _calculate_weights(self, pl_df: pl.DataFrame) -> pl.DataFrame:
+        """Calculate sample weights based on return magnitude."""
+        # Base weights by return magnitude tiers.
+        # Note: 'label' was already clipped to [label_cap_lower,
+        # label_cap_upper] during normalization, so with the default
+        # +/-0.5 caps only the lowest tier can fire and the +/-0.5
+        # multipliers below never trigger.
+        pl_df = pl_df.with_columns(
+            pl.when(pl.col('label').abs() > 1.5).then(pl.lit(2.5))
+            .when(pl.col('label').abs() > 1.0).then(pl.lit(2.0))
+            .when(pl.col('label').abs() > 0.5).then(pl.lit(1.5))
+            .when(pl.col('label').abs() > 0.2).then(pl.lit(1.0))
+            .otherwise(pl.lit(0.0)).alias('weight')
+        )
+
+        # Apply negative return multiplier
+        if self.weight_factors.get('negative'):
+            pl_df = pl_df.with_columns(
+                pl.when(pl.col('label') < -0.5)
+                .then(pl.col('weight') * self.weight_factors['negative'])
+                .otherwise(pl.col('weight'))
+                .alias('weight')
+            )
+
+        # Apply positive return multiplier
+        if self.weight_factors.get('positive'):
+            pl_df = pl_df.with_columns(
+                pl.when(pl.col('label') > 0.5)
+                .then(pl.col('weight') * self.weight_factors['positive'])
+                .otherwise(pl.col('weight'))
+                .alias('weight')
+            )
+
+        return pl_df
+
+    def _clean_data(self, pl_df: pl.DataFrame) -> pl.DataFrame:
+        """Clean data: remove inf/nan values."""
+        # Only float columns can hold inf; polars' is_infinite() is not
+        # defined for integer dtypes, so restrict the replacement to floats.
+        float_cols = [
+            c for c in pl_df.columns
+            if pl_df[c].dtype in [pl.Float32, pl.Float64]
+        ]
+
+        # Replace inf with null, then drop any row containing a null
+        pl_df = pl_df.with_columns([
+            pl.when(pl.col(c).is_infinite()).then(None).otherwise(pl.col(c)).alias(c)
+            for c in float_cols
+        ])
+
+        return pl_df.drop_nulls()
diff --git a/data_ops_research/cta_1d/README.md b/data_ops_research/cta_1d/README.md
new file mode 100644
index 0000000..613ae80
--- /dev/null
+++ b/data_ops_research/cta_1d/README.md
@@ -0,0 +1,44 @@
+# CTA 1D Parquet Dataset
+
+This directory contains requirements for CTA (Commodity Trading Advisor) futures
+Parquet datasets used by alpha_lab.
+
+## Tables
+
+### cta_alpha158_1d
+Alpha158 features for CTA futures.
+- **Source**: `dfs://daily_stock_run.stg_1day_tinysoft_cta_alpha159_0_7_beta`
+- **Output**: `/data/parquet/dataset/cta_alpha158_1d/`
+- **Columns**: ~163 feature columns + code, m_nDate
+
+### cta_hffactor_1d
+High-frequency factor features (8 columns).
+- **Source**: `dfs://daily_stock_run.stg_1day_tinysoft_cta_hffactor`
+- **Output**: `/data/parquet/dataset/cta_hffactor_1d/`
+- **Transformation**: Pivot from long to wide format
+  - Input columns: code, m_nDate, factor_name, value
+  - Output columns: code, m_nDate, vol_1min, skew_1min, ... (8 features)
+- **Filter**: Only include factor_name in [vol_1min, skew_1min, volp_1min,
+  volp_ratio_1min, voln_ratio_1min, trend_strength_1min, pv_corr_1min,
+  flowin_ratio_1min]
+
+### cta_dom_1d
+Dominant contract mapping for continuous contracts.
+- **Source**: `dfs://daily_stock_run.dwm_1day_cta_dom`
+- **Output**: `/data/parquet/dataset/cta_dom_1d/`
+- **Filter**: version = 'vp_csmax_roll2_cummax'
+- **Aggregation**: GROUP BY m_nDate, code_init; SELECT first(code) as code
+
+### cta_labels_1d
+Return labels for different return types.
+- **Source**: `dfs://daily_stock_run.stg_1day_tinysoft_cta_hfvalue`
+- **Output**: `/data/parquet/dataset/cta_labels_1d/`
+- **Filter**: indicator in [twap_open1m@1_twap_close1m@1, twap_open1m@1_twap_open1m@2]
+- **Columns**: code, m_nDate, indicator, value
+
+## Consumer
+
+Used by: `alpha_lab/cta_1d/src/loader_parquet.py`
+
+The parallel loader (`CTA1DLoaderParquet`) reads from these Parquet tables
+instead of querying DolphinDB directly.
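
A quick consumer-side sanity check for the pivoted table. This is a minimal sketch, not part of the export tooling: it assumes `load_from_pq` behaves as in `loader_parquet.py` above (returns a polars frame keyed by `datetime` / `instrument`), and the sample date range is arbitrary.

```python
from qshare.io.polars import load_from_pq

HFFACTOR_COLS = [
    'vol_1min', 'skew_1min', 'volp_1min', 'volp_ratio_1min',
    'voln_ratio_1min', 'trend_strength_1min', 'pv_corr_1min',
    'flowin_ratio_1min',
]

# Pull one month of the pivoted table and confirm the wide schema
# that CTA1DLoaderParquet._load_hffactor expects.
df = load_from_pq(
    '/data/parquet/dataset/cta_hffactor_1d',
    start_date='2023-01-01',
    end_date='2023-01-31',
)

missing = [c for c in HFFACTOR_COLS if c not in df.columns]
assert not missing, f'pivot export is missing factor columns: {missing}'

# The long-to-wide pivot should leave one row per (datetime, instrument)
assert not df.select(['datetime', 'instrument']).is_duplicated().any()
```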
diff --git a/data_ops_research/cta_1d/requirements.yaml b/data_ops_research/cta_1d/requirements.yaml new file mode 100644 index 0000000..b751b05 --- /dev/null +++ b/data_ops_research/cta_1d/requirements.yaml @@ -0,0 +1,88 @@ +# CTA 1D Parquet Dataset Requirements +# This file specifies the required Parquet tables for alpha_lab CTA 1D task + +# Table 1: Alpha158 Features +cta_alpha158_1d: + source: + database: dfs://daily_stock_run + table: stg_1day_tinysoft_cta_alpha159_0_7_beta + host: 192.168.1.146 + port: 8848 + target: + path: cta_alpha158_1d/ + partition_freq: 1D + col_datetime: m_nDate + code_format: tscode + description: Alpha158 features for CTA futures (~163 columns) + priority: medium + +# Table 2: HFFactor Features (requires pivot) +cta_hffactor_1d: + source: + database: dfs://daily_stock_run + table: stg_1day_tinysoft_cta_hffactor + host: 192.168.1.146 + port: 8848 + # Long format: code, m_nDate, factor_name, value + # Pivot to wide format during export + pivot: + index: [code, m_nDate] + columns: factor_name + values: value + filter: # Only these 8 columns needed + - vol_1min + - skew_1min + - volp_1min + - volp_ratio_1min + - voln_ratio_1min + - trend_strength_1min + - pv_corr_1min + - flowin_ratio_1min + target: + path: cta_hffactor_1d/ + partition_freq: 1D + col_datetime: m_nDate + code_format: tscode + description: High-frequency factor features (8 columns, pivoted from long format) + priority: medium + notes: Requires pivot transformation from long to wide format + +# Table 3: Dominant Contract Mapping +cta_dom_1d: + source: + database: dfs://daily_stock_run + table: dwm_1day_cta_dom + host: 192.168.1.146 + port: 8848 + # Group and aggregate during export + group_by: [m_nDate, code_init] + filter: "version='vp_csmax_roll2_cummax'" + agg: "first(code) as code" + target: + path: cta_dom_1d/ + partition_freq: 1D + col_datetime: m_nDate + code_format: tscode + description: Dominant contract mapping for continuous contracts + priority: medium + notes: Requires group_by + aggregation, filter by version + +# Table 4: Return Labels +cta_labels_1d: + source: + database: dfs://daily_stock_run + table: stg_1day_tinysoft_cta_hfvalue + host: 192.168.1.146 + port: 8848 + # Filter for specific indicators + indicators: + - twap_open1m@1_twap_close1m@1 # o2c_twap1min + - twap_open1m@1_twap_open1m@2 # o2o_twap1min + target: + path: cta_labels_1d/ + partition_freq: 1D + col_datetime: m_nDate + code_format: tscode + description: Return labels for different return types + priority: medium + notes: Filter indicator column for specific return types
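
For the `cta_hffactor_1d` pivot itself, a minimal export sketch under stated assumptions: `get_ddb_sess()` is the session helper imported in `loader.py` (assumed here to return a connected session), the one-file-per-day layout is a stand-in for whatever `partition_freq: 1D` means in the real data_ops_research exporter, and incremental date filtering is omitted.

```python
from pathlib import Path

import pandas as pd
from qshare.io.ddb import get_ddb_sess

HFFACTOR_COLS = [
    'vol_1min', 'skew_1min', 'volp_1min', 'volp_ratio_1min',
    'voln_ratio_1min', 'trend_strength_1min', 'pv_corr_1min',
    'flowin_ratio_1min',
]

OUT_DIR = Path('/data/parquet/dataset/cta_hffactor_1d')
OUT_DIR.mkdir(parents=True, exist_ok=True)

sess = get_ddb_sess()  # assumed: connected DolphinDB session, as in loader.py

# Same long-format query shape as CTA1DLoader._load_hffactor
factor_list = ','.join(f"'{c}'" for c in HFFACTOR_COLS)
df = sess.run(f"""select code, m_nDate, factor_name, value
from loadTable('dfs://daily_stock_run', 'stg_1day_tinysoft_cta_hffactor')
where factor_name in [{factor_list}]""")

# Pivot long -> wide; (code, m_nDate, factor_name) is assumed unique,
# so the default aggregation is an identity.
wide = df.pivot_table(
    index=['code', 'm_nDate'], columns='factor_name', values='value'
).reset_index()

# One Parquet file per trading day, matching partition_freq: 1D
for dt, day in wide.groupby('m_nDate'):
    day.to_parquet(OUT_DIR / f'{pd.Timestamp(dt):%Y%m%d}.parquet', index=False)
```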