import os import math import pathlib import pandas as pd from numpy import log10 __all__ = [ 'preprocess_file' ] def filter_zero_steps(df: pd.DataFrame) -> pd.DataFrame: """Filter all rows with time steps of length 0. Args: df (pd.DataFrame): The dataframe to preprocess. Returns: pd.DataFrame: The preprocessed dataframe. """ dts = df['Time'].diff().fillna(1) df['dts'] = dts return df[dts != 0] def preprocess_velocity(df: pd.DataFrame) -> pd.DataFrame: """Transforms the velocities from km/s to m/s. Args: df (pd.DataFrame): The DataFrame to preprocess. Returns: pd.DataFrame: The modified DataFrame. """ for axis in ['x', 'y', 'z']: df['v{}_FL'.format(axis)] *= 1e3 return df def preprocess_position(df: pd.DataFrame) -> pd.DataFrame: for axis in ['x', 'y', 'z']: df['{}_FL'.format(axis)] *= 1e3 return df def compute_acceleration(df: pd.DataFrame) -> pd.DataFrame: """ Args: df (pd.DataFrame): _description_ Returns: pd.DataFrame: _description_ """ # Time deltas. dts = df['Time'].diff().fillna(0) for axis in ['x', 'y', 'z']: # Raw differences between velocity across time steps. das = df['v{}_FL'.format(axis)].diff().fillna(0) # Velocity differences divided by time deltas. df['a{}_FL'.format(axis)] = das.divide(dts).fillna(0) return df def preprocess_rotations(df: pd.DataFrame) -> pd.DataFrame: df['pitch_l'] *= math.pi / 180 df['yaw_l'] *= math.pi / 180 df['roll_l'] *= math.pi / 180 df['declination'] *= math.pi / 180 df['longitude'] *= math.pi / 180 df['latitude'] *= math.pi / 180 return df def compute_omegas(df: pd.DataFrame) -> pd.DataFrame: dts = df['Time'].diff().fillna(0) for axis in ['X', 'Y', 'Z']: # Raw differences between omegas across time steps. dos = df['OMEGA_{}'.format(axis)].diff().fillna(0) # Omega differences divided by time deltas. df['omega_{}'.format(axis)] = dos.divide(dts).fillna(0) return df def preprocess_file(path): """_summary_ Args: path (_type_): _description_ Returns: _type_: _description_ """ df = pd.read_csv(path, delimiter="\t" , encoding='latin') df = df.drop(0, axis=0) df = df.astype(float) df['Phase'] = df['Phase'].replace({ 1: 'onpad', # rocket waiting on pad 2: 'loi', # rocket liftoff 3: 'rci', # rail clearance 4: 'eci', # engine cutoff 5: 'adi' # apogee and drogue }) # Select all the relevant columns df = df[[ 'Time', 'Phase', 'declination', 'longitude', 'latitude', 'altitude', 'mach', 'sonic_velocity', 'x_FL', 'y_FL', 'z_FL', 'vx_FL', 'vy_FL', 'vz_FL', 'OMEGA_X', 'OMEGA_Y', 'OMEGA_Z', 'pitch_l', 'yaw_l', 'roll_l', 'flightpath_speed', 'acc_total', 'atmos_pressure', 'atmos_temperature', 'drag', 'mass_total' ]] df = filter_zero_steps(df) df = preprocess_position(df) df = preprocess_velocity(df) df = compute_acceleration(df) df = compute_omegas(df) df = preprocess_rotations(df) # Set the altitude to meters df['altitude'] *= 1000 renaming = { 'sonic_velocity': 'speedofsound', 'Time': 'time', 'Phase': 'phase', 'flightpath_speed': 'velocity', 'acc_total': 'acceleration', 'atmos_pressure': 'pressure', 'atmos_temperature': 'temperature', 'x_FL': 'x', 'y_FL': 'y', 'z_FL': 'z', 'vx_FL': 'vx', 'vy_FL': 'vy', 'vz_FL': 'vz', 'ax_FL': 'ax', 'ay_FL': 'ay', 'az_FL': 'az' } df = df.rename(renaming, axis=1) return df def to_traj_file(path, name, df: pd.DataFrame): with open(path, 'w') as f: f.write(f"#epsg 0\n#name { name }\n#fields t,px,py,pz,ex,ey,ez,vx,vy,vz\n") df.to_csv(path, columns=['time', 'x', 'y', 'z', 'pitch', 'yaw', 'roll', 'vx_global', 'vy_global', 'vz_global'], header=False, index=False, mode='a') if __name__ == '__main__': for file in os.listdir('data/simulations/'): df = preprocess_file('data/simulations/' + file)