Actuator-Controller-HITL/code/HITL_Controller.py
SchrodingerError 482d724e20 Initial Commit
2024-08-14 14:42:16 -05:00

148 lines
5.8 KiB
Python

import time
from mcculw.ul import ignore_instacal
import threading
from Physics_Elements import Joint, Spring, StateSpace
from MCC_Interface import MCC3114, MCC202
from typing import List
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from Sensor_Elements import LoadCell, SharedLoadCellJoint, LoadCellSpring, SharedLoadCellSpring, DisplacementSensor
from Controller_Input_Elements import RigidActuatorController
class HITLController():
def __init__(self, stop_event, settings: dict):
self.stop_event = stop_event
self.settings = settings
self.parse_settings()
self.load_cells: List[LoadCell] = []
#self.displacement_actuators: List[DisplacementSensor] = []
self.displacement_sensors: List[DisplacementSensor] = []
self.rigid_actuator_controllers: List[RigidActuatorController] = []
def parse_settings(self):
'''Parse the settings to get the settings for the MCC devices'''
self.pull_from_sim_period = self.settings["pull_from_sim_period"] / 1000.0
self.updated_plotting_shared_attributes_period = self.settings["plotting_update_period"] / 1000.0
def attach_load_cell(self, load_cell: 'LoadCell'):
self.load_cells.append(load_cell)
def attach_displacement_sensor(self, sensor: 'DisplacementSensor'):
self.displacement_sensors.append(sensor)
def attach_rigid_actuator_controller(self, controller: 'RigidActuatorController'):
self.rigid_actuator_controllers.append(controller)
def update_from_sim(self):
# Updates the local sensors based on the sim values
for lc in self.load_cells:
lc.pull_from_sim()
# for rigid_actuator in self.displacement_actuators:
# rigid_actuator.pull_from_sim()
for sensor in self.displacement_sensors:
sensor.pull_from_sim()
def update_from_sim_thread(self):
while self.spare_stop_event.is_set() == False:
self.update_from_sim()
time.sleep(self.pull_from_sim_period / 1000)
def update_internal_sensor_attributes(self):
'''Updates internal attributes. This should be updated as fast as possible because it generates
sensor noise.'''
for lc in self.load_cells:
lc.update_internal_attributes()
# for rigid_actuator in self.displacement_actuators:
# rigid_actuator.update_internal_attributes()
for sensor in self.displacement_sensors:
sensor.update_internal_attributes()
def update_plotting_shared_attributes(self):
for lc in self.load_cells:
lc.update_shared_attributes()
for rigid_actuator_controller in self.rigid_actuator_controllers:
rigid_actuator_controller.update_shared_attributes()
# for rigid_actuator in self.displacement_actuators:
# rigid_actuator.update_shared_attributes()
for sensor in self.displacement_sensors:
sensor.update_shared_attributes()
def update_plotting_thread(self):
while self.spare_stop_event.is_set() == False:
self.update_plotting_shared_attributes()
time.sleep(self.updated_plotting_shared_attributes_period / 1000)
def update_mcc3114(self):
for lc in self.load_cells:
channel = lc.sensor_settings["output_channel"]
voltage = lc.get_noisy_voltage_scalar()
self.mcc3114.voltage_write(channel, voltage)
for sensor in self.displacement_sensors:
channel = sensor.sensor_settings["output_channel"]
voltage = sensor.get_noisy_voltage_scalar()
self.mcc3114.voltage_write(channel, voltage)
def read_from_mcc202(self):
for rigid_actuator_controller in self.rigid_actuator_controllers:
input_channel:int = rigid_actuator_controller.get_input_channel()
voltage = self.mcc202.voltage_read(channel=input_channel)
digital_channel:int = rigid_actuator_controller.get_digital_channel()
digital_command = self.mcc202.digital_read(channel=digital_channel)
rigid_actuator_controller.set_digital_command(digital_command)
rigid_actuator_controller.set_input_voltage(voltage)
rigid_actuator_controller.set_controlled_attribute()
def update_sim_targets(self):
for rigid_actuator_controller in self.rigid_actuator_controllers:
rigid_actuator_controller.update_sim_target()
def controller_loop(self):
while self.stop_event.is_set() == False:
self.loop_time = time.time()
# Update the internal attributes of the sensors
self.update_internal_sensor_attributes()
# Update MCC3114 (analog output)
self.update_mcc3114()
# Get readings from the MCC202
self.read_from_mcc202()
# Update the shared actuators for the sim to respond to
self.update_sim_targets()
def start_mcc(self):
ignore_instacal() # ONLY CALL ONCE
self.mcc3114 = MCC3114()
self.mcc202 = MCC202()
def run_process(self):
self.start_mcc()
# Make the threads for interacting with shared elements
self.spare_stop_event = threading.Event()
self.spare_stop_event.clear()
updating_plotting_elements_thread = threading.Thread(target=self.update_plotting_thread)
update_from_sim_thread = threading.Thread(target=self.update_from_sim_thread)
updating_plotting_elements_thread.start()
update_from_sim_thread.start()
self.controller_loop()
self.spare_stop_event.set()
updating_plotting_elements_thread.join()
update_from_sim_thread.join()