from typing import List from numpy.random import normal from tqdm import tqdm from spatz.dataset import Dataset from spatz.logger import Logger from spatz.sensors import Sensor from spatz.dataset import Dataset, Phase from spatz.logger import Logger from spatz.sensors import Sensor from spatz.observer import Observer class UniformTimeSteps: def __init__(self, dt: float, mu: float = 0, sigma: float = 0, delay_only=True) -> None: """_summary_ Args: dt (float): _description_ mu (float, optional): _description_. Defaults to 0. sigma (float, optional): _description_. Defaults to 0. delay_only (bool, optional): _description_. Defaults to True. """ self.__dt = dt self.__mu = mu self.__sigma = sigma self.__delay_only = delay_only def __call__(self, t): noise = normal(self.__mu, self.__sigma) if self.__delay_only: noise = abs(noise) return self.__dt + noise class Simulation: def __init__(self, time_steps=UniformTimeSteps(0.01)): self.__dataset = None self.__logger = None self.__sensors: List[Sensor] = [] self.__time_steps = time_steps def run(self, verbose=False, until: Phase = None): idx = 0 # Clear all logs and reset the dataset to the first time step. self.__dataset.reset() self.__logger.reset() if verbose: pbar = tqdm(total=self.__dataset.get_length()) while True: t = self.__dataset.get_time() dt = self.__time_steps(t) t_ = t + dt idx += 1 if t_ > self.__dataset.get_length(): break if until is not None and self.__dataset.get_phase() == until: break self.__dataset.step(dt) self.__logger.step(dt) if verbose: pbar.update(dt) yield idx, t_, t_ - t if verbose: pbar.close() def get_dataset(self) -> Dataset: return self.__dataset def get_logger(self) -> Logger: return self.__logger def load(self, path: str): self.__dataset = Dataset(path) self.__logger = Logger() for sensor in self.__sensors: sensor.set_dataset(self.__dataset) sensor.set_logger(self.__logger) return self def add_sensor(self, sensor, *args, **kwargs) -> Sensor: """Register a new sensor for this simulation. A registered sensor can be called like a function and returns the current measurements. The class' constructor arguments have to be given aswell. Args: sensor (_type_): A subclass of the abstract Sensor class. Returns: Sensor: Returns an object of the provided sensor subclass. """ assert issubclass(sensor, Sensor), "Expected a subclass of Sensor." self.__sensors.append(sensor(self.__dataset, self.__logger, *args, **kwargs)) return self.__sensors[-1] def add_observer(self, attributes: List[str]) -> Observer: """Register a new observer for this simulation observing the provided attributes. Args: attributes (List[str]): A list of strings describing the attributes to observe. Returns: Observer: An observer object which can be called like a function to obtain the desired data. """ assert len(attributes) != 0, "Observed attributes list must be nonempty." self.__sensors.append(Observer(self.__dataset, self.__logger, attributes)) return self.__sensors[-1]