Files
DemonEditor/app/ui/control.py
2022-05-22 23:55:13 +03:00

322 lines
14 KiB
Python

# -*- coding: utf-8 -*-
#
# The MIT License (MIT)
#
# Copyright (c) 2018-2022 Dmitriy Yefremov
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Author: Dmitriy Yefremov
#
""" Receiver control module via HTTP API. """
import os
import re
from gi.repository import GLib
from .dialogs import get_builder, get_message
from .uicommons import Gtk, Gdk, UI_RESOURCES_PATH
from ..commons import run_task, run_with_delay, log, run_idle
from ..connections import HttpAPI
from ..settings import IS_DARWIN, IS_LINUX, IS_WIN
class ControlTool(Gtk.Box):
def __init__(self, app, settings, *args, **kwargs):
super().__init__(*args, **kwargs)
self._settings = settings
self._app = app
self._app.connect("layout-changed", self.on_layout_changed)
self._pix = None
handlers = {"on_volume_changed": self.on_volume_changed,
"on_screenshot_draw": self.on_screenshot_draw,
"on_network_toggled": self.on_network_toggled,
"on_network_view_query_tooltip": self.on_network_view_query_tooltip}
builder = get_builder(UI_RESOURCES_PATH + "control.glade", handlers)
self.pack_start(builder.get_object("control_box"), True, True, 0)
self._remote_box = builder.get_object("remote_box")
self._screenshot_area = builder.get_object("screenshot_area")
self._screenshot_button_box = builder.get_object("screenshot_button_box")
self._screenshot_check_button = builder.get_object("screenshot_check_button")
self._screenshot_check_button.bind_property("active", self._screenshot_area, "visible")
self._snr_value_label = builder.get_object("snr_value_label")
self._ber_value_label = builder.get_object("ber_value_label")
self._agc_value_label = builder.get_object("agc_value_label")
self._snr_level_bar = builder.get_object("snr_level_bar")
self._ber_level_bar = builder.get_object("ber_level_bar")
self._agc_level_bar = builder.get_object("agc_level_bar")
self._volume_button = builder.get_object("volume_button")
self._header_box = builder.get_object("control_header_box")
# Network.
self._network_button = builder.get_object("control_network_button")
self._network_model = builder.get_object("network_model")
self.init_actions(app)
if settings.alternate_layout:
self.on_layout_changed(app, True)
self.show()
def init_actions(self, app):
# Remote controller actions.
app.set_action("on_up", lambda a, v: self.on_remote_action(HttpAPI.Remote.UP))
app.set_action("on_down", lambda a, v: self.on_remote_action(HttpAPI.Remote.DOWN))
app.set_action("on_left", lambda a, v: self.on_remote_action(HttpAPI.Remote.LEFT))
app.set_action("on_right", lambda a, v: self.on_remote_action(HttpAPI.Remote.RIGHT))
app.set_action("on_back", lambda a, v: self.on_remote_action(HttpAPI.Remote.BACK))
app.set_action("on_info", lambda a, v: self.on_remote_action(HttpAPI.Remote.INFO))
app.set_action("on_ok", lambda a, v: self.on_remote_action(HttpAPI.Remote.OK))
app.set_action("on_menu", lambda a, v: self.on_remote_action(HttpAPI.Remote.MENU))
app.set_action("on_exit", lambda a, v: self.on_remote_action(HttpAPI.Remote.EXIT))
app.set_action("on_red", lambda a, v: self.on_remote_action(HttpAPI.Remote.RED))
app.set_action("on_green", lambda a, v: self.on_remote_action(HttpAPI.Remote.GREEN))
app.set_action("on_yellow", lambda a, v: self.on_remote_action(HttpAPI.Remote.YELLOW))
app.set_action("on_blue", lambda a, v: self.on_remote_action(HttpAPI.Remote.BLUE))
# Playback.
app.set_action("on_prev_media", lambda a, v: self.on_player_action(HttpAPI.Request.PLAYER_PREV))
app.set_action("on_play_media", lambda a, v: self.on_player_action(HttpAPI.Request.PLAYER_PLAY))
app.set_action("on_stop_media", lambda a, v: self.on_player_action(HttpAPI.Request.PLAYER_STOP))
app.set_action("on_next_media", lambda a, v: self.on_player_action(HttpAPI.Request.PLAYER_NEXT))
# Power.
app.set_action("on_standby", lambda a, v: self.on_power_action(HttpAPI.Power.STANDBY))
app.set_action("on_wake_up", lambda a, v: self.on_power_action(HttpAPI.Power.WAKEUP))
app.set_action("on_reboot", lambda a, v: self.on_power_action(HttpAPI.Power.REBOOT))
app.set_action("on_restart_gui", lambda a, v: self.on_power_action(HttpAPI.Power.RESTART_GUI))
app.set_action("on_shutdown", lambda a, v: self.on_power_action(HttpAPI.Power.DEEP_STANDBY))
# Screenshots.
app.set_action("on_screenshot_all", self.on_screenshot_all)
app.set_action("on_screenshot_video", self.on_screenshot_video)
app.set_action("on_screenshot_osd", self.on_screenshot_osd)
def on_layout_changed(self, app, alt_layout):
children = self._remote_box.get_children()
self._remote_box.reorder_child(children[0], len(children) - 1)
self._remote_box.reorder_child(children[-1], 0)
pack_type = Gtk.PackType.END if alt_layout else Gtk.PackType.START
self._header_box.set_child_packing(self._network_button, False, False, 0, pack_type)
# ***************** Remote controller ********************* #
def on_remote(self, action, state=False):
""" Shows/Hides [R key] remote controller. """
action.set_state(state)
self._remote_revealer.set_visible(state)
self._remote_revealer.set_reveal_child(state)
if state:
self._app.send_http_request(HttpAPI.Request.VOL, "state", self.update_volume)
def on_remote_action(self, action):
self._app.send_http_request(HttpAPI.Request.REMOTE, action, self.on_response)
def on_player_action(self, action):
self._app.send_http_request(action, "", self.on_response)
@run_with_delay(0.5)
def on_volume_changed(self, button, value):
self._app.send_http_request(HttpAPI.Request.VOL, f"{value:.0f}", self.on_response)
def update_volume(self, vol):
if "error_code" in vol:
return
GLib.idle_add(self._volume_button.set_value, int(vol.get("e2current", "0")))
def on_response(self, resp):
if "error_code" in resp:
return
if self._screenshot_check_button.get_active() and self._app.http_api:
ref = "mode=all" if self._app.http_api.is_owif else "d="
self._app.send_http_request(HttpAPI.Request.GRUB, ref, self.update_screenshot)
@run_task
def update_screenshot(self, data):
if "error_code" in data:
return
data = data.get("img_data", None)
if data:
from gi.repository import GdkPixbuf
allocation = self._screenshot_area.get_parent().get_allocation()
loader = GdkPixbuf.PixbufLoader.new_with_type("jpeg")
loader.set_size(allocation.width, allocation.height)
try:
loader.write(data)
pix = loader.get_pixbuf()
except GLib.Error:
pass # NOP
else:
self._pix = pix
self._screenshot_area.queue_draw() # Redrawing the area!
finally:
loader.close()
def on_screenshot_draw(self, area, cr):
""" Called to automatically resize the screenshot. """
if self._pix:
cr.scale(area.get_allocated_width() / self._pix.get_width(),
area.get_allocated_height() / self._pix.get_height())
img_surface = Gdk.cairo_surface_create_from_pixbuf(self._pix, 1, None)
cr.set_source_surface(img_surface, 0, 0)
cr.paint()
def on_screenshot_all(self, action, value=None):
if self._app.http_api:
self._app.send_http_request(HttpAPI.Request.GRUB, "mode=all" if self._app.http_api.is_owif else "d=",
self.on_screenshot)
def on_screenshot_video(self, action, value=None):
if self._app.http_api:
self._app.send_http_request(HttpAPI.Request.GRUB, "mode=video" if self._app.http_api.is_owif else "v=",
self.on_screenshot)
def on_screenshot_osd(self, action, value=None):
if self._app.http_api:
self._app.send_http_request(HttpAPI.Request.GRUB, "mode=osd" if self._app.http_api.is_owif else "o=",
self.on_screenshot)
@run_task
def on_screenshot(self, data):
if "error_code" in data:
return
img = data.get("img_data", None)
if img:
GLib.idle_add(self._screenshot_button_box.set_sensitive, not IS_LINUX)
path = os.path.expanduser("~/Desktop") if not IS_LINUX else None
try:
import tempfile
import subprocess
with tempfile.NamedTemporaryFile(mode="wb", suffix=".jpg", dir=path, delete=IS_LINUX) as tf:
tf.write(img)
if IS_LINUX:
cmd = ["xdg-open", tf.name]
elif IS_DARWIN:
cmd = ["open", tf.name]
else:
cmd = [tf.name]
if not IS_WIN:
subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
# File must be closed.
if IS_WIN:
subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=IS_WIN).communicate()
finally:
GLib.idle_add(self._screenshot_button_box.set_sensitive, True)
def on_power_action(self, action):
self._app.send_http_request(HttpAPI.Request.POWER, action, lambda resp: log("Power status changed..."))
def update_signal(self, sig):
snr = sig.get("e2snr", "0 %").strip() if sig else "0 %"
acg = sig.get("e2acg", "0 %").strip() if sig else "0 %"
ber = (sig.get("e2ber", None) or "").strip() if sig else ""
# Labels.
self._snr_value_label.set_text(snr)
self._agc_value_label.set_text(acg)
self._ber_value_label.set_text(ber)
# Level bars.
self._snr_level_bar.set_value(int(snr.strip("%N/A") or 0))
self._agc_level_bar.set_value(int(acg.rstrip("%N/A") or 0))
self._ber_level_bar.set_value(int(ber.rstrip("N/A") or 0))
# ***************** Network explorer ********************** #
def on_network_toggled(self, button):
self._network_model.clear()
if button.get_active():
self.update_network()
@run_task
def update_network(self):
pattern = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
ips = [match for match in re.findall(pattern, os.popen("arp -a").read())]
for ip in ips:
if not self._network_button.get_active():
break
url = f"http://{ip}/web/{HttpAPI.Request.INFO.value}"
try:
resp = HttpAPI.get_response(HttpAPI.Request.INFO, url, timeout=5)
except OSError as e:
log(f"{ip} {e}")
else:
if resp.get("e2distroversion", None):
log(f"Receiver found. Model: {resp.get('e2model', 'N/A')} [{ip} ]")
self.append_box_data(resp)
@run_idle
def append_box_data(self, data):
ip = data.get('e2lanip', 'N/A')
itr = self._network_model.append((data.get("e2model", "N/A"), ip, None, data, None))
GLib.timeout_add_seconds(3, self.check_power_state, itr, priority=GLib.PRIORITY_LOW)
def on_network_view_query_tooltip(self, view, x, y, keyboard_mode, tooltip):
result = view.get_dest_row_at_pos(x, y)
if not result:
return False
path, pos = result
model = view.get_model()
data = model[path][3]
dist = data.get("e2distroversion", "N/A")
img = data.get("e2imageversion", "N/A")
txt = f"Distro version: {dist}\nImage version: {img}"
tooltip.set_text(txt)
view.set_tooltip_row(tooltip, path)
return True
def check_power_state(self, itr):
active = self._network_button.get_active()
if not active:
return False
data = self._network_model.get_value(itr, 3)
url = f"http://{data.get('e2lanip', 'N/A')}/web/powerstate"
self.update_power_state(itr, url)
return active
@run_task
def update_power_state(self, itr, url):
try:
resp = HttpAPI.get_response(HttpAPI.Request.POWER, url, timeout=2)
except OSError as e:
log(e)
else:
state = get_message("On" if resp.get("e2instandby", "N/A").strip() == "false" else "Standby")
GLib.idle_add(self._network_model.set_value, itr, 2, state)
if __name__ == "__main__":
pass