Internal API#
Here we document the odds and ends that are more helpful for creating your own interfaces or listeners but generally shouldn’t be required to interact with python-can.
BusABC#
The BusABC class, as the name suggests, provides an abstraction of a CAN bus.
The bus provides a wrapper around a physical or virtual CAN Bus.
An interface-specific instance of the BusABC is created by the Bus class; see Bus for the user-facing API.
Extending the BusABC class#
- Concrete implementations must implement the following:
  - send() to send individual messages
  - _recv_internal() to receive individual messages (see note below!)
  - set the channel_info attribute to a string describing the underlying bus and/or channel
- They might implement the following:
  - flush_tx_buffer() to allow discarding any messages yet to be sent
  - shutdown() to override how the bus should shut down
  - _send_periodic_internal() to override the software based periodic sending and push it down to the kernel or hardware.
  - _apply_filters() to apply efficient filters to lower level systems like the OS kernel or hardware.
  - _detect_available_configs() to allow the interface to report which configurations are currently available for new connections.
  - state() property to allow reading and/or changing the bus state.
Note
TL;DR: Only override _recv_internal(), never recv() directly.
Previously, concrete bus classes had to override recv() directly instead of _recv_internal(), but that has changed to allow the abstract base class to handle in-software message filtering as a fallback. All internal interfaces now implement that new behaviour. Older (custom) interfaces might still override recv() directly and thus might not provide message filtering.
Concrete instances are usually created by can.Bus, which takes the user’s configuration into account.
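The split between recv() and _recv_internal() described in the note can be illustrated without python-can itself. The following is only a sketch of the pattern, not the library's code: `BusSketch`, `QueueBus`, the `accept` predicate, and the use of plain tuples as messages are all hypothetical stand-ins. The base class owns the retry loop and the software filtering, while the subclass only reports one message at a time together with a flag saying whether filtering has already been done.

```python
import queue
import time


class BusSketch:
    """Stand-in for the abstract base class; not python-can's real code."""

    def __init__(self, accept=None):
        # `accept` is a hypothetical predicate playing the role of the filters.
        self._accept = accept or (lambda msg: True)

    def _recv_internal(self, timeout):
        raise NotImplementedError("subclasses provide the actual read")

    def recv(self, timeout=None):
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            remaining = None
            if deadline is not None:
                remaining = max(0.0, deadline - time.monotonic())
            msg, already_filtered = self._recv_internal(remaining)
            if msg is None:
                return None  # the low-level read timed out
            if already_filtered or self._accept(msg):
                return msg
            if deadline is not None and time.monotonic() >= deadline:
                return None  # only non-matching messages arrived in time


class QueueBus(BusSketch):
    """A subclass that overrides only _recv_internal(), never recv()."""

    def __init__(self, accept=None):
        super().__init__(accept)
        self._queue = queue.Queue()

    def send(self, msg):
        self._queue.put(msg)

    def _recv_internal(self, timeout):
        try:
            # False: this backend does no filtering of its own.
            return self._queue.get(timeout=timeout), False
        except queue.Empty:
            return None, False


bus = QueueBus(accept=lambda msg: msg[0] == 0x123)
bus.send((0x456, b"ignored"))
bus.send((0x123, b"wanted"))
received = bus.recv(timeout=0.5)
```

Because the subclass never touches recv(), it gets the fallback filtering for free; a backend with hardware filters would return True as the second value instead.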
Bus Internals#
Several methods are not documented in the main can.Bus as they are primarily useful for library developers as opposed to library users. This is the entire ABC bus class with all internal methods:
- class can.BusABC(channel, can_filters=None, **kwargs)[source]#
The CAN Bus Abstract Base Class that serves as the basis for all concrete interfaces.
This class may be used as an iterator over the received messages and as a context manager for auto-closing the bus when done using it.
Please refer to Error Handling for possible exceptions that may be thrown by certain operations on this bus.
Construct and open a CAN bus instance of the specified type.
Subclasses should call through this method with all given parameters as it handles generic tasks like applying filters.
- Parameters
channel (Any) – The can interface identifier. Expected type is backend dependent.
can_filters (Optional[Sequence[Union[can.typechecking.CanFilter, can.typechecking.CanFilterExtended]]]) – See set_filters() for details.
kwargs (dict) – Any backend dependent configurations are passed in this dictionary
- Raises
ValueError – If parameters are out of range
CanInterfaceNotImplementedError – If the driver cannot be accessed
CanInitializationError – If the bus cannot be initialized
- RECV_LOGGING_LEVEL = 9#
Log level for received messages
- abstract __init__(channel, can_filters=None, **kwargs)[source]#
Construct and open a CAN bus instance of the specified type.
Subclasses should call through this method with all given parameters as it handles generic tasks like applying filters.
- Parameters
channel (Any) – The can interface identifier. Expected type is backend dependent.
can_filters (Optional[Sequence[Union[can.typechecking.CanFilter, can.typechecking.CanFilterExtended]]]) – See set_filters() for details.
kwargs (dict) – Any backend dependent configurations are passed in this dictionary
- Raises
ValueError – If parameters are out of range
CanInterfaceNotImplementedError – If the driver cannot be accessed
CanInitializationError – If the bus cannot be initialized
- __iter__()[source]#
Allow iteration on messages as they are received.
>>> for msg in bus:
...     print(msg)
- Yields
Message msg objects.
- Return type
- __weakref__#
list of weak references to the object (if defined)
- _apply_filters(filters)[source]#
Hook for applying the filters to the underlying kernel or hardware if supported/implemented by the interface.
- Parameters
filters (Optional[Sequence[Union[can.typechecking.CanFilter, can.typechecking.CanFilterExtended]]]) – See set_filters() for details.
- Return type
None
- static _detect_available_configs()[source]#
Detect all configurations/channels that this interface could currently connect with.
This might be quite time consuming.
May not be implemented by every interface on every platform.
- Returns
an iterable of dicts, each being a configuration suitable for usage in the interface’s bus constructor.
- Return type
List[can.typechecking.AutoDetectedConfig]
- _matches_filters(msg)[source]#
Checks whether the given message matches at least one of the current filters. See set_filters() for details on how the filters work.
This method should not be overridden.
- Parameters
msg (can.message.Message) – the message to check if matching
- Returns
whether the given message matches at least one filter
- Return type
- _recv_internal(timeout)[source]#
Read a message from the bus and tell whether it was filtered. This method may be called by recv() multiple times to read a message if the filters set by set_filters() do not match and the call has not yet timed out.
New implementations should always override this method instead of recv(), to be able to take advantage of the software based filtering provided by recv() as a fallback. This method should never be called directly.
Note
This method is not an @abstractmethod (for now) to allow older external implementations to continue using their existing recv() implementation.
Note
The second return value (whether filtering was already done) may change over time for some interfaces, like for example in the Kvaser interface. Thus it cannot be simplified to a constant value.
- Parameters
- Returns
a message that was read or None on timeout
a bool that is True if message filtering has already been done and else False
- Raises
CanOperationError – If an error occurred while reading
NotImplementedError – if the bus provides its own recv() implementation (legacy implementation)
- Return type
- _send_periodic_internal(msgs, period, duration=None)[source]#
Default implementation of periodic message sending using threading.
Override this method to enable a more efficient backend specific approach.
- Parameters
msgs (Union[Sequence[can.message.Message], can.message.Message]) – Messages to transmit
period (float) – Period in seconds between each message
duration (Optional[float]) – Approximate duration in seconds to continue sending messages at the given rate. If no duration is provided, the task will continue indefinitely.
- Returns
A started task instance. Note the task can be stopped (and depending on the backend modified) by calling the stop() method.
- Return type
- channel_info = 'unknown'#
a string describing the underlying bus and/or channel
- property filters: Optional[Sequence[Union[can.typechecking.CanFilter, can.typechecking.CanFilterExtended]]]#
Modify the filters of this bus. See
set_filters()
for details.
- flush_tx_buffer()[source]#
Discard every message that may be queued in the output buffer(s).
- Return type
None
- recv(timeout=None)[source]#
Block waiting for a message from the Bus.
- Parameters
timeout (Optional[float]) – seconds to wait for a message or None to wait indefinitely
- Returns
- Raises
CanOperationError – If an error occurred while reading
- Return type
- abstract send(msg, timeout=None)[source]#
Transmit a message to the CAN bus.
Override this method to enable the transmit path.
- Parameters
msg (Message) – A message object.
timeout (Optional[float]) – If > 0, wait up to this many seconds for message to be ACK’ed or for transmit queue to be ready depending on driver implementation. If timeout is exceeded, an exception will be raised. Might not be supported by all interfaces. None blocks indefinitely.
- Raises
CanOperationError – If an error occurred while sending
- Return type
None
- send_periodic(msgs, period, duration=None, store_task=True)[source]#
Start sending messages at a given period on this bus.
The task will be active until one of the following conditions is met:
- the (optional) duration expires
- the Bus instance goes out of scope
- the Bus instance is shut down
- stop_all_periodic_tasks() is called
- the task’s stop() method is called.
- Parameters
msgs (Union[can.message.Message, Sequence[can.message.Message]]) – Message(s) to transmit
period (float) – Period in seconds between each message
duration (Optional[float]) – Approximate duration in seconds to continue sending messages. If no duration is provided, the task will continue indefinitely.
store_task (bool) – If True (the default) the task will be attached to this Bus instance. Disable to instead manage tasks manually.
- Returns
A started task instance. Note the task can be stopped (and depending on the backend modified) by calling the task’s stop() method.
- Return type
Note
Note the duration before the messages stop being sent may not be exactly the same as the duration specified by the user. In general the message will be sent at the given rate until at least duration seconds.
Note
For extremely long running Bus instances with many short lived tasks, the default API with store_task==True may not be appropriate, as the stopped tasks are still taking up memory as long as they are associated with the Bus instance.
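Two of the stop conditions listed for send_periodic(), an expiring duration and an explicit stop(), can be sketched with a plain thread. This is only an illustration of threading-based periodic sending under stated assumptions: `CyclicTaskSketch` and its `send` callback are hypothetical names, not python-can classes, and the timing is deliberately simple.

```python
import threading
import time


class CyclicTaskSketch:
    """Hypothetical stand-in for a periodic send task; not python-can's class."""

    def __init__(self, send, msgs, period, duration=None):
        self._send = send            # callable that transmits one message
        self._msgs = list(msgs)      # messages are sent round-robin
        self._period = period        # seconds between two sends
        self._duration = duration    # None means: run until stop() is called
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        self._stop_event.set()

    def join(self, timeout=None):
        self._thread.join(timeout)

    def is_alive(self):
        return self._thread.is_alive()

    def _run(self):
        end = None
        if self._duration is not None:
            end = time.monotonic() + self._duration
        index = 0
        while not self._stop_event.is_set():
            if end is not None and time.monotonic() >= end:
                break  # the optional duration has expired
            self._send(self._msgs[index])
            index = (index + 1) % len(self._msgs)
            # Sleep for one period, but wake up early if stop() is called.
            self._stop_event.wait(self._period)


sent = []
task = CyclicTaskSketch(sent.append, ["first", "second"], period=0.01, duration=0.05)
task.join(timeout=2.0)  # ends on its own once the duration has expired
```

Waiting on the event instead of calling time.sleep() lets stop() take effect without waiting out a full period.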
- set_filters(filters=None)[source]#
Apply filtering to all messages received by this Bus.
All messages that match at least one filter are returned. If filters is None or a zero length sequence, all messages are matched.
Calling without passing any filters will reset the applied filters to None.
- Parameters
filters (Optional[Sequence[Union[can.typechecking.CanFilter, can.typechecking.CanFilterExtended]]]) –
An iterable of dictionaries each containing a “can_id”, a “can_mask”, and an optional “extended” key.
>>> [{"can_id": 0x11, "can_mask": 0x21, "extended": False}]
A filter matches when <received_can_id> & can_mask == can_id & can_mask. If extended is set as well, it only matches messages where <received_is_extended> == extended. Else it matches every message based only on the arbitration ID and mask.
- Return type
None
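The matching rule stated for set_filters() can be written out as a small stand-alone function. This is a sketch of the rule as documented here, not the library's implementation; `matches_filters` and its arguments are hypothetical names, with the arbitration ID and the extended flag passed as plain values rather than as a message object.

```python
def matches_filters(filters, arbitration_id, is_extended):
    """Return True if the ID passes at least one filter (or no filters are set)."""
    if not filters:
        return True  # None or an empty sequence matches every message
    for can_filter in filters:
        # The optional "extended" key restricts the filter to one frame type.
        if "extended" in can_filter and can_filter["extended"] != is_extended:
            continue
        mask = can_filter["can_mask"]
        if (arbitration_id & mask) == (can_filter["can_id"] & mask):
            return True
    return False


example_filters = [{"can_id": 0x11, "can_mask": 0x21, "extended": False}]

accepted_exact = matches_filters(example_filters, 0x11, False)
accepted_masked = matches_filters(example_filters, 0x13, False)   # differs only in an unmasked bit
rejected_masked = matches_filters(example_filters, 0x31, False)   # differs in masked bit 5
rejected_extended = matches_filters(example_filters, 0x11, True)  # wrong frame type
```

Only the bits set in can_mask are compared, so with the mask 0x21 the filter looks at bits 0 and 5 and ignores the rest of the identifier.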
- shutdown()[source]#
Called to carry out any interface specific cleanup required in shutting down a bus.
- Return type
None
- property state: can.bus.BusState#
Return the current state of the hardware
- stop_all_periodic_tasks(remove_tasks=True)[source]#
Stop sending any messages that were started using send_periodic().
Note
The result is undefined if a single task throws an exception while being stopped.
- Parameters
remove_tasks (bool) – Stop tracking the stopped tasks.
- Return type
None
About the IO module#
Handling of the different file formats is implemented in can.io.
Each file/IO type is within a separate module and ideally implements both a Reader and a Writer.
The reader usually extends can.io.generic.BaseIOHandler, while the writer often additionally extends can.Listener, to be able to be passed directly to a can.Notifier.
Adding support for new file formats#
This assumes that you want to add a new file format, called canstore. Ideally add both reading and writing support for the new file format, although this is not strictly required.
- Create a new module: can/io/canstore.py (or simply copy some existing one like can/io/csv.py)
- Implement a reader CanstoreReader (which often extends can.io.generic.BaseIOHandler, but does not have to). Besides a constructor, only __iter__(self) needs to be implemented.
- Implement a writer CanstoreWriter (which often extends can.io.generic.BaseIOHandler and can.Listener, but does not have to). Besides a constructor, only on_message_received(self, msg) needs to be implemented.
- Add a case to can.io.player.LogReader’s __new__().
- Document the two new classes (and possibly additional helpers) with docstrings and comments. Please mention features and limitations of the implementation.
- Add a short section to the bottom of doc/listeners.rst.
- Add tests where appropriate, for example by simply adding a test case called class TestCanstoreFileFormat(ReaderWriterTest) to test/logformats_test.py. That should already handle all of the general testing. Just follow the way the other tests in there do it.
- Add imports to can/__init__.py and can/io/__init__.py so that the new classes can be simply imported as from can import CanstoreReader, CanstoreWriter.
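The shape of such a reader and writer pair might look like the sketch below. To keep it runnable on its own it makes several assumptions: `Frame` is a hypothetical stand-in for can.Message, the one-line-per-message text layout is invented for illustration, and the classes do not extend can.io.generic.BaseIOHandler or can.Listener as a real implementation usually would.

```python
import io
from collections import namedtuple

# Hypothetical stand-in for can.Message, so the sketch runs without python-can.
Frame = namedtuple("Frame", ["timestamp", "arbitration_id", "data"])


class CanstoreWriter:
    """Writes one frame per line: '<timestamp> <hex id> <hex data>'."""

    def __init__(self, file):
        self.file = file

    def on_message_received(self, msg):
        line = f"{msg.timestamp:.6f} {msg.arbitration_id:X} {msg.data.hex()}"
        self.file.write(line.rstrip() + "\n")


class CanstoreReader:
    """Parses the lines produced by CanstoreWriter back into frames."""

    def __init__(self, file):
        self.file = file

    def __iter__(self):
        for line in self.file:
            fields = line.split()
            if not fields:
                continue  # skip blank lines
            timestamp, arbitration_id, *payload = fields
            data = bytes.fromhex(payload[0]) if payload else b""
            yield Frame(float(timestamp), int(arbitration_id, 16), data)


buffer = io.StringIO()
writer = CanstoreWriter(buffer)
writer.on_message_received(Frame(1.5, 0x123, b"\x01\x02"))
writer.on_message_received(Frame(2.0, 0x7FF, b""))

buffer.seek(0)
frames = list(CanstoreReader(buffer))
```

A round trip like this one is also the kind of check that the shared ReaderWriterTest base class mentioned above is meant to cover for real formats.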
IO Utilities#
Contains generic base classes for file IO.
- class can.io.generic.BaseIOHandler(file, mode='rt', *args, **kwargs)[source]#
A generic file handler that can be used for reading and writing.
Can be used as a context manager.
- Attr file
the file-like object that is kept internally, or None if none was opened
- Parameters
- Return type
None
- class can.io.generic.FileIOMessageWriter(file, mode='wt', *args, **kwargs)[source]#
A specialized base class for all writers with file descriptors.
- Parameters
- Return type
None
- class can.io.generic.MessageReader(file, mode='rt', *args, **kwargs)[source]#
The base class for all readers.
- Parameters
- Return type
None
Other Utilities#
Utilities and configuration file parsing.
- can.util.deprecated_args_alias(**aliases)[source]#
Allows renaming or deprecating function kwarg(s), optionally keeping the deprecated kwarg(s) as alias(es).
Example:
@deprecated_args_alias(oldArg="new_arg", anotherOldArg="another_new_arg")
def library_function(new_arg, another_new_arg):
    pass

@deprecated_args_alias(oldArg="new_arg", obsoleteOldArg=None)
def library_function(new_arg):
    pass
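To make the behaviour in the example concrete, here is a minimal stand-alone decorator with the same calling convention. It is a sketch written for illustration, not the code in can.util: the name `deprecated_args_alias_sketch`, the warning text, and the choice to raise TypeError when both the old and the new name are given are assumptions.

```python
import functools
import warnings


def deprecated_args_alias_sketch(**aliases):
    """Map deprecated keyword names to new ones (None means: drop the value)."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for old_name, new_name in aliases.items():
                if old_name not in kwargs:
                    continue
                value = kwargs.pop(old_name)
                warnings.warn(
                    f"The '{old_name}' argument is deprecated.",
                    DeprecationWarning,
                    stacklevel=2,
                )
                if new_name is not None:
                    if new_name in kwargs:
                        raise TypeError(
                            f"Both '{old_name}' and '{new_name}' were given."
                        )
                    kwargs[new_name] = value  # forward under the new name
            return func(*args, **kwargs)

        return wrapper

    return decorator


@deprecated_args_alias_sketch(oldArg="new_arg", obsoleteOldArg=None)
def library_function(new_arg):
    return new_arg


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = library_function(oldArg=42, obsoleteOldArg="dropped")
```

The call still works with the old keyword, the obsolete one is silently discarded, and each deprecated name produces one DeprecationWarning.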
- can.util.load_config(path=None, config=None, context=None)[source]#
Returns a dict with configuration details which is loaded from (in this order):
- config
- can.rc
- Environment variables CAN_INTERFACE, CAN_CHANNEL, CAN_BITRATE
- Config files /etc/can.conf or ~/.can or ~/.canrc, where the latter may add or replace values of the former.
Interface can be any of the strings from can.VALID_INTERFACES, for example: kvaser, socketcan, pcan, usb2can, ixxat, nican, virtual.
Note
The key bustype is copied to interface if that one is missing, and never appears in the result.
- Parameters
path (Optional[Union[TextIO, BinaryIO, gzip.GzipFile, str, os.PathLike[str]]]) – Optional path to config file.
config (Optional[Dict[str, Any]]) – A dict which may set the ‘interface’, and/or the ‘channel’, or neither. It may set other values that are passed through.
context (Optional[str]) – Extra ‘context’ passed to config sources. This can be used to select a section other than ‘default’ in the configuration file.
- Returns
A config dictionary that should contain ‘interface’ & ‘channel’:
{
    'interface': 'python-can backend interface to use',
    'channel': 'default channel to use',
    # possibly more
}
Note
None will be used if all the options are exhausted without finding a value.
All unused values are passed from config over to this.
- Raises
CanInterfaceNotImplementedError if the interface name isn’t recognized
- Return type
can.typechecking.BusConfig
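One way to picture how several sources combine into a single result is the merge below. It is a sketch under explicit assumptions, not the body of load_config(): `merge_config_sources` is a hypothetical helper, it assumes that a value from an earlier source is never overwritten by a later one, and it leaves out validation of the interface name.

```python
def merge_config_sources(sources):
    """Merge config dicts; earlier sources take priority (an assumption)."""
    merged = {}
    for source in sources:
        source = dict(source)  # work on a copy, leave the caller's dict alone
        # "bustype" is only a legacy spelling: copy it to "interface" if that
        # key is missing or empty, and never keep it in the result.
        if "bustype" in source:
            bustype = source.pop("bustype")
            if not source.get("interface"):
                source["interface"] = bustype
        for key, value in source.items():
            if key not in merged:
                merged[key] = value
    # Keys nobody provided end up as None.
    for key in ("interface", "channel"):
        merged.setdefault(key, None)
    return merged


explicit = {"channel": "can0"}
from_environment = {"bustype": "socketcan", "channel": "vcan0"}
from_file = {"bitrate": "500000"}

result = merge_config_sources([explicit, from_environment, from_file])
```

Here the explicitly passed channel wins over the one from the environment, while the interface and bitrate are filled in from the later sources.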
- can.util.load_environment_config(context=None)[source]#
Loads config dict from environmental variables (if set):
CAN_INTERFACE
CAN_CHANNEL
CAN_BITRATE
CAN_CONFIG
If context is supplied, “_{context}” is appended to the name of each environment variable that is looked up. For example, if context="ABC":
CAN_INTERFACE_ABC
CAN_CHANNEL_ABC
CAN_BITRATE_ABC
CAN_CONFIG_ABC
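The naming rule can be sketched as follows. `load_environment_config_sketch` is a hypothetical stand-in that only handles the three plain-string variables; CAN_CONFIG is left out on purpose, and the `environ` parameter exists only so the example can run against a fake environment.

```python
import os


def load_environment_config_sketch(context=None, environ=os.environ):
    """Collect CAN_* settings, honouring the optional context suffix."""
    suffix = f"_{context}" if context else ""
    names = {
        "interface": "CAN_INTERFACE",
        "channel": "CAN_CHANNEL",
        "bitrate": "CAN_BITRATE",
    }
    config = {}
    for key, name in names.items():
        value = environ.get(name + suffix)
        if value is not None:  # variables that are not set are skipped
            config[key] = value
    return config


fake_environment = {
    "CAN_INTERFACE": "kvaser",
    "CAN_INTERFACE_ABC": "socketcan",
    "CAN_CHANNEL_ABC": "vcan0",
}

default_config = load_environment_config_sketch(environ=fake_environment)
abc_config = load_environment_config_sketch("ABC", environ=fake_environment)
```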
- can.util.load_file_config(path=None, section='default')[source]#
Loads configuration from file with following content:
[default]
interface = socketcan
channel = can0
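A file with this layout can be read with the standard library's configparser, which also shows what the section argument selects. The sketch below is an illustration only: `read_section` is a hypothetical helper, the `[test]` section is invented for the example, and the text is parsed from a string instead of a file on disk.

```python
from configparser import ConfigParser

CONFIG_TEXT = """
[default]
interface = socketcan
channel = can0

[test]
interface = virtual
"""


def read_section(text, section="default"):
    """Return the key/value pairs of one section, or {} if it is missing."""
    parser = ConfigParser()
    parser.read_string(text)
    if not parser.has_section(section):
        return {}
    return dict(parser.items(section))


default_section = read_section(CONFIG_TEXT)
test_section = read_section(CONFIG_TEXT, section="test")
missing_section = read_section(CONFIG_TEXT, section="nope")
```

All values come back as strings, so a setting such as a bitrate would still need converting by whoever consumes the dictionary.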
- can.util.time_perfcounter_correlation()[source]#
Get the perf_counter value nearest to when time.time() is updated
Computed if the default timer used by time.time on this platform has a resolution coarser than 10μs; otherwise the current time and perf_counter are returned directly. This was chosen as typical timer resolution on Linux/macOS is ~1μs, and the Windows platform can vary from ~500μs to 10ms.
Note this value is based on when time.time() is observed to update from Python, it is not directly returned by the operating system.
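The idea can be sketched with the standard library alone. This is an illustration of the approach described above rather than the function in can.util; the name `time_perfcounter_correlation_sketch` and the `threshold` parameter are assumptions. On a coarse clock it spins until time.time() is seen to change and returns the perf_counter value sampled alongside that change.

```python
import time


def time_perfcounter_correlation_sketch(threshold=1e-5):
    """Return (wall clock time, perf_counter) sampled as close together as possible."""
    resolution = time.get_clock_info("time").resolution
    if resolution <= threshold:
        # Fine-grained clock: sampling both values back to back is good enough.
        return time.time(), time.perf_counter()
    # Coarse clock: busy-wait until time.time() ticks over to a new value.
    start = time.time()
    while True:
        now, counter = time.time(), time.perf_counter()
        if now != start:
            return now, counter


wall_clock, counter = time_perfcounter_correlation_sketch()
```

The pair can then be used to convert later perf_counter readings into wall clock timestamps by adding the offset between the two values.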