大家好,我是Go学堂的渔夫子。今天跟大家聊聊基于有向无环图的工作流的实现.
01 工作流(workflow)概述
工作流,是对工作流程中的工作按一定的规则组织在一起并按其进行执行的一种模型。比如常见的行政系统中的加班申请、请假申请;工作流要解决的问题就是为了实现某个特定的目标,让多个参与者之间按某种预订的规则自动的传递信息。
本文介绍了一种基于有向无环图实现的工作流,通过有向无环图,可以解决两个问题:从逻辑上,对各个节点的依赖关系进行了组织;从技术上,有依赖关系的节点需要等待执行,无依赖关系的可以并发执行。
该工作流的实际应用是在程序化广告中从接收请求到广告响应之间的流程:接收请求、获取地理位置、获取用户画像、广告召回、广告排序、算法预估、返回响应等各个节点之间的依赖执行。
但本文的目标是介绍其实现思想,所以在示例部分会以穿衣服的流程为例进行讲解。
02 工作流的实现
下面我们以早上起床穿衣所发生的事件为例来讲解有向无环图的实现。穿衣流程中包含的事件有穿内裤、穿裤子、穿袜子、穿鞋、戴手表、穿衬衣、穿外套等。这些事件中有的必须要先穿某些衣服,才能再穿其他衣服(如先穿袜子后才能穿鞋)。有些事件则可以以任意顺序穿上(如袜子和裤子之间可以以任意词序进行穿戴)。如图所示:
文章图片
由上图可以看到,穿内裤、穿袜子、穿衬衣、戴手表之间没有相互依赖,可以并发执行。而穿鞋子则必须等待所依赖的裤子和袜子穿完后才能执行。下面我们就来看看如何实现这样的有向无环图的工作流。
2.1 定义工作流结构
根据上图,我们可以看出一个相对完整的工作流包含开始节点(从哪里开始)、边(经过哪些节点)、结束节点(到哪里结束)。由此,我们定义工作流的结构如下:
type WorkFlow struct {
done chan struct{} //结束标识,该标识由结束节点写入
doneOnce *sync.Once //保证并发时只写入一次
alreadyDone bool //有节点出错时终止流程标记
root *Node //开始节点
End *Node //结束节点
edges []*Edge //所有经过的边,边连接了节点
}
2.2 定义工作流中边 边用来表示两个节点之间的依赖关系。有向图中的边还能表明两个节点哪个是前置节点,哪个是后置节点。后置节点需要等待前置节点的任务执行完成后才能执行。如下图所示:
文章图片
【Go实战 | 基于有向无环图的并发执行流的实现】内裤和裤子两个节点说明只有等穿上内裤后,才能穿裤子,那么穿内裤节点就是穿裤子节点的前置节点;而鞋子只有等裤子和袜子都穿上后才能穿鞋子,那么裤子和袜子就是鞋子的前置节点。
所以边的表示是从哪个节点到哪个节点。定义如下:
type Edge struct {
FromNode *Node
ToNode *Node
}
2.3 定义工作流中的节点 节点即要具体执行逻辑的任务单元。同时每个节点都有所关联的边。因为我们使用的是有向图,所以关联的边又分为入边(即终止于该顶点的边)和出边(即从该顶点开始的边)。如下图:
文章图片
边1是内裤节点的出边,同时也是裤子节点的入边。边2和边3都是鞋子节点的入边。本文将入边称为该节点的依赖边,定义为Dependency,表示只有这些边连接的节点任务执行完后,该节点才能开始执行。将该节点的出边称为孩子边,定义Children,表示该节点执行完后,可以继续执行的子节点。由此,节点的结构定义如下:
type Node struct {
Dependency []*Edge //依赖的边
DepCompleted int32 //表示依赖的边有多少个已执行完成,用于判断该节点是否可以执行了
Task Runnable //任务执行
Children []*Edge //节点的字边
}
在节点的定义中,我们看到有一个Runnable类型。该类型是一个接口,只有一个Run(i interface{})方法,用来对节点执行业务逻辑的抽象。以便在节点执行的时候,统一调用各个节点的Run方法即可。该接口的定义如下:
type Runnable interface {
Run(i interface{})
}
我们以穿鞋子任务为例来实现该接口,定义如下:
type WearShoesAction struct {}func (a *WearShoesAction) Run(i interface{}) {
fmt.Println("我正在穿鞋子...")
}
有了具体的执行任务,我们就可以将该任务构建到一个节点上:
func NewNode(Task Runnable) *Node {
return &Node{
Task: Task,
}
}//构建穿鞋子的节点
shoesNode := NewNode(&WearShoesAction{})
有了节点,我们需要构建边,因为边是连接两个节点的,所以我们再定义一个穿袜子的节点:
type WearSocksAction struct {}func(a *WearSocksAction) Run(i interface{}) {
fmt.Println("我正在穿袜子...")
}
好了,有了两个节点了,我们定义边的函数,以便将两个节点构建成边:
func AddEdge(from *Node, to *Node) *Edge {
edg := &Edge{
FromNode: from,
ToNode: to,
}//该条边是from节点的出边
from.Children = append(from.Children, edge)
//该条边是to节点的入边
to.Dependency = append(to.Dependency, edge)return edg
}
2.4 工作流中的特殊节点 -- 开始节点和结束节点 另外,我们在工作流中看到有一个开始节点和结束节点。开始节点是工作流的根节点,是整个工作流开始执行的触发点。它的任务就是触发子节点,所有该节点中没有具体的业务逻辑要执行,也就是说该节点中的Task是nil。
结束节点代表整个工作流的任务都执行完成了,通过信号的方式告知调用方流程结束。所以,该节点的任务逻辑就是给工作流的done通道写入一个消息,让调用方接收到该消息,以解除阻塞:
type EndWorkFlowAction struct {
done chan struct{} //节点执行完成,往该done写入消息,和workflow中的done共用
s *sync.Once //并发控制,确保只往done中写入一次
}//结束节点的具体执行任务
func (end *EndWorkFlowAction) Run(i interface{}) {
end.s.Do(func() { end.Done <- struct{} })
}
2.5 构建工作流 好了,我们来看下基于以上各个元素的结构体定义,如何构建一个完整的工作流,并让工作流能够工作。
首先,当然是要实例化一个工作流。
func NewWorkFlow() *WorkFlow {
wf := &WorkFlow{
root: &Node{Task: nil},//开始节点,所有具体的节点都是它的子节点,没有具体的执行逻辑,只为出发其他节点的执行
done: make(chan struct{}, 1),
doneOnce: &sync.Once{},
}//加入结束节点
EndNode = &EndWorkFlow{
done: wf.done,
s: wf.doneOnce,
}
wf.End = NewTaskNodereturn wf
}
因为每个工作流实例都必须要有开始节点和结束节点,所以我们在初始化的时候就指定了开始节点和结束节点。
其次,构建节点之间的边。构建边分三类,一类是跟节点和中间节点之间构成的边,这类的特点是根节点只有出边,没有入边。一类是中间节点和中间节点之间构成的边。最后一类是中间节点和结束节点之间构成的边。这类边的特点是结束节点只有入边,没有出边。
func (wf *WorkFlow) AddStartNode(node *Node) {
wf.edges = append(wf.edges, AddEdge(wf.root, node))
}func (wf *WorkFlow) AddEdge(from *Node, to *Node) {
wf.edges = append(wf.edges, AddEdge(from, to))
}func (wf *WokFlow) ConnectToEnd(node *Node) {
wf.edges = append(wf.edges, AddEdge(node, wf.End))
}
通过以上3个函数,我们就可以构造出工作流中各个节点之间的关系图了。 有了关系图,我们需要让
这个关系图流转起来。所以,我们再来看看工作流中相关的执行行为的定义。
func (wf *WorkFlow) StartWithContext(ctx context.Context, i interface{}) {
wf.root.ExecuteWithContext(ctx, wf, i)
}func(wf *WorkFlow) WaitDone() {
<-wf.done
close(wf.done)
}func(wf *WorkFlow)interrupDone() {
wf.alreadyDone = true
wf.s.Do(func() { wf.done <- struct{} })
}
2.6 节点的具体执行逻辑实现 在工作流执行函数中,我们看到调用了根节点的ExecuteWithContext函数。我们来看下该函数的具体实现。
func (n *Node) ExecuteWithContext(ctx context.Context, wf *WorkFlow, i interface{}) {
//所依赖的前置节点没有运行完成,则直接返回
if !n.dependencyHasDone() {
return
}
//有节点运行出错,终止流程的执行
if ctx.Err() != nil {
wf.interruptDone()
return
}//节点具体的运行逻辑
if n.Task != nil {
n.Task.Run(i)
}//运行子节点
if len(n.Children) > 0 {
for idex := 1;
idx < len(n.Children);
idx++ {
go func(child *Edge) {
child.Next.Execute(ctx, wf, i)
}(n.Children[idx])
}n.Children[0].Next.Execute(ctx, wf, i)
}}
大部分逻辑都很简单,我们重点看下dependencyHasDone()函数。该函数是用来检查一个节点所依赖的边是否都执行完了。这里是通过计数来实现的。 其具体实现如下:
func (n *Node) dependencyHasDone() bool {
//该节点没有依赖的前置节点,不需要等待,直接返回true
if n.Dependency == nil {
return true
}//如果该节点只有一个依赖的前置节点,也直接返回
if len(n.Dependency) == 1 {
return true
}//这里将依赖的节点加1,说明有一个依赖的节点完成了
atomic.AddInt32(&n.DepCompleted, 1)//判断当前依赖的节点数量是否和依赖的节点相等,相等,说明都运行完了
return n.DepCompleted == int32(len(n.Dependency))
}
我们以下图为例,来说明end节点是如何被检查所依赖的前置节点都执行完了的。
文章图片
先是root节点开始执行自己的ExecuteWithContext方法。在该方法中,先通过dependencyHasDone函数来判断该节点所依赖的前置节点是否都执行完了,但该节点没有前置节点,即满足n.Dependency == nil,返回true,所以root节点可以继续执行ExecuteWithContext中后面的逻辑。
文章图片
执行root的Task任务,因为root的Task为nil,所以继续执行到子节点中。发现root的Children中有两条边,开始循环让子节点同时执行。即执行裤子节点和袜子节点。
文章图片
我们知道,袜子节点和裤子节点所依赖的前置节点只有root,也就是只有一条入边。当执行到自己的时候,说明root节点已经执行完了,所以执行到了下面的逻辑:
//如果该节点只有一个依赖的前置节点,也直接返回
if len(n.Dependency) == 1 {
return true
}
假设现在裤子节点先执行完了,但袜子节点还没执行完。当裤子节点执行完之后,就会找和自己管理的子节点继续执行,也就是鞋子节点,这时,鞋子节点在执行dependencyHasDone逻辑时,命中了第三部分的逻辑:
//这里将依赖的节点加1,说明有一个依赖的节点完成了
atomic.AddInt32(&n.DepCompleted, 1)//判断当前依赖的节点数量是否和依赖的节点相等,相等,说明都运行完了
return n.DepCompleted == int32(len(n.Dependency))
发现鞋子依赖的边有2条。现在只完成了1条,所以是不可执行状态,就直接返回不再执行了。如下图所示:
文章图片
这时,袜子节点也执行结束了。同样会执行自己的子节点,也就是鞋子节点。这时发现鞋子节点的完成数是2,发现和自己依赖边相等,这时,鞋子节点编程可执行状态。如下图:
文章图片
2.7 完整示例 我们现在来看下穿衣服流程的完整示例。
wf := NewWorkFlow()//构建节点
UnderpantsNode := NewNode(&WearUnderpantsAction{})
SocksNode := NewNode(&WearSocksAction{})
ShirtNode := NewNode(&ShirtNodeAction{})
WatchNode := NewNode(&WatchNodeAction{})
TrousersNode := NewNode(&WearTrouserNodeAction{})
ShoesNode := NewNode(&WearShoesNodeAction{})
CoatNode := NewNode(&WearCoatNodeAction{})//构建节点之间的关系
wf.AddStartNode(UnserpatnsNode)
wf.AddStartNode(SocksNode)
wf.AddStartNode(ShirtNode)
wf.AddStartNode(WatchNode)wf.AddEdge(UnserpatnsNode, TrousersNode)
wf.AddEdge(TrousersNode, ShoesNode)
wf.AddEdge(SocksNode, ShoesNode)
wf.AddEdge(ShirtNode, CoatNode)
wf.AddEdge(WatchNode, CoatNode)wf.ConnectToEnd(ShoesNode)
wf.ConnectToEnd(CoatNode)var completedAction []stringwf.StartWithContext(ctx, completedAction)
wf.WaitDone()fmt.Println("执行其他逻辑")
上面代码中的各个Action的结构体实现Runnable接口即可,这里就不再重复了。到此,我们就构建成功了在开始时就看到的穿衣流程图。
03 一些其他问题
3.1 在各个节点之间,如何传递数据 我们看到WorkFlow的StartWithContext函数中的第二个参数是一个interface{}类型,这个就是用来给各个节点传递参数的。同时在Node的ExecuteWithContext函数中的参数也是interface{}类型。我们可以将该参数定义为指针类型。这样在各个节点执行过程中就可以改变指针指向的内容了。
3.2 如果有节点执行错误,如何终止流程 在一个流程中,有任何一个节点执行出错,我们的处理方式是终止整个流程。即在上面节点的ExecuteWithContext函数中有如下代码的判断:
if ctx.Err() != nil {
wf.interruptDone()
return
}
我们来看下WorkFlow的interruptDone的实现:
wf.alreadyDone = true
wf.s.Do(func() { wf.done <- struct{}{}})
总结
有向无环图是一种解决节点依赖关系的利器。在解决了依赖之间的问题同时,也解决了相互独立节点的并发问题。
推荐阅读
- 【golang】leetcode中级-字母异位词分组&无重复字符的最长子串
- 彻底理解Golang Map
- kratos线上开源年会它来啦~
- 深入浅出 Golang 资源嵌入方案(go-bindata篇)
- 深入浅出 Golang 资源嵌入方案(前篇)
- golang 经典案例总结
- Golang 数组和切片
- Go JSON编码与解码()
- golang map基础知识