基于 Apache DolphinSchedule 打造机器学习智能选股系统

Apache DolphinScheduler 已经在DataOps领域提供了强大的分布式可视化工作流调度能力。2022年,我们为其新增了机器学习任务调度的能力,逐步开箱即用式地支持主流的MLops项目/服务商的功能。

Apache DolphinScheduler 目前已经支持的MLOps工具包括MLflow,DVC,Jupyter,OpenMLDB等任务组件,可以让用户低成本,更容易地编排机器学习系统。

本文为大家介绍如何使用Apache DolphinScheduler来打造一个机器学习选股系统,每日自动更新选股模型,智能选股,在交易时间持续监控模型选股效果。

Apache DolphinScheduler

1

系统介绍 概况

01

概况

系统展示

系统展示如下,为2022年6月24的情况:

  • Machine learning stock picking daily dashboard
  • 为2022-06-23晚上选出的股票,在2022年6月24的实时表现,包括涨跌幅,涨速等信息
  • The real-time average returns of the top 10 stocks are as follows
  • 实时展示了选出的10个股票在日内平均的收益走势情况
  • The distribution of the ups and downs of stocks selected by the system
  • 实时展示选出的10个股票的涨跌幅分布
  • The distribution of gains and losses across the stock market
  • 实时展示了整个市场的股票的涨跌幅分布

基于ApacheDolphinSchedule打造机器学习智能选股系统

基于ApacheDolphinSchedule打造机器学习智能选股系统

基于ApacheDolphinSchedule打造机器学习智能选股系统

基于ApacheDolphinSchedule打造机器学习智能选股系统

系统运行原理

选股逻辑

计算整个股票市场中符合 五日均线高于10日均线信号 的股票作为标的池,对于上述股票池中的每个股票构建以下特征用于训练模型:

1. 股价与布林带三条轨道的相对值

2. 股价与多条均线的相对值;多条均线的多个间隔的斜率

3. 当前K线的形态,用talib pattern计算,如是否为三只乌鸦,是否为十字星等,详情可见K线模式识别(https://www.jianshu.com/p/fd5c7f49db33)

备注:你也可以加上任何你认为有用的信息作为特征。

模型训练

以第二天是否上涨为准进行二分类,使用AutoML工具flaml进行5分钟训练,每天构建120天的数据集,其中后7天用于评估。

胜率与盈亏比得到的效果如下:

• 单纯使用五日均线高于10日均线买卖时:胜率0.46,盈亏比 1.15,期望约为-0.011;

• 以五日均线高于10日均线买卖为信号加上机器学习模型后,胜率为0.58,盈亏比为1.35,期望约为0.363。

以上为同时期不加模型策略与加模型策略的对比,期望从 -0.011 提升到 0.363。

备注:以上对比为2022-06-23当日模型训练(训练集22万条,测试集1.7万条)完后的评估指标,其中模型每天选择置信度前10(可以认为是模型认为上涨概率最大的前10)的股票;

基于ApacheDolphinSchedule打造机器学习智能选股系统

实盘的表现,可见系统展示中2022-06-24的实时表现。

任务调度

系统中所有的任务调度均使用Apache DolphinScheduler完成,包括 数据处理,特征工程,模型训练,模型评估,模型上线,批量实时预测,效果监控 等。

Apache DolphinScheduler可以 定时每天晚上自动更新模型,并上线进行预测。 在当天模型表现不好时,可以调参调特征 一键重新训练新的模型 并评估上线。

还有其对任务的容错机制,可以保证系统能够稳定运行。

前端展示

前端展示使用Observable实现,得益于其Notebook丰富和易用的可视化数据分析特性,构建出监控实时选股系统的效果监控。

技术栈

主要涉及的技术能力如下:

基于ApacheDolphinSchedule打造机器学习智能选股系统

关于量化知识,因用户水平不同可以进行不同的选股逻辑和特征工程。

该系统旨在帮助用户构建符合自己认知的选股系统(当你有很多的选股参考信息,又不知道如何组合才能更好地做决策时)。

项目中涉及到的工作流任务的实现可以在这里找到 GitHub - Apache DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis)

02

模块介绍

模块介绍章节涉及到整个系统每个模块具体的实现代码与细节,你也可以根据阅读习惯,先跳到文章最后总结章节看完再回来这里继续阅读。

后端模块介绍

下图所示是一个整体的工作流 run_system,该工作流每天晚上定时启动,进行数据更新,模型训练,模型评估,模型部署和推理,然后推荐每天的股票池。

run_system 工作流包含4个子工作流:

• prepare_datas : 每日数据*载下**,信号计算,特征(在量化交易中成为因子)计算

• training_model:生成训练数据,训练模型,对模型进行评估

• deployment:部署模型

• batch_inference:生成要批量算法预测的股票,并进行预测。

基于ApacheDolphinSchedule打造机器学习智能选股系统

prepare_datas

下图所示是数据准备的工作流 prepare_datas,该工作流会*载下**股票数据并进行信号计算和特征计算:

download_data : *载下**全市场的股票日线数据

calc_signals: 进行信号计算(计算每天符合信号条件的股票,如每天5日均线与10日均线金叉的股票)

calc_features: 特征(量化交易中称为因子)计算(计算每个股票每天的特征值,如收盘价与5日均线的相对值,股票是否是十字星形态等)

基于ApacheDolphinSchedule打造机器学习智能选股系统

download_data

任务类型为Shell 任务类型,用于*载下**股票市场的日线数据。

基于ApacheDolphinSchedule打造机器学习智能选股系统

# 添加项目路径,为Git clone 项目Apache DolphinScheduler-MLOps-Stock-Analysis 下来后地址,方便Python直接在别的目录运行该项目的脚本

export PYTHONPATH=${project}

# 激活Python环境

source ${project}/env/bin/activate

# 设置股票数据*载下**的路径

data_path=${project}/data/daily

# *载下**数据到指定路径

python -m dmsa.data.download ${data_path}

其中自定义参数中的设置的意思为启动工作流时传入project参数的值,即可替换shell中的${project},如传入/home/user/Apache DolphinScheduler-MLOps-Stock-Analysis,则第一行会变成 export PYTHONPATH=/home/user/Apache DolphinScheduler-MLOps-Stock-Analysis,下文如无特殊,将不再介绍该Apache DolphinScheduler的特性。

calc_signals

任务类型为Shell 任务类型,根据前一个任务中*载下**完的数据计算所有的信号。

基于ApacheDolphinSchedule打造机器学习智能选股系统

# 添加项目路径,为Git clone 项目Apache DolphinScheduler-MLOps-Stock-Analysis 下来后地址,方便python直接在别的目录运行该项目的脚本

export PYTHONPATH=${project}

# 激活python环境

source ${project}/env/bin/activate

# 设置股票数据*载下**的路径

data_path=${project}/data/daily

# 根据feature_signal.txt的配置,计算信号,详情可以见项目中的实现

python -m dmsa.data_processing.calc_signals \

    --data_path ${data_path} \

    --name_file ${project}/feature_signal.txt

计算好的信号会自动存在Mysql中,Mysql的配置可以参考Apache DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis),进行简单配置即可。

calc_features

任务类型为Shell 任务类型,根据前一个任务中*载下**完的数据计算所有的特征。

基于ApacheDolphinSchedule打造机器学习智能选股系统

# 添加项目路径,为Git clone 项目Apache DolphinScheduler-MLOps-Stock-Analysis 下来后地址,方便python直接在别的目录运行该项目的脚本

export PYTHONPATH=${project}

# 激活python环境

source ${project}/env/bin/activate

# 设置股票数据*载下**的路径

data_path=${project}/data/daily

# 根据feature_signal.txt的配置,计算特征,详情可以见项目中的实现

python -m dmsa.data_processing.calc_features \

    --data_path ${data_path} \

    --name_file ${project}/feature_signal.txt

计算好的特征会自动存在Mysql中,Mysql的配置可以参考Apache DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis),进行简单配置即可。

如果需要更强大的特征计算与存储能力,可以使用Apache DolphinScheduler中的OpenMLDB组件(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/openmldb.html进行修改即可(可参考Apache DolphinScheduler OpenMLDB Task:打造端到端MLOps工作流 https://openmldb.ai/docs/zh/main/use_case/dolphinscheduler_task_demo.html)。

training_model

下图为工作流 training_model 的各个任务的DAG图,该工作流主要执行以下任务:

• prepare_data : 准备模型训练数据和模型评估数据

• training:训练模型

• Deployment: 部署刚刚训练的模型用户推理评估数据

• evaluate:评估模型,因为该评估的指标为胜率以及盈亏比,所以进行另外的计算评估。

• close_service:评估完成后关闭刚刚部署的服务

基于ApacheDolphinSchedule打造机器学习智能选股系统

prepare_data

任务类型为Shell 任务类型,准备训练数据用于训练模型,准备测试数据用于评估。

基于ApacheDolphinSchedule打造机器学习智能选股系统

export PYTHONPATH=${project}

source ${project}/env/bin/activate

save_data_path=${project}/data/training

# 生成数据集

python -m dmsa.data_processing.build_datas \

    --task_type train \

    --config ${project}/feature_signal.txt \

    --save_path ${save_data_path} \

    --data_path ${project}/data/daily

# 把生成的数据集所对应的路径 save_data_path 赋值到变量 data_path 中,并通过自定义参数设为OUT,传递给下游任务

echo "#{setValue(data_path=${save_data_path})}"

training

该任务类型为MLflow任务类型,由Apache DolphinScheduler内置提供,可以用于输入一份数据集,自动进行模型训练。

基于ApacheDolphinSchedule打造机器学习智能选股系统

上图表示,使用AutoML根据传入的数据(csv格式)进行120秒训练,AutoML的工具使用flaml。设置模型实验名称为baseline,模型将上传至MLflow服务中心,并注册模型名字为baseline。

更多的特性,可以查看Apache DolphinScheduler MLflow 组件(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/mlflow.html)。

Deployment

基于ApacheDolphinSchedule打造机器学习智能选股系统

该任务类型为MLflow任务类型,由Apache DolphinScheduler内置提供,可以部署MLflow服务中心的模型。

上图表示,使用将模型名字为baseline,版本号为Production的模型,以Docker形式部署并保留7070端口。

更多的特性,可以查看Apache DolphinScheduler MLflow 组件(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/mlflow.html)。

evaluate

任务类型为Shell 任务类型,用户请求服务获取评估数据的结果并进行评估。

基于ApacheDolphinSchedule打造机器学习智能选股系统

export PYTHONPATH=${project}

source ${project}/env/bin/activate

# 表示每天选择分数最高的3个股票,测试效果

python -m dmsa.evaluate.calc_evaluate \

    --source_data_path ${data_path}/source.csv \

    --evaluate_data ${data_path}/test.csv \

    --api 'http://127.0.0.1:7070/invocations' \

    --top_n 3

close_service

任务类型为Shell任务类型,用与关闭刚才启动的模型服务

# 刚才的模型服务会运行一个docker容器提供服务,容器名字会统一命名为 ds-mlflow-{model_name}-{model_version}

# 因此可以简单通过一行命令关闭服务

docker rm -f ds-mlflow-baseline-Production

Deployment

与上面的Deployment几乎一样,另外配一个7000端口用于推理服务。

基于ApacheDolphinSchedule打造机器学习智能选股系统

batch_inference

工作流 batch_inference 用于对需要在线跑的任务进行推理,包含两个任务:

• build_inference_data:生成需要推理的数据

• inference: 调用接口进行推理

基于ApacheDolphinSchedule打造机器学习智能选股系统

build_inference_data

任务类型为Shell 任务类型,用于生成需要推理的数据。

基于ApacheDolphinSchedule打造机器学习智能选股系统

export PYTHONPATH=${project}

source ${project}/env/bin/activate

save_data_path=${project}/data/inference.csv

# 用于生成需要推理的数据

python -m dmsa.data_processing.build_datas \

    --task_type inference \

    --config ${project}/feature_signal.txt \

    --save_path $save_data_path




echo "#{setValue(data_path=${save_data_path})}"

inference

任务类型为Shell 任务类型,根据上一个任务中生成的推理数据进行推理,并将分数前10的股票存到数据库中。

基于ApacheDolphinSchedule打造机器学习智能选股系统

export PYTHONPATH=${project}

source ${project}/env/bin/activate

# 推理,服务地址为 http://127.0.0.1:7000/invocations, 7000是刚才配的端口,invocations是MLflow服务默认的路径

python -m dmsa.evaluate.inference \

    --evaluate_data ${data_path} \

    --api 'http://127.0.0.1:7000/invocations' \

    --top_n 10

Monitoring

另外还有一个实时监控的工作流 monitoring ,用于实时计算监控数据,并存入数据库中,实时监控模型选出股票的效果。

目前包含两个任务,通过 Apache DolphinScheduler 每隔10秒调度启动:

• spot:持续更新市场全貌数据,包括涨跌幅,涨速等

• kline: 从K线层面持续对模型选择的股票的表现进行收益跟踪

基于ApacheDolphinSchedule打造机器学习智能选股系统

spot

定时监控全貌的整体数据,包括获取全部股票当下时间点的涨跌幅,涨速等。

基于ApacheDolphinSchedule打造机器学习智能选股系统

Kline

定时监控昨天选出来的股票在最新交易日的实时收益走势。

基于ApacheDolphinSchedule打造机器学习智能选股系统

前端模块介绍

前端目前用Observable来进行展示,也可以使用Juptyer来替代。

Observable

Observable是一个以JavaScript基础进行拓展来做计算的Notebook,能够让很多不懂代码的数据分析,可视化工作人员简单、低成本地使用强大可视化工具的产品,个人版免费。

目前主要展示了三个图示来监控模型效果:

• 每日推荐股票的实时情况

• 每日推荐股票的实时涨跌分布

• 市场全貌的实时涨跌分布

每日推荐股票的实时情况

通过join 每日推荐股票标,以及市场全貌标来展示,每5秒刷新一次,下面两段代码可以直接copy到Observable的Notebook中即可:

// 获取数据

stockDatas = {

  let i = 0;

  while (true) {

    try {

      const data = await db.query("SELECT a.date, a.y_pred, a.score, b.* FROM candidate a LEFT JOIN spot b ON a.code = b.code WHERE a.y_pred=1;")

      yield Promises.delay(5000, data);

    } catch(e){

    console.log(e.name + ":" + e.message);

    }

  }

}

// 转成表格

stocks = Inputs.table(stockDatas)

基于ApacheDolphinSchedule打造机器学习智能选股系统

每日推荐股票的实时收益曲线

用过Observable其中自带的曲线图来表示

Plot.plot({

  marks: [

    Plot.ruleY([0]),

    Plot.lineY(changesDatas, {x: "time", y: "changes"})

  ]

})

基于ApacheDolphinSchedule打造机器学习智能选股系统

每日推荐股票的实时涨跌分布

用过Observable其中自带的柱状图来表示:

Plot.plot({

  marks: [

    Plot.rectY(stockDatas, Plot.binX({y: "count"}, {x: "涨跌幅"})),

    Plot.ruleY([0])

  ],

})

基于ApacheDolphinSchedule打造机器学习智能选股系统

市场全貌的实时涨跌分布

用过Observable其中自带的柱状图来表示:

// 从数据库中读取数据

spotDatas = db.query("SELECT * FROM spot;")

复制

// 画图

Plot.plot({

  marks: [

    Plot.rectY(spotDatas, Plot.binX({y: "count"}, {x: "涨跌幅"})),

    Plot.ruleY([0])

  ],

  x: {ticks: 20}

})

基于ApacheDolphinSchedule打造机器学习智能选股系统

本文展示了使用Apache DolphinScheduler调度机器学习系统中的各类模块,Observable作为前端展示与可视化能力的智能股票选股系统,希望能给大家带来以下收获:

  1. 了解如何使用Apache DolphinScheduler构建一个选股系统。
  2. 了解Apache DolphinScheduler灵活简单编排MLOps场景下各个模块的方法。
  3. 了解Apache DolphinScheduler MLflow组件赋予用户的0成本使用机器学习能力。
  4. 了解Observable这个强大的可视化产品的能力,以及与Apache DolphinScheduler的结合使用。