SPATZ/spatz/dataset.py

337 lines
10 KiB
Python

import math
import numpy as np
import pandas as pd
from enum import Enum
from typing import List
from numpy.typing import ArrayLike
from scipy.spatial.transform import Rotation
from spatz.logger import Advanceable
class Phase(Enum):
ONPAD = 1
LOI = 2
RCI = 3
ECI = 4
ADI = 5
class Dataset(Advanceable):
def __init__(self, path: str, interpolation: str = 'linear'):
"""A wrapper class for a Pandas dataframe containing simulation data.
Args:
df (pd.DataFrame): A Pandas dataframe containing simulation data.
interpolation (str, optional): The interpolation method for obtaining new data points. Defaults to 'linear'.
"""
super().__init__()
self.__df = pd.read_csv(path)
self.__idx = 0
self.__interpolation = interpolation
# Find the liftoff time.
self.__loi = self.__df['time'][self.__df['phase'] == Phase.LOI].min()
self.__rci = self.__df['time'][self.__df['phase'] == Phase.RCI].min()
self.__eci = self.__df['time'][self.__df['phase'] == Phase.ECI].min()
self.__adi = self.__df['time'][self.__df['phase'] == Phase.ADI].min()
def _on_reset(self):
pass
def _on_step(self, _: float):
idx = (self.__df['time'] - self.get_time()).abs().idxmin()
self.__idx = idx if self.__df['time'].loc[idx] < self.get_time() else idx - 1
def get_phase(self) -> Phase:
"""
Returns:
Phase: Get the current phase of the flight.
"""
t = self.get_time()
if t < self.__loi:
return Phase.ONPAD
if t < self.__rci:
return Phase.LOI
if t < self.__eci:
return Phase.RCI
if t < self.__adi:
return Phase.ECI
return Phase.ADI
def get_time_until(self, phase: Phase) -> float:
"""Returns how much time is left until the given phase is reached.
Negative values represent the time that has passed since the phase was
reached.
Args:
phase (Phase): A phase of the flight.
Returns:
float: Time until or since the phase was reached.
"""
t = self.get_time()
switch = {
Phase.ONPAD: 0 - t,
Phase.LOI: self.__loi - t,
Phase.RCI: self.__rci - t,
Phase.ECI: self.__eci - t,
Phase.ADI: self.__adi - t
}
return switch.get(phase)
def get_length(self) -> float:
"""Returns the time horizon of this dataset.
Returns:
float: The last time step in the dataset.
"""
return max(self.__df['time'])
@staticmethod
def T1(angle):
# return Rotation.from_euler('X', angle, degrees=False).as_matrix()
return np.array([
[1, 0, 0],
[0, math.cos(angle), math.sin(angle)],
[0, -math.sin(angle), math.cos(angle)],
])
@staticmethod
def T2(angle):
# return Rotation.from_euler('Y', angle, degrees=False).as_matrix()
return np.array([
[math.cos(angle), 0, -math.sin(angle)],
[0, 1, 0],
[math.sin(angle), 0, math.cos(angle)]
])
@staticmethod
def T3(angle):
# return Rotation.from_euler('Z', angle, degrees=False).as_matrix()
return np.array([
[math.cos(angle), math.sin(angle), 0],
[-math.sin(angle), math.cos(angle), 0],
[0, 0, 1]
])
def local_to_body(self) -> ArrayLike:
"""
Returns:
ArrayLike: The current transformation matrix from local to body-fixed coords.
"""
# Get the rotation in the local coordinate system.
rots = self.__fetch_values(['pitch_l', 'yaw_l', 'roll_l'])
pitch_l, yaw_l, roll_l = rots[0], rots[1], rots[2]
return self.T1(roll_l) @ self.T2(pitch_l - math.pi/2) @ self.T1(-yaw_l)
def global_to_local(self) -> ArrayLike:
"""
Returns:
ArrayLike: The current transformation matrix from global to local coords.
"""
decl = self.__fetch_value('declination')
long = self.__fetch_value('longitude')
t0 = self.__df['time'].iloc[0]
omega_E = (2*math.pi) / (24*60*60)
return self.T2(-decl) @ self.T3(long + omega_E * t0)
def global_to_launch_rail(self) -> ArrayLike:
"""
Returns:
ArrayLike: The current transformation matrix from global to launch rail coords.
"""
init_long = self.__df['longitude'].iloc[0]
init_lat = self.__df['latitude'].iloc[0]
return self.T2(-math.pi/2 - init_lat) @ self.T3(init_long)
def local_to_launch_rail(self) -> ArrayLike:
"""
Returns:
ArrayLike: The current transformation matrix from local to launch rail coords.
"""
return self.global_to_launch_rail() @ np.linalg.inv(self.global_to_local())
def launch_rail_to_body(self) -> ArrayLike:
"""
Returns:
ArrayLike: The current transformation matrix from launch rail to local coords.
"""
return self.local_to_body() @ np.linalg.inv(self.local_to_launch_rail())
def is_transsonic(self) -> bool:
"""
Returns:
bool: Returns True if the rocket is flying with transsonic speed at the current time of the simulation.
"""
mach = self.get_mach_number()
return mach > 0.8 and mach < 1.2
def is_supersonic(self) -> bool:
"""
Returns:
bool: True if the rocket is flying with supersonic speed at the current time of the simulation.
"""
return self.get_mach_number() > 1
def __fetch_value(self, name: str) -> float:
"""Get a specific value from the dataframe.
Args:
name (str): The name of the value to fetch.
Returns:
float: Returns the requested value.
"""
if self.__interpolation == 'linear':
t_min = self.__df['time'].iloc[self.__idx]
t_max = self.__df['time'].iloc[self.__idx + 1]
# Sometimes no time passes in-between two samples.
if t_max == t_min:
return self.__df[name].iloc[self.__idx]
# Compute the weight for interpolation.
alpha = (self.get_time() - t_min) / (t_max - t_min)
# Interpolate linearly between the two data points.
return (1 - alpha) * self.__df[name].iloc[self.__idx] + alpha * self.__df[name].iloc[self.__idx + 1]
def __fetch_values(self, names: List[str]) -> np.array:
"""Get specific values from the dataframe.
Args:
names (List[str]): Names of the values to get.
Returns:
np.array: Returns a numpy array containing the requested values in the same order as in the input list.
"""
return np.asarray([self.__fetch_value(name) for name in names])
def get_velocity(self) -> float:
"""
Returns:
np.array: Returns the velocity at the current time of the simulation.
"""
return self.__fetch_value('velocity')
def get_acceleration(self, frame='FL') -> ArrayLike:
"""_summary_
Args:
frame (str, optional): _description_. Defaults to 'FL'.
Returns:
ArrayLike: _description_
"""
acc = self.__fetch_values(['ax', 'ay', 'az'])
if frame == 'B':
return self.launch_rail_to_body() @ acc
return acc
def get_angular_velocities(self) -> ArrayLike:
"""
Returns:
ArrayLike: Gets the derivatives in angular velocity across all axes of the rocket.
"""
return self.__fetch_values(['omega_X', 'omega_Y', 'omega_Z'])
def get_velocity(self, frame='FL') -> ArrayLike:
"""
Args:
frame (str, optional): _description_. Defaults to 'FL'.
Returns:
ArrayLike: _description_
"""
vel = self.__fetch_values(['vx', 'vy', 'vz'])
if frame == 'B':
return self.launch_rail_to_body() @ vel
return vel
def get_mach_number(self) -> float:
"""
Returns:
float: Returns the mach number at the current time of the simulation.
"""
return self.__fetch_value('mach')
def get_speed_of_sound(self) -> float:
"""
Returns:
float: Returns the speed of sound at the current time of the simulation.
"""
return self.__fetch_value('speedofsound')
def get_rotation_rates(self) -> np.array:
"""
Returns:
np.array: Returns the rotation rates at the current time of the simulation.
"""
return self.__fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z'])
def get_rotation(self) -> np.array:
"""
Returns:
np.array: _description_
"""
return self.__fetch_values(['pitch_l', 'yaw_l', 'roll_l'])
def get_temperature(self) -> float:
"""
Returns:
np.array: Returns the temperature at the current time of the simulation.
"""
return self.__fetch_value('temperature')
def get_pressure(self) -> float:
"""
Returns:
np.array: Returns the pressure at the current time of the simulation.
"""
return self.__fetch_value('pressure')
def get_thrust(self) -> float:
"""
Returns:
float: Returns the thrust value for the current time of the simulation.
"""
return self.__fetch_value('thrust')
def get_drag(self) -> float:
"""
Returns:
float: Returns the drag value for the current time of the simulation.
"""
return self.__fetch_value('drag')
def get_mass(self) -> float:
"""
Returns:
float: Returns the mass value for the current time of the simulation.
"""
return self.__fetch_value('mass')
if __name__ == '__main__':
pass