mirror of
				https://github.com/Klipper3d/klipper.git
				synced 2025-10-31 18:36:09 +01:00 
			
		
		
		
	Remove support for builtin pin aliases from the console.py tool. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
		
			
				
	
	
		
			241 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			241 lines
		
	
	
		
			9.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python2
 | |
| # Script to implement a test console with firmware over serial port
 | |
| #
 | |
| # Copyright (C) 2016-2021  Kevin O'Connor <kevin@koconnor.net>
 | |
| #
 | |
| # This file may be distributed under the terms of the GNU GPLv3 license.
 | |
| import sys, optparse, os, re, logging
 | |
| import util, reactor, serialhdl, pins, msgproto, clocksync
 | |
| 
 | |
| help_txt = """
 | |
|   This is a debugging console for the Klipper micro-controller.
 | |
|   In addition to mcu commands, the following artificial commands are
 | |
|   available:
 | |
|     PINS  : Load pin name aliases (eg, "PINS arduino")
 | |
|     DELAY : Send a command at a clock time (eg, "DELAY 9999 get_uptime")
 | |
|     FLOOD : Send a command many times (eg, "FLOOD 22 .01 get_uptime")
 | |
|     SUPPRESS : Suppress a response message (eg, "SUPPRESS analog_in_state 4")
 | |
|     SET   : Create a local variable (eg, "SET myvar 123.4")
 | |
|     STATS : Report serial statistics
 | |
|     LIST  : List available mcu commands, local commands, and local variables
 | |
|     HELP  : Show this text
 | |
|   All commands also support evaluation by enclosing an expression in { }.
 | |
|   For example, "reset_step_clock oid=4 clock={clock + freq}".  In addition
 | |
|   to user defined variables (via the SET command) the following builtin
 | |
|   variables may be used in expressions:
 | |
|     clock : The current mcu clock time (as estimated by the host)
 | |
|     freq  : The mcu clock frequency
 | |
| """
 | |
| 
 | |
| re_eval = re.compile(r'\{(?P<eval>[^}]*)\}')
 | |
| 
 | |
| class KeyboardReader:
 | |
|     def __init__(self, reactor, serialport, baud, canbus_iface, canbus_nodeid):
 | |
|         self.serialport = serialport
 | |
|         self.baud = baud
 | |
|         self.canbus_iface = canbus_iface
 | |
|         self.canbus_nodeid = canbus_nodeid
 | |
|         self.ser = serialhdl.SerialReader(reactor)
 | |
|         self.reactor = reactor
 | |
|         self.start_time = reactor.monotonic()
 | |
|         self.clocksync = clocksync.ClockSync(self.reactor)
 | |
|         self.fd = sys.stdin.fileno()
 | |
|         util.set_nonblock(self.fd)
 | |
|         self.mcu_freq = 0
 | |
|         self.pins = pins.PinResolver(validate_aliases=False)
 | |
|         self.data = ""
 | |
|         reactor.register_fd(self.fd, self.process_kbd)
 | |
|         reactor.register_callback(self.connect)
 | |
|         self.local_commands = {
 | |
|             "SET": self.command_SET,
 | |
|             "DELAY": self.command_DELAY, "FLOOD": self.command_FLOOD,
 | |
|             "SUPPRESS": self.command_SUPPRESS, "STATS": self.command_STATS,
 | |
|             "LIST": self.command_LIST, "HELP": self.command_HELP,
 | |
|         }
 | |
|         self.eval_globals = {}
 | |
|     def connect(self, eventtime):
 | |
|         self.output(help_txt)
 | |
|         self.output("="*20 + " attempting to connect " + "="*20)
 | |
|         if self.canbus_iface is not None:
 | |
|             self.ser.connect_canbus(self.serialport, self.canbus_nodeid,
 | |
|                                     self.canbus_iface)
 | |
|         elif self.baud:
 | |
|             self.ser.connect_uart(self.serialport, self.baud)
 | |
|         else:
 | |
|             self.ser.connect_pipe(self.serialport)
 | |
|         msgparser = self.ser.get_msgparser()
 | |
|         message_count = len(msgparser.get_messages())
 | |
|         version, build_versions = msgparser.get_version_info()
 | |
|         self.output("Loaded %d commands (%s / %s)"
 | |
|                     % (message_count, version, build_versions))
 | |
|         self.output("MCU config: %s" % (" ".join(
 | |
|             ["%s=%s" % (k, v) for k, v in msgparser.get_constants().items()])))
 | |
|         self.clocksync.connect(self.ser)
 | |
|         self.ser.handle_default = self.handle_default
 | |
|         self.ser.register_response(self.handle_output, '#output')
 | |
|         self.mcu_freq = msgparser.get_constant_float('CLOCK_FREQ')
 | |
|         self.output("="*20 + "       connected       " + "="*20)
 | |
|         return self.reactor.NEVER
 | |
|     def output(self, msg):
 | |
|         sys.stdout.write("%s\n" % (msg,))
 | |
|         sys.stdout.flush()
 | |
|     def handle_default(self, params):
 | |
|         tdiff = params['#receive_time'] - self.start_time
 | |
|         msg = self.ser.get_msgparser().format_params(params)
 | |
|         self.output("%07.3f: %s" % (tdiff, msg))
 | |
|     def handle_output(self, params):
 | |
|         tdiff = params['#receive_time'] - self.start_time
 | |
|         self.output("%07.3f: %s: %s" % (tdiff, params['#name'], params['#msg']))
 | |
|     def handle_suppress(self, params):
 | |
|         pass
 | |
|     def update_evals(self, eventtime):
 | |
|         self.eval_globals['freq'] = self.mcu_freq
 | |
|         self.eval_globals['clock'] = self.clocksync.get_clock(eventtime)
 | |
|     def command_SET(self, parts):
 | |
|         val = parts[2]
 | |
|         try:
 | |
|             val = float(val)
 | |
|         except ValueError:
 | |
|             pass
 | |
|         self.eval_globals[parts[1]] = val
 | |
|     def command_DELAY(self, parts):
 | |
|         try:
 | |
|             val = int(parts[1])
 | |
|         except ValueError as e:
 | |
|             self.output("Error: %s" % (str(e),))
 | |
|             return
 | |
|         try:
 | |
|             self.ser.send(' '.join(parts[2:]), minclock=val)
 | |
|         except msgproto.error as e:
 | |
|             self.output("Error: %s" % (str(e),))
 | |
|             return
 | |
|     def command_FLOOD(self, parts):
 | |
|         try:
 | |
|             count = int(parts[1])
 | |
|             delay = float(parts[2])
 | |
|         except ValueError as e:
 | |
|             self.output("Error: %s" % (str(e),))
 | |
|             return
 | |
|         msg = ' '.join(parts[3:])
 | |
|         delay_clock = int(delay * self.mcu_freq)
 | |
|         msg_clock = int(self.clocksync.get_clock(self.reactor.monotonic())
 | |
|                         + self.mcu_freq * .200)
 | |
|         try:
 | |
|             for i in range(count):
 | |
|                 next_clock = msg_clock + delay_clock
 | |
|                 self.ser.send(msg, minclock=msg_clock, reqclock=next_clock)
 | |
|                 msg_clock = next_clock
 | |
|         except msgproto.error as e:
 | |
|             self.output("Error: %s" % (str(e),))
 | |
|             return
 | |
|     def command_SUPPRESS(self, parts):
 | |
|         oid = None
 | |
|         try:
 | |
|             name = parts[1]
 | |
|             if len(parts) > 2:
 | |
|                 oid = int(parts[2])
 | |
|         except ValueError as e:
 | |
|             self.output("Error: %s" % (str(e),))
 | |
|             return
 | |
|         self.ser.register_response(self.handle_suppress, name, oid)
 | |
|     def command_STATS(self, parts):
 | |
|         curtime = self.reactor.monotonic()
 | |
|         self.output(' '.join([self.ser.stats(curtime),
 | |
|                               self.clocksync.stats(curtime)]))
 | |
|     def command_LIST(self, parts):
 | |
|         self.update_evals(self.reactor.monotonic())
 | |
|         mp = self.ser.get_msgparser()
 | |
|         cmds = [msgformat for msgtag, msgtype, msgformat in mp.get_messages()
 | |
|                 if msgtype == 'command']
 | |
|         out = "Available mcu commands:"
 | |
|         out += "\n  ".join([""] + sorted(cmds))
 | |
|         out += "\nAvailable artificial commands:"
 | |
|         out += "\n  ".join([""] + [n for n in sorted(self.local_commands)])
 | |
|         out += "\nAvailable local variables:"
 | |
|         lvars = sorted(self.eval_globals.items())
 | |
|         out += "\n  ".join([""] + ["%s: %s" % (k, v) for k, v in lvars])
 | |
|         self.output(out)
 | |
|     def command_HELP(self, parts):
 | |
|         self.output(help_txt)
 | |
|     def translate(self, line, eventtime):
 | |
|         evalparts = re_eval.split(line)
 | |
|         if len(evalparts) > 1:
 | |
|             self.update_evals(eventtime)
 | |
|             try:
 | |
|                 for i in range(1, len(evalparts), 2):
 | |
|                     e = eval(evalparts[i], dict(self.eval_globals))
 | |
|                     if type(e) == type(0.):
 | |
|                         e = int(e)
 | |
|                     evalparts[i] = str(e)
 | |
|             except:
 | |
|                 self.output("Unable to evaluate: %s" % (line,))
 | |
|                 return None
 | |
|             line = ''.join(evalparts)
 | |
|             self.output("Eval: %s" % (line,))
 | |
|         try:
 | |
|             line = self.pins.update_command(line).strip()
 | |
|         except:
 | |
|             self.output("Unable to map pin: %s" % (line,))
 | |
|             return None
 | |
|         if line:
 | |
|             parts = line.split()
 | |
|             if parts[0] in self.local_commands:
 | |
|                 self.local_commands[parts[0]](parts)
 | |
|                 return None
 | |
|         return line
 | |
|     def process_kbd(self, eventtime):
 | |
|         self.data += str(os.read(self.fd, 4096).decode())
 | |
| 
 | |
|         kbdlines = self.data.split('\n')
 | |
|         for line in kbdlines[:-1]:
 | |
|             line = line.strip()
 | |
|             cpos = line.find('#')
 | |
|             if cpos >= 0:
 | |
|                 line = line[:cpos]
 | |
|                 if not line:
 | |
|                     continue
 | |
|             msg = self.translate(line.strip(), eventtime)
 | |
|             if msg is None:
 | |
|                 continue
 | |
|             try:
 | |
|                 self.ser.send(msg)
 | |
|             except msgproto.error as e:
 | |
|                 self.output("Error: %s" % (str(e),))
 | |
|         self.data = kbdlines[-1]
 | |
| 
 | |
| def main():
 | |
|     usage = "%prog [options] <serialdevice>"
 | |
|     opts = optparse.OptionParser(usage)
 | |
|     opts.add_option("-v", action="store_true", dest="verbose",
 | |
|                     help="enable debug messages")
 | |
|     opts.add_option("-b", "--baud", type="int", dest="baud", help="baud rate")
 | |
|     opts.add_option("-c", "--canbus_iface", dest="canbus_iface",
 | |
|                     help="Use CAN bus interface; serialdevice is the chip UUID")
 | |
|     opts.add_option("-i", "--canbus_nodeid", type="int", dest="canbus_nodeid",
 | |
|                     default=64, help="The CAN nodeid to use (default 64)")
 | |
|     options, args = opts.parse_args()
 | |
|     if len(args) != 1:
 | |
|         opts.error("Incorrect number of arguments")
 | |
|     serialport = args[0]
 | |
| 
 | |
|     baud = options.baud
 | |
|     if baud is None and not (serialport.startswith("/dev/rpmsg_")
 | |
|                              or serialport.startswith("/tmp/")):
 | |
|         baud = 250000
 | |
| 
 | |
|     debuglevel = logging.INFO
 | |
|     if options.verbose:
 | |
|         debuglevel = logging.DEBUG
 | |
|     logging.basicConfig(level=debuglevel)
 | |
| 
 | |
|     r = reactor.Reactor()
 | |
|     kbd = KeyboardReader(r, serialport, baud, options.canbus_iface,
 | |
|                          options.canbus_nodeid)
 | |
|     try:
 | |
|         r.run()
 | |
|     except KeyboardInterrupt:
 | |
|         sys.stdout.write("\n")
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     main()
 |