在sanic|在sanic 中使用 aio kafka
use aio kafka in sanic
一. producer
1. install
pip install aiokafka
2. initialization producer
eg:
@app.listener('before_server_start')
async def server_init(app, loop):
app.producer = AIOKafkaProducer(loop=loop, value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=kafka_host)
await app.producer.start()
3. use
await app.producer.send("topic_name", dict)
二. consumer 1. initialization consumer
eg:
async def process(consumer):
async for msg in consumer:
await func(msg)# msg 处理函数 必须使用协程@app.listener("after_server_start")# 必须 after_server_start
async def after_server(app, loop):
app.consumer = AIOKafkaConsumer(
'user',
loop=loop, bootstrap_servers=kafka_host,
group_id="my-group4343")
await app.consumer.start()
await process(app.consumer)
2. use
【在sanic|在sanic 中使用 aio kafka】自定义协程
func
function推荐阅读
- 热闹中的孤独
- 你到家了吗
- Shell-Bash变量与运算符
- JS中的各种宽高度定义及其应用
- 闲杂“细雨”
- 杜月笙的口才
- 2021-02-17|2021-02-17 小儿按摩膻中穴-舒缓咳嗽
- 深入理解Go之generate
- 赢在人生六项精进二阶Day3复盘
- 祖母走了