Apache DolphinScheduler 已经在DataOps领域提供了强大的分布式可视化工作流调度能力。2022年,我们为其新增了机器学习任务调度的能力,逐步开箱即用式地支持主流的MLops项目/服务商的功能。
Apache DolphinScheduler 目前已经支持的MLOps工具包括MLflow,DVC,Jupyter,OpenMLDB等任务组件,可以让用户低成本,更容易地编排机器学习系统。
本文为大家介绍如何使用Apache DolphinScheduler来打造一个机器学习选股系统,每日自动更新选股模型,智能选股,在交易时间持续监控模型选股效果。
Apache DolphinScheduler
1
系统介绍 概况
01
概况
系统展示
系统展示如下,为2022年6月24的情况:
- Machine learning stock picking daily dashboard
- 为2022-06-23晚上选出的股票,在2022年6月24的实时表现,包括涨跌幅,涨速等信息
- The real-time average returns of the top 10 stocks are as follows
- 实时展示了选出的10个股票在日内平均的收益走势情况
- The distribution of the ups and downs of stocks selected by the system
- 实时展示选出的10个股票的涨跌幅分布
- The distribution of gains and losses across the stock market
- 实时展示了整个市场的股票的涨跌幅分布




系统运行原理
选股逻辑
计算整个股票市场中符合 五日均线高于10日均线信号 的股票作为标的池,对于上述股票池中的每个股票构建以下特征用于训练模型:
1. 股价与布林带三条轨道的相对值
2. 股价与多条均线的相对值;多条均线的多个间隔的斜率
3. 当前K线的形态,用talib pattern计算,如是否为三只乌鸦,是否为十字星等,详情可见K线模式识别(https://www.jianshu.com/p/fd5c7f49db33)
备注:你也可以加上任何你认为有用的信息作为特征。
模型训练
以第二天是否上涨为准进行二分类,使用AutoML工具flaml进行5分钟训练,每天构建120天的数据集,其中后7天用于评估。
胜率与盈亏比得到的效果如下:
• 单纯使用五日均线高于10日均线买卖时:胜率0.46,盈亏比 1.15,期望约为-0.011;
• 以五日均线高于10日均线买卖为信号加上机器学习模型后,胜率为0.58,盈亏比为1.35,期望约为0.363。
以上为同时期不加模型策略与加模型策略的对比,期望从 -0.011 提升到 0.363。
备注:以上对比为2022-06-23当日模型训练(训练集22万条,测试集1.7万条)完后的评估指标,其中模型每天选择置信度前10(可以认为是模型认为上涨概率最大的前10)的股票;

实盘的表现,可见系统展示中2022-06-24的实时表现。
任务调度
系统中所有的任务调度均使用Apache DolphinScheduler完成,包括 数据处理,特征工程,模型训练,模型评估,模型上线,批量实时预测,效果监控 等。
Apache DolphinScheduler可以 定时每天晚上自动更新模型,并上线进行预测。 在当天模型表现不好时,可以调参调特征 一键重新训练新的模型 并评估上线。
还有其对任务的容错机制,可以保证系统能够稳定运行。
前端展示
前端展示使用Observable实现,得益于其Notebook丰富和易用的可视化数据分析特性,构建出监控实时选股系统的效果监控。
技术栈
主要涉及的技术能力如下:

关于量化知识,因用户水平不同可以进行不同的选股逻辑和特征工程。
该系统旨在帮助用户构建符合自己认知的选股系统(当你有很多的选股参考信息,又不知道如何组合才能更好地做决策时)。
项目中涉及到的工作流任务的实现可以在这里找到 GitHub - Apache DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis)
02
模块介绍
模块介绍章节涉及到整个系统每个模块具体的实现代码与细节,你也可以根据阅读习惯,先跳到文章最后总结章节看完再回来这里继续阅读。
后端模块介绍
下图所示是一个整体的工作流 run_system,该工作流每天晚上定时启动,进行数据更新,模型训练,模型评估,模型部署和推理,然后推荐每天的股票池。
run_system 工作流包含4个子工作流:
• prepare_datas : 每日数据*载下**,信号计算,特征(在量化交易中成为因子)计算
• training_model:生成训练数据,训练模型,对模型进行评估
• deployment:部署模型
• batch_inference:生成要批量算法预测的股票,并进行预测。

prepare_datas
下图所示是数据准备的工作流 prepare_datas,该工作流会*载下**股票数据并进行信号计算和特征计算:
• download_data : *载下**全市场的股票日线数据
• calc_signals: 进行信号计算(计算每天符合信号条件的股票,如每天5日均线与10日均线金叉的股票)
• calc_features: 特征(量化交易中称为因子)计算(计算每个股票每天的特征值,如收盘价与5日均线的相对值,股票是否是十字星形态等)

download_data
任务类型为Shell 任务类型,用于*载下**股票市场的日线数据。

# 添加项目路径,为Git clone 项目Apache DolphinScheduler-MLOps-Stock-Analysis 下来后地址,方便Python直接在别的目录运行该项目的脚本
export PYTHONPATH=${project}
# 激活Python环境
source ${project}/env/bin/activate
# 设置股票数据*载下**的路径
data_path=${project}/data/daily
# *载下**数据到指定路径
python -m dmsa.data.download ${data_path}
其中自定义参数中的设置的意思为启动工作流时传入project参数的值,即可替换shell中的${project},如传入/home/user/Apache DolphinScheduler-MLOps-Stock-Analysis,则第一行会变成 export PYTHONPATH=/home/user/Apache DolphinScheduler-MLOps-Stock-Analysis,下文如无特殊,将不再介绍该Apache DolphinScheduler的特性。
calc_signals
任务类型为Shell 任务类型,根据前一个任务中*载下**完的数据计算所有的信号。

# 添加项目路径,为Git clone 项目Apache DolphinScheduler-MLOps-Stock-Analysis 下来后地址,方便python直接在别的目录运行该项目的脚本
export PYTHONPATH=${project}
# 激活python环境
source ${project}/env/bin/activate
# 设置股票数据*载下**的路径
data_path=${project}/data/daily
# 根据feature_signal.txt的配置,计算信号,详情可以见项目中的实现
python -m dmsa.data_processing.calc_signals \
--data_path ${data_path} \
--name_file ${project}/feature_signal.txt
计算好的信号会自动存在Mysql中,Mysql的配置可以参考Apache DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis),进行简单配置即可。
calc_features
任务类型为Shell 任务类型,根据前一个任务中*载下**完的数据计算所有的特征。

# 添加项目路径,为Git clone 项目Apache DolphinScheduler-MLOps-Stock-Analysis 下来后地址,方便python直接在别的目录运行该项目的脚本
export PYTHONPATH=${project}
# 激活python环境
source ${project}/env/bin/activate
# 设置股票数据*载下**的路径
data_path=${project}/data/daily
# 根据feature_signal.txt的配置,计算特征,详情可以见项目中的实现
python -m dmsa.data_processing.calc_features \
--data_path ${data_path} \
--name_file ${project}/feature_signal.txt
计算好的特征会自动存在Mysql中,Mysql的配置可以参考Apache DolphinScheduler-MLOps-Stock-Analysis(https://github.com/jieguangzhou/DolphinScheduler-MLOps-Stock-Analysis),进行简单配置即可。
如果需要更强大的特征计算与存储能力,可以使用Apache DolphinScheduler中的OpenMLDB组件(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/openmldb.html进行修改即可(可参考Apache DolphinScheduler OpenMLDB Task:打造端到端MLOps工作流 https://openmldb.ai/docs/zh/main/use_case/dolphinscheduler_task_demo.html)。
training_model
下图为工作流 training_model 的各个任务的DAG图,该工作流主要执行以下任务:
• prepare_data : 准备模型训练数据和模型评估数据
• training:训练模型
• Deployment: 部署刚刚训练的模型用户推理评估数据
• evaluate:评估模型,因为该评估的指标为胜率以及盈亏比,所以进行另外的计算评估。
• close_service:评估完成后关闭刚刚部署的服务

prepare_data
任务类型为Shell 任务类型,准备训练数据用于训练模型,准备测试数据用于评估。

export PYTHONPATH=${project}
source ${project}/env/bin/activate
save_data_path=${project}/data/training
# 生成数据集
python -m dmsa.data_processing.build_datas \
--task_type train \
--config ${project}/feature_signal.txt \
--save_path ${save_data_path} \
--data_path ${project}/data/daily
# 把生成的数据集所对应的路径 save_data_path 赋值到变量 data_path 中,并通过自定义参数设为OUT,传递给下游任务
echo "#{setValue(data_path=${save_data_path})}"
training
该任务类型为MLflow任务类型,由Apache DolphinScheduler内置提供,可以用于输入一份数据集,自动进行模型训练。

上图表示,使用AutoML根据传入的数据(csv格式)进行120秒训练,AutoML的工具使用flaml。设置模型实验名称为baseline,模型将上传至MLflow服务中心,并注册模型名字为baseline。
更多的特性,可以查看Apache DolphinScheduler MLflow 组件(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/mlflow.html)。
Deployment

该任务类型为MLflow任务类型,由Apache DolphinScheduler内置提供,可以部署MLflow服务中心的模型。
上图表示,使用将模型名字为baseline,版本号为Production的模型,以Docker形式部署并保留7070端口。
更多的特性,可以查看Apache DolphinScheduler MLflow 组件(https://dolphinscheduler.apache.org/zh-cn/docs/dev/user_doc/guide/task/mlflow.html)。
evaluate
任务类型为Shell 任务类型,用户请求服务获取评估数据的结果并进行评估。

export PYTHONPATH=${project}
source ${project}/env/bin/activate
# 表示每天选择分数最高的3个股票,测试效果
python -m dmsa.evaluate.calc_evaluate \
--source_data_path ${data_path}/source.csv \
--evaluate_data ${data_path}/test.csv \
--api 'http://127.0.0.1:7070/invocations' \
--top_n 3
close_service
任务类型为Shell任务类型,用与关闭刚才启动的模型服务
# 刚才的模型服务会运行一个docker容器提供服务,容器名字会统一命名为 ds-mlflow-{model_name}-{model_version}
# 因此可以简单通过一行命令关闭服务
docker rm -f ds-mlflow-baseline-Production
Deployment
与上面的Deployment几乎一样,另外配一个7000端口用于推理服务。

batch_inference
工作流 batch_inference 用于对需要在线跑的任务进行推理,包含两个任务:
• build_inference_data:生成需要推理的数据
• inference: 调用接口进行推理

build_inference_data
任务类型为Shell 任务类型,用于生成需要推理的数据。

export PYTHONPATH=${project}
source ${project}/env/bin/activate
save_data_path=${project}/data/inference.csv
# 用于生成需要推理的数据
python -m dmsa.data_processing.build_datas \
--task_type inference \
--config ${project}/feature_signal.txt \
--save_path $save_data_path
echo "#{setValue(data_path=${save_data_path})}"
inference
任务类型为Shell 任务类型,根据上一个任务中生成的推理数据进行推理,并将分数前10的股票存到数据库中。

export PYTHONPATH=${project}
source ${project}/env/bin/activate
# 推理,服务地址为 http://127.0.0.1:7000/invocations, 7000是刚才配的端口,invocations是MLflow服务默认的路径
python -m dmsa.evaluate.inference \
--evaluate_data ${data_path} \
--api 'http://127.0.0.1:7000/invocations' \
--top_n 10
Monitoring
另外还有一个实时监控的工作流 monitoring ,用于实时计算监控数据,并存入数据库中,实时监控模型选出股票的效果。
目前包含两个任务,通过 Apache DolphinScheduler 每隔10秒调度启动:
• spot:持续更新市场全貌数据,包括涨跌幅,涨速等
• kline: 从K线层面持续对模型选择的股票的表现进行收益跟踪

spot
定时监控全貌的整体数据,包括获取全部股票当下时间点的涨跌幅,涨速等。

Kline
定时监控昨天选出来的股票在最新交易日的实时收益走势。

前端模块介绍
前端目前用Observable来进行展示,也可以使用Juptyer来替代。
Observable
Observable是一个以JavaScript基础进行拓展来做计算的Notebook,能够让很多不懂代码的数据分析,可视化工作人员简单、低成本地使用强大可视化工具的产品,个人版免费。
目前主要展示了三个图示来监控模型效果:
• 每日推荐股票的实时情况
• 每日推荐股票的实时涨跌分布
• 市场全貌的实时涨跌分布
每日推荐股票的实时情况
通过join 每日推荐股票标,以及市场全貌标来展示,每5秒刷新一次,下面两段代码可以直接copy到Observable的Notebook中即可:
// 获取数据
stockDatas = {
let i = 0;
while (true) {
try {
const data = await db.query("SELECT a.date, a.y_pred, a.score, b.* FROM candidate a LEFT JOIN spot b ON a.code = b.code WHERE a.y_pred=1;")
yield Promises.delay(5000, data);
} catch(e){
console.log(e.name + ":" + e.message);
}
}
}
// 转成表格
stocks = Inputs.table(stockDatas)

每日推荐股票的实时收益曲线
用过Observable其中自带的曲线图来表示
Plot.plot({
marks: [
Plot.ruleY([0]),
Plot.lineY(changesDatas, {x: "time", y: "changes"})
]
})

每日推荐股票的实时涨跌分布
用过Observable其中自带的柱状图来表示:
Plot.plot({
marks: [
Plot.rectY(stockDatas, Plot.binX({y: "count"}, {x: "涨跌幅"})),
Plot.ruleY([0])
],
})

市场全貌的实时涨跌分布
用过Observable其中自带的柱状图来表示:
// 从数据库中读取数据
spotDatas = db.query("SELECT * FROM spot;")
复制
// 画图
Plot.plot({
marks: [
Plot.rectY(spotDatas, Plot.binX({y: "count"}, {x: "涨跌幅"})),
Plot.ruleY([0])
],
x: {ticks: 20}
})

本文展示了使用Apache DolphinScheduler调度机器学习系统中的各类模块,Observable作为前端展示与可视化能力的智能股票选股系统,希望能给大家带来以下收获:
- 了解如何使用Apache DolphinScheduler构建一个选股系统。
- 了解Apache DolphinScheduler灵活简单编排MLOps场景下各个模块的方法。
- 了解Apache DolphinScheduler MLflow组件赋予用户的0成本使用机器学习能力。
- 了解Observable这个强大的可视化产品的能力,以及与Apache DolphinScheduler的结合使用。