import ctypes
import time
from typing import Dict, List, Optional, Tuple
import can
from ...exceptions import CanInitializationError
from .boa import *
[docs]class EtasBus(can.BusABC):
def __init__(
self,
channel: str,
can_filters: Optional[can.typechecking.CanFilters] = None,
receive_own_messages: bool = False,
bitrate: int = 1000000,
fd: bool = True,
data_bitrate: int = 2000000,
**kwargs: object,
):
self.receive_own_messages = receive_own_messages
nodeRange = CSI_NodeRange(CSI_NODE_MIN, CSI_NODE_MAX)
self.tree = ctypes.POINTER(CSI_Tree)()
CSI_CreateProtocolTree(ctypes.c_char_p(b""), nodeRange, ctypes.byref(self.tree))
oci_can_v = BOA_Version(1, 4, 0, 0)
self.ctrl = OCI_ControllerHandle()
OCI_CreateCANControllerNoSearch(
channel.encode(),
ctypes.byref(oci_can_v),
self.tree,
ctypes.byref(self.ctrl),
)
ctrlConf = OCI_CANConfiguration()
ctrlConf.baudrate = bitrate
ctrlConf.samplePoint = 80
ctrlConf.samplesPerBit = OCI_CAN_THREE_SAMPLES_PER_BIT
ctrlConf.BTL_Cycles = 10
ctrlConf.SJW = 1
ctrlConf.syncEdge = OCI_CAN_SINGLE_SYNC_EDGE
ctrlConf.physicalMedia = OCI_CAN_MEDIA_HIGH_SPEED
if receive_own_messages:
ctrlConf.selfReceptionMode = OCI_SELF_RECEPTION_ON
else:
ctrlConf.selfReceptionMode = OCI_SELF_RECEPTION_OFF
ctrlConf.busParticipationMode = OCI_BUSMODE_ACTIVE
if fd:
ctrlConf.canFDEnabled = True
ctrlConf.canFDConfig.dataBitRate = data_bitrate
ctrlConf.canFDConfig.dataBTL_Cycles = 10
ctrlConf.canFDConfig.dataSamplePoint = 80
ctrlConf.canFDConfig.dataSJW = 1
ctrlConf.canFDConfig.flags = 0
ctrlConf.canFDConfig.canFdTxConfig = OCI_CANFDTX_USE_CAN_AND_CANFD_FRAMES
ctrlConf.canFDConfig.canFdRxConfig.canRxMode = (
OCI_CAN_RXMODE_CAN_FRAMES_USING_CAN_MESSAGE
)
ctrlConf.canFDConfig.canFdRxConfig.canFdRxMode = (
OCI_CANFDRXMODE_CANFD_FRAMES_USING_CANFD_MESSAGE
)
ctrlProp = OCI_CANControllerProperties()
ctrlProp.mode = OCI_CONTROLLER_MODE_RUNNING
ec = OCI_OpenCANController(
self.ctrl, ctypes.byref(ctrlConf), ctypes.byref(ctrlProp)
)
if ec != 0x0 and ec != 0x40004000: # accept BOA_WARN_PARAM_ADAPTED
raise CanInitializationError(
f"OCI_OpenCANController failed with error 0x{ec:X}"
)
# RX
rxQConf = OCI_CANRxQueueConfiguration()
rxQConf.onFrame.function = ctypes.cast(None, OCI_CANRxCallbackFunctionSingleMsg)
rxQConf.onFrame.userData = None
rxQConf.onEvent.function = ctypes.cast(None, OCI_CANRxCallbackFunctionSingleMsg)
rxQConf.onEvent.userData = None
if receive_own_messages:
rxQConf.selfReceptionMode = OCI_SELF_RECEPTION_ON
else:
rxQConf.selfReceptionMode = OCI_SELF_RECEPTION_OFF
self.rxQueue = OCI_QueueHandle()
OCI_CreateCANRxQueue(
self.ctrl, ctypes.byref(rxQConf), ctypes.byref(self.rxQueue)
)
self._oci_filters = None
self.filters = can_filters
# TX
txQConf = OCI_CANTxQueueConfiguration()
txQConf.reserved = 0
self.txQueue = OCI_QueueHandle()
OCI_CreateCANTxQueue(
self.ctrl, ctypes.byref(txQConf), ctypes.byref(self.txQueue)
)
# Common
timerCapabilities = OCI_TimerCapabilities()
OCI_GetTimerCapabilities(self.ctrl, ctypes.byref(timerCapabilities))
self.tickFrequency = timerCapabilities.tickFrequency # clock ticks per second
# all timestamps are hardware timestamps relative to the CAN device powerup
# calculate an offset to make them relative to epoch
now = OCI_Time()
OCI_GetTimerValue(self.ctrl, ctypes.byref(now))
self.timeOffset = time.time() - (float(now.value) / self.tickFrequency)
self.channel_info = channel
def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[can.Message], bool]:
ociMsgs = (ctypes.POINTER(OCI_CANMessageEx) * 1)()
ociMsg = OCI_CANMessageEx()
ociMsgs[0] = ctypes.pointer(ociMsg)
count = ctypes.c_uint32()
if timeout is not None: # wait for specified time
t = OCI_Time(round(timeout * self.tickFrequency))
else: # wait indefinitely
t = OCI_NO_TIME
OCI_ReadCANDataEx(
self.rxQueue,
t,
ociMsgs,
1,
ctypes.byref(count),
None,
)
msg = None
if count.value != 0:
if ociMsg.type == OCI_CANFDRX_MESSAGE.value:
ociRxMsg = ociMsg.data.canFDRxMessage
msg = can.Message(
timestamp=float(ociRxMsg.timeStamp) / self.tickFrequency
+ self.timeOffset,
arbitration_id=ociRxMsg.frameID,
is_extended_id=bool(ociRxMsg.flags & OCI_CAN_MSG_FLAG_EXTENDED),
is_remote_frame=bool(
ociRxMsg.flags & OCI_CAN_MSG_FLAG_REMOTE_FRAME
),
# is_error_frame=False,
# channel=None,
dlc=ociRxMsg.size,
data=ociRxMsg.data[0 : ociRxMsg.size],
is_fd=True,
is_rx=not bool(ociRxMsg.flags & OCI_CAN_MSG_FLAG_SELFRECEPTION),
bitrate_switch=bool(
ociRxMsg.flags & OCI_CAN_MSG_FLAG_FD_DATA_BIT_RATE
),
# error_state_indicator=False,
# check=False,
)
elif ociMsg.type == OCI_CAN_RX_MESSAGE.value:
ociRxMsg = ociMsg.data.rxMessage
msg = can.Message(
timestamp=float(ociRxMsg.timeStamp) / self.tickFrequency
+ self.timeOffset,
arbitration_id=ociRxMsg.frameID,
is_extended_id=bool(ociRxMsg.flags & OCI_CAN_MSG_FLAG_EXTENDED),
is_remote_frame=bool(
ociRxMsg.flags & OCI_CAN_MSG_FLAG_REMOTE_FRAME
),
# is_error_frame=False,
# channel=None,
dlc=ociRxMsg.dlc,
data=ociRxMsg.data[0 : ociRxMsg.dlc],
# is_fd=False,
is_rx=not bool(ociRxMsg.flags & OCI_CAN_MSG_FLAG_SELFRECEPTION),
# bitrate_switch=False,
# error_state_indicator=False,
# check=False,
)
return (msg, True)
[docs] def send(self, msg: can.Message, timeout: Optional[float] = None) -> None:
ociMsgs = (ctypes.POINTER(OCI_CANMessageEx) * 1)()
ociMsg = OCI_CANMessageEx()
ociMsgs[0] = ctypes.pointer(ociMsg)
if msg.is_fd:
ociMsg.type = OCI_CANFDTX_MESSAGE
ociTxMsg = ociMsg.data.canFDTxMessage
ociTxMsg.size = msg.dlc
else:
ociMsg.type = OCI_CAN_TX_MESSAGE
ociTxMsg = ociMsg.data.txMessage
ociTxMsg.dlc = msg.dlc
# set fields common to CAN / CAN-FD
ociTxMsg.frameID = msg.arbitration_id
ociTxMsg.flags = 0
if msg.is_extended_id:
ociTxMsg.flags |= OCI_CAN_MSG_FLAG_EXTENDED
if msg.is_remote_frame:
ociTxMsg.flags |= OCI_CAN_MSG_FLAG_REMOTE_FRAME
ociTxMsg.data = tuple(msg.data)
if msg.is_fd:
ociTxMsg.flags |= OCI_CAN_MSG_FLAG_FD_DATA
if msg.bitrate_switch:
ociTxMsg.flags |= OCI_CAN_MSG_FLAG_FD_DATA_BIT_RATE
OCI_WriteCANDataEx(self.txQueue, OCI_NO_TIME, ociMsgs, 1, None)
def _apply_filters(self, filters: Optional[can.typechecking.CanFilters]) -> None:
if self._oci_filters:
OCI_RemoveCANFrameFilterEx(self.rxQueue, self._oci_filters, 1)
# "accept all" filter
if filters is None:
filters = [{"can_id": 0x0, "can_mask": 0x0}]
self._oci_filters = (ctypes.POINTER(OCI_CANRxFilterEx) * len(filters))()
for i, filter in enumerate(filters):
f = OCI_CANRxFilterEx()
f.frameIDValue = filter["can_id"]
f.frameIDMask = filter["can_mask"]
f.tag = 0
f.flagsValue = 0
if self.receive_own_messages:
# mask out the SR bit, i.e. ignore the bit -> receive all
f.flagsMask = 0
else:
# enable the SR bit in the mask. since the bit is 0 in flagsValue -> do not self-receive
f.flagsMask = OCI_CAN_MSG_FLAG_SELFRECEPTION
if filter.get("extended"):
f.flagsValue |= OCI_CAN_MSG_FLAG_EXTENDED
f.flagsMask |= OCI_CAN_MSG_FLAG_EXTENDED
self._oci_filters[i].contents = f
OCI_AddCANFrameFilterEx(self.rxQueue, self._oci_filters, len(self._oci_filters))
[docs] def flush_tx_buffer(self) -> None:
OCI_ResetQueue(self.txQueue)
[docs] def shutdown(self) -> None:
# Cleanup TX
if self.txQueue:
OCI_DestroyCANTxQueue(self.txQueue)
self.txQueue = None
# Cleanup RX
if self.rxQueue:
OCI_DestroyCANRxQueue(self.rxQueue)
self.rxQueue = None
# Cleanup common
if self.ctrl:
OCI_CloseCANController(self.ctrl)
OCI_DestroyCANController(self.ctrl)
self.ctrl = None
if self.tree:
CSI_DestroyProtocolTree(self.tree)
self.tree = None
@property
def state(self) -> can.BusState:
status = OCI_CANControllerStatus()
OCI_GetCANControllerStatus(self.ctrl, ctypes.byref(status))
if status.stateCode & OCI_CAN_STATE_ACTIVE:
return can.BusState.ACTIVE
elif status.stateCode & OCI_CAN_STATE_PASSIVE:
return can.BusState.PASSIVE
@state.setter
def state(self, new_state: can.BusState) -> None:
# disabled, OCI_AdaptCANConfiguration does not allow changing the bus mode
# if new_state == can.BusState.ACTIVE:
# self.ctrlConf.busParticipationMode = OCI_BUSMODE_ACTIVE
# else:
# self.ctrlConf.busParticipationMode = OCI_BUSMODE_PASSIVE
# ec = OCI_AdaptCANConfiguration(self.ctrl, ctypes.byref(self.ctrlConf))
# if ec != 0x0:
# raise CanOperationError(f"OCI_AdaptCANConfiguration failed with error 0x{ec:X}")
raise NotImplementedError("Setting state is not implemented.")
def _detect_available_configs() -> List[can.typechecking.AutoDetectedConfig]:
nodeRange = CSI_NodeRange(CSI_NODE_MIN, CSI_NODE_MAX)
tree = ctypes.POINTER(CSI_Tree)()
CSI_CreateProtocolTree(ctypes.c_char_p(b""), nodeRange, ctypes.byref(tree))
nodes: Dict[str, str] = []
def _findNodes(tree, prefix):
uri = f"{prefix}/{tree.contents.item.uriName.decode()}"
if "CAN:" in uri:
nodes.append({"interface": "etas", "channel": uri})
elif tree.contents.child:
_findNodes(
tree.contents.child,
f"{prefix}/{tree.contents.item.uriName.decode()}",
)
if tree.contents.sibling:
_findNodes(tree.contents.sibling, prefix)
_findNodes(tree, "ETAS:/")
CSI_DestroyProtocolTree(tree)
return nodes