Inverted-Pendulum-Neural-Ne.../analysis/find_best_time_weighting_learning_rate_sweep.py

180 lines
7.0 KiB
Python

import os
import csv
import torch
from torchdiffeq import odeint
import pandas as pd
from PendulumController import PendulumController
from PendulumDynamics import PendulumDynamics
from initial_conditions import initial_conditions
# Device and initial conditions setup
device = torch.device("cpu")
state_0 = torch.tensor(initial_conditions, dtype=torch.float32, device=device)
# Constants (same as in your training code)
m = 10.0
g = 9.81
R = 1.0
t_start, t_end, t_points = 0, 10, 1000
t_span = torch.linspace(t_start, t_end, t_points, device=device)
# Base path containing the time_weighting_function directories
base_path = "/home/judson/Neural-Networks-in-GNC/inverted_pendulum/training/time_weighting_learning_rate_sweep"
def compute_constant_loss(controller_path):
"""
Loads a controller from the given path, sets up the dynamics using the constant weighting function,
simulates the system, and returns the computed loss.
"""
controller = PendulumController().to(device)
controller.load_state_dict(torch.load(controller_path, map_location=device))
pendulum_dynamics = PendulumDynamics(controller, m, R, g).to(device)
with torch.no_grad():
state_traj = odeint(pendulum_dynamics, state_0, t_span, method='rk4')
theta = state_traj[:, :, 0]
desired_theta = state_traj[:, :, 3]
loss = torch.mean((theta - desired_theta) ** 2)
return loss.item()
# Dictionary to store the best results for each time weighting function.
# Each key maps to a dictionary with keys "csv" and "constant".
# Each candidate dictionary contains:
# "path": best lr directory path,
# "csv_loss": loss from the training log,
# "constant_loss": loss computed via the constant method.
best_results = {}
# Process each time weighting function directory
for function_name in os.listdir(base_path):
function_path = os.path.join(base_path, function_name)
if not os.path.isdir(function_path):
continue
print(f"Processing weighting function: {function_name}")
# Initialize best candidate variables for CSV-based best
best_csv_csv_loss = float('inf')
best_csv_constant_loss = float('inf')
best_csv_path = None
# Initialize best candidate variables for constant-based best
best_constant_constant_loss = float('inf')
best_constant_csv_loss = float('inf')
best_constant_path = None
# Loop through each learning rate directory (directories named "lr_*")
for lr_dir in os.listdir(function_path):
if not lr_dir.startswith("lr_"):
continue
lr_path = os.path.join(function_path, lr_dir)
if not os.path.isdir(lr_path):
continue
# --- Compute CSV loss candidate ---
current_csv_loss = None
csv_file = os.path.join(lr_path, "training_log.csv")
if os.path.exists(csv_file):
try:
with open(csv_file, 'r') as f:
reader = csv.DictReader(f)
losses = []
for row in reader:
try:
loss_value = float(row['Loss'])
losses.append(loss_value)
except ValueError:
continue
if losses:
current_csv_loss = min(losses)
except Exception as e:
print(f"Error reading CSV {csv_file}: {e}")
# --- Compute constant loss candidate ---
current_constant_loss = None
controllers_dir = os.path.join(lr_path, "controllers")
controller_file = os.path.join(controllers_dir, "controller_200.pth")
if os.path.exists(controller_file):
try:
current_constant_loss = compute_constant_loss(controller_file)
except Exception as e:
print(f"Error computing constant loss for {controller_file}: {e}")
# Update best CSV candidate (based on CSV loss)
if current_csv_loss is not None:
csv_const_loss_val = current_constant_loss if current_constant_loss is not None else float('inf')
if current_csv_loss < best_csv_csv_loss:
best_csv_csv_loss = current_csv_loss
best_csv_constant_loss = csv_const_loss_val
best_csv_path = lr_path
# Update best Constant candidate (based on constant loss)
if current_constant_loss is not None:
csv_loss_val = current_csv_loss if current_csv_loss is not None else float('inf')
if current_constant_loss < best_constant_constant_loss:
best_constant_constant_loss = current_constant_loss
best_constant_csv_loss = csv_loss_val
best_constant_path = lr_path
best_results[function_name] = {
"csv": {"path": best_csv_path, "csv_loss": best_csv_csv_loss, "constant_loss": best_csv_constant_loss},
"constant": {"path": best_constant_path, "csv_loss": best_constant_csv_loss, "constant_loss": best_constant_constant_loss},
}
print(f"Finished {function_name}:")
print(f" Best CSV candidate - Path: {best_csv_path}, CSV Loss: {best_csv_csv_loss}, Constant Loss: {best_csv_constant_loss}")
print(f" Best Constant candidate - Path: {best_constant_path}, CSV Loss: {best_constant_csv_loss}, Constant Loss: {best_constant_constant_loss}")
print("Final best results:")
print(best_results)
# Build summary table rows using pandas.
# Extract only the learning rate (e.g., from "lr_0.250" get "0.250") rather than the full path.
def extract_lr(path):
if path is None:
return "N/A"
base = os.path.basename(path)
if base.startswith("lr_"):
return base[3:]
return base
table_rows = []
for function_name, results in best_results.items():
csv_info = results.get("csv", {})
constant_info = results.get("constant", {})
csv_lr = extract_lr(csv_info.get("path"))
constant_lr = extract_lr(constant_info.get("path"))
table_rows.append({
"Function Name": function_name,
"Candidate": "CSV",
"Learning Rate": csv_lr,
"CSV Loss": csv_info.get("csv_loss", float('inf')),
"Constant Loss": csv_info.get("constant_loss", float('inf'))
})
table_rows.append({
"Function Name": "", # Leave blank for the second row
"Candidate": "Constant",
"Learning Rate": constant_lr,
"CSV Loss": constant_info.get("csv_loss", float('inf')),
"Constant Loss": constant_info.get("constant_loss", float('inf'))
})
df = pd.DataFrame(table_rows, columns=["Function Name", "Candidate", "Learning Rate", "CSV Loss", "Constant Loss"])
# Get the table as a formatted string
table_str = df.to_string(index=False)
print("\n" + table_str)
# Write the dictionary and table to a file
output_file = "best_time_weighting_learning_rate_sweep.txt"
with open(output_file, "w") as f:
f.write("Final best results (dictionary):\n")
f.write(str(best_results) + "\n\n")
f.write("Summary Table:\n")
f.write(table_str)
print(f"\nResults have been written to {output_file}")