diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e69de29 diff --git a/spatz/__init__.py b/spatz/__init__.py new file mode 100644 index 0000000..d63e346 --- /dev/null +++ b/spatz/__init__.py @@ -0,0 +1,11 @@ +import spatz.connections as connections +from connections import * + +import spatz.utils as utils +from utils import * + +import spatz.transforms as transforms +from spatz.transforms import * + +from spatz.dataset import * +from spatz.simulation import * diff --git a/spatz/connections/__init__.py b/spatz/connections/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spatz/connections/serial.py b/spatz/connections/serial.py new file mode 100644 index 0000000..e69de29 diff --git a/spatz/dataset.py b/spatz/dataset.py new file mode 100644 index 0000000..3b3a5cc --- /dev/null +++ b/spatz/dataset.py @@ -0,0 +1,337 @@ +import math + +import numpy as np +import pandas as pd + +from enum import Enum +from typing import List +from numpy.typing import ArrayLike +from scipy.spatial.transform import Rotation + +from logger import Advanceable + + +class Phase(Enum): + ONPAD = 1 + LOI = 2 + RCI = 3 + ECI = 4 + ADI = 5 + + +class Dataset(Advanceable): + def __init__(self, path: str, interpolation: str = 'linear'): + """A wrapper class for a Pandas dataframe containing simulation data. + + Args: + df (pd.DataFrame): A Pandas dataframe containing simulation data. + interpolation (str, optional): The interpolation method for obtaining new data points. Defaults to 'linear'. + """ + super().__init__() + + self.__df = pd.read_csv(path) + self.__idx = 0 + self.__interpolation = interpolation + + # Find the liftoff time. + self.__loi = self.__df['time'][self.__df['phase'] == Phase.LOI].min() + self.__rci = self.__df['time'][self.__df['phase'] == Phase.RCI].min() + self.__eci = self.__df['time'][self.__df['phase'] == Phase.ECI].min() + self.__adi = self.__df['time'][self.__df['phase'] == Phase.ADI].min() + + def _on_reset(self): + pass + + def _on_step(self, _: float): + idx = (self.__df['time'] - self.get_time()).abs().idxmin() + self.__idx = idx if self.__df['time'].loc[idx] < self.get_time() else idx - 1 + + def get_phase(self) -> Phase: + """ + Returns: + Phase: Get the current phase of the flight. + """ + t = self.get_time() + + if t < self.__loi: + return Phase.ONPAD + + if t < self.__rci: + return Phase.LOI + + if t < self.__eci: + return Phase.RCI + + if t < self.__adi: + return Phase.ECI + + return Phase.ADI + + def get_time_until(self, phase: Phase) -> float: + """Returns how much time is left until the given phase is reached. + Negative values represent the time that has passed since the phase was + reached. + + Args: + phase (Phase): A phase of the flight. + + Returns: + float: Time until or since the phase was reached. + """ + t = self.get_time() + + switch = { + Phase.ONPAD: 0 - t, + Phase.LOI: self.__loi - t, + Phase.RCI: self.__rci - t, + Phase.ECI: self.__eci - t, + Phase.ADI: self.__adi - t + } + + return switch.get(phase) + + def get_length(self) -> float: + """Returns the time horizon of this dataset. + + Returns: + float: The last time step in the dataset. + """ + return max(self.__df['time']) + + @staticmethod + def T1(angle): + # return Rotation.from_euler('X', angle, degrees=False).as_matrix() + return np.array([ + [1, 0, 0], + [0, math.cos(angle), math.sin(angle)], + [0, -math.sin(angle), math.cos(angle)], + ]) + + @staticmethod + def T2(angle): + # return Rotation.from_euler('Y', angle, degrees=False).as_matrix() + return np.array([ + [math.cos(angle), 0, -math.sin(angle)], + [0, 1, 0], + [math.sin(angle), 0, math.cos(angle)] + ]) + + @staticmethod + def T3(angle): + # return Rotation.from_euler('Z', angle, degrees=False).as_matrix() + return np.array([ + [math.cos(angle), math.sin(angle), 0], + [-math.sin(angle), math.cos(angle), 0], + [0, 0, 1] + ]) + + def local_to_body(self) -> ArrayLike: + """ + Returns: + ArrayLike: The current transformation matrix from local to body-fixed coords. + """ + # Get the rotation in the local coordinate system. + rots = self.__fetch_values(['pitch_l', 'yaw_l', 'roll_l']) + pitch_l, yaw_l, roll_l = rots[0], rots[1], rots[2] + + return self.T1(roll_l) @ self.T2(pitch_l - math.pi/2) @ self.T1(-yaw_l) + + def global_to_local(self) -> ArrayLike: + """ + Returns: + ArrayLike: The current transformation matrix from global to local coords. + """ + decl = self.__fetch_value('declination') + long = self.__fetch_value('longitude') + t0 = self.__df['time'].iloc[0] + + omega_E = (2*math.pi) / (24*60*60) + + return self.T2(-decl) @ self.T3(long + omega_E * t0) + + def global_to_launch_rail(self) -> ArrayLike: + """ + Returns: + ArrayLike: The current transformation matrix from global to launch rail coords. + """ + init_long = self.__df['longitude'].iloc[0] + init_lat = self.__df['latitude'].iloc[0] + + return self.T2(-math.pi/2 - init_lat) @ self.T3(init_long) + + def local_to_launch_rail(self) -> ArrayLike: + """ + Returns: + ArrayLike: The current transformation matrix from local to launch rail coords. + """ + return self.global_to_launch_rail() @ np.linalg.inv(self.global_to_local()) + + def launch_rail_to_body(self) -> ArrayLike: + """ + Returns: + ArrayLike: The current transformation matrix from launch rail to local coords. + """ + return self.local_to_body() @ np.linalg.inv(self.local_to_launch_rail()) + + def is_transsonic(self) -> bool: + """ + Returns: + bool: Returns True if the rocket is flying with transsonic speed at the current time of the simulation. + """ + mach = self.get_mach_number() + + return mach > 0.8 and mach < 1.2 + + def is_supersonic(self) -> bool: + """ + Returns: + bool: True if the rocket is flying with supersonic speed at the current time of the simulation. + """ + return self.get_mach_number() > 1 + + def __fetch_value(self, name: str) -> float: + """Get a specific value from the dataframe. + + Args: + name (str): The name of the value to fetch. + + Returns: + float: Returns the requested value. + """ + if self.__interpolation == 'linear': + t_min = self.__df['time'].iloc[self.__idx] + t_max = self.__df['time'].iloc[self.__idx + 1] + + # Sometimes no time passes in-between two samples. + if t_max == t_min: + return self.__df[name].iloc[self.__idx] + + # Compute the weight for interpolation. + alpha = (self.get_time() - t_min) / (t_max - t_min) + + # Interpolate linearly between the two data points. + return (1 - alpha) * self.__df[name].iloc[self.__idx] + alpha * self.__df[name].iloc[self.__idx + 1] + + def __fetch_values(self, names: List[str]) -> np.array: + """Get specific values from the dataframe. + + Args: + names (List[str]): Names of the values to get. + + Returns: + np.array: Returns a numpy array containing the requested values in the same order as in the input list. + """ + return np.asarray([self.__fetch_value(name) for name in names]) + + def get_velocity(self) -> float: + """ + Returns: + np.array: Returns the velocity at the current time of the simulation. + """ + return self.__fetch_value('velocity') + + def get_acceleration(self, frame='FL') -> ArrayLike: + """_summary_ + + Args: + frame (str, optional): _description_. Defaults to 'FL'. + + Returns: + ArrayLike: _description_ + """ + acc = self.__fetch_values(['ax', 'ay', 'az']) + + if frame == 'B': + return self.launch_rail_to_body() @ acc + + return acc + + def get_angular_velocities(self) -> ArrayLike: + """ + Returns: + ArrayLike: Gets the derivatives in angular velocity across all axes of the rocket. + """ + return self.__fetch_values(['omega_X', 'omega_Y', 'omega_Z']) + + def get_velocity(self, frame='FL') -> ArrayLike: + """ + Args: + frame (str, optional): _description_. Defaults to 'FL'. + + Returns: + ArrayLike: _description_ + """ + + vel = self.__fetch_values(['vx', 'vy', 'vz']) + + if frame == 'B': + return self.launch_rail_to_body() @ vel + + return vel + + def get_mach_number(self) -> float: + """ + Returns: + float: Returns the mach number at the current time of the simulation. + """ + return self.__fetch_value('mach') + + def get_speed_of_sound(self) -> float: + """ + Returns: + float: Returns the speed of sound at the current time of the simulation. + """ + return self.__fetch_value('speedofsound') + + def get_rotation_rates(self) -> np.array: + """ + Returns: + np.array: Returns the rotation rates at the current time of the simulation. + """ + return self.__fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z']) + + def get_rotation(self) -> np.array: + """ + Returns: + np.array: _description_ + """ + return self.__fetch_values(['pitch_l', 'yaw_l', 'roll_l']) + + def get_temperature(self) -> float: + """ + Returns: + np.array: Returns the temperature at the current time of the simulation. + """ + return self.__fetch_value('temperature') + + def get_pressure(self) -> float: + """ + Returns: + np.array: Returns the pressure at the current time of the simulation. + """ + return self.__fetch_value('pressure') + + def get_thrust(self) -> float: + """ + Returns: + float: Returns the thrust value for the current time of the simulation. + """ + return self.__fetch_value('thrust') + + def get_drag(self) -> float: + """ + Returns: + float: Returns the drag value for the current time of the simulation. + """ + return self.__fetch_value('drag') + + def get_mass(self) -> float: + """ + Returns: + float: Returns the mass value for the current time of the simulation. + """ + return self.__fetch_value('mass') + + +if __name__ == '__main__': + pass \ No newline at end of file diff --git a/spatz/logger.py b/spatz/logger.py new file mode 100644 index 0000000..5c9de3d --- /dev/null +++ b/spatz/logger.py @@ -0,0 +1,75 @@ +import numpy as np +import pandas as pd + +from typing import Any, Tuple +from numpy.typing import ArrayLike + +from abc import abstractmethod + + +class Advanceable: + def __init__(self) -> None: + self.reset() + + def step(self, dt: float): + """Advances the simulation data in time. + + Args: + dt (float): The step in time to make. + """ + self.__t += dt + self._on_step(dt) + + def reset(self): + """ + Reset the Avanceable object to its initial state. + """ + self.__t = 0 + self._on_reset() + + @abstractmethod + def _on_step(self, dt: float): + pass + + @abstractmethod + def _on_reset(self): + pass + + def get_time(self) -> float: + """ + Returns: + float: Returns the current time of the Advanceable. + """ + return self.__t + + +class Logger(Advanceable): + def __init__(self) -> None: + super().__init__() + self.__idx = -1 + + def _on_step(self, _: float): + self.__df = pd.concat([self.__df, pd.Series().copy()], ignore_index=True) + self.__idx += 1 + self.__df.loc[self.__idx, 'time'] = self.get_time() + + def _on_reset(self): + self.__df = pd.DataFrame.from_dict({'time': [self.get_time()]}) + + def write(self, attrib: str, value: Any, domain: str = 'all'): + """Writes a value to the logger. + + Args: + attrib (str): The name of the value to log. + value (Any): The value to log. + domain (str, optional): The domain the value belongs to. Defaults to 'any'. + """ + name = domain + '/' + attrib + + if name not in self.__df.columns: + self.__df[name] = pd.Series([pd.NA] * len(self.__df)) + + self.__df.loc[self.__idx, name] = value + + def get_dataframe(self) -> pd.DataFrame: + return self.__df \ No newline at end of file diff --git a/spatz/sensors/__init__.py b/spatz/sensors/__init__.py new file mode 100644 index 0000000..1cc8b7d --- /dev/null +++ b/spatz/sensors/__init__.py @@ -0,0 +1,6 @@ +from sensor import Sensor +from compound import CompoundSensor + +from imu import Accelerometer +from imu import Gyroscope +from imu import IMU \ No newline at end of file diff --git a/spatz/sensors/compound.py b/spatz/sensors/compound.py new file mode 100644 index 0000000..a40a5fe --- /dev/null +++ b/spatz/sensors/compound.py @@ -0,0 +1,22 @@ +import numpy as np + +from typing import List +from numpy.typing import ArrayLike + +from sensor import Sensor +from spatz.dataset import Dataset, List +from spatz.logger import Logger +from spatz.transforms import Transform + + +class CompoundSensor(Sensor): + def __init__(self, dataset: Dataset, logger: Logger, sensors: List[Sensor], transforms: List[Transform] = []): + super().__init__(dataset, logger, transforms) + + self.__sensors = sensors + + def _get_data(self) -> ArrayLike: + x = np.stack([sensor() for sensor in self.__sensors]) + x = self._sensor_specific_effects(x) + + return x \ No newline at end of file diff --git a/spatz/sensors/imu/__init__.py b/spatz/sensors/imu/__init__.py new file mode 100644 index 0000000..85c7f74 --- /dev/null +++ b/spatz/sensors/imu/__init__.py @@ -0,0 +1,2 @@ +from accelerometer import Accelerometer +from gyroscope import Gyroscope \ No newline at end of file diff --git a/spatz/sensors/imu/accelerometer.py b/spatz/sensors/imu/accelerometer.py new file mode 100644 index 0000000..fdd2848 --- /dev/null +++ b/spatz/sensors/imu/accelerometer.py @@ -0,0 +1,50 @@ +import numpy as np + +from typing import List +from numpy.typing import ArrayLike + +from spatz.sensors import Sensor +from spatz.transforms import Transform +from spatz.dataset import Dataset +from spatz.logger import Logger + + +__all__=[ + 'Accelerometer' +] + + +# Local definition of gravitation +g = 9.81 + + +class Accelerometer(Sensor): + def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = []): + super().__init__(dataset, logger, transforms) + + self._offset = np.array([offset, 0, 0]) + + def _get_data(self) -> ArrayLike | float: + acc = self._dataset.get_acceleration(frame='FL') + acc += np.array([0, 0, g]) + + self._logger.write('FL_x', acc[0], self._get_name()) + self._logger.write('FL_y', acc[1], self._get_name()) + self._logger.write('FL_z', acc[2], self._get_name()) + + # Convert FL to body + acc = self._dataset.launch_rail_to_body() @ acc + + self._logger.write('B_x', acc[0], self._get_name()) + self._logger.write('B_y', acc[1], self._get_name()) + self._logger.write('B_z', acc[2], self._get_name()) + + # Flip axes to sensor's perspective. + acc *= -1 + + # Add the effects of the imu's offset. + omega = self._dataset.get_angular_velocities() + acc += (np.cross(omega, self._offset) + np.cross(omega, np.cross(omega, self._offset))) + + return acc + \ No newline at end of file diff --git a/spatz/sensors/imu/gyroscope.py b/spatz/sensors/imu/gyroscope.py new file mode 100644 index 0000000..ce6df4f --- /dev/null +++ b/spatz/sensors/imu/gyroscope.py @@ -0,0 +1,20 @@ +from numpy.typing import ArrayLike +from typing import List + +from spatz.sensors import Sensor +from spatz.transforms import Transform +from spatz.dataset import Dataset +from spatz.logger import Logger + + +class Gyroscope(Sensor): + def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = []): + super().__init__(dataset, logger, transforms) + + self._offset = offset + + def _get_data(self) -> ArrayLike | float: + # Rotation in rad/sec + x = self._dataset.get_rotation_rates() + + return x diff --git a/spatz/sensors/imu/imu.py b/spatz/sensors/imu/imu.py new file mode 100644 index 0000000..75a7281 --- /dev/null +++ b/spatz/sensors/imu/imu.py @@ -0,0 +1,25 @@ +from typing import List +from spatz.dataset import Dataset, List +from spatz.logger import Logger +from spatz.sensors import CompoundSensor, Accelerometer, Gyroscope +from spatz.sensors.sensor import Sensor +from spatz.transforms import Transform + + +class IMU(CompoundSensor): + def __init__(self, + dataset: Dataset, + logger: Logger, + acc: Accelerometer, + gyro: Gyroscope, + transforms: List[Transform] = []): + """_summary_ + + Args: + dataset (Dataset): _description_ + logger (Logger): _description_ + acc (Accelerometer): _description_ + gyro (Gyroscope): _description_ + transforms (List[Transform], optional): _description_. Defaults to []. + """ + super().__init__(dataset, logger, [acc, gyro], transforms) diff --git a/spatz/sensors/imu/wsen_isds.py b/spatz/sensors/imu/wsen_isds.py new file mode 100644 index 0000000..8e17c87 --- /dev/null +++ b/spatz/sensors/imu/wsen_isds.py @@ -0,0 +1,54 @@ +import numpy as np + +from typing import AnyStr, List +from numpy.typing import ArrayLike + +from spatz.sensors import Accelerometer, Gyroscope, IMU +from spatz.transforms import Transform, GaussianNoise +from spatz.dataset import Dataset +from spatz.logger import Logger + + +class WSEN_ISDS(IMU): + pass + + +class WSEN_ISDS_ACC(Accelerometer): + def __init__(self, dataset: Dataset, logger: Logger, offset: float, transforms: List[Transform] = []): + super().__init__(dataset, logger, offset, transforms) + + self.__variance = 0.05 + self.__noise = GaussianNoise(np.zeros(3), np.identity(3) * self.__variance) + + def _get_name(self) -> AnyStr: + return 'WSEN_ISDS' + + def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike: + t = self._dataset.get_time() + + # Apply noise to the true values. + y = self.__noise(t, x) + noise = y - x + + # Log the chosen noise values. + self._logger.write('acc_x_noise', noise[0], self._get_name()) + self._logger.write('acc_y_noise', noise[1], self._get_name()) + self._logger.write('acc_z_noise', noise[2], self._get_name()) + + return y + + +class WSEN_ISDS_GYRO(Gyroscope): + def __init__(self, dataset: Dataset, logger: Logger, offset: float, transforms: List[Transform] = []): + super().__init__(dataset, logger, offset, transforms) + + def _get_name(self) -> AnyStr: + return 'WSEN_ISDS' + + def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike: + # Convert to degrees per second. + x = (x / np.pi) * 180 + + # TODO: Noise model. + + return x diff --git a/spatz/sensors/pressure/__init__.py b/spatz/sensors/pressure/__init__.py new file mode 100644 index 0000000..34e714b --- /dev/null +++ b/spatz/sensors/pressure/__init__.py @@ -0,0 +1,2 @@ +from pressure import PressureSensor +from ms5611_01ba03 import MS5611_01BA03 \ No newline at end of file diff --git a/spatz/sensors/pressure/ms5611_01ba03.py b/spatz/sensors/pressure/ms5611_01ba03.py new file mode 100644 index 0000000..58aafc8 --- /dev/null +++ b/spatz/sensors/pressure/ms5611_01ba03.py @@ -0,0 +1,33 @@ +from typing import List, AnyStr +from numpy.typing import ArrayLike + +from pressure import PressureSensor + +from spatz.dataset import Dataset, Phase +from spatz.logger import Logger +from spatz.transforms import GaussianNoise, Transform + + +class MS5611_01BA03(PressureSensor): + def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = []): + super().__init__(dataset, logger, transforms) + + # Noise model obtained by a test flight using this sensor. + self.__pad_noise = GaussianNoise(0, 0.03) + self.__flight_noise = GaussianNoise(0, 1.5) + + def _get_name(self) -> AnyStr: + return 'MS5611_01BA03' + + def _sensor_specific_effects(self, x: ArrayLike | float) -> ArrayLike | float: + t = self._dataset.get_time() + + # Transform from Pa to hPa + x /= 1e2 + + noisy = self.__pad_noise(t, x) if self._dataset.get_phase() == Phase.ONPAD else self.__flight_noise(t, x) + + # Log the noise added to the pressure measurements. + self._logger.write('noise', noisy - x, domain=self._get_name()) + + return noisy diff --git a/spatz/sensors/pressure/pressure.py b/spatz/sensors/pressure/pressure.py new file mode 100644 index 0000000..1627f33 --- /dev/null +++ b/spatz/sensors/pressure/pressure.py @@ -0,0 +1,46 @@ +import math +import numpy as np + +from typing import List + +from spatz.sensors import Sensor +from spatz.logger import Logger +from spatz.dataset import Dataset +from spatz.transforms import Transform + + +class PressureSensor(Sensor): + def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = [], ts_effects=True): + """ + Args: + dataset (Dataset): A dataset instance. + transforms (List[Transform], optional): Transforms to apply to the sensor outputs. Defaults to []. + ts_effects (bool, optional): If True, models transsonic effects. Defaults to True. + """ + super(PressureSensor, self).__init__(dataset, logger, transforms) + + self._ts_effects = ts_effects + + def _get_data(self) -> float: + x = self._dataset.get_pressure() + + if self._ts_effects: + # Pre-defined constants. + _p = 3e6 + sigma = 40 + + # How far away from transsonic speed (mach 1) are we? + vvec = self._dataset.get_velocity() + dv = np.abs(np.linalg.norm(vvec) - self._dataset.get_speed_of_sound()) + + # Model transsonic effects by a peak at mach 1 which decays the further we are away from it. + ts_eff = _p * math.exp(-0.5* (dv / sigma)**2 ) / (sigma * math.sqrt(2*math.pi)) + + # Log the values for the transsonic effect. + self._logger.write('ts_effects', ts_eff, domain=self._get_name()) + self._logger.write('mach_no', self._dataset.get_mach_number(), domain='mach') + self._logger.write('speedofsound', self._dataset.get_speed_of_sound(), domain='mach') + + x = x + ts_eff + + return x diff --git a/spatz/sensors/sensor.py b/spatz/sensors/sensor.py new file mode 100644 index 0000000..dd6e6c2 --- /dev/null +++ b/spatz/sensors/sensor.py @@ -0,0 +1,64 @@ +import math +import numpy as np + +from abc import abstractmethod +from typing import List, AnyStr +from numpy.typing import ArrayLike + +from spatz.transforms import * +from spatz.logger import * +from spatz.dataset import * + + +class Sensor: + def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = []): + self._dataset = dataset + self._logger = logger + self._transforms = transforms + + def set_dataset(self, dataset: Dataset): + self._dataset = dataset + + def set_logger(self, logger: Logger): + self._logger = logger + + def _log(self, name: AnyStr, value: Any): + self._logger.write(name, value, self._get_name()) + + @abstractmethod + def _get_name(self) -> AnyStr: + raise NotImplementedError() + + @abstractmethod + def _sensor_specific_effects(self, x: ArrayLike | float) -> ArrayLike | float: + raise NotImplementedError() + + @abstractmethod + def _get_data(self) -> ArrayLike | float: + raise NotImplementedError() + + def __call__(self) -> ArrayLike | float: + out = self._get_data() + out = self._sensor_specific_effects(out) + + for transform in self._transforms: + out = transform(out) + + # Log the outputs of the sensor. + if np.isscalar(out): + self._log('out', out) + else: + for i in range(len(out)): + self._log(f'out_{i}', out[i]) + + return out + + +class CompoundSensor(Sensor): + def __init__(self, sensors: List[Sensor]): + super(CompoundSensor, self).__init__(None) + + self.__sensors = sensors + + def _get_data(self) -> ArrayLike: + return np.stack([sensor() for sensor in self.__sensors]) \ No newline at end of file diff --git a/spatz/sensors/temperature/__init__.py b/spatz/sensors/temperature/__init__.py new file mode 100644 index 0000000..b5ff6cd --- /dev/null +++ b/spatz/sensors/temperature/__init__.py @@ -0,0 +1 @@ +from temperature import TemperatureSensor \ No newline at end of file diff --git a/spatz/sensors/temperature/temperature.py b/spatz/sensors/temperature/temperature.py new file mode 100644 index 0000000..e436991 --- /dev/null +++ b/spatz/sensors/temperature/temperature.py @@ -0,0 +1,13 @@ +from typing import List + +from spatz.sensors import Sensor +from spatz.dataset import Dataset +from spatz.transforms import Transform + + +class TemperatureSensor(Sensor): + def __init__(self, dataset: Dataset, transforms: List[Transform] = []): + super(TemperatureSensor, self).__init__(dataset, transforms) + + def _get_data(self) -> float: + return self._dataset.get_temperature() \ No newline at end of file diff --git a/spatz/simulation.py b/spatz/simulation.py new file mode 100644 index 0000000..05e8f3b --- /dev/null +++ b/spatz/simulation.py @@ -0,0 +1,99 @@ +from typing import List +from numpy.random import normal +from tqdm import tqdm + +from spatz.dataset import Dataset +from spatz.logger import Logger +from spatz.sensors import Sensor +from dataset import Dataset +from logger import Logger +from sensor import Sensor + + +class UniformTimeSteps: + def __init__(self, dt: float, mu: float = 0, sigma: float = 0, delay_only=True) -> None: + """_summary_ + + Args: + dt (float): _description_ + mu (float, optional): _description_. Defaults to 0. + sigma (float, optional): _description_. Defaults to 0. + delay_only (bool, optional): _description_. Defaults to True. + """ + self.__dt = dt + self.__mu = mu + self.__sigma = sigma + self.__delay_only = delay_only + + def __call__(self, t): + noise = normal(self.__mu, self.__sigma) + + if self.__delay_only: + noise = abs(noise) + + return self.__dt + noise + + + +class Simulation: + def __init__(self, time_steps=UniformTimeSteps(0.01)): + self.__dataset = None + self.__logger = None + self.__sensors: List[Sensor] = [] + self.__time_steps = time_steps + + def run(self, verbose=False): + idx = 0 + + # Clear all logs and reset the dataset to the first time step. + self.__dataset.reset() + self.__logger.reset() + + if verbose: + pbar = tqdm(total=self.__dataset.get_length()) + + while True: + t = self.__dataset.get_time() + dt = self.__time_steps(t) + t_ = t + dt + idx += 1 + + if t_ > self.__dataset.get_length(): + break + + self.__dataset.step(dt) + self.__logger.step(dt) + + if verbose: + pbar.update(dt) + + yield idx, t_, t_ - t + + if verbose: + pbar.close() + + def get_dataset(self) -> Dataset: + return self.__dataset + + def get_logger(self) -> Logger: + return self.__logger + + def load(self, path: str): + self.__dataset = Dataset(path) + self.__logger = Logger() + + for sensor in self.__sensors: + sensor.set_dataset(self.__dataset) + sensor.set_logger(self.__logger) + + return self + + def add_sensor(self, sensor, *args, **kwargs) -> Sensor: + assert issubclass(sensor, Sensor), "Expected a subclass of Sensor." + + self.__sensors.append(sensor(self.__dataset, self.__logger, *args, **kwargs)) + + return self.__sensors[-1] + + + \ No newline at end of file diff --git a/spatz/transforms/__init__.py b/spatz/transforms/__init__.py new file mode 100644 index 0000000..89411c1 --- /dev/null +++ b/spatz/transforms/__init__.py @@ -0,0 +1,3 @@ +from failures import * +from noise import * +from transform import * \ No newline at end of file diff --git a/spatz/transforms/failures.py b/spatz/transforms/failures.py new file mode 100644 index 0000000..2bfc259 --- /dev/null +++ b/spatz/transforms/failures.py @@ -0,0 +1,29 @@ +import numpy as np + +from numpy.typing import ArrayLike +from typing import Any, Tuple + +from spatz.transforms import Transform + + +class Downtime(Transform): + def __init__(self, mu_duration: float, sigma_duration: float) -> None: + super().__init__() + + self.__mu = mu_duration + self.__sigma = sigma_duration + self.__state = 1 + self.__until = abs(np.random.normal(mu_duration, sigma_duration)) + + def get_state(self) -> Tuple[int, float]: + return self.__state, self.__until + + def __call__(self, t: float, x: ArrayLike) -> Any: + if t >= self.__until: + self.__state = 1 - self.__state + self.__until = t + abs(np.random.normal(self.__mu, self.__sigma)) + + if self.__state == 1: + return x + + return np.zeros_like(x) diff --git a/spatz/transforms/noise.py b/spatz/transforms/noise.py new file mode 100644 index 0000000..dcb14e3 --- /dev/null +++ b/spatz/transforms/noise.py @@ -0,0 +1,34 @@ +import numpy as np + +from numpy.typing import ArrayLike +from typing import Any, Tuple + +from spatz.transforms import Transform + + +class GaussianNoise(Transform): + def __init__(self, mu: ArrayLike, sigma: ArrayLike) -> None: + super().__init__() + + self.__mu = mu + self.__sigma = sigma + + def __call__(self, t: float, x: ArrayLike) -> ArrayLike: + assert np.shape(self.__mu) == np.shape(x), "Mu and x have to match in shape." + + if np.isscalar(x): + noise = np.random.normal(0, 1) + x += self.__sigma * noise + self.__mu + else: + noise = np.random.normal(0, 1, np.shape(x)) + x += self.__sigma @ noise + self.__mu + + return x + + +class PinkNoise(Transform): + def __init__(self) -> None: + super().__init__() + + def __call__(self, t: float, x: ArrayLike) -> Any: + pass diff --git a/spatz/transforms/transform.py b/spatz/transforms/transform.py new file mode 100644 index 0000000..24d7498 --- /dev/null +++ b/spatz/transforms/transform.py @@ -0,0 +1,12 @@ +import numpy as np + +from numpy.typing import ArrayLike +from typing import Any, Tuple + + +class Transform: + def apply(self, t: float, x: np.array): + y = self(t, x) + assert x.shape == y.shape, "Transform has to maintain the array's shape." + + return y \ No newline at end of file