import math
import numpy as np
import pandas as pd

from enum import Enum
from typing import List, Literal

from numpy.typing import ArrayLike
from scipy.spatial.transform import Rotation

from spatz.logger import Advanceable


class Phase(Enum):
    """Flight phases, numbered in the order ``get_phase`` steps through them.

    ``LOI`` is treated as liftoff by ``Dataset`` (its start time is the
    "liftoff time"). The meaning of the remaining abbreviations is not
    defined in this module.
    """

    ONPAD = 1
    LOI = 2
    RCI = 3
    ECI = 4
    ADI = 5


def T1(angle):
    """Return the elementary frame transformation about the first (x) axis.

    Note: this is the passive (coordinate-frame) form, i.e. the transpose of
    ``Rotation.from_euler('X', angle, degrees=False).as_matrix()``.

    Args:
        angle (float): Rotation angle in radians.

    Returns:
        np.ndarray: A 3x3 transformation matrix.
    """
    c, s = math.cos(angle), math.sin(angle)
    return np.array([
        [1, 0, 0],
        [0, c, s],
        [0, -s, c],
    ])


def T2(angle):
    """Return the elementary frame transformation about the second (y) axis.

    Note: this is the passive (coordinate-frame) form, i.e. the transpose of
    ``Rotation.from_euler('Y', angle, degrees=False).as_matrix()``.

    Args:
        angle (float): Rotation angle in radians.

    Returns:
        np.ndarray: A 3x3 transformation matrix.
    """
    c, s = math.cos(angle), math.sin(angle)
    return np.array([
        [c, 0, -s],
        [0, 1, 0],
        [s, 0, c],
    ])


def T3(angle):
    """Return the elementary frame transformation about the third (z) axis.

    Note: this is the passive (coordinate-frame) form, i.e. the transpose of
    ``Rotation.from_euler('Z', angle, degrees=False).as_matrix()``.

    Args:
        angle (float): Rotation angle in radians.

    Returns:
        np.ndarray: A 3x3 transformation matrix.
    """
    c, s = math.cos(angle), math.sin(angle)
    return np.array([
        [c, s, 0],
        [-s, c, 0],
        [0, 0, 1],
    ])


class Dataset(Advanceable):

    def __init__(self, path: str, interpolation: str = 'linear'):
        """A wrapper class for a Pandas dataframe containing simulation data.

        Args:
            path (str): Path to a CSV file containing the simulation data.
                The file must provide at least a 'time' and a 'phase' column.
            interpolation (str, optional): The interpolation method for
                obtaining new data points. Only 'linear' interpolates; any
                other value makes ``fetch_value`` return the most recent
                sample. Defaults to 'linear'.
        """
        super().__init__()

        self.__df = pd.read_csv(path)
        self.__idx = 0
        self.__interpolation = interpolation

        # Find the time at which each phase is first reached
        # (LOI is the liftoff time).
        self.__loi = self._phase_start_time(Phase.LOI)
        self.__rci = self._phase_start_time(Phase.RCI)
        self.__eci = self._phase_start_time(Phase.ECI)
        self.__adi = self._phase_start_time(Phase.ADI)

    def _phase_start_time(self, phase: Phase) -> float:
        """Returns the earliest time stamp at which the given phase occurs.

        Args:
            phase (Phase): The phase to look up.

        Returns:
            float: The first time of the phase, or NaN if the phase never
                occurs in the dataset.
        """
        column = self.__df['phase']

        # A column read from CSV holds plain ints or strings, which never
        # compare equal to an Enum member. Match the member's value and name
        # as well so the lookup works for data loaded from disk.
        mask = (column == phase) | (column == phase.value) | (column == phase.name)

        return self.__df['time'][mask].min()

    def _on_reset(self):
        # NOTE(review): the cached row index is not reset here - confirm that
        # a step always follows a reset before values are fetched.
        pass

    def _get_closest_idx(self, t: float) -> int:
        """Gets an index _idx_ for the dataframe _df_ such that the values at
        the given time _t_ are somewhere between _idx_ and _idx+1_.

        Assumes the dataframe uses the default integer index.

        Args:
            t (float): The requested time.

        Returns:
            int: The computed index. Never negative: times before the first
                sample map to index 0.
        """
        times = self.__df['time']
        idx = (times - t).abs().idxmin()

        # The nearest sample may lie after t; step back so that idx is the
        # sample at or before t.
        if times.loc[idx] > t:
            idx -= 1

        return max(int(idx), 0)

    def _on_step(self, _: float):
        # Cache the row for the current time so fetches need no search.
        self.__idx = self._get_closest_idx(self.get_time())

    def get_phase(self) -> Phase:
        """
        Returns:
            Phase: Get the current phase of the flight.
        """
        t = self.get_time()

        if t < self.__loi:
            return Phase.ONPAD
        if t < self.__rci:
            return Phase.LOI
        if t < self.__eci:
            return Phase.RCI
        if t < self.__adi:
            return Phase.ECI

        return Phase.ADI

    def get_time_until(self, phase: Phase) -> float:
        """Returns how much time is left until the given phase is reached.
        Negative values represent the time that has passed since the phase
        was reached.

        Args:
            phase (Phase): A phase of the flight.

        Returns:
            float: Time until or since the phase was reached.
        """
        t = self.get_time()

        phase_starts = {
            Phase.ONPAD: 0,
            Phase.LOI: self.__loi,
            Phase.RCI: self.__rci,
            Phase.ECI: self.__eci,
            Phase.ADI: self.__adi,
        }
        start = phase_starts.get(phase)

        # Unknown phases yield None, as a plain dict lookup would.
        return None if start is None else start - t

    def get_length(self) -> float:
        """Returns the time horizon of this dataset.

        Returns:
            float: The last time step in the dataset.
        """
        return self.__df['time'].max()

    def get_start_time(self) -> float:
        """
        Returns:
            float: Returns the starting time of the simulation.
        """
        return self.fetch_start_value('time')

    def fetch_start_value(self, name: str) -> float:
        """Get the initial value for a given attribute from the dataframe.

        Args:
            name (str): The name of the value to fetch.

        Returns:
            float: Returns the requested value.
        """
        return self.__df.at[0, name]

    def fetch_init_values(self, names: List[str]) -> ArrayLike:
        """Get the initial value for given attributes from the dataframe.

        Args:
            names (List[str]): Names of the values to get.

        Returns:
            np.array: Returns a numpy array containing the requested values
                in the same order as in the input list.
        """
        return np.asarray([self.fetch_start_value(name) for name in names])

    def fetch_value(self, name: str, t: float | None = None) -> float:
        """Get a specific value from the dataframe.

        With linear interpolation the value is blended between the sample at
        or before the requested time and the next one. Requests outside the
        sampled range return the nearest sample instead of extrapolating.

        Args:
            name (str): The name of the value to fetch.
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            float: Returns the requested value.
        """
        if t is None:
            # Use the row cached by the last step together with the
            # current time.
            idx = self.__idx
            t = self.get_time()
        else:
            idx = self._get_closest_idx(t)

        value = self.__df.at[idx, name]

        # Without linear interpolation, or on the last row (no successor to
        # blend with), fall back to the sample itself.
        if self.__interpolation != 'linear' or idx + 1 >= len(self.__df):
            return value

        t_min = self.__df.at[idx, 'time']
        t_max = self.__df.at[idx + 1, 'time']

        # Sometimes no time passes in-between two samples.
        if t_max == t_min:
            return value

        # Compute the weight for interpolation from the *requested* time and
        # clamp it so out-of-range times do not extrapolate.
        alpha = (t - t_min) / (t_max - t_min)
        alpha = min(max(alpha, 0.0), 1.0)

        # Interpolate linearly between the two data points.
        return (1 - alpha) * value + alpha * self.__df.at[idx + 1, name]

    def fetch_values(self, names: List[str], t: float | None = None) -> ArrayLike:
        """Get specific values from the dataframe.

        Args:
            names (List[str]): Names of the values to get.
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            np.array: Returns a numpy array containing the requested values
                in the same order as in the input list.
        """
        return np.asarray([self.fetch_value(name, t) for name in names])

    def local_to_body(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for.

        Returns:
            ArrayLike: The current transformation matrix from local to
                body-fixed coords.
        """
        # Get the rotation in the local coordinate system.
        # The angles are fed to math.cos/math.sin, i.e. treated as radians.
        pitch_l, yaw_l, roll_l = self.fetch_values(['pitch_l', 'yaw_l', 'roll_l'], t)

        return T1(roll_l) @ T2(pitch_l - math.pi/2) @ T1(-yaw_l)

    def global_to_local(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for.

        Returns:
            ArrayLike: The current transformation matrix from global to
                local coords.
        """
        decl = self.fetch_value('declination', t)
        long = self.fetch_value('longitude', t)

        # NOTE(review): the Earth-rotation term uses the dataset's start time
        # rather than t - confirm this is intended.
        t0 = self.get_start_time()

        # Angular rate for one full turn per 24 hours, in rad/s.
        omega_E = (2*math.pi) / (24*60*60)

        return T2(-decl) @ T3(long + omega_E * t0)

    def global_to_launch_rail(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for. Doesn't
                do anything here because the transformation remains the same
                across all time steps.

        Returns:
            ArrayLike: The current transformation matrix from global to
                launch rail coords.
        """
        init_long = self.fetch_start_value('longitude')
        init_lat = self.fetch_start_value('latitude')

        return T2(-math.pi/2 - init_lat) @ T3(init_long)

    def local_to_launch_rail(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for.

        Returns:
            ArrayLike: The current transformation matrix from local to
                launch rail coords.
        """
        return self.global_to_launch_rail(t) @ np.linalg.inv(self.global_to_local(t))

    def launch_rail_to_body(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for.

        Returns:
            ArrayLike: The current transformation matrix from launch rail to
                body-fixed coords.
        """
        return self.local_to_body(t) @ np.linalg.inv(self.local_to_launch_rail(t))

    def get_mach_number(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            float: Returns the mach number at the specified time.
        """
        return self.fetch_value('mach', t)

    def is_transsonic(self, t: float | None = None) -> bool:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            bool: Returns True if the rocket is flying with transsonic speed
                (mach number strictly between 0.8 and 1.2) at the specified
                time.
        """
        return 0.8 < self.get_mach_number(t) < 1.2

    def is_supersonic(self, t: float | None = None) -> bool:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            bool: True if the rocket is flying with supersonic speed
                (mach number above 1) at the specified time.
        """
        return self.get_mach_number(t) > 1

    def get_total_velocity(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            float: Returns the value of the 'velocity' column at the
                specified time.
        """
        return self.fetch_value('velocity', t)

    def get_acceleration(self, frame='FL', t: float | None = None) -> ArrayLike:
        """
        Args:
            frame (str, optional): The coordinate frame to compute the
                acceleration for. 'B' transforms the stored values with the
                launch-rail-to-body matrix; any other value returns them as
                stored in the dataset. Defaults to 'FL'.
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            ArrayLike: Returns the spacecraft's acceleration at the given time.
        """
        acc = self.fetch_values(['ax', 'ay', 'az'], t)

        if frame == 'B':
            return self.launch_rail_to_body(t) @ acc

        return acc

    def get_angular_velocities(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            ArrayLike: The values of the 'OMEGA_X', 'OMEGA_Y' and 'OMEGA_Z'
                columns at the specified time.
        """
        return self.fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z'], t)

    def get_velocity(self, frame: Literal['L', 'B', 'LF'] = 'LF', t: float | None = None) -> ArrayLike:
        """
        Args:
            frame (str, optional): The coordinate frame to compute the
                velocity for. 'B' transforms the stored values with the
                launch-rail-to-body matrix, 'L' with the inverse of the
                local-to-launch-rail matrix; any other value returns them as
                stored in the dataset. Defaults to 'LF'.
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            ArrayLike: Returns the spacecraft's velocity at a given time.
        """
        vel = self.fetch_values(['vx', 'vy', 'vz'], t)

        if frame == 'B':
            return self.launch_rail_to_body(t) @ vel
        if frame == 'L':
            # Use the transformation for the requested time, not the
            # current one.
            return np.linalg.inv(self.local_to_launch_rail(t)) @ vel

        return vel

    def get_altitude(self, t: float | None = None) -> float:
        """
        Args:
            t (float | None, optional): Allows specification of a different
                time instead of the current time. None for current time.

        Returns:
            float: Returns the altitude in meter at the specified time.
        """
        return self.fetch_value('altitude', t)

    def get_speed_of_sound(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            float: Returns the speed of sound at the specified time.
        """
        return self.fetch_value('speedofsound', t)

    def get_rotation(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            np.array: Returns the rotation (pitch, yaw, roll in the local
                frame, in that order) at the specified time.
        """
        return self.fetch_values(['pitch_l', 'yaw_l', 'roll_l'], t)

    def get_temperature(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            float: Returns the temperature at the specified time.
        """
        return self.fetch_value('temperature', t)

    def get_pressure(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            float: Returns the pressure at the specified time.
        """
        return self.fetch_value('pressure', t)

    def get_thrust(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            float: Returns the thrust value for the specified time.
        """
        return self.fetch_value('thrust', t)

    def get_drag(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            float: Returns the drag value for the specified time.
        """
        return self.fetch_value('drag', t)

    def get_mass(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of
                the current time. None for current time.

        Returns:
            float: Returns the mass value for the specified time.
        """
        return self.fetch_value('mass', t)


if __name__ == '__main__':
    pass