事件消费者之 Saga - 事件溯源

事件消费者之 Saga - 事件溯源
文章图片

本文转载自【何以解耦】:https://codedecoupled.com/php...
什么是 Saga Saga 是一种用于处理漫长业务流程的设计模式。这里的长度并非时间长短,而是指一个业务流程由于跨域而涉及的领域宽度。所以一个 Saga 处理周期可能是一个星期,一个小时,一分钟甚至几秒,它与时间无关。
为什么使用 Saga 在 DDD(领域驱动)中,我们用聚合建立一个以自我为中心的模型,聚合具有良好的自我保护性,外界只能通过 Command 来调用聚合的接口。看起来这是一个很好的设计,然而业务需求层出不穷,当一个业务流程需要多个聚合参与时我们便可使用 Saga。
让我们举一个简单的例子,现有两个独立的聚合,他们分别是订单聚合(Order Aggregate)以及库存聚合(Inventory Aggregate):

  • Order Aggregate
    • PlaceOrderCommand:触发 OrderPlacementConfirmedEvent 事件。
  • Inventory Aggregate
    • CheckInventory:触发 InventoryAvailableEvent 或者 InventoryNotAvailableEvent 事件。
订单聚合提供两个对外接口:
  • PlaceOrderCommand:此接口用于提交用户订单。
库存聚合提供一个对外接口:
  • DeductInventory:此接口用于检查存货是否足够。
以上两个聚合独立存在且无合作关系,订单聚合用于提交用户订单,库存聚合用于查看存货。此时调用 PlaceOrderCommand 并不会检查存货,而业务需求肯定会要求提交订单时确保存货足够,此时订单聚合与库存聚合必须相互合作,于是我们便可使用 Saga。
首先我们需要修改订单聚合接口:
  • Order Aggregate
    • PlacingOrderCommand:触发 OrderPlacingEvent 事件。
    • ConfirmOrderPlacementCommand:触发 OrderPlacementConfirmedEvent 事件。
修改后的订单聚合提供两个对外接口:
  • PlacingOrderCommand:此接口用于提交用户订单。
  • ConfirmOrderPlacementCommand:此接口用于确认用户订单的提交。
然后我们便可使用 Saga 来实现业务需求:
class PlaceOrderSaga extends Saga { public function onOrderPlacingEvent(OrderPlacingEvent $event) { $this ->deductInventoryCommand ->handle( $event->inventoryAggregateId ); }public function onInventoryAvailableEvent(InventoryAvailableEvent $event) { $this ->confirmOrderPlacementCommand ->handle( $event->orderAggregateId ); } }

我们需要谨记,一个 Saga 是一个业务流程的模型,但是它并不具备任何逻辑代码,它仅仅指挥聚合间 API 的调用顺序。在应用层面,它就像一个简单的事件监听器。
我们往往可以用一个简单的流程图来梳理 Saga,比如 PlaceOrderSaga:
事件消费者之 Saga - 事件溯源
文章图片

实现 Saga 以上代码仅仅是一种 Saga 的原型图,在实现 Saga 设计模式时,我们需要注意以下几点:
排顺以及去重 在一个事件驱动系统中,基础设施的不确定性将导致事件信息的顺序颠倒以及内容重复。比如在使用 AWS SQS 时,如果没有使用 FIFQ 队列,消息的发出顺序是不受控的。又比如在 RabbitMQ 中,如果一个消息没有被及时消化,同一个消息可能重发。
基于以上两点,在实现 Saga 时,它必须同时具备排顺以及去重功能,这样我们的应用层 API 将无后顾之忧。
弥补行为 如果 Saga 在运行过程中发生了异常怎么办?比如在我们的例子中,如果最后一步中的 confirmOrderPlacementCommand 由于某种执行失败,我们应该如何处理?此时的库存已经扣除,如果不进行处理,库存一定无法和订单匹配,这将是一个灾难。
在实现 Saga 时,它必须支持弥补行为 ,弥补行为好比数据中的回滚行为,只不过它不是依靠数据库来实现。
在加入弥补行为后,PlaceOrderSaga 代码更新为:
class PlaceOrderSaga extends Saga { public function onOrderPlacingEvent(OrderPlacingEvent $event) { $this ->deductInventoryCommand ->handle( $event->inventoryAggregateId ); }public function onInventoryAvailableEvent(InventoryAvailableEvent $event) { $this ->confirmOrderPlacementCommand ->handle( $event->orderAggregateId ); }public function onInventoryAvailableEventFailed(InventoryAvailableEvent $event) { $this ->increaseInventoryCommand ->handle( $event->inventoryAggregateId ); } }

如果 confirmOrderPlacementCommand 失败,也就是 onInventoryAvailableEvent 失败,我们在 onInventoryAvailableEventFailed 中将库存加回去。
注意事项 Saga 是一种容易理解的设计模式,可在一个跨域的场景中,它是一个非常强大的解决方案。最后我们需要注意的,也是上文中未曾提起的一点,那便是如果弥补行为本身失败了,我们怎么处理?
如果你的基础设施能保证弥补行为的稳定性,那是再好不过的了,如果不行的话,我们只能及时的进行人为修复,那便是我们上文中使用的方式。
【事件消费者之 Saga - 事件溯源】本文转载自【何以解耦】:https://codedecoupled.com/php...,如果你也对 TDD,DDD以及简洁代码感兴趣,欢迎关注公众号【何以解耦】,一起探索软件开发之道。

    推荐阅读