import math import numpy as np import pandas as pd from enum import Enum from typing import List from numpy.typing import ArrayLike from scipy.spatial.transform import Rotation from spatz.logger import Advanceable class Phase(Enum): ONPAD = 1 LOI = 2 RCI = 3 ECI = 4 ADI = 5 class Dataset(Advanceable): def __init__(self, path: str, interpolation: str = 'linear'): """A wrapper class for a Pandas dataframe containing simulation data. Args: df (pd.DataFrame): A Pandas dataframe containing simulation data. interpolation (str, optional): The interpolation method for obtaining new data points. Defaults to 'linear'. """ super().__init__() self.__df = pd.read_csv(path) self.__idx = 0 self.__interpolation = interpolation # Find the liftoff time. self.__loi = self.__df['time'][self.__df['phase'] == Phase.LOI].min() self.__rci = self.__df['time'][self.__df['phase'] == Phase.RCI].min() self.__eci = self.__df['time'][self.__df['phase'] == Phase.ECI].min() self.__adi = self.__df['time'][self.__df['phase'] == Phase.ADI].min() def _on_reset(self): pass def _on_step(self, _: float): idx = (self.__df['time'] - self.get_time()).abs().idxmin() self.__idx = idx if self.__df['time'].loc[idx] < self.get_time() else idx - 1 def get_phase(self) -> Phase: """ Returns: Phase: Get the current phase of the flight. """ t = self.get_time() if t < self.__loi: return Phase.ONPAD if t < self.__rci: return Phase.LOI if t < self.__eci: return Phase.RCI if t < self.__adi: return Phase.ECI return Phase.ADI def get_time_until(self, phase: Phase) -> float: """Returns how much time is left until the given phase is reached. Negative values represent the time that has passed since the phase was reached. Args: phase (Phase): A phase of the flight. Returns: float: Time until or since the phase was reached. """ t = self.get_time() switch = { Phase.ONPAD: 0 - t, Phase.LOI: self.__loi - t, Phase.RCI: self.__rci - t, Phase.ECI: self.__eci - t, Phase.ADI: self.__adi - t } return switch.get(phase) def get_length(self) -> float: """Returns the time horizon of this dataset. Returns: float: The last time step in the dataset. """ return max(self.__df['time']) @staticmethod def T1(angle): # return Rotation.from_euler('X', angle, degrees=False).as_matrix() return np.array([ [1, 0, 0], [0, math.cos(angle), math.sin(angle)], [0, -math.sin(angle), math.cos(angle)], ]) @staticmethod def T2(angle): # return Rotation.from_euler('Y', angle, degrees=False).as_matrix() return np.array([ [math.cos(angle), 0, -math.sin(angle)], [0, 1, 0], [math.sin(angle), 0, math.cos(angle)] ]) @staticmethod def T3(angle): # return Rotation.from_euler('Z', angle, degrees=False).as_matrix() return np.array([ [math.cos(angle), math.sin(angle), 0], [-math.sin(angle), math.cos(angle), 0], [0, 0, 1] ]) def local_to_body(self) -> ArrayLike: """ Returns: ArrayLike: The current transformation matrix from local to body-fixed coords. """ # Get the rotation in the local coordinate system. rots = self.fetch_values(['pitch_l', 'yaw_l', 'roll_l']) pitch_l, yaw_l, roll_l = rots[0], rots[1], rots[2] return self.T1(roll_l) @ self.T2(pitch_l - math.pi/2) @ self.T1(-yaw_l) def global_to_local(self) -> ArrayLike: """ Returns: ArrayLike: The current transformation matrix from global to local coords. """ decl = self.fetch_value('declination') long = self.fetch_value('longitude') t0 = self.__df['time'].iloc[0] omega_E = (2*math.pi) / (24*60*60) return self.T2(-decl) @ self.T3(long + omega_E * t0) def global_to_launch_rail(self) -> ArrayLike: """ Returns: ArrayLike: The current transformation matrix from global to launch rail coords. """ init_long = self.__df['longitude'].iloc[0] init_lat = self.__df['latitude'].iloc[0] return self.T2(-math.pi/2 - init_lat) @ self.T3(init_long) def local_to_launch_rail(self) -> ArrayLike: """ Returns: ArrayLike: The current transformation matrix from local to launch rail coords. """ return self.global_to_launch_rail() @ np.linalg.inv(self.global_to_local()) def launch_rail_to_body(self) -> ArrayLike: """ Returns: ArrayLike: The current transformation matrix from launch rail to local coords. """ return self.local_to_body() @ np.linalg.inv(self.local_to_launch_rail()) def is_transsonic(self) -> bool: """ Returns: bool: Returns True if the rocket is flying with transsonic speed at the current time of the simulation. """ mach = self.get_mach_number() return mach > 0.8 and mach < 1.2 def is_supersonic(self) -> bool: """ Returns: bool: True if the rocket is flying with supersonic speed at the current time of the simulation. """ return self.get_mach_number() > 1 def fetch_value(self, name: str) -> float: """Get a specific value from the dataframe. Args: name (str): The name of the value to fetch. Returns: float: Returns the requested value. """ if self.__interpolation == 'linear': t_min = self.__df.at[self.__idx, 'time'] t_max = self.__df.at[self.__idx + 1, 'time'] # Sometimes no time passes in-between two samples. if t_max == t_min: return self.__df.at[name, self.__idx] # Compute the weight for interpolation. alpha = (self.get_time() - t_min) / (t_max - t_min) # Interpolate linearly between the two data points. return (1 - alpha) * self.__df.at[self.__idx, name] + alpha * self.__df.at[self.__idx + 1, name] def fetch_values(self, names: List[str]) -> np.array: """Get specific values from the dataframe. Args: names (List[str]): Names of the values to get. Returns: np.array: Returns a numpy array containing the requested values in the same order as in the input list. """ return np.asarray([self.fetch_value(name) for name in names]) def get_velocity(self) -> float: """ Returns: np.array: Returns the velocity at the current time of the simulation. """ return self.fetch_value('velocity') def get_acceleration(self, frame='FL') -> ArrayLike: """_summary_ Args: frame (str, optional): _description_. Defaults to 'FL'. Returns: ArrayLike: _description_ """ acc = self.fetch_values(['ax', 'ay', 'az']) if frame == 'B': return self.launch_rail_to_body() @ acc return acc def get_angular_velocities(self) -> ArrayLike: """ Returns: ArrayLike: Gets the derivatives in angular velocity across all axes of the rocket. """ return self.fetch_values(['omega_X', 'omega_Y', 'omega_Z']) def get_velocity(self, frame='FL') -> ArrayLike: """ Args: frame (str, optional): _description_. Defaults to 'FL'. Returns: ArrayLike: _description_ """ vel = self.fetch_values(['vx', 'vy', 'vz']) if frame == 'B': return self.launch_rail_to_body() @ vel return vel def get_mach_number(self) -> float: """ Returns: float: Returns the mach number at the current time of the simulation. """ return self.fetch_value('mach') def get_speed_of_sound(self) -> float: """ Returns: float: Returns the speed of sound at the current time of the simulation. """ return self.fetch_value('speedofsound') def get_rotation_rates(self) -> np.array: """ Returns: np.array: Returns the rotation rates at the current time of the simulation. """ return self.fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z']) def get_rotation(self) -> np.array: """ Returns: np.array: _description_ """ return self.fetch_values(['pitch_l', 'yaw_l', 'roll_l']) def get_temperature(self) -> float: """ Returns: np.array: Returns the temperature at the current time of the simulation. """ return self.fetch_value('temperature') def get_pressure(self) -> float: """ Returns: np.array: Returns the pressure at the current time of the simulation. """ return self.fetch_value('pressure') def get_thrust(self) -> float: """ Returns: float: Returns the thrust value for the current time of the simulation. """ return self.fetch_value('thrust') def get_drag(self) -> float: """ Returns: float: Returns the drag value for the current time of the simulation. """ return self.fetch_value('drag') def get_mass(self) -> float: """ Returns: float: Returns the mass value for the current time of the simulation. """ return self.fetch_value('mass') if __name__ == '__main__': pass