mirror of
https://git.intern.spaceteamaachen.de/ALPAKA/SPATZ.git
synced 2025-06-10 18:15:59 +00:00
57 lines
1.5 KiB
Python
57 lines
1.5 KiB
Python
import math
|
|
import numpy as np
|
|
|
|
from abc import abstractmethod
|
|
from typing import List, AnyStr
|
|
from numpy.typing import ArrayLike
|
|
|
|
from spatz.transforms import *
|
|
from spatz.logger import *
|
|
from spatz.dataset import *
|
|
|
|
|
|
class Sensor:
|
|
def __init__(self, dataset: Dataset, logger: Logger, transforms: List[Transform] = []):
|
|
self._dataset = dataset
|
|
self._logger = logger
|
|
self._transforms = transforms
|
|
|
|
def set_dataset(self, dataset: Dataset):
|
|
self._dataset = dataset
|
|
|
|
def set_logger(self, logger: Logger):
|
|
self._logger = logger
|
|
|
|
def set_transforms(self, transforms: List[Transform]):
|
|
self._transforms = transforms
|
|
|
|
def _log(self, name: AnyStr, value: Any):
|
|
self._logger.write(name, value, self._get_name())
|
|
|
|
@abstractmethod
|
|
def _get_name(self) -> AnyStr:
|
|
raise NotImplementedError()
|
|
|
|
@abstractmethod
|
|
def _sensor_specific_effects(self, x: ArrayLike | float) -> ArrayLike | float:
|
|
raise NotImplementedError()
|
|
|
|
@abstractmethod
|
|
def _get_data(self) -> ArrayLike | float:
|
|
raise NotImplementedError()
|
|
|
|
def __call__(self) -> ArrayLike | float:
|
|
out = self._get_data()
|
|
out = self._sensor_specific_effects(out)
|
|
|
|
for transform in self._transforms:
|
|
out = transform(out)
|
|
|
|
# Log the outputs of the sensor.
|
|
if np.isscalar(out):
|
|
self._log('out', out)
|
|
else:
|
|
for i in range(len(out)):
|
|
self._log(f'out_{i}', out[i])
|
|
|
|
return out |