构建端到端的联邦学习 Pipeline 生产服务
来源:https://www.infoq.cn/article/OX5x5RfMFPrD1CHlgMNh
导读:本次分享的主题为构建端到端的联邦学习 Pipeline 生产服务。联邦学习的优势在于能够保证参与各方在数据不出本地,保持数据独立性的情况下,多方共建模型,共同提升机器学习效果。联邦机制下,安全隐私有了优势,但技术上也会面临更多挑战。作为一个工业级的框架,端到端的联邦学习 Pipeline 致力于完成高弹性、高性能的联邦学习任务,主要包括建模、训练、模型管理、生产发布和在线推理几个方面。本次将为大家分享如何灵活调度管理复杂的联邦学习任务、可视化联邦建模的实现以及在线联邦推理服务的思考与实践,解决实验性机器学习到实际生产应用落地的难点。
主要分享 4 个方面:
- 背景介绍
- 高弹性联邦学习 Pipeline 调度
- 联邦学习任务可视化
- 高性能联邦学习在线推理服务
我们在日常建模过程中,会遇到哪些需求和痛点?
1. 机器学习任务编排
我们是如何编排机器学习任务的:
- 塞满逻辑的 Python 脚本。把机器学习的一些步骤,如特征工程、预处理、验证等,写成一个符合逻辑的 Python 脚本。
- 步骤要并行、要嵌套。通过多线程、多进程等手段来完成。
- N 个业务需要 N 个 Script。生产中我们有非常多的业务,每个业务采用的算法也不同,所以会有多个 Script。
- 外部系统对接。一般其他平台在发起自动化机器学习平台任务时,系统对接上是非常困难的,毕竟还没有那么智能,那么如何更好的写好 Python 脚本呢?
2. 机器学习任务状况观察
当我们千辛万苦的把任务运行起来,还需要不断观察任务的运行情况,运行到哪一步?每一步的数据输出是什么样的?指标输出是什么样的?Loss、AUC 等指标的趋势?然后,任务跑完了吗?
我们该如何观察到各种趋势,来尽可能做参数的调整等操作。这里要大家可以想想,平时是如何观察机器学习任务状态的呢?
如果是我,可能就会看日志。那有没有需求是看日志不能解决的?如果有,那就加几行日志!^_^
3. 联邦学习任务协同建模
刚刚介绍的都是一般机器学习任务中会遇到的挑战,在联邦机器学习下,因为涉及多方协同,会遇到更多的挑战:
- 启动多方任务。如需要跟多方协调启动任务,时间难同步。
- 各方的运行状况。需要通过通信工具,同步任务进度。
- 多方日志、排查问题。如日志是散落的,各个操作系统遇到的问题也不一样。
- 多方任务管理。可能非联邦学习的建模只需要 1 个人,而联邦机制下需要 3 个人或者更多。
接下来分享下 FATE 是如何尝试解决这样的问题,以及当前的方案和 Pipeline 调度。
1. 端到端的联邦学习 Pipeline
这是一个比较典型的纵向联邦学习 Pipeline 的例子:
交集 ( intersect ) -> 联邦统计 -> 联邦特征处理 -> Training -> 验证 -> 模型保存 -> 模型发布 ( 发布到线上服务 FATE-serving,后面会介绍 )
2. FATE-Flow:端到端的联邦学习 Pipeline 调度平台
① FATE-Flow 功能
FATE-Flow 是我们自研的联邦学习调度平台,主要有 5 项功能:
- 用 DAG 定义联邦学习 Pipeline。DAG 具有比较灵活、弹性的特点,是业界工作流调度系统常用的方式;在联邦学习场景下,DAG 是有难点的,由于协作的机制,我们的 Pipeline 是非对称的;我们在设计 DSL 时,考虑更多系统化对接的情况,所以我们采用了 json 格式的 DAG DSL,然后再采用 DSL-Parser 进行解析。
- 联邦任务生命周期管理。对应刚刚所说的痛点,对联邦任务生命周期进行管理,如多方启停、状态检测。
- 联邦任务协同调度。包括:多方任务队列、分发任务、状态同步等协同调度。
- 联邦任务输入输出实时追踪。这个功能比较普遍,一般的调度平台都会有,包括:数据、模型、自定义指标、日志等的实时记录存储。
- 联邦模型管理。联邦学习模型的管理存在一个很大的问题,尤其在纵向联邦学习场景下,就是如何保证多方模型的一致性。
② FATE-Flow 架构
FATE-Flow 架构:
- DSL Parser:是调度的核心,通过 DSL parser 可以拿到上下游关系、依赖等。
- Job Scheduler:是 DAG 层面的调度,把 DAG 作为一个 Job,把 DAG 里面的节点 run 起来,就称为一个 task。
- Federated Task Scheduler:最小调度粒度就是 task,需要调度多方运行同一个组件但参数算法不同的 task,结束后,继续调度下一个组件,这里就会涉及到协同调度的问题。
- Job Controller:联邦任务控制器
- Executor:联邦任务执行节点,支持不同的 Operator 容器,现在支持 Python 和 Script 的 Operator。Executor,在我们目前的应用中拉起 FederatedML 定义的一些组件,如 data io 数据输入输出,特征选择等模块,每次调起一个组件去 run,然后,这些组件会调用基础架构的 API,如 Storage 和 Federation Service ( API 的抽象 ) ,再经过 Proxy 就可以和对端的 FATE-Flow 进行协同调度。
- Tracking Manager:任务输入输出的实时追踪,包括每个 task 输出的 data 和 model。
- Model Manager:联邦模型管理器
3. DAG 定义联邦学习 Pipeline
左边为我们的 DSL,它的结构比较简单,我们可以定义一串 Component,通过 Parser 解析出 DAG 图(如右图,可以清晰地看到整个算法流程的架构)。
构建 DSL 只需要三步:
① Module:模型组件,FATE 当前支持 11 个模型组件,基本满足当前 FATE 所支持的所有算法。
② Input:
- data:数据输入
- model:模型输入
- isometric_model:异构模型,当前只用于 Feature Selection
③ Output:
- data:输出输出
- model:模型输出
可参考下面的例子:
构建 DSL 示例
DSL 怎么工作的呢?它是一个非常酷的模块,就像人类的大脑,它是 FATE-Flow 的中心:
- 组件初始化:
① 根据 DSL 定义和任务配置,解析每个 Component 运行参数
② 分析 DSL 定义 data、model 输入输出,提取依赖关系
- DAG 图:
① 构建依赖关系邻接表
② 拓扑排序进行 DAG 依赖检测,因为用户定义的 DSL 不一定是有效的
- 调度协作:
① 实时输出 Component 无依赖上下游
② Component 依赖度自动递减
- 预测 DSL 推导:
剔除预测阶段无用 Component 数据,模型依赖传递,推导预测 DSL
4. 联邦学习任务多方协同调度
联邦学习任务多方协同调度的流程:
首先,是以任务提交的一种方式,提交任务到 Queue,然后 JobScheduler 会把这个任务拿出来给到 Federated TaskScheduler 调度,Federated TaskScheduler 通过 Parser 取得下游 N 个无依赖的 Component,再调度 Executor ( 由两部分组成:Tracking Manager 和 Task ) 执行,同时这个任务会分发到联邦学习的各个参与方 Host。联邦参与方取得任务,如果是 New Job,则放入队列(参与方会定期调度队列中的 Job),否则启动多个 Executor 执行,Executor 在 run 的过程中,会利用 Federation API 进行联邦学习中的参数交互,对一个联邦学习任务,每一方的 Job id 是保持一致的,每跑一个 Component,它的 Task id 也是一致的。每个 Task 跑完 Initiator TaskScheduler 会收集各方的状态,进行下一步的调度。对于下一步的调度策略我们支持:all_succss,all_done,one_succss 等策略。由于我们基于 Task 为最小的调度单位,所以很容易实现 rerun,specified_task_run 等特定运行。
5. 联邦任务多方生命周期管理
分以下几个部分:
- Task stat:Task 状态信息,如启动时间、运行状态、结束时间、超时时间等
- Task run process:Task 运行进程
- Life cron checker:Task 生命周期定时检测
- Job controller:联邦任务控制器
- Shutdown:kill process、清理任务以及同步指令到所有联邦参与方,保证联邦任务状态一致性
启动 Shutdown 的条件:
- 若 Task 运行时间超过配置超时时间或默认超时时间(一般较长),启动 Shutdown
- 若 Task 运行进程异常终止,启动 Shutdown
- 若 Task 正常运行终止,启动 Shutdown
6. 联邦任务输入输出实时追踪
联邦任务输入输出实时追踪,首先会有几个 Definition 定义:
- metric type:指标类型,如 auc,loss,ks 等等
- metric namespace:自定义指标命名空间,如 train,predict
- metric name:自定义指标名称,如 auc0,hetero_lr_auc0
- metric data:key-value 形式的指标数据
- metric meta: key-value 形式的指标元信息,支持灵活画图
目前的 API 只有 4~5 个:
- log_metric_data(metric_namespace, metric_name, metrics)
- set_metric_meta(metric_namespace, metric_name, metric_meta)
- get_metric_data(metric_namespace, metric_name)
- get_metric_meta(metric_namespace, metric_name)
可能以前收集指标需要经过收集日志等一系列操作,任务像一座座大山一样摆在面前,现在则有可能成为我们的摇钱树,因为我们可以快速的收集各种指标,提交给需求方。
7. 联邦模型管理
左图中的两桶“大饼”,分别代表了某一方的模型,每一个“大饼”则代表了每个组件的 model,如:Dataio、FeatureBinning、FeatureSelection、FeatureTransform、HeteroLR、Pipeline。这里需要做个 Model Binding 模型的绑定,FATE-Flow 的做法还是比较简单的,我们会给每套模型赋予一对标志符 model_id 和 model_version 来唯一标识模型,model_id 由用户自定义的 role 和 party_id 及 model_key 拼接而成,model_version 也是可以自定义的,如果不自定义的话,会默认为 job_id。我们会有一个命名为 Pipeline 的模型存储 Pipeline 建模 DSL 及在线推理 DSL。
下面是某个算法模型数据结构的示例:
示例 1
示例 2
同时每个“大饼”算法模型,也由两部分组成:ModelParam 和 ModelMeta,也就是参数和元的信息。
8. 联邦模型版本管理
模型版本管理我们参考了 Git 的实现思路,但是我们没有做的那么复杂,是基于多叉树版本的记录:
- 支持 commit message;
- 支持分支功能,如 experiment,product,release;
- 支持 tag,如 release;
- 支持 history 查看;
- 支持版本回溯,指定某一版本回滚。
9. 使用样例
上图为,FATE-Flow 的简单使用样例,主要就是使用 FATE-Flow CLI 提交一个 Job,需要提供 Job 的 DSL 描述以及配置文件,那么 FATE-Flow Server 会返回该 Job 的一些必要信息,尤其唯一 Job Id 比较重要。后面则是查询 Job 状态以及停止 Job 的操作指令,CLI 还支持许多丰富的指令,可以参考 github 上的文档。
第三部分介绍下联邦学习建模可视化:
1. FATE-Board
大体的架构如右图,有一些 Job DashBoard 和可视化,两个基本的管理和上面的 Web UI。
FATE-Board 作为 FATE 联邦建模的可视化工具,旨在跟踪和记录联邦建模全过程的信息,并通过可视化的方式呈现模型训练过程的变化以及模型训练结果,帮助用户简单而高效地深入探索模型与理解模型。
2. 建模交互及可视化
00:00 / 00:40
1.0x
- 2.0x
- 1.5x
- 1.25x
- 1.0x
- 0.5x
网页全屏
全屏
00:00
Demo
其基本步骤为:
① 用户配置 pipeline,建立 graph、定义 parameters 等;
② 用户提交 job,返回 job URL,同时启动 job 运行,进入 web 端查看 fateboard;
③ 观察 job 运行状态,查看运行时的统计信息,包括运行进度、日志、模型过程输出等;
④ 查看 job 运行完成的结果,包括模型输出、模型评分、日志等内容及可视化结果。
第四部主要讲怎么发挥模型的最大价值,我们构建了高性能的联邦学习在线推理服务:
1. FATE-Serving
FATE-Serving 设计原则:
- 高性能,基于 GRPC 协议,批量联邦请求,联邦参与方模型结果多级缓存
- 高可用,无状态设计,异常降级功能
- 高弹性,模型 & 数据处理 App 动态加载
FATE-Serving 模块:
- Online FederatedML:高性能在线联邦学习算法包,在线和离线时性能是不一样的,在线的时候,我们没有采用 Python,当前版本是采用 Java 写的一套联邦学习算法。
- Online Federated Pipeline:在线联邦推理 Pipeline,比较简洁高效,后面会详细介绍。
- Dynamic Loaders:推理节点动态加载器。目前支持模型和 APP 两种推理节点,节点通过训练或者编译等方式创建后是序列化后存储在分布式存储上的。对于在线服务,每个请求都需要经过每个节点,为了保证执行效率,需要把这些节点提前从存储中取出缓存在 Server 的内存空间,这也是业界在线服务的常用做法。
- Model Manager:在线模型管理器
- Processing-App Factory:数据处理节点工厂。在解决实际业务问题时,还需在模型预测的基础上在 Pipeline 中引入一些人工或者规则,打包成一个 App 发布到 Serving,在预测的时候加入一个前处理 Processing 和一个后处理 Processing。
- Multi-Level Cache:多级缓存管理器
- Snapshot Manager:快照管理,定期将在线模型、Processing-App 信息落库。
2. 在线联邦模型管理
在线联邦模型管理,我们提供了两个 API:pulish load req 和 pulish online AB Test。为什么是两个 API?因为,Pulish load req 过程只是把整个模型通过 dynamic loaders 得到一个 model object 放入 model pool 中,是一个内存的池子。联邦各方的 model id 和 model version 都是保持一致的(在发布模型的时候,只需要在 Guest 侧发起就可以了,会自动走 proxy,把命令推到其他的 Host 中去),当各方都加载完之后,就可以应用了。
Pulish online AB Test 主要是做 default binding,比如有些业务方并不知道我们的模型版本,需要有一个默认的机制。
图中右下角,为动态加载器的一个基本功能。
3. 在线联邦推理 Pipeline
在线联邦推理 Pipeline 流程:
Req -> Service -> model selection ( 有三种不同策略:规定的方式和两种默认的方式 ) -> loader -> processing ( 前处理 ) -> inference Pipeline ( 是纵向 model,会经过不同的组件,这里举例是三个,然后会把 user_id (用来做对齐的)发到不同的参与方,参与方会拿自己的半模型,进行最后的预测,最后把结果发到 Federated Network ) -> Processing ( 后处理 )
4. 在线推理服务缓存
在线推理服务缓存分为三种:
- Inference result in-process cache:cache key 为 caseid,缓存 120s。
- Remote federated result in-process cache:cache key 为 userid,缓存 600s。
- Remote federated result distributed cache:cache key 为 userid,缓存 24h,跟 A 方的特征更新周期有很大的关系。
在业务的运行当中,按照这几个配比,一天下来发现命中远端联邦推理结果 Cache 的请求量占了 28%,也就是说有 28%的联邦请求都不需要这部分,就会大大的增加了用户的体验。
我们还有一个缓存唤醒的热身过程,我们会跑一些离线任务,然后对活跃度进行统计,把要缓存的用户按比例提取出来,在凌晨的时候发一些批次请求,让整个缓存热身。
5. 联邦模型应用生产服务流程
一个联邦模型应用到生产服务的大致流程:
① 全量加载联邦模型,通过 pulish load req 全量加载模型,每方的 Serving 都加载这个模型。
② 灰度上线联邦模型,pulish online AB test 可以指定多少用户上线 online 模型。(这时就会体现出为什么要做 pulish load req,因为在 Guest 方灰度上线时,这个 Serving 请求的 user_id 落到其他参与方,需要找到对应的模型,但是通常情况下 Serving 都是做负载均衡的,如果灰度这部分的 Serving 已经上线了新的模型,其它 Host 方没有预先加载这个模型,就会找不到这个模型,也就是说模型是被所有 Serving 所 load 的,但是并不是所有 Serving 当前生效的都是这个模型。)
③ 联邦模型效果验证
④ 全量上线联邦模型
作者介绍:
曾纪策
微众银行 | 人工智能系统架构师
本文来自 DataFun 社区
原文链接: