Skip to content

Core Modules

BasePlugin

BasePlugin defines the required interface for all OpenRoadSim plugins.

Every plugin must inherit from this class and implement the following lifecycle methods:

- on_init(): Called once at startup for initialization.
- on_event(): Called when a subscribed event is published.
- on_shutdown(): Called when the simulation ends.

Source code in core/base_plugin.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class BasePlugin:
    """
    Common interface that every OpenRoadSim plugin is expected to provide.

    Subclasses override the three lifecycle hooks below. The default
    implementations only raise NotImplementedError:
    - on_init(): Called once at startup for initialization.
    - on_event(): Called when a subscribed event is published.
    - on_shutdown(): Called when the simulation ends.
    """

    def on_init(self, config):
        """
        Lifecycle hook invoked once after the plugin has been loaded.

        Args:
            config (dict): Optional configuration data (reserved for future use).

        Raises:
            NotImplementedError: Always, unless a subclass overrides this hook.
        """
        message = "Plugin must implement on_init()"
        raise NotImplementedError(message)

    def on_event(self, topic, data, timestamp):
        """
        Lifecycle hook invoked when a relevant event is published on the EventBus.

        Args:
            topic (str): The topic of the event (e.g., "gps.set_location").
            data (dict): Parameters passed with the event.
            timestamp (float): The simulation time when the event is triggered.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this hook.
        """
        message = "Plugin must implement on_event()"
        raise NotImplementedError(message)

    def on_shutdown(self):
        """
        Lifecycle hook invoked once at the end of the simulation to clean up resources.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this hook.
        """
        message = "Plugin must implement on_shutdown()"
        raise NotImplementedError(message)

on_event(topic, data, timestamp)

Called when a relevant event is published on the EventBus.

Parameters:

Name Type Description Default
topic str

The topic of the event (e.g., "gps.set_location").

required
data dict

Parameters passed with the event.

required
timestamp float

The simulation time when the event is triggered.

required
Source code in core/base_plugin.py
44
45
46
47
48
49
50
51
52
53
def on_event(self, topic, data, timestamp):
    """
    Lifecycle hook invoked when a relevant event is published on the EventBus.

    Args:
        topic (str): The topic of the event (e.g., "gps.set_location").
        data (dict): Parameters passed with the event.
        timestamp (float): The simulation time when the event is triggered.

    Raises:
        NotImplementedError: Always; this base implementation is a placeholder.
    """
    message = "Plugin must implement on_event()"
    raise NotImplementedError(message)

on_init(config)

Called once after the plugin is loaded.

Parameters:

Name Type Description Default
config dict

Optional configuration data (reserved for future use).

required
Source code in core/base_plugin.py
35
36
37
38
39
40
41
42
def on_init(self, config):
    """
    Lifecycle hook invoked once after the plugin has been loaded.

    Args:
        config (dict): Optional configuration data (reserved for future use).

    Raises:
        NotImplementedError: Always; this base implementation is a placeholder.
    """
    message = "Plugin must implement on_init()"
    raise NotImplementedError(message)

on_shutdown()

Called once at the end of the simulation to clean up resources.

Source code in core/base_plugin.py
55
56
57
58
59
def on_shutdown(self):
    """
    Lifecycle hook invoked once at the end of the simulation to clean up resources.

    Raises:
        NotImplementedError: Always; this base implementation is a placeholder.
    """
    message = "Plugin must implement on_shutdown()"
    raise NotImplementedError(message)

ScenarioParser

ScenarioParser is responsible for reading YAML scenario files and converting them into a list of executable event dictionaries.

It supports:

- Basic event steps with time, target, and action
- Looping blocks to repeat steps with offsets
- Variable injection (for future use)
- Modular scenario imports

Source code in core/scenario_parser.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class ScenarioParser:
    """
    ScenarioParser is responsible for reading YAML scenario files and converting them
    into a list of executable event dictionaries.

    It supports:
    - Basic event steps with time, target, and action
    - Looping blocks to repeat steps with offsets
    - Variable injection (for future use)
    - Modular scenario imports

    Invalid steps are skipped with a warning instead of aborting the whole parse.
    """

    def __init__(self, logger):
        """
        Initializes the parser with a logger.

        Args:
            logger (Logger): The logger instance used for output.
        """
        self.logger = logger
        self.variables = {}

    def load(self, path: str) -> list[dict]:
        """
        Loads and parses a scenario YAML file from disk.

        A top-level mapping with an 'import' key is resolved through
        `_handle_imports()` and the remaining keys of that mapping are ignored.
        Otherwise a mapping may carry 'variables' and 'events'; a top-level list
        is treated directly as the list of steps.

        Args:
            path (str): Path to the scenario file.

        Returns:
            list[dict]: A sorted list of events (each with 'time', 'target', 'action', etc.).
            Empty when the file cannot be read or parsed, or contains no events.
        """
        try:
            # Explicit UTF-8 so parsing does not depend on the platform's locale encoding.
            with open(path, 'r', encoding='utf-8') as f:
                raw_data = yaml.safe_load(f)
        except Exception as e:
            self.logger.error(f"Failed to load scenario file: {e}")
            return []

        if isinstance(raw_data, dict):
            if 'import' in raw_data:
                return self._handle_imports(raw_data, os.path.dirname(path))

            # Handle top-level keys like 'variables' and 'events'
            if 'variables' in raw_data:
                self._parse_variables(raw_data['variables'])
            if 'events' in raw_data:
                events = raw_data['events']
                # An `events:` key with no entries parses as None.
                return self._parse_steps(events) if events is not None else []

            self.logger.warn("Scenario YAML missing 'events' key.")
            return []

        if raw_data is None:
            # An empty file parses as None; there is nothing to schedule.
            self.logger.warn("Scenario file is empty.")
            return []

        return self._parse_steps(raw_data)

    def _parse_steps(self, steps):
        """
        Parses the scenario steps into a normalized list of events.

        Args:
            steps (list): List of raw YAML entries.

        Returns:
            list[dict]: Parsed and time-sorted list of events. Empty when `steps`
            is not a list.
        """
        if not isinstance(steps, list):
            self.logger.warn(f"Expected a list of scenario steps, got {type(steps).__name__}.")
            return []

        parsed = []
        for i, step in enumerate(steps):
            is_mapping = isinstance(step, dict)
            if is_mapping and 'loop' in step:
                parsed.extend(self._parse_loop(step['loop']))
            elif is_mapping and 'variables' in step:
                self._parse_variables(step['variables'])
            else:
                event = self._normalize_step(step, i)
                if event:
                    parsed.append(event)
        return sorted(parsed, key=lambda e: e["time"])

    def _normalize_step(self, step, index):
        """
        Ensures a step contains the required fields and formats it into a clean event.

        Args:
            step (dict): A raw YAML event step.
            index (int or str): Line or loop identifier for error reporting.

        Returns:
            dict or None: A valid event dict, or None if the step is not a mapping,
            lacks a required key, or has a non-numeric 'time'.
        """
        required_keys = ("time", "target", "action")
        if not isinstance(step, dict) or not all(k in step for k in required_keys):
            self.logger.warn(f"Skipping invalid step at index {index}: {step}")
            return None

        try:
            event_time = float(step["time"])
        except (TypeError, ValueError):
            self.logger.warn(f"Skipping invalid step at index {index}: {step}")
            return None

        return {
            "time": event_time,
            "target": step["target"],
            "action": step["action"],
            "params": step.get("params", {}),
            "condition": step.get("condition", None)
        }

    def _parse_loop(self, loop):
        """
        Handles looped steps and unrolls them into repeated time-offset events.

        Each iteration `i` shifts the step's own time (0 when omitted) by
        `i * interval`. Invalid steps are skipped so the result never contains None.

        Args:
            loop (dict): A loop block with count, interval, and steps.

        Returns:
            list[dict]: Flattened list of events.
        """
        count = loop.get("count", 1)
        interval = loop.get("interval", 1)
        steps = loop.get("steps", [])
        events = []

        for i in range(count):
            offset = i * interval
            for step in steps:
                if isinstance(step, dict):
                    # A loop step without its own time starts at the iteration offset.
                    step = {**step, "time": step.get("time", 0)}
                event = self._normalize_step(step, f"loop-{i}")
                if event is None:
                    continue
                event["time"] += offset
                events.append(event)
        return events

    def _parse_variables(self, var_block):
        """
        Stores reusable variables for future parameter expansion.

        Args:
            var_block (dict): A dictionary of reusable variable blocks.
        """
        self.variables.update(var_block)
        self.logger.debug(f"Loaded variables: {self.variables}")

    def _handle_imports(self, data, base_dir):
        """
        Handles imported YAML fragments by path and loads them recursively.

        A ".yaml" suffix is appended when the import path lacks one.

        Args:
            data (dict): YAML data that includes an 'import' directive.
            base_dir (str): Base path to resolve relative imports.

        Returns:
            list[dict]: Events loaded from the imported scenario.
        """
        import_path = data['import']
        if not import_path.endswith(".yaml"):
            import_path += ".yaml"
        full_path = os.path.join(base_dir, import_path)

        self.logger.info(f"Importing scenario: {full_path}")
        return self.load(full_path)

__init__(logger)

Initializes the parser with a logger.

Parameters:

Name Type Description Default
logger Logger

The logger instance used for output.

required
Source code in core/scenario_parser.py
40
41
42
43
44
45
46
47
48
def __init__(self, logger):
    """
    Set up a parser that reports through the given logger.

    Args:
        logger (Logger): The logger instance used for output.
    """
    # No variables are known until a scenario has been parsed.
    self.variables = {}
    self.logger = logger

load(path)

Loads and parses a scenario YAML file from disk.

Parameters:

Name Type Description Default
path str

Path to the scenario file.

required

Returns:

Type Description
list[dict]

list[dict]: A sorted list of events (each with 'time', 'target', 'action', etc.)

Source code in core/scenario_parser.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def load(self, path: str) -> list[dict]:
    """
    Loads and parses a scenario YAML file from disk.

    A top-level mapping with an 'import' key is resolved through
    `_handle_imports()` and the remaining keys of that mapping are ignored.
    Otherwise a mapping may carry 'variables' and 'events'; any other non-empty
    document is passed to `_parse_steps()` as the list of steps.

    Args:
        path (str): Path to the scenario file.

    Returns:
        list[dict]: A sorted list of events (each with 'time', 'target', 'action', etc.).
        Empty when the file cannot be read or parsed, or contains no events.
    """
    try:
        # Explicit UTF-8 so parsing does not depend on the platform's locale encoding.
        with open(path, 'r', encoding='utf-8') as f:
            raw_data = yaml.safe_load(f)
    except Exception as e:
        self.logger.error(f"Failed to load scenario file: {e}")
        return []

    if isinstance(raw_data, dict):
        if 'import' in raw_data:
            return self._handle_imports(raw_data, os.path.dirname(path))

        # Handle top-level keys like 'variables' and 'events'
        if 'variables' in raw_data:
            self._parse_variables(raw_data['variables'])
        if 'events' in raw_data:
            events = raw_data['events']
            # An `events:` key with no entries parses as None.
            return self._parse_steps(events) if events is not None else []

        self.logger.warn("Scenario YAML missing 'events' key.")
        return []

    if raw_data is None:
        # An empty file parses as None; there is nothing to schedule.
        self.logger.warn("Scenario file is empty.")
        return []

    return self._parse_steps(raw_data)

EventBus

EventBus is the central messaging system for OpenRoadSim. It handles publishing and subscribing of events between the ScenarioEngine and plugins.

Events are routed based on a topic string in the form 'target.action' (e.g., 'can.send', 'gps.update').

Source code in core/event_bus.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class EventBus:
    """
    EventBus is the central messaging system for OpenRoadSim.
    It handles publishing and subscribing of events between the ScenarioEngine and plugins.

    Events are routed based on a topic string in the form 'target.action'
    (e.g., 'can.send', 'gps.update'). Plugins subscribed to the special topic
    "*" receive every published event.
    """

    def __init__(self, logger):
        """
        Initializes the EventBus.

        Args:
            logger (Logger): An instance of the project's logger to output debug/info messages.
        """
        self.logger = logger
        self.subscriptions = {}  # Maps topic (str) to a list of plugin instances

    def subscribe(self, topic, plugin):
        """
        Subscribes a plugin to a specific event topic.

        Subscribing the same plugin to the same topic again is a no-op, so a
        repeated registration cannot cause an event to be delivered twice.

        Args:
            topic (str): The event topic to listen for (e.g., "echo.say").
            plugin (BasePlugin): An instance of a plugin that implements on_event().
        """
        subscribers = self.subscriptions.setdefault(topic, [])
        if plugin in subscribers:
            self.logger.debug(f"{plugin.name} already subscribed to {topic}")
            return
        subscribers.append(plugin)
        self.logger.debug(f"{plugin.name} subscribed to {topic}")

    def publish(self, topic, data, timestamp):
        """
        Publishes an event to every plugin subscribed to `topic` or to "*".

        The event is recorded with the module-level `reporter` first. Plugins
        subscribed to the exact topic are called before wildcard subscribers.
        An exception raised by one plugin is logged and reported, and does not
        prevent delivery to the remaining plugins.

        Args:
            topic (str): The event topic (e.g., "gps.update").
            data (dict): Parameters passed with the event.
            timestamp (float): The simulation time when the event is triggered.
        """
        reporter.log_event(topic, data, timestamp)
        listeners = self.subscriptions.get(topic, [])
        wildcard_listeners = self.subscriptions.get("*", [])

        if not listeners and not wildcard_listeners:
            self.logger.warn(f"No subscribers for topic: {topic}")
            return

        # Regular topic listeners
        self._deliver(listeners, topic, data, timestamp, f"{topic}")
        # Wildcard listeners (e.g., EchoPlugin)
        self._deliver(wildcard_listeners, topic, data, timestamp, f"wildcard topic '{topic}'")

    def _deliver(self, plugins, topic, data, timestamp, where):
        """Calls on_event() on each plugin, logging and reporting individual failures."""
        # Iterate over a snapshot: a handler may subscribe plugins while running.
        for plugin in tuple(plugins):
            try:
                plugin.on_event(topic, data, timestamp)
                reporter.log_plugin_response(plugin.name, topic, "ok", timestamp)
            except Exception as e:
                self.logger.error(f"Plugin '{plugin.name}' failed on {where}: {e}")
                reporter.log_error(plugin.name, topic, e)

__init__(logger)

Initializes the EventBus.

Parameters:

Name Type Description Default
logger Logger

An instance of the project's logger to output debug/info messages.

required
Source code in core/event_bus.py
36
37
38
39
40
41
42
43
44
def __init__(self, logger):
    """
    Create an EventBus with no subscriptions.

    Args:
        logger (Logger): An instance of the project's logger to output debug/info messages.
    """
    # topic (str) -> list of plugin instances subscribed to that topic
    self.subscriptions = {}
    self.logger = logger

subscribe(topic, plugin)

Subscribes a plugin to a specific event topic.

Parameters:

Name Type Description Default
topic str

The event topic to listen for (e.g., "echo.say").

required
plugin BasePlugin

An instance of a plugin that implements on_event().

required
Source code in core/event_bus.py
46
47
48
49
50
51
52
53
54
55
56
57
def subscribe(self, topic, plugin):
    """
    Subscribes a plugin to a specific event topic.

    Subscribing the same plugin to the same topic again is a no-op, so a
    repeated registration cannot leave the plugin listed twice for one topic.

    Args:
        topic (str): The event topic to listen for (e.g., "echo.say").
        plugin (BasePlugin): An instance of a plugin that implements on_event().
    """
    subscribers = self.subscriptions.setdefault(topic, [])
    if plugin in subscribers:
        self.logger.debug(f"{plugin.name} already subscribed to {topic}")
        return
    subscribers.append(plugin)
    self.logger.debug(f"{plugin.name} subscribed to {topic}")

PluginManager

PluginManager is responsible for dynamically loading and managing simulation plugins.

It:

- Loads plugin metadata from each plugin's plugin.yaml
- Dynamically imports the plugin's main module (main.py)
- Instantiates the plugin class
- Registers subscriptions with the EventBus
- Manages lifecycle hooks (init and shutdown)

Source code in core/plugin_manager.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class PluginManager:
    """
    PluginManager is responsible for dynamically loading and managing simulation plugins.

    It:
    - Loads plugin metadata from each plugin's `plugin.yaml`
    - Dynamically imports the plugin's main module (`main.py`)
    - Instantiates the plugin class
    - Registers subscriptions with the EventBus
    - Manages lifecycle hooks (init and shutdown)
    """

    def __init__(self, logger, event_bus, plugin_dir="plugins"):
        """
        Initializes the PluginManager.

        Args:
            logger (Logger): The logging utility instance.
            event_bus (EventBus): The event dispatcher used to route plugin events.
            plugin_dir (str): The directory path where plugins are located.
        """
        self.logger = logger
        self.event_bus = event_bus
        self.plugin_dir = plugin_dir
        self.plugins = []

    def load_plugins(self):
        """
        Discovers, loads, and registers all plugins from the plugin directory.

        - Validates that each plugin folder contains `plugin.yaml` and `main.py`.
        - Dynamically imports the plugin class defined in metadata (`entry_class`, defaults to `Plugin`).
        - Calls each plugin's `on_init()` method.
        - Subscribes the plugin to topics defined in `plugin.yaml` (under `subscriptions`).

        Plugin folders are processed in sorted name order. A plugin is only
        subscribed and tracked after `on_init()` succeeds; any failure while
        loading one plugin is logged and the remaining plugins are still loaded.
        """
        self.logger.info(f"Loading plugins from '{self.plugin_dir}'")

        # os.listdir() order is arbitrary; sort so the load order is reproducible.
        for plugin_name in sorted(os.listdir(self.plugin_dir)):
            plugin_path = os.path.join(self.plugin_dir, plugin_name)
            if not os.path.isdir(plugin_path):
                continue

            meta_path = os.path.join(plugin_path, "plugin.yaml")
            code_path = os.path.join(plugin_path, "main.py")

            if not os.path.exists(meta_path) or not os.path.exists(code_path):
                self.logger.warn(f"Skipping plugin '{plugin_name}' (missing plugin.yaml or main.py)")
                continue

            try:
                with open(meta_path, 'r', encoding='utf-8') as f:
                    metadata = yaml.safe_load(f)

                # Dynamic import of the plugin's main class
                spec = importlib.util.spec_from_file_location(f"{plugin_name}.main", code_path)
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)

                plugin_class = getattr(module, metadata.get("entry_class", "Plugin"))
                plugin_instance = plugin_class()
                plugin_instance.name = metadata.get("name", plugin_name)

                # Provide Event Bus to the plugin if it needs to use it
                if hasattr(plugin_instance, "event_bus"):
                    plugin_instance.event_bus = self.event_bus

                # Resolve the topics first so malformed metadata fails before
                # the plugin is initialized or registered anywhere.
                topics = []
                for sub in metadata.get("subscriptions", []):
                    target = sub.get("target")
                    for action in sub.get("actions", []):
                        if target == "*" and action == "*":
                            topics.append("*")  # Special case: full wildcard
                        else:
                            topics.append(f"{target}.{action}")

                # Initialize before registering: a plugin whose on_init() raises
                # must not stay subscribed or be shut down later.
                plugin_instance.on_init({})

                for topic in topics:
                    self.event_bus.subscribe(topic, plugin_instance)

                self.plugins.append(plugin_instance)
                reporter.metadata["plugins"].append(plugin_instance.name)
                self.logger.info(f"Loaded plugin '{plugin_instance.name}'")

            except Exception as e:
                self.logger.error(f"Failed to load plugin '{plugin_name}': {e}")

    def shutdown_plugins(self):
        """
        Gracefully shuts down all loaded plugins by calling their `on_shutdown()` methods.
        Logs any failures during shutdown.
        """
        for plugin in self.plugins:
            try:
                plugin.on_shutdown()
            except Exception as e:
                self.logger.warn(f"Plugin '{plugin.name}' failed to shut down cleanly: {e}")

__init__(logger, event_bus, plugin_dir='plugins')

Initializes the PluginManager.

Parameters:

Name Type Description Default
logger Logger

The logging utility instance.

required
event_bus EventBus

The event dispatcher used to route plugin events.

required
plugin_dir str

The directory path where plugins are located.

'plugins'
Source code in core/plugin_manager.py
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(self, logger, event_bus, plugin_dir="plugins"):
    """
    Create a PluginManager with no plugins loaded yet.

    Args:
        logger (Logger): The logging utility instance.
        event_bus (EventBus): The event dispatcher used to route plugin events.
        plugin_dir (str): The directory path where plugins are located.
    """
    # Starts empty.
    self.plugins = []
    self.plugin_dir = plugin_dir
    self.event_bus = event_bus
    self.logger = logger

load_plugins()

Discovers, loads, and registers all plugins from the plugin directory.

  • Validates that each plugin folder contains plugin.yaml and main.py.
  • Dynamically imports the plugin class defined in metadata (entry_class, defaults to Plugin).
  • Calls each plugin's on_init() method.
  • Subscribes the plugin to topics defined in plugin.yaml (under subscriptions).
Source code in core/plugin_manager.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def load_plugins(self):
    """
    Discovers, loads, and registers all plugins from the plugin directory.

    - Validates that each plugin folder contains `plugin.yaml` and `main.py`.
    - Dynamically imports the plugin class defined in metadata (`entry_class`, defaults to `Plugin`).
    - Calls each plugin's `on_init()` method.
    - Subscribes the plugin to topics defined in `plugin.yaml` (under `subscriptions`).

    Plugin folders are processed in sorted name order. A plugin is only
    subscribed and tracked after `on_init()` succeeds; any failure while
    loading one plugin is logged and the remaining plugins are still loaded.
    """
    self.logger.info(f"Loading plugins from '{self.plugin_dir}'")

    # os.listdir() order is arbitrary; sort so the load order is reproducible.
    for plugin_name in sorted(os.listdir(self.plugin_dir)):
        plugin_path = os.path.join(self.plugin_dir, plugin_name)
        if not os.path.isdir(plugin_path):
            continue

        meta_path = os.path.join(plugin_path, "plugin.yaml")
        code_path = os.path.join(plugin_path, "main.py")

        if not os.path.exists(meta_path) or not os.path.exists(code_path):
            self.logger.warn(f"Skipping plugin '{plugin_name}' (missing plugin.yaml or main.py)")
            continue

        try:
            with open(meta_path, 'r', encoding='utf-8') as f:
                metadata = yaml.safe_load(f)

            # Dynamic import of the plugin's main class
            spec = importlib.util.spec_from_file_location(f"{plugin_name}.main", code_path)
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)

            plugin_class = getattr(module, metadata.get("entry_class", "Plugin"))
            plugin_instance = plugin_class()
            plugin_instance.name = metadata.get("name", plugin_name)

            # Provide Event Bus to the plugin if it needs to use it
            if hasattr(plugin_instance, "event_bus"):
                plugin_instance.event_bus = self.event_bus

            # Resolve the topics first so malformed metadata fails before
            # the plugin is initialized or registered anywhere.
            topics = []
            for sub in metadata.get("subscriptions", []):
                target = sub.get("target")
                for action in sub.get("actions", []):
                    if target == "*" and action == "*":
                        topics.append("*")  # Special case: full wildcard
                    else:
                        topics.append(f"{target}.{action}")

            # Initialize before registering: a plugin whose on_init() raises
            # must not stay subscribed or remain in self.plugins.
            plugin_instance.on_init({})

            for topic in topics:
                self.event_bus.subscribe(topic, plugin_instance)

            self.plugins.append(plugin_instance)
            reporter.metadata["plugins"].append(plugin_instance.name)
            self.logger.info(f"Loaded plugin '{plugin_instance.name}'")

        except Exception as e:
            self.logger.error(f"Failed to load plugin '{plugin_name}': {e}")

shutdown_plugins()

Gracefully shuts down all loaded plugins by calling their on_shutdown() methods. Logs any failures during shutdown.

Source code in core/plugin_manager.py
116
117
118
119
120
121
122
123
124
125
def shutdown_plugins(self):
    """
    Call `on_shutdown()` on every loaded plugin, in load order.

    A plugin that raises during shutdown is reported with a warning and does
    not stop the remaining plugins from being shut down.
    """
    for loaded in self.plugins:
        try:
            loaded.on_shutdown()
        except Exception as err:
            self.logger.warn(f"Plugin '{loaded.name}' failed to shut down cleanly: {err}")

ScenarioEngine

ScenarioEngine is responsible for orchestrating the execution of simulation events according to their scheduled timestamps.

It reads a list of parsed scenario events and dispatches them via the EventBus to subscribed plugins in time-aligned order.

Source code in core/scenario_engine.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
class ScenarioEngine:
    """
    ScenarioEngine is responsible for orchestrating the execution of simulation events
    according to their scheduled timestamps.

    It reads a list of parsed scenario events and dispatches them via the EventBus
    to subscribed plugins in time-aligned order.
    """

    def __init__(self, logger, event_bus):
        """
        Initializes the ScenarioEngine.

        Args:
            logger (Logger): Logger instance for outputting status and debug info.
            event_bus (EventBus): Event bus for publishing events to plugins.
        """
        self.logger = logger
        self.event_bus = event_bus
        self.running = False

    def run(self, events):
        """
        Executes a scenario by processing each event at its designated simulation time.

        Dispatch is delayed in real time until `event['time']` seconds have elapsed
        since the start of the run. Elapsed time is measured with a monotonic clock,
        so system clock adjustments do not shift the schedule. Events are processed
        in the order given; an event whose time has already passed is dispatched
        immediately.

        `running` is True for the duration of the call and always False afterwards.
        If `stop()` is called, no further events are dispatched, including one whose
        wait was in progress.

        Args:
            events (list[dict]): List of events loaded from a scenario file.
                                 Each event must include 'time', 'target', 'action', and optional 'params'.
        """
        self.logger.info(f"Starting scenario with {len(events)} event(s).")
        self.running = True
        start_time = time.monotonic()
        interrupted = False

        try:
            for event in events:
                if not self.running:
                    interrupted = True
                    break

                event_time = float(event["time"])
                wait_time = event_time - (time.monotonic() - start_time)

                if wait_time > 0:
                    time.sleep(wait_time)
                    # stop() may have been called while sleeping; do not
                    # dispatch an event after the scenario was stopped.
                    if not self.running:
                        interrupted = True
                        break

                topic = f"{event['target']}.{event['action']}"
                params = event.get("params", {})

                self.logger.debug(f"Dispatching event @ {event_time:.3f}s → {topic}")
                self.event_bus.publish(topic, params, event_time)
        finally:
            # Never leave the engine flagged as running, even if dispatch raised.
            self.running = False

        if interrupted:
            self.logger.warn("Scenario stopped prematurely.")
        else:
            self.logger.info("Scenario completed.")

    def stop(self):
        """
        Stops the currently running scenario (typically via user interrupt).
        """
        self.logger.warn("Stopping scenario execution.")
        self.running = False

__init__(logger, event_bus)

Initializes the ScenarioEngine.

Parameters:

Name Type Description Default
logger Logger

Logger instance for outputting status and debug info.

required
event_bus EventBus

Event bus for publishing events to plugins.

required
Source code in core/scenario_engine.py
36
37
38
39
40
41
42
43
44
45
46
def __init__(self, logger, event_bus):
    """
    Create an engine that is initially not running.

    Args:
        logger (Logger): Logger instance for outputting status and debug info.
        event_bus (EventBus): Event bus for publishing events to plugins.
    """
    # Starts out idle.
    self.running = False
    self.event_bus = event_bus
    self.logger = logger

run(events)

Executes a scenario by processing each event at its designated simulation time.

This function uses real wall-clock time to delay dispatch until the scheduled event['time'].

Parameters:

Name Type Description Default
events list[dict]

List of events loaded from a scenario file. Each event must include 'time', 'target', 'action', and optional 'params'.

required
Source code in core/scenario_engine.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def run(self, events):
    """
    Executes a scenario by processing each event at its designated simulation time.

    Dispatch is delayed in real time until `event['time']` seconds have elapsed
    since the start of the run. Elapsed time is measured with a monotonic clock,
    so system clock adjustments do not shift the schedule. Events are processed
    in the order given; an event whose time has already passed is dispatched
    immediately.

    `running` is True for the duration of the call and always False afterwards.
    If `running` is cleared while the scenario executes, no further events are
    dispatched, including one whose wait was in progress.

    Args:
        events (list[dict]): List of events loaded from a scenario file.
                             Each event must include 'time', 'target', 'action', and optional 'params'.
    """
    self.logger.info(f"Starting scenario with {len(events)} event(s).")
    self.running = True
    start_time = time.monotonic()
    interrupted = False

    try:
        for event in events:
            if not self.running:
                interrupted = True
                break

            event_time = float(event["time"])
            wait_time = event_time - (time.monotonic() - start_time)

            if wait_time > 0:
                time.sleep(wait_time)
                # The run may have been stopped while sleeping; do not
                # dispatch an event after the scenario was stopped.
                if not self.running:
                    interrupted = True
                    break

            topic = f"{event['target']}.{event['action']}"
            params = event.get("params", {})

            self.logger.debug(f"Dispatching event @ {event_time:.3f}s → {topic}")
            self.event_bus.publish(topic, params, event_time)
    finally:
        # Never leave the engine flagged as running, even if dispatch raised.
        self.running = False

    if interrupted:
        self.logger.warn("Scenario stopped prematurely.")
    else:
        self.logger.info("Scenario completed.")

stop()

Stops the currently running scenario (typically via user interrupt).

Source code in core/scenario_engine.py
83
84
85
86
87
88
def stop(self) -> None:
    """
    Request that the running scenario stop (typically via user interrupt).

    Logs a warning and then clears the `running` flag.
    """
    self.logger.warn("Stopping scenario execution.")
    self.running = False

APIInterface

Singleton class providing a unified API for managing OpenRoadSim simulation lifecycle.

This class encapsulates scenario parsing, plugin management, engine execution, and result reporting. It is designed for use by both GUI and console tools.

Example usage

    from core.api_interface import APIInterface
    api = APIInterface.get_instance(logger)
    api.load_scenario("scenarios/example.yaml")
    api.start()

Attributes:

Name Type Description
logger Logger

Logger used for output.

plugin_dir str

Plugin folder path (defaults to 'plugins').

on_status callable

Optional callback for simulation status events.

on_log callable

Optional callback for simulation log messages.

Source code in core/api_interface.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class APIInterface:
    """
    Singleton class providing a unified API for managing OpenRoadSim simulation lifecycle.

    This class encapsulates scenario parsing, plugin management, engine execution, and result reporting.
    It is designed for use by both GUI and console tools.

    Example usage:
        from core.api_interface import APIInterface
        api = APIInterface.get_instance(logger)
        api.load_scenario("scenarios/example.yaml")
        api.start()

    Attributes:
        logger (Logger): Logger used for output.
        plugin_dir (str): Plugin folder path (defaults to 'plugins').
        on_status (callable): Optional callback for simulation status events.
        on_log (callable): Optional callback for simulation log messages.
        events (list): Events returned by the parser for the last successfully loaded scenario.
        running (bool): True from the moment start() launches the worker until _run() finishes.
        thread (threading.Thread): Worker thread created by the most recent start(), or None.
    """

    _instance = None
    _lock = threading.Lock()

    @classmethod
    def get_instance(cls, logger=None, plugin_dir="plugins"):
        """
        Retrieves the global singleton instance.

        The instance is created on the first call; later calls return the same
        object and ignore their arguments.

        Args:
            logger (Logger): Logger instance (required on first call).
            plugin_dir (str): Optional path to plugins folder.

        Returns:
            APIInterface: Shared instance of the APIInterface.

        Raises:
            ValueError: If no instance exists yet and no logger was supplied.
        """
        with cls._lock:
            if cls._instance is None:
                if logger is None:
                    raise ValueError("Logger must be provided for first initialization.")
                cls._instance = cls(logger, plugin_dir)
        return cls._instance

    def __init__(self, logger, plugin_dir="plugins"):
        """
        Wires together the event bus, plugin manager, parser, engine and reporter.

        Args:
            logger (Logger): Logger shared with the created components.
            plugin_dir (str): Folder handed to the plugin manager.
        """
        if getattr(self, "_initialized", False):
            return  # prevent reinitialization

        self.logger = logger
        self.plugin_dir = plugin_dir

        self.event_bus = EventBus(logger)
        self.plugin_manager = PluginManager(logger, self.event_bus, plugin_dir=plugin_dir)
        self.parser = ScenarioParser(logger)
        self.engine = ScenarioEngine(logger, self.event_bus)
        self.reporter = Reporter()

        self.thread = None
        self.running = False
        self.events = []

        self.on_status = None
        self.on_log = None

        self._initialized = True

    def load_scenario(self, scenario_path):
        """
        Parses a YAML scenario file and prepares its events for execution.

        The parsed events are validated before any state is touched, so a
        failed load leaves the previously loaded events and the reporter
        metadata unchanged.

        Args:
            scenario_path (str): Path to the scenario YAML file.

        Returns:
            bool: True if the scenario was successfully loaded.

        Raises:
            ValueError: If no events are found in the scenario.
        """
        loaded_events = self.parser.load(scenario_path)

        if not loaded_events:
            raise ValueError("No valid events loaded.")

        # Commit only after validation succeeded.
        self.events = loaded_events
        self.reporter.metadata["scenario_file"] = scenario_path

        self._log(f"Loaded {len(self.events)} events from scenario.")
        return True

    def start(self):
        """
        Starts the simulation in a background thread.

        Loads plugins and launches a daemon thread that runs _run(). Does
        nothing except log a message when a simulation is already running.
        """
        if self.running:
            self._log("Simulation already running.")
            return

        self.plugin_manager.load_plugins()
        self._log("Plugins loaded.")

        self.running = True
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        """
        Executes the simulation engine.

        Emits 'started', then 'completed' or 'error', shuts down plugins and
        writes the JSON report. The running flag is always cleared, even when
        cleanup itself raises; the cleanup exception still propagates.
        """
        self._status("started")
        try:
            self.engine.run(self.events)
            self._status("completed")
        except Exception as e:
            self._status("error")
            self._log(f"Simulation failed: {e}")
        finally:
            try:
                self.plugin_manager.shutdown_plugins()
                self.reporter.write_json("report.json")
                self._log("Simulation report written to report.json")
            finally:
                # Without this guard a failing plugin shutdown or report write
                # would leave running=True and block every later start().
                self.running = False

    def stop(self):
        """
        Gracefully stops the currently running simulation.

        Asks the engine to stop and emits the 'stopped' status; no-op when
        nothing is running.
        """
        if self.running:
            self.engine.stop()
            self._status("stopped")
            self._log("Simulation stopped by user.")

    def _log(self, msg):
        """
        Emits a log message through the registered callback or logger.

        Args:
            msg (str): Log content.
        """
        if self.on_log:
            self.on_log(msg)
        else:
            self.logger.info(msg)

    def _status(self, code):
        """
        Emits a status update through the registered callback or logger.

        Args:
            code (str): One of: 'started', 'completed', 'stopped', or 'error'.
        """
        if self.on_status:
            self.on_status(code)
        else:
            self.logger.info(f"Status: {code}")

get_instance(logger=None, plugin_dir='plugins') classmethod

Retrieves the global singleton instance.

Parameters:

Name Type Description Default
logger Logger

Logger instance (required on first call).

None
plugin_dir str

Optional path to plugins folder.

'plugins'

Returns:

Name Type Description
APIInterface

Shared instance of the APIInterface.

Source code in core/api_interface.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@classmethod
def get_instance(cls, logger=None, plugin_dir="plugins"):
    """
    Return the process-wide APIInterface, creating it on first use.

    Args:
        logger (Logger): Logger for the new instance; mandatory while no
            instance exists yet, ignored afterwards.
        plugin_dir (str): Plugins folder passed to the constructor on
            first creation.

    Returns:
        APIInterface: The one shared instance.

    Raises:
        ValueError: When the instance must be created but no logger was given.
    """
    with cls._lock:
        # Fast exit: the singleton has already been built.
        if cls._instance is not None:
            return cls._instance

        if logger is None:
            raise ValueError("Logger must be provided for first initialization.")

        cls._instance = cls(logger, plugin_dir)
        return cls._instance

load_scenario(scenario_path)

Parses a YAML scenario file and prepares its events for execution.

Parameters:

Name Type Description Default
scenario_path str

Path to the scenario YAML file.

required

Returns:

Name Type Description
bool

True if the scenario was successfully loaded.

Raises:

Type Description
ValueError

If no events are found in the scenario.

Source code in core/api_interface.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def load_scenario(self, scenario_path):
    """
    Parses a YAML scenario file and prepares its events for execution.

    The parsed events are validated before any state is touched, so a
    failed load leaves the previously loaded events and the reporter
    metadata unchanged.

    Args:
        scenario_path (str): Path to the scenario YAML file.

    Returns:
        bool: True if the scenario was successfully loaded.

    Raises:
        ValueError: If no events are found in the scenario.
    """
    loaded_events = self.parser.load(scenario_path)

    if not loaded_events:
        raise ValueError("No valid events loaded.")

    # Commit only after validation succeeded.
    self.events = loaded_events
    self.reporter.metadata["scenario_file"] = scenario_path

    self._log(f"Loaded {len(self.events)} events from scenario.")
    return True

start()

Starts the simulation in a background thread.

Loads plugins, begins execution of scenario events, and monitors progress.

Source code in core/api_interface.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def start(self):
    """
    Launch the simulation on a background daemon thread.

    When a run is already in progress only a log message is emitted.
    Otherwise plugins are loaded, the running flag is raised and a new
    thread executing ``_run`` is stored on ``self.thread`` and started.
    """
    if not self.running:
        self.plugin_manager.load_plugins()
        self._log("Plugins loaded.")

        self.running = True
        worker = threading.Thread(target=self._run, daemon=True)
        self.thread = worker
        worker.start()
    else:
        self._log("Simulation already running.")

stop()

Gracefully stops the currently running simulation.

Source code in core/api_interface.py
174
175
176
177
178
179
180
181
def stop(self):
    """
    Request a graceful halt of the simulation that is currently running.

    Does nothing when no simulation is running; otherwise asks the engine
    to stop, reports the 'stopped' status and logs the user-initiated stop.
    """
    if not self.running:
        return

    self.engine.stop()
    self._status("stopped")
    self._log("Simulation stopped by user.")