Added more sensors for STAHR

This commit is contained in:
dario 2024-10-04 11:11:19 +02:00
parent 4f100f0bd4
commit 5479773e32
9 changed files with 194 additions and 38 deletions

View File

@ -50,6 +50,7 @@ class Accelerometer(Sensor):
self._orientation = orientation self._orientation = orientation
def _get_data(self) -> ArrayLike | float: def _get_data(self) -> ArrayLike | float:
"""
acc = self._dataset.get_acceleration('global') acc = self._dataset.get_acceleration('global')
self._logger.write('global_ax', acc[0], self._get_name()) self._logger.write('global_ax', acc[0], self._get_name())
@ -70,6 +71,9 @@ class Accelerometer(Sensor):
# Add the effects of the imu's offset. # Add the effects of the imu's offset.
omega = self._dataset.get_angular_velocity() omega = self._dataset.get_angular_velocity()
acc += (np.cross(omega, self._offset) + np.cross(omega, np.cross(omega, self._offset))) acc += (np.cross(omega, self._offset) + np.cross(omega, np.cross(omega, self._offset)))
"""
acc = self._dataset.get_acceleration('local')
return acc return acc

View File

@ -30,7 +30,7 @@ class BMI088Gyro(Gyroscope):
class BMI088Acc(Accelerometer): class BMI088Acc(Accelerometer):
def __init__(self, dataset: Dataset, logger: Logger, orientation = np.identity(3), offset: float = 0, transforms: List[Transform] = []): def __init__(self, dataset: Dataset, logger: Logger, orientation = np.identity(3), offset: float = 0, transforms: List[Transform] = []):
super().__init__(dataset, logger, CoordSystem.RIGHT_HANDED, orientation, offset, transforms) super().__init__(dataset, logger, orientation, offset, transforms)
self.__noise = GaussianNoise(0, 0.05) self.__noise = GaussianNoise(0, 0.05)

View File

@ -0,0 +1,21 @@
import numpy as np
from typing import List, AnyStr
from numpy.typing import ArrayLike
from spatz.dataset import ArrayLike, Dataset
from spatz.logger import ArrayLike, Logger
from spatz.sensors import IMU, Accelerometer, Gyroscope, CoordSystem
from spatz.transforms import Transform, GaussianNoise
class IAM_20380HT(Gyroscope):
def __init__(self, dataset: Dataset, logger: Logger, orientation=np.identity(3), transforms: List[Transform] = []):
super().__init__(dataset, logger, orientation, transforms)
def _get_name(self) -> AnyStr:
return 'IAM-20380HT'
def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike:
return x

View File

@ -0,0 +1,26 @@
import numpy as np
from typing import List, AnyStr
from numpy.typing import ArrayLike
from spatz.dataset import Dataset
from spatz.logger import Logger
from spatz.sensors import Accelerometer
from spatz.transforms import Transform, GaussianNoise
class SCA3300(Accelerometer):
def __init__(self, dataset: Dataset, logger: Logger, orientation=np.identity(3), offset=0, transforms: List[Transform] = []):
super().__init__(dataset, logger, orientation, offset, transforms)
self.__noise = GaussianNoise(0, 0.001)
def _get_name(self) -> AnyStr:
return 'SCA3300'
def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike:
x /= 9.81
x[0], x[1] = x[1], x[0]
return self.__noise(0, x)

View File

@ -2,7 +2,7 @@ from abc import abstractmethod
class Advanceable: class Advanceable:
def __init__(self) -> None: def __init__(self, initial_time=0) -> None:
self.reset() self.reset()
def advance(self, dt: float): def advance(self, dt: float):

View File

@ -1,4 +1,5 @@
import numpy as np import numpy as np
import math
from typing import Literal from typing import Literal
from numpy.typing import NDArray from numpy.typing import NDArray
@ -6,48 +7,96 @@ from numpy.typing import NDArray
from spatz.simulations.csv_source import CSVSource from spatz.simulations.csv_source import CSVSource
def T1(angle):
# return Rotation.from_euler('X', angle, degrees=False).as_matrix()
return np.array([
[1, 0, 0],
[0, math.cos(angle), math.sin(angle)],
[0, -math.sin(angle), math.cos(angle)],
])
def T2(angle):
# return Rotation.from_euler('Y', angle, degrees=False).as_matrix()
return np.array([
[math.cos(angle), 0, -math.sin(angle)],
[0, 1, 0],
[math.sin(angle), 0, math.cos(angle)]
])
def T3(angle):
# return Rotation.from_euler('Z', angle, degrees=False).as_matrix()
return np.array([
[math.cos(angle), math.sin(angle), 0],
[-math.sin(angle), math.cos(angle), 0],
[0, 0, 1]
])
class ASTOSSource(CSVSource): class ASTOSSource(CSVSource):
def __init__(self, path: str, interpolation: Literal['linear'] = 'linear') -> None: def __init__(self, path: str, interpolation: Literal['linear'] = 'linear') -> None:
super().__init__(path, 'time', interpolation) super().__init__(path, 'time', interpolation)
def get_length(self) -> float:
pass
def get_position(self) -> NDArray: def get_position(self) -> NDArray:
return self.fetch_values(['x', 'y', 'z']) return self.fetch_values(['x', 'y', 'z'])
def get_velocity(self, frame: Literal['global', 'local']) -> NDArray: def get_velocity(self, frame: Literal['global', 'local']) -> NDArray:
if frame == 'local': vel = self.fetch_values(['vx', 'vy', 'vz'])
pass
return self.fetch_values(['vx', 'vy', 'vz']) if frame == 'local':
return self.global_to_local() @ vel
return vel
def get_acceleration(self, frame: Literal['global', 'local']) -> NDArray: def get_acceleration(self, frame: Literal['global', 'local']) -> NDArray:
if frame == 'local': acc = self.fetch_values(['ax', 'ay', 'az'])
pass
return self.fetch_values(['ax', 'ay', 'az']) if frame == 'local':
return self.global_to_local() @ acc
return acc
def get_attitude(self) -> NDArray: def get_attitude(self) -> NDArray:
pass raise NotImplementedError()
def local_to_global(self) -> NDArray: def local_to_global(self) -> NDArray:
pass return self.global_to_local().T
def global_to_local(self) -> NDArray: def global_to_local(self) -> NDArray:
pass # ASTOS local to body
rots = self.fetch_values(['pitch_l', 'yaw_l', 'roll_l'])
pitch_l, yaw_l, roll_l = rots[0], rots[1], rots[2]
local_to_body = T1(roll_l) @ T2(pitch_l - math.pi/2) @ T1(-yaw_l)
# ASTOS global to launch rail.
init_long = self.fetch_init_value('longitude')
init_lat = self.fetch_init_value('latitude')
global_to_launch_rail = T2(-math.pi/2 - init_lat) @ T3(init_long)
# ASTOS global to local.
decl = self.fetch_value('declination')
long = self.fetch_value('longitude')
t0 = self.get_start_time()
omega_E = (2*math.pi) / (24*60*60)
global_to_local = T2(-decl) @ T3(long + omega_E * t0)
# ASTOS local to launch rail inverted
local_to_launch_rail_inv = global_to_launch_rail @ global_to_local
return local_to_body @ local_to_launch_rail_inv
def get_angular_velocity(self) -> NDArray: def get_angular_velocity(self) -> NDArray:
pass return self.fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z'])
def get_static_pressure(self) -> float: def get_static_pressure(self) -> float:
pass return self.fetch_value('pressure')
def get_longitude(self) -> float: def get_longitude(self) -> float:
pass return self.fetch_value('longitude')
def get_latitude(self) -> float: def get_latitude(self) -> float:
pass return self.fetch_value('latitude')
def get_altitude(self) -> float: def get_altitude(self) -> float:
pass return self.fetch_value('altitude')

View File

@ -15,15 +15,19 @@ class CSVSource(DataSource):
Args: Args:
time_col (str): The name of the column that contains time data. time_col (str): The name of the column that contains time data.
""" """
super().__init__()
self._df = pd.read_csv(path) self._df = pd.read_csv(path)
self._time_col = time_col self._time_col = time_col
self._idx = 0 self._idx = 0
self._interpolation = interpolation self._interpolation = interpolation
self.__init_t = min(self._df[self._time_col])
super().__init__(initial_time=self.__init_t)
def get_length(self) -> float: def get_length(self) -> float:
return max(self._df[self._time_col]) return max(self._df[self._time_col])
def get_start_time(self) -> float:
return self.__init_t
def _on_reset(self): def _on_reset(self):
pass pass
@ -64,7 +68,7 @@ class CSVSource(DataSource):
# Sometimes no time passes in-between two samples. # Sometimes no time passes in-between two samples.
if t_max == t_min: if t_max == t_min:
return self._df.at[name, idx] return self._df.at[idx, name]
# Compute the weight for interpolation. # Compute the weight for interpolation.
alpha = (self.get_time() - t_min) / (t_max - t_min) alpha = (self.get_time() - t_min) / (t_max - t_min)
@ -88,4 +92,26 @@ class CSVSource(DataSource):
Returns: Returns:
np.array: Returns a numpy array containing the requested values in the same order as in the input list. np.array: Returns a numpy array containing the requested values in the same order as in the input list.
""" """
return np.asarray([self.fetch_value(name, t, custom_interpolation) for name in names]) return np.asarray([self.fetch_value(name, t, custom_interpolation) for name in names])
def fetch_init_value(self, name: str, custom_interpolation=None) -> float:
"""Get a specific start value from the dataframe.
Args:
name (str): Name of the value to get.
Returns:
float: Returns the requested value.
"""
return self.fetch_value(name, t=self.__init_t, custom_interpolation=custom_interpolation)
def fetch_init_values(self, names: List[str], custom_interpolation=None) -> NDArray:
"""Get specific start values from the dataframe.
Args:
names (List[str]): Names of the values to get.
Returns:
NDArray: Returns a numpy array containing the requested values in the same order as in the input list.
"""
return self.fetch_values(names, t=self.__init_t, custom_interpolation=custom_interpolation)

View File

@ -10,8 +10,8 @@ from ambiance import Atmosphere
class DataSource(Advanceable): class DataSource(Advanceable):
def __init__(self) -> None: def __init__(self, initial_time=0) -> None:
super().__init__() super().__init__(initial_time=initial_time)
def get_speed_of_sound(self) -> float: def get_speed_of_sound(self) -> float:
return Atmosphere(self.get_altitude()).speed_of_sound return Atmosphere(self.get_altitude()).speed_of_sound
@ -22,52 +22,55 @@ class DataSource(Advanceable):
return speed / self.get_speed_of_sound() return speed / self.get_speed_of_sound()
def get_temperature(self) -> float: def get_temperature(self) -> float:
return Atmosphere(self.get_altitude()).temperature return Atmosphere(self.get_altitude()).temperature[0]
def get_start_time(self) -> float:
return 0
@abstractmethod @abstractmethod
def get_length(self) -> float: def get_length(self) -> float:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def get_position(self) -> NDArray: def get_position(self) -> NDArray:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def get_velocity(self, frame: Literal['global', 'local']) -> NDArray: def get_velocity(self, frame: Literal['global', 'local']) -> NDArray:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def get_acceleration(self, frame: Literal['global', 'local']) -> NDArray: def get_acceleration(self, frame: Literal['global', 'local']) -> NDArray:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def get_attitude(self) -> NDArray: def get_attitude(self) -> NDArray:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def local_to_global(self) -> NDArray: def local_to_global(self) -> NDArray:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def global_to_local(self) -> NDArray: def global_to_local(self) -> NDArray:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def get_angular_velocity(self) -> NDArray: def get_angular_velocity(self) -> NDArray:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def get_static_pressure(self) -> float: def get_static_pressure(self) -> float:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def get_longitude(self) -> float: def get_longitude(self) -> float:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def get_latitude(self) -> float: def get_latitude(self) -> float:
pass raise NotImplementedError()
@abstractmethod @abstractmethod
def get_altitude(self) -> float: def get_altitude(self) -> float:
pass raise NotImplementedError()

View File

@ -0,0 +1,27 @@
import numpy as np
from typing import Literal
from numpy.typing import NDArray
from spatz.simulations.csv_source import CSVSource
class TeleMega(CSVSource):
def __init__(self, path: str, interpolation: Literal['linear'] = 'linear') -> None:
super().__init__(path, 'time', interpolation)
def get_altitude(self) -> float:
return self.fetch_value('altitude')
def get_acceleration(self, frame: Literal['global', 'local']) -> NDArray:
acc_local = self.fetch_values(['accel_x', 'accel_y', 'accel_z'])
if frame == 'global':
raise NotImplementedError()
return acc_local
def get_angular_velocity(self) -> NDArray:
return self.fetch_values(['gyro_roll', 'gyro_pitch', 'gyro_yaw'])
def get_static_pressure(self) -> float:
return self.fetch_value('pressure')