"""
Interface for CANtact devices from Linklayer Labs
"""
import time
import logging
from unittest.mock import Mock
from can import BusABC, Message
from ..exceptions import (
CanInitializationError,
CanInterfaceNotImplementedError,
error_check,
)
logger = logging.getLogger(__name__)
try:
import cantact
except ImportError:
cantact = None
logger.warning(
"The CANtact module is not installed. Install it using `pip install cantact`"
)
[docs]class CantactBus(BusABC):
"""CANtact interface"""
@staticmethod
def _detect_available_configs():
try:
interface = cantact.Interface()
except (NameError, SystemError, AttributeError):
logger.debug(
"Could not import or instantiate cantact, so no configurations are available"
)
return []
channels = []
for i in range(0, interface.channel_count()):
channels.append({"interface": "cantact", "channel": f"ch:{i}"})
return channels
def __init__(
self,
channel,
bitrate=500000,
poll_interval=0.01,
monitor=False,
bit_timing=None,
_testing=False,
**kwargs,
):
"""
:param int channel:
Channel number (zero indexed, labeled on multi-channel devices)
:param int bitrate:
Bitrate in bits/s
:param bool monitor:
If true, operate in listen-only monitoring mode
:param BitTiming bit_timing:
Optional BitTiming to use for custom bit timing setting. Overrides bitrate if not None.
"""
if _testing:
self.interface = MockInterface()
else:
if cantact is None:
raise CanInterfaceNotImplementedError(
"The CANtact module is not installed. Install it using `python -m pip install cantact`"
)
with error_check(
"Cannot create the cantact.Interface", CanInitializationError
):
self.interface = cantact.Interface()
self.channel = int(channel)
self.channel_info = f"CANtact: ch:{channel}"
# Configure the interface
with error_check("Cannot setup the cantact.Interface", CanInitializationError):
if bit_timing is None:
# use bitrate
self.interface.set_bitrate(int(channel), int(bitrate))
else:
# use custom bit timing
self.interface.set_bit_timing(
int(channel),
int(bit_timing.brp),
int(bit_timing.tseg1),
int(bit_timing.tseg2),
int(bit_timing.sjw),
)
self.interface.set_enabled(int(channel), True)
self.interface.set_monitor(int(channel), monitor)
self.interface.start()
super().__init__(
channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs
)
def _recv_internal(self, timeout):
with error_check("Cannot receive message"):
frame = self.interface.recv(int(timeout * 1000))
if frame is None:
# timeout occurred
return None, False
msg = Message(
arbitration_id=frame["id"],
is_extended_id=frame["extended"],
timestamp=frame["timestamp"],
is_remote_frame=frame["rtr"],
dlc=frame["dlc"],
data=frame["data"][: frame["dlc"]],
channel=frame["channel"],
is_rx=(not frame["loopback"]), # received if not loopback frame
)
return msg, False
[docs] def send(self, msg, timeout=None):
with error_check("Cannot send message"):
self.interface.send(
self.channel,
msg.arbitration_id,
bool(msg.is_extended_id),
bool(msg.is_remote_frame),
msg.dlc,
msg.data,
)
[docs] def shutdown(self):
super().shutdown()
with error_check("Cannot shutdown interface"):
self.interface.stop()
def mock_recv(timeout):
if timeout > 0:
return {
"id": 0x123,
"extended": False,
"timestamp": time.time(),
"loopback": False,
"rtr": False,
"dlc": 8,
"data": [1, 2, 3, 4, 5, 6, 7, 8],
"channel": 0,
}
else:
# simulate timeout when timeout = 0
return None
class MockInterface:
"""
Mock interface to replace real interface when testing.
This allows for tests to run without actual hardware.
"""
start = Mock()
set_bitrate = Mock()
set_bit_timing = Mock()
set_enabled = Mock()
set_monitor = Mock()
stop = Mock()
send = Mock()
channel_count = Mock(return_value=1)
recv = Mock(side_effect=mock_recv)