SPATZ/spatz/simulation.py

99 lines
2.5 KiB
Python

from typing import List
from numpy.random import normal
from tqdm import tqdm
from spatz.dataset import Dataset
from spatz.logger import Logger
from spatz.sensors import Sensor
from dataset import Dataset
from logger import Logger
from sensor import Sensor
class UniformTimeSteps:
def __init__(self, dt: float, mu: float = 0, sigma: float = 0, delay_only=True) -> None:
"""_summary_
Args:
dt (float): _description_
mu (float, optional): _description_. Defaults to 0.
sigma (float, optional): _description_. Defaults to 0.
delay_only (bool, optional): _description_. Defaults to True.
"""
self.__dt = dt
self.__mu = mu
self.__sigma = sigma
self.__delay_only = delay_only
def __call__(self, t):
noise = normal(self.__mu, self.__sigma)
if self.__delay_only:
noise = abs(noise)
return self.__dt + noise
class Simulation:
def __init__(self, time_steps=UniformTimeSteps(0.01)):
self.__dataset = None
self.__logger = None
self.__sensors: List[Sensor] = []
self.__time_steps = time_steps
def run(self, verbose=False):
idx = 0
# Clear all logs and reset the dataset to the first time step.
self.__dataset.reset()
self.__logger.reset()
if verbose:
pbar = tqdm(total=self.__dataset.get_length())
while True:
t = self.__dataset.get_time()
dt = self.__time_steps(t)
t_ = t + dt
idx += 1
if t_ > self.__dataset.get_length():
break
self.__dataset.step(dt)
self.__logger.step(dt)
if verbose:
pbar.update(dt)
yield idx, t_, t_ - t
if verbose:
pbar.close()
def get_dataset(self) -> Dataset:
return self.__dataset
def get_logger(self) -> Logger:
return self.__logger
def load(self, path: str):
self.__dataset = Dataset(path)
self.__logger = Logger()
for sensor in self.__sensors:
sensor.set_dataset(self.__dataset)
sensor.set_logger(self.__logger)
return self
def add_sensor(self, sensor, *args, **kwargs) -> Sensor:
assert issubclass(sensor, Sensor), "Expected a subclass of Sensor."
self.__sensors.append(sensor(self.__dataset, self.__logger, *args, **kwargs))
return self.__sensors[-1]