mcu: Add new MCUConfigHelper helper class

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor
2025-09-19 22:23:40 -04:00
parent 1bb674d0b8
commit 64c155121e

View File

@@ -848,21 +848,17 @@ class MCUStatsHelper:
self._get_status_info['last_stats'] = last_stats
return False, '%s: %s' % (self._name, stats)
# Main MCU class
class MCU:
error = error
def __init__(self, config, clocksync):
# Handle process of configuring an mcu
class MCUConfigHelper:
def __init__(self, config, conn_helper):
self._printer = printer = config.get_printer()
self._clocksync = clocksync
self._conn_helper = conn_helper
self._mcu = mcu = conn_helper.get_mcu()
self._serial = conn_helper.get_serial()
self._clocksync = conn_helper.get_clocksync()
self._reactor = printer.get_reactor()
self._name = config.get_name()
if self._name.startswith('mcu '):
self._name = self._name[4:]
# Low-level connection
self._conn_helper = MCUConnectHelper(config, self, clocksync)
self._serial = self._conn_helper.get_serial()
# Config building
printer.lookup_object('pins').register_chip(self._name, self)
self._name = mcu.get_name()
printer.lookup_object('pins').register_chip(self._name, mcu)
self._oid_count = 0
self._config_callbacks = []
self._config_cmds = []
@@ -870,18 +866,7 @@ class MCU:
self._init_cmds = []
self._mcu_freq = 0.
self._reserved_move_slots = 0
# Alter time reporting when debugging
if self.is_fileoutput():
def dummy_estimated_print_time(eventtime):
return 0.
self.estimated_print_time = dummy_estimated_print_time
# Register handlers
self._stats_helper = MCUStatsHelper(self, self._conn_helper)
printer.load_object(config, "error_mcu")
printer.register_event_handler("klippy:mcu_identify",
self._mcu_identify)
printer.register_event_handler("klippy:connect", self._connect)
# Serial callbacks
def _send_config(self, prev_crc):
# Build config commands
for cb in self._config_callbacks:
@@ -923,10 +908,10 @@ class MCU:
% (enum_value, self._name))
raise
def _send_get_config(self):
get_config_cmd = self.lookup_query_command(
get_config_cmd = self._mcu.lookup_query_command(
"get_config",
"config is_config=%c crc=%u is_shutdown=%c move_count=%hu")
if self.is_fileoutput():
if self._mcu.is_fileoutput():
return { 'is_config': 0, 'move_count': 500, 'crc': 0 }
config_params = get_config_cmd.send()
if self._conn_helper.is_shutdown():
@@ -943,7 +928,7 @@ class MCU:
# Not configured - send config and issue get_config again
self._send_config(None)
config_params = self._send_get_config()
if not config_params['is_config'] and not self.is_fileoutput():
if not config_params['is_config'] and not self._mcu.is_fileoutput():
raise error("Unable to configure MCU '%s'" % (self._name,))
else:
start_reason = self._printer.get_start_args().get("start_reason")
@@ -959,31 +944,27 @@ class MCU:
ss_move_count = move_count - self._reserved_move_slots
motion_queuing = self._printer.lookup_object('motion_queuing')
motion_queuing.setup_mcu_movequeue(
self, self._serial.get_serialqueue(), ss_move_count)
self._mcu, self._serial.get_serialqueue(), ss_move_count)
# Log config information
move_msg = "Configured MCU '%s' (%d moves)" % (self._name, move_count)
logging.info(move_msg)
log_info = self._conn_helper.log_info() + "\n" + move_msg
self._printer.set_rollover_info(self._name, log_info, log=False)
def _post_attach_setup_for_config(self):
self._mcu_freq = self.get_constant_float('CLOCK_FREQ')
def post_attach_setup_for_config(self):
self._mcu_freq = self._mcu.get_constant_float('CLOCK_FREQ')
ppins = self._printer.lookup_object('pins')
pin_resolver = ppins.get_pin_resolver(self._name)
for cname, value in self.get_constants().items():
for cname, value in self._mcu.get_constants().items():
if cname.startswith("RESERVE_PINS_"):
for pin in value.split(','):
pin_resolver.reserve_pin(pin, cname[13:])
def _mcu_identify(self):
self._conn_helper.start_attach()
self._post_attach_setup_for_config()
self._stats_helper.post_attach_setup_stats()
# Config creation helpers
def setup_pin(self, pin_type, pin_params):
pcs = {'endstop': MCU_endstop,
'digital_out': MCU_digital_out, 'pwm': MCU_pwm, 'adc': MCU_adc}
if pin_type not in pcs:
raise pins.error("pin type %s not supported on mcu" % (pin_type,))
return pcs[pin_type](self, pin_params)
return pcs[pin_type](self._mcu, pin_params)
def create_oid(self):
self._oid_count += 1
return self._oid_count - 1
@@ -998,10 +979,54 @@ class MCU:
self._config_cmds.append(cmd)
def get_query_slot(self, oid):
slot = self.seconds_to_clock(oid * .01)
t = int(self.estimated_print_time(self._reactor.monotonic()) + 1.5)
return self.print_time_to_clock(t) + slot
t = int(self._mcu.estimated_print_time(self._reactor.monotonic()) + 1.5)
return self._mcu.print_time_to_clock(t) + slot
def seconds_to_clock(self, time):
return int(time * self._mcu_freq)
def request_move_queue_slot(self):
self._reserved_move_slots += 1
# Main MCU class
class MCU:
error = error
def __init__(self, config, clocksync):
self._printer = printer = config.get_printer()
self._clocksync = clocksync
self._name = config.get_name()
if self._name.startswith('mcu '):
self._name = self._name[4:]
# Low-level connection
self._conn_helper = MCUConnectHelper(config, self, clocksync)
self._serial = self._conn_helper.get_serial()
self._config_helper = MCUConfigHelper(self, self._conn_helper)
# Alter time reporting when debugging
if self.is_fileoutput():
def dummy_estimated_print_time(eventtime):
return 0.
self.estimated_print_time = dummy_estimated_print_time
# Register handlers
self._stats_helper = MCUStatsHelper(self, self._conn_helper)
printer.load_object(config, "error_mcu")
printer.register_event_handler("klippy:mcu_identify",
self._mcu_identify)
def _mcu_identify(self):
self._conn_helper.start_attach()
self._config_helper.post_attach_setup_for_config()
self._stats_helper.post_attach_setup_stats()
def setup_pin(self, pin_type, pin_params):
return self._config_helper.setup_pin(pin_type, pin_params)
def create_oid(self):
return self._config_helper.create_oid()
def register_config_callback(self, cb):
self._config_helper.register_config_callback(cb)
def add_config_cmd(self, cmd, is_init=False, on_restart=False):
self._config_helper.add_config_cmd(cmd, is_init, on_restart)
def request_move_queue_slot(self):
self._config_helper.request_move_queue_slot()
def get_query_slot(self, oid):
return self._config_helper.get_query_slot(oid)
def seconds_to_clock(self, time):
return self._config_helper.seconds_to_clock(time)
def min_schedule_time(self):
return MIN_SCHEDULE_TIME
def max_nominal_duration(self):
@@ -1041,9 +1066,6 @@ class MCU:
return self._clocksync.estimated_print_time(eventtime)
def clock32_to_clock64(self, clock32):
return self._clocksync.clock32_to_clock64(clock32)
# Move queue tracking
def request_move_queue_slot(self):
self._reserved_move_slots += 1
def calibrate_clock(self, print_time, eventtime):
offset, freq = self._clocksync.calibrate_clock(print_time, eventtime)
self._conn_helper.check_timeout(eventtime)