Updated observers and Kalman Filter, added running average

This commit is contained in:
dario 2024-04-19 10:52:34 +02:00 committed by dario
parent dedd445470
commit 88008d9718
21 changed files with 20865 additions and 3544 deletions

1720
data/simulations/16km.csv Normal file

File diff suppressed because it is too large Load Diff

1720
data/simulations/16km.txt Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

1720
data/simulations/23km.csv Normal file

File diff suppressed because it is too large Load Diff

1720
data/simulations/23km.txt Normal file

File diff suppressed because it is too large Load Diff

1720
data/simulations/28km.csv Normal file

File diff suppressed because it is too large Load Diff

1720
data/simulations/28km.txt Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -8,7 +8,7 @@ with open('README.md', 'r') as f:
setup(
name='spatz',
version='0.0.10',
packages=['spatz'],
packages=find_packages(exclude=["_tests"]),
long_description=longdescription,
long_description_content_type='text/markdown',
)

View File

@ -4,7 +4,7 @@ import numpy as np
import pandas as pd
from enum import Enum
from typing import List
from typing import List, Literal
from numpy.typing import ArrayLike
from scipy.spatial.transform import Rotation
@ -338,7 +338,7 @@ class Dataset(Advanceable):
"""
return self.fetch_values(['OMEGA_X', 'OMEGA_Y', 'OMEGA_Z'], t)
def get_velocity(self, frame='FL', t: float | None = None) -> ArrayLike:
def get_velocity(self, frame: Literal['L', 'B', 'LF'] = 'LF', t: float | None = None) -> ArrayLike:
"""
Args:
frame (str, optional): The coordinate frame to compute the velocity for. Defaults to 'FL'.
@ -352,6 +352,8 @@ class Dataset(Advanceable):
if frame == 'B':
return self.launch_rail_to_body(t) @ vel
elif frame == 'L':
return np.linalg.inv(self.local_to_launch_rail()) @ vel
return vel

12
spatz/models/average.py Normal file
View File

@ -0,0 +1,12 @@
class MovingAverage:
def __init__(self, k: int, init_value: float) -> None:
self.__k = k
self.__values = [init_value] * k
def update(self, value):
self.__values.append(value)
self.__values = self.__values[1:]
return sum(self.__values) / self.__k

View File

@ -91,7 +91,7 @@ class KalmanFilter:
K = err @ H.T @ inv(H @ err @ H.T + R)
# Compute the corrected state.
x = x + (K @ (z - H @ x)).T
x = x + (K @ (z - H @ x).T).T
# Compute the error after correction.
err = (np.identity(n) - K @ H) @ err

View File

@ -1,4 +1,4 @@
from typing import Any, List, Dict, AnyStr
from typing import Any, List, Dict, AnyStr, Tuple
from numpy.typing import ArrayLike
from spatz.dataset import Dataset
@ -7,7 +7,7 @@ from spatz.transforms import Transform
class Observer:
def __init__(self, dataset: Dataset, logger: Logger, attributes: List[str]):
def __init__(self, dataset: Dataset, logger: Logger, attributes: List[str] = None):
self._dataset = dataset
self._logger = logger
self.__attrs = attributes
@ -30,11 +30,22 @@ class Observer:
ArrayLike: Returns the values of the observed attributes at the start of the simulation.
"""
return self(t=self._dataset.get_start_time())
def _fetch(self, t: float) -> Tuple[ArrayLike, List[str]]:
"""Method for collecting and preprocessing the desired data. Can be overwritten by a subclass.
Args:
t (float): The current time of the simulation.
Returns:
ArrayLike: The collected values.
"""
return self._dataset.fetch_values(self.__attrs, t), self.__attrs
def __call__(self, t: float | None = None) -> ArrayLike:
data = self._dataset.fetch_values(self.__attrs, t)
data, attrs = self._fetch(t)
for attrib, value in zip(self.__attrs, data):
for attrib, value in zip(attrs, data):
self._log(attrib, value)
return data

View File

@ -10,14 +10,17 @@ from spatz.transforms import Transform
class PressureSensor(Sensor):
def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = [], ts_effects=True):
"""
def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = [], ts_effects=True, delay=0.0):
"""_summary_
Args:
dataset (Dataset): A dataset instance.
logger (Logger): _description_
transforms (List[Transform], optional): Transforms to apply to the sensor outputs. Defaults to [].
ts_effects (bool, optional): If True, models transsonic effects. Defaults to True.
ts_effects (bool, optional): If True, adds transsonic effects using a very simple model. Defaults to True.
delay (float, optional): Adds a delay to the pressure measurements. Defaults to 0.0.
"""
super(PressureSensor, self).__init__(dataset, logger, transforms)
super(PressureSensor, self).__init__(dataset, logger, transforms, min_value=0)
self._ts_effects = ts_effects

View File

@ -11,10 +11,12 @@ from spatz.dataset import *
class Sensor:
def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = []):
def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = [], min_value=-np.inf, max_value=np.inf):
self._dataset = dataset
self._logger = logger
self._transforms = transforms
self._min_value = min_value
self._max_value = max_value
def set_dataset(self, dataset: Dataset):
self._dataset = dataset
@ -52,6 +54,8 @@ class Sensor:
for transform in self._transforms:
out = transform(t, out)
out = np.clip(out, self._min_value, self._max_value)
# Log the outputs of the sensor.
if np.isscalar(out):
self._log('out', out)

View File

@ -106,19 +106,27 @@ class Simulation:
self.__sensors.append(sensor(self.__dataset, self.__logger, *args, **kwargs))
return self.__sensors[-1]
def add_observer(self, attributes: List[str]) -> Observer:
"""Register a new observer for this simulation observing the provided attributes.
def add_observer(self, observer_or_attributes: List[str] | Observer) -> Observer:
"""Register a new observer for this simulation.
Args:
attributes (List[str]): A list of strings describing the attributes to observe.
observer_or_attributes (List[str] | Observer): A list of strings describing the attributes to observe
or a custom observer class.
Returns:
Observer: An observer object which can be called like a function to obtain the desired data.
"""
assert len(attributes) != 0, "Observed attributes list must be nonempty."
assert isinstance(observer_or_attributes, list) or issubclass(observer_or_attributes, Observer)
self.__sensors.append(Observer(self.__dataset, self.__logger, attributes))
if isinstance(observer_or_attributes, list):
attributes = observer_or_attributes
assert len(attributes) != 0, "Observed attributes list must be nonempty."
self.__sensors.append(Observer(self.__dataset, self.__logger, attributes))
else:
observer = observer_or_attributes
self.__sensors.append(observer(self.__dataset, self.__logger))
return self.__sensors[-1]

File diff suppressed because one or more lines are too long