# SPATZ/spatz/dataset.py (file-viewer header: 442 lines, 14 KiB, Python)

import math
import numpy as np
import pandas as pd
from enum import Enum
from typing import List, Literal
from numpy.typing import ArrayLike
from scipy.spatial.transform import Rotation
from spatz.logger import Advanceable
class Phase(Enum):
    """Flight phases, numbered in the order in which they are reached.

    The numeric values are the ones a dataset's ``phase`` column is matched
    against. The expansions of the abbreviations are not spelled out in this
    module; LOI is the phase whose first time stamp is treated as liftoff.
    """

    # The rocket has not lifted off yet.
    ONPAD = 1
    # First phase after the pad (liftoff).
    LOI = 2
    # Phases 3 to 5 follow in chronological order.
    RCI = 3
    ECI = 4
    ADI = 5
def T1(angle):
    """Build the 3x3 coordinate-frame rotation matrix about the first (x) axis.

    The ``+sin`` term sits above the diagonal, i.e. this is the transpose of
    the usual active rotation matrix about x.

    Args:
        angle: Rotation angle in radians.

    Returns:
        np.ndarray: The 3x3 rotation matrix.
    """
    cos_a = math.cos(angle)
    sin_a = math.sin(angle)

    row_x = [1, 0, 0]
    row_y = [0, cos_a, sin_a]
    row_z = [0, -sin_a, cos_a]
    return np.array([row_x, row_y, row_z])
def T2(angle):
    """Build the 3x3 coordinate-frame rotation matrix about the second (y) axis.

    The ``-sin`` term sits in the top-right corner, i.e. this is the
    transpose of the usual active rotation matrix about y.

    Args:
        angle: Rotation angle in radians.

    Returns:
        np.ndarray: The 3x3 rotation matrix.
    """
    cos_a = math.cos(angle)
    sin_a = math.sin(angle)

    row_x = [cos_a, 0, -sin_a]
    row_y = [0, 1, 0]
    row_z = [sin_a, 0, cos_a]
    return np.array([row_x, row_y, row_z])
def T3(angle):
    """Build the 3x3 coordinate-frame rotation matrix about the third (z) axis.

    The ``+sin`` term sits above the diagonal, i.e. this is the transpose of
    the usual active rotation matrix about z.

    Args:
        angle: Rotation angle in radians.

    Returns:
        np.ndarray: The 3x3 rotation matrix.
    """
    cos_a = math.cos(angle)
    sin_a = math.sin(angle)

    row_x = [cos_a, sin_a, 0]
    row_y = [-sin_a, cos_a, 0]
    row_z = [0, 0, 1]
    return np.array([row_x, row_y, row_z])
class Dataset(Advanceable):
    """Time-indexed access to simulation data loaded from a CSV file.

    The CSV is read into a Pandas dataframe. Values are looked up either at
    the current time of this object (``get_time()``, provided by the base
    class) or at an explicitly requested time, optionally interpolating
    linearly between neighbouring samples.
    """

    def __init__(self, path: str, interpolation: str = 'linear'):
        """Load the simulation data and look up the phase transition times.

        Args:
            path (str): Path of the CSV file containing the simulation data.
                The file must provide at least a 'time' and a 'phase' column.
            interpolation (str, optional): The interpolation method for
                obtaining new data points. Defaults to 'linear'. Any other
                value makes lookups return the most recent sample unchanged.
        """
        super().__init__()

        self.__df = pd.read_csv(path)
        self.__idx = 0
        self.__interpolation = interpolation

        # First time stamp of each phase; LOI marks the liftoff time.
        self.__loi = self._first_time_in_phase(Phase.LOI)
        self.__rci = self._first_time_in_phase(Phase.RCI)
        self.__eci = self._first_time_in_phase(Phase.ECI)
        self.__adi = self._first_time_in_phase(Phase.ADI)

    def _first_time_in_phase(self, phase: Phase) -> float:
        """Return the earliest time stamp whose 'phase' entry matches _phase_.

        Args:
            phase (Phase): The phase to look for.

        Returns:
            float: The first time of that phase, or NaN if it never occurs.
        """
        phases = self.__df['phase']

        # A column parsed from CSV holds plain numbers or strings, never Phase
        # members, and a plain Enum member does not compare equal to its
        # value. Match on the member's value or on its name instead.
        in_phase = (phases == phase.value) | (phases == phase.name)

        return self.__df['time'][in_phase].min()

    def _on_reset(self):
        # NOTE(review): the cached sample index is not reset here. If the base
        # class rewinds the time on reset, lookups for the current time keep
        # using the old index until the next step - confirm this is intended.
        pass

    def _get_closest_idx(self, t: float) -> int:
        """Gets an index _idx_ for the dataframe _df_ such that the values at the given time _t_ are somewhere between
        _idx_ and _idx+1_.

        Args:
            t (float): The requested time.

        Returns:
            int: The computed index. Never negative: a time before the first
                sample maps to index 0.
        """
        times = self.__df['time']

        # Nearest sample first, then step back if it lies after t.
        idx = (times - t).abs().idxmin()
        idx = idx if times.loc[idx] <= t else idx - 1

        # Without the clamp a time before the first sample would yield -1.
        return max(idx, 0)

    def _on_step(self, _: float):
        # Cache the sample index for the current time once per step.
        self.__idx = self._get_closest_idx(self.get_time())

    def get_phase(self) -> Phase:
        """
        Returns:
            Phase: Get the current phase of the flight.
        """
        t = self.get_time()

        if t < self.__loi:
            return Phase.ONPAD
        if t < self.__rci:
            return Phase.LOI
        if t < self.__eci:
            return Phase.RCI
        if t < self.__adi:
            return Phase.ECI

        return Phase.ADI

    def get_time_until(self, phase: Phase) -> float:
        """Returns how much time is left until the given phase is reached.

        Negative values represent the time that has passed since the phase was
        reached.

        Args:
            phase (Phase): A phase of the flight.

        Returns:
            float: Time until or since the phase was reached. None if _phase_
                is not a known phase.
        """
        t = self.get_time()

        phase_start = {
            Phase.ONPAD: 0,
            Phase.LOI: self.__loi,
            Phase.RCI: self.__rci,
            Phase.ECI: self.__eci,
            Phase.ADI: self.__adi,
        }

        if phase not in phase_start:
            return None

        return phase_start[phase] - t

    def get_length(self) -> float:
        """Returns the time horizon of this dataset.

        Returns:
            float: The largest time stamp in the dataset.
        """
        return self.__df['time'].max()

    def get_start_time(self) -> float:
        """
        Returns:
            float: Returns the starting time of the simulation.
        """
        return self.fetch_start_value('time')

    def fetch_start_value(self, name: str) -> float:
        """Get the initial value for a given attribute from the dataframe.

        Args:
            name (str): The name of the value to fetch.

        Returns:
            float: Returns the requested value from the first row.
        """
        return self.__df.at[0, name]

    def fetch_init_values(self, names: List[str]) -> ArrayLike:
        """Get the initial value for given attributes from the dataframe.

        Args:
            names (List[str]): Names of the values to get.

        Returns:
            np.array: Returns a numpy array containing the requested values in the same order as in the input list.
        """
        return np.asarray([self.fetch_start_value(name) for name in names])

    def fetch_value(self, name: str, t: float | None = None) -> float:
        """Get a specific value from the dataframe.

        With 'linear' interpolation the value is interpolated between the
        sample at or before the requested time and the following sample. For
        any other interpolation setting, and at the last sample of the
        dataset, the sample at or before the requested time is returned as is.

        Args:
            name (str): The name of the value to fetch.
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            float: Returns the requested value.
        """
        if t is None:
            idx = self.__idx
            t_query = self.get_time()
        else:
            idx = self._get_closest_idx(t)
            # The weight below must refer to the requested time, not to the
            # current time of the simulation.
            t_query = t

        lower = self.__df.at[idx, name]

        # No following sample to interpolate towards, or interpolation is off.
        is_last = idx >= len(self.__df) - 1
        if self.__interpolation != 'linear' or is_last:
            return lower

        t_min = self.__df.at[idx, 'time']
        t_max = self.__df.at[idx + 1, 'time']

        # Sometimes no time passes in-between two samples.
        if t_max == t_min:
            return lower

        # Compute the weight for interpolation.
        alpha = (t_query - t_min) / (t_max - t_min)

        # Interpolate linearly between the two data points.
        return (1 - alpha) * lower + alpha * self.__df.at[idx + 1, name]

    def fetch_values(self, names: List[str], t: float | None = None) -> ArrayLike:
        """Get specific values from the dataframe.

        Args:
            names (List[str]): Names of the values to get.
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            np.array: Returns a numpy array containing the requested values in the same order as in the input list.
        """
        return np.asarray([self.fetch_value(name, t) for name in names])

    def local_to_body(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for.

        Returns:
            ArrayLike: The current transformation matrix from local to body-fixed coords.
        """
        # Get the rotation in the local coordinate system.
        pitch_l, yaw_l, roll_l = self.fetch_values(['pitch_l', 'yaw_l', 'roll_l'], t)

        return T1(roll_l) @ T2(pitch_l - math.pi/2) @ T1(-yaw_l)

    def global_to_local(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for.

        Returns:
            ArrayLike: The current transformation matrix from global to local coords.
        """
        decl = self.fetch_value('declination', t)
        long = self.fetch_value('longitude', t)

        t0 = self.get_start_time()

        # One full turn per 24 hours, in radians per second.
        omega_E = (2*math.pi) / (24*60*60)

        return T2(-decl) @ T3(long + omega_E * t0)

    def global_to_launch_rail(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for. Doesn't do anything here because
                the transformation remains the same across all time steps.

        Returns:
            ArrayLike: The current transformation matrix from global to launch rail coords.
        """
        init_long = self.fetch_start_value('longitude')
        init_lat = self.fetch_start_value('latitude')

        return T2(-math.pi/2 - init_lat) @ T3(init_long)

    def local_to_launch_rail(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for.

        Returns:
            ArrayLike: The current transformation matrix from local to launch rail coords.
        """
        # local -> global (inverse of global -> local), then global -> rail.
        return self.global_to_launch_rail(t) @ np.linalg.inv(self.global_to_local(t))

    def launch_rail_to_body(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): The time to get the transformation matrix for.

        Returns:
            ArrayLike: The current transformation matrix from launch rail to body-fixed coords.
        """
        # rail -> local (inverse of local -> rail), then local -> body.
        return self.local_to_body(t) @ np.linalg.inv(self.local_to_launch_rail(t))

    def get_mach_number(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            float: Returns the mach number at the specified time.
        """
        return self.fetch_value('mach', t)

    def is_transsonic(self, t: float | None = None) -> bool:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            bool: Returns True if the rocket is flying with transsonic speed
                (mach number strictly between 0.8 and 1.2) at the specified time.
        """
        mach = self.get_mach_number(t)

        return mach > 0.8 and mach < 1.2

    def is_supersonic(self, t: float | None = None) -> bool:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            bool: True if the rocket is flying with supersonic speed
                (mach number above 1) at the specified time.
        """
        return self.get_mach_number(t) > 1

    def get_acceleration(self, frame='FL', t: float | None = None) -> ArrayLike:
        """
        Args:
            frame (str, optional): The coordinate frame to compute the acceleration for. Defaults to 'FL'.
                'B' transforms the stored values with the launch-rail-to-body
                matrix; any other value returns them as stored.
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            ArrayLike: Returns the spacecraft's acceleration at the given time.
        """
        acc = self.fetch_values(['ax', 'ay', 'az'], t)

        if frame == 'B':
            return self.launch_rail_to_body(t) @ acc

        return acc

    def get_angular_velocities(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            ArrayLike: The 'OMEGA_X', 'OMEGA_Y' and 'OMEGA_Z' values at the specified time.
        """
        return self.fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z'], t)

    def get_velocity(self, frame: Literal['L', 'B', 'LF'] = 'LF', t: float | None = None) -> ArrayLike:
        """
        This class used to define ``get_velocity`` twice; the earlier variant,
        which returned the scalar 'velocity' column, was always overridden by
        this one. Use ``fetch_value('velocity', t)`` to read that column.

        Args:
            frame (str, optional): The coordinate frame to compute the velocity for. Defaults to 'LF'.
                'B' transforms the stored values with the launch-rail-to-body
                matrix, 'L' with the inverse of the local-to-launch-rail
                matrix; any other value returns them as stored.
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            ArrayLike: Returns the spacecraft's velocity at a given time.
        """
        vel = self.fetch_values(['vx', 'vy', 'vz'], t)

        if frame == 'B':
            return self.launch_rail_to_body(t) @ vel
        if frame == 'L':
            # Use the matrix for the requested time, not for the current time.
            return np.linalg.inv(self.local_to_launch_rail(t)) @ vel

        return vel

    def get_altitude(self, t: float | None = None) -> float:
        """
        Args:
            t (float | None, optional): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            float: Returns the altitude at the specified time.
        """
        return self.fetch_value('altitude', t)

    def get_speed_of_sound(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            float: Returns the speed of sound at the specified time.
        """
        return self.fetch_value('speedofsound', t)

    def get_rotation(self, t: float | None = None) -> ArrayLike:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            np.array: Returns the rotation (pitch, yaw, roll in the local frame, in that order) at the specified time.
        """
        return self.fetch_values(['pitch_l', 'yaw_l', 'roll_l'], t)

    def get_temperature(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            float: Returns the temperature at the specified time.
        """
        return self.fetch_value('temperature', t)

    def get_pressure(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            float: Returns the pressure at the specified time.
        """
        return self.fetch_value('pressure', t)

    def get_thrust(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            float: Returns the thrust value for the specified time.
        """
        return self.fetch_value('thrust', t)

    def get_drag(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            float: Returns the drag value for the specified time.
        """
        return self.fetch_value('drag', t)

    def get_mass(self, t: float | None = None) -> float:
        """
        Args:
            t (float): Allows specification of a different time instead of the current time. None for current time.

        Returns:
            float: Returns the mass value for the specified time.
        """
        return self.fetch_value('mass', t)
# This module is meant to be imported; running it directly does nothing.
if __name__ == '__main__':
    pass