NetCore微服务实现事务一致性masstransit之saga使用

demo如下,一个订单处理的小例子:
NetCore微服务实现事务一致性masstransit之saga使用
文章图片

首先看看结果很简单:
【NetCore微服务实现事务一致性masstransit之saga使用】NetCore微服务实现事务一致性masstransit之saga使用
文章图片

核心代码如下:

using MassTransit; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using OrderProcessor.Event; using ServiceModel; using ServiceModel.Command; using ServiceModel.DTO; using ServiceModel.Event; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace OrderProcessor.Service { public class OrderProcessorStateMachine:MassTransitStateMachine{ private readonly ILogger logger; public OrderProcessorStateMachine() { this.logger = GlobalServiceProvider.Instance.CreateScope().ServiceProvider.GetService>(); this.InstanceState(x => x.State); this.State(() => this.Processing); this.ConfigureCorrelationIds(); this.Initially(this.SetOrderSummitedHandler()); this.During(Processing, this.SetStockReservedHandler(), SetPaymentProcessedHandler(), SetOrderShippedHandler()); SetCompletedWhenFinalized(); }private void ConfigureCorrelationIds() { this.Event(() => this.OrderSubmitted, x => x.CorrelateById(c => c.Message.CorrelationId).SelectId(c => c.Message.CorrelationId)); this.Event(() => this.StockReserved, x => x.CorrelateById(c => c.Message.CorrelationId)); this.Event(() => this.PaymentProcessed, x => x.CorrelateById(c => c.Message.CorrelationId)); this.Event(() => this.OrderShipped, x => x.CorrelateById(c => c.Message.CorrelationId)); }private EventActivityBinder SetOrderSummitedHandler() => When(OrderSubmitted).Then(c => this.UpdateSagaState(c.Instance, c.Data.Order)) .Then(c => this.logger.LogInformation($"Order submitted to {c.Data.CorrelationId} received")) .ThenAsync(c => this.SendCommand("rabbitWarehouseQueue", c)) .TransitionTo(Processing); private EventActivityBinder SetStockReservedHandler() => When(StockReserved).Then(c => this.UpdateSagaState(c.Instance, c.Data.Order)) .Then(c => this.logger.LogInformation($"Stock reserved to {c.Data.CorrelationId} received")) .ThenAsync(c => this.SendCommand("rabbitCashierQueue", c)); private EventActivityBinder SetPaymentProcessedHandler() => When(PaymentProcessed).Then(c => this.UpdateSagaState(c.Instance, c.Data.Order)) .Then(c => this.logger.LogInformation($"Payment processed to {c.Data.CorrelationId} received")) .ThenAsync(c => this.SendCommand("rabbitDispatcherQueue", c)); private EventActivityBinder SetOrderShippedHandler() => When(OrderShipped).Then(c => { this.UpdateSagaState(c.Instance, c.Data.Order); c.Instance.Order.Status = Status.Processed; }) .Publish(c => new OrderProcessed(c.Data.CorrelationId, c.Data.Order)) .Finalize(); private void UpdateSagaState(ProcessingOrderState state, Order order) { var currentDate = DateTime.Now; state.Created = currentDate; state.Updated = currentDate; state.Order = order; }private async Task SendCommand(string endpointKey, BehaviorContext context) where TCommand : class, IMessage { var sendEndpoint = await context.GetSendEndpoint(new Uri("")); await sendEndpoint.Send(new { CorrelationId = context.Data.CorrelationId, Order = context.Data.Order }); } publicState Processing { get; private set; } public Event OrderSubmitted { get; private set; } public Event OrderShipped { get; set; } public Event PaymentProcessed { get; private set; } public Event StockReserved { get; private set; }} }

using MassTransit; using MassTransit.MongoDbIntegration.Saga; using OrderProcessor; using OrderProcessor.Service; var builder = WebApplication.CreateBuilder(args); // Add services to the container.builder.Services.AddControllers(); builder.Services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { var connection = "amqp://lx:admin@ip:5672/my_vhost"; //不加主机会报错 cfg.Host(connection); cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); cfg.UseMessageRetry(r => r.Immediate(5)); cfg.ConfigureEndpoints(context); cfg.ReceiveEndpoint("", ep => { ep.StateMachineSaga(new OrderProcessorStateMachine(), MongoDbSagaRepository.Create("connecturl","db")); }); }); }); var app = builder.Build(); app.Run();

NetCore微服务实现事务一致性masstransit之saga使用
文章图片

这是整个订单的几个步骤。
想把代码都贴出来,过程梳理给大家参考,但是时间有限这个点没那么多了,而且我理应要把这个程序跑起来的。明天照常上班,暂不过多研究。
整个demo代码:
exercise/MassTransitDemo/MassTransitSagasDemo at master · liuzhixin405/exercise (github.com)

有兴趣可以还有一个demo:
exercise/MassTransitDemo/SagaTest-master at master · liuzhixin405/exercise (github.com)
masstransit官网:
MassTransit (masstransit-project.com)
不得不说这个东西真的很不错,不过暂时没找到翻译,大概的过了下文档,还有好多不清楚的,英文水平有限。demo都是来自外国大佬贡献的,很遗憾国内有这方面的文章,但是深入一点的都是国外友人的贡献,而且现成的微服务demo写的很好很多,视情况项目可借鉴。

    推荐阅读