mirror of
				https://github.com/Klipper3d/klipper.git
				synced 2025-10-31 18:36:09 +01:00 
			
		
		
		
	This class provides endpoints that allow connected hosts to fetch the state of printer objects and subscribe to state "pushed" over the connection. Signed-off-by: Eric Callahan <arksine.code@gmail.com>
		
			
				
	
	
		
			418 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			418 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Klippy WebHooks registration and server connection
 | |
| #
 | |
| # Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com>
 | |
| #
 | |
| # This file may be distributed under the terms of the GNU GPLv3 license
 | |
| import logging
 | |
| import socket
 | |
| import errno
 | |
| import json
 | |
| import homing
 | |
| 
 | |
| SOCKET_LOCATION = "/tmp/moonraker"
 | |
| 
 | |
| # Json decodes strings as unicode types in Python 2.x.  This doesn't
 | |
| # play well with some parts of Klipper (particuarly displays), so we
 | |
| # need to create an object hook. This solution borrowed from:
 | |
| #
 | |
| # https://stackoverflow.com/questions/956867/
 | |
| #
 | |
| def byteify(data, ignore_dicts=False):
 | |
|     if isinstance(data, unicode):
 | |
|         return data.encode('utf-8')
 | |
|     if isinstance(data, list):
 | |
|         return [byteify(i, True) for i in data]
 | |
|     if isinstance(data, dict) and not ignore_dicts:
 | |
|         return {byteify(k, True): byteify(v, True)
 | |
|                 for k, v in data.items()}
 | |
|     return data
 | |
| 
 | |
| def json_loads_byteified(data):
 | |
|     return byteify(
 | |
|         json.loads(data, object_hook=byteify), True)
 | |
| 
 | |
| class WebRequestError(homing.CommandError):
 | |
|     def __init__(self, message,):
 | |
|         Exception.__init__(self, message)
 | |
| 
 | |
|     def to_dict(self):
 | |
|         return {
 | |
|             'error': 'WebRequestError',
 | |
|             'message': self.message}
 | |
| 
 | |
| class Sentinel:
 | |
|     pass
 | |
| 
 | |
| class WebRequest:
 | |
|     error = WebRequestError
 | |
|     def __init__(self, base_request):
 | |
|         self.id = base_request['id']
 | |
|         self.path = base_request['path']
 | |
|         self.method = base_request['method']
 | |
|         self.args = base_request['args']
 | |
|         self.response = None
 | |
| 
 | |
|     def get(self, item, default=Sentinel):
 | |
|         if item not in self.args:
 | |
|             if default == Sentinel:
 | |
|                 raise WebRequestError("Invalid Argument [%s]" % item)
 | |
|             return default
 | |
|         return self.args[item]
 | |
| 
 | |
|     def get_int(self, item):
 | |
|         return int(self.get(item))
 | |
| 
 | |
|     def get_float(self, item):
 | |
|         return float(self.get(item))
 | |
| 
 | |
|     def get_args(self):
 | |
|         return self.args
 | |
| 
 | |
|     def get_path(self):
 | |
|         return self.path
 | |
| 
 | |
|     def get_method(self):
 | |
|         return self.method
 | |
| 
 | |
|     def set_error(self, error):
 | |
|         self.response = error.to_dict()
 | |
| 
 | |
|     def send(self, data):
 | |
|         if self.response is not None:
 | |
|             raise WebRequestError("Multiple calls to send not allowed")
 | |
|         self.response = data
 | |
| 
 | |
|     def finish(self):
 | |
|         if self.response is None:
 | |
|             # No error was set and the user never executed
 | |
|             # send, default response is "ok"
 | |
|             self.response = "ok"
 | |
|         return {"request_id": self.id, "response": self.response}
 | |
| 
 | |
| class ServerConnection:
 | |
|     def __init__(self, webhooks, printer):
 | |
|         self.printer = printer
 | |
|         self.webhooks = webhooks
 | |
|         self.reactor = printer.get_reactor()
 | |
| 
 | |
|         # Klippy Connection
 | |
|         self.fd = self.fd_handle = self.mutex = None
 | |
|         self.is_server_connected = False
 | |
|         self.partial_data = ""
 | |
|         is_fileinput = (printer.get_start_args().get('debuginput')
 | |
|                         is not None)
 | |
|         if is_fileinput:
 | |
|             # Do not try to connect in klippy batch mode
 | |
|             return
 | |
|         self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 | |
|         self.socket.setblocking(0)
 | |
|         try:
 | |
|             self.socket.connect(SOCKET_LOCATION)
 | |
|         except socket.error:
 | |
|             logging.debug(
 | |
|                 "ServerConnection: Moonraker server not detected")
 | |
|             return
 | |
|         logging.debug("ServerConnection: Moonraker connection established")
 | |
|         self.is_server_connected = True
 | |
|         self.fd = self.socket.fileno()
 | |
|         self.fd_handle = self.reactor.register_fd(
 | |
|             self.fd, self.process_received)
 | |
|         self.mutex = self.reactor.mutex()
 | |
|         printer.register_event_handler('klippy:disconnect', self.close_socket)
 | |
| 
 | |
|     def close_socket(self):
 | |
|         if self.is_server_connected:
 | |
|             logging.info("ServerConnection: lost connection to Moonraker")
 | |
|             self.is_server_connected = False
 | |
|             self.reactor.unregister_fd(self.fd_handle)
 | |
|             try:
 | |
|                 self.socket.close()
 | |
|             except socket.error:
 | |
|                 pass
 | |
| 
 | |
|     def is_connected(self):
 | |
|         return self.is_server_connected
 | |
| 
 | |
|     def process_received(self, eventtime):
 | |
|         try:
 | |
|             data = self.socket.recv(4096)
 | |
|         except socket.error as e:
 | |
|             # If bad file descriptor allow connection to be
 | |
|             # closed by the data check
 | |
|             if e.errno == errno.EBADF:
 | |
|                 data = ''
 | |
|             else:
 | |
|                 return
 | |
|         if data == '':
 | |
|             # Socket Closed
 | |
|             self.close_socket()
 | |
|             return
 | |
|         requests = data.split('\x03')
 | |
|         requests[0] = self.partial_data + requests[0]
 | |
|         self.partial_data = requests.pop()
 | |
|         for req in requests:
 | |
|             logging.debug(
 | |
|                 "ServerConnection: Request received: %s" % (req))
 | |
|             try:
 | |
|                 web_request = WebRequest(json_loads_byteified(req))
 | |
|             except Exception:
 | |
|                 logging.exception(
 | |
|                     "ServerConnection: Error decoding Server Request %s"
 | |
|                     % (req))
 | |
|                 continue
 | |
|             self.reactor.register_callback(
 | |
|                 lambda e, s=self, wr=web_request: s._process_request(wr))
 | |
| 
 | |
|     def _process_request(self, web_request):
 | |
|         try:
 | |
|             func = self.webhooks.get_callback(
 | |
|                 web_request.get_path())
 | |
|             func(web_request)
 | |
|         except homing.CommandError as e:
 | |
|             web_request.set_error(WebRequestError(e.message))
 | |
|         except Exception as e:
 | |
|             msg = "Internal Error on WebRequest: %s" % (web_request.get_path())
 | |
|             logging.exception(msg)
 | |
|             web_request.set_error(WebRequestError(e.message))
 | |
|             self.printer.invoke_shutdown(msg)
 | |
|         result = web_request.finish()
 | |
|         logging.debug(
 | |
|             "ServerConnection: Sending response - %s" % (str(result)))
 | |
|         self.send({'method': "response", 'params': result})
 | |
| 
 | |
|     def send(self, data):
 | |
|         if not self.is_server_connected:
 | |
|             return
 | |
|         with self.mutex:
 | |
|             retries = 10
 | |
|             data = json.dumps(data) + "\x03"
 | |
|             while data:
 | |
|                 try:
 | |
|                     sent = self.socket.send(data)
 | |
|                 except socket.error as e:
 | |
|                     if e.errno == errno.EBADF or e.errno == errno.EPIPE \
 | |
|                             or not retries:
 | |
|                         sent = 0
 | |
|                     else:
 | |
|                         retries -= 1
 | |
|                         waketime = self.reactor.monotonic() + .001
 | |
|                         self.reactor.pause(waketime)
 | |
|                         continue
 | |
|                 retries = 10
 | |
|                 if sent > 0:
 | |
|                     data = data[sent:]
 | |
|                 else:
 | |
|                     logging.info(
 | |
|                         "ServerConnection: Error sending server data,"
 | |
|                         " closing socket")
 | |
|                     self.close_socket()
 | |
|                     break
 | |
| 
 | |
| class WebHooks:
 | |
|     def __init__(self, printer):
 | |
|         self.printer = printer
 | |
|         self._endpoints = {"list_endpoints": self._handle_list_endpoints}
 | |
|         self._static_paths = []
 | |
|         self.register_endpoint("info", self._handle_info_request)
 | |
|         self.register_endpoint("emergency_stop", self._handle_estop_request)
 | |
|         start_args = printer.get_start_args()
 | |
|         log_file = start_args.get('log_file')
 | |
|         if log_file is not None:
 | |
|             self.register_static_path("klippy.log", log_file)
 | |
|         self.sconn = ServerConnection(self, printer)
 | |
|         StatusHandler(self)
 | |
| 
 | |
|     def register_endpoint(self, path, callback):
 | |
|         if path in self._endpoints:
 | |
|             raise WebRequestError("Path already registered to an endpoint")
 | |
|         self._endpoints[path] = callback
 | |
| 
 | |
|     def register_static_path(self, resource_id, file_path):
 | |
|         static_path_info = {
 | |
|             'resource_id': resource_id, 'file_path': file_path}
 | |
|         self._static_paths.append(static_path_info)
 | |
| 
 | |
|     def _handle_list_endpoints(self, web_request):
 | |
|         web_request.send({
 | |
|             'hooks': self._endpoints.keys(),
 | |
|             'static_paths': self._static_paths})
 | |
| 
 | |
|     def _handle_info_request(self, web_request):
 | |
|         if web_request.get_method() != 'GET':
 | |
|             raise web_request.error("Invalid Request Method")
 | |
|         start_args = self.printer.get_start_args()
 | |
|         state_message, msg_type = self.printer.get_state_message()
 | |
|         version = start_args['software_version']
 | |
|         cpu_info = start_args['cpu_info']
 | |
|         error = msg_type == "error"
 | |
|         web_request.send(
 | |
|             {'cpu': cpu_info, 'version': version,
 | |
|              'hostname': socket.gethostname(),
 | |
|              'is_ready': msg_type == "ready",
 | |
|              'error_detected': error,
 | |
|              'message': state_message})
 | |
| 
 | |
|     def _handle_estop_request(self, web_request):
 | |
|         if web_request.get_method() != 'POST':
 | |
|             raise web_request.error("Invalid Request Method")
 | |
|         self.printer.invoke_shutdown("Shutdown due to webhooks request")
 | |
| 
 | |
|     def get_connection(self):
 | |
|         return self.sconn
 | |
| 
 | |
|     def get_callback(self, path):
 | |
|         cb = self._endpoints.get(path, None)
 | |
|         if cb is None:
 | |
|             msg = "webhooks: No registered callback for path '%s'" % (path)
 | |
|             logging.info(msg)
 | |
|             raise WebRequestError(msg)
 | |
|         return cb
 | |
| 
 | |
|     def call_remote_method(self, method, **kwargs):
 | |
|         self.sconn.send({'method': method, 'params': kwargs})
 | |
| 
 | |
|     def _action_call_remote_method(self, method, **kwargs):
 | |
|         self.call_remote_method(method, **kwargs)
 | |
|         return ""
 | |
| 
 | |
|     def get_status(self, eventtime=0.):
 | |
|         return {
 | |
|             "action_call_remote_method": self._action_call_remote_method
 | |
|         }
 | |
| 
 | |
| SUBSCRIPTION_REFRESH_TIME = .25
 | |
| 
 | |
| class StatusHandler:
 | |
|     def __init__(self, webhooks):
 | |
|         self.printer = webhooks.printer
 | |
|         self.webhooks = webhooks
 | |
|         self.ready = self.timer_started = False
 | |
|         self.reactor = self.printer.get_reactor()
 | |
|         self.available_objects = {}
 | |
|         self.subscriptions = {}
 | |
|         self.subscription_timer = self.reactor.register_timer(
 | |
|             self._batch_subscription_handler, self.reactor.NEVER)
 | |
| 
 | |
|         # Register events
 | |
|         self.printer.register_event_handler(
 | |
|             "klippy:ready", self._handle_ready)
 | |
|         self.printer.register_event_handler(
 | |
|             "gcode:request_restart", self._handle_restart)
 | |
| 
 | |
|         # Register webhooks
 | |
|         webhooks.register_endpoint(
 | |
|             "objects/list",
 | |
|             self._handle_object_request)
 | |
|         webhooks.register_endpoint(
 | |
|             "objects/status",
 | |
|             self._handle_status_request)
 | |
|         webhooks.register_endpoint(
 | |
|             "objects/subscription",
 | |
|             self._handle_subscription_request)
 | |
| 
 | |
|     def _handle_ready(self):
 | |
|         eventtime = self.reactor.monotonic()
 | |
|         self.available_objects = {}
 | |
|         objs = self.printer.lookup_objects()
 | |
|         status_objs = {n: o for n, o in objs if hasattr(o, "get_status")}
 | |
|         for name, obj in status_objs.items():
 | |
|             attrs = obj.get_status(eventtime)
 | |
|             self.available_objects[name] = attrs.keys()
 | |
|         self.ready = True
 | |
| 
 | |
|     def _handle_restart(self, eventtime):
 | |
|         self.ready = False
 | |
|         self.reactor.update_timer(self.subscription_timer, self.reactor.NEVER)
 | |
| 
 | |
|     def _batch_subscription_handler(self, eventtime):
 | |
|         status = self._process_status_request(self.subscriptions, eventtime)
 | |
|         self.webhooks.call_remote_method(
 | |
|             "process_status_update", status=status)
 | |
|         return eventtime + SUBSCRIPTION_REFRESH_TIME
 | |
| 
 | |
|     def _process_status_request(self, requested_objects, eventtime):
 | |
|         result = {}
 | |
|         if self.ready:
 | |
|             for name, req_items in requested_objects.items():
 | |
|                 obj = self.printer.lookup_object(name, None)
 | |
|                 if obj is not None and name in self.available_objects:
 | |
|                     status = obj.get_status(eventtime)
 | |
|                     if not req_items:
 | |
|                         # return all items excluding callables
 | |
|                         result[name] = {k: v for k, v in status.items()
 | |
|                                         if not callable(v)}
 | |
|                     else:
 | |
|                         # return requested items excluding callables
 | |
|                         result[name] = {k: v for k, v in status.items()
 | |
|                                         if k in req_items and not callable(v)}
 | |
|         else:
 | |
|             result = {"status": "Klippy Not Ready"}
 | |
|         return result
 | |
| 
 | |
|     def _handle_object_request(self, web_request):
 | |
|         if web_request.get_method() != 'GET':
 | |
|             raise web_request.error("Invalid Request Method")
 | |
|         web_request.send(dict(self.available_objects))
 | |
| 
 | |
|     def _handle_status_request(self, web_request):
 | |
|         if web_request.get_method() != 'GET':
 | |
|             raise web_request.error("Invalid Request Method")
 | |
|         args = web_request.get_args()
 | |
|         eventtime = self.reactor.monotonic()
 | |
|         result = self._process_status_request(args, eventtime)
 | |
|         web_request.send(result)
 | |
| 
 | |
|     def _handle_subscription_request(self, web_request):
 | |
|         method = web_request.get_method()
 | |
|         if method == 'POST':
 | |
|             # add a subscription
 | |
|             args = web_request.get_args()
 | |
|             if args:
 | |
|                 self.add_subscripton(args)
 | |
|             else:
 | |
|                 raise web_request.error("Invalid argument")
 | |
|         else:
 | |
|             # get subscription info
 | |
|             result = dict(self.subscriptions)
 | |
|             web_request.send(result)
 | |
| 
 | |
|     def add_subscripton(self, new_sub):
 | |
|         if not new_sub:
 | |
|             return
 | |
|         for obj_name, req_items in new_sub.items():
 | |
|             if obj_name not in self.available_objects:
 | |
|                 logging.info(
 | |
|                     "webhooks: Object {%s} not available for subscription"
 | |
|                     % (obj_name))
 | |
|                 continue
 | |
|             # validate requested items
 | |
|             if req_items:
 | |
|                 avail_items = set(self.available_objects[obj_name])
 | |
|                 invalid_items = set(req_items) - avail_items
 | |
|                 if invalid_items:
 | |
|                     logging.info(
 | |
|                         "webhooks: Removed invalid items [%s] from "
 | |
|                         "subscription request %s" %
 | |
|                         (", ".join(invalid_items), obj_name))
 | |
|                     req_items = list(set(req_items) - invalid_items)
 | |
|                     if not req_items:
 | |
|                         # No valid items remaining
 | |
|                         continue
 | |
|             # Add or update subscription
 | |
|             existing_items = self.subscriptions.get(obj_name, None)
 | |
|             if existing_items is not None:
 | |
|                 if req_items == [] or existing_items == []:
 | |
|                     # Subscribe to all items
 | |
|                     self.subscriptions[obj_name] = []
 | |
|                 else:
 | |
|                     req_items = list(set(req_items) | set(existing_items))
 | |
|                     self.subscriptions[obj_name] = req_items
 | |
|             else:
 | |
|                 self.subscriptions[obj_name] = req_items
 | |
|         if not self.timer_started:
 | |
|             self.reactor.update_timer(self.subscription_timer, self.reactor.NOW)
 | |
|             self.timer_started = True
 | |
| 
 | |
| def add_early_printer_objects(printer):
 | |
|     printer.add_object('webhooks', WebHooks(printer))
 |