python-can库基于PCAN-USB使用方法
一、概述
1.介绍
python-can库为Python提供了控制器局域网的支持,为不同的硬件设备提供了通用的抽象,并提供了一套实用程序,用于在CAN总线上发送和接收消息。
支持硬件接口:
Name |
Documentation |
---|---|
"socketcan" |
SocketCAN |
"kvaser" |
Kvaser’s CANLIB |
"serial" |
CAN over Serial |
"slcan" |
CAN over Serial / SLCAN |
"ixxat" |
IXXAT Virtual CAN Interface |
"pcan" |
PCAN Basic API |
"usb2can" |
USB2CAN Interface |
"nican" |
NI-CAN |
"iscan" |
isCAN |
"neovi" |
neoVI |
"vector" |
Vector |
"virtual" |
Virtual |
"canalystii" |
CANalyst-II |
"systec" |
SYSTEC interface |
2.环境搭建
Python安装:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe
PCAN-USB驱动:https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip
库:pip install python-can
3.参考文档
https://python-can.readthedocs.io/en/master/#
二、常用方法 1.接收报文
from can.interfaces.pcan.pcan import PcanBusdef bus_recv(): """轮询接收消息""" try: while True: msg = bus.recv(timeout=100) print(msg) except KeyboardInterrupt: passif __name__ == '__main__': bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000) bus_recv()
2.发送报文
from can.interfaces.pcan.pcan import PcanBusdef bus_send(): """can消息发送""" while True: time.sleep(0.02) try: bus.send(msg) print("消息发送 {}".format(bus.channel_info)) except can.CanError: print("消息未发送")if __name__ == '__main__': msg = can.Message(arbitration_id=0x181DFF00, data=https://www.it610.com/article/[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True)# 报文 bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000) bus_send()
3.定期发送报文
def bus_send_periodic(): """周期发送报文""" print("开始每200毫秒发送一条消息。持续时间10s") task = bus.send_periodic(msg, 1.5)# 定期发送 if not isinstance(task, can.ModifiableCyclicTaskABC):# 断言task类型 print("此接口似乎不支持") task.stop() return time.sleep(5)# 持续时间 print("发送完成") print("更改运行任务的数据以99开头") msg.data[0] = 0x99 task.modify_data(msg)# 修改data首字节 time.sleep(10)task.stop() print("停止循环发送") print("将停止任务的数据更改为单个 ff 字节") msg.data = https://www.it610.com/article/bytearray([0xff])# 重新定向data msg.dlc = 1# 定义data长度 task.modify_data(msg)# 修改data time.sleep(10) print("重新开始") task.start()# 重新启动已停止的周期性任务 time.sleep(10) task.stop() print("完毕")if __name__ == '__main__': msg = can.Message(arbitration_id=0x181DFF00, data=https://www.it610.com/article/[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True)# 报文 bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000) bus_send_periodic()
4.循环收发消息
from can.interfaces.pcan.pcan import PcanBusdef send_cyclic(stop_event): """循环发送消息""" print("开始每1秒发送1条消息") start_time = time.time() while not stop_event.is_set(): msg.timestamp = time.time() - start_time bus.send(msg) print(f"tx: {msg}") time.sleep(1) print("停止发送消息")def receive(stop_event): """循环接收消息""" print("开始接收消息") while not stop_event.is_set(): rx_msg = bus.recv(1) if rx_msg is not None: print(f"rx: {rx_msg}") print("停止接收消息")def send_and_recv_msg(): """发送一个消息并接收一个消息,需要双通道CAN""" stop_event = threading.Event() t_send_cyclic = threading.Thread(target=send_cyclic, args=(stop_event,)) t_receive = threading.Thread(target=receive, args=(stop_event,)) t_receive.start() t_send_cyclic.start() try: while True: time.sleep(0)# yield except KeyboardInterrupt: pass# 正常退出 stop_event.set() time.sleep(0.5)if __name__ == '__main__': msg = can.Message(arbitration_id=0x181DFF00, data=https://www.it610.com/article/[0xEE, 0xFE, 0xFE, 0xFF, 0xFE, 0xFF, 0xFF, 0xFE], is_extended_id=True)# 报文 bus = PcanBus(channel='PCAN_USBBUS1', bitrate=500000) send_and_recv_msg()
【python-can库基于PCAN-USB使用方法】
推荐阅读
- [iptables]|[iptables] 基于iptables实现的跨网络通信
- 基于Redis分布式BitMap的应用
- 基于多图卷积神经网络的车站级共享单车小时客流量预测(附下载链接)
- 大数据|基于图卷积堆叠的双向单向LSTM神经网络的地铁客流预测
- python环境配置-windows版
- Java刷题时常用的标准库数据结构和相应算法
- 数仓建模—埋点设计与管理
- 数据库系统概论 - 关系数据库
- 数据仓库|数据仓库也SaaS,Snowflake市值缘何超700亿美元()
- python|snowflake 数据库_Snowflake数据分析教程