mirror of
https://git.intern.spaceteamaachen.de/ALPAKA/SPATZ.git
synced 2025-08-02 01:21:53 +00:00
Merge pull request 'time-rework' (#2) from time-rework into main
Reviewed-on: https://git.intern.spaceteamaachen.de/ALPAKA/SPATZ/pulls/2
This commit is contained in:
commit
1bd93f8a33
1723
data/simulations/19km.txt
Normal file
1723
data/simulations/19km.txt
Normal file
File diff suppressed because it is too large
Load Diff
1724
data/simulations/raw/Result_Export.txt
Normal file
1724
data/simulations/raw/Result_Export.txt
Normal file
File diff suppressed because one or more lines are too long
423
spatz/dataset.py
423
spatz/dataset.py
@ -19,6 +19,33 @@ class Phase(Enum):
|
|||||||
ADI = 5
|
ADI = 5
|
||||||
|
|
||||||
|
|
||||||
|
def T1(angle):
|
||||||
|
# return Rotation.from_euler('X', angle, degrees=False).as_matrix()
|
||||||
|
return np.array([
|
||||||
|
[1, 0, 0],
|
||||||
|
[0, math.cos(angle), math.sin(angle)],
|
||||||
|
[0, -math.sin(angle), math.cos(angle)],
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
def T2(angle):
|
||||||
|
# return Rotation.from_euler('Y', angle, degrees=False).as_matrix()
|
||||||
|
return np.array([
|
||||||
|
[math.cos(angle), 0, -math.sin(angle)],
|
||||||
|
[0, 1, 0],
|
||||||
|
[math.sin(angle), 0, math.cos(angle)]
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
|
def T3(angle):
|
||||||
|
# return Rotation.from_euler('Z', angle, degrees=False).as_matrix()
|
||||||
|
return np.array([
|
||||||
|
[math.cos(angle), math.sin(angle), 0],
|
||||||
|
[-math.sin(angle), math.cos(angle), 0],
|
||||||
|
[0, 0, 1]
|
||||||
|
])
|
||||||
|
|
||||||
|
|
||||||
class Dataset(Advanceable):
|
class Dataset(Advanceable):
|
||||||
def __init__(self, path: str, interpolation: str = 'linear'):
|
def __init__(self, path: str, interpolation: str = 'linear'):
|
||||||
"""A wrapper class for a Pandas dataframe containing simulation data.
|
"""A wrapper class for a Pandas dataframe containing simulation data.
|
||||||
@ -42,9 +69,23 @@ class Dataset(Advanceable):
|
|||||||
def _on_reset(self):
|
def _on_reset(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def _get_closest_idx(self, t: float) -> int:
|
||||||
|
"""Gets an index _idx_ for the dataframe _df_ such that the values at the given time _t_ are somewhere between
|
||||||
|
_idx_ and _idx+1_.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
t (float): The requested time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
int: The computed index.
|
||||||
|
"""
|
||||||
|
idx = (self.__df['time'] - t).abs().idxmin()
|
||||||
|
idx = idx if self.__df['time'].loc[idx] <= t else idx - 1
|
||||||
|
|
||||||
|
return idx
|
||||||
|
|
||||||
def _on_step(self, _: float):
|
def _on_step(self, _: float):
|
||||||
idx = (self.__df['time'] - self.get_time()).abs().idxmin()
|
self.__idx = self._get_closest_idx(self.get_time())
|
||||||
self.__idx = idx if self.__df['time'].loc[idx] < self.get_time() else idx - 1
|
|
||||||
|
|
||||||
def get_phase(self) -> Phase:
|
def get_phase(self) -> Phase:
|
||||||
"""
|
"""
|
||||||
@ -97,100 +138,16 @@ class Dataset(Advanceable):
|
|||||||
float: The last time step in the dataset.
|
float: The last time step in the dataset.
|
||||||
"""
|
"""
|
||||||
return max(self.__df['time'])
|
return max(self.__df['time'])
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def T1(angle):
|
|
||||||
# return Rotation.from_euler('X', angle, degrees=False).as_matrix()
|
|
||||||
return np.array([
|
|
||||||
[1, 0, 0],
|
|
||||||
[0, math.cos(angle), math.sin(angle)],
|
|
||||||
[0, -math.sin(angle), math.cos(angle)],
|
|
||||||
])
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def T2(angle):
|
|
||||||
# return Rotation.from_euler('Y', angle, degrees=False).as_matrix()
|
|
||||||
return np.array([
|
|
||||||
[math.cos(angle), 0, -math.sin(angle)],
|
|
||||||
[0, 1, 0],
|
|
||||||
[math.sin(angle), 0, math.cos(angle)]
|
|
||||||
])
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def T3(angle):
|
|
||||||
# return Rotation.from_euler('Z', angle, degrees=False).as_matrix()
|
|
||||||
return np.array([
|
|
||||||
[math.cos(angle), math.sin(angle), 0],
|
|
||||||
[-math.sin(angle), math.cos(angle), 0],
|
|
||||||
[0, 0, 1]
|
|
||||||
])
|
|
||||||
|
|
||||||
def local_to_body(self) -> ArrayLike:
|
|
||||||
"""
|
|
||||||
Returns:
|
|
||||||
ArrayLike: The current transformation matrix from local to body-fixed coords.
|
|
||||||
"""
|
|
||||||
# Get the rotation in the local coordinate system.
|
|
||||||
rots = self.fetch_values(['pitch_l', 'yaw_l', 'roll_l'])
|
|
||||||
pitch_l, yaw_l, roll_l = rots[0], rots[1], rots[2]
|
|
||||||
|
|
||||||
return self.T1(roll_l) @ self.T2(pitch_l - math.pi/2) @ self.T1(-yaw_l)
|
|
||||||
|
|
||||||
def global_to_local(self) -> ArrayLike:
|
def get_start_time(self) -> float:
|
||||||
"""
|
"""
|
||||||
Returns:
|
Returns:
|
||||||
ArrayLike: The current transformation matrix from global to local coords.
|
float: Returns the starting time of the simulation.
|
||||||
"""
|
"""
|
||||||
decl = self.fetch_value('declination')
|
return self.fetch_start_value('time')
|
||||||
long = self.fetch_value('longitude')
|
|
||||||
t0 = self.__df['time'].iloc[0]
|
|
||||||
|
|
||||||
omega_E = (2*math.pi) / (24*60*60)
|
def fetch_start_value(self, name: str) -> float:
|
||||||
|
"""Get the initial value for a given attribute from the dataframe.
|
||||||
return self.T2(-decl) @ self.T3(long + omega_E * t0)
|
|
||||||
|
|
||||||
def global_to_launch_rail(self) -> ArrayLike:
|
|
||||||
"""
|
|
||||||
Returns:
|
|
||||||
ArrayLike: The current transformation matrix from global to launch rail coords.
|
|
||||||
"""
|
|
||||||
init_long = self.__df['longitude'].iloc[0]
|
|
||||||
init_lat = self.__df['latitude'].iloc[0]
|
|
||||||
|
|
||||||
return self.T2(-math.pi/2 - init_lat) @ self.T3(init_long)
|
|
||||||
|
|
||||||
def local_to_launch_rail(self) -> ArrayLike:
|
|
||||||
"""
|
|
||||||
Returns:
|
|
||||||
ArrayLike: The current transformation matrix from local to launch rail coords.
|
|
||||||
"""
|
|
||||||
return self.global_to_launch_rail() @ np.linalg.inv(self.global_to_local())
|
|
||||||
|
|
||||||
def launch_rail_to_body(self) -> ArrayLike:
|
|
||||||
"""
|
|
||||||
Returns:
|
|
||||||
ArrayLike: The current transformation matrix from launch rail to local coords.
|
|
||||||
"""
|
|
||||||
return self.local_to_body() @ np.linalg.inv(self.local_to_launch_rail())
|
|
||||||
|
|
||||||
def is_transsonic(self) -> bool:
|
|
||||||
"""
|
|
||||||
Returns:
|
|
||||||
bool: Returns True if the rocket is flying with transsonic speed at the current time of the simulation.
|
|
||||||
"""
|
|
||||||
mach = self.get_mach_number()
|
|
||||||
|
|
||||||
return mach > 0.8 and mach < 1.2
|
|
||||||
|
|
||||||
def is_supersonic(self) -> bool:
|
|
||||||
"""
|
|
||||||
Returns:
|
|
||||||
bool: True if the rocket is flying with supersonic speed at the current time of the simulation.
|
|
||||||
"""
|
|
||||||
return self.get_mach_number() > 1
|
|
||||||
|
|
||||||
def fetch_value(self, name: str) -> float:
|
|
||||||
"""Get a specific value from the dataframe.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
name (str): The name of the value to fetch.
|
name (str): The name of the value to fetch.
|
||||||
@ -198,22 +155,10 @@ class Dataset(Advanceable):
|
|||||||
Returns:
|
Returns:
|
||||||
float: Returns the requested value.
|
float: Returns the requested value.
|
||||||
"""
|
"""
|
||||||
if self.__interpolation == 'linear':
|
return self.__df.at[0, name]
|
||||||
t_min = self.__df.at[self.__idx, 'time']
|
|
||||||
t_max = self.__df.at[self.__idx + 1, 'time']
|
def fetch_init_values(self, names: List[str]) -> ArrayLike:
|
||||||
|
"""Get the initial value for given attributes from the dataframe.
|
||||||
# Sometimes no time passes in-between two samples.
|
|
||||||
if t_max == t_min:
|
|
||||||
return self.__df.at[name, self.__idx]
|
|
||||||
|
|
||||||
# Compute the weight for interpolation.
|
|
||||||
alpha = (self.get_time() - t_min) / (t_max - t_min)
|
|
||||||
|
|
||||||
# Interpolate linearly between the two data points.
|
|
||||||
return (1 - alpha) * self.__df.at[self.__idx, name] + alpha * self.__df.at[self.__idx + 1, name]
|
|
||||||
|
|
||||||
def fetch_values(self, names: List[str]) -> np.array:
|
|
||||||
"""Get specific values from the dataframe.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
names (List[str]): Names of the values to get.
|
names (List[str]): Names of the values to get.
|
||||||
@ -221,116 +166,274 @@ class Dataset(Advanceable):
|
|||||||
Returns:
|
Returns:
|
||||||
np.array: Returns a numpy array containing the requested values in the same order as in the input list.
|
np.array: Returns a numpy array containing the requested values in the same order as in the input list.
|
||||||
"""
|
"""
|
||||||
return np.asarray([self.fetch_value(name) for name in names])
|
return np.asarray([self.fetch_start_value(name) for name in names])
|
||||||
|
|
||||||
def get_velocity(self) -> float:
|
def fetch_value(self, name: str, t: float | None = None) -> float:
|
||||||
"""
|
"""Get a specific value from the dataframe.
|
||||||
Returns:
|
|
||||||
np.array: Returns the velocity at the current time of the simulation.
|
|
||||||
"""
|
|
||||||
return self.fetch_value('velocity')
|
|
||||||
|
|
||||||
def get_acceleration(self, frame='FL') -> ArrayLike:
|
|
||||||
"""_summary_
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
frame (str, optional): _description_. Defaults to 'FL'.
|
name (str): The name of the value to fetch.
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
ArrayLike: _description_
|
float: Returns the requested value.
|
||||||
"""
|
"""
|
||||||
acc = self.fetch_values(['ax', 'ay', 'az'])
|
idx = self.__idx if t is None else self._get_closest_idx(t)
|
||||||
|
|
||||||
|
if self.__interpolation == 'linear':
|
||||||
|
t_min = self.__df.at[idx, 'time']
|
||||||
|
t_max = self.__df.at[idx + 1, 'time']
|
||||||
|
|
||||||
|
# Sometimes no time passes in-between two samples.
|
||||||
|
if t_max == t_min:
|
||||||
|
return self.__df.at[name, idx]
|
||||||
|
|
||||||
|
# Compute the weight for interpolation.
|
||||||
|
alpha = (self.get_time() - t_min) / (t_max - t_min)
|
||||||
|
|
||||||
|
# Interpolate linearly between the two data points.
|
||||||
|
return (1 - alpha) * self.__df.at[idx, name] + alpha * self.__df.at[idx + 1, name]
|
||||||
|
|
||||||
|
def fetch_values(self, names: List[str], t: float | None = None) -> ArrayLike:
|
||||||
|
"""Get specific values from the dataframe.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
names (List[str]): Names of the values to get.
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
np.array: Returns a numpy array containing the requested values in the same order as in the input list.
|
||||||
|
"""
|
||||||
|
return np.asarray([self.fetch_value(name, t) for name in names])
|
||||||
|
|
||||||
|
def local_to_body(self, t: float | None = None) -> ArrayLike:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): The time to get the transformation matrix for.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ArrayLike: The current transformation matrix from local to body-fixed coords.
|
||||||
|
"""
|
||||||
|
# Get the rotation in the local coordinate system.
|
||||||
|
rots = self.fetch_values(['pitch_l', 'yaw_l', 'roll_l'], t)
|
||||||
|
pitch_l, yaw_l, roll_l = rots[0], rots[1], rots[2]
|
||||||
|
|
||||||
|
return T1(roll_l) @ T2(pitch_l - math.pi/2) @ T1(-yaw_l)
|
||||||
|
|
||||||
|
def global_to_local(self, t: float | None = None) -> ArrayLike:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): The time to get the transformation matrix for.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ArrayLike: The current transformation matrix from global to local coords.
|
||||||
|
"""
|
||||||
|
decl = self.fetch_value('declination', t)
|
||||||
|
long = self.fetch_value('longitude', t)
|
||||||
|
t0 = self.get_start_time()
|
||||||
|
|
||||||
|
omega_E = (2*math.pi) / (24*60*60)
|
||||||
|
|
||||||
|
return T2(-decl) @ T3(long + omega_E * t0)
|
||||||
|
|
||||||
|
def global_to_launch_rail(self, t: float | None = None) -> ArrayLike:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): The time to get the transformation matrix for. Doesn't do anything here because
|
||||||
|
the transformation remains the same across all time steps.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ArrayLike: The current transformation matrix from global to launch rail coords.
|
||||||
|
"""
|
||||||
|
init_long = self.fetch_start_value('longitude')
|
||||||
|
init_lat = self.fetch_start_value('latitude')
|
||||||
|
|
||||||
|
return T2(-math.pi/2 - init_lat) @ T3(init_long)
|
||||||
|
|
||||||
|
def local_to_launch_rail(self, t: float | None = None) -> ArrayLike:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): The time to get the transformation matrix for.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ArrayLike: The current transformation matrix from local to launch rail coords.
|
||||||
|
"""
|
||||||
|
return self.global_to_launch_rail(t) @ np.linalg.inv(self.global_to_local(t))
|
||||||
|
|
||||||
|
def launch_rail_to_body(self, t: float | None = None) -> ArrayLike:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): The time to get the transformation matrix for.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ArrayLike: The current transformation matrix from launch rail to local coords.
|
||||||
|
"""
|
||||||
|
return self.local_to_body(t) @ np.linalg.inv(self.local_to_launch_rail(t))
|
||||||
|
|
||||||
|
def get_mach_number(self, t: float | None = None) -> float:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: Returns the mach number at the specified time.
|
||||||
|
"""
|
||||||
|
return self.fetch_value('mach', t)
|
||||||
|
|
||||||
|
def is_transsonic(self, t: float | None = None) -> bool:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: Returns True if the rocket is flying with transsonic speed at the specified time.
|
||||||
|
"""
|
||||||
|
mach = self.get_mach_number(t)
|
||||||
|
|
||||||
|
return mach > 0.8 and mach < 1.2
|
||||||
|
|
||||||
|
def is_supersonic(self, t: float | None = None) -> bool:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the rocket is flying with supersonic speed at the specified time.
|
||||||
|
"""
|
||||||
|
return self.get_mach_number(t) > 1
|
||||||
|
|
||||||
|
def get_velocity(self, t: float | None = None) -> float:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
np.array: Returns the velocity at the current time.
|
||||||
|
"""
|
||||||
|
return self.fetch_value('velocity', t)
|
||||||
|
|
||||||
|
def get_acceleration(self, frame='FL', t: float | None = None) -> ArrayLike:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
frame (str, optional): The coordinate frame to compute the acceleration for. Defaults to 'FL'.
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
ArrayLike: Returns the spacecraft's acceleration at the given time.
|
||||||
|
"""
|
||||||
|
acc = self.fetch_values(['ax', 'ay', 'az'], t)
|
||||||
|
|
||||||
if frame == 'B':
|
if frame == 'B':
|
||||||
return self.launch_rail_to_body() @ acc
|
return self.launch_rail_to_body(t) @ acc
|
||||||
|
|
||||||
return acc
|
return acc
|
||||||
|
|
||||||
def get_angular_velocities(self) -> ArrayLike:
|
def get_angular_velocities(self, t: float | None = None) -> ArrayLike:
|
||||||
"""
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
ArrayLike: Gets the derivatives in angular velocity across all axes of the rocket.
|
ArrayLike: Gets the derivatives in angular velocity across all axes of the rocket.
|
||||||
"""
|
"""
|
||||||
return self.fetch_values(['omega_X', 'omega_Y', 'omega_Z'])
|
return self.fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z'], t)
|
||||||
|
|
||||||
def get_velocity(self, frame='FL') -> ArrayLike:
|
def get_velocity(self, frame='FL', t: float | None = None) -> ArrayLike:
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
frame (str, optional): _description_. Defaults to 'FL'.
|
frame (str, optional): The coordinate frame to compute the velocity for. Defaults to 'FL'.
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
ArrayLike: _description_
|
ArrayLike: Returns the spacecraft's velocity at a given time.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
vel = self.fetch_values(['vx', 'vy', 'vz'])
|
vel = self.fetch_values(['vx', 'vy', 'vz'], t)
|
||||||
|
|
||||||
if frame == 'B':
|
if frame == 'B':
|
||||||
return self.launch_rail_to_body() @ vel
|
return self.launch_rail_to_body(t) @ vel
|
||||||
|
|
||||||
return vel
|
return vel
|
||||||
|
|
||||||
def get_mach_number(self) -> float:
|
def get_altitude(self, t: float | None = None) -> float:
|
||||||
"""
|
"""
|
||||||
Returns:
|
Args:
|
||||||
float: Returns the mach number at the current time of the simulation.
|
t (float | None, optional): Allows specification of a different time instead of the current time. None for current time.
|
||||||
"""
|
|
||||||
return self.fetch_value('mach')
|
|
||||||
|
|
||||||
def get_speed_of_sound(self) -> float:
|
|
||||||
"""
|
|
||||||
Returns:
|
Returns:
|
||||||
float: Returns the speed of sound at the current time of the simulation.
|
float: Returns the altitude in meter at the specified time.
|
||||||
"""
|
"""
|
||||||
return self.fetch_value('speedofsound')
|
return self.fetch_value('altitude', t)
|
||||||
|
|
||||||
def get_rotation_rates(self) -> np.array:
|
def get_speed_of_sound(self, t: float | None = None) -> float:
|
||||||
"""
|
"""
|
||||||
Returns:
|
Args:
|
||||||
np.array: Returns the rotation rates at the current time of the simulation.
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
"""
|
|
||||||
return self.fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z'])
|
|
||||||
|
|
||||||
def get_rotation(self) -> np.array:
|
|
||||||
"""
|
|
||||||
Returns:
|
Returns:
|
||||||
np.array: _description_
|
float: Returns the speed of sound at the specified time.
|
||||||
"""
|
"""
|
||||||
return self.fetch_values(['pitch_l', 'yaw_l', 'roll_l'])
|
return self.fetch_value('speedofsound', t)
|
||||||
|
|
||||||
def get_temperature(self) -> float:
|
def get_rotation(self, t: float | None = None) -> np.array:
|
||||||
"""
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
np.array: Returns the temperature at the current time of the simulation.
|
np.array: returns the rotation at the specified time.
|
||||||
"""
|
"""
|
||||||
return self.fetch_value('temperature')
|
return self.fetch_values(['pitch_l', 'yaw_l', 'roll_l'], t)
|
||||||
|
|
||||||
|
def get_temperature(self, t: float | None = None) -> float:
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
np.array: Returns the temperature at the spepcified time.
|
||||||
|
"""
|
||||||
|
return self.fetch_value('temperature', t)
|
||||||
|
|
||||||
def get_pressure(self) -> float:
|
def get_pressure(self, t: float | None = None) -> float:
|
||||||
"""
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
np.array: Returns the pressure at the current time of the simulation.
|
np.array: Returns the pressure at the current time of the simulation.
|
||||||
"""
|
"""
|
||||||
return self.fetch_value('pressure')
|
return self.fetch_value('pressure', t)
|
||||||
|
|
||||||
def get_thrust(self) -> float:
|
def get_thrust(self, t: float | None = None) -> float:
|
||||||
"""
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
float: Returns the thrust value for the current time of the simulation.
|
float: Returns the thrust value for the specified time.
|
||||||
"""
|
"""
|
||||||
return self.fetch_value('thrust')
|
return self.fetch_value('thrust', t)
|
||||||
|
|
||||||
def get_drag(self) -> float:
|
def get_drag(self, t: float | None = None) -> float:
|
||||||
"""
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
float: Returns the drag value for the current time of the simulation.
|
float: Returns the drag value for the specified time.
|
||||||
"""
|
"""
|
||||||
return self.fetch_value('drag')
|
return self.fetch_value('drag', t)
|
||||||
|
|
||||||
def get_mass(self) -> float:
|
def get_mass(self, t: float | None = None) -> float:
|
||||||
"""
|
"""
|
||||||
|
Args:
|
||||||
|
t (float): Allows specification of a different time instead of the current time. None for current time.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
float: Returns the mass value for the current time of the simulation.
|
float: Returns the mass value for the specified time.
|
||||||
"""
|
"""
|
||||||
return self.fetch_value('mass')
|
return self.fetch_value('mass', t)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
from typing import Any, Tuple
|
from typing import Any, Tuple, List
|
||||||
from numpy.typing import ArrayLike
|
from numpy.typing import ArrayLike
|
||||||
|
|
||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
@ -56,7 +56,7 @@ class Logger(Advanceable):
|
|||||||
def _on_reset(self):
|
def _on_reset(self):
|
||||||
self.__df = pd.DataFrame.from_dict({'time': [self.get_time()]}).astype(np.float64)
|
self.__df = pd.DataFrame.from_dict({'time': [self.get_time()]}).astype(np.float64)
|
||||||
|
|
||||||
def write(self, attrib: str, value: Any, domain: str = 'all'):
|
def write(self, attrib: str | List[str], value: Any | List[Any] | List[ArrayLike], domain: str = 'all'):
|
||||||
"""Writes a value to the logger.
|
"""Writes a value to the logger.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
@ -64,12 +64,16 @@ class Logger(Advanceable):
|
|||||||
value (Any): The value to log.
|
value (Any): The value to log.
|
||||||
domain (str, optional): The domain the value belongs to. Defaults to 'any'.
|
domain (str, optional): The domain the value belongs to. Defaults to 'any'.
|
||||||
"""
|
"""
|
||||||
name = f'{domain}/{attrib}'
|
if not isinstance(attrib, str):
|
||||||
|
for attr, val in zip(attrib, value):
|
||||||
|
self.write(attr, val, domain=domain)
|
||||||
|
else:
|
||||||
|
name = f'{domain}/{attrib}'
|
||||||
|
|
||||||
if name not in self.__df.columns:
|
if name not in self.__df.columns:
|
||||||
self.__df[name] = pd.Series([pd.NA] * len(self.__df))
|
self.__df[name] = pd.Series([pd.NA] * len(self.__df))
|
||||||
|
|
||||||
self.__df.at[self.__idx, name] = value
|
self.__df.at[self.__idx, name] = value
|
||||||
|
|
||||||
def get_dataframe(self) -> pd.DataFrame:
|
def get_dataframe(self) -> pd.DataFrame:
|
||||||
return self.__df
|
return self.__df
|
@ -6,6 +6,9 @@ from numpy.typing import ArrayLike
|
|||||||
|
|
||||||
def inv(val):
|
def inv(val):
|
||||||
if np.isscalar(val):
|
if np.isscalar(val):
|
||||||
|
if val == 0:
|
||||||
|
return 0
|
||||||
|
|
||||||
return 1 / val
|
return 1 / val
|
||||||
|
|
||||||
if len(val) == 1:
|
if len(val) == 1:
|
||||||
@ -76,6 +79,8 @@ class KalmanFilter:
|
|||||||
H = self.__H
|
H = self.__H
|
||||||
R = self.__R
|
R = self.__R
|
||||||
|
|
||||||
|
n = len(x)
|
||||||
|
|
||||||
if hasattr(self.__H, '__call__'):
|
if hasattr(self.__H, '__call__'):
|
||||||
H = self.__H(dt)
|
H = self.__H(dt)
|
||||||
|
|
||||||
@ -86,9 +91,9 @@ class KalmanFilter:
|
|||||||
K = err @ H.T @ inv(H @ err @ H.T + R)
|
K = err @ H.T @ inv(H @ err @ H.T + R)
|
||||||
|
|
||||||
# Compute the corrected state.
|
# Compute the corrected state.
|
||||||
x = x + K @ (z - H @ x)
|
x = x + (K @ (z - H @ x)).T
|
||||||
|
|
||||||
# Compute the error after correction.
|
# Compute the error after correction.
|
||||||
err = (np.identity('TODO') - K @ H) @ err
|
err = (np.identity(n) - K @ H) @ err
|
||||||
|
|
||||||
return x, err
|
return np.squeeze(np.asarray(x)), err
|
@ -19,7 +19,7 @@ class AltitudeModel:
|
|||||||
alt_b (float, optional): The reference altitude in meters. Defaults to 0m.
|
alt_b (float, optional): The reference altitude in meters. Defaults to 0m.
|
||||||
unit (PressUnit, optional): The unit used for all pressure values. Defaults to PressUnit.hPa.
|
unit (PressUnit, optional): The unit used for all pressure values. Defaults to PressUnit.hPa.
|
||||||
"""
|
"""
|
||||||
self.__press_0 = press_b / (1 - (alt_b / 44330)**5255)
|
self.__press_0 = press_b / (1 - (alt_b / 44330)**5.255)
|
||||||
|
|
||||||
def predict(self, press: float, unit: PressUnit = PressUnit.hPa) -> float:
|
def predict(self, press: float, unit: PressUnit = PressUnit.hPa) -> float:
|
||||||
"""Estimates the altitude based on a pressure measurement.
|
"""Estimates the altitude based on a pressure measurement.
|
||||||
@ -41,4 +41,4 @@ class AltitudeModel:
|
|||||||
|
|
||||||
press = to_hpa[unit]
|
press = to_hpa[unit]
|
||||||
|
|
||||||
return 44330 * (1 - (press / self.__press_0)**(1 / 5255))
|
return 44330 * (1 - (press / self.__press_0)**(1 / 5.255))
|
||||||
|
@ -24,8 +24,15 @@ class Observer:
|
|||||||
def _log(self, name: AnyStr, value: Any):
|
def _log(self, name: AnyStr, value: Any):
|
||||||
self._logger.write(name, value, self._get_name())
|
self._logger.write(name, value, self._get_name())
|
||||||
|
|
||||||
def __call__(self) -> ArrayLike:
|
def get_start_value(self) -> ArrayLike:
|
||||||
data = self._dataset.fetch_values(self.__attrs)
|
"""
|
||||||
|
Returns:
|
||||||
|
ArrayLike: Returns the values of the observed attributes at the start of the simulation.
|
||||||
|
"""
|
||||||
|
return self(t=self._dataset.get_start_time())
|
||||||
|
|
||||||
|
def __call__(self, t: float | None = None) -> ArrayLike:
|
||||||
|
data = self._dataset.fetch_values(self.__attrs, t)
|
||||||
|
|
||||||
for attrib, value in zip(self.__attrs, data):
|
for attrib, value in zip(self.__attrs, data):
|
||||||
self._log(attrib, value)
|
self._log(attrib, value)
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
from typing import List
|
from typing import List, AnyStr
|
||||||
|
|
||||||
from numpy.typing import ArrayLike
|
from numpy.typing import ArrayLike
|
||||||
from spatz.dataset import ArrayLike, Dataset
|
from spatz.dataset import ArrayLike, Dataset
|
||||||
@ -8,13 +8,27 @@ from spatz.transforms import Transform
|
|||||||
|
|
||||||
|
|
||||||
class BHI160Gyro(Gyroscope):
|
class BHI160Gyro(Gyroscope):
|
||||||
def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = ...):
|
def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = []):
|
||||||
super().__init__(dataset, logger, offset, transforms)
|
super().__init__(dataset, logger, offset, transforms)
|
||||||
|
|
||||||
|
def _get_name(self) -> AnyStr:
|
||||||
|
return 'BHI160'
|
||||||
|
|
||||||
def _get_data(self) -> ArrayLike:
|
def _get_data(self) -> ArrayLike:
|
||||||
rots = self._dataset.fetch_values(['roll_l', 'pitch_l', 'yaw_l'])
|
rots = self._dataset.fetch_values(['roll_l', 'pitch_l', 'yaw_l'])
|
||||||
|
|
||||||
|
return rots
|
||||||
|
|
||||||
def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike:
|
def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike:
|
||||||
return x
|
return x
|
||||||
|
|
||||||
|
|
||||||
|
class BHI160Acc(Accelerometer):
|
||||||
|
def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = []):
|
||||||
|
super().__init__(dataset, logger, offset, transforms)
|
||||||
|
|
||||||
|
def _get_name(self) -> AnyStr:
|
||||||
|
return 'BHI160'
|
||||||
|
|
||||||
|
def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike:
|
||||||
|
return x
|
||||||
|
@ -8,8 +8,8 @@ from spatz.transforms import GaussianNoise, Transform
|
|||||||
|
|
||||||
|
|
||||||
class MS5611_01BA03(PressureSensor):
|
class MS5611_01BA03(PressureSensor):
|
||||||
def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = []):
|
def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = [], ts_effects=True):
|
||||||
super().__init__(dataset, logger, transforms)
|
super().__init__(dataset, logger, transforms, ts_effects)
|
||||||
|
|
||||||
# Noise model obtained by a test flight using this sensor.
|
# Noise model obtained by a test flight using this sensor.
|
||||||
self.__pad_noise = GaussianNoise(0, 0.03)
|
self.__pad_noise = GaussianNoise(0, 0.03)
|
||||||
|
@ -21,6 +21,9 @@ class PressureSensor(Sensor):
|
|||||||
|
|
||||||
self._ts_effects = ts_effects
|
self._ts_effects = ts_effects
|
||||||
|
|
||||||
|
def set_transsonic_effects(self, active: bool):
|
||||||
|
self._ts_effects = active
|
||||||
|
|
||||||
def _get_data(self) -> float:
|
def _get_data(self) -> float:
|
||||||
x = self._dataset.get_pressure()
|
x = self._dataset.get_pressure()
|
||||||
|
|
||||||
|
@ -39,13 +39,18 @@ class Sensor:
|
|||||||
@abstractmethod
|
@abstractmethod
|
||||||
def _get_data(self) -> ArrayLike | float:
|
def _get_data(self) -> ArrayLike | float:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def get_init_data() -> ArrayLike:
|
||||||
|
pass
|
||||||
|
|
||||||
def __call__(self) -> ArrayLike | float:
|
def __call__(self) -> ArrayLike | float:
|
||||||
out = self._get_data()
|
out = self._get_data()
|
||||||
out = self._sensor_specific_effects(out)
|
out = self._sensor_specific_effects(out)
|
||||||
|
|
||||||
|
t = self._dataset.get_time()
|
||||||
|
|
||||||
for transform in self._transforms:
|
for transform in self._transforms:
|
||||||
out = transform(out)
|
out = transform(t, out)
|
||||||
|
|
||||||
# Log the outputs of the sensor.
|
# Log the outputs of the sensor.
|
||||||
if np.isscalar(out):
|
if np.isscalar(out):
|
||||||
|
@ -35,7 +35,6 @@ class UniformTimeSteps:
|
|||||||
return self.__dt + noise
|
return self.__dt + noise
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Simulation:
|
class Simulation:
|
||||||
def __init__(self, time_steps=UniformTimeSteps(0.01)):
|
def __init__(self, time_steps=UniformTimeSteps(0.01)):
|
||||||
self.__dataset = None
|
self.__dataset = None
|
||||||
@ -123,4 +122,3 @@ class Simulation:
|
|||||||
|
|
||||||
return self.__sensors[-1]
|
return self.__sensors[-1]
|
||||||
|
|
||||||
|
|
@ -26,4 +26,9 @@ class Downtime(Transform):
|
|||||||
if self.__state == 1:
|
if self.__state == 1:
|
||||||
return x
|
return x
|
||||||
|
|
||||||
return np.zeros_like(x)
|
if np.isscalar(x):
|
||||||
|
x = 0
|
||||||
|
else:
|
||||||
|
x = np.zeros_like(x)
|
||||||
|
|
||||||
|
return x
|
||||||
|
1066
tests.ipynb
Normal file
1066
tests.ipynb
Normal file
File diff suppressed because one or more lines are too long
Loading…
x
Reference in New Issue
Block a user