"""
Interface for Chinese Robotell compatible interfaces (win32/linux).
"""
import io
import time
import logging
from typing import Optional
from can import BusABC, Message
from ..exceptions import CanInterfaceNotImplementedError, CanOperationError
logger = logging.getLogger(__name__)
try:
import serial
except ImportError:
logger.warning(
"You won't be able to use the Robotell can backend without "
"the serial module installed!"
)
serial = None
[docs]class robotellBus(BusABC):
"""
robotell interface
"""
_PACKET_HEAD = 0xAA # Frame starts with 2x FRAME_HEAD bytes
_PACKET_TAIL = 0x55 # Frame ends with 2x FRAME_END bytes
_PACKET_ESC = (
0xA5 # Escape char before any HEAD, TAIL or ESC chat (including in checksum)
)
_CAN_CONFIG_CHANNEL = 0xFF # Configuration channel of CAN
_CAN_SERIALBPS_ID = 0x01FFFE90 # USB Serial port speed
_CAN_ART_ID = 0x01FFFEA0 # Automatic retransmission
_CAN_ABOM_ID = 0x01FFFEB0 # Automatic bus management
_CAN_RESET_ID = 0x01FFFEC0 # ID for initialization
_CAN_BAUD_ID = 0x01FFFED0 # CAN baud rate
_CAN_FILTER_BASE_ID = 0x01FFFEE0 # ID for first filter (filter0)
_CAN_FILTER_MAX_ID = 0x01FFFEE0 + 13 # ID for the last filter (filter13)
_CAN_INIT_FLASH_ID = 0x01FFFEFF # Restore factory settings
_CAN_READ_SERIAL1 = 0x01FFFFF0 # Read first part of device serial number
_CAN_READ_SERIAL2 = 0x01FFFFF1 # Read first part of device serial number
_MAX_CAN_BAUD = 1000000 # Maximum supported CAN baud rate
_FILTER_ID_MASK = 0x0000000F # Filter ID mask
_CAN_FILTER_EXTENDED = 0x40000000 # Enable mask
_CAN_FILTER_ENABLE = 0x80000000 # Enable filter
_CAN_STANDARD_FMT = 0 # Standard message ID
_CAN_EXTENDED_FMT = 1 # 29 Bit extended format ID
_CAN_DATA_FRAME = 0 # Send data frame
_CAN_REMOTE_FRAME = 1 # Request remote frame
def __init__(
self, channel, ttyBaudrate=115200, bitrate=None, rtscts=False, **kwargs
):
"""
:param str channel:
port of underlying serial or usb device (e.g. ``/dev/ttyUSB0``, ``COM8``, ...)
Must not be empty. Can also end with ``@115200`` (or similarly) to specify the baudrate.
:param int ttyBaudrate:
baudrate of underlying serial or usb device (Ignored if set via the ``channel`` parameter)
:param int bitrate:
CAN Bitrate in bit/s. Value is stored in the adapter and will be used as default if no bitrate is specified
:param bool rtscts:
turn hardware handshake (RTS/CTS) on and off
"""
if serial is None:
raise CanInterfaceNotImplementedError("The serial module is not installed")
if not channel: # if None or empty
raise TypeError("Must specify a serial port.")
if "@" in channel:
(channel, ttyBaudrate) = channel.split("@")
self.serialPortOrig = serial.serial_for_url(
channel, baudrate=ttyBaudrate, rtscts=rtscts
)
# Disable flushing queued config ACKs on lookup channel (for unit tests)
self._loopback_test = channel == "loop://"
self._rxbuffer = bytearray() # raw bytes from the serial port
self._rxmsg = [] # extracted CAN messages waiting to be read
self._configmsg = [] # extracted config channel messages
self._writeconfig(self._CAN_RESET_ID, 0) # Not sure if this is really necessary
if bitrate is not None:
self.set_bitrate(bitrate)
self.channel_info = (
f"Robotell USB-CAN s/n {self.get_serial_number(1)} on {channel}"
)
logger.info("Using device: %s", self.channel_info)
super().__init__(channel=channel, **kwargs)
[docs] def set_bitrate(self, bitrate):
"""
:raise ValueError: if *bitrate* is greater than 1000000
:param int bitrate:
Bitrate in bit/s
"""
if bitrate <= self._MAX_CAN_BAUD:
self._writeconfig(self._CAN_BAUD_ID, bitrate)
else:
raise ValueError(f"Invalid bitrate, must be less than {self._MAX_CAN_BAUD}")
[docs] def set_auto_retransmit(self, retrans_flag):
"""
:param bool retrans_flag:
Enable/disable automatic retransmission of unacknowledged CAN frames
"""
self._writeconfig(self._CAN_ART_ID, 1 if retrans_flag else 0)
[docs] def set_auto_bus_management(self, auto_man):
"""
:param bool auto_man:
Enable/disable automatic bus management
"""
# Not sure what "automatic bus management" does. Does not seem to control
# automatic ACK of CAN frames (listen only mode)
self._writeconfig(self._CAN_ABOM_ID, 1 if auto_man else 0)
[docs] def set_serial_rate(self, serial_bps):
"""
:param int serial_bps:
Set the baud rate of the serial port (not CAN) interface
"""
self._writeconfig(self._CAN_SERIALBPS_ID, serial_bps)
[docs] def set_hw_filter(self, filterid, enabled, msgid_value, msgid_mask, extended_msg):
"""
:raise ValueError: if *filterid* is not between 1 and 14
:param int filterid:
ID of filter (1-14)
:param bool enabled:
This filter is enabled
:param int msgid_value:
CAN message ID to filter on. The test unit does not accept an extented message ID unless bit 31 of the ID was set.
:param int msgid_mask:
Mask to apply to CAN messagge ID
:param bool extended_msg:
Filter operates on extended format messages
"""
if filterid < 1 or filterid > 14:
raise ValueError("Invalid filter ID. ID must be between 0 and 13")
else:
configid = self._CAN_FILTER_BASE_ID + (filterid - 1)
msgid_value += self._CAN_FILTER_ENABLE if enabled else 0
msgid_value += self._CAN_FILTER_EXTENDED if extended_msg else 0
self._writeconfig(configid, msgid_value, msgid_mask)
def _getconfigsize(self, configid):
if configid == self._CAN_ART_ID or configid == self._CAN_ABOM_ID:
return 1
if configid == self._CAN_BAUD_ID or configid == self._CAN_INIT_FLASH_ID:
return 4
if configid == self._CAN_SERIALBPS_ID:
return 4
if configid == self._CAN_READ_SERIAL1 or configid <= self._CAN_READ_SERIAL2:
return 8
if self._CAN_FILTER_BASE_ID <= configid <= self._CAN_FILTER_MAX_ID:
return 8
return 0
def _readconfig(self, configid, timeout):
self._writemessage(
msgid=configid,
msgdata=bytearray(8),
datalen=self._getconfigsize(configid),
msgchan=self._CAN_CONFIG_CHANNEL,
msgformat=self._CAN_EXTENDED_FMT,
msgtype=self._CAN_REMOTE_FRAME,
)
# Read message from config channel with result. Flush any previously pending config messages
newmsg = self._readmessage(not self._loopback_test, True, timeout)
if newmsg is None:
logger.warning(
f"Timeout waiting for response when reading config value {configid:04X}."
)
return None
return newmsg[4:12]
def _writeconfig(self, configid, value, value2=0):
configsize = self._getconfigsize(configid)
configdata = bytearray(configsize)
if configsize >= 1:
configdata[0] = value & 0xFF
if configsize >= 4:
configdata[1] = (value >> 8) & 0xFF
configdata[2] = (value >> 16) & 0xFF
configdata[3] = (value >> 24) & 0xFF
if configsize >= 8:
configdata[4] = value2 & 0xFF
configdata[5] = (value2 >> 8) & 0xFF
configdata[6] = (value2 >> 16) & 0xFF
configdata[7] = (value2 >> 24) & 0xFF
self._writemessage(
msgid=configid,
msgdata=configdata,
datalen=configsize,
msgchan=self._CAN_CONFIG_CHANNEL,
msgformat=self._CAN_EXTENDED_FMT,
msgtype=self._CAN_DATA_FRAME,
)
# Read message from config channel to verify. Flush any previously pending config messages
newmsg = self._readmessage(not self._loopback_test, True, 1)
if newmsg is None:
logger.warning(
"Timeout waiting for response when writing config value %d", configid
)
def _readmessage(self, flushold, cfgchannel, timeout):
header = bytearray([self._PACKET_HEAD, self._PACKET_HEAD])
terminator = bytearray([self._PACKET_TAIL, self._PACKET_TAIL])
msgqueue = self._configmsg if cfgchannel else self._rxmsg
if flushold:
del msgqueue[:]
# read what is already in serial port receive buffer - unless we are doing loopback testing
if not self._loopback_test:
while self.serialPortOrig.in_waiting:
self._rxbuffer += self.serialPortOrig.read()
# loop until we have read an appropriate message
start = time.time()
time_left = timeout
while True:
# make sure first bytes in RX buffer is a new packet header
headpos = self._rxbuffer.find(header)
if headpos > 0:
# data does not start with expected header bytes. Log error and ignore garbage
logger.warning("Ignoring extra " + str(headpos) + " garbage bytes")
del self._rxbuffer[:headpos]
headpos = self._rxbuffer.find(header) # should now be at index 0!
# check to see if we have a complete packet in the RX buffer
termpos = self._rxbuffer.find(terminator)
if headpos == 0 and termpos > headpos:
# copy packet into message structure and un-escape bytes
newmsg = bytearray()
idx = headpos + len(header)
while idx < termpos:
if self._rxbuffer[idx] == self._PACKET_ESC:
idx += 1
newmsg.append(self._rxbuffer[idx])
idx += 1
del self._rxbuffer[: termpos + len(terminator)]
# Check one - make sure message structure is the correct length
if len(newmsg) == 17:
# Check two - verify the checksum
cs = 0
for idx in range(16):
cs = (cs + newmsg[idx]) & 0xFF
if newmsg[16] == cs:
# OK, valid message - place it in the correct queue
if newmsg[13] == 0xFF: # Check for config channel
self._configmsg.append(newmsg)
else:
self._rxmsg.append(newmsg)
else:
logger.warning("Incorrect message checksum, discarded message")
else:
logger.warning(
"Invalid message structure length %d, ignoring message",
len(newmsg),
)
# Check if we have a message in the desired queue - if so copy and return
if len(msgqueue) > 0:
newmsg = msgqueue[0]
del msgqueue[:1]
return newmsg
# if we still don't have a complete message, do a blocking read
self.serialPortOrig.timeout = time_left
byte = self.serialPortOrig.read()
if byte:
self._rxbuffer += byte
# If there is time left, try next one with reduced timeout
if timeout is not None:
time_left = timeout - (time.time() - start)
if time_left <= 0:
return None
def _writemessage(self, msgid, msgdata, datalen, msgchan, msgformat, msgtype):
msgbuf = bytearray(17) # Message structure plus checksum byte
msgbuf[0] = msgid & 0xFF
msgbuf[1] = (msgid >> 8) & 0xFF
msgbuf[2] = (msgid >> 16) & 0xFF
msgbuf[3] = (msgid >> 24) & 0xFF
if msgtype == self._CAN_DATA_FRAME:
for idx in range(datalen):
msgbuf[idx + 4] = msgdata[idx]
msgbuf[12] = datalen
msgbuf[13] = msgchan
msgbuf[14] = msgformat
msgbuf[15] = msgtype
cs = 0
for idx in range(16):
cs = (cs + msgbuf[idx]) & 0xFF
msgbuf[16] = cs
packet = bytearray()
packet.append(self._PACKET_HEAD)
packet.append(self._PACKET_HEAD)
for msgbyte in msgbuf:
if (
msgbyte == self._PACKET_ESC
or msgbyte == self._PACKET_HEAD
or msgbyte == self._PACKET_TAIL
):
packet.append(self._PACKET_ESC)
packet.append(msgbyte)
packet.append(self._PACKET_TAIL)
packet.append(self._PACKET_TAIL)
self.serialPortOrig.write(packet)
self.serialPortOrig.flush()
def flush(self):
del self._rxbuffer[:]
del self._rxmsg[:]
del self._configmsg[:]
while self.serialPortOrig.in_waiting:
self.serialPortOrig.read()
def _recv_internal(self, timeout):
msgbuf = self._readmessage(False, False, timeout)
if msgbuf is not None:
msg = Message(
arbitration_id=msgbuf[0]
+ (msgbuf[1] << 8)
+ (msgbuf[2] << 16)
+ (msgbuf[3] << 24),
is_extended_id=(msgbuf[14] == self._CAN_EXTENDED_FMT),
timestamp=time.time(), # Better than nothing...
is_remote_frame=(msgbuf[15] == self._CAN_REMOTE_FRAME),
dlc=msgbuf[12],
data=msgbuf[4 : 4 + msgbuf[12]],
)
return msg, False
return None, False
[docs] def send(self, msg, timeout=None):
if timeout != self.serialPortOrig.write_timeout:
self.serialPortOrig.write_timeout = timeout
self._writemessage(
msg.arbitration_id,
msg.data,
msg.dlc,
0,
self._CAN_EXTENDED_FMT if msg.is_extended_id else self._CAN_STANDARD_FMT,
self._CAN_REMOTE_FRAME if msg.is_remote_frame else self._CAN_DATA_FRAME,
)
[docs] def shutdown(self):
super().shutdown()
self.serialPortOrig.close()
def fileno(self):
try:
return self.serialPortOrig.fileno()
except io.UnsupportedOperation:
raise NotImplementedError(
"fileno is not implemented using current CAN bus on this platform"
)
except Exception as exception:
raise CanOperationError("Cannot fetch fileno") from exception
[docs] def get_serial_number(self, timeout: Optional[int]) -> Optional[str]:
"""Get serial number of the slcan interface.
:param timeout:
seconds to wait for serial number or None to wait indefinitely
:return:
None on timeout or a str object.
"""
sn1 = self._readconfig(self._CAN_READ_SERIAL1, timeout)
if sn1 is None:
return None
sn2 = self._readconfig(self._CAN_READ_SERIAL2, timeout)
if sn2 is None:
return None
serial = ""
for idx in range(0, 8, 2):
serial += "{:02X}{:02X}-".format(sn1[idx], sn1[idx + 1])
for idx in range(0, 4, 2):
serial += "{:02X}{:02X}-".format(sn2[idx], sn2[idx + 1])
return serial[:-1]