Bases: BasePlugin
CanPlugin handles CAN signal injection using the SocketCAN interface.
It listens for `can.send` events, validates and formats the data,
and transmits CAN frames through a configured Linux CAN interface.
Source code in plugins/can/main.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
class CanPlugin(BasePlugin):
    """
    CanPlugin handles CAN signal injection using the SocketCAN interface.

    It listens for `can.send` events, validates and formats the data,
    and transmits CAN frames through a configured Linux CAN interface.
    """

    def __init__(self):
        """
        Initializes the CanPlugin instance.

        Sets up the plugin name, logger, and internal SocketCAN bus handle.
        The bus stays None until on_init() opens it successfully.
        """
        self.name = "CanPlugin"
        self.logger = Logger(self.name)
        self.bus = None

    def on_init(self, config):
        """
        Initializes the CAN interface using settings from the central config.

        Args:
            config (dict): Optional runtime configuration (unused here).

        Logs errors if `can.interface` is missing or if SocketCAN
        initialization fails. In both cases the bus handle is left as None.
        """
        # A missing or empty 'can' section is handled like a missing
        # interface instead of failing on `.get` of None.
        plugin_config = ConfigLoader.get("can") or {}
        channel = plugin_config.get("interface")
        if not channel:
            self.logger.error(
                "Missing 'interface' in CAN plugin configuration. "
                "Please set can.interface in config.yaml."
            )
            self.bus = None
            return

        try:
            # `interface=` is the current keyword; `bustype=` is deprecated
            # in recent python-can releases.
            self.bus = can.interface.Bus(channel=channel, interface="socketcan")
            self.logger.info(f"CanPlugin initialized on interface: {channel}")
        except Exception as e:
            self.logger.error(f"Failed to initialize SocketCAN on '{channel}': {e}")
            self.bus = None

    def on_event(self, topic, data, timestamp):
        """
        Handles incoming simulation events targeting the CAN plugin.

        Args:
            topic (str): The event topic (e.g., 'can.send'). Not inspected by
                this method.
            data (dict): The event data, including 'id' (CAN ID as an int or a
                '0x'-prefixed hex string) and 'data' (payload byte values).
            timestamp (float): Simulation timestamp in seconds.

        Validates the ID and payload, logs the injection, and sends a CAN
        frame if a bus is active. Invalid events are logged as warnings and
        dropped without raising.
        """
        if not isinstance(data, dict):
            self.logger.warn(
                f"[{timestamp:.3f}s] CAN event data must be a dict, "
                f"got {type(data).__name__}"
            )
            return

        can_id = data.get("id")
        if can_id is None:
            self.logger.warn(f"[{timestamp:.3f}s] Missing CAN 'id' in event: {data}")
            return

        if isinstance(can_id, str) and can_id.lower().startswith("0x"):
            try:
                can_id = int(can_id, 16)
            except ValueError:
                self.logger.warn(f"[{timestamp:.3f}s] Invalid hex ID: {can_id}")
                return

        # bool is a subclass of int; reject it so True/False are not sent as IDs.
        if isinstance(can_id, bool) or not isinstance(can_id, int):
            self.logger.warn(
                f"[{timestamp:.3f}s] CAN ID must be an int, "
                f"got {type(can_id).__name__}: {can_id}"
            )
            return

        # 0x1FFFFFFF is the largest 29-bit (extended) CAN identifier.
        if not 0 <= can_id <= 0x1FFFFFFF:
            self.logger.warn(
                f"[{timestamp:.3f}s] CAN ID out of range (0x0-0x1FFFFFFF): {can_id}"
            )
            return

        payload = data.get("data", [])
        if not isinstance(payload, (list, tuple, bytes, bytearray)):
            self.logger.warn(
                f"[{timestamp:.3f}s] CAN payload must be a sequence of byte values, "
                f"got {type(payload).__name__}: {payload}"
            )
            return
        payload = list(payload)

        # Classic CAN frames carry at most 8 data bytes.
        if len(payload) > 8:
            self.logger.warn(
                f"[{timestamp:.3f}s] CAN payload too long ({len(payload)} bytes, max 8)"
            )
            return

        # Validate before formatting so a bad byte cannot raise out of the handler.
        if any(
            isinstance(byte, bool) or not isinstance(byte, int) or not 0 <= byte <= 0xFF
            for byte in payload
        ):
            self.logger.warn(
                f"[{timestamp:.3f}s] CAN payload bytes must be ints in 0-255: {payload}"
            )
            return

        hex_data = [f"0x{byte:02X}" for byte in payload]
        self.logger.info(f"[{timestamp:.3f}s] Injected CAN ID=0x{can_id:X}, Data={hex_data}")

        # Without an open bus the event is only logged.
        if self.bus is None:
            return

        try:
            msg = can.Message(
                arbitration_id=can_id,
                data=bytearray(payload),
                # IDs above 0x7FF do not fit a standard 11-bit frame.
                is_extended_id=can_id > 0x7FF,
            )
            self.bus.send(msg)
            self.logger.debug(f"Sent to SocketCAN: {msg}")
        except Exception as e:
            self.logger.error(f"Failed to send CAN frame: {e}")

    def on_shutdown(self):
        """
        Cleanly shuts down the CAN plugin and closes the SocketCAN bus.

        Called when the simulation ends or the plugin is unloaded. The bus
        handle is reset to None afterwards so that later events or a repeated
        shutdown never touch a closed bus.
        """
        self.logger.info("CanPlugin shutting down.")
        if self.bus is None:
            return

        try:
            self.bus.shutdown()
        except Exception as e:
            self.logger.error(f"Failed to shut down SocketCAN bus: {e}")
        finally:
            self.bus = None
|
__init__()
Initializes the CanPlugin instance.
Sets up the plugin name, logger, and internal SocketCAN bus handle.
Source code in plugins/can/main.py
37
38
39
40
41
42
43
44
def __init__(self):
    """
    Creates a CanPlugin with no CAN bus attached yet.

    Stores the plugin name, builds a logger labelled with that name, and
    leaves the SocketCAN bus handle empty.
    """
    plugin_name = "CanPlugin"
    self.name = plugin_name
    self.logger = Logger(plugin_name)
    # No bus is opened in the constructor.
    self.bus = None
|
on_event(topic, data, timestamp)
Handles incoming simulation events targeting the CAN plugin.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| topic | str | The event topic (e.g., 'can.send'). | required |
| data | dict | The event data, including 'id' (CAN ID) and 'data' (payload bytes). | required |
| timestamp | float | Simulation timestamp in seconds. | required |
Validates and converts the ID, logs the transmission, and sends a CAN frame if a bus is active.
Source code in plugins/can/main.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
def on_event(self, topic, data, timestamp):
    """
    Handles incoming simulation events targeting the CAN plugin.

    Args:
        topic (str): The event topic (e.g., 'can.send'). Not inspected by
            this method.
        data (dict): The event data, including 'id' (CAN ID as an int or a
            '0x'-prefixed hex string) and 'data' (payload byte values).
        timestamp (float): Simulation timestamp in seconds.

    Validates the ID and payload, logs the injection, and sends a CAN
    frame if a bus is active. Invalid events are logged as warnings and
    dropped without raising.
    """
    if not isinstance(data, dict):
        self.logger.warn(
            f"[{timestamp:.3f}s] CAN event data must be a dict, "
            f"got {type(data).__name__}"
        )
        return

    can_id = data.get("id")
    if can_id is None:
        self.logger.warn(f"[{timestamp:.3f}s] Missing CAN 'id' in event: {data}")
        return

    if isinstance(can_id, str) and can_id.lower().startswith("0x"):
        try:
            can_id = int(can_id, 16)
        except ValueError:
            self.logger.warn(f"[{timestamp:.3f}s] Invalid hex ID: {can_id}")
            return

    # bool is a subclass of int; reject it so True/False are not sent as IDs.
    if isinstance(can_id, bool) or not isinstance(can_id, int):
        self.logger.warn(
            f"[{timestamp:.3f}s] CAN ID must be an int, "
            f"got {type(can_id).__name__}: {can_id}"
        )
        return

    # 0x1FFFFFFF is the largest 29-bit (extended) CAN identifier.
    if not 0 <= can_id <= 0x1FFFFFFF:
        self.logger.warn(
            f"[{timestamp:.3f}s] CAN ID out of range (0x0-0x1FFFFFFF): {can_id}"
        )
        return

    payload = data.get("data", [])
    if not isinstance(payload, (list, tuple, bytes, bytearray)):
        self.logger.warn(
            f"[{timestamp:.3f}s] CAN payload must be a sequence of byte values, "
            f"got {type(payload).__name__}: {payload}"
        )
        return
    payload = list(payload)

    # Classic CAN frames carry at most 8 data bytes.
    if len(payload) > 8:
        self.logger.warn(
            f"[{timestamp:.3f}s] CAN payload too long ({len(payload)} bytes, max 8)"
        )
        return

    # Validate before formatting so a bad byte cannot raise out of the handler.
    if any(
        isinstance(byte, bool) or not isinstance(byte, int) or not 0 <= byte <= 0xFF
        for byte in payload
    ):
        self.logger.warn(
            f"[{timestamp:.3f}s] CAN payload bytes must be ints in 0-255: {payload}"
        )
        return

    hex_data = [f"0x{byte:02X}" for byte in payload]
    self.logger.info(f"[{timestamp:.3f}s] Injected CAN ID=0x{can_id:X}, Data={hex_data}")

    # Without an open bus the event is only logged.
    if self.bus is None:
        return

    try:
        msg = can.Message(
            arbitration_id=can_id,
            data=bytearray(payload),
            # IDs above 0x7FF do not fit a standard 11-bit frame.
            is_extended_id=can_id > 0x7FF,
        )
        self.bus.send(msg)
        self.logger.debug(f"Sent to SocketCAN: {msg}")
    except Exception as e:
        self.logger.error(f"Failed to send CAN frame: {e}")
|
on_init(config)
Initializes the CAN interface using settings from the central config.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| config | dict | Optional runtime configuration (unused here). | required |
Logs errors if `can.interface` is missing or if SocketCAN initialization fails.
Source code in plugins/can/main.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def on_init(self, config):
    """
    Initializes the CAN interface using settings from the central config.

    Args:
        config (dict): Optional runtime configuration (unused here).

    Logs errors if `can.interface` is missing or if SocketCAN
    initialization fails. In both cases the bus handle is left as None.
    """
    # A missing or empty 'can' section is handled like a missing
    # interface instead of failing on `.get` of None.
    plugin_config = ConfigLoader.get("can") or {}
    channel = plugin_config.get("interface")
    if not channel:
        self.logger.error(
            "Missing 'interface' in CAN plugin configuration. "
            "Please set can.interface in config.yaml."
        )
        self.bus = None
        return

    try:
        # `interface=` is the current keyword; `bustype=` is deprecated
        # in recent python-can releases.
        self.bus = can.interface.Bus(channel=channel, interface="socketcan")
        self.logger.info(f"CanPlugin initialized on interface: {channel}")
    except Exception as e:
        self.logger.error(f"Failed to initialize SocketCAN on '{channel}': {e}")
        self.bus = None
|
on_shutdown()
Cleanly shuts down the CAN plugin and closes the SocketCAN bus.
Called when the simulation ends or the plugin is unloaded.
Source code in plugins/can/main.py
114
115
116
117
118
119
120
121
def on_shutdown(self):
    """
    Cleanly shuts down the CAN plugin and closes the SocketCAN bus.

    Called when the simulation ends or the plugin is unloaded. The bus
    handle is reset to None afterwards so that later events or a repeated
    shutdown never touch a closed bus.
    """
    self.logger.info("CanPlugin shutting down.")
    if self.bus is None:
        return

    try:
        self.bus.shutdown()
    except Exception as e:
        self.logger.error(f"Failed to shut down SocketCAN bus: {e}")
    finally:
        self.bus = None
|