"""
This wrapper is for windows or direct access via CANAL API.
Socket CAN is recommended under Unix/Linux systems.
"""
from ctypes import *
from enum import IntEnum
import logging
import can
from ...exceptions import error_check
from ...typechecking import StringPathLike
log = logging.getLogger("can.usb2can")
# type definitions
flags_t = c_ulong
pConfigureStr = c_char_p
handle_t = c_long
timeout_t = c_ulong
filter_t = c_ulong
# flags mappings
IS_ERROR_FRAME = 4
IS_REMOTE_FRAME = 2
IS_ID_TYPE = 1
[docs]class CanalError(IntEnum):
SUCCESS = 0
BAUDRATE = 1
BUS_OFF = 2
BUS_PASSIVE = 3
BUS_WARNING = 4
CAN_ID = 5
CAN_MESSAGE = 6
CHANNEL = 7
FIFO_EMPTY = 8
FIFO_FULL = 9
FIFO_SIZE = 10
FIFO_WAIT = 11
GENERIC = 12
HARDWARE = 13
INIT_FAIL = 14
INIT_MISSING = 15
INIT_READY = 16
NOT_SUPPORTED = 17
OVERRUN = 18
RCV_EMPTY = 19
REGISTER = 20
TRM_FULL = 21
ERRFRM_STUFF = 22
ERRFRM_FORM = 23
ERRFRM_ACK = 24
ERRFRM_BIT1 = 25
ERRFRM_BIT0 = 26
ERRFRM_CRC = 27
LIBRARY = 28
PROCADDRESS = 29
ONLY_ONE_INSTANCE = 30
SUB_DRIVER = 31
TIMEOUT = 32
NOT_OPEN = 33
PARAMETER = 34
MEMORY = 35
INTERNAL = 36
COMMUNICATION = 37
class CanalStatistics(Structure):
_fields_ = [
("ReceiveFrams", c_ulong),
("TransmistFrams", c_ulong),
("ReceiveData", c_ulong),
("TransmitData", c_ulong),
("Overruns", c_ulong),
("BusWarnings", c_ulong),
("BusOff", c_ulong),
]
stat = CanalStatistics
class CanalStatus(Structure):
_fields_ = [
("channel_status", c_ulong),
("lasterrorcode", c_ulong),
("lasterrorsubcode", c_ulong),
("lasterrorstr", c_byte * 80),
]
# data type for the CAN Message
class CanalMsg(Structure):
_fields_ = [
("flags", c_ulong),
("obid", c_ulong),
("id", c_ulong),
("sizeData", c_ubyte),
("data", c_ubyte * 8),
("timestamp", c_ulong),
]
[docs]class Usb2CanAbstractionLayer:
"""A low level wrapper around the usb2can library.
Documentation: http://www.8devices.com/media/products/usb2can/downloads/CANAL_API.pdf
"""
def __init__(self, dll: StringPathLike = "usb2can.dll") -> None:
"""
:param dll:
the path to the usb2can DLL to load
:raises ~can.exceptions.CanInterfaceNotImplementedError:
if the DLL could not be loaded
"""
try:
self.__m_dllBasic = windll.LoadLibrary(dll)
if self.__m_dllBasic is None:
raise Exception("__m_dllBasic is None")
except Exception as error:
message = f"DLL failed to load at path: {dll}"
raise can.CanInterfaceNotImplementedError(message) from error
[docs] def open(self, configuration: str, flags: int):
"""
Opens a CAN connection using `CanalOpen()`.
:param configuration:
the configuration: "device_id; baudrate"
:param flags:
the flags to be set
:returns:
Valid handle for CANAL API functions on success
:raises ~can.exceptions.CanInterfaceNotImplementedError:
if any error occurred
"""
try:
# we need to convert this into bytes, since the underlying DLL cannot
# handle non-ASCII configuration strings
config_ascii = configuration.encode("ascii", "ignore")
result = self.__m_dllBasic.CanalOpen(config_ascii, flags)
except Exception as ex:
# catch any errors thrown by this call and re-raise
raise can.CanInitializationError(
f'CanalOpen() failed, configuration: "{configuration}", error: {ex}'
)
else:
# any greater-than-zero return value indicates a success
# (see https://grodansparadis.gitbooks.io/the-vscp-daemon/canal_interface_specification.html)
# raise an error if the return code is <= 0
if result <= 0:
raise can.CanInitializationError(
f'CanalOpen() failed, configuration: "{configuration}"',
error_code=result,
)
else:
return result
[docs] def close(self, handle) -> CanalError:
with error_check("Failed to close"):
return CanalError(self.__m_dllBasic.CanalClose(handle))
[docs] def send(self, handle, msg) -> CanalError:
with error_check("Failed to transmit frame"):
return CanalError(self.__m_dllBasic.CanalSend(handle, msg))
[docs] def receive(self, handle, msg) -> CanalError:
with error_check("Receive error"):
return CanalError(self.__m_dllBasic.CanalReceive(handle, msg))
[docs] def blocking_send(self, handle, msg, timeout) -> CanalError:
with error_check("Blocking send error"):
return CanalError(self.__m_dllBasic.CanalBlockingSend(handle, msg, timeout))
[docs] def blocking_receive(self, handle, msg, timeout) -> CanalError:
with error_check("Blocking Receive Failed"):
return CanalError(
self.__m_dllBasic.CanalBlockingReceive(handle, msg, timeout)
)
[docs] def get_status(self, handle, status) -> CanalError:
with error_check("Get status failed"):
return CanalError(self.__m_dllBasic.CanalGetStatus(handle, status))
[docs] def get_statistics(self, handle, statistics) -> CanalError:
with error_check("Get Statistics failed"):
return CanalError(self.__m_dllBasic.CanalGetStatistics(handle, statistics))
[docs] def get_version(self):
with error_check("Failed to get version info"):
return self.__m_dllBasic.CanalGetVersion()
[docs] def get_library_version(self):
with error_check("Failed to get DLL version"):
return self.__m_dllBasic.CanalGetDllVersion()
[docs] def get_vendor_string(self):
with error_check("Failed to get vendor string"):
return self.__m_dllBasic.CanalGetVendorString()