Files
klipper/klippy/extras/motion_queuing.py
2025-10-18 12:28:06 -04:00

285 lines
13 KiB
Python

# Helper code for low-level motion queuing and flushing
#
# Copyright (C) 2025 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging
import chelper
BGFLUSH_LOW_TIME = 0.200
BGFLUSH_HIGH_TIME = 0.400
BGFLUSH_SG_LOW_TIME = 0.450
BGFLUSH_SG_HIGH_TIME = 0.700
BGFLUSH_EXTRA_TIME = 0.250
MOVE_HISTORY_EXPIRE = 30.
MIN_KIN_TIME = 0.100
STEPCOMPRESS_FLUSH_TIME = 0.050
SDS_CHECK_TIME = 0.001 # step+dir+step filter in stepcompress.c
DRIP_SEGMENT_TIME = 0.050
DRIP_TIME = 0.100
class PrinterMotionQueuing:
def __init__(self, config):
self.printer = printer = config.get_printer()
self.reactor = printer.get_reactor()
# C trapq tracking
self.trapqs = []
ffi_main, ffi_lib = chelper.get_ffi()
self.trapq_finalize_moves = ffi_lib.trapq_finalize_moves
# C steppersync tracking
self.steppersyncmgr = ffi_main.gc(ffi_lib.steppersyncmgr_alloc(),
ffi_lib.steppersyncmgr_free)
self.syncemitters = []
self.steppersyncs = []
self.steppersyncmgr_gen_steps = ffi_lib.steppersyncmgr_gen_steps
# History expiration
self.clear_history_time = 0.
# Flush notification callbacks
self.flush_callbacks = []
# Kinematic step generation scan window time tracking
self.kin_flush_delay = SDS_CHECK_TIME
# MCU tracking
self.all_mcus = [m for n, m in printer.lookup_objects(module='mcu')]
self.mcu = self.all_mcus[0]
self.can_pause = True
if self.mcu.is_fileoutput():
self.can_pause = False
# Flush tracking
flush_handler = self._flush_handler
if not self.can_pause:
flush_handler = self._flush_handler_debug
self.flush_timer = self.reactor.register_timer(flush_handler)
self.do_kick_flush_timer = True
self.last_flush_time = self.last_step_gen_time = 0.
self.need_flush_time = self.need_step_gen_time = 0.
# "Drip" timing (for homing and probing moves)
self.drip_start_times = []
# Register handlers
printer.register_event_handler("klippy:shutdown", self._handle_shutdown)
# C trapq tracking
def allocate_trapq(self):
ffi_main, ffi_lib = chelper.get_ffi()
trapq = ffi_main.gc(ffi_lib.trapq_alloc(), ffi_lib.trapq_free)
self.trapqs.append(trapq)
return trapq
def wipe_trapq(self, trapq):
# Expire any remaining movement in the trapq (force to history list)
self.trapq_finalize_moves(trapq, self.reactor.NEVER, 0.)
def lookup_trapq_append(self):
ffi_main, ffi_lib = chelper.get_ffi()
return ffi_lib.trapq_append
# C steppersync tracking
def _lookup_steppersync(self, mcu):
for ss_mcu, ss in self.steppersyncs:
if ss_mcu is mcu:
return ss
ffi_main, ffi_lib = chelper.get_ffi()
ss = ffi_lib.steppersyncmgr_alloc_steppersync(self.steppersyncmgr)
self.steppersyncs.append((mcu, ss))
return ss
def allocate_syncemitter(self, mcu, name, alloc_stepcompress=True):
name = name.encode("utf-8")[:15]
ss = self._lookup_steppersync(mcu)
ffi_main, ffi_lib = chelper.get_ffi()
se = ffi_lib.steppersync_alloc_syncemitter(ss, name, alloc_stepcompress)
self.syncemitters.append(se)
return se
def setup_mcu_movequeue(self, mcu, serialqueue, move_count):
# Setup steppersync object for the mcu's main movequeue
ffi_main, ffi_lib = chelper.get_ffi()
ss = self._lookup_steppersync(mcu)
ffi_lib.steppersync_setup_movequeue(ss, serialqueue, move_count)
mcu_freq = float(mcu.seconds_to_clock(1.))
ffi_lib.steppersync_set_time(ss, 0., mcu_freq)
def stats(self, eventtime):
# Globally calibrate mcu clocks (and step generation clocks)
sync_time = self.last_step_gen_time
ffi_main, ffi_lib = chelper.get_ffi()
for mcu, ss in self.steppersyncs:
offset, freq = mcu.calibrate_clock(sync_time, eventtime)
ffi_lib.steppersync_set_time(ss, offset, freq)
# Calculate history expiration
est_print_time = self.mcu.estimated_print_time(eventtime)
self.clear_history_time = max(0., est_print_time - MOVE_HISTORY_EXPIRE)
return False, ""
# Flush notification callbacks
def register_flush_callback(self, callback, can_add_trapq=False):
if can_add_trapq:
self.flush_callbacks = [callback] + self.flush_callbacks
else:
self.flush_callbacks = self.flush_callbacks + [callback]
def unregister_flush_callback(self, callback):
if callback in self.flush_callbacks:
fcbs = list(self.flush_callbacks)
fcbs.remove(callback)
self.flush_callbacks = fcbs
# Kinematic step generation scan window time tracking
def get_kin_flush_delay(self):
return self.kin_flush_delay
def check_step_generation_scan_windows(self):
ffi_main, ffi_lib = chelper.get_ffi()
kin_flush_delay = SDS_CHECK_TIME
for se in self.syncemitters:
sk = ffi_lib.syncemitter_get_stepper_kinematics(se)
if sk == ffi_main.NULL:
continue
trapq = ffi_lib.itersolve_get_trapq(sk)
if trapq == ffi_main.NULL:
continue
pre_active = ffi_lib.itersolve_get_gen_steps_pre_active(sk)
post_active = ffi_lib.itersolve_get_gen_steps_post_active(sk)
kin_flush_delay = max(kin_flush_delay, pre_active, post_active)
self.kin_flush_delay = kin_flush_delay
# Flush tracking
def _handle_shutdown(self):
self.can_pause = False
def _advance_flush_time(self, want_flush_time, want_step_gen_time=0.):
flush_time = max(want_flush_time, self.last_flush_time,
want_step_gen_time - STEPCOMPRESS_FLUSH_TIME)
step_gen_time = max(want_step_gen_time, self.last_step_gen_time,
flush_time)
# Invoke flush callbacks (if any)
with self.reactor.assert_no_pause():
for cb in self.flush_callbacks:
cb(flush_time, step_gen_time)
# Determine maximum history to keep
trapq_free_time = step_gen_time - self.kin_flush_delay
clear_history_time = self.clear_history_time
if not self.can_pause:
clear_history_time = max(0., trapq_free_time - MOVE_HISTORY_EXPIRE)
# Generate stepper movement and transmit
ret = self.steppersyncmgr_gen_steps(self.steppersyncmgr, flush_time,
step_gen_time, clear_history_time)
if ret:
raise self.mcu.error("Internal error in stepcompress")
self.last_flush_time = flush_time
self.last_step_gen_time = step_gen_time
# Move processed trapq entries to history list, and expire old history
for trapq in self.trapqs:
self.trapq_finalize_moves(trapq, trapq_free_time,
clear_history_time)
def _await_flush_time(self, want_flush_time):
while 1:
if self.last_flush_time >= want_flush_time or not self.can_pause:
return
systime = self.reactor.monotonic()
est_print_time = self.mcu.estimated_print_time(systime)
wait = want_flush_time - BGFLUSH_HIGH_TIME - est_print_time
if wait <= 0.:
return
self.reactor.pause(systime + min(1., wait))
def flush_all_steps(self):
flush_time = self.need_step_gen_time
self._await_flush_time(flush_time)
self._advance_flush_time(flush_time)
def calc_step_gen_restart(self, est_print_time):
kin_time = max(est_print_time + MIN_KIN_TIME, self.last_step_gen_time)
return kin_time + self.kin_flush_delay
def _flush_handler(self, eventtime):
try:
est_print_time = self.mcu.estimated_print_time(eventtime)
aggr_sg_time = self.need_step_gen_time - 2.*self.kin_flush_delay
if self.last_step_gen_time < aggr_sg_time:
# Actively stepping - want more aggressive flushing
want_sg_time = est_print_time + BGFLUSH_SG_HIGH_TIME
batch_time = BGFLUSH_SG_HIGH_TIME - BGFLUSH_SG_LOW_TIME
next_batch_time = self.last_step_gen_time + batch_time
if next_batch_time > est_print_time:
# Improve run-to-run reproducibility by batching from last
if next_batch_time > want_sg_time + 0.005:
# Delay flushing until next wakeup
next_batch_time = self.last_step_gen_time
want_sg_time = next_batch_time
want_sg_time = min(want_sg_time, aggr_sg_time)
# Flush motion queues (if needed)
if want_sg_time > self.last_step_gen_time:
self._advance_flush_time(0., want_sg_time)
else:
# Not stepping (or only step remnants) - use relaxed flushing
want_flush_time = est_print_time + BGFLUSH_HIGH_TIME
max_flush_time = self.need_flush_time + BGFLUSH_EXTRA_TIME
want_flush_time = min(want_flush_time, max_flush_time)
# Flush motion queues (if needed)
if want_flush_time > self.last_flush_time:
self._advance_flush_time(want_flush_time)
# Reschedule timer
aggr_sg_time = self.need_step_gen_time - 2.*self.kin_flush_delay
if self.last_step_gen_time < aggr_sg_time:
waketime = self.last_step_gen_time - BGFLUSH_SG_LOW_TIME
else:
self.do_kick_flush_timer = True
max_flush_time = self.need_flush_time + BGFLUSH_EXTRA_TIME
if self.last_flush_time >= max_flush_time:
return self.reactor.NEVER
waketime = self.last_flush_time - BGFLUSH_LOW_TIME
return eventtime + waketime - est_print_time
except:
logging.exception("Exception in flush_handler")
self.printer.invoke_shutdown("Exception in flush_handler")
return self.reactor.NEVER
def _flush_handler_debug(self, eventtime):
# Use custom flushing code when in batch output mode
try:
faux_time = self.need_flush_time - 1.5
batch_time = BGFLUSH_SG_HIGH_TIME - BGFLUSH_SG_LOW_TIME
flush_count = 0
while self.last_step_gen_time < faux_time:
target = self.last_step_gen_time + batch_time
if flush_count > 100. and faux_time > target:
target += int((faux_time-target) / batch_time) * batch_time
self._advance_flush_time(0., target)
flush_count += 1
if flush_count:
return self.reactor.NOW
self._advance_flush_time(self.need_flush_time + BGFLUSH_EXTRA_TIME)
self.do_kick_flush_timer = True
return self.reactor.NEVER
except:
logging.exception("Exception in flush_handler_debug")
self.printer.invoke_shutdown("Exception in flush_handler_debug")
return self.reactor.NEVER
def note_mcu_movequeue_activity(self, mq_time, is_step_gen=True):
if is_step_gen:
mq_time += self.kin_flush_delay
self.need_step_gen_time = max(self.need_step_gen_time, mq_time)
self.need_flush_time = max(self.need_flush_time, mq_time)
if self.do_kick_flush_timer:
self.do_kick_flush_timer = False
self.reactor.update_timer(self.flush_timer, self.reactor.NOW)
# "Drip" timing (for homing and probing moves)
def drip_update_time(self, start_time, end_time, drip_completion):
self.drip_start_times.append(start_time)
self._await_flush_time(start_time)
# Disable background flushing from timer
self.reactor.update_timer(self.flush_timer, self.reactor.NEVER)
self.do_kick_flush_timer = False
self._advance_flush_time(start_time - SDS_CHECK_TIME, start_time)
# Flush in segments until drip_completion signal
flush_time = start_time
while flush_time < end_time:
if drip_completion.test():
break
curtime = self.reactor.monotonic()
est_print_time = self.mcu.estimated_print_time(curtime)
wait_time = flush_time - est_print_time - DRIP_TIME
if wait_time > 0. and self.can_pause:
# Pause before sending more steps
drip_completion.wait(curtime + wait_time)
continue
flush_time = min(flush_time + DRIP_SEGMENT_TIME, end_time)
self.note_mcu_movequeue_activity(flush_time)
self._advance_flush_time(flush_time - SDS_CHECK_TIME, flush_time)
# Restore background flushing
self.reactor.update_timer(self.flush_timer, self.reactor.NOW)
self._advance_flush_time(flush_time + self.kin_flush_delay)
self.drip_start_times.remove(start_time)
def check_drip_timing(self):
if not self.drip_start_times:
return None
return min(self.drip_start_times)
def load_config(config):
return PrinterMotionQueuing(config)