Skip to content

四火的唠叨

一个纯正程序员的啰嗦

Menu
  • 所有文章
  • About Me
  • 关于四火
  • 旅行映像
  • 独立游戏
  • 资源链接
Menu

一种工作流心跳机制的设计

Posted on 04/28/201606/23/2019 by 四火

最近工作中一直和 SWF(Amazon 的 Simple Work Flow)打交道,在一个基于 SWF 的工作流框架上面开发和修 bug。SWF 的 activity 超时时间是 5 分钟,在 activity task 开始执行以后,activity worker 需要主动发送心跳请求告知 service 端:“我还活着,我还在干活”,如果出现超过 5 分钟(可以配置)没有心跳,SWF 的 service 端就认为,你已经挂了,我需要把这个 activity 安排到别的 activity worker 上来执行了。借用 AWS 官网的一张图:

heartbeat

每台机器上有若干个 activity task 在被执行。可以看到,在 activity 任务启动起来以后,需要用不断的心跳来告知 service 端任务还在进行,activity worker 还活着。这个 “汇报” 需要 activity worker 所在的 host 主动进行,这也是 SWF 的 service 端无状态(几年前写过一点东西介绍它)的基本要求之一。任务都是由 worker 端去 pull 的,这些行为也都是 worker 端主动触发的。

这个机制描述起来很简单,但是实际在相关设计实现的时候,有许多有趣和值得琢磨的地方。

在我手头的这个 workflow 里面,心跳机制是这样实现的:

  • 有两个 queue,一个是 main queue,是 dequeue(双端队列);另一个是 backup queue,普通队列。二者都是用来存放需要发送心跳的 activity 信息(heartbeatable 对象)。
  • 每秒钟都尝试执行这样一个方法 A:从 main queue 里面 poll 一个 heartbeatable 对象(如果 queue 为空就忽略本次执行),检查该心跳所代表的 activity task 是否还在工作,如果是,就发送一个心跳。发送成功以后,就把这个 heartbeatable 对象扔到 backup queue 里面去。这样,一秒一个,逐渐地,main queue 的 heartbeatable 对象全部慢慢被转移到 backup queue 里去了。
  • 每隔两分钟(称为一个 cycle)执行方法 B:把 backup queue 里面所有的 heartbeatable 对象全部转移到 main queue 里去,于是就又可以继续执行上面一步的逐个心跳逻辑。

这个机制的基本好处是,所有 activity task 的心跳统一管理,通常情况下保证了心跳不会过快(默认配置下是一秒一个,或者不发送),同时保证了没有谁会被遗漏:

double dequeue mechanism

但是,这里又会浮现好多好多问题:

为什么要使用两个 queue?

首先,有这样一个事实:方法 A 在执行的时候,理论上每秒钟会执行一次,但是这里并没有强制的保证,使得前一秒的 A 执行一定会在这一秒的 A 开始之前完成。换言之,它们的理论启动时间是按序的,但是实际启动时间和实际的心跳执行时间是不定的,需要处理并发的情形。而到底最多可能存在多少个执行 A 的线程并行,取决于用于此心跳功能的线程池的配置。因此,在执行和判断的过程中,需要对当前 poll 出来的 heartbeatable 对象加锁。

使用两个 queue,这主要是为了记录在本次 cycle 里面,能够很容易判断某一个 heartbeatable 对象是否已经完成心跳行为。还没有完成心跳的,都在 main queue 里;完成了的,都放到 backup queue 里。

如果使用一个 queue,那么也是有解决方案的:

  • 有一个公共计数器,每个 cycle 开始的时候,给计数器+1。
  • 每一个 heartbeatable 对象自身需要携带一个私有计数器,用以标识当前这轮 cycle 的心跳是否已经完成。
  • 每次完成的 heartbeatable 对象给自己的计数器+1 以后扔到队尾;每次 A 取新的 heartbeatable 对象的时候从队首取。
  • 如果取到的对象自己的计数器已经等于公共计数器的数值,说明整个 queue 里面的对象心跳都已经完成了。

当然,这种方法的弊端在于,判断是否还需要发送心跳这件事情,不仅需要从 queue 里取对象,还要判断对象的计数器数值,明显比两个 queue 的解决方案复杂和开销大。因为两个 queue 的解决方案下,只需要尝试从 main queue 里面取对象就好,取不到了就说明本次 cycle 里没有需要发送心跳的对象了。看起来是多了一个 queue,但是方案其实还是简单一些。

心跳的频率保持在多久为好?

显然不是越高越好,不只是成本,因为心跳也是需要消耗资源的,比如 CPU 资源;而且,心跳在 service 端也有 throttling,当前 activity worker 发起太频繁的心跳,当前心跳可能被拒,还可能会让别的 activity worker 的正常心跳被拒了。

我们要解决的最核心问题是,正常情况下,必须保持上限 5 分钟内能发起一次成功心跳就好。

要这么说,尽量增大 cycle,那我设计一个每隔 5 分钟就执行一次的定时器就好了。但是问题没那么简单,首先要考虑心跳的发起不一定成功。如果在接近 5 分钟的时候才去尝试发起心跳,一旦失败了,也没有时间重试了。因此,要 trade-off。比如,配置 cycle 为 120 秒,这样的好处是,5 分钟的超时时间内,可以覆盖 1~2 个完整的 cycle。如果 cycle 配置为 3 分钟,那么 5 分钟无法严格保证一定覆盖有一个完整的 cycle。

确定心跳频率的有两个重要参数,一个是方法 A 的执行频率,一个则是一个 cycle 的时间长度。例如,前者为 1 per second,后者为 2 分钟,那么在理想情况下,一个 cycle 120 秒,可以处理 120 个 activity task,换言之,极限是 120 个 activity task 在这台机器上一起执行。超过了这个数,就意味着在一个 cycle 内,无法完成所有的心跳发送任务。

当然,实际情况没有那么理想,考虑到暂时性的网络问题,线程、CPU 资源的竞争等等,实际可以并行的 activity task 要比这个数低不少。

异常处理和重试

在上图中,步骤③有三个箭头,表示了心跳出现不同种情形的处理:

  • 有一些常规异常,比如表示资源不存在,或者任务已经 cancel 了,这种情况发生的时候,要把相应的 activity task 给 cancel 掉,同时,把自己这个 heartbeatable 对象永久移除出 queue。
  • 重试情形 1:throttling 导致的异常,这种异常发生的时候,把当前 heartbeatable 对象再 addFirst 回 main queue,因为这不是当前有什么不可解决的或者不明原因的问题造成的,只需要简单重试即可。
  • 重试情形 2:其它未知原因的异常,这种情况当然需要重试(之前我们缺少这样的重试机制,导致下一次该 activity task 能够得到心跳的机会被推到了下一个 cycle,这显然是不够合理的),但是,可以把 heartbeatable 对象放到 queue 尾部去重试(addLast),并且附上一个私有计数器,如果重试超过一定次数,就挪到下一个 cycle(backup queue)去。这个放到 queue 尾部的办法,使得重试可以在当前 cycle 里进行,又可以使得这个重试能够尽量不影响其他 heartbeatable 对象的心跳及时发送。整个重试过程其实就是把当前失败对象再放回 queue 的过程,没有线程阻塞。

曾经遇到过一些这方面的问题,经过改进才有了上述的机制:

在 CPU 或者 load 达到一定程度的时候(比如这个时候有一个进程在 call service,占用了大量的 CPU 资源),就很容易发生心跳无法及时进行的问题,比如有时候线程已经初始化了,但是会 stuck 若干时间,因为没有足够的资源去进行。等到某一时刻,资源被释放(比如这个 call service 的进程结束),这个时候之前积攒的心跳任务会一下子爆发出来。不但这些心跳的顺序无法保证,而且严重的情况下会导致 throttling。如果没有当前 cycle 内的重试机制,那么下一次该对象的心跳需要等到下一个 cycle,很容易造成 activity task 的 timeout。

下面再说一个和心跳异常有关的问题。

有这样一个例子,在这个工作流框架内,我们需要管理 EMR 资源,有一个 activity 把 EMR cluster 初始化完成,另一个 activity 把实际执行的 steps 提交上去。但是发现在实际运行时有如下的问题:EMR cluster 已经初始化完成,但是 steps 迟迟没有办法提交上去,导致了这个 cluster 空闲太长时间,被框架内的 monitor 认为已经没有人使用了,需要回收,于是 EMR cluster 就被 terminate 了。但是这之后,steps 才被提交上去,但是这时候 cluster 已经处于 terminating 状态了,自然这个 step 提交就失败了。而经过分析,造成这个 EMR cluster 非预期的 termination,包括这样几个原因:

  • decision task timeout。在 EMR cluster 创建好之后,SWF 会问 decider 下一步该干嘛,这时候如果因为 CPU 高负荷等各种原因,导致 decision task timeout,SWF 就会一直等在那里,而如果这个 timeout 的时间配得太长,这段超时就足以让上面的这个 EMR cluster 空闲过长时间导致被误回收了。
  • 判断 EMR cluster 空闲到一定时间就要回收的逻辑有问题。我们以前的实现是,每隔 2 分钟执行一次 “EMR 资源操作”,包括检查资源状态,进行资源操作,然后如果发现该 EMR 资源创建后经过了 4 次资源操作,依然没有 step 提交上去,就认为空闲时间过长,需要回收(2 x 4 = 8 分钟)。但是问题在于,实际由于种种原因(和心跳的执行间隔实际时间不确定的原理一样),间隔执行 EMR 资源操作并不能严格保证每隔 2 分钟一次,有时一段时间都得不到执行,而有时候会迎来一次集中爆发,这个时候就可能实际 EMR 资源空闲了远远不到 8 分钟就被回收了。因此,这个逻辑最好是能够用绝对的 “空闲时间” 来判断,例如 EMR 资源创建时记录好时间,之后每次检查时都用当前时间去和创建时间比较,空闲超过 8 分钟再回收。
  • 由于之前提到过的心跳无法按时完成导致 activity task timeout,于是这个 EMR cluster 创建的任务实际已经完成了,但是被当做超时给无视了。

最后,我想说的是。设计一个好的工作流框架,还是有很多困难的地方,需要尤其考虑周全的地方。即便是基于 SWF 这样现有的 workflow 来搭积木和叠加功能,也有很多不易和有趣的地方。

文章未经特殊标明皆为本人原创,未经许可不得用于任何商业用途,转载请保持完整性并注明来源链接 《四火的唠叨》

×Scan to share with WeChat

你可能也喜欢看:

  1. 看 JDK 源码,解几个疑问
  2. 工作流系统的设计
  3. Notes: Spark metrics
  4. Issue record: “No thread for socket” about Memcached
  5. CommonJS

1 thought on “一种工作流心跳机制的设计”

  1. 任国强 says:
    05/13/2016 at 2:34 PM

    很喜欢这篇文章,以前做组态软件,了解到主备机的难点在于心跳机制,标记下一定细看。

    Reply

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

订阅·联系

四火,啰嗦的程序员一枚,现居西雅图

Amazon Google Groovy Hadoop Haskell Java JavaScript LeetCode Oracle Spark 互联网 亚马逊 前端 华为 历史 同步 团队 图解笔记 基础设施 工作 工作流 工具 工程师 应用系统 异步 微博 思考 技术 数据库 曼联 测试 生活 眼界 程序员 管理 系统设计 缓存 编程范型 美股 英语 西雅图 设计 问题 面向对象 面试

分类

  • Algorithm and Data Structure (30)
  • Concurrency and Asynchronization (6)
  • System Architecture and Design (43)
  • Distributed System (18)
  • Tools Frameworks and Libs (13)
  • Storage and Data Access (8)
  • Front-end Development (33)
  • Programming Languages and Paradigms (55)
  • Testing and Quality Assurance (4)
  • Network and Communication (6)
  • Authentication and Authorization (6)
  • Automation and Operation Excellence (13)
  • Machine Learning and Artificial Intelligence (6)
  • Product Design (7)
  • Hiring and Interviews (14)
  • Project and Team Management (14)
  • Engineering Culture (17)
  • Critical Thinking (25)
  • Career Growth (57)
  • Life Experience and Thoughts (45)

推荐文章

  • 谈谈分布式锁
  • 常见分布式系统设计图解(汇总)
  • 系统设计中的快速估算技巧
  • 从链表存在环的问题说起
  • 技术面试中,什么样的问题才是好问题?
  • 从物理时钟到逻辑时钟
  • 近期面试观摩的一些思考
  • RSA 背后的算法
  • 谈谈 Ops(汇总 + 最终篇):工具和实践
  • 不要让业务牵着鼻子走
  • 倔强的程序员
  • 谈谈微信的信息流
  • 评审的艺术——谈谈现实中的代码评审
  • Blog 安全问题小记
  • 求第 K 个数的问题
  • 一些前端框架的比较(下)——Ember.js 和 React
  • 一些前端框架的比较(上)——GWT、AngularJS 和 Backbone.js
  • 工作流系统的设计
  • Spark 的性能调优
  • “残酷” 的事实
  • 七年工作,几个故事
  • 从 Java 和 JavaScript 来学习 Haskell 和 Groovy(汇总)
  • 一道随机数题目的求解
  • 层次
  • Dynamo 的实现技术和去中心化
  • 也谈谈全栈工程师
  • 多重继承的演变
  • 编程范型:工具的选择
  • GWT 初体验
  • java.util.concurrent 并发包诸类概览
  • 从 DCL 的对象安全发布谈起
  • 不同团队的困惑
  • 不适合 Hadoop 解决的问题
  • 留心那些潜在的系统设计问题
  • 再谈大楼扔鸡蛋的问题
  • 几种华丽无比的开发方式
  • 我眼中的工程师文化
  • 观点的碰撞
  • 谈谈盗版软件问题
  • 对几个软件开发传统观点的质疑和反驳
  • MVC 框架的映射和解耦
  • 编程的未来
  • DAO 的演进
  • 致那些自嘲码农的苦逼程序员
  • Java 多线程发展简史
  • 珍爱生命,远离微博
  • 网站性能优化的三重境界
  • OSCache 框架源码解析
  • “ 你不适合做程序员”
  • 画圆画方的故事

近期评论

  • panshenlian.com on 初涉 ML Workflow 系统:Kubeflow Pipelines、Flyte 和 Metaflow
  • panzhixiang on 关于近期求职的近况和思考
  • Anonymous on 闲聊投资:亲自体验和护城河
  • 四火 on 关于近期求职的近况和思考
  • YC on 关于近期求职的近况和思考
  • mafulong on 常见分布式基础设施系统设计图解(四):分布式工作流系统
  • 四火 on 常见分布式基础设施系统设计图解(八):分布式键值存储系统
  • Anonymous on 我裸辞了
  • https://umlcn.com on 资源链接
  • Anonymous on 我裸辞了
© 2025 四火的唠叨 | Powered by Minimalist Blog WordPress Theme