"""
This module works with CAN data in ASCII log files (*.log).
It is is compatible with "candump -L" from the canutils program
(https://github.com/linux-can/can-utils).
"""
import logging
from typing import Generator, TextIO, Union, Any
from can.message import Message
from .generic import FileIOMessageWriter, MessageReader
from ..typechecking import StringPathLike
log = logging.getLogger("can.io.canutils")
CAN_MSG_EXT = 0x80000000
CAN_ERR_FLAG = 0x20000000
CAN_ERR_BUSERROR = 0x00000080
CAN_ERR_DLC = 8
CANFD_BRS = 0x01
CANFD_ESI = 0x02
[docs]class CanutilsLogReader(MessageReader):
"""
Iterator over CAN messages from a .log Logging File (candump -L).
.. note::
.log-format looks for example like this:
``(0.0) vcan0 001#8d00100100820100``
"""
file: TextIO
def __init__(
self,
file: Union[StringPathLike, TextIO],
*args: Any,
**kwargs: Any,
) -> None:
"""
:param file: a path-like object or as file-like object to read from
If this is a file-like object, is has to opened in text
read mode, not binary read mode.
"""
super().__init__(file, mode="r")
def __iter__(self) -> Generator[Message, None, None]:
for line in self.file:
# skip empty lines
temp = line.strip()
if not temp:
continue
channel_string: str
if temp[-2:].lower() in (" r", " t"):
timestamp_string, channel_string, frame, is_rx_string = temp.split()
is_rx = is_rx_string.strip().lower() == "r"
else:
timestamp_string, channel_string, frame = temp.split()
is_rx = True
timestamp = float(timestamp_string[1:-1])
can_id_string, data = frame.split("#", maxsplit=1)
channel: Union[int, str]
if channel_string.isdigit():
channel = int(channel_string)
else:
channel = channel_string
is_extended = len(can_id_string) > 3
can_id = int(can_id_string, 16)
is_fd = False
brs = False
esi = False
if data and data[0] == "#":
is_fd = True
fd_flags = int(data[1])
brs = bool(fd_flags & CANFD_BRS)
esi = bool(fd_flags & CANFD_ESI)
data = data[2:]
if data and data[0].lower() == "r":
is_remote_frame = True
if len(data) > 1:
dlc = int(data[1:])
else:
dlc = 0
data_bin = None
else:
is_remote_frame = False
dlc = len(data) // 2
data_bin = bytearray()
for i in range(0, len(data), 2):
data_bin.append(int(data[i : (i + 2)], 16))
if can_id & CAN_ERR_FLAG and can_id & CAN_ERR_BUSERROR:
msg = Message(timestamp=timestamp, is_error_frame=True)
else:
msg = Message(
timestamp=timestamp,
arbitration_id=can_id & 0x1FFFFFFF,
is_extended_id=is_extended,
is_remote_frame=is_remote_frame,
is_fd=is_fd,
is_rx=is_rx,
bitrate_switch=brs,
error_state_indicator=esi,
dlc=dlc,
data=data_bin,
channel=channel,
)
yield msg
self.stop()
[docs]class CanutilsLogWriter(FileIOMessageWriter):
"""Logs CAN data to an ASCII log file (.log).
This class is is compatible with "candump -L".
If a message has a timestamp smaller than the previous one (or 0 or None),
it gets assigned the timestamp that was written for the last message.
It the first message does not have a timestamp, it is set to zero.
"""
def __init__(
self,
file: Union[StringPathLike, TextIO],
channel: str = "vcan0",
append: bool = False,
*args: Any,
**kwargs: Any,
):
"""
:param file: a path-like object or as file-like object to write to
If this is a file-like object, is has to opened in text
write mode, not binary write mode.
:param channel: a default channel to use when the message does not
have a channel set
:param bool append: if set to `True` messages are appended to
the file, else the file is truncated
"""
mode = "a" if append else "w"
super().__init__(file, mode=mode)
self.channel = channel
self.last_timestamp = None
[docs] def on_message_received(self, msg):
# this is the case for the very first message:
if self.last_timestamp is None:
self.last_timestamp = msg.timestamp or 0.0
# figure out the correct timestamp
if msg.timestamp is None or msg.timestamp < self.last_timestamp:
timestamp = self.last_timestamp
else:
timestamp = msg.timestamp
channel = msg.channel if msg.channel is not None else self.channel
if isinstance(channel, int) or isinstance(channel, str) and channel.isdigit():
channel = f"can{channel}"
framestr = "(%f) %s" % (timestamp, channel)
if msg.is_error_frame:
framestr += " %08X#" % (CAN_ERR_FLAG | CAN_ERR_BUSERROR)
elif msg.is_extended_id:
framestr += " %08X#" % (msg.arbitration_id)
else:
framestr += " %03X#" % (msg.arbitration_id)
if msg.is_error_frame:
eol = "\n"
else:
eol = " R\n" if msg.is_rx else " T\n"
if msg.is_remote_frame:
framestr += f"R{eol}"
else:
if msg.is_fd:
fd_flags = 0
if msg.bitrate_switch:
fd_flags |= CANFD_BRS
if msg.error_state_indicator:
fd_flags |= CANFD_ESI
framestr += "#%X" % fd_flags
framestr += f"{msg.data.hex().upper()}{eol}"
self.file.write(framestr)