Skip to content

CAN Plugin

Bases: BasePlugin

CanPlugin handles CAN signal injection using the SocketCAN interface.

It listens for can.send events, validates and formats the data, and transmits CAN frames through a configured Linux CAN interface.

Source code in plugins/can/main.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class CanPlugin(BasePlugin):
    """
    CanPlugin handles CAN signal injection using the SocketCAN interface.

    It listens for `can.send` events, validates and formats the data,
    and transmits CAN frames through a configured Linux CAN interface.
    """

    def __init__(self):
        """
        Initializes the CanPlugin instance.

        Sets up the plugin name, logger, and internal SocketCAN bus handle.
        """
        self.name = "CanPlugin"
        self.logger = Logger(self.name)
        self.bus = None

    def on_init(self, config):
        """
        Initializes the CAN interface using settings from the central config.

        Args:
            config (dict): Optional runtime configuration (unused here).

        Logs errors if `can.interface` is missing or if SocketCAN initialization fails.
        """
        plugin_config = ConfigLoader.get("can")
        interface = plugin_config.get("interface")

        if not interface:
            self.logger.error(
                "Missing 'interface' in CAN plugin configuration. "
                "Please set can.interface in config.yaml."
            )
            self.bus = None
            return

        try:
            self.bus = can.interface.Bus(channel=interface, bustype="socketcan")
            self.logger.info(f"CanPlugin initialized on interface: {interface}")
        except Exception as e:
            self.logger.error(f"Failed to initialize SocketCAN on '{interface}': {e}")
            self.bus = None

    def on_event(self, topic, data, timestamp):
        """
        Handles incoming simulation events targeting the CAN plugin.

        Args:
            topic (str): The event topic (e.g., 'can.send').
            data (dict): The event data, including 'id' (CAN ID) and 'data' (payload bytes).
            timestamp (float): Simulation timestamp in seconds.

        Validates and converts the ID, logs the transmission, and sends a CAN frame if a bus is active.
        """
        id = data.get("id")

        if id is None:
            self.logger.warn(f"[{timestamp:.3f}s] Missing CAN 'id' in event: {data}")
            return

        if isinstance(id, str) and id.startswith("0x"):
            try:
                id = int(id, 16)
            except ValueError:
                self.logger.warn(f"[{timestamp:.3f}s] Invalid hex ID: {id}")
                return

        if not isinstance(id, int):
            self.logger.warn(f"[{timestamp:.3f}s] CAN ID must be an int, got {type(id).__name__}: {id}")
            return

        value = data.get("data", [])
        hex_data = [f"0x{byte:02X}" for byte in value] if isinstance(value, list) else value
        self.logger.info(f"[{timestamp:.3f}s] Injected CAN ID=0x{id:X}, Data={hex_data}")

        if self.bus and isinstance(value, list):
            try:
                msg = can.Message(arbitration_id=id, data=bytearray(value), is_extended_id=False)
                self.bus.send(msg)
                self.logger.debug(f"Sent to SocketCAN: {msg}")
            except Exception as e:
                self.logger.error(f"Failed to send CAN frame: {e}")

    def on_shutdown(self):
        """
        Cleanly shuts down the CAN plugin and closes the SocketCAN bus.

        Called when the simulation ends or the plugin is unloaded.
        """
        self.logger.info("CanPlugin shutting down.")
        if self.bus:
            self.bus.shutdown()

__init__()

Initializes the CanPlugin instance.

Sets up the plugin name, logger, and internal SocketCAN bus handle.

Source code in plugins/can/main.py
37
38
39
40
41
42
43
44
45
def __init__(self):
    """
    Initializes the CanPlugin instance.

    Sets up the plugin name, logger, and internal SocketCAN bus handle.
    """
    self.name = "CanPlugin"
    self.logger = Logger(self.name)
    self.bus = None

on_event(topic, data, timestamp)

Handles incoming simulation events targeting the CAN plugin.

Parameters:

Name Type Description Default
topic str

The event topic (e.g., 'can.send').

required
data dict

The event data, including 'id' (CAN ID) and 'data' (payload bytes).

required
timestamp float

Simulation timestamp in seconds.

required

Validates and converts the ID, logs the transmission, and sends a CAN frame if a bus is active.

Source code in plugins/can/main.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def on_event(self, topic, data, timestamp):
    """
    Handles incoming simulation events targeting the CAN plugin.

    Args:
        topic (str): The event topic (e.g., 'can.send').
        data (dict): The event data, including 'id' (CAN ID) and 'data' (payload bytes).
        timestamp (float): Simulation timestamp in seconds.

    Validates and converts the ID, logs the transmission, and sends a CAN frame if a bus is active.
    """
    id = data.get("id")

    if id is None:
        self.logger.warn(f"[{timestamp:.3f}s] Missing CAN 'id' in event: {data}")
        return

    if isinstance(id, str) and id.startswith("0x"):
        try:
            id = int(id, 16)
        except ValueError:
            self.logger.warn(f"[{timestamp:.3f}s] Invalid hex ID: {id}")
            return

    if not isinstance(id, int):
        self.logger.warn(f"[{timestamp:.3f}s] CAN ID must be an int, got {type(id).__name__}: {id}")
        return

    value = data.get("data", [])
    hex_data = [f"0x{byte:02X}" for byte in value] if isinstance(value, list) else value
    self.logger.info(f"[{timestamp:.3f}s] Injected CAN ID=0x{id:X}, Data={hex_data}")

    if self.bus and isinstance(value, list):
        try:
            msg = can.Message(arbitration_id=id, data=bytearray(value), is_extended_id=False)
            self.bus.send(msg)
            self.logger.debug(f"Sent to SocketCAN: {msg}")
        except Exception as e:
            self.logger.error(f"Failed to send CAN frame: {e}")

on_init(config)

Initializes the CAN interface using settings from the central config.

Parameters:

Name Type Description Default
config dict

Optional runtime configuration (unused here).

required

Logs errors if can.interface is missing or if SocketCAN initialization fails.

Source code in plugins/can/main.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def on_init(self, config):
    """
    Initializes the CAN interface using settings from the central config.

    Args:
        config (dict): Optional runtime configuration (unused here).

    Logs errors if `can.interface` is missing or if SocketCAN initialization fails.
    """
    plugin_config = ConfigLoader.get("can")
    interface = plugin_config.get("interface")

    if not interface:
        self.logger.error(
            "Missing 'interface' in CAN plugin configuration. "
            "Please set can.interface in config.yaml."
        )
        self.bus = None
        return

    try:
        self.bus = can.interface.Bus(channel=interface, bustype="socketcan")
        self.logger.info(f"CanPlugin initialized on interface: {interface}")
    except Exception as e:
        self.logger.error(f"Failed to initialize SocketCAN on '{interface}': {e}")
        self.bus = None

on_shutdown()

Cleanly shuts down the CAN plugin and closes the SocketCAN bus.

Called when the simulation ends or the plugin is unloaded.

Source code in plugins/can/main.py
114
115
116
117
118
119
120
121
122
def on_shutdown(self):
    """
    Cleanly shuts down the CAN plugin and closes the SocketCAN bus.

    Called when the simulation ends or the plugin is unloaded.
    """
    self.logger.info("CanPlugin shutting down.")
    if self.bus:
        self.bus.shutdown()