Skip to content

Replay GPS Plugin

Bases: BasePlugin

Replays GPS coordinates from an NMEA log file and emits 'gps.set_location' events to simulate GPS movement in the OpenRoadSim environment.

Source code in plugins/replay_gps/main.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class ReplayGPSPlugin(BasePlugin):
    """
    Replays GPS coordinates from an NMEA log file and emits 'gps.set_location' events
    to simulate GPS movement in the OpenRoadSim environment.

    A replay is started by an event whose topic ends with 'start_replay'. The log
    file is read on a background daemon thread, so the event handler returns as
    soon as the thread has been started; on_shutdown() stops that thread.
    """

    def __init__(self):
        """
        Initializes the ReplayGPSPlugin instance.

        Sets up logging, thread management variables, and placeholder for event bus.
        """
        self.name = "ReplayGPSPlugin"
        self.logger = Logger(self.name)
        self.thread = None      # Replay worker thread, or None before the first replay
        self.running = False    # Cooperative stop flag polled by the replay loop
        self.event_bus = None  # Will be injected externally

    def on_init(self, config):
        """
        Called once when the plugin is initialized.

        Args:
            config (dict): Optional configuration parameters (not used in this plugin).

        Logs initialization info.
        """
        self.logger.info("ReplayGPSPlugin initialized.")

    def on_event(self, topic, data, scenario_timestamp):
        """
        Handles events targeted to this plugin.

        Specifically listens for 'start_replay' events to start replaying GPS data
        from a specified NMEA file. The replay runs on a daemon thread and this
        handler returns as soon as the thread is started; it does not wait for
        the replay to finish. Invalid requests are logged and ignored.

        Args:
            topic (str): The event topic (e.g., 'gps.start_replay').
            data (dict): Event parameters; expects 'file' and optional 'speed'
                (a finite number greater than zero, default 1.0).
            scenario_timestamp (float): Simulation time when the event was triggered.
        """
        if not topic.endswith("start_replay"):
            return

        params = data or {}
        filepath = params.get("file")
        if not filepath:
            self.logger.error("Missing 'file' parameter for start_replay.")
            return

        raw_speed = params.get("speed", 1.0)
        try:
            speed = float(raw_speed)
        except (TypeError, ValueError):
            speed = None
        # Zero would divide by zero in the replay loop, a negative factor would
        # run simulation time backwards, and NaN/inf give meaningless offsets.
        if speed is None or not 0 < speed < float("inf"):
            self.logger.error(
                f"Invalid 'speed' parameter for start_replay: {raw_speed!r}"
            )
            return

        # Only one replay at a time: both would share self.running/self.thread.
        if self.thread is not None and self.thread.is_alive():
            self.logger.error("Replay already in progress; ignoring start_replay.")
            return

        self.running = True
        self.thread = threading.Thread(
            target=self._replay_file,
            args=(filepath, speed, scenario_timestamp),
            daemon=True
        )
        # Deliberately no join() here: joining right after start() would block
        # the caller for the whole replay and make the thread pointless.
        self.thread.start()

    def _replay_file(self, filepath, speed, scenario_start_time):
        """
        Internal method to replay NMEA GPS data from the specified file.

        Reads NMEA GGA sentences, parses latitude and longitude, and emits
        gps.set_location events whose timestamps are the sentence times scaled
        by speed. Sentences with an unusable time or position are skipped
        rather than aborting the replay. Clears self.running when it returns.

        Args:
            filepath (str): Path to the NMEA file.
            speed (float): Playback speed factor (1.0 = real time).
            scenario_start_time (float): Simulation time offset to align replay.
        """
        self.logger.info(f"Replaying GPS from {filepath} at {speed}x speed")

        base_nmea_time = None   # Time of the first usable sentence (seconds)
        last_nmea_time = None   # Time of the previous usable sentence (seconds)
        day_offset = 0.0        # Seconds added for UTC midnight roll-overs
        emitted = 0
        skipped = 0

        try:
            # errors='replace' keeps stray non-text bytes in a log from
            # aborting the replay; such lines simply fail the prefix check.
            with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    if not self.running:
                        break

                    if not line.startswith('$GPGGA'):
                        continue

                    parts = line.strip().split(',')

                    # Parse NMEA time (HHMMSS.sss)
                    try:
                        nmea_raw = parts[1]
                        if not nmea_raw:
                            continue
                        hh = int(nmea_raw[0:2])
                        mm = int(nmea_raw[2:4])
                        ss = float(nmea_raw[4:])
                    except (IndexError, ValueError):
                        skipped += 1
                        continue
                    nmea_seconds = hh * 3600 + mm * 60 + ss + day_offset

                    # GGA carries time of day only. A jump backwards of more
                    # than half a day is treated as crossing midnight.
                    if (last_nmea_time is not None
                            and nmea_seconds < last_nmea_time - 43200):
                        day_offset += 86400
                        nmea_seconds += 86400
                    last_nmea_time = nmea_seconds

                    if base_nmea_time is None:
                        base_nmea_time = nmea_seconds
                        offset = 0.0
                    else:
                        offset = (nmea_seconds - base_nmea_time) / speed

                    sim_time = scenario_start_time + offset

                    lat, lon = self._parse_nmea(parts)
                    if lat is None or lon is None:
                        skipped += 1
                        continue

                    self._emit_gps(lat, lon, sim_time)
                    emitted += 1
                    time.sleep(0.05)  # small sleep to reduce CPU usage

            self.logger.info(
                f"Replay finished: {emitted} positions emitted, "
                f"{skipped} unusable GGA sentences skipped"
            )

        except Exception as e:
            # Thread boundary: report the failure instead of letting the
            # worker die with an unhandled exception.
            self.logger.error(f"Replay failed: {e}")
        finally:
            self.running = False

    def _emit_gps(self, lat, lon, sim_time):
        """
        Emits the simulated GPS location event on the event bus.

        Args:
            lat (float): Latitude in decimal degrees.
            lon (float): Longitude in decimal degrees.
            sim_time (float): Simulation timestamp for the event.
        """
        event = {
            "lat": lat,
            "lon": lon
        }
        self.event_bus.publish("gps.set_location", event, sim_time)

    def _parse_nmea(self, parts):
        """
        Parses latitude and longitude from a GPGGA NMEA sentence parts list.

        Args:
            parts (list): List of strings split from a GPGGA sentence.

        Returns:
            tuple: (latitude, longitude) as floats in decimal degrees.
                   An element is None when its coordinate field is empty;
                   (None, None) is returned if fields are missing or not numeric.
        """
        try:
            lat_raw = parts[2]
            lat_dir = parts[3]
            lon_raw = parts[4]
            lon_dir = parts[5]

            lat = self._convert_to_decimal(lat_raw, lat_dir)
            lon = self._convert_to_decimal(lon_raw, lon_dir)
            return lat, lon
        except (IndexError, TypeError, ValueError):
            return None, None

    def _convert_to_decimal(self, coord, direction):
        """
        Converts NMEA coordinate format (degrees and minutes) to decimal degrees.

        Args:
            coord (str): Coordinate in NMEA format (ddmm.mmmm or dddmm.mmmm).
            direction (str): Cardinal direction ('N','S','E','W').

        Returns:
            float: Decimal degrees, negative for 'S' or 'W'; None if coord is empty.

        Raises:
            ValueError: If the degree or minute digits are not numeric.
        """
        if not coord:
            return None
        # Up to four digits before the decimal point means ddmm (two degree
        # digits); anything longer is treated as dddmm (three degree digits).
        deg_len = 2 if len(coord.split('.')[0]) <= 4 else 3
        degrees = int(coord[:deg_len])
        minutes = float(coord[deg_len:])
        decimal = degrees + minutes / 60
        if direction in ('S', 'W'):
            decimal = -decimal
        return decimal

    def on_shutdown(self):
        """
        Called when the plugin is shutting down.

        Sets running flag to False to stop replay thread cleanly, waits briefly
        for the thread to exit, and logs the shutdown event.
        """
        self.logger.info("ReplayGPSPlugin shutting down.")
        self.running = False

        worker = self.thread
        # Never join the current thread (that raises RuntimeError), and bound
        # the wait so a worker stuck in publish() cannot hang shutdown.
        if (worker is not None
                and worker is not threading.current_thread()
                and worker.is_alive()):
            worker.join(timeout=1.0)

__init__()

Initializes the ReplayGPSPlugin instance.

Sets up logging, thread management variables, and placeholder for event bus.

Source code in plugins/replay_gps/main.py
35
36
37
38
39
40
41
42
43
44
45
def __init__(self):
    """
    Creates the plugin in its idle state.

    Assigns the plugin name and a Logger built from it, and leaves the replay
    thread unset, the running flag cleared, and the event bus empty.
    """
    # No replay has been requested yet.
    self.thread = None
    self.running = False
    self.event_bus = None  # Will be injected externally

    self.name = "ReplayGPSPlugin"
    self.logger = Logger(self.name)

on_event(topic, data, scenario_timestamp)

Handles events targeted to this plugin.

Specifically listens for 'start_replay' events to start replaying GPS data from a specified NMEA file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| topic | str | The event topic (e.g., 'gps.start_replay'). | required |
| data | dict | Event parameters; expects 'file' and optional 'speed'. | required |
| scenario_timestamp | float | Simulation time when the event was triggered. | required |

Source code in plugins/replay_gps/main.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def on_event(self, topic, data, scenario_timestamp):
    """
    Handles events targeted to this plugin.

    Specifically listens for 'start_replay' events to start replaying GPS data
    from a specified NMEA file. The replay runs on a daemon thread and this
    handler returns as soon as the thread is started; it does not wait for
    the replay to finish. Invalid requests are logged and ignored.

    Args:
        topic (str): The event topic (e.g., 'gps.start_replay').
        data (dict): Event parameters; expects 'file' and optional 'speed'
            (a finite number greater than zero, default 1.0).
        scenario_timestamp (float): Simulation time when the event was triggered.
    """
    if not topic.endswith("start_replay"):
        return

    params = data or {}
    filepath = params.get("file")
    if not filepath:
        self.logger.error("Missing 'file' parameter for start_replay.")
        return

    raw_speed = params.get("speed", 1.0)
    try:
        speed = float(raw_speed)
    except (TypeError, ValueError):
        speed = None
    # Zero would divide by zero in the replay loop, a negative factor would
    # run simulation time backwards, and NaN/inf give meaningless offsets.
    if speed is None or not 0 < speed < float("inf"):
        self.logger.error(
            f"Invalid 'speed' parameter for start_replay: {raw_speed!r}"
        )
        return

    # Only one replay at a time: both would share self.running/self.thread.
    if self.thread is not None and self.thread.is_alive():
        self.logger.error("Replay already in progress; ignoring start_replay.")
        return

    self.running = True
    self.thread = threading.Thread(
        target=self._replay_file,
        args=(filepath, speed, scenario_timestamp),
        daemon=True
    )
    # Deliberately no join() here: joining right after start() would block
    # the caller for the whole replay and make the thread pointless.
    self.thread.start()

on_init(config)

Called once when the plugin is initialized.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | dict | Optional configuration parameters (not used in this plugin). | required |

Logs initialization info.

Source code in plugins/replay_gps/main.py
47
48
49
50
51
52
53
54
55
56
def on_init(self, config):
    """
    Lifecycle hook invoked once at plugin initialization.

    Args:
        config (dict): Configuration parameters; this plugin does not read them.

    Writes a single informational log entry.
    """
    startup_message = "ReplayGPSPlugin initialized."
    self.logger.info(startup_message)

on_shutdown()

Called when the plugin is shutting down.

Sets running flag to False to stop replay thread cleanly and logs the shutdown event.

Source code in plugins/replay_gps/main.py
195
196
197
198
199
200
201
202
203
def on_shutdown(self):
    """
    Lifecycle hook invoked when the plugin is being shut down.

    Logs the shutdown, then clears the running flag that the replay loop
    checks before processing each line.
    """
    log = self.logger
    log.info("ReplayGPSPlugin shutting down.")

    # The flag is cleared only after the log call, as before.
    self.running = False