Better logging and added BHI160 sensor

This commit is contained in:
dario 2024-01-13 22:54:13 +01:00
parent 4def93041e
commit 29db73ccd4
6 changed files with 44 additions and 16 deletions

View File

@ -1,7 +1,7 @@
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from typing import Any, Tuple from typing import Any, Tuple, List
from numpy.typing import ArrayLike from numpy.typing import ArrayLike
from abc import abstractmethod from abc import abstractmethod
@ -56,7 +56,7 @@ class Logger(Advanceable):
def _on_reset(self): def _on_reset(self):
self.__df = pd.DataFrame.from_dict({'time': [self.get_time()]}).astype(np.float64) self.__df = pd.DataFrame.from_dict({'time': [self.get_time()]}).astype(np.float64)
def write(self, attrib: str, value: Any, domain: str = 'all'): def write(self, attrib: str | List[str], value: Any | List[Any] | List[ArrayLike], domain: str = 'all'):
"""Writes a value to the logger. """Writes a value to the logger.
Args: Args:
@ -64,12 +64,16 @@ class Logger(Advanceable):
value (Any): The value to log. value (Any): The value to log.
domain (str, optional): The domain the value belongs to. Defaults to 'any'. domain (str, optional): The domain the value belongs to. Defaults to 'any'.
""" """
name = f'{domain}/{attrib}' if not isinstance(attrib, str):
for attr, val in zip(attrib, value):
self.write(attr, val, domain=domain)
else:
name = f'{domain}/{attrib}'
if name not in self.__df.columns: if name not in self.__df.columns:
self.__df[name] = pd.Series([pd.NA] * len(self.__df)) self.__df[name] = pd.Series([pd.NA] * len(self.__df))
self.__df.at[self.__idx, name] = value self.__df.at[self.__idx, name] = value
def get_dataframe(self) -> pd.DataFrame: def get_dataframe(self) -> pd.DataFrame:
return self.__df return self.__df

View File

@ -6,6 +6,9 @@ from numpy.typing import ArrayLike
def inv(val): def inv(val):
if np.isscalar(val): if np.isscalar(val):
if val == 0:
return 0
return 1 / val return 1 / val
if len(val) == 1: if len(val) == 1:
@ -76,6 +79,8 @@ class KalmanFilter:
H = self.__H H = self.__H
R = self.__R R = self.__R
n = len(x)
if hasattr(self.__H, '__call__'): if hasattr(self.__H, '__call__'):
H = self.__H(dt) H = self.__H(dt)
@ -86,9 +91,9 @@ class KalmanFilter:
K = err @ H.T @ inv(H @ err @ H.T + R) K = err @ H.T @ inv(H @ err @ H.T + R)
# Compute the corrected state. # Compute the corrected state.
x = x + K @ (z - H @ x) x = x + (K @ (z - H @ x)).T
# Compute the error after correction. # Compute the error after correction.
err = (np.identity('TODO') - K @ H) @ err err = (np.identity(n) - K @ H) @ err
return x, err return np.squeeze(np.asarray(x)), err

View File

@ -1,4 +1,4 @@
from typing import List from typing import List, AnyStr
from numpy.typing import ArrayLike from numpy.typing import ArrayLike
from spatz.dataset import ArrayLike, Dataset from spatz.dataset import ArrayLike, Dataset
@ -8,13 +8,27 @@ from spatz.transforms import Transform
class BHI160Gyro(Gyroscope): class BHI160Gyro(Gyroscope):
def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = ...): def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = []):
super().__init__(dataset, logger, offset, transforms) super().__init__(dataset, logger, offset, transforms)
def _get_name(self) -> AnyStr:
return 'BHI160'
def _get_data(self) -> ArrayLike: def _get_data(self) -> ArrayLike:
rots = self._dataset.fetch_values(['roll_l', 'pitch_l', 'yaw_l']) rots = self._dataset.fetch_values(['roll_l', 'pitch_l', 'yaw_l'])
return rots
def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike: def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike:
return x return x
class BHI160Acc(Accelerometer):
def __init__(self, dataset: Dataset, logger: Logger, offset: float = 0, transforms: List[Transform] = []):
super().__init__(dataset, logger, offset, transforms)
def _get_name(self) -> AnyStr:
return 'BHI160'
def _sensor_specific_effects(self, x: ArrayLike) -> ArrayLike:
return x

View File

@ -47,8 +47,10 @@ class Sensor:
out = self._get_data() out = self._get_data()
out = self._sensor_specific_effects(out) out = self._sensor_specific_effects(out)
t = self._dataset.get_time()
for transform in self._transforms: for transform in self._transforms:
out = transform(out) out = transform(t, out)
# Log the outputs of the sensor. # Log the outputs of the sensor.
if np.isscalar(out): if np.isscalar(out):

View File

@ -35,7 +35,6 @@ class UniformTimeSteps:
return self.__dt + noise return self.__dt + noise
class Simulation: class Simulation:
def __init__(self, time_steps=UniformTimeSteps(0.01)): def __init__(self, time_steps=UniformTimeSteps(0.01)):
self.__dataset = None self.__dataset = None
@ -123,4 +122,3 @@ class Simulation:
return self.__sensors[-1] return self.__sensors[-1]

View File

@ -26,4 +26,9 @@ class Downtime(Transform):
if self.__state == 1: if self.__state == 1:
return x return x
return np.zeros_like(x) if np.isscalar(x):
x = 0
else:
x = np.zeros_like(x)
return x