mcu: Split _mcu_identify() into separate methods

Split up the _mcu_identify() into several internal methods separated
by functionality.

Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
Kevin O'Connor
2025-09-19 20:34:51 -04:00
parent 07466411ac
commit ce55d41166

View File

@@ -659,7 +659,7 @@ class MCU:
self._printer.request_exit('firmware_restart') self._printer.request_exit('firmware_restart')
self._reactor.pause(self._reactor.monotonic() + 2.000) self._reactor.pause(self._reactor.monotonic() + 2.000)
raise error("Attempt MCU '%s' restart failed" % (self._name,)) raise error("Attempt MCU '%s' restart failed" % (self._name,))
def _connect_file(self, pace=False): def _attach_file(self, pace=False):
# In a debugging mode. Open debug output file and read data dictionary # In a debugging mode. Open debug output file and read data dictionary
start_args = self._printer.get_start_args() start_args = self._printer.get_start_args()
if self._name == 'mcu': if self._name == 'mcu':
@@ -775,40 +775,32 @@ class MCU:
logging.info(move_msg) logging.info(move_msg)
log_info = self._log_info() + "\n" + move_msg log_info = self._log_info() + "\n" + move_msg
self._printer.set_rollover_info(self._name, log_info, log=False) self._printer.set_rollover_info(self._name, log_info, log=False)
def _mcu_identify(self): def _attach(self):
if self.is_fileoutput(): resmeth = self._restart_method
self._connect_file() if resmeth == 'rpi_usb' and not os.path.exists(self._serialport):
else: # Try toggling usb power
resmeth = self._restart_method self._check_restart("enable power")
if resmeth == 'rpi_usb' and not os.path.exists(self._serialport): try:
# Try toggling usb power if self._canbus_iface is not None:
self._check_restart("enable power") cbid = self._printer.lookup_object('canbus_ids')
try: nodeid = cbid.get_nodeid(self._serialport)
if self._canbus_iface is not None: self._serial.connect_canbus(self._serialport, nodeid,
cbid = self._printer.lookup_object('canbus_ids') self._canbus_iface)
nodeid = cbid.get_nodeid(self._serialport) elif self._baud:
self._serial.connect_canbus(self._serialport, nodeid, # Cheetah boards require RTS to be deasserted
self._canbus_iface) # else a reset will trigger the built-in bootloader.
elif self._baud: rts = (resmeth != "cheetah")
# Cheetah boards require RTS to be deasserted self._serial.connect_uart(self._serialport, self._baud, rts)
# else a reset will trigger the built-in bootloader. else:
rts = (resmeth != "cheetah") self._serial.connect_pipe(self._serialport)
self._serial.connect_uart(self._serialport, self._baud, rts) self._clocksync.connect(self._serial)
else: except serialhdl.error as e:
self._serial.connect_pipe(self._serialport) raise error(str(e))
self._clocksync.connect(self._serial) def _post_attach_setup_shutdown(self):
except serialhdl.error as e:
raise error(str(e))
logging.info(self._log_info())
ppins = self._printer.lookup_object('pins')
pin_resolver = ppins.get_pin_resolver(self._name)
for cname, value in self.get_constants().items():
if cname.startswith("RESERVE_PINS_"):
for pin in value.split(','):
pin_resolver.reserve_pin(pin, cname[13:])
self._mcu_freq = self.get_constant_float('CLOCK_FREQ')
self._stats_sumsq_base = self.get_constant_float('STATS_SUMSQ_BASE')
self._emergency_stop_cmd = self.lookup_command("emergency_stop") self._emergency_stop_cmd = self.lookup_command("emergency_stop")
self.register_response(self._handle_shutdown, 'shutdown')
self.register_response(self._handle_shutdown, 'is_shutdown')
def _post_attach_setup_restart(self):
self._reset_cmd = self.try_lookup_command("reset") self._reset_cmd = self.try_lookup_command("reset")
self._config_reset_cmd = self.try_lookup_command("config_reset") self._config_reset_cmd = self.try_lookup_command("config_reset")
ext_only = self._reset_cmd is None and self._config_reset_cmd is None ext_only = self._reset_cmd is None and self._config_reset_cmd is None
@@ -820,13 +812,32 @@ class MCU:
self._is_mcu_bridge = True self._is_mcu_bridge = True
self._printer.register_event_handler("klippy:firmware_restart", self._printer.register_event_handler("klippy:firmware_restart",
self._firmware_restart_bridge) self._firmware_restart_bridge)
def _post_attach_setup_stats(self):
self._stats_sumsq_base = self.get_constant_float('STATS_SUMSQ_BASE')
msgparser = self._serial.get_msgparser()
version, build_versions = msgparser.get_version_info() version, build_versions = msgparser.get_version_info()
self._get_status_info['mcu_version'] = version self._get_status_info['mcu_version'] = version
self._get_status_info['mcu_build_versions'] = build_versions self._get_status_info['mcu_build_versions'] = build_versions
self._get_status_info['mcu_constants'] = msgparser.get_constants() self._get_status_info['mcu_constants'] = msgparser.get_constants()
self.register_response(self._handle_shutdown, 'shutdown')
self.register_response(self._handle_shutdown, 'is_shutdown')
self.register_response(self._handle_mcu_stats, 'stats') self.register_response(self._handle_mcu_stats, 'stats')
def _post_attach_setup_for_config(self):
self._mcu_freq = self.get_constant_float('CLOCK_FREQ')
ppins = self._printer.lookup_object('pins')
pin_resolver = ppins.get_pin_resolver(self._name)
for cname, value in self.get_constants().items():
if cname.startswith("RESERVE_PINS_"):
for pin in value.split(','):
pin_resolver.reserve_pin(pin, cname[13:])
def _mcu_identify(self):
if self.is_fileoutput():
self._attach_file()
else:
self._attach()
logging.info(self._log_info())
self._post_attach_setup_shutdown()
self._post_attach_setup_restart()
self._post_attach_setup_for_config()
self._post_attach_setup_stats()
def _ready(self): def _ready(self):
if self.is_fileoutput(): if self.is_fileoutput():
return return