Files
DemonEditor/app/tools/picons.py

324 lines
11 KiB
Python
Raw Normal View History

2018-03-04 19:37:41 +03:00
import glob
2018-01-12 14:32:36 +03:00
import os
2018-04-23 14:42:41 +03:00
import re
2018-01-12 14:32:36 +03:00
import shutil
2018-01-15 14:56:17 +03:00
from collections import namedtuple
2018-01-12 14:32:36 +03:00
from html.parser import HTMLParser
2021-02-10 23:21:30 +03:00
import requests
2020-10-10 15:19:00 +03:00
from app.commons import run_task, log
2019-12-22 20:42:29 +03:00
from app.settings import SettingsType
2021-02-10 23:21:30 +03:00
from .satellites import _HEADERS
2018-01-12 14:32:36 +03:00
2018-10-21 00:17:22 +03:00
_ENIGMA2_PICON_KEY = "{:X}:{:X}:{}"
2018-02-05 14:44:42 +03:00
_NEUTRINO_PICON_KEY = "{:x}{:04x}{:04x}.png"
2018-01-28 23:10:54 +03:00
Provider = namedtuple("Provider", ["logo", "name", "pos", "url", "on_id", "ssid", "single", "selected"])
2021-03-12 13:37:06 +03:00
Picon = namedtuple("Picon", ["ref", "ssid"])
2018-01-15 14:56:17 +03:00
2018-01-12 14:32:36 +03:00
class PiconsParser(HTMLParser):
2018-01-15 14:56:17 +03:00
""" Parser for package html page. (https://www.lyngsat.com/packages/*provider-name*.html) """
2021-02-10 23:21:30 +03:00
_BASE_URL = "https://www.lyngsat.com"
2018-01-12 14:32:36 +03:00
def __init__(self, entities=False, separator=' ', single=None):
2018-01-12 14:32:36 +03:00
HTMLParser.__init__(self)
self._parse_html_entities = entities
self._separator = separator
self._single = single
2018-01-12 14:32:36 +03:00
self._is_td = False
self._is_th = False
self._current_row = []
self._current_cell = []
2018-01-16 01:16:03 +03:00
self.picons = []
2018-01-12 14:32:36 +03:00
def handle_starttag(self, tag, attrs):
2020-10-10 15:19:00 +03:00
if tag == "td":
2018-01-12 14:32:36 +03:00
self._is_td = True
2020-10-10 15:19:00 +03:00
if tag == "th":
2018-01-12 14:32:36 +03:00
self._is_th = True
if tag == "img":
self._current_row.append(attrs[0][1])
def handle_data(self, data):
""" Save content to a cell """
if self._is_td or self._is_th:
self._current_cell.append(data.strip())
def handle_endtag(self, tag):
2020-10-10 15:19:00 +03:00
if tag == "td":
2018-01-12 14:32:36 +03:00
self._is_td = False
2020-10-10 15:19:00 +03:00
elif tag == "th":
2018-01-12 14:32:36 +03:00
self._is_th = False
2020-10-10 15:19:00 +03:00
if tag in ("td", "th"):
2018-01-12 14:32:36 +03:00
final_cell = self._separator.join(self._current_cell).strip()
self._current_row.append(final_cell)
self._current_cell = []
2020-10-10 15:19:00 +03:00
elif tag == "tr":
2018-01-12 14:32:36 +03:00
row = self._current_row
ln = len(row)
2018-01-16 01:16:03 +03:00
2021-02-10 23:21:30 +03:00
if self._single and ln == 4 and row[0].startswith("/logo/"):
2021-03-12 13:37:06 +03:00
self.picons.append(Picon(row[0].strip(), "0"))
else:
2021-03-12 13:37:06 +03:00
if ln == 9:
url = None
2021-02-10 23:21:30 +03:00
if row[0].startswith("/logo/"):
url = row[0]
2021-02-10 23:21:30 +03:00
elif row[1].startswith("/logo/"):
url = row[1]
2021-03-12 13:37:06 +03:00
if url and row[-3].isdigit():
self.picons.append(Picon(url, row[-3]))
2018-01-16 01:16:03 +03:00
2018-01-12 14:32:36 +03:00
self._current_row = []
def error(self, message):
pass
@staticmethod
2021-02-10 23:21:30 +03:00
def parse(provider, picons_path, picon_ids, s_type=SettingsType.ENIGMA_2):
""" Returns tuple(url, picon file name) list. """
req = requests.get(provider.url, timeout=5)
if req.status_code == 200:
logo_data = req.text
else:
log("Provider picons downloading error: {} {}".format(provider.url, req.reason))
2020-10-10 15:19:00 +03:00
return
2021-02-10 23:21:30 +03:00
on_id, pos, ssid, single = provider.on_id, provider.pos, provider.ssid, provider.single
neg_pos = pos.endswith("W")
pos = int("".join(c for c in pos if c.isdigit()))
# For negative (West) positions 3600 - numeric position value!!!
if neg_pos:
pos = 3600 - pos
parser = PiconsParser(single=provider.single)
parser.reset()
parser.feed(logo_data)
picons = parser.picons
picons_data = []
if picons:
for p in picons:
try:
if single:
on_id, freq = on_id.strip().split("::")
namespace = "{:X}{:X}".format(int(pos), int(freq))
else:
namespace = "{:X}0000".format(int(pos))
name = PiconsParser.format(ssid if single else p.ssid, on_id, namespace, picon_ids, s_type)
p_name = picons_path + (name if name else os.path.basename(p.ref))
picons_data.append(("{}{}".format(PiconsParser._BASE_URL, p.ref), p_name))
except (TypeError, ValueError) as e:
msg = "Picons format parse error: {}".format(p) + "\n" + str(e)
log(msg)
return picons_data
2018-01-12 14:32:36 +03:00
@staticmethod
2019-12-22 20:42:29 +03:00
def format(ssid, on_id, namespace, picon_ids, s_type):
if s_type is SettingsType.ENIGMA_2:
2018-10-21 00:17:22 +03:00
return picon_ids.get(_ENIGMA2_PICON_KEY.format(int(ssid), int(on_id), namespace), None)
2019-12-22 20:42:29 +03:00
elif s_type is SettingsType.NEUTRINO_MP:
2018-10-20 07:27:12 +03:00
tr_id = int(ssid[:-2] if len(ssid) < 4 else ssid[:2])
2018-02-05 14:44:42 +03:00
return _NEUTRINO_PICON_KEY.format(tr_id, int(on_id), int(ssid))
2018-01-12 14:32:36 +03:00
else:
return "{}.png".format(ssid)
2018-01-15 14:56:17 +03:00
class ProviderParser(HTMLParser):
""" Parser for satellite html page. (https://www.lyngsat.com/*sat-name*.html) """
2018-03-07 23:56:21 +03:00
_POSITION_PATTERN = re.compile("at\s\d+\..*(?:E|W)']")
2018-10-21 00:17:22 +03:00
_ONID_TID_PATTERN = re.compile("^\d+-\d+.*")
_TRANSPONDER_FREQUENCY_PATTERN = re.compile("^\d+ [HVLR]+")
2021-02-10 23:21:30 +03:00
_DOMAINS = {"/tvchannels/", "/radiochannels/", "/packages/", "/logo/"}
_BASE_URL = "https://www.lyngsat.com"
2018-03-07 23:56:21 +03:00
2018-01-15 14:56:17 +03:00
def __init__(self, entities=False, separator=' '):
HTMLParser.__init__(self)
self.convert_charrefs = False
2018-01-15 14:56:17 +03:00
self._parse_html_entities = entities
self._separator = separator
self._is_td = False
self._is_th = False
2018-10-20 07:27:12 +03:00
self._is_onid_tid = False
2018-01-15 14:56:17 +03:00
self._is_provider = False
self._current_row = []
self._current_cell = []
self.rows = []
self._ids = set()
self._prv_names = set()
2018-02-04 18:09:37 +03:00
self._positon = None
2018-10-20 07:27:12 +03:00
self._on_id = None
2018-10-21 00:17:22 +03:00
self._freq = None
2018-01-15 14:56:17 +03:00
def handle_starttag(self, tag, attrs):
if tag == 'td':
self._is_td = True
if tag == 'tr':
self._is_th = True
if tag == "img":
2021-02-10 23:21:30 +03:00
if attrs[0][1].startswith("/logo/"):
2018-01-15 14:56:17 +03:00
self._current_row.append(attrs[0][1])
if tag == "a":
url = attrs[0][1]
2020-07-19 20:51:18 +03:00
if any(d in url for d in self._DOMAINS):
self._current_row.append(url)
2018-10-20 07:27:12 +03:00
if tag == "font" and len(attrs) == 1:
atr = attrs[0]
if len(atr) == 2 and atr[1] == "darkgreen":
self._is_onid_tid = True
2018-01-15 14:56:17 +03:00
def handle_data(self, data):
""" Save content to a cell """
if self._is_td or self._is_th:
self._current_cell.append(data.strip())
2018-10-20 07:27:12 +03:00
if self._is_onid_tid:
2018-10-21 00:17:22 +03:00
m = self._ONID_TID_PATTERN.match(data)
if m:
self._on_id, tid = m.group().split("-")
2018-10-20 07:27:12 +03:00
self._is_onid_tid = False
2018-01-15 14:56:17 +03:00
def handle_endtag(self, tag):
if tag == 'td':
self._is_td = False
elif tag == 'tr':
self._is_th = False
if tag in ('td', 'th'):
final_cell = self._separator.join(self._current_cell).strip()
self._current_row.append(final_cell)
self._current_cell = []
elif tag == 'tr':
2021-02-10 23:21:30 +03:00
row = self._current_row
2018-02-04 18:09:37 +03:00
# Satellite position
2018-03-07 23:56:21 +03:00
if not self._positon:
2021-02-10 23:21:30 +03:00
pos = re.findall(self._POSITION_PATTERN, str(row))
2018-03-07 23:56:21 +03:00
if pos:
self._positon = "".join(c for c in str(pos) if c.isdigit() or c in ".EW")
2018-02-04 18:09:37 +03:00
2021-02-10 23:21:30 +03:00
len_row = len(row)
2018-10-21 00:17:22 +03:00
if len_row > 2:
2021-02-10 23:21:30 +03:00
m = self._TRANSPONDER_FREQUENCY_PATTERN.match(row[1])
2018-10-21 00:17:22 +03:00
if m:
self._freq = m.group().split()[0]
2021-02-10 23:21:30 +03:00
if len_row == 14:
# Providers
2021-02-10 23:21:30 +03:00
name = row[6]
self._prv_names.add(name)
2021-02-10 23:21:30 +03:00
m = self._ONID_TID_PATTERN.match(str(row[9]))
2018-10-21 00:17:22 +03:00
if m:
on_id, tid = m.group().split("-")
if on_id not in self._ids:
2021-02-10 23:21:30 +03:00
row[-2] = on_id
2018-10-21 00:17:22 +03:00
self._ids.add(on_id)
2021-02-10 23:21:30 +03:00
row[0] = self._positon
2018-10-21 00:17:22 +03:00
if name + on_id not in self._prv_names:
self._prv_names.add(name + on_id)
2021-02-10 23:21:30 +03:00
logo_data = None
req = requests.get(self._BASE_URL + row[3], timeout=5)
if req.status_code == 200:
logo_data = req.content
else:
log("Downloading provider logo error: {}".format(req.reason))
self.rows.append(Provider(logo=logo_data, name=name, pos=self._positon, url=row[5], on_id=on_id,
2018-10-21 00:17:22 +03:00
ssid=None, single=False, selected=True))
2021-02-10 23:21:30 +03:00
elif 6 < len_row < 14:
# Single services
name, url, ssid = None, None, None
2021-02-10 23:21:30 +03:00
if row[0].startswith("http"):
name, url, ssid = row[1], row[0], row[0]
elif row[1].startswith("http"):
name, url, ssid = row[2], row[1], row[0]
2018-10-21 00:17:22 +03:00
if name and url:
2018-10-21 00:17:22 +03:00
on_id = "{}::{}".format(self._on_id if self._on_id else "1", self._freq)
self.rows.append(Provider(logo=None, name=name, pos=self._positon, url=url, on_id=on_id,
2018-10-20 07:27:12 +03:00
ssid=ssid, single=True, selected=False))
2018-01-15 14:56:17 +03:00
self._current_row = []
def error(self, message):
pass
2018-02-05 14:44:42 +03:00
def reset(self):
super().reset()
2018-02-04 18:09:37 +03:00
2018-01-15 14:56:17 +03:00
2021-02-10 23:21:30 +03:00
def parse_providers(url):
""" Returns a list of providers sorted by logo [single channels after providers]. """
2018-02-05 14:44:42 +03:00
parser = ProviderParser()
2018-01-15 14:56:17 +03:00
2021-02-10 23:21:30 +03:00
request = requests.get(url=url, headers=_HEADERS)
if request.status_code == 200:
parser.feed(request.text)
else:
log("Parse providers error [{}]: {}".format(url, request.reason))
def srt(p):
if p.logo is None:
return 1
return 0
providers = parser.rows
providers.sort(key=srt)
return providers
def download_picon(src_url, dest_path, callback):
""" Downloads and saves the picon to file. """
err_msg = "Picon download error: {} [{}]"
timeout = (3, 5) # connect and read timeouts
if callback:
callback("Downloading: {}.\n".format(os.path.basename(dest_path)))
req = requests.get(src_url, timeout=timeout, stream=True)
if req.status_code != 200:
err_msg = err_msg.format(src_url, req.reason)
log(err_msg)
if callback:
callback(err_msg + "\n")
else:
try:
with open(dest_path, "wb") as f:
for chunk in req:
f.write(chunk)
except OSError as e:
err_msg = "Saving picon [{}] error: {}".format(dest_path, e)
log(err_msg)
if callback:
callback(err_msg + "\n")
2018-01-15 14:56:17 +03:00
2018-03-04 19:37:41 +03:00
@run_task
2019-12-22 20:42:29 +03:00
def convert_to(src_path, dest_path, s_type, callback, done_callback):
2018-03-04 19:37:41 +03:00
""" Converts names format of picons.
Copies resulting files from src to dest and writes state to callback.
"""
2019-12-22 20:42:29 +03:00
pattern = "/*_0_0_0.png" if s_type is SettingsType.ENIGMA_2 else "/*.png"
2018-03-04 19:37:41 +03:00
for file in glob.glob(src_path + pattern):
base_name = os.path.basename(file)
pic_data = base_name.rstrip(".png").split("_")
dest_file = _NEUTRINO_PICON_KEY.format(int(pic_data[4], 16), int(pic_data[5], 16), int(pic_data[3], 16))
dest = "{}/{}".format(dest_path, dest_file)
callback('Converting "{}" to "{}"\n'.format(base_name, dest_file))
shutil.copyfile(file, dest)
done_callback()
2018-01-12 14:32:36 +03:00
if __name__ == "__main__":
pass