moss.engine¶
Module Contents¶
Classes¶
Functions¶
Data¶
API¶
- moss.engine.__all__¶
[‘Engine’, ‘TlPolicy’, ‘Verbosity’, ‘SLEEP’, ‘WALKING’, ‘DRIVING’, ‘FINISHED’]
- moss.engine._import()¶
- moss.engine._moss¶
‘_import(…)’
- moss.engine._thread_local¶
‘local(…)’
- moss.engine._populate_waiting_at_lane_end(enable: numpy.ndarray, status: numpy.ndarray, lane_id: numpy.ndarray, lane_length_array: numpy.ndarray, v: numpy.ndarray, s: numpy.ndarray, speed_threshold: float, distance_to_end: float)¶
- moss.engine._populate_waiting_at_lane(enable: numpy.ndarray, status: numpy.ndarray, lane_id: numpy.ndarray, v: numpy.ndarray, speed_threshold: float)¶
- moss.engine.SLEEP¶
0
- moss.engine.WALKING¶
1
- moss.engine.DRIVING¶
2
- moss.engine.FINISHED¶
3
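The four status constants above can be used to summarise a column of status codes. The sketch below is a minimal illustration that assumes the `status` field returned by `fetch_persons` uses these same codes; the helper `count_by_status` and the sample data are hypothetical and not part of `moss.engine`.

```python
from collections import Counter

# Status codes as documented for moss.engine.
SLEEP, WALKING, DRIVING, FINISHED = 0, 1, 2, 3
STATUS_NAMES = {SLEEP: "SLEEP", WALKING: "WALKING", DRIVING: "DRIVING", FINISHED: "FINISHED"}


def count_by_status(status):
    """Count persons per status name; `status` is any iterable of integer codes."""
    counts = Counter(int(s) for s in status)
    return {name: counts.get(code, 0) for code, name in STATUS_NAMES.items()}


# Made-up status column (assumption: same codes as the constants above).
sample = [0, 2, 2, 1, 3, 2]
summary = count_by_status(sample)
print(summary)
```

The helper accepts plain lists as well as NumPy arrays, since it only iterates over the values.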
- class moss.engine.Engine(name: str, map_file: str, person_file: str, start_step: int = 0, step_interval: float = 1, seed: int = 43, verbose_level=Verbosity.NO_OUTPUT, person_limit: int = -1, junction_yellow_time: float = 0, phase_pressure_coeff: float = 1.5, speed_stat_interval: int = 0, output_dir: str = '', out_xmin: float = -inf, out_ymin: float = -inf, out_xmax: float = inf, out_ymax: float = inf, device: int = 0)¶
Moss Engine
NOTE: Multiple Engines cannot be created on different devices. For that purpose, use moss.parallel.ParallelEngine instead.
Initialization
Args:
name: The name of the task (for directory naming in output)
map_file: The path to the map file (Protobuf format)
person_file: The path to the person file (Protobuf format)
start_step: The starting step of the simulation
step_interval: The interval of each step (unit: seconds)
seed: The random seed
verbose_level: The verbosity level
person_limit: The maximum number of persons to simulate (-1 means no limit)
junction_yellow_time: The yellow time of the junction traffic light
phase_pressure_coeff: The coefficient of the phase pressure
speed_stat_interval: The interval of speed statistics. Set to 0 to disable speed statistics.
output_dir: The AVRO output directory
out_xmin: The minimum x coordinate of the output bounding box
out_ymin: The minimum y coordinate of the output bounding box
out_xmax: The maximum x coordinate of the output bounding box
out_ymax: The maximum y coordinate of the output bounding box
device: The CUDA device index
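A possible way to collect the constructor arguments listed above is sketched here. The helpers `engine_kwargs` and `make_engine` and the file paths are hypothetical; only the parameter names and defaults come from the signature. Actually creating the engine requires the `moss` package and a CUDA device, so the import is deferred into `make_engine` and is not executed by this snippet.

```python
def engine_kwargs(name, map_file, person_file, **overrides):
    """Collect Engine arguments, starting from a few documented defaults."""
    kwargs = {
        "name": name,
        "map_file": map_file,        # Protobuf map file
        "person_file": person_file,  # Protobuf person file
        "start_step": 0,
        "step_interval": 1,          # seconds per step
        "seed": 43,
        "person_limit": -1,          # -1 means no limit
        "speed_stat_interval": 0,    # 0 disables speed statistics
        "device": 0,                 # CUDA device index
    }
    kwargs.update(overrides)
    return kwargs


def make_engine(**kwargs):
    """Create the engine; needs moss installed and a CUDA device (not run here)."""
    from moss.engine import Engine
    return Engine(**kwargs)


# Hypothetical paths, for illustration only.
kwargs = engine_kwargs("demo", "data/map.pb", "data/persons.pb", step_interval=0.5)
print(kwargs["step_interval"])
```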
- __version__¶
None
- speed_stat_interval¶
None
The interval of speed statistics. Set to 0 to disable speed statistics.
- id2lanes¶
None
Dictionary of lanes (Protobuf format) indexed by lane id
- id2roads¶
None
Dictionary of roads (Protobuf format) indexed by road id
- id2junctions¶
None
Dictionary of junctions (Protobuf format) indexed by junction id
- id2aois¶
None
Dictionary of AOIs (Protobuf format) indexed by AOI id
- lane_index2id¶
None
Numpy array of lane ids indexed by lane index
- junc_index2id¶
‘get_junction_ids(…)’
Numpy array of junction ids indexed by junction index
- road_index2id¶
‘get_road_ids(…)’
Numpy array of road ids indexed by road index
- lane_id2index¶
None
Dictionary of lane index indexed by lane id
- junc_id2index¶
None
Dictionary of junction index indexed by junction id
- road_id2index¶
None
Dictionary of road index indexed by road id
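The `*_index2id` arrays and `*_id2index` dictionaries are inverse mappings of each other. A minimal sketch of that relationship, using made-up lane ids and a hypothetical helper `build_id2index` (the engine already provides these attributes, so this only illustrates how they relate):

```python
def build_id2index(index2id):
    """Invert an index -> id sequence into an id -> index dictionary."""
    return {int(obj_id): index for index, obj_id in enumerate(index2id)}


# Made-up ids; in practice this would be e.lane_index2id.
lane_index2id = [300, 301, 305]
lane_id2index = build_id2index(lane_index2id)

# Round trip: id -> index -> id.
index = lane_id2index[305]
print(index, lane_index2id[index])
```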
- lane_length_array¶
‘array(…)’
Numpy array of lane length indexed by lane index
- start_step¶
None
The starting step of the simulation
- device¶
None
The CUDA device index
- _map_warning()¶
- property person_count: int¶
The number of vehicles in the agent file
- property lane_count: int¶
The number of lanes
- property road_count: int¶
The number of roads
- property junction_count: int¶
The number of junctions
- get_map(dict_return: bool = True) Union[pycityproto.city.map.v2.map_pb2.Map, Dict]¶
Get the Map object. Map is a protobuf message defined in pycityproto.city.map.v2.map_pb2 in the pycityproto package. The documentation URL is https://docs.fiblab.net/cityproto#city.map.v2.Map
Args:
dict_return: Whether to return the object as a dictionary
Returns:
The Map object or the dictionary
- get_persons(dict_return: bool = True) Union[pycityproto.city.person.v2.person_pb2.Persons, Dict]¶
Get the Persons object. Persons is a protobuf message defined in pycityproto.city.person.v2.person_pb2 in the pycityproto package. The documentation URL is https://docs.fiblab.net/cityproto#city.person.v2.Persons
Args:
dict_return: Whether to return the object as a dictionary
Returns:
The Persons object or the dictionary
- get_current_time() float¶
Get the current time
- fetch_persons(fields: List[str] = []) Dict[str, numpy.typing.NDArray]¶
Fetch the persons’ information.
Args:
fields: The fields to fetch, should be a subset of [“id”, “enable”, “status”, “lane_id”, “lane_parent_id”, “s”, “aoi_id”, “v”, “shadow_lane_id”, “shadow_s”, “lc_yaw”, “lc_completed_ratio”, “is_forward”, “x”, “y”, “z”, “dir”, “pitch”, “schedule_index”, “trip_index”, “departure_time”, “traveling_time”, “total_distance”, “cum_co2”, “cum_energy”]. If empty, fetch all fields.
The result is a dictionary with the following keys:
id: The id of the person
enable: Whether the person is enabled
status: The status of the person
lane_id: The id of the lane the person is on
lane_parent_id: The id of the road the lane belongs to
s: The s value of the person
aoi_id: The id of the AOI the person is in
v: The velocity of the person
shadow_lane_id: The id of the shadow lane the person is on
shadow_s: The s value of the shadow lane
lc_yaw: The yaw of the lane change
lc_completed_ratio: The completed ratio of the lane change
is_forward: Whether the person is moving forward
x: The x coordinate of the person
y: The y coordinate of the person
z: The z coordinate of the person
dir: The direction of the person
pitch: The pitch of the person
schedule_index: The index of the schedule
trip_index: The index of the trip
departure_time: The departure time of the person
traveling_time: The traveling time of the person
total_distance: The total distance of the person
cum_co2: The cumulative CO2 of the person
cum_energy: The cumulative energy of the person
We strongly recommend using pd.DataFrame(e.fetch_persons()) to convert the result to a DataFrame for better visualization and analysis.
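Because the result is a dictionary of equally long columns, rows can also be filtered without a DataFrame. The sketch below computes the mean speed of driving persons from made-up data. It assumes that `status` uses the module-level status codes (`DRIVING` is 2); the helper `mean_driving_speed` is hypothetical.

```python
DRIVING = 2  # documented value of moss.engine.DRIVING


def mean_driving_speed(persons):
    """Mean of column `v` over rows whose `status` equals DRIVING (0.0 if none)."""
    speeds = [
        float(v)
        for status, v in zip(persons["status"], persons["v"])
        if int(status) == DRIVING
    ]
    return sum(speeds) / len(speeds) if speeds else 0.0


# Made-up columns shaped like the result of fetch_persons(["status", "v"]).
persons = {"status": [0, 2, 2, 1], "v": [0.0, 10.0, 6.0, 1.5]}
avg_speed = mean_driving_speed(persons)
print(avg_speed)
```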
- fetch_lanes() Dict[str, numpy.typing.NDArray]¶
Fetch the lanes’ information.
The result is a dictionary with the following keys:
id: The id of the lane
status: The status of the lane
v_avg: The average speed of the lane
We strongly recommend using pd.DataFrame(e.fetch_lanes()) to convert the result to a DataFrame for better visualization and analysis.
- get_running_person_count() int¶
Get the total number of running persons (including driving and walking)
- get_lane_statuses() numpy.typing.NDArray[numpy.int8]¶
Get the traffic light status of each lane: 0-green / 1-yellow / 2-red / 3-restriction. The lane id of the entry i can be obtained by e.lane_index2id[i].
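The status codes and the index-to-id lookup described above can be combined as in this sketch. The data is made up and the helper `lanes_with_status` is hypothetical; only the code meanings come from the description.

```python
# Status codes as listed in the get_lane_statuses description.
LIGHT_NAMES = {0: "green", 1: "yellow", 2: "red", 3: "restriction"}


def lanes_with_status(statuses, lane_index2id, wanted):
    """Return the ids of lanes whose status name equals `wanted`."""
    return [
        int(lane_index2id[i])
        for i, code in enumerate(statuses)
        if LIGHT_NAMES.get(int(code)) == wanted
    ]


# Made-up values standing in for e.get_lane_statuses() and e.lane_index2id.
statuses = [0, 2, 2, 3, 1]
lane_index2id = [100, 101, 102, 103, 104]
red_lanes = lanes_with_status(statuses, lane_index2id, "red")
print(red_lanes)
```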
- get_lane_waiting_vehicle_counts(speed_threshold: float = 0.1) Tuple[numpy.typing.NDArray[numpy.int32], numpy.typing.NDArray[numpy.int32]]¶
Get the number of vehicles of each lane with speed lower than speed_threshold
Returns:
Dict: lane id -> number of vehicles
- get_lane_waiting_at_end_vehicle_counts(speed_threshold: float = 0.1, distance_to_end: float = 100) Tuple[numpy.typing.NDArray[numpy.int32], numpy.typing.NDArray[numpy.int32]]¶
Get the number of vehicles of each lane with speed lower than speed_threshold and distance to end lower than distance_to_end
Returns:
Dict: lane id -> number of vehicles
- get_lane_ids() numpy.typing.NDArray[numpy.int32]¶
Get the ids of the lanes as a numpy array
- get_lane_average_vehicle_speed(lane_index: int) float¶
Get the average speed of the vehicles on the lane lane_index
- get_junction_ids() numpy.typing.NDArray[numpy.int32]¶
Get the ids of the junctions
- get_junction_phase_lanes() List[List[Tuple[List[int], List[int]]]]¶
Get the index of the in and out lanes of each phase of each junction
Examples: TODO
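The nested return type can be read as junction, then phase, then a pair of lane-index lists. The sketch below walks such a structure with made-up data. It assumes the first list of each pair holds the in lanes and the second the out lanes, which the description suggests but does not state explicitly; `incoming_lanes_per_junction` is a hypothetical helper.

```python
# Made-up structure shaped like List[List[Tuple[List[int], List[int]]]].
phase_lanes = [
    # junction index 0: two phases
    [([0, 1], [4]), ([2], [5, 6])],
    # junction index 1: no phases
    [],
]


def incoming_lanes_per_junction(phase_lanes):
    """Collect the distinct in-lane indices used by any phase of each junction."""
    result = []
    for phases in phase_lanes:
        seen = set()
        for in_lanes, _out_lanes in phases:
            seen.update(in_lanes)
        result.append(sorted(seen))
    return result


incoming = incoming_lanes_per_junction(phase_lanes)
print(incoming)
```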
- get_junction_phase_ids() numpy.typing.NDArray[numpy.int32]¶
Get the phase id of each junction, -1 if it has no traffic lights. The junction id of the entry i can be obtained by e.junc_index2id[i].
- get_junction_phase_counts() numpy.typing.NDArray[numpy.int32]¶
Get the number of available phases of each junction. The junction id of the entry i can be obtained by e.junc_index2id[i].
- get_junction_dynamic_roads() List[List[int]]¶
Get the ids of the dynamic roads connected to each junction. The junction id of the entry i can be obtained from e.junc_index2id
- get_road_lane_plans(road_index: int) List[List[slice]]¶
Get the dynamic lane plan of the road road_index, represented as a list of lane groups: [ [slice(lane_start, lane_end), ...] ]
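One way to read the returned slices is sketched below. It assumes that each slice selects a contiguous group from the road's ordered lane list, which the description does not spell out; the lane ids, the plans, and the helper `lane_groups` are made up for illustration.

```python
# Made-up ordered lane ids of one road.
road_lanes = [10, 11, 12, 13]

# Made-up value shaped like List[List[slice]]: one list of lane groups per plan.
plans = [
    [slice(0, 2), slice(2, 4)],  # plan 0: two lanes in each group
    [slice(0, 1), slice(1, 4)],  # plan 1: one lane versus three lanes
]


def lane_groups(road_lanes, plan):
    """Apply each slice of a plan to the lane list (assumed interpretation)."""
    return [road_lanes[group] for group in plan]


groups_plan0 = lane_groups(road_lanes, plans[0])
groups_plan1 = lane_groups(road_lanes, plans[1])
print(groups_plan0, groups_plan1)
```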
- get_road_average_vehicle_speed(road_index: int) float¶
Get the average speed of the vehicles on the road road_index
- get_finished_person_count() int¶
Get the number of the finished persons
- get_finished_person_average_traveling_time() float¶
Get the average traveling time of the finished persons
- get_running_person_average_traveling_time() float¶
Get the average traveling time of the running persons
- get_departed_person_average_traveling_time() float¶
Get the average traveling time of the departed persons (running+finished)
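If departed persons are exactly the running and finished persons, as the description states, the departed average should equal the count-weighted mean of the two other averages. The sketch below only illustrates that arithmetic with made-up numbers; it is not taken from the engine's implementation, and `departed_average` is a hypothetical helper.

```python
def departed_average(finished_count, finished_avg, running_count, running_avg):
    """Count-weighted mean of the finished and running average traveling times."""
    total = finished_count + running_count
    if total == 0:
        return 0.0
    return (finished_count * finished_avg + running_count * running_avg) / total


# Made-up values standing in for the four getters:
# get_finished_person_count, get_finished_person_average_traveling_time,
# get_running_person_count, get_running_person_average_traveling_time.
avg = departed_average(2, 100.0, 6, 40.0)
print(avg)
```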
- get_road_lane_plan_index(road_index: int) int¶
Get the lane plan of road road_index
- get_road_vehicle_counts() Tuple[numpy.typing.NDArray[numpy.int32], numpy.typing.NDArray[numpy.int32]]¶
Get the number of vehicles of each road
Returns:
Dict: road id -> number of vehicles
- get_road_waiting_vehicle_counts(speed_threshold: float = 0.1) Tuple[numpy.typing.NDArray[numpy.int32], numpy.typing.NDArray[numpy.int32]]¶
Get the number of vehicles with speed lower than speed_threshold of each road
Returns:
Dict: road id -> number of vehicles
- set_person_enable(person_index: int, enable: bool)¶
Enable or disable person person_index
Args:
person_index: The index of the person
enable: Whether to enable the person
- set_person_enable_batch(person_indices: List[int], enable: Union[bool, List[bool]])¶
Enable or disable persons in person_indices
Args:
person_indices: The indices of the persons
enable: Whether to enable the persons, can be a boolean or a list of booleans
- set_tl_policy(junction_index: int, policy: moss.engine.TlPolicy)¶
Set the traffic light policy of junction junction_index to policy
- set_tl_policy_batch(junction_indices: List[int], policy: moss.engine.TlPolicy)¶
Set the traffic light policy of all junctions in junction_indices to policy
- set_tl_duration(junction_index: int, duration: int)¶
Set the traffic light switch duration of junction junction_index to duration
NOTE: This is only effective for TlPolicy.FIXED_TIME and TlPolicy.MAX_PRESSURE.
NOTE: Set duration to 0 to use the predefined duration in the map_file
- set_tl_duration_batch(junction_indices: List[int], duration: int)¶
Set the traffic light switch duration of all junctions in junction_indices to duration
NOTE: This is only effective for TlPolicy.FIXED_TIME and TlPolicy.MAX_PRESSURE.
NOTE: Set duration to 0 to use the predefined duration in the map_file
- set_tl_phase(junction_index: Union[str, int], phase_index: int)¶
Set the phase of junction_index to phase_index
- set_tl_phase_batch(junction_indices: List[int], phase_indices: List[int])¶
Set the phase of each junction in junction_indices to the corresponding entry of phase_indices
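A batch phase update might be driven from the phase getters as sketched below. `FakeEngine` is a hypothetical stand-in that mimics only the three methods used, so the snippet runs without `moss`. The sketch also assumes that the value returned by `get_junction_phase_ids` can be used as a phase index, which the reference does not confirm.

```python
class FakeEngine:
    """Hypothetical stand-in exposing only the methods used below."""

    def __init__(self):
        self.phase_ids = [0, -1, 2]   # -1: junction without traffic lights
        self.phase_counts = [4, 0, 3]

    def get_junction_phase_ids(self):
        return list(self.phase_ids)

    def get_junction_phase_counts(self):
        return list(self.phase_counts)

    def set_tl_phase_batch(self, junction_indices, phase_indices):
        for j, p in zip(junction_indices, phase_indices):
            self.phase_ids[j] = p


def advance_all_phases(engine):
    """Move every signalised junction to its next phase, wrapping around."""
    ids = engine.get_junction_phase_ids()
    counts = engine.get_junction_phase_counts()
    junctions, phases = [], []
    for j, (phase_id, count) in enumerate(zip(ids, counts)):
        if int(phase_id) < 0 or int(count) <= 0:
            continue  # skip junctions without traffic lights
        junctions.append(j)
        phases.append((int(phase_id) + 1) % int(count))
    if junctions:
        engine.set_tl_phase_batch(junctions, phases)
    return junctions, phases


engine = FakeEngine()
changed = advance_all_phases(engine)
print(changed, engine.phase_ids)
```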
- set_road_lane_plan(road_index: int, plan_index: int)¶
Set the lane plan of road road_index to plan_index
- set_road_lane_plan_batch(road_indices: List[int], plan_indices: List[int])¶
Set the lane plan of each road in road_indices to the corresponding entry of plan_indices
- set_lane_restriction(lane_index: int, flag: bool)¶
Set the restriction state of lane lane_index
- set_lane_restriction_batch(lane_indices: List[int], flags: List[bool])¶
Set the restriction state of each lane in lane_indices to the corresponding entry of flags
- set_lane_max_speed(lane_index: int, max_speed: float)¶
Set the max_speed of lane lane_index
- set_lane_max_speed_batch(lane_indices: List[int], max_speeds: Union[float, List[float]])¶
Set the max_speed of each lane in lane_indices; max_speeds can be a single float or a list of floats
- set_vehicle_route(person_id: int, route: List[int])¶
Set the route of vehicle person_id
Args:
person_id: The id of the person (must be a vehicle)
route: The route (road id list) of the vehicle
- next_step(n=1)¶
Move forward n steps
- make_checkpoint() int¶
Make a checkpoint of the current state of the simulator and return the checkpoint id
- restore_checkpoint(checkpoint_id: int)¶
Restore the state of the simulator to a previous checkpoint
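Checkpoints make it possible to try a number of steps and then roll back. The sketch below shows that pattern with a hypothetical `FakeEngine` that mimics only `next_step`, `get_finished_person_count`, `make_checkpoint` and `restore_checkpoint`; the real engine's internals are not modelled, and `lookahead` is an illustrative helper.

```python
class FakeEngine:
    """Hypothetical stand-in; the simulated state is just a step counter."""

    def __init__(self):
        self.step = 0
        self._checkpoints = {}

    def next_step(self, n=1):
        self.step += n

    def get_finished_person_count(self):
        return self.step // 10  # made-up behaviour for the demo

    def make_checkpoint(self):
        checkpoint_id = len(self._checkpoints)
        self._checkpoints[checkpoint_id] = self.step
        return checkpoint_id

    def restore_checkpoint(self, checkpoint_id):
        self.step = self._checkpoints[checkpoint_id]


def lookahead(engine, n):
    """Run n steps, read a metric, then restore the earlier state."""
    checkpoint_id = engine.make_checkpoint()
    try:
        engine.next_step(n)
        return engine.get_finished_person_count()
    finally:
        engine.restore_checkpoint(checkpoint_id)


engine = FakeEngine()
engine.next_step(5)
finished_after = lookahead(engine, 30)
print(finished_after, engine.step)
```

Restoring in a `finally` block keeps the engine state unchanged even if reading the metric raises.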