【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序

关键词:AWS API Gateway WebSockets Example 【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 需求背景
基于AWS APIGateway的微服务架构下,需要实现 “服务器端到客户端” 的通知推送实时交互功能。
AWS APIGateway RestfulAPI 是HTTP协议的,而 “服务器端到客户端” 采用的Websocket是TCP协议的。
由于协议问题,AWS APIGateway RestfulAPI 被PASS,后发现AWS APIGateway WebsocketAPI 已悄悄上线,故决定使用AWS APIGateway WebsocketAPI搭建一个实时聊天系统,从而确定该方案的可行性。
架构方案
采用 API Gateway Websocket API +Lambda + DynamoDB 搭建一个实时聊天程序,如下为基础架构图:
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 在我们的应用程序中,设备将连接到API网关。当设备连接时,lambda函数将在DynamoDB表中保存连接ID。当设备断开连接时,另一个lambda函数将在DynamoDB表中移除连接ID。在我们想要将消息发送回设备的实例中,第三个lambda函数将使用回调URL将连接ID??和POST数据发送回设备。
实现步骤
Step1 创建Gateway WebSocket API 登陆AWS Console 转到Amazon API Gateway服务,单击WebSocket以创建WebSocket API,提供API名称和路径选择表达式。在示例中,添加 $request.body.action作为选择表达式并点击Create API。

【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 创建API后,我们将重定向到路由页面。在这里我们可以看到已经预定义的三条路线:$connect,$disconnect和 $default,我们还将创建一个自定义路由onMessage。
在我们的架构中,$connect和$disconnect routes实现以下任务:
$connect - 当调用此路由时,Lambda函数会将连接设备的连接ID添加到DynamoDB。
$disconnect - 调用此路由时,Lambda函数将从DynamoDB中删除已断开连接的设备的连接ID。
onMessage - 当调用此路由时,消息正文将被发送到当时连接的所有设备。
Step2 创建用于"存储连接设备ID"的DynamoDB 【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png Step3 创建connect Lambda函数 ChatRoomConnectFunction 【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 在lambda函数的代码中添加以下代码,此代码将连接设备的连接ID添加到我们创建的DynamoDB表中:

const AWS = require('aws-sdk'); const ddb = new AWS.DynamoDB.DocumentClient(); exports.handler = (event, context, callback) => { const connectionId = event.requestContext.connectionId; addConnectionId(connectionId).then(() => { callback(null, { statusCode: 200, }) }); } function addConnectionId(connectionId) { return ddb.put({ TableName: 'Chat', Item: { connectionid : connectionId }, }).promise(); }

然后发布Lambda新版本

【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png Step4 创建disconnect Lambda函数 ChatRoomDonnectFunction 【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 在lambda函数的代码中添加以下代码,当设备断开连接时,此代码将从DynamoDB表中删除连接ID:
const AWS = require('aws-sdk'); const ddb = new AWS.DynamoDB.DocumentClient(); exports.handler = (event, context, callback) => { const connectionId = event.requestContext.connectionId; deleteConnectionId(connectionId).then(() => { callback(null, { statusCode: 200, }) }); } function deleteConnectionId(connectionId) { return ddb.delete({ TableName: 'Chat', Key: { connectionid : connectionId, }, }).promise(); }

【【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序】然后发布Lambda新版本
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png Step5 配置 $connect和 $disconnect路由,测试WebSocket API是否正常工作 现在我们已经创建了DynamoDB表和两个lambda函数。在创建第三个lambda函数之前,让我们再回到API Gateway并使用我们创建的lambda函数配置路由。首先,单击$ connect route。作为集成类型,选择Lambda函数并选择ChatRoomConnectFunction。
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 在$disconnect路由上执行相同的操作,其中lambda函数将是ChatRoomDonnectFunction:
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 为了方便调错,建议开启CloudWatch日志记录
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 部署后,我们将看到两个URL。第一个URL称为WebSocket URL,第二个URL称为连接URL。
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png WebSocket URL是用于通过设备将WebSockets连接到API的URL。第二个URL,即Connection(连接)URL,向连接的客户端发送回调消息、获取连接信息或断开客户端连接。
使用wscat工具进行测试(使用 npm install -g wscat 安装)。
wscat -c wss://91ajt7fo78.execute-api.ap-northeast-2.amazonaws.com/dev

【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 查看DynamoDB可以看到connectionid
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 控制台 ctrl+z 中断连接之后再次查看DynamoDB会发现刚才的connectionid已被删除:

【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png Step6 实现发送消息多客户端接收功能 新增lambda函数ChatRoomOnMessageFunction,查询Chat DynamoDB表,获取所有连接ID,并将消息发送给这些连接ID对应的终端:
index.js
扫描DynamoDB以获取表中的所有可用记录,lambda函数将解析“message”属性并将其发送给其他终端:
const AWS = require('aws-sdk'); const ddb = new AWS.DynamoDB.DocumentClient(); require('./patch.js'); let send = undefined; function init(event) { console.log(event)const apigwManagementApi = new AWS.ApiGatewayManagementApi({ apiVersion: '2018-11-29', endpoint: event.requestContext.domainName + '/' + event.requestContext.stage }); send = async (connectionId, data) => { await apigwManagementApi.postToConnection({ ConnectionId: connectionId, Data: `Echo: ${data}` }).promise(); } }exports.handler =(event, context, callback) => { init(event); let message = JSON.parse(event.body).message getConnections().then((data) => { console.log(data.Items); data.Items.forEach(function(connection) { console.log("Connection " +connection.connectionid) send(connection.connectionid, message); }); }); return {} }; function getConnections(){ return ddb.scan({ TableName: 'Chat', }).promise(); }

patch.js
自动为我们的API创建回调URL并发送POST请求:
require('aws-sdk/lib/node_loader'); var AWS = require('aws-sdk/lib/core'); var Service = AWS.Service; var apiLoader = AWS.apiLoader; apiLoader.services['apigatewaymanagementapi'] = {}; AWS.ApiGatewayManagementApi = Service.defineService('apigatewaymanagementapi', ['2018-11-29']); Object.defineProperty(apiLoader.services['apigatewaymanagementapi'], '2018-11-29', { get: function get() { var model = { "metadata": { "apiVersion": "2018-11-29", "endpointPrefix": "execute-api", "signingName": "execute-api", "serviceFullName": "AmazonApiGatewayManagementApi", "serviceId": "ApiGatewayManagementApi", "protocol": "rest-json", "jsonVersion": "1.1", "uid": "apigatewaymanagementapi-2018-11-29", "signatureVersion": "v4" }, "operations": { "PostToConnection": { "http": { "requestUri": "/@connections/{connectionId}", "responseCode": 200 }, "input": { "type": "structure", "members": { "Data": { "type": "blob" }, "ConnectionId": { "location": "uri", "locationName": "connectionId" } }, "required": [ "ConnectionId", "Data" ], "payload": "Data" } } }, "shapes": {} } model.paginators = { "pagination": {} } return model; }, enumerable: true, configurable: true }); module.exports = AWS.ApiGatewayManagementApi;

在API Gateway中新建route onMessage,并指向ChatRoomOnMessageFunction Lambda函数:
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 保存并点击右上角的按钮添加集成响应:
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 部署API到相应阶段(如果部署的时候报错,请先给$default路由配置与onMessage一样的集成响应规则即可):
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png Step7 测试功能可用 多个终端,并与API Gateway WebsocketAPI 建立连接:
【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png 终端发送内容
{"action" : "onMessage" , "message" : "Hello everyone"}

【原】使用AWS|【原】使用AWS API Gateway WebSockets和Lambda构建实时聊天应用程序
文章图片
image.png END
出现以上截图则说明Demo成功。
说明:
发送内容的action字段value值对应了API Gateway Websocket的路由名称,如上截图我们的action值为onMessage,则在发送消息的时候API Gateway Websocket会找onMessage路由对应的集成请求规则进行相应。
思考:
如何在以上Demo的思路之上进行延伸,使用API Gateway Websocket+Lambda+DynamoDB搭建带权限校验的公司内部公共终端推送微服务?如何可视化PC终端推送策略?

    推荐阅读