From 5479773e325a9fd200ac970360e70188b17d0400 Mon Sep 17 00:00:00 2001 From: dario Date: Fri, 4 Oct 2024 11:11:19 +0200 Subject: [PATCH] Added more sensors for STAHR --- spatz/sensors/imu/accelerometer.py | 4 ++ spatz/sensors/imu/bmi088.py | 2 +- spatz/sensors/imu/iam_20380ht.py | 21 ++++++++ spatz/sensors/imu/sca3300.py | 26 ++++++++++ spatz/simulations/advanceable.py | 2 +- spatz/simulations/astos_source.py | 83 ++++++++++++++++++++++++------ spatz/simulations/csv_source.py | 34 ++++++++++-- spatz/simulations/data_source.py | 33 ++++++------ spatz/simulations/telemega.py | 27 ++++++++++ 9 files changed, 194 insertions(+), 38 deletions(-) create mode 100644 spatz/sensors/imu/iam_20380ht.py create mode 100644 spatz/sensors/imu/sca3300.py create mode 100644 spatz/simulations/telemega.py diff --git a/spatz/sensors/imu/accelerometer.py b/spatz/sensors/imu/accelerometer.py index 91f382f..979916d 100644 --- a/spatz/sensors/imu/accelerometer.py +++ b/spatz/sensors/imu/accelerometer.py @@ -50,6 +50,7 @@ class Accelerometer(Sensor): self._orientation = orientation def _get_data(self) -> ArrayLike | float: + """ acc = self._dataset.get_acceleration('global') self._logger.write('global_ax', acc[0], self._get_name()) @@ -70,6 +71,9 @@ class Accelerometer(Sensor): # Add the effects of the imu's offset. omega = self._dataset.get_angular_velocity() acc += (np.cross(omega, self._offset) + np.cross(omega, np.cross(omega, self._offset))) + """ + + acc = self._dataset.get_acceleration('local') return acc \ No newline at end of file diff --git a/spatz/sensors/imu/bmi088.py b/spatz/sensors/imu/bmi088.py index a71bcde..63a8a3e 100644 --- a/spatz/sensors/imu/bmi088.py +++ b/spatz/sensors/imu/bmi088.py @@ -30,7 +30,7 @@ class BMI088Gyro(Gyroscope): class BMI088Acc(Accelerometer): def __init__(self, dataset: Dataset, logger: Logger, orientation = np.identity(3), offset: float = 0, transforms: List[Transform] = []): - super().__init__(dataset, logger, CoordSystem.RIGHT_HANDED, orientation, offset, transforms) + super().__init__(dataset, logger, orientation, offset, transforms) self.__noise = GaussianNoise(0, 0.05) diff --git a/spatz/sensors/imu/iam_20380ht.py b/spatz/sensors/imu/iam_20380ht.py new file mode 100644 index 0000000..341c9d4 --- /dev/null +++ b/spatz/sensors/imu/iam_20380ht.py @@ -0,0 +1,21 @@ +import numpy as np + +from typing import List, AnyStr + +from numpy.typing import ArrayLike +from spatz.dataset import ArrayLike, Dataset +from spatz.logger import ArrayLike, Logger +from spatz.sensors import IMU, Accelerometer, Gyroscope, CoordSystem +from spatz.transforms import Transform, GaussianNoise + + +class IAM_20380HT(Gyroscope): + def __init__(self, dataset: Dataset, logger: Logger, orientation=np.identity(3), transforms: List[Transform] = []): + super().__init__(dataset, logger, orientation, transforms) + + def _get_name(self) -> AnyStr: + return 'IAM-20380HT' + + def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike: + return x + \ No newline at end of file diff --git a/spatz/sensors/imu/sca3300.py b/spatz/sensors/imu/sca3300.py new file mode 100644 index 0000000..12b63bf --- /dev/null +++ b/spatz/sensors/imu/sca3300.py @@ -0,0 +1,26 @@ +import numpy as np + +from typing import List, AnyStr + +from numpy.typing import ArrayLike +from spatz.dataset import Dataset +from spatz.logger import Logger +from spatz.sensors import Accelerometer +from spatz.transforms import Transform, GaussianNoise + + +class SCA3300(Accelerometer): + def __init__(self, dataset: Dataset, logger: Logger, orientation=np.identity(3), offset=0, transforms: List[Transform] = []): + super().__init__(dataset, logger, orientation, offset, transforms) + + self.__noise = GaussianNoise(0, 0.001) + + def _get_name(self) -> AnyStr: + return 'SCA3300' + + def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike: + x /= 9.81 + + x[0], x[1] = x[1], x[0] + + return self.__noise(0, x) \ No newline at end of file diff --git a/spatz/simulations/advanceable.py b/spatz/simulations/advanceable.py index ba9d7f8..e197e3d 100644 --- a/spatz/simulations/advanceable.py +++ b/spatz/simulations/advanceable.py @@ -2,7 +2,7 @@ from abc import abstractmethod class Advanceable: - def __init__(self) -> None: + def __init__(self, initial_time=0) -> None: self.reset() def advance(self, dt: float): diff --git a/spatz/simulations/astos_source.py b/spatz/simulations/astos_source.py index f7a640a..7e67ee1 100644 --- a/spatz/simulations/astos_source.py +++ b/spatz/simulations/astos_source.py @@ -1,4 +1,5 @@ import numpy as np +import math from typing import Literal from numpy.typing import NDArray @@ -6,48 +7,96 @@ from numpy.typing import NDArray from spatz.simulations.csv_source import CSVSource +def T1(angle): + # return Rotation.from_euler('X', angle, degrees=False).as_matrix() + return np.array([ + [1, 0, 0], + [0, math.cos(angle), math.sin(angle)], + [0, -math.sin(angle), math.cos(angle)], + ]) + + +def T2(angle): + # return Rotation.from_euler('Y', angle, degrees=False).as_matrix() + return np.array([ + [math.cos(angle), 0, -math.sin(angle)], + [0, 1, 0], + [math.sin(angle), 0, math.cos(angle)] + ]) + + +def T3(angle): + # return Rotation.from_euler('Z', angle, degrees=False).as_matrix() + return np.array([ + [math.cos(angle), math.sin(angle), 0], + [-math.sin(angle), math.cos(angle), 0], + [0, 0, 1] + ]) + + class ASTOSSource(CSVSource): def __init__(self, path: str, interpolation: Literal['linear'] = 'linear') -> None: super().__init__(path, 'time', interpolation) - def get_length(self) -> float: - pass - def get_position(self) -> NDArray: return self.fetch_values(['x', 'y', 'z']) def get_velocity(self, frame: Literal['global', 'local']) -> NDArray: - if frame == 'local': - pass + vel = self.fetch_values(['vx', 'vy', 'vz']) - return self.fetch_values(['vx', 'vy', 'vz']) + if frame == 'local': + return self.global_to_local() @ vel + + return vel def get_acceleration(self, frame: Literal['global', 'local']) -> NDArray: - if frame == 'local': - pass + acc = self.fetch_values(['ax', 'ay', 'az']) - return self.fetch_values(['ax', 'ay', 'az']) + if frame == 'local': + return self.global_to_local() @ acc + + return acc def get_attitude(self) -> NDArray: - pass + raise NotImplementedError() def local_to_global(self) -> NDArray: - pass + return self.global_to_local().T def global_to_local(self) -> NDArray: - pass + # ASTOS local to body + rots = self.fetch_values(['pitch_l', 'yaw_l', 'roll_l']) + pitch_l, yaw_l, roll_l = rots[0], rots[1], rots[2] + local_to_body = T1(roll_l) @ T2(pitch_l - math.pi/2) @ T1(-yaw_l) + + # ASTOS global to launch rail. + init_long = self.fetch_init_value('longitude') + init_lat = self.fetch_init_value('latitude') + global_to_launch_rail = T2(-math.pi/2 - init_lat) @ T3(init_long) + + # ASTOS global to local. + decl = self.fetch_value('declination') + long = self.fetch_value('longitude') + t0 = self.get_start_time() + omega_E = (2*math.pi) / (24*60*60) + global_to_local = T2(-decl) @ T3(long + omega_E * t0) + + # ASTOS local to launch rail inverted + local_to_launch_rail_inv = global_to_launch_rail @ global_to_local + + return local_to_body @ local_to_launch_rail_inv def get_angular_velocity(self) -> NDArray: - pass + return self.fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z']) def get_static_pressure(self) -> float: - pass + return self.fetch_value('pressure') def get_longitude(self) -> float: - pass + return self.fetch_value('longitude') def get_latitude(self) -> float: - pass + return self.fetch_value('latitude') def get_altitude(self) -> float: - pass \ No newline at end of file + return self.fetch_value('altitude') \ No newline at end of file diff --git a/spatz/simulations/csv_source.py b/spatz/simulations/csv_source.py index d9a032b..bf5f3eb 100644 --- a/spatz/simulations/csv_source.py +++ b/spatz/simulations/csv_source.py @@ -15,15 +15,19 @@ class CSVSource(DataSource): Args: time_col (str): The name of the column that contains time data. """ - super().__init__() - self._df = pd.read_csv(path) self._time_col = time_col self._idx = 0 self._interpolation = interpolation + self.__init_t = min(self._df[self._time_col]) + + super().__init__(initial_time=self.__init_t) def get_length(self) -> float: return max(self._df[self._time_col]) + + def get_start_time(self) -> float: + return self.__init_t def _on_reset(self): pass @@ -64,7 +68,7 @@ class CSVSource(DataSource): # Sometimes no time passes in-between two samples. if t_max == t_min: - return self._df.at[name, idx] + return self._df.at[idx, name] # Compute the weight for interpolation. alpha = (self.get_time() - t_min) / (t_max - t_min) @@ -88,4 +92,26 @@ class CSVSource(DataSource): Returns: np.array: Returns a numpy array containing the requested values in the same order as in the input list. """ - return np.asarray([self.fetch_value(name, t, custom_interpolation) for name in names]) \ No newline at end of file + return np.asarray([self.fetch_value(name, t, custom_interpolation) for name in names]) + + def fetch_init_value(self, name: str, custom_interpolation=None) -> float: + """Get a specific start value from the dataframe. + + Args: + name (str): Name of the value to get. + + Returns: + float: Returns the requested value. + """ + return self.fetch_value(name, t=self.__init_t, custom_interpolation=custom_interpolation) + + def fetch_init_values(self, names: List[str], custom_interpolation=None) -> NDArray: + """Get specific start values from the dataframe. + + Args: + names (List[str]): Names of the values to get. + + Returns: + NDArray: Returns a numpy array containing the requested values in the same order as in the input list. + """ + return self.fetch_values(names, t=self.__init_t, custom_interpolation=custom_interpolation) \ No newline at end of file diff --git a/spatz/simulations/data_source.py b/spatz/simulations/data_source.py index 45e0195..cf1e3ae 100644 --- a/spatz/simulations/data_source.py +++ b/spatz/simulations/data_source.py @@ -10,8 +10,8 @@ from ambiance import Atmosphere class DataSource(Advanceable): - def __init__(self) -> None: - super().__init__() + def __init__(self, initial_time=0) -> None: + super().__init__(initial_time=initial_time) def get_speed_of_sound(self) -> float: return Atmosphere(self.get_altitude()).speed_of_sound @@ -22,52 +22,55 @@ class DataSource(Advanceable): return speed / self.get_speed_of_sound() def get_temperature(self) -> float: - return Atmosphere(self.get_altitude()).temperature + return Atmosphere(self.get_altitude()).temperature[0] + + def get_start_time(self) -> float: + return 0 @abstractmethod def get_length(self) -> float: - pass + raise NotImplementedError() @abstractmethod def get_position(self) -> NDArray: - pass + raise NotImplementedError() @abstractmethod def get_velocity(self, frame: Literal['global', 'local']) -> NDArray: - pass + raise NotImplementedError() @abstractmethod def get_acceleration(self, frame: Literal['global', 'local']) -> NDArray: - pass + raise NotImplementedError() @abstractmethod def get_attitude(self) -> NDArray: - pass + raise NotImplementedError() @abstractmethod def local_to_global(self) -> NDArray: - pass + raise NotImplementedError() @abstractmethod def global_to_local(self) -> NDArray: - pass + raise NotImplementedError() @abstractmethod def get_angular_velocity(self) -> NDArray: - pass + raise NotImplementedError() @abstractmethod def get_static_pressure(self) -> float: - pass + raise NotImplementedError() @abstractmethod def get_longitude(self) -> float: - pass + raise NotImplementedError() @abstractmethod def get_latitude(self) -> float: - pass + raise NotImplementedError() @abstractmethod def get_altitude(self) -> float: - pass \ No newline at end of file + raise NotImplementedError() \ No newline at end of file diff --git a/spatz/simulations/telemega.py b/spatz/simulations/telemega.py new file mode 100644 index 0000000..b1fc52a --- /dev/null +++ b/spatz/simulations/telemega.py @@ -0,0 +1,27 @@ +import numpy as np + +from typing import Literal +from numpy.typing import NDArray + +from spatz.simulations.csv_source import CSVSource + +class TeleMega(CSVSource): + def __init__(self, path: str, interpolation: Literal['linear'] = 'linear') -> None: + super().__init__(path, 'time', interpolation) + + def get_altitude(self) -> float: + return self.fetch_value('altitude') + + def get_acceleration(self, frame: Literal['global', 'local']) -> NDArray: + acc_local = self.fetch_values(['accel_x', 'accel_y', 'accel_z']) + + if frame == 'global': + raise NotImplementedError() + + return acc_local + + def get_angular_velocity(self) -> NDArray: + return self.fetch_values(['gyro_roll', 'gyro_pitch', 'gyro_yaw']) + + def get_static_pressure(self) -> float: + return self.fetch_value('pressure')