Python微信订餐小程序课程视频
https://edu.csdn.net/course/detail/36074
Python实战量化交易理财系统
【python|微服务简单实现最终一致性】https://edu.csdn.net/course/detail/35475
有花时间去研究masstransit的saga,英文水平不过关,始终无法实现上手他的代码编排的业务,遗憾。
本文通过rabbit和sqlserver实现下单,更新库存,更新产品,模拟数据最终一致性。
项目结构如下,reportService可有可无,这里就相当一个链条,只要两节走通了后面可以接龙,本文有用到不省略。流程:orderservice=>eComm=>reportservice 。
文章图片
下面先看看order的配置,通过控制器新增订单同时发布订单信息到order_exchange交换机,Key是"order.created,这样就把订单推送到了队列,等到库存服务获取订单去更新库存。
文章图片
?
| 12345678910111213 | // POST api/``[HttpPost]``public
async Task Post([FromBody] OrderDetail orderDetail)``{``var
id = await orderCreator.Create(orderDetail);
``publisher.Publish(JsonConvert.SerializeObject(``new
OrderRequest {
OrderId = id,``ProductId = orderDetail.ProductId,``Quantity = orderDetail.Quantity
}),
"order.created"``,
null``);
``}
|
更新库存的代码,然后再发送消息告诉order服务,这里有哪个try包裹,如果这里有失败会触发catch,发送减库存失败的消息。order服务消费到这条消息就会执行相应的删除订单操作。代码如下:
?
| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758 | using
Ecomm.DataAccess;
``using
Ecomm.Models;
``using
Microsoft.Extensions.Hosting;
``using
Newtonsoft.Json;
``using
Plain.RabbitMQ;
``using
System;
``using
System.Collections.Generic;
``using
System.Linq;
``using
System.Threading;
``using
System.Threading.Tasks;
namespace
Ecomm``{``public
class
OrderCreatedListener : IHostedService``{``private
readonly
IPublisher publisher;
``private
readonly
ISubscriber subscriber;
``private
readonly
IInventoryUpdator inventoryUpdator;
public
OrderCreatedListener(IPublisher publisher, ISubscriber subscriber, IInventoryUpdator inventoryUpdator)``{``this``.publisher = publisher;
``this``.subscriber = subscriber;
``this``.inventoryUpdator = inventoryUpdator;
``}
public
Task StartAsync(CancellationToken cancellationToken)``{``subscriber.Subscribe(Subscribe);
``return
Task.CompletedTask;
``}
private
bool
Subscribe(``string
message, IDictionary<``string``,
object``> header)``{``var
response = JsonConvert.DeserializeObject(message);
``try``{``inventoryUpdator.Update(response.ProductId, response.Quantity).GetAwaiter().GetResult();
``publisher.Publish(JsonConvert.SerializeObject(``new
InventoryResponse { OrderId = response.OrderId, IsSuccess =
true
}``),
"inventory.response"``,
null``);
``}``catch
(Exception)``{``publisher.Publish(JsonConvert.SerializeObject(``new
InventoryResponse { OrderId = response.OrderId, IsSuccess =
false
}``),
"inventory.response"``,
null``);
``}
return
true``;
``}
public
Task StopAsync(CancellationToken cancellationToken)``{``return
Task.CompletedTask;
``}``}``}
|
?
| 12345678910111213141516171819202122232425262728293031323334353637383940414243 | using
Ecomm.Models;
``using
Microsoft.Extensions.Hosting;
``using
Newtonsoft.Json;
``using
Plain.RabbitMQ;
``using
System.Collections.Generic;
``using
System.Threading;
``using
System.Threading.Tasks;
namespace
OrderService``{``public
class
InventoryResponseListener : IHostedService``{``private
readonly
ISubscriber subscriber;
``private
readonly
IOrderDeletor orderDeletor;
public
InventoryResponseListener(ISubscriber subscriber, IOrderDeletor orderDeletor)``{``this``.subscriber = subscriber;
``this``.orderDeletor = orderDeletor;
``}
public
Task StartAsync(CancellationToken cancellationToken)``{``subscriber.Subscribe(Subscribe);
``return
Task.CompletedTask;
``}
private
bool
Subscribe(``string
message, IDictionary<``string``,
object``> header)``{``var
response = JsonConvert.DeserializeObject(message);
``if
(!response.IsSuccess)``{``orderDeletor.Delete(response.OrderId).GetAwaiter().GetResult();
``}``return
true``;
``}
public
Task StopAsync(CancellationToken cancellationToken)``{``return
Task.CompletedTask;
``}``}``}
|
上面的代码是整个服务的核心业务,也很简单就是队列相互通信相互确认操作是否顺利,失败就执行回归操作,而这里我们都会写好对应补偿代码:
using Dapper;
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
namespace OrderService
{
public class OrderDeletor : IOrderDeletor
{
private readonly string connectionString;
public OrderDeletor(string connectionString)
{
this.connectionString = connectionString;
} public async Task Delete(int orderId)
{
using var connection = new SqlConnection(connectionString);
connection.Open();
using var transaction = connection.BeginTransaction();
try
{
await connection.ExecuteAsync("DELETE FROM OrderDetail WHERE OrderId = @orderId", new { orderId }, transaction: transaction);
await connection.ExecuteAsync("DELETE FROM [Order] WHERE Id = @orderId", new { orderId }, transaction: transaction);
transaction.Commit();
}
catch
{
transaction.Rollback();
}
}
}
}
库存服务里有发布产品的接口,这里没有做过多的处理,只是把产品新增放到队列,供后面的ReportService服务获取,该服务拿到后会执行产品数量扣除:
文章图片
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Plain.RabbitMQ;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ReportService
{
public class ReportDataCollector : IHostedService
{
private const int DEFAULT\_QUANTITY = 100;
private readonly ISubscriber subscriber;
private readonly IMemoryReportStorage memoryReportStorage;
public ReportDataCollector(ISubscriber subscriber, IMemoryReportStorage memoryReportStorage)
{
this.subscriber = subscriber;
this.memoryReportStorage = memoryReportStorage;
} public Task StartAsync(CancellationToken cancellationToken)
{
subscriber.Subscribe(Subscribe);
return Task.CompletedTask;
}
private bool Subscribe(string message, IDictionary header)
//private bool ProcessMessage(string message, IDictionary header)
{
if (message.Contains("Product"))
{
var product = JsonConvert.DeserializeObject(message);
if (memoryReportStorage.Get().Any(r => r.ProductName == product.ProductName))
{
return true;
}
else
{
memoryReportStorage.Add(new Report
{
ProductName = product.ProductName,
Count = DEFAULT\_QUANTITY
});
}
}
else
{
var order = JsonConvert.DeserializeObject(message);
if(memoryReportStorage.Get().Any(r => r.ProductName == order.Name))
{
memoryReportStorage.Get().First(r => r.ProductName == order.Name).Count -= order.Quantity;
}
else
{
memoryReportStorage.Add(new Report
{
ProductName = order.Name,
Count = DEFAULT\_QUANTITY - order.Quantity
});
}
}
return true;
} public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
到这里整个流程大概如此。只要理清楚了订单和库存更新这里的业务,后面差不多一样,可以无限递归。代码文末有链接供下载。
这里有一个地方的代码如下,新增库存的时候同时发布消息。假如新增完订单后面崩掉了,这里是个原子操作最佳。
?
| 123456789101112 |
[HttpPost]``public
async Task Post([FromBody] OrderDetail orderDetail)``{``var
id = await orderCreator.Create(orderDetail);
``publisher.Publish(JsonConvert.SerializeObject(``new
OrderRequest {
OrderId = id,``ProductId = orderDetail.ProductId,``Quantity = orderDetail.Quantity
}),
"order.created"``,
null``);
``}
|很遗憾masstransit的saga还没有整明白,那就上cap,完成业务一致性。加了点cap代码因为之前是dapper,所以加了dbcontext和cap相关代码有点小乱。核心代码如下:
?
| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
using
DotNetCore.CAP;
``using
MediatR;
``using
OrderService.Command;
``using
System.Threading;
``using
Ecomm.Models;
``using
System.Collections.Generic;
namespace
OrderService.Handler``{``public
class
InsertOrderDetailHandler : IRequestHandler``{``private
readonly
OrderDbContext context;
``private
readonly
ICapPublisher cap;
``public
InsertOrderDetailHandler(OrderDbContext context, ICapPublisher cap)``{``this``.context = context;
``this``.cap = cap;
``}``public
async System.Threading.Tasks.Task Handle(InsertOrderDetailCommand request, CancellationToken cancellationToken)``{``using``(``var
trans =context.Database.BeginTransaction(cap))``{``var
order =context.Orders.Add(``new
Order``{``UpdatedTime = System.DateTime.Today,``UserId = request.UserId,``UserName = request.UserName``});
``var
orderDetail = context.OrderDetails.Add(``new
OrderDetail``{``OrderId = order.Entity.Id,``ProductId = request.ProductId,``Quantity = request.Quantity,``ProductName = request.ProductName,``});
``context.SaveChanges();
cap.Publish(``"order.created"``,
new
OrderRequest``{``OrderId = order.Entity.Id,``ProductId = orderDetail.Entity.ProductId,``Quantity = orderDetail.Entity.Quantity``},
new
Dictionary<``string``,``string``>()) ;
``trans.Commit();
``return
new
InsertOrderDetailModel { OrderDetailid = orderDetail.Entity.Id, OrderId = order.Entity.Id, Success =
true
};
``}``}``}``}
|到这里差不多要结束了,这里的代码都可以调试运行的。因为加了cap,order服务有两套rabbitmq的配置,有冗余,而且有点坑。调试的时候注意,Plain.RabbitMQ支持的交换机不是持久化的,而cap是持久化的,所以有点不兼容。第一次运行可以先确保Plain.RabbitMQ正常,再删掉交换机,cap跑起来了再建持久化交换机,这样cap消息就会被rabbitmq接收,后面就会被库存服务消费。因为我这里cap不会自动绑定队列,Plain.RabbitMQ是可以的。所以需要新建交换机后再绑定队列。而且这里队列以Plain.RabbitMQ生成的名字来绑定。要不然又可能会调试踩坑无法出坑。 用cap不注意你连消息队列都看不到,看到了队列也看不到消费数据,这点不知道是我不会还是cap有什么难的配置。结束。。。
上例项目demo:
liuzhixin405/SimpleOrders_Next (github.com)
超简单微服务demo
liuzhixin405/SimpleOrders (github.com)
推荐阅读
- python|人工智能初识
- 软件工具|macOS安装Scrapy,不要踩坑了
- 其他|腾讯视频自动签到(Python + 腾讯云函数实现)
- 软件测试|近期腾讯30道自动化软件测试面试题(含答案)!
- 其他|如何写出高质量的代码(优秀的程序员都是这样做的)
- java|看看人家那物流系统,那叫一个优雅(附源码)
- java|看看人家那快速开发代码生成器系统,那叫一个优雅(附源码)!
- 进程|进程 线程 计算机 英语,2022计算机考研408知识点:进程和线程的区别
- TensorFlow|2 TensorFlow入门笔记之建造神经网络并将结果可视化