import os
import csv

import torch
from torchdiffeq import odeint
import pandas as pd

from PendulumController import PendulumController
from PendulumDynamics import PendulumDynamics
from initial_conditions import initial_conditions

# Device and initial-state tensor used by every simulation in this script.
device = torch.device("cpu")
state_0 = torch.tensor(initial_conditions, dtype=torch.float32, device=device)

# Constants handed to PendulumDynamics. The original note says they are the
# same values used by the training code, so keep them in sync with it.
m = 10.0
g = 9.81
R = 1.0

# Time grid on which the closed-loop system is integrated.
t_start, t_end, t_points = 0, 10, 1000
t_span = torch.linspace(t_start, t_end, t_points, device=device)

# Root directory holding one sub-directory per time weighting function.
base_path = "/home/judson/Neural-Networks-in-GNC/inverted_pendulum/training/time_weighting_learning_rate_sweep"


def compute_constant_loss(controller_path):
    """
    Evaluate a saved controller with an unweighted (constant) tracking loss.

    The controller weights are loaded from ``controller_path``, wrapped in a
    ``PendulumDynamics`` model, and integrated from ``state_0`` over
    ``t_span`` with the fixed-step 'rk4' method.  The loss is the mean, over
    every element of the trajectory slices, of the squared difference between
    state component 0 (theta) and state component 3 (desired theta).

    Returns the loss as a plain Python float.
    """
    policy = PendulumController().to(device)
    weights = torch.load(controller_path, map_location=device)
    policy.load_state_dict(weights)
    dynamics = PendulumDynamics(policy, m, R, g).to(device)

    # Pure evaluation: no gradients are needed for the rollout or the loss.
    with torch.no_grad():
        trajectory = odeint(dynamics, state_0, t_span, method='rk4')
        theta_actual = trajectory[:, :, 0]
        theta_target = trajectory[:, :, 3]
        tracking_error = theta_actual - theta_target
        mse = torch.mean(tracking_error ** 2)
    return mse.item()


# Dictionary to store the best results for each time weighting function.
# Each key maps to a dictionary with keys "csv" and "constant".
# Each candidate dictionary contains:
#   "path": best lr directory path,
#   "csv_loss": loss from the training log,
#   "constant_loss": loss computed via the constant method.
best_results = {}


def _min_logged_loss(csv_file):
    """Return the smallest usable 'Loss' value in *csv_file*, or None.

    None is returned when the file does not exist, cannot be read, or holds
    no usable loss value.  Rows whose 'Loss' entry is missing (short or
    truncated rows), non-numeric, or NaN are skipped individually instead of
    invalidating the whole file.
    """
    import math  # function-scope import keeps this block self-contained

    if not os.path.exists(csv_file):
        return None

    losses = []
    try:
        # newline='' is what the csv module expects for files it parses.
        with open(csv_file, 'r', newline='') as f:
            for row in csv.DictReader(f):
                try:
                    loss_value = float(row['Loss'])
                except (TypeError, ValueError):
                    # TypeError: a short row leaves 'Loss' as None.
                    # ValueError: the cell is empty or not a number.
                    continue
                # NaN compares False against everything, which would make
                # min() depend on where the NaN sits in the log.
                if not math.isnan(loss_value):
                    losses.append(loss_value)
    except Exception as e:
        # Best effort: report file-level problems (unreadable file, missing
        # 'Loss' column, ...) and treat the run as having no CSV loss.
        print(f"Error reading CSV {csv_file}: {e}")
        return None

    return min(losses) if losses else None


def _constant_loss_or_none(controller_file):
    """Return compute_constant_loss(controller_file), or None.

    None is returned when the checkpoint is missing or its evaluation fails.
    """
    if not os.path.exists(controller_file):
        return None
    try:
        return compute_constant_loss(controller_file)
    except Exception as e:
        # Best effort: one broken checkpoint must not abort the whole sweep.
        print(f"Error computing constant loss for {controller_file}: {e}")
        return None


# Process each time weighting function directory.  os.listdir returns entries
# in arbitrary order, so sort them to make tie-breaking and the report order
# reproducible from run to run.
for function_name in sorted(os.listdir(base_path)):
    function_path = os.path.join(base_path, function_name)
    if not os.path.isdir(function_path):
        continue

    print(f"Processing weighting function: {function_name}")

    # Best candidate when ranking by the loss recorded in the training log.
    best_csv_csv_loss = float('inf')
    best_csv_constant_loss = float('inf')
    best_csv_path = None

    # Best candidate when ranking by the recomputed constant loss.
    best_constant_constant_loss = float('inf')
    best_constant_csv_loss = float('inf')
    best_constant_path = None

    # Loop through each learning rate directory (directories named "lr_*").
    for lr_dir in sorted(os.listdir(function_path)):
        if not lr_dir.startswith("lr_"):
            continue
        lr_path = os.path.join(function_path, lr_dir)
        if not os.path.isdir(lr_path):
            continue

        # Gather both measurements for this learning rate; either may be None.
        current_csv_loss = _min_logged_loss(
            os.path.join(lr_path, "training_log.csv")
        )
        controllers_dir = os.path.join(lr_path, "controllers")
        controller_file = os.path.join(controllers_dir, "controller_200.pth")
        current_constant_loss = _constant_loss_or_none(controller_file)

        # Update best CSV candidate (based on CSV loss).  A missing constant
        # loss is reported as inf alongside it.
        if current_csv_loss is not None:
            csv_const_loss_val = (
                current_constant_loss
                if current_constant_loss is not None
                else float('inf')
            )
            if current_csv_loss < best_csv_csv_loss:
                best_csv_csv_loss = current_csv_loss
                best_csv_constant_loss = csv_const_loss_val
                best_csv_path = lr_path

        # Update best Constant candidate (based on constant loss).  A missing
        # CSV loss is reported as inf alongside it.
        if current_constant_loss is not None:
            csv_loss_val = (
                current_csv_loss
                if current_csv_loss is not None
                else float('inf')
            )
            if current_constant_loss < best_constant_constant_loss:
                best_constant_constant_loss = current_constant_loss
                best_constant_csv_loss = csv_loss_val
                best_constant_path = lr_path

    best_results[function_name] = {
        "csv": {
            "path": best_csv_path,
            "csv_loss": best_csv_csv_loss,
            "constant_loss": best_csv_constant_loss,
        },
        "constant": {
            "path": best_constant_path,
            "csv_loss": best_constant_csv_loss,
            "constant_loss": best_constant_constant_loss,
        },
    }

    print(f"Finished {function_name}:")
    print(f"  Best CSV candidate - Path: {best_csv_path}, CSV Loss: {best_csv_csv_loss}, Constant Loss: {best_csv_constant_loss}")
    print(f"  Best Constant candidate - Path: {best_constant_path}, CSV Loss: {best_constant_csv_loss}, Constant Loss: {best_constant_constant_loss}")

print("Final best results:")
print(best_results)

# Build summary table rows using pandas.
# Extract only the learning rate (e.g., from "lr_0.250" get "0.250") rather than the full path.
def extract_lr(path):
    """Return the learning-rate label for a best-candidate directory path.

    ``None`` maps to "N/A".  Otherwise the final path component is returned,
    with a leading "lr_" removed when present (e.g. ".../lr_0.250" -> "0.250").
    """
    if path is None:
        return "N/A"
    dir_name = os.path.basename(path)
    return dir_name[3:] if dir_name.startswith("lr_") else dir_name


# Two rows per weighting function: the CSV-ranked winner, then the
# constant-loss-ranked winner.  Only the first row carries the function name.
table_rows = []
for function_name, results in best_results.items():
    for shown_name, candidate_label, result_key in (
        (function_name, "CSV", "csv"),
        ("", "Constant", "constant"),
    ):
        info = results.get(result_key, {})
        table_rows.append({
            "Function Name": shown_name,
            "Candidate": candidate_label,
            "Learning Rate": extract_lr(info.get("path")),
            "CSV Loss": info.get("csv_loss", float('inf')),
            "Constant Loss": info.get("constant_loss", float('inf')),
        })

df = pd.DataFrame(
    table_rows,
    columns=["Function Name", "Candidate", "Learning Rate", "CSV Loss", "Constant Loss"],
)

# Render the table once; the same text is printed and written to disk.
table_str = df.to_string(index=False)
print("\n" + table_str)

# Write the raw dictionary followed by the formatted table to a text file.
output_file = "best_time_weighting_learning_rate_sweep.txt"
report_text = "".join([
    "Final best results (dictionary):\n",
    str(best_results) + "\n\n",
    "Summary Table:\n",
    table_str,
])
with open(output_file, "w") as f:
    f.write(report_text)

print(f"\nResults have been written to {output_file}")