From 29db73ccd451d3ab12118a28303ba27ad7526083 Mon Sep 17 00:00:00 2001 From: dario Date: Sat, 13 Jan 2024 22:54:13 +0100 Subject: [PATCH] Better logging and added BHI160 sensor --- spatz/logger.py | 16 ++++++++++------ spatz/models/kalman.py | 11 ++++++++--- spatz/sensors/imu/bhi160.py | 20 +++++++++++++++++--- spatz/sensors/sensor.py | 4 +++- spatz/simulation.py | 2 -- spatz/transforms/failures.py | 7 ++++++- 6 files changed, 44 insertions(+), 16 deletions(-) diff --git a/spatz/logger.py b/spatz/logger.py index 1ae1aa2..07862fa 100644 --- a/spatz/logger.py +++ b/spatz/logger.py @@ -1,7 +1,7 @@ import numpy as np import pandas as pd -from typing import Any, Tuple +from typing import Any, Tuple, List from numpy.typing import ArrayLike from abc import abstractmethod @@ -56,7 +56,7 @@ class Logger(Advanceable): def _on_reset(self): self.__df = pd.DataFrame.from_dict({'time': [self.get_time()]}).astype(np.float64) - def write(self, attrib: str, value: Any, domain: str = 'all'): + def write(self, attrib: str | List[str], value: Any | List[Any] | List[ArrayLike], domain: str = 'all'): """Writes a value to the logger. Args: @@ -64,12 +64,16 @@ class Logger(Advanceable): value (Any): The value to log. domain (str, optional): The domain the value belongs to. Defaults to 'any'. """ - name = f'{domain}/{attrib}' + if not isinstance(attrib, str): + for attr, val in zip(attrib, value): + self.write(attr, val, domain=domain) + else: + name = f'{domain}/{attrib}' - if name not in self.__df.columns: - self.__df[name] = pd.Series([pd.NA] * len(self.__df)) + if name not in self.__df.columns: + self.__df[name] = pd.Series([pd.NA] * len(self.__df)) - self.__df.at[self.__idx, name] = value + self.__df.at[self.__idx, name] = value def get_dataframe(self) -> pd.DataFrame: return self.__df \ No newline at end of file diff --git a/spatz/models/kalman.py b/spatz/models/kalman.py index 31674bb..35708ad 100644 --- a/spatz/models/kalman.py +++ b/spatz/models/kalman.py @@ -6,6 +6,9 @@ from numpy.typing import ArrayLike def inv(val): if np.isscalar(val): + if val == 0: + return 0 + return 1 / val if len(val) == 1: @@ -76,6 +79,8 @@ class KalmanFilter: H = self.__H R = self.__R + n = len(x) + if hasattr(self.__H, '__call__'): H = self.__H(dt) @@ -86,9 +91,9 @@ class KalmanFilter: K = err @ H.T @ inv(H @ err @ H.T + R) # Compute the corrected state. - x = x + K @ (z - H @ x) + x = x + (K @ (z - H @ x)).T # Compute the error after correction. - err = (np.identity('TODO') - K @ H) @ err + err = (np.identity(n) - K @ H) @ err - return x, err \ No newline at end of file + return np.squeeze(np.asarray(x)), err \ No newline at end of file diff --git a/spatz/sensors/imu/bhi160.py b/spatz/sensors/imu/bhi160.py index 1d3fb58..697fc56 100644 --- a/spatz/sensors/imu/bhi160.py +++ b/spatz/sensors/imu/bhi160.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, AnyStr from numpy.typing import ArrayLike from spatz.dataset import ArrayLike, Dataset @@ -8,13 +8,27 @@ from spatz.transforms import Transform class BHI160Gyro(Gyroscope): - def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = ...): + def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = []): super().__init__(dataset, logger, offset, transforms) + def _get_name(self) -> AnyStr: + return 'BHI160' + def _get_data(self) -> ArrayLike: rots = self._dataset.fetch_values(['roll_l', 'pitch_l', 'yaw_l']) - + return rots def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike: return x + + +class BHI160Acc(Accelerometer): + def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = []): + super().__init__(dataset, logger, offset, transforms) + + def _get_name(self) -> AnyStr: + return 'BHI160' + + def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike: + return x diff --git a/spatz/sensors/sensor.py b/spatz/sensors/sensor.py index 2cf0276..039b148 100644 --- a/spatz/sensors/sensor.py +++ b/spatz/sensors/sensor.py @@ -47,8 +47,10 @@ class Sensor: out = self._get_data() out = self._sensor_specific_effects(out) + t = self._dataset.get_time() + for transform in self._transforms: - out = transform(out) + out = transform(t, out) # Log the outputs of the sensor. if np.isscalar(out): diff --git a/spatz/simulation.py b/spatz/simulation.py index de2418c..ae3dc4e 100644 --- a/spatz/simulation.py +++ b/spatz/simulation.py @@ -35,7 +35,6 @@ class UniformTimeSteps: return self.__dt + noise - class Simulation: def __init__(self, time_steps=UniformTimeSteps(0.01)): self.__dataset = None @@ -123,4 +122,3 @@ class Simulation: return self.__sensors[-1] - \ No newline at end of file diff --git a/spatz/transforms/failures.py b/spatz/transforms/failures.py index 2bfc259..6b0cd01 100644 --- a/spatz/transforms/failures.py +++ b/spatz/transforms/failures.py @@ -26,4 +26,9 @@ class Downtime(Transform): if self.__state == 1: return x - return np.zeros_like(x) + if np.isscalar(x): + x = 0 + else: + x = np.zeros_like(x) + + return x