""" Neousys CAN bus driver """
#
# This kind of interface can be found for example on Neousys POC-551VTC
# One needs to have correct drivers and DLL (Share object for Linux) from Neousys
#
# https://www.neousys-tech.com/en/support-service/resources/category/299-poc-551vtc-driver
#
# Beware this is only tested on Linux kernel higher than v5.3. This should be drop in
# with Windows but you have to replace with correct named DLL
#
# pylint: disable=too-few-public-methods
# pylint: disable=too-many-instance-attributes
# pylint: disable=wrong-import-position
import queue
import logging
import platform
from time import time
from ctypes import (
byref,
CFUNCTYPE,
c_ubyte,
c_uint,
c_ushort,
POINTER,
sizeof,
Structure,
)
try:
from ctypes import WinDLL
except ImportError:
from ctypes import CDLL
from can import BusABC, Message
from ...exceptions import (
CanInitializationError,
CanOperationError,
CanInterfaceNotImplementedError,
)
logger = logging.getLogger(__name__)
class NeousysCanSetup(Structure):
"""C CAN Setup struct"""
_fields_ = [
("bitRate", c_uint),
("recvConfig", c_uint),
("recvId", c_uint),
("recvMask", c_uint),
]
class NeousysCanMsg(Structure):
"""C CAN Message struct"""
_fields_ = [
("id", c_uint),
("flags", c_ushort),
("extra", c_ubyte),
("len", c_ubyte),
("data", c_ubyte * 8),
]
# valid:2~16, sum of the Synchronization, Propagation, and
# Phase Buffer 1 segments, measured in time quanta.
# valid:1~8, the Phase Buffer 2 segment in time quanta.
# valid:1~4, Resynchronization Jump Width in time quanta
# valid:1~1023, CAN_CLK divider used to determine time quanta
class NeousysCanBitClk(Structure):
"""C CAN BIT Clock struct"""
_fields_ = [
("syncPropPhase1Seg", c_ushort),
("phase2Seg", c_ushort),
("jumpWidth", c_ushort),
("quantumPrescaler", c_ushort),
]
NEOUSYS_CAN_MSG_CALLBACK = CFUNCTYPE(None, POINTER(NeousysCanMsg), c_uint)
NEOUSYS_CAN_STATUS_CALLBACK = CFUNCTYPE(None, c_uint)
NEOUSYS_CAN_MSG_EXTENDED_ID = 0x0004
NEOUSYS_CAN_MSG_REMOTE_FRAME = 0x0040
NEOUSYS_CAN_MSG_DATA_NEW = 0x0080
NEOUSYS_CAN_MSG_DATA_LOST = 0x0100
NEOUSYS_CAN_MSG_USE_ID_FILTER = 0x00000008
NEOUSYS_CAN_MSG_USE_DIR_FILTER = (
0x00000010 | NEOUSYS_CAN_MSG_USE_ID_FILTER
) # only accept the direction specified in the message type
NEOUSYS_CAN_MSG_USE_EXT_FILTER = (
0x00000020 | NEOUSYS_CAN_MSG_USE_ID_FILTER
) # filters on only extended identifiers
NEOUSYS_CAN_STATUS_BUS_OFF = 0x00000080
NEOUSYS_CAN_STATUS_EWARN = (
0x00000040 # can controller error level has reached warning level.
)
NEOUSYS_CAN_STATUS_EPASS = (
0x00000020 # can controller error level has reached error passive level.
)
NEOUSYS_CAN_STATUS_LEC_STUFF = 0x00000001 # a bit stuffing error has occurred.
NEOUSYS_CAN_STATUS_LEC_FORM = 0x00000002 # a formatting error has occurred.
NEOUSYS_CAN_STATUS_LEC_ACK = 0x00000003 # an acknowledge error has occurred.
NEOUSYS_CAN_STATUS_LEC_BIT1 = (
0x00000004 # the bus remained a bit level of 1 for longer than is allowed.
)
NEOUSYS_CAN_STATUS_LEC_BIT0 = (
0x00000005 # the bus remained a bit level of 0 for longer than is allowed.
)
NEOUSYS_CAN_STATUS_LEC_CRC = 0x00000006 # a crc error has occurred.
NEOUSYS_CAN_STATUS_LEC_MASK = (
0x00000007 # this is the mask for the can last error code (lec).
)
NEOUSYS_CANLIB = None
try:
if platform.system() == "Windows":
NEOUSYS_CANLIB = WinDLL("./WDT_DIO.dll")
else:
NEOUSYS_CANLIB = CDLL("libwdt_dio.so")
logger.info("Loaded Neousys WDT_DIO Can driver")
except OSError as error:
logger.info("Cannot load Neousys CAN bus dll or shared object: %s", error)
[docs]class NeousysBus(BusABC):
"""Neousys CAN bus Class"""
def __init__(self, channel, device=0, bitrate=500000, **kwargs):
"""
:param channel: channel number
:param device: device number
:param bitrate: bit rate.
"""
super().__init__(channel, **kwargs)
if NEOUSYS_CANLIB is None:
raise CanInterfaceNotImplementedError("Neousys WDT_DIO Can driver missing")
self.channel = channel
self.device = device
self.channel_info = f"Neousys Can: device {self.device}, channel {self.channel}"
self.queue = queue.Queue()
# Init with accept all and wanted bitrate
self.init_config = NeousysCanSetup(bitrate, NEOUSYS_CAN_MSG_USE_ID_FILTER, 0, 0)
self._neousys_recv_cb = NEOUSYS_CAN_MSG_CALLBACK(self._neousys_recv_cb)
self._neousys_status_cb = NEOUSYS_CAN_STATUS_CALLBACK(self._neousys_status_cb)
if NEOUSYS_CANLIB.CAN_RegisterReceived(0, self._neousys_recv_cb) == 0:
raise CanInitializationError("Neousys CAN bus Setup receive callback")
if NEOUSYS_CANLIB.CAN_RegisterStatus(0, self._neousys_status_cb) == 0:
raise CanInitializationError("Neousys CAN bus Setup status callback")
if (
NEOUSYS_CANLIB.CAN_Setup(
channel, byref(self.init_config), sizeof(self.init_config)
)
== 0
):
raise CanInitializationError("Neousys CAN bus Setup Error")
if NEOUSYS_CANLIB.CAN_Start(channel) == 0:
raise CanInitializationError("Neousys CAN bus Start Error")
[docs] def send(self, msg, timeout=None) -> None:
"""
:param msg: message to send
:param timeout: timeout is not used here
"""
tx_msg = NeousysCanMsg(
msg.arbitration_id, 0, 0, msg.dlc, (c_ubyte * 8)(*msg.data)
)
if NEOUSYS_CANLIB.CAN_Send(self.channel, byref(tx_msg), sizeof(tx_msg)) == 0:
raise CanOperationError("Neousys Can can't send message")
def _recv_internal(self, timeout):
try:
return self.queue.get(block=True, timeout=timeout), False
except queue.Empty:
return None, False
def _neousys_recv_cb(self, msg, sizeof_msg) -> None:
"""
:param msg: struct CAN_MSG
:param sizeof_msg: message number
"""
msg_bytes = bytearray(msg.contents.data)
remote_frame = bool(msg.contents.flags & NEOUSYS_CAN_MSG_REMOTE_FRAME)
extended_frame = bool(msg.contents.flags & NEOUSYS_CAN_MSG_EXTENDED_ID)
if msg.contents.flags & NEOUSYS_CAN_MSG_DATA_LOST:
logger.error("_neousys_recv_cb flag CAN_MSG_DATA_LOST")
msg = Message(
timestamp=time(),
arbitration_id=msg.contents.id,
is_remote_frame=remote_frame,
is_extended_id=extended_frame,
channel=self.channel,
dlc=msg.contents.len,
data=msg_bytes[: msg.contents.len],
)
# Reading happens in Callback function and
# with Python-CAN it happens polling
# so cache stuff in array to for poll
try:
self.queue.put(msg)
except queue.Full:
raise CanOperationError("Neousys message Queue is full") from None
def _neousys_status_cb(self, status) -> None:
"""
:param status: BUS Status
"""
logger.info("%s _neousys_status_cb: %d", self.init_config, status)
[docs] def shutdown(self):
super().shutdown()
NEOUSYS_CANLIB.CAN_Stop(self.channel)
@staticmethod
def _detect_available_configs():
# There is only one channel
return [{"interface": "neousys", "channel": 0}]