Actuator-Controller-HITL/code/Visualization.py
SchrodingerError 482d724e20 Initial Commit
2024-08-14 14:42:16 -05:00

594 lines
31 KiB
Python

import vpython
import time
from copy import deepcopy
import multiprocessing
import threading
import pyqtgraph as pg
from pyqtgraph.Qt import QtWidgets, QtCore
from Physics_Elements import Joint, Spring, SharedRigidActuator, SharedSpring, SharedJoint
from Sensor_Elements import SharedLoadCellJoint, SharedLoadCellSpring, SharedDisplacementSensor
from Controller_Input_Elements import SharedRigidActuatorController
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from Object_Sharing import SharedFloat
'''
TODO
2. Make this in a separate thread
'''
class Visualization():
def __init__(self, stop_event, scene_settings:dict):
self.stop_event = stop_event
self.scene_settings = scene_settings
self.attached_attributes = []
self.shared_attributes_object_settings = []
self.vpython_objects = []
def draw_scene(self):
vpython.scene = vpython.canvas(width=self.scene_settings["canvas_width"], height=self.scene_settings["canvas_height"])
side = self.scene_settings["cube_size"]
thk = self.scene_settings["wall_thickness"]
wall_length = side + 2*thk
wall_bottom = vpython.box(pos=vpython.vector(0, -thk/2, 0), size=vpython.vector(wall_length, thk, wall_length), color=vpython.color.gray(0.9))
wall_left = vpython.box(pos=vpython.vector(-(side + thk)/2, side/2, 0), size=vpython.vector(thk, side, side), color=vpython.color.gray(0.7))
wall_right = vpython.box(pos=vpython.vector((side + thk)/2, side/2, 0), size=vpython.vector(thk, side, side), color=vpython.color.gray(0.7))
wall_back = vpython.box(pos=vpython.vector(0, (side)/2, -(side+ thk)/2), size=vpython.vector(wall_length, side, thk), color=vpython.color.gray(0.7))
def attach_shared_attributes(self, shared_attributes, object_settings):
self.attached_attributes.append(shared_attributes)
self.shared_attributes_object_settings.append(object_settings)
shared_attributes.set_connected_to_visualization(True)
def generate_vpython_objects(self):
for i in range(len(self.attached_attributes)):
object_settings = self.shared_attributes_object_settings[i]
if object_settings["type"] == "sphere":
shared_attribute:Joint = self.attached_attributes[i]
object_pos = deepcopy(shared_attribute.get_pos())
object_pos = vpython.vector(object_pos[0], object_pos[1], object_pos[2])
object_color = self.return_vpython_color(object_settings["color"])
self.vpython_objects.append(vpython.sphere(pos=object_pos, radius=object_settings["radius"], color=object_color))
elif object_settings["type"] == "helix":
shared_attribute:Spring = self.attached_attributes[i]
object_pos = deepcopy(shared_attribute.get_pos())
object_pos = vpython.vector(object_pos[0], object_pos[1], object_pos[2])
object_axis = deepcopy(shared_attribute.get_axis_vector())
object_axis = vpython.vector(object_axis[0], object_axis[1], object_axis[2])
object_color = self.return_vpython_color(object_settings["color"])
self.vpython_objects.append(vpython.helix(pos=object_pos, axis=object_axis, radius=object_settings["radius"], thickness=object_settings["thickness"], color=object_color))
elif object_settings["type"] == "cylinder":
shared_attribute:Spring = self.attached_attributes[i]
object_pos = deepcopy(shared_attribute.get_pos())
object_pos = vpython.vector(object_pos[0], object_pos[1], object_pos[2])
object_axis = deepcopy(shared_attribute.get_axis_vector())
object_axis = vpython.vector(object_axis[0], object_axis[1], object_axis[2])
object_color = self.return_vpython_color(object_settings["color"])
self.vpython_objects.append(vpython.cylinder(pos=object_pos, axis=object_axis, radius=object_settings["radius"], color=object_color))
def return_vpython_color(self, color_str):
if color_str == "blue":
return vpython.color.blue
elif color_str == "black":
return vpython.color.black
elif color_str == "red":
return vpython.color.red
elif color_str == "green":
return vpython.color.green
elif color_str == "white":
return vpython.color.white
def update_scene(self):
for i in range(len(self.vpython_objects)):
element = self.vpython_objects[i]
shared_attributes = self.attached_attributes[i]
if isinstance(element, vpython.sphere):
updated_pos = shared_attributes.get_pos()
element.pos = deepcopy(vpython.vector(*(updated_pos)))
elif isinstance(element, vpython.helix):
updated_pos = shared_attributes.get_pos()
element.pos = deepcopy(vpython.vector(*(updated_pos)))
updated_axis = shared_attributes.get_axis_vector()
element.axis = deepcopy(vpython.vector(*(updated_axis)))
elif isinstance(element, vpython.cylinder):
updated_pos = shared_attributes.get_pos()
element.pos = deepcopy(vpython.vector(*(updated_pos)))
updated_axis = shared_attributes.get_axis_vector()
element.axis = deepcopy(vpython.vector(*(updated_axis)))
def run_process(self):
# Draw scene
self.draw_scene()
# Generate vpython objects from settings
self.generate_vpython_objects()
while self.stop_event.is_set() == False:
# Update pos and axes of all vpython objects
self.update_scene()
class Plotter():
def __init__(self, plot_settings: dict, stop_event: multiprocessing.Event):
self.plot_settings = plot_settings
self.title = plot_settings["title"]
self.window_length = self.plot_settings["window_length"]
self.update_interval = self.plot_settings["update_interval"]
self.pull_interval = self.plot_settings["pull_interval"]
self.stop_event: multiprocessing.Event = stop_event
self.attached_attributes = []
self.shared_attributes_plot_settings = []
self.pull_data_stop_event: threading.Event = None
self.shared_x: SharedFloat = None
self.plot_setup = {}
def attach_shared_attributes(self, shared_attributes, plot_settings):
self.attached_attributes.append(shared_attributes)
self.shared_attributes_plot_settings.append(plot_settings)
shared_attributes.set_connected_to_plotter(True)
def attach_shared_x(self, shared_x):
self.shared_x = shared_x
def pull_data_thread(self):
while self.pull_data_stop_event.is_set() == False:
self.pull_data()
time.sleep(self.pull_interval / 1000)
def pull_data(self):
self.raw_xdata.append(self.shared_x.get())
for i in range(len(self.shared_attributes_plot_settings)):
foo = self.attached_attributes[i]._getvalue()
for key, value in self.shared_attributes_plot_settings[i].items():
if isinstance(foo, SharedLoadCellJoint):
bar: SharedLoadCellJoint = self.attached_attributes[i]
if key == "force":
force_vector = bar.get_true_force_vector()
for subkey, settings in value.items():
if subkey == "x":
ylabel: str = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[2])
elif key == "true_voltage":
voltage_scalar = bar.get_true_voltage_scalar()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(voltage_scalar)
elif key == "noisy_voltage":
voltage_scalar = bar.get_noisy_voltage_scalar()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(voltage_scalar)
elif isinstance(foo, SharedLoadCellSpring):
bar: SharedLoadCellSpring = self.attached_attributes[i]
if key == "force":
force_vector = bar.get_true_force_vector()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[2])
elif subkey == "scalar":
force_scalar = bar.get_true_force_scalar()
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_scalar)
elif key == "true_voltage":
voltage_scalar = bar.get_true_voltage_scalar()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(voltage_scalar)
elif key == "noisy_voltage":
voltage_scalar = bar.get_noisy_voltage_scalar()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(voltage_scalar)
elif isinstance(foo, SharedRigidActuator):
bar: SharedRigidActuator = self.attached_attributes[i]
if key == "pos":
pos_vector = bar.get_pos()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(pos_vector[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(pos_vector[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(pos_vector[2])
if key == "vel":
pos_vector = self.attached_attributes[i].get_vel()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(pos_vector[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(pos_vector[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(pos_vector[2])
elif isinstance(foo, SharedRigidActuatorController):
bar: SharedRigidActuatorController = self.attached_attributes[i]
if key == "input_voltage":
input_voltage = bar.get_input_voltage()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(input_voltage)
elif key == "digital_input":
digital_input = bar.get_digital_command()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(digital_input)
elif key == "pos":
controlled_vel = bar.get_controlled_pos()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(controlled_vel[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(controlled_vel[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(controlled_vel[2])
elif key == "vel":
controlled_vel = bar.get_controlled_vel()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(controlled_vel[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(controlled_vel[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(controlled_vel[2])
if subkey == "scalar":
vel_scalar = bar.get_controlled_vel_scalar()
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(vel_scalar)
elif key == "disp":
disp_scalar = bar.get_controlled_length()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(disp_scalar)
elif isinstance(foo, SharedDisplacementSensor):
bar: SharedDisplacementSensor = self.attached_attributes[i]
if key == "voltage":
for subkey, settings in value.items():
if subkey == "noisy":
voltage_scalar = bar.get_noisy_voltage()
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(voltage_scalar)
elif subkey == "true":
voltage_scalar = bar.get_true_voltage()
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(voltage_scalar)
if key == "disp":
disp_scalar = bar.get_displacement()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(disp_scalar)
elif key == "length":
length_scalar = bar.get_current_length()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(length_scalar)
elif isinstance(foo, SharedSpring):
bar: SharedSpring = self.attached_attributes[i]
if key == "force":
force_vector = bar.get_spring_force()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[2])
elif subkey == "scalar":
force_scalar = bar.get_spring_force_scalar()
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_scalar)
elif key == "length":
length = bar.get_length()
for subkey, settings in value.items():
if subkey == "scalar":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(length)
elif isinstance(foo, SharedJoint):
bar: SharedJoint = self.attached_attributes[i]
if key == "accel":
accel_vector = bar.get_accel()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(accel_vector[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(accel_vector[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(accel_vector[2])
elif key == "vel":
vel_vector = bar.get_vel()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(vel_vector[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(vel_vector[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(vel_vector[2])
elif key == "pos":
pos_vector = bar.get_pos()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(pos_vector[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(pos_vector[1])
if subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(pos_vector[2])
elif key == "force":
force_vector = bar.get_spring_force()
for subkey, settings in value.items():
if subkey == "x":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[0])
elif subkey == "y":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[1])
elif subkey == "z":
ylabel = settings["ylabel"]
(subplot, line) = self.raw_data_map[ylabel]
self.raw_ydata[subplot][line].append(force_vector[2])
def update_live_window(self):
if self.stop_event.is_set():
self.timer.stop()
self.pull_data_stop_event.set()
return
final_time = self.raw_xdata[-1]
target_time = final_time - self.window_length
closest_index = min(range(len(self.raw_xdata)), key=lambda i: abs(self.raw_xdata[i] - target_time))
self.window_xdata = self.raw_xdata[closest_index:]
# Attach the data to the appropriate sublines in the subplots
for i in range(self.num_plots):
for j in range(len(self.subplot_keys[i])):
window_ydata = self.raw_ydata[i][j][closest_index:]
if self.plot_settings["step_or_plot"] == "plot":
if len(self.window_xdata) == len(window_ydata):
self.subplot_curves[i][j].setData(self.window_xdata, window_ydata)
elif len(self.window_xdata) > len(window_ydata):
self.subplot_curves[i][j].setData(self.window_xdata[:-1], window_ydata)
elif self.plot_settings["step_or_plot"] == "step":
if len(self.window_xdata) == len(window_ydata):
self.subplot_curves[i][j].setData(self.window_xdata, window_ydata[:-1])
elif len(self.window_xdata) > len(window_ydata):
self.subplot_curves[i][j].setData(self.window_xdata[:-1], window_ydata[:-1])
def generate_plots(self):
# Create the application
self.app = QtWidgets.QApplication([])
# Create a plot window
self.win = pg.GraphicsLayoutWidget(show=True, title=self.title)
self.win.resize(1000, 600)
self.win.setWindowTitle(self.plot_settings["title"])
self.num_plots = self.plot_settings["num_subplots"]
self.plots = []
for i in range(self.num_plots):
plot = self.win.addPlot(title=self.plot_settings["subplots"][i]["ylabel"])
plot.setLabel('left', self.plot_settings["subplots"][i]["ylabel"])
plot.addLegend()
self.plots.append(plot)
# Move us to the next row for the subplot
if i < self.num_plots - 1:
self.win.nextRow()
def generate_curves(self):
self.subplot_keys = [[] for _ in range(self.num_plots)] # list for each subplot
self.subplot_curves = [[] for _ in range(self.num_plots)] # list for each subplot
self.raw_xdata = []
self.raw_ydata = [[] for _ in range(self.num_plots)] # list for each subplot
self.raw_data_map = {}
for setting in self.shared_attributes_plot_settings:
'''
Each setting looks like {
"accel": {
"x": {
"subplot": 1,
"ylabel": "Main Mass x-Accel"
},
"y": {
"subplot": 1,
"ylabel": "Main Mass y-Accel"
}
},
"vel": {
"x": {
"subplot": 2,
"ylabel": "Main Mass x-Vel"
}
},
"pos": {
"x": {
"subplot": 3,
"ylabel": "Main Mass x-Vel"
}
}
}
'''
for key, value in setting.items():
''' key would be like "accel"
value would be like {
"x": {
"subplot": 1,
"ylabel": "Main Mass x-Accel"
},
"y": {
"subplot": 1,
"ylabel": "Main Mass y-Accel"
}
}
'''
for line, line_settings in value.items():
''' line would be like "x"
line_settings would be like {
"subplot": 1,
"ylabel": "Main Mass x-Accel",
"color": "w"
}
'''
subplot = line_settings["subplot"]
label = line_settings["ylabel"]
color = line_settings.get("color", "w") # Default to white if no color is specified
self.subplot_keys[subplot].append(label)
if self.plot_settings["step_or_plot"] == "plot":
self.subplot_curves[subplot].append(self.plots[subplot].plot(name=label, pen=pg.mkPen(color=color)))
elif self.plot_settings["step_or_plot"] == "step":
self.subplot_curves[subplot].append(self.plots[subplot].plot(name=label, stepMode=True, pen=pg.mkPen(color=color)))
self.raw_data_map[label] = (subplot, len(self.raw_ydata[subplot]))
self.raw_ydata[subplot].append([])
self.window_xdata = []
self.window_ydata = [[] for _ in range(self.num_plots)] # list for each subplot
def show_plot(self):
pass
def run_process(self):
# Generate the figure and subplots
self.generate_plots()
# Generate the curves
self.generate_curves()
self.pull_data_stop_event = threading.Event()
self.pull_data_stop_event.clear()
pull_data_thread = threading.Thread(target=self.pull_data_thread)
pull_data_thread.start()
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.update_live_window)
self.timer.start(self.update_interval)
# Start the Qt event loop
QtWidgets.QApplication.instance().exec_()
pull_data_thread.join()