在sanic|在sanic 中使用 aio kafka

use aio kafka in sanic 一. producer 1. install

pip install aiokafka

2. initialization producer
eg:
@app.listener('before_server_start') async def server_init(app, loop): app.producer = AIOKafkaProducer(loop=loop, value_serializer=lambda v: json.dumps(v).encode('utf-8'), bootstrap_servers=kafka_host) await app.producer.start()

3. use
await app.producer.send("topic_name", dict)

二. consumer 1. initialization consumer
eg:
async def process(consumer): async for msg in consumer: await func(msg)# msg 处理函数 必须使用协程@app.listener("after_server_start")# 必须 after_server_start async def after_server(app, loop): app.consumer = AIOKafkaConsumer( 'user', loop=loop, bootstrap_servers=kafka_host, group_id="my-group4343") await app.consumer.start() await process(app.consumer)

2. use
【在sanic|在sanic 中使用 aio kafka】自定义协程 func function

    推荐阅读