Python Flask如何使用Webhook:了解如何在 Flask、Redis、SocketIO 和 Python 中的其他库的帮助下,通过使用 webhook 来创建具有实时图表的流应用程序,包括一些Webhook用法示例。
Webhook用法教程介绍Webhook可以被认为是一种由事件而不是请求驱动的API。Webhook 是一种服务,它允许一个程序在发生特定事件时立即将数据发送到另一个程序,而不是一个应用程序向另一个应用程序发出请求以接收响应。
Webhook 有时被称为反向 API,因为通信是由发送数据的应用程序而不是接收数据的应用程序发起的。随着 Web 服务变得越来越互联,Webhooks 正在将更多的行动视为一种轻量级解决方案,用于启用实时通知和数据更新,而无需开发完整的 API。
Webhooks 通常充当较小数据的信使。它们有助于从服务器端应用程序向客户端应用程序发送消息、警报、通知和实时信息。
例如,假设你希望你的应用程序在提及某个帐户并包含特定主题标签的推文发布时得到通知。与你的应用程序不断向 Twitter 询问满足这些条件的新帖子不同,Twitter 仅在发生此类事件时向你的应用程序发送通知更有意义。
这是 webhook 的目的,而不必重复请求数据(轮询机制),接收应用程序可以坐下来获得它需要的东西,而不必向另一个系统发送重复的请求。
Webhooks 可以开启很多可能性:
- 你可以使用 webhook 将支付网关与你的电子邮件营销软件连接起来,以便在付款退回时向用户发送电子邮件。
- 你可以使用 Webhook 在其他应用程序中同步客户数据。例如,如果用户更改了他的电子邮件地址,你可以确保该更改也反映在你的 CRM 中。
- 你还可以使用 Webhook 将有关事件的信息发送到外部数据库或数据仓库,例如 Amazon 的 Redshift 或 Google Big Query,以进行进一步分析。
- 一个 webhooks 生成器,它模拟内部或外部服务向预先配置的 webhook 端点发出任务。
- 接收这些事件/任务的通知消息的 Webhook 侦听器。收到后,这些票将被呈现并转换为条形图,以生成有价值的见解。图表降低了数据的复杂性,并使任何用户都更容易理解。
Python Flask如何使用Webhook:处理流程图
文章图片
先决条件根据我们的要求,以下组件发挥作用:
- Redis是一种开源的高级键值存储,是构建高性能、可扩展的 Web 应用程序的合适解决方案。Redis 具有三个主要特点,使其与众不同:
- Redis 将其数据库完全保存在内存中,仅将磁盘用于持久性。
- 与许多其他键值数据存储相比,Redis 具有相对丰富的数据类型集。
- Redis 可以将数据复制到任意数量的从站。安装 Redis 不在本教程的范围内,但你可以查看本教程以在 Windows 上安装它。
- Socket.IO是一个用于实时 Web 应用程序的 JavaScript 库。它支持 Web 客户端和服务器之间的实时、双向通信。它有两个部分:一个在浏览器中运行的客户端库和一个服务器端库。
- Faker是一个为你生成假数据的 Python 包。无论你是需要引导数据库、创建美观的 XML 文档、填充持久性以对其进行压力测试,还是对从生产服务中获取的数据进行匿名化,Fake 都是你的正确选择。
- ChartJS是一个开源 Javascript 库,允许你使用 HTML5 画布元素绘制不同类型的图表。HTML5 元素提供了一种使用 Javascript 绘制图形的简单而强大的方法。该库支持 8 种不同类型的图形:线条、条形、甜甜圈、馅饼、雷达、极地区域、气泡和散射。
- Flask是一个用 Python 编写的微型 Web 框架。
相关: 在 Python 中使用 Celery 的异步任务。
Webhook用法教程:设置
设置包非常简单明了。当然,你需要在系统上安装 Python 3,强烈建议设置一个虚拟环境,我们将在其中安装所需的库:
$ pip install Faker==8.2.0 Flask==1.1.2 Flask-SocketIO==5.0.1 redis==3.5.3 requests==2.25.1
在本教程结束时,我们的文件夹结构将如下所示:
文章图片
让我们开始编写实际的代码。首先,让我们为我们的应用程序定义配置参数
config.py
:#Application configuration File
################################
#Secret key that will be used by Flask for securely signing the session cookie
# and can be used for other security related needs
SECRET_KEY = 'SECRET_KEY'
#######################################
#Minimum Number Of Tasks To Generate
MIN_NBR_TASKS = 1
#Maximum Number Of Tasks To Generate
MAX_NBR_TASKS = 100
#Time to wait when producing tasks
WAIT_TIME = 1
#Webhook endpoint Mapping to the listener
WEBHOOK_RECEIVER_URL = 'http://localhost:5001/consumetasks'
#######################################
#Map to the REDIS Server Port
BROKER_URL = 'redis://localhost:6379'
#######################################
接下来,为我们的任务和 webhooks 生产者创建一个初始化文件
init_producer.py
:# init_producer.py
from flask import Flask#Create a Flask instance
app = Flask(__name__)#Load Flask configurations from config.py
app.secret_key = app.config[
'SECRET_KEY']
app.config.from_object("config")
现在让我们编写使用Faker模块生成任务所需的代码:
# tasks_producer.py
import random
from faker.providers import BaseProvider
from faker import Faker
import config
import time
import requests
import json
import uuid# Define a TaskProvider
class TaskProvider(BaseProvider):
def task_priority(self):
severity_levels = [
'Low', 'Moderate', 'Major', 'Critical'
]
return severity_levels[
random.randint(0, len(severity_levels)-1)]# Create a Faker instance and seeding to have the same results every time we execute the script
# Return data in English
fakeTasks = Faker('en_US')
# Seed the Faker instance to have the same results every time we run the program
fakeTasks.seed_instance(0)
# Assign the TaskProvider to the Faker instance
fakeTasks.add_provider(TaskProvider)# Generate A Fake Task
def produce_task(batchid, taskid):
# Message composition
message = {
'batchid': batchid, 'id': taskid, 'owner': fakeTasks.unique.name(), 'priority': fakeTasks.task_priority()
# ,'raised_date':fakeTasks.date_time_this_year()
# ,'description':fakeTasks.text()
}
return messagedef send_webhook(msg):
"""
Send a webhook to a specified URL
:param msg: task details
:return:
"""
try:
# Post a webhook message
# default is a function applied to objects that are not serializable = it converts them to str
resp = requests.post(config.WEBHOOK_RECEIVER_URL, data=https://www.lsbin.com/json.dumps(
msg, sort_keys=True, default=str), headers={'Content-Type': 'application/json'}, timeout=1.0)
# Returns an HTTPError if an error has occurred during the process (used for debugging).
resp.raise_for_status()
except requests.exceptions.HTTPError as err:
#print("An HTTP Error occurred",repr(err))
pass
except requests.exceptions.ConnectionError as err:
#print("An Error Connecting to the API occurred", repr(err))
pass
except requests.exceptions.Timeout as err:
#print("A Timeout Error occurred", repr(err))
pass
except requests.exceptions.RequestException as err:
#print("An Unknown Error occurred", repr(err))
pass
except:
pass
else:
return resp.status_code# Generate A Bunch Of Fake Tasks
def produce_bunch_tasks():
"""
Generate a Bunch of Fake Tasks
"""
n = random.randint(config.MIN_NBR_TASKS, config.MAX_NBR_TASKS)
batchid = str(uuid.uuid4())
for i in range(n):
msg = produce_task(batchid, i)
resp = send_webhook(msg)
time.sleep(config.WAIT_TIME)
print(i, "out of ", n, " -- Status", resp, " -- Message = ", msg)
yield resp, n, msgif __name__ == "__main__":
for resp, total, msg in produce_bunch_tasks():
pass
Python Flask如何使用Webhook?上面的代码利用Faker模块来创建一个虚构的随机任务流,并为每个生成的任务发送一个 webhook 到
WEBHOOK_RECEIVER_URL
我们之前在配置文件中定义的端点config.py
。每批中生成的任务数将是由阈值控制
MIN_NBR_TASKS
并MAX_NBR_TASKS
在 中定义的随机数config.py
。网络挂接JSON消息由以下属性:
batchid
,
taskid
,
owner
和
priority
。生成的每批任务都将由一个名为 的唯一引用标识
batchid
。任务优先级将限于预先选择的选项:低、中、高和严重。
Webhook用法示例:上面代码的主要用途是
produce_bunch_tasks()
函数,它是一个生成器,产生以下内容:- 发出的 webhook 的状态。
- 产生的任务总数。
- 生成的 webhook 消息。
tasks_producer.py
程序:$ python tasks_producer.py
你应该会看到类似于以下内容的输出:
文章图片
现在让我们构建模拟服务生成任务的 Flask 应用程序:
#app_producer.py
from flask import Response, render_template
from init_producer import app
import tasks_producerdef stream_template(template_name, **context):
app.update_template_context(context)
t = app.jinja_env.get_template(template_name)
rv = t.stream(context)
rv.enable_buffering(5)
return rv@app.route("/", methods=[
'GET'])
def index():
return render_template('producer.html')@app.route('/producetasks', methods=[
'POST'])
def producetasks():
print("producetasks")
return Response(stream_template('producer.html', data= https://www.lsbin.com/tasks_producer.produce_bunch_tasks() ))if __name__ =="__main__":
app.run(host="localhost",port=5000, debug=True)
在这个flask应用程序中,我们定义了两条主要路线:
"/"
: 呈现模板网页 (producer.html)"/producetasks"
:调用函数produce_bunch_tasks()
并将生成的任务流传输到 Flask 应用程序。
producer.html
文件:<
!doctype html>
<
html>
<
head>
<
title>Tasks Producer<
/title>
<
style>
.content {
width: 100%;
}
.container{
max-width: none;
}
<
/style>
<
meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<
/head>
<
body class="container">
<
div class="content">
<
form method='post' id="produceTasksForm" action = "/producetasks">
<
button style="height:20%x;
width:100%" type="submit" id="produceTasks">Produce Tasks<
/button>
<
/form>
<
/div>
<
div class="content">
<
div id="Messages" class="content" style="height:400px;
width:100%;
border:2px solid gray;
overflow-y:scroll;
"><
/div>
{% for rsp,total, msg in data: %}
<
script>
var rsp= "{{ rsp }}";
var total = "{{ total }}";
var msg= "{{ msg }}";
var lineidx = "{{ loop.index }}";
//If the webhook request succeeds color it in blue else in red.
if (rsp == '200') {
rsp = rsp.fontcolor("blue");
}
else {
rsp = rsp.fontcolor("red");
}
//Add the details of the generated task to the Messages section.
document.getElementById('Messages').innerHTML += "<
br>" + lineidx+ " out of " + total + " -- "+ rsp + " -- " + msg;
<
/script>
{% endfor %}
<
/div>
<
/body>
<
/html>
三个变量被传递给这个模板文件:
total
:代表产生的任务总数。status
: 表示已调度 webhook 的状态。msg
:webhook JSON 消息。
现在我们的程序已经准备好了,让我们测试一下并检查生成的输出:
$ python app_producer.py
访问
http://localhost:5000
运行 Flask 实例的链接,按下按钮Produce Tasks,你将看到自动生成的连续随机任务流,如下面的屏幕所示:文章图片
文章图片
你会注意到已调度的 webhook 的响应状态等于
None
,并以红色显示,表示无法到达目的地。稍后,当我们激活任务使用者时,你将概述已分派的 webhook 的响应状态等于200
并以蓝色显示,表示成功到达 webhook 端点。Python Flask如何使用Webhook?现在,让我们为我们的任务使用者/处理程序创建初始化文件:
# init_consumer.py
from flask import Flask#Create a Flask instance
app = Flask(__name__)#Load Flask configurations from config.py
app.secret_key = app.config[
'SECRET_KEY']
app.config.from_object("config")#Setup the Flask SocketIO integration while mapping the Redis Server.
from flask_socketio import SocketIO
socketio = SocketIO(app,logger=True,engineio_logger=True,message_queue=app.config[
'BROKER_URL'])
接下来,让我们构建一个 Flask 应用程序来处理分派的 webhooks/任务。处理 webhooks 的第一步是构建自定义端点。此端点需要通过 POST 请求接收数据,并确认成功接收该数据:
#app_consumer.py
from flask import render_template, request,session
from flask_socketio import join_room
from init_consumer import app, socketio
import json
import uuid#Render the assigned template file
@app.route("/", methods=[
'GET'])
def index():
return render_template('consumer.html')# Sending Message through the websocket
def send_message(event, namespace, room, message):
# print("Message = ", message)
socketio.emit(event, message, namespace=namespace, room=room)# Registers a function to be run before the first request to this instance of the application
# Create a unique session ID and store it within the application configuration file
@app.before_first_request
def initialize_params():
if not hasattr(app.config,'uid'):
sid = str(uuid.uuid4())
app.config[
'uid'] = sid
print("initialize_params - Session ID stored =", sid)# Receive the webhooks and emit websocket events
@app.route('/consumetasks', methods=[
'POST'])
def consumetasks():
if request.method == 'POST':
data = request.json
if data:
print("Received Data = "https://www.lsbin.com/, data)
roomid =app.config['uid']
var = json.dumps(data)
send_message(event='msg', namespace='/collectHooks', room=roomid, message=var)
return 'OK'#Execute on connecting
@socketio.on('connect', namespace='/collectHooks')
def socket_connect():
# Display message upon connecting to the namespace
print('Client Connected To NameSpace /collectHooks - ', request.sid)#Execute on disconnecting
@socketio.on('disconnect', namespace='/collectHooks')
def socket_connect():
# Display message upon disconnecting from the namespace
print('Client disconnected From NameSpace /collectHooks - ', request.sid)#Execute upon joining a specific room
@socketio.on('join_room', namespace='/collectHooks')
def on_room():
if app.config[
'uid']:
room = str(app.config[
'uid'])
# Display message upon joining a room specific to the session previously stored.
print(f"Socket joining room {room}")
join_room(room)#Execute upon encountering any error related to the websocket
@socketio.on_error_default
def error_handler(e):
# Display message on error.
print(f"socket error: {e}, {str(request.event)}")#Run using port 5001
if __name__ == "__main__":
socketio.run(app,host='localhost', port=5001,debug=True)
Webhook用法教程:简而言之,我们执行了以下操作:
- 我们添加了一个函数
@app.before_first_request
,该函数在对应用程序的第一个请求之前运行一次,并在后续请求中被忽略。在这个函数中,我们创建了一个唯一的会话 ID 并将其存储在配置文件中,这个唯一的会话 ID 将用于在处理 Web 套接字通信时为每个用户分配一个独占空间。 - 我们定义了一个 webhook 监听器,
"/consumetasks"
它通过 POST 请求接收JSON数据,一旦接收到,它就会同时发出一个 web 套接字事件。 - 为了有效地管理我们通过网络 socker 的连接:
- 我们将为
/collectHooks
命名空间设置值(命名空间用于通过单个共享连接分隔服务器逻辑)。 - 我们将为每个用户会话分配一个专用房间(房间是命名空间的细分或子频道)。
- 我们将为
consumer.html
在templates
文件夹中创建并复制以下代码:<
!DOCTYPE html>
<
html lang="en">
<
head>
<
meta charset="UTF-8">
<
title>Tasks Consumer<
/title>
<
link rel="stylesheet" href="https://www.lsbin.com/{{url_for('static',filename='css/bootstrap.min.css')}}">
<
link rel="stylesheet" href="https://www.lsbin.com/{{url_for('static',filename='css/Chart.min.css')}}">
<
/head>
<
body>
<
div class="content">
<
div id="Messages" class="content" style="height:200px;
width:100%;
border:1px solid gray;
overflow-y:scroll;
"><
/div>
<
/div>
<
div class="container">
<
div class="row">
<
div class="col-12">
<
div class="card">
<
div class="card-body">
<
canvas id="canvas"><
/canvas>
<
/div>
<
/div>
<
/div>
<
/div>
<
/div>
<
!-- import the jquery library -->
<
script src="https://www.lsbin.com/{{%20url_for('static',filename='js/jquery.min.js')%20}}"><
/script>
<
!-- import the socket.io library -->
<
script src="https://www.lsbin.com/{{%20url_for('static',filename='js/socket.io.js')%20}}"><
/script>
<
!-- import the bootstrap library -->
<
script src="https://www.lsbin.com/{{%20url_for('static',filename='js/bootstrap.min.js')%20}}"><
/script>
<
!-- import the Chart library -->
<
script src="https://www.lsbin.com/{{%20url_for('static',filename='js/Chart.min.js')%20}}"><
/script>
<
script>
$(document).ready(function(){
const config = {
//Type of the chart - Bar Chart
type: 'bar',
//Data for our chart
data: {
labels: [
'Low','Moderate','Major','Critical'],
datasets: [
{
label: "Count Of Tasks",
//Setting a color for each bar
backgroundColor: [
'green','blue','yellow','red'],
borderColor: 'rgb(255, 99, 132)',
data: [
0,0,0,0],
fill: false,
}],
},
//Configuration options
options: {
responsive: true,
title: {
display: true,
text: 'Tasks Priority Matrix'
},
tooltips: {
mode: 'index',
intersect: false,
},
hover: {
mode: 'nearest',
intersect: true
},
scales: {
xAxes: [
{
display: true,
scaleLabel: {
display: true,
labelString: 'Priority'
}
}],
yAxes: [
{
display: true
,ticks: {
beginAtZero: true
}
,scaleLabel: {
display: true,
labelString: 'Total'
}
}]
}
}
};
const context = document.getElementById('canvas').getContext('2d');
//Creating the bar chart
const lineChart = new Chart(context, config);
//Reserved for websocket manipulation
var namespace='/collectHooks';
var url = 'http://' + document.domain + ':' + location.port + namespace;
var socket = io.connect(url);
//When connecting to the socket join the room
socket.on('connect', function() {
socket.emit('join_room');
});
//When receiving a message
socket.on('msg' , function(data) {
var msg = JSON.parse(data);
var newLine = $('<
li>'+ 'Batch ID. = ' + msg.batchid + ' -- Task ID. = ' + msg.id + ' -- Owner = ' + msg.owner + ' -- Priority = ' + msg.priority +'<
/li>');
newLine.css("color","blue");
$("#Messages").append(newLine);
//Retrieve the index of the priority of the received message
var lindex = config.data.labels.indexOf(msg.priority);
//Increment the value of the priority of the received message
config.data.datasets[
0].data[
lindex] += 1;
//Update the chart
lineChart.update();
});
});
<
/script>
<
/body>
<
/html>
Python Flask如何使用Webhook?上述模板包括以下内容:
- 用于显示接收到的任务或 webhook 的详细信息的部分消息。
- 一个条形图,显示每个优先级在整个 Web 套接字事件中接收到的任务总数。构建图表的步骤如下:
- 在显示图表的位置放置一个画布元素。
- 在 label 属性中指定优先级,该属性指示要比较的实例的名称。
- 初始化一个数据集属性,它定义了一个对象数组,每个对象都包含我们要比较的数据。
- 每当通过 Web 套接字传输和接收新的 Webhook 时,条形图都会同步更新。
- 打开终端并运行
app_producer.py
:$ python app_producer.py
复制 - 启动Redis服务器,确保Redis实例运行在TCP 6479端口。
- 打开另一个终端并运行
app_consumer.py
:$ python app_consumer.py
复制 - 打开浏览器并访问
http://localhost:5000
链接以可视化任务生产者:
文章图片
点击生成任务按钮,会自动生成一批任务,并逐渐显示在屏幕上,如下图:
文章图片
Webhook用法示例:现在在浏览器中打开另一个选项卡并访问
http://localhost:5001
以可视化任务消费者,任务将逐渐出现在消息部分,并且每当收到 webhook 时,条形图将自动更新:文章图片
将鼠标悬停在任何图表栏上时,会显示显示任务总数的工具提示:
文章图片
虽然 webhook 类似于 API,但它们都扮演着不同的角色,每个角色都有自己独特的用例。希望本文能扩展你的理解,并记住充分利用 webhooks 的关键是知道它们何时是你应用程序的正确选择。
推荐阅读
- 如何使用Python中的套接字在网络中传输文件()
- 如何在Python中压缩和解压缩文件(详细实现教程)
- 如何在Python中使用Pickle进行对象序列化()
- 如何使用ipaddress模块在Python中操作IP地址()
- 如何在Python中发送电子邮件(详细实现教程)
- 如何在Python中处理文件(代码示例详细教程)
- 如何在Python中生成随机数据(详细代码示例)
- 如何在Python中获取目录的大小(代码示例)
- 如何在Python中为IO任务使用线程(代码示例教程)