SPATZ/spatz/sensors/sensor.py

66 lines
1.8 KiB
Python

import math
from abc import abstractmethod
from typing import Any, AnyStr, List

import numpy as np
from numpy.typing import ArrayLike

from spatz.transforms import *
from spatz.logger import *
from spatz.dataset import *
class Sensor:
    """Base class for sensors that produce readings from a dataset.

    Calling an instance runs the pipeline implemented in ``__call__``:
    fetch raw data, apply sensor-specific effects, apply the configured
    transforms in order, clip the result to ``[min_value, max_value]`` and
    write it to the logger.

    Subclasses are expected to override ``_get_name``, ``_get_data`` and
    ``_sensor_specific_effects``; the base versions raise
    ``NotImplementedError``.

    NOTE(review): the ``@abstractmethod`` markers are advisory only, because
    this class does not derive from ``abc.ABC``. A missing override surfaces
    when the method is first called, not at instantiation.
    """

    def __init__(
        self,
        dataset: Dataset,
        logger: Logger,
        transforms: List[Transform] | None = None,
        min_value=-np.inf,
        max_value=np.inf,
    ):
        """Store the collaborators and output limits of the sensor.

        Args:
            dataset: Object queried for the current time via ``get_time()``.
            logger: Object receiving the outputs via ``write(...)``.
            transforms: Callables invoked as ``transform(t, value)`` in list
                order. ``None`` (the default) means no transforms.
            min_value: Lower clipping bound applied to the output.
            max_value: Upper clipping bound applied to the output.
        """
        self._dataset = dataset
        self._logger = logger
        # A fresh list per instance: a literal ``[]`` default would be shared
        # by every sensor constructed without explicit transforms.
        self._transforms = [] if transforms is None else transforms
        self._min_value = min_value
        self._max_value = max_value

    def set_dataset(self, dataset: Dataset):
        """Replace the dataset used by this sensor."""
        self._dataset = dataset

    def set_logger(self, logger: Logger):
        """Replace the logger used by this sensor."""
        self._logger = logger

    def set_transforms(self, transforms: List[Transform]):
        """Replace the list of transforms applied in ``__call__``."""
        self._transforms = transforms

    def _log(self, name: AnyStr, value: Any):
        """Write ``value`` under ``name`` to the logger, tagged with the sensor name."""
        self._logger.write(name, value, self._get_name())

    @abstractmethod
    def _get_name(self) -> AnyStr:
        """Return the name passed to the logger alongside every value."""
        raise NotImplementedError()

    @abstractmethod
    def _sensor_specific_effects(self, x: ArrayLike | float) -> ArrayLike | float:
        """Return ``x`` after applying the effects specific to this sensor."""
        raise NotImplementedError()

    @abstractmethod
    def _get_data(self) -> ArrayLike | float:
        """Return the raw value that ``__call__`` starts from."""
        raise NotImplementedError()

    def get_init_data(self) -> ArrayLike | None:
        """Return the initial data of the sensor.

        The base implementation does nothing and returns ``None``;
        presumably subclasses override it -- TODO confirm against callers.
        """
        return None

    def __call__(self) -> ArrayLike | float:
        """Produce one sensor output, log it, and return it.

        Returns:
            The clipped output. Scalars are logged as ``'out'``; anything
            else is logged element by element as ``'out_<index>'``.
        """
        out = self._get_data()
        out = self._sensor_specific_effects(out)

        # Read the time once so every transform receives the same value.
        t = self._dataset.get_time()
        for transform in self._transforms:
            out = transform(t, out)

        out = np.clip(out, self._min_value, self._max_value)

        # Log the outputs of the sensor.
        if np.isscalar(out):
            self._log('out', out)
        else:
            for i, component in enumerate(out):
                self._log(f'out_{i}', component)

        return out