NetCore微服务实现事务一致性masstransit之saga使用
demo如下,一个订单处理的小例子:
文章图片
首先看看结果很简单:
【NetCore微服务实现事务一致性masstransit之saga使用】
文章图片
核心代码如下:
using MassTransit; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using OrderProcessor.Event; using ServiceModel; using ServiceModel.Command; using ServiceModel.DTO; using ServiceModel.Event; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace OrderProcessor.Service { public class OrderProcessorStateMachine:MassTransitStateMachine{ private readonly ILoggerlogger; public OrderProcessorStateMachine() { this.logger = GlobalServiceProvider.Instance.CreateScope().ServiceProvider.GetService >(); this.InstanceState(x => x.State); this.State(() => this.Processing); this.ConfigureCorrelationIds(); this.Initially(this.SetOrderSummitedHandler()); this.During(Processing, this.SetStockReservedHandler(), SetPaymentProcessedHandler(), SetOrderShippedHandler()); SetCompletedWhenFinalized(); }private void ConfigureCorrelationIds() { this.Event(() => this.OrderSubmitted, x => x.CorrelateById(c => c.Message.CorrelationId).SelectId(c => c.Message.CorrelationId)); this.Event(() => this.StockReserved, x => x.CorrelateById(c => c.Message.CorrelationId)); this.Event(() => this.PaymentProcessed, x => x.CorrelateById(c => c.Message.CorrelationId)); this.Event(() => this.OrderShipped, x => x.CorrelateById(c => c.Message.CorrelationId)); }private EventActivityBinder SetOrderSummitedHandler() => When(OrderSubmitted).Then(c => this.UpdateSagaState(c.Instance, c.Data.Order)) .Then(c => this.logger.LogInformation($"Order submitted to {c.Data.CorrelationId} received")) .ThenAsync(c => this.SendCommand ("rabbitWarehouseQueue", c)) .TransitionTo(Processing); private EventActivityBinder SetStockReservedHandler() => When(StockReserved).Then(c => this.UpdateSagaState(c.Instance, c.Data.Order)) .Then(c => this.logger.LogInformation($"Stock reserved to {c.Data.CorrelationId} received")) .ThenAsync(c => this.SendCommand ("rabbitCashierQueue", c)); private EventActivityBinder SetPaymentProcessedHandler() => When(PaymentProcessed).Then(c => this.UpdateSagaState(c.Instance, c.Data.Order)) .Then(c => this.logger.LogInformation($"Payment processed to {c.Data.CorrelationId} received")) .ThenAsync(c => this.SendCommand ("rabbitDispatcherQueue", c)); private EventActivityBinder SetOrderShippedHandler() => When(OrderShipped).Then(c => { this.UpdateSagaState(c.Instance, c.Data.Order); c.Instance.Order.Status = Status.Processed; }) .Publish(c => new OrderProcessed(c.Data.CorrelationId, c.Data.Order)) .Finalize(); private void UpdateSagaState(ProcessingOrderState state, Order order) { var currentDate = DateTime.Now; state.Created = currentDate; state.Updated = currentDate; state.Order = order; }private async Task SendCommand (string endpointKey, BehaviorContext context) where TCommand : class, IMessage { var sendEndpoint = await context.GetSendEndpoint(new Uri("")); await sendEndpoint.Send (new { CorrelationId = context.Data.CorrelationId, Order = context.Data.Order }); } publicState Processing { get; private set; } public Event OrderSubmitted { get; private set; } public Event OrderShipped { get; set; } public Event PaymentProcessed { get; private set; } public Event StockReserved { get; private set; }} }
using MassTransit; using MassTransit.MongoDbIntegration.Saga; using OrderProcessor; using OrderProcessor.Service; var builder = WebApplication.CreateBuilder(args); // Add services to the container.builder.Services.AddControllers(); builder.Services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { var connection = "amqp://lx:admin@ip:5672/my_vhost"; //不加主机会报错 cfg.Host(connection); cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30))); cfg.UseMessageRetry(r => r.Immediate(5)); cfg.ConfigureEndpoints(context); cfg.ReceiveEndpoint("", ep => { ep.StateMachineSaga(new OrderProcessorStateMachine(), MongoDbSagaRepository.Create("connecturl","db")); }); }); }); var app = builder.Build(); app.Run();
文章图片
这是整个订单的几个步骤。
想把代码都贴出来,过程梳理给大家参考,但是时间有限这个点没那么多了,而且我理应要把这个程序跑起来的。明天照常上班,暂不过多研究。
整个demo代码:
exercise/MassTransitDemo/MassTransitSagasDemo at master · liuzhixin405/exercise (github.com)
有兴趣可以还有一个demo:
exercise/MassTransitDemo/SagaTest-master at master · liuzhixin405/exercise (github.com)
masstransit官网:
MassTransit (masstransit-project.com)
不得不说这个东西真的很不错,不过暂时没找到翻译,大概的过了下文档,还有好多不清楚的,英文水平有限。demo都是来自外国大佬贡献的,很遗憾国内有这方面的文章,但是深入一点的都是国外友人的贡献,而且现成的微服务demo写的很好很多,视情况项目可借鉴。
推荐阅读
- 微信小程序常见网络请求失败问题总结及解决方案
- Apache|Apache Hudi的多版本清理服务彻底讲解
- Linux|Linux socket编程(C语言socket实现客户端与服务器网络通信)
- linux远程开发|linux远程开发——网络通信(客户端与服务器建立连接)
- android订单倒计时支付实现|android订单倒计时支付实现,Android中微信小程序支付倒计时功能
- 云服务的深度防御以及 SASE、SSE 和云服务如何相互交织
- 天翼云云主机上搭建FTP服务最佳实践
- 毕马威发布《2022金融科技十大趋势展望》,腾讯云微搭低代码入选
- linux服务器安装openwrt|linux服务器安装openwrt,探索openwrt安装宝塔,搭建web网站论坛社区网校
- docker|docker+wordpress搭建个人博客(如何在阿里云服务器上线部署个人博客)