# Inverted-Pendulum-Neural-Ne.../analysis/time_weighting_individual.py
#
# 162 lines
# 7.7 KiB
# Python

from multiprocessing import Pool, cpu_count
import os
import numpy as np
import matplotlib.pyplot as plt
from simulation import run_simulation
from data_processing import get_controller_files
# Constants and setup
# Named starting states for the simulations. Each 4-tuple is handed to
# run_simulation unchanged; the main script reads its last entry as the
# desired theta for that scenario.
initial_conditions = dict(
    small_perturbation=(0.1 * np.pi, 0.0, 0.0, 0.0),
    large_perturbation=(-np.pi, 0.0, 0.0, 0),
    overshoot_vertical_test=(-0.1 * np.pi, 2 * np.pi, 0.0, 0.0),
    overshoot_angle_test=(0.2 * np.pi, 2 * np.pi, 0.0, 0.3 * np.pi),
    extreme_perturbation=(4 * np.pi, 0.0, 0.0, 0),
)

# Every time-weighting except "constant" also has a "<name>_mirrored" variant.
loss_functions_mirrored = [
    f"{name}_mirrored"
    for name in ("linear", "quadratic", "cubic", "inverse", "inverse_squared", "inverse_cubed")
]
# Full list of loss-function names: the plain variants first, then the mirrored ones.
loss_functions = [
    "constant",
    "linear",
    "quadratic",
    "cubic",
    "inverse",
    "inverse_squared",
    "inverse_cubed",
    *loss_functions_mirrored,
]

# Start and end of the epoch range passed to get_controller_files.
epoch_range = (0, 100)
# Interval between the epochs that are selected.
epoch_step = 1
# Time step for simulation.
dt = 0.02
# Number of steps in each simulation.
num_steps = 500
# Plotting functions
def plot_3d_epoch_evolution(epochs, theta_over_epochs, desired_theta, save_path, title, num_steps, dt):
    """
    Draws one theta-vs-time trace per epoch in a 3D plot (x = epoch, y = time, z = theta) and saves it.

    The z-range is limited to desired_theta +/- 1.5*pi, tightened further to the range the data
    actually covers. Samples outside that window are replaced by NaN so they are not drawn.
    A dashed red line at the final time marks desired_theta across all epochs.

    :param epochs: Epoch identifiers, paired element-wise with theta_over_epochs.
    :param theta_over_epochs: One sequence of theta values per epoch. Each is plotted against
        the num_steps time samples, so it is expected to hold num_steps values.
    :param desired_theta: Target angle; centre of the plotted window and height of the reference line.
    :param save_path: File path the figure is written to. Its parent directory is created if needed.
    :param title: Title of the plot.
    :param num_steps: Number of simulation steps, used to build the time axis.
    :param dt: Time step between samples, used to build the time axis.
    :raises ValueError: If theta_over_epochs is empty.
    """
    # Validate before a figure exists so a bad call cannot leave an open figure behind.
    theta_over_epochs = list(theta_over_epochs)
    if len(theta_over_epochs) == 0:
        raise ValueError("theta_over_epochs must contain at least one epoch of theta values")

    time_steps = np.arange(num_steps) * dt

    # Window of +/- 1.5*pi around the target, never wider than the data itself.
    theta_values = np.concatenate(theta_over_epochs)
    desired_range_min = max(np.min(theta_values), desired_theta - 1.5 * np.pi)
    desired_range_max = min(np.max(theta_values), desired_theta + 1.5 * np.pi)

    # Pair once and reuse, so the traces and the reference line cover the same epochs.
    epoch_traces = list(zip(epochs, theta_over_epochs))

    fig = plt.figure(figsize=(7, 5))
    ax = fig.add_subplot(111, projection='3d')

    # Traces are added from the last epoch to the first.
    for epoch, theta_vals in reversed(epoch_traces):
        # Float copy: NaN cannot be stored in an integer array, and the caller's data stays untouched.
        masked_theta_vals = np.array(theta_vals, dtype=float)
        outside_window = (masked_theta_vals < desired_range_min) | (masked_theta_vals > desired_range_max)
        masked_theta_vals[outside_window] = np.nan
        ax.plot([epoch] * len(time_steps), time_steps, masked_theta_vals)

    # Reference line: desired theta at the final time, spanning every plotted epoch.
    # NOTE: a label is attached but no legend is drawn by this function.
    epochs_array = np.array([epoch for epoch, _ in epoch_traces])
    ax.plot(epochs_array, [time_steps.max()] * len(epochs_array), [desired_theta] * len(epochs_array),
            color='r', linestyle='--', linewidth=2, label='Desired Theta at End Time')

    ax.set_xlabel("Epoch")
    ax.set_ylabel("Time (s)")
    ax.set_zlabel("Theta (rad)")
    ax.set_zscale('symlog')
    ax.set_title(title)
    ax.set_zlim(desired_range_min, desired_range_max)
    ax.view_init(elev=20, azim=-135)

    # dirname is '' for a bare file name; os.makedirs('') would raise, so only create real directories.
    output_dir = os.path.dirname(save_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Save and close through the figure handle rather than pyplot's "current figure".
    fig.savefig(save_path, dpi=300)
    plt.close(fig)
    print(f"Saved plot as '{save_path}'.")
def plot_theta_vs_epoch(all_results, condition_name, desired_theta, save_path, title, specific_theta_index=-1):
    """
    Plots the theta values at a specific time over epochs for different loss functions for a specific condition, and adds a horizontal line at desired theta.

    The y-axis uses a symlog scale. If no loss function has data for the condition, a message is
    printed and nothing is plotted or saved.

    :param all_results: Dictionary with structure {loss_function: {condition_name: (epochs, theta_over_epochs)}}
    :param condition_name: The key for the specific condition to plot.
    :param desired_theta: The y-value at which to draw a horizontal line across the plot.
    :param save_path: Path to save the final plot. Its parent directory is created if needed.
    :param title: Title of the plot.
    :param specific_theta_index: The index of the theta value to plot. Default is -1 for the last theta.
    :return: None.
    """
    # Check every loss function (not just the first one) and do it before any figure is created,
    # so an early exit neither skips available data nor leaves an open figure behind.
    if not any(condition_name in conditions for conditions in all_results.values()):
        print(f"No data available for condition '{condition_name}'. Exiting plot function.")
        return

    fig, ax = plt.subplots(figsize=(10, 7))

    for loss_function, conditions in all_results.items():
        if condition_name not in conditions:
            continue
        epochs, theta_over_epochs = conditions[condition_name]
        # Skip epochs with an empty history while keeping epochs and thetas paired,
        # so the x and y series always have the same length.
        points = [
            (epoch, thetas[specific_theta_index])
            for epoch, thetas in zip(epochs, theta_over_epochs)
            if len(thetas) > 0
        ]
        if not points:
            continue
        kept_epochs, selected_thetas = zip(*points)
        ax.plot(kept_epochs, selected_thetas, label=f"{loss_function}")

    # Add a horizontal line at the desired_theta
    ax.axhline(y=desired_theta, color='r', linestyle='--', linewidth=2, label='Desired Theta')

    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Final Theta (rad)')
    ax.legend()
    ax.set_yscale('symlog')

    # Same directory handling as plot_3d_epoch_evolution; '' means "current directory".
    output_dir = os.path.dirname(save_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Save and close through the figure handle rather than pyplot's "current figure".
    fig.savefig(save_path)
    plt.close(fig)
    print(f"Plot saved to {save_path}")
# Main execution
def _simulate_controllers(pool, directory, initial_condition):
    """Run every selected controller in `directory` from `initial_condition`; return (epochs, theta_over_epochs) or None if no controllers were found."""
    # Fetch the controller files according to the specified range and interval
    controllers = get_controller_files(directory, epoch_range, epoch_step)
    if not controllers:
        # zip(*results) below cannot be unpacked into three names when there are no results.
        print(f"No controllers found in '{directory}'; skipping.")
        return None

    # Pack parameters for parallel processing
    tasks = [(controller, initial_condition, directory, dt, num_steps) for controller in controllers]
    results = pool.map(run_simulation, tasks)

    # Assumes each result is (epoch, state_history, torque_history) -- confirm against run_simulation.
    results = sorted(results, key=lambda result: result[0])
    epochs, state_histories, _torque_histories = zip(*results)

    # Keep only the first component of every state, which this script treats as theta.
    theta_over_epochs = [[state[0] for state in history] for history in state_histories]
    return epochs, theta_over_epochs


def main():
    """
    Simulates the saved controllers of every loss function under every initial condition and plots the results.

    For each (condition, loss function) pair a 3D epoch-evolution plot is written below the
    condition's output directory. The per-condition "theta vs epoch" summary plots are
    controlled by a flag and are switched off by default.
    """
    analysis_root = "/home/judson/Neural-Networks-in-GNC/inverted_pendulum/analysis/time_weighting2"
    training_root = "/home/judson/Neural-Networks-in-GNC/inverted_pendulum/training/time_weighting"

    # Explicit switches instead of stray `continue` statements. The summary plots were
    # previously made unreachable by a bare `continue`, so they stay off by default.
    plot_epoch_evolution = True
    plot_theta_summaries = False

    all_results = {}  # {loss_function: {condition_name: (epochs, theta_over_epochs)}}

    # One pool for the whole run instead of a new pool per condition/loss-function pair.
    print("Starting worker processes")
    with Pool(min(cpu_count(), 16)) as pool:
        for condition_name, initial_condition in initial_conditions.items():
            condition_text = "IC_" + "_".join(str(round(value, 2)) for value in initial_condition)
            desired_theta = initial_condition[-1]
            condition_path = f"{analysis_root}/{condition_name}"
            os.makedirs(condition_path, exist_ok=True)  # Create directory if it does not exist

            for loss_function in loss_functions:
                # Construct the path to the controller directory
                directory = f"{training_root}/{loss_function}/controllers"
                simulated = _simulate_controllers(pool, directory, initial_condition)
                if simulated is None:
                    continue
                epochs, theta_over_epochs = simulated

                # Store results for later use
                all_results.setdefault(loss_function, {})[condition_name] = (epochs, theta_over_epochs)

                if plot_epoch_evolution:
                    print(f"Plotting the 3d epoch evolution for {loss_function} under {condition_text}")
                    title = f"Pendulum Angle Evolution for {loss_function} and {condition_text}"
                    save_path = os.path.join(condition_path, "epoch_evolution", f"{loss_function}.png")
                    plot_3d_epoch_evolution(epochs, theta_over_epochs, desired_theta, save_path, title, num_steps, dt)
                    print("")

            if not plot_theta_summaries:
                continue

            # Plot the theta as a function of epoch for all loss functions
            specific_theta_index = num_steps // 2  # half-way through the run: 250 * 0.02 s = 5 s
            save_path = os.path.join(condition_path, "theta_at_5sec_across_epochs.png")
            plot_theta_vs_epoch(all_results, condition_name, desired_theta, save_path,
                                f"Theta at 5 Seconds across Epochs for {condition_text}", specific_theta_index)

            specific_theta_index = -1
            save_path = os.path.join(condition_path, "final_theta_across_epochs.png")
            plot_theta_vs_epoch(all_results, condition_name, desired_theta, save_path,
                                f"Final Theta across Epochs for {condition_text}", specific_theta_index)
            print(f"Completed plotting for all loss functions under {condition_name} condition.\n")

    # Optional: persist the collected results.
    # import json
    # with open("all_results.json", 'w') as file:
    #     json.dump(all_results, file)


# Main execution
if __name__ == "__main__":
    main()