148 lines
5.8 KiB
Python
148 lines
5.8 KiB
Python
import time
|
|
from mcculw.ul import ignore_instacal
|
|
|
|
import threading
|
|
|
|
from Physics_Elements import Joint, Spring, StateSpace
|
|
from MCC_Interface import MCC3114, MCC202
|
|
|
|
from typing import List
|
|
from typing import TYPE_CHECKING
|
|
if TYPE_CHECKING:
|
|
from Sensor_Elements import LoadCell, SharedLoadCellJoint, LoadCellSpring, SharedLoadCellSpring, DisplacementSensor
|
|
from Controller_Input_Elements import RigidActuatorController
|
|
|
|
class HITLController():
|
|
def __init__(self, stop_event, settings: dict):
|
|
self.stop_event = stop_event
|
|
self.settings = settings
|
|
|
|
self.parse_settings()
|
|
|
|
self.load_cells: List[LoadCell] = []
|
|
#self.displacement_actuators: List[DisplacementSensor] = []
|
|
self.displacement_sensors: List[DisplacementSensor] = []
|
|
|
|
self.rigid_actuator_controllers: List[RigidActuatorController] = []
|
|
|
|
def parse_settings(self):
|
|
'''Parse the settings to get the settings for the MCC devices'''
|
|
self.pull_from_sim_period = self.settings["pull_from_sim_period"] / 1000.0
|
|
self.updated_plotting_shared_attributes_period = self.settings["plotting_update_period"] / 1000.0
|
|
|
|
def attach_load_cell(self, load_cell: 'LoadCell'):
|
|
self.load_cells.append(load_cell)
|
|
|
|
def attach_displacement_sensor(self, sensor: 'DisplacementSensor'):
|
|
self.displacement_sensors.append(sensor)
|
|
|
|
def attach_rigid_actuator_controller(self, controller: 'RigidActuatorController'):
|
|
self.rigid_actuator_controllers.append(controller)
|
|
|
|
def update_from_sim(self):
|
|
# Updates the local sensors based on the sim values
|
|
for lc in self.load_cells:
|
|
lc.pull_from_sim()
|
|
# for rigid_actuator in self.displacement_actuators:
|
|
# rigid_actuator.pull_from_sim()
|
|
for sensor in self.displacement_sensors:
|
|
sensor.pull_from_sim()
|
|
|
|
def update_from_sim_thread(self):
|
|
while self.spare_stop_event.is_set() == False:
|
|
self.update_from_sim()
|
|
time.sleep(self.pull_from_sim_period / 1000)
|
|
|
|
def update_internal_sensor_attributes(self):
|
|
'''Updates internal attributes. This should be updated as fast as possible because it generates
|
|
sensor noise.'''
|
|
for lc in self.load_cells:
|
|
lc.update_internal_attributes()
|
|
# for rigid_actuator in self.displacement_actuators:
|
|
# rigid_actuator.update_internal_attributes()
|
|
for sensor in self.displacement_sensors:
|
|
sensor.update_internal_attributes()
|
|
|
|
def update_plotting_shared_attributes(self):
|
|
for lc in self.load_cells:
|
|
lc.update_shared_attributes()
|
|
for rigid_actuator_controller in self.rigid_actuator_controllers:
|
|
rigid_actuator_controller.update_shared_attributes()
|
|
# for rigid_actuator in self.displacement_actuators:
|
|
# rigid_actuator.update_shared_attributes()
|
|
for sensor in self.displacement_sensors:
|
|
sensor.update_shared_attributes()
|
|
|
|
def update_plotting_thread(self):
|
|
while self.spare_stop_event.is_set() == False:
|
|
self.update_plotting_shared_attributes()
|
|
time.sleep(self.updated_plotting_shared_attributes_period / 1000)
|
|
|
|
def update_mcc3114(self):
|
|
for lc in self.load_cells:
|
|
channel = lc.sensor_settings["output_channel"]
|
|
voltage = lc.get_noisy_voltage_scalar()
|
|
self.mcc3114.voltage_write(channel, voltage)
|
|
for sensor in self.displacement_sensors:
|
|
channel = sensor.sensor_settings["output_channel"]
|
|
voltage = sensor.get_noisy_voltage_scalar()
|
|
self.mcc3114.voltage_write(channel, voltage)
|
|
|
|
def read_from_mcc202(self):
|
|
for rigid_actuator_controller in self.rigid_actuator_controllers:
|
|
input_channel:int = rigid_actuator_controller.get_input_channel()
|
|
voltage = self.mcc202.voltage_read(channel=input_channel)
|
|
digital_channel:int = rigid_actuator_controller.get_digital_channel()
|
|
digital_command = self.mcc202.digital_read(channel=digital_channel)
|
|
|
|
rigid_actuator_controller.set_digital_command(digital_command)
|
|
rigid_actuator_controller.set_input_voltage(voltage)
|
|
rigid_actuator_controller.set_controlled_attribute()
|
|
|
|
def update_sim_targets(self):
|
|
for rigid_actuator_controller in self.rigid_actuator_controllers:
|
|
rigid_actuator_controller.update_sim_target()
|
|
|
|
def controller_loop(self):
|
|
while self.stop_event.is_set() == False:
|
|
self.loop_time = time.time()
|
|
|
|
# Update the internal attributes of the sensors
|
|
self.update_internal_sensor_attributes()
|
|
|
|
# Update MCC3114 (analog output)
|
|
self.update_mcc3114()
|
|
|
|
# Get readings from the MCC202
|
|
self.read_from_mcc202()
|
|
|
|
# Update the shared actuators for the sim to respond to
|
|
self.update_sim_targets()
|
|
|
|
|
|
def start_mcc(self):
|
|
ignore_instacal() # ONLY CALL ONCE
|
|
self.mcc3114 = MCC3114()
|
|
self.mcc202 = MCC202()
|
|
|
|
def run_process(self):
|
|
self.start_mcc()
|
|
|
|
# Make the threads for interacting with shared elements
|
|
self.spare_stop_event = threading.Event()
|
|
self.spare_stop_event.clear()
|
|
|
|
updating_plotting_elements_thread = threading.Thread(target=self.update_plotting_thread)
|
|
update_from_sim_thread = threading.Thread(target=self.update_from_sim_thread)
|
|
|
|
updating_plotting_elements_thread.start()
|
|
update_from_sim_thread.start()
|
|
|
|
self.controller_loop()
|
|
|
|
self.spare_stop_event.set()
|
|
|
|
updating_plotting_elements_thread.join()
|
|
update_from_sim_thread.join()
|
|
|