概述 deepstream-occupancy-analytics 项目提供了一种往kafka发送analytics统计数据的方法。但是所有的改动,特别是主程序是用C语言开发的。但写这篇文章的时候,在网上还没发现官方系统性的说明和解释,都是一些零碎的问答。
因此综合,以跨线统计为例,提供了一种python版本发送统计数据的方法,同时详细说明了需要改动和编译哪些C程序以及deepstream python bindings。可以参考定制化自己想要收集并发送的数据内容和格式。
nvidia论坛上有人回复说,在以后的release中,将会提供基于deepstream python发送自定义数据的功能
主要改动分为三点:
- 将自定义的数据结构追加到NvDsEventMsgMeta,例如将
lc_curr_straight
和lc_cum_straight
加入 - 修改eventmsg_payload程序,编译产生
libnvds_msgconv.so
- 同步更改bindschema.cpp, 编译deepstream python bindings
# line crossing current count of frame obj_lc_curr_cnt = user_meta_data.objLCCurrCnt # line crossing cumulative count obj_lc_cum_cnt = user_meta_data.objLCCumCnt msg_meta.lc_curr_straight = obj_lc_curr_cnt["straight"] msg_meta.lc_cum_straight = obj_lc_cum_cnt["straight"]
obj_lc_curr_cnt和obj_lc_cum_cnt的key在config_nvdsananlytics.txt中定义
还有一种更简单的方案。如果场景需求中,时延并不重要,也不需要同时处理大规模视频流的话,可以考虑使用 kafka-python 等python库,直接将获取到的analytics发送出去,不经过
nvmsgconv
和 nvmsgbroker
这两个插件。 如果时延重要,或者要处理大规模视频流,则需要参考下文微调一下C的源代码,重新编译,因为探针函数是阻塞的,并不适合在里面加入复杂的处理逻辑。运行环境
- nvidia-docker2
- deepstream-6.1
构建docker镜像并运行
- clone 该代码仓库, 在
deepstream_python_nvdsanalytics_to_kafka
目录, 运行sh docker/build.sh
构建镜像, e.g:sh docker/build.sh deepstream:6.1-triton-jupyter-python-custom
- 运行docker镜像并进入jupyter环境
docker run --gpusdevice=0-p 8888:8888 -d --shm-size=1g-w /opt/nvidia/deepstream/deepstream-6.1/sources/deepstream_python_apps/mount/-v ~/deepstream_python_nvdsanalytics_to_kafka/:/opt/nvidia/deepstream/deepstream-6.1/sources/deepstream_python_apps/mountdeepstream:6.1-triton-jupyter-python-custom
浏览器输入http://:8888
进入jupyter开发环境
- (可选) 在kubernetes的master节点, 运行
sh /docker/ds-jupyter-statefulset.sh
启动一个deepstream实例. 前提是集群已部署nvidia-device-plugin
/pyds_kafka_example/run.py
,主要参考 deepstream-test4
和 deepstream-nvdsanalytics
pipeline主要结构如下:
【python|利用deepstream python将analytics产生的统计数据发送到kafka】
文章图片
- 运行前,需要在
pyds_kafka_example/cfg_kafka.txt
里修改partition-key的值,设置为deviceId,这样nvmsgbroker插件会将消息体中deviceId对应的值设置为partition-key
- 安装java
apt update && apt install -y openjdk-11-jdk
- 如何没有单独的kafka集群,请参考在deepstream实例中部署kafka并创建topic
tar -xzf kafka_2.13-3.2.1.tgz cd kafka_2.13-3.2.1 bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties bin/kafka-topics.sh --create --topic ds-kafka --bootstrap-server localhost:9092
- 进入
pyds_kafka_example
目录运行deepstream python pipeline, e.g:
python3 run.py -i /opt/nvidia/deepstream/deepstream-6.1/samples/streams/sample_720p.h264 -p /opt/nvidia/deepstream/deepstream-6.1/lib/libnvds_kafka_proto.so --conn-str="localhost; 9092; ds-kafka" -s 0 --no-display
# go to kafka_2.13-3.2.1 directory and run bin/kafka-console-consumer.sh --topic ds-kafka --from-beginning --bootstrap-server localhost:9092
输入如下:
{ "messageid" : "34359fe1-fa36-4268-b6fc-a302dbab8be9", "@timestamp" : "2022-08-20T09:05:01.695Z", "deviceId" : "device_test", "analyticsModule" : { "id" : "XYZ", "description" : "\"Vehicle Detection and License Plate Recognition\"", "source" : "OpenALR", "version" : "1.0", "lc_curr_straight" : 1, "lc_cum_straight" : 39 } }
主要改动 在NvDsEventMsgMeta结构里添加analytics msg meta 在
nvdsmeta_schema.h
的232行,插入自定义的analytics msg meta到 NvDsEventMsgMeta
结构中guint lc_curr_straight; guint lc_cum_straight;
编译libnvds_msgconv.so
- deepstream_schema
在/opt/nvidia/deepstream/deepstream/sources/libs/nvmsgconv
目录中,nvmsgconv/deestream_schema/deepstream_schema.h
文件的93行,加入同样的analytics msg meta定义到NvDsAnalyticsObject
结构
guint lc_curr_straight; guint lc_cum_straight;
- eventmsg_payload
自定义消息体最重要的一步,在nvmsgconv/deepstream_schema/eventmsg_payload.cpp
文件的186行,给generate_analytics_module_object
函数加入自定义的analytics msg meta
// custom analytics data // json_object_set_int_member (analyticsObj, 消息体中的key, 消息体中的value); json_object_set_int_member (analyticsObj, "lc_curr_straight", meta->lc_curr_straight); json_object_set_int_member (analyticsObj, "lc_curr_straight", meta->lc_curr_straight); json_object_set_int_member (analyticsObj, "lc_cum_straight", meta->lc_cum_straight);
在536行generate_event_message
函数中,可以注释无效的消息,减小发送消息的大小
// // place object // placeObj = generate_place_object (privData, meta); // // sensor object // sensorObj = generate_sensor_object (privData, meta); // analytics object analyticsObj = generate_analytics_module_object (privData, meta); // // object object // objectObj = generate_object_object (privData, meta); // // event object // eventObj = generate_event_object (privData, meta); // root object rootObj = json_object_new (); json_object_set_string_member (rootObj, "messageid", msgIdStr); // json_object_set_string_member (rootObj, "mdsversion", "1.0"); json_object_set_string_member (rootObj, "@timestamp", meta->ts); // use the orginal params sensorStr in NvDsEventMsgMeta to accept deviceId that generated by python script json_object_set_string_member (rootObj, "deviceId", meta->sensorStr); // json_object_set_object_member (rootObj, "place", placeObj); // json_object_set_object_member (rootObj, "sensor", sensorObj); json_object_set_object_member (rootObj, "analyticsModule", analyticsObj); // not use these metadata // json_object_set_object_member (rootObj, "object", objectObj); // json_object_set_object_member (rootObj, "event", eventObj); // if (meta->videoPath) //json_object_set_string_member (rootObj, "videoPath", meta->videoPath); // else //json_object_set_string_member (rootObj, "videoPath", "");
- 重新编译自定义的libnvds_msgconv.so
cd /opt/nvidia/deepstream/deepstream/sources/libs/nvmsgconv \ && make \ && cp libnvds_msgconv.so /opt/nvidia/deepstream/deepstream/lib/libnvds_msgconv.so
/deepstream_python_apps/bindings/src/bindschema.cpp
中,加入对应的msg定义.def_readwrite("lc_curr_straight", &NvDsEventMsgMeta::lc_curr_straight) .def_readwrite("lc_cum_straight", &NvDsEventMsgMeta::lc_cum_straight);
接着编译deepstream python binding,并且通过pip安装。
推荐阅读
- python|Python3中的算术运算符
- Python-爬虫|Python爬虫(selenium)
- 职场经验|感觉Selenium不好学(那你可能需要的是Helium)
- 计算机视觉|基于matlab的图像形状与分类毕业设计(含源文)
- http|《彩虹屁》快夸夸我!彩虹屁生成器
- python|使用GitHub的action将每日天气推送到微信和QQ
- 每日推送情话
- opencv|python-opencv 图像处理基础 (二)高斯噪声+椒盐噪声+滤波
- ISP算法|图片添加高斯噪声和椒盐噪声python