Files
DemonEditor/app/connections.py

502 lines
18 KiB
Python
Raw Normal View History

2017-11-12 22:23:12 +03:00
import os
2020-01-08 21:33:24 +03:00
import re
2017-11-12 22:23:12 +03:00
import socket
import time
2018-11-17 23:19:17 +03:00
import urllib
2020-01-08 21:33:24 +03:00
import xml.etree.ElementTree as ETree
2017-11-06 22:17:43 +03:00
from enum import Enum
2018-01-20 14:04:07 +03:00
from ftplib import FTP, error_perm
2019-12-17 11:59:57 +03:00
from http.client import RemoteDisconnected
2017-11-12 22:23:12 +03:00
from telnetlib import Telnet
2018-11-08 13:07:24 +03:00
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import (urlopen, HTTPPasswordMgrWithDefaultRealm, HTTPBasicAuthHandler, build_opener,
install_opener, Request)
2017-11-06 22:17:43 +03:00
2020-01-17 00:34:18 +03:00
from app.commons import log, run_task
2019-12-22 20:42:29 +03:00
from app.settings import SettingsType
2018-01-07 16:33:18 +03:00
BQ_FILES_LIST = ("tv", "radio", # enigma 2
"myservices.xml", "bouquets.xml", "ubouquets.xml") # neutrino
DATA_FILES_LIST = ("lamedb", "lamedb5", "blacklist", "whitelist",)
2017-10-14 12:24:59 +03:00
STC_XML_FILE = ("satellites.xml", "terrestrial.xml", "cables.xml")
WEB_TV_XML_FILE = ("webtv.xml",)
PICONS_SUF = (".jpg", ".png")
2018-02-12 13:34:00 +03:00
2017-10-14 12:24:59 +03:00
2018-08-04 11:38:38 +03:00
class DownloadType(Enum):
2017-11-06 22:17:43 +03:00
ALL = 0
BOUQUETS = 1
SATELLITES = 2
2018-01-16 18:51:08 +03:00
PICONS = 3
2018-02-12 13:34:00 +03:00
WEBTV = 4
EPG = 5
2017-11-06 22:17:43 +03:00
2018-11-17 23:19:17 +03:00
class HttpRequestType(Enum):
ZAP = "zap?sRef="
INFO = "about"
2020-01-17 00:34:18 +03:00
SIGNAL = "signal"
2020-01-14 18:26:05 +03:00
STREAM = "stream.m3u?ref="
STREAM_CURRENT = "streamcurrent.m3u"
2020-01-08 21:33:24 +03:00
CURRENT = "getcurrent"
TEST = None
2020-01-11 17:58:50 +03:00
TOKEN = "session"
2020-01-28 14:51:23 +03:00
PLAY = "mediaplayerplay?file="
PLAYER_LIST = "mediaplayerlist?path=playlist"
PLAYER_PLAY = "mediaplayercmd?command=play"
PLAYER_NEXT = "mediaplayercmd?command=next"
PLAYER_PREV = "mediaplayercmd?command=previous"
PLAYER_STOP = "mediaplayercmd?command=stop"
PLAYER_REMOVE = "mediaplayerremove?file="
2020-06-08 19:32:18 +03:00
GRUB = "grab?format=jpg&"
2018-11-17 23:19:17 +03:00
2018-11-08 13:07:24 +03:00
class TestException(Exception):
pass
2020-01-06 13:17:56 +03:00
class HttpApiException(Exception):
pass
def download_data(*, settings, download_type=DownloadType.ALL, callback=print, files_filter=None):
2019-12-13 13:31:07 +03:00
with FTP(host=settings.host, user=settings.user, passwd=settings.password) as ftp:
ftp.encoding = "utf-8"
2018-11-06 23:43:13 +03:00
callback("FTP OK.\n")
2019-12-22 20:42:29 +03:00
save_path = settings.data_local_path
2017-12-30 22:58:47 +03:00
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# bouquets
2018-08-04 11:38:38 +03:00
if download_type is DownloadType.ALL or download_type is DownloadType.BOUQUETS:
2019-12-13 13:31:07 +03:00
ftp.cwd(settings.services_path)
file_list = BQ_FILES_LIST + DATA_FILES_LIST if download_type is DownloadType.ALL else BQ_FILES_LIST
2020-05-08 17:13:17 +03:00
for file in filter(lambda f: f.endswith(file_list), ftp.nlst()):
download_file(ftp, file, save_path, callback)
# *.xml and webtv
if download_type in (DownloadType.ALL, DownloadType.SATELLITES):
download_xml(ftp, save_path, settings.satellites_xml_path, STC_XML_FILE, callback)
if download_type in (DownloadType.ALL, DownloadType.WEBTV):
download_xml(ftp, save_path, settings.satellites_xml_path, WEB_TV_XML_FILE, callback)
2020-01-23 00:47:01 +03:00
if download_type is DownloadType.PICONS:
picons_path = settings.picons_local_path
os.makedirs(os.path.dirname(picons_path), exist_ok=True)
download_picons(ftp, settings.picons_path, picons_path, callback, files_filter)
# epg.dat
if download_type is DownloadType.EPG:
2019-12-13 13:31:07 +03:00
stb_path = settings.services_path
epg_options = settings.epg_options
if epg_options:
2020-04-02 16:50:58 +03:00
stb_path = epg_options.get("epg_dat_stb_path", stb_path)
save_path = epg_options.get("epg_dat_path", save_path)
ftp.cwd(stb_path)
2020-05-08 17:13:17 +03:00
for file in filter(lambda f: f.endswith("epg.dat"), ftp.nlst()):
download_file(ftp, file, save_path, callback)
2017-10-26 01:23:05 +03:00
2019-09-10 00:24:51 +03:00
callback("\nDone.\n")
2017-10-14 12:24:59 +03:00
2018-01-16 18:51:08 +03:00
2019-12-13 13:31:07 +03:00
def upload_data(*, settings, download_type=DownloadType.ALL, remove_unused=False,
callback=print, done_callback=None, use_http=False, files_filter=None):
2019-12-22 20:42:29 +03:00
s_type = settings.setting_type
data_path = settings.data_local_path
2019-12-13 13:31:07 +03:00
host = settings.host
2020-01-11 17:58:50 +03:00
base_url = "http{}://{}:{}".format("s" if settings.http_use_ssl else "", host, settings.http_port)
url = "{}/web/".format(base_url)
2018-11-13 14:17:59 +03:00
tn, ht = None, None # telnet, http
2017-11-10 13:38:03 +03:00
2018-11-13 14:17:59 +03:00
try:
2019-12-22 20:42:29 +03:00
if s_type is SettingsType.ENIGMA_2 and use_http:
2020-01-11 17:58:50 +03:00
ht = http(settings.http_user, settings.http_password, base_url, callback, settings.http_use_ssl)
2018-11-13 14:17:59 +03:00
next(ht)
message = ""
if download_type is DownloadType.BOUQUETS:
message = "User bouquets will be updated!"
elif download_type is DownloadType.ALL:
message = "All user data will be reloaded!"
elif download_type is DownloadType.SATELLITES:
message = "Satellites.xml file will be updated!"
params = urlencode({"text": message, "type": 2, "timeout": 5})
2020-01-11 17:58:50 +03:00
ht.send((url + "message?{}".format(params), "Sending info message... "))
2018-11-13 14:17:59 +03:00
if download_type is DownloadType.ALL:
time.sleep(5)
2020-01-11 17:58:50 +03:00
ht.send((url + "powerstate?newstate=0", "Toggle Standby "))
2018-11-13 14:17:59 +03:00
time.sleep(2)
else:
# telnet
2019-12-13 13:31:07 +03:00
tn = telnet(host=host,
user=settings.telnet_user,
password=settings.telnet_password,
timeout=settings.telnet_timeout)
2018-11-13 14:17:59 +03:00
next(tn)
# terminate enigma or neutrino
tn.send("init 4")
2019-12-13 13:31:07 +03:00
with FTP(host=host, user=settings.user, passwd=settings.password) as ftp:
2018-11-13 14:17:59 +03:00
ftp.encoding = "utf-8"
callback("FTP OK.\n")
2019-12-13 13:31:07 +03:00
sat_xml_path = settings.satellites_xml_path
services_path = settings.services_path
2017-11-10 13:38:03 +03:00
2018-08-04 11:38:38 +03:00
if download_type is DownloadType.SATELLITES:
upload_xml(ftp, data_path, sat_xml_path, STC_XML_FILE, callback)
2018-11-13 14:17:59 +03:00
2019-12-22 20:42:29 +03:00
if s_type is SettingsType.NEUTRINO_MP and download_type is DownloadType.WEBTV:
upload_xml(ftp, data_path, sat_xml_path, WEB_TV_XML_FILE, callback)
2018-11-13 14:17:59 +03:00
if download_type is DownloadType.BOUQUETS:
ftp.cwd(services_path)
upload_bouquets(ftp, data_path, remove_unused, callback)
if download_type is DownloadType.ALL:
upload_xml(ftp, data_path, sat_xml_path, STC_XML_FILE, callback)
2019-12-22 20:42:29 +03:00
if s_type is SettingsType.NEUTRINO_MP:
upload_xml(ftp, data_path, sat_xml_path, WEB_TV_XML_FILE, callback)
2018-11-13 14:17:59 +03:00
ftp.cwd(services_path)
upload_bouquets(ftp, data_path, remove_unused, callback)
upload_files(ftp, data_path, DATA_FILES_LIST, callback)
2018-11-13 14:17:59 +03:00
if download_type is DownloadType.PICONS:
upload_picons(ftp, settings.picons_local_path, settings.picons_path, callback, files_filter)
2018-11-13 14:17:59 +03:00
if tn and not use_http:
# resume enigma or restart neutrino
2019-12-22 20:42:29 +03:00
tn.send("init 3" if s_type is SettingsType.ENIGMA_2 else "init 6")
2018-11-13 14:17:59 +03:00
elif ht and use_http:
if download_type is DownloadType.BOUQUETS:
2020-01-11 17:58:50 +03:00
ht.send((url + "servicelistreload?mode=2", "Reloading Userbouquets."))
2018-11-13 14:17:59 +03:00
elif download_type is DownloadType.ALL:
2020-01-11 17:58:50 +03:00
ht.send((url + "servicelistreload?mode=0", "Reloading lamedb and Userbouquets."))
ht.send((url + "powerstate?newstate=4", "Wakeup from Standby."))
2018-02-12 13:34:00 +03:00
2018-11-13 14:17:59 +03:00
if done_callback is not None:
done_callback()
finally:
if tn:
tn.close()
if ht:
ht.close()
2017-11-10 13:38:03 +03:00
2018-11-13 14:17:59 +03:00
def upload_bouquets(ftp, data_path, remove_unused, callback):
if remove_unused:
remove_unused_bouquets(ftp, callback)
upload_files(ftp, data_path, BQ_FILES_LIST, callback)
2018-01-16 18:51:08 +03:00
2018-11-13 14:17:59 +03:00
def upload_files(ftp, data_path, file_list, callback):
for file_name in os.listdir(data_path):
if file_name in STC_XML_FILE or file_name in WEB_TV_XML_FILE:
2018-11-13 14:17:59 +03:00
continue
if file_name.endswith(file_list):
send_file(file_name, data_path, ftp, callback)
2017-11-10 13:38:03 +03:00
2018-11-13 14:17:59 +03:00
def remove_unused_bouquets(ftp, callback):
for file in filter(lambda f: f.endswith(("tv", "radio", "bouquets.xml", "ubouquets.xml")), ftp.nlst()):
callback("Deleting file: {}. Status: {}\n".format(file, ftp.delete(file)))
2018-11-13 14:17:59 +03:00
def upload_xml(ftp, data_path, xml_path, xml_files, callback):
""" Used for transfer *.xml files. """
ftp.cwd(xml_path)
for xml_file in xml_files:
send_file(xml_file, data_path, ftp, callback)
def download_xml(ftp, data_path, xml_path, xml_files, callback):
""" Used for download *.xml files. """
2018-11-13 14:17:59 +03:00
ftp.cwd(xml_path)
list(map(lambda f: download_file(ftp, f, data_path, callback), (f for f in ftp.nlst() if f.endswith(xml_files))))
2018-01-16 18:51:08 +03:00
2017-11-10 13:38:03 +03:00
2020-01-23 00:47:01 +03:00
# ***************** Picons *******************#
def upload_picons(ftp, src, dest, callback, files_filter=None):
try:
ftp.cwd(dest)
except error_perm as e:
if str(e).startswith("550"):
ftp.mkd(dest) # if not exist
ftp.cwd(dest)
2020-01-23 00:47:01 +03:00
for file_name in filter(picons_filter_function(files_filter), os.listdir(src)):
send_file(file_name, src, ftp, callback)
2020-01-23 00:47:01 +03:00
def download_picons(ftp, src, dest, callback, files_filter=None):
2020-01-23 00:47:01 +03:00
try:
ftp.cwd(src)
except error_perm as e:
callback(str(e))
return
for file in filter(picons_filter_function(files_filter), ftp.nlst()):
2020-05-08 17:13:17 +03:00
download_file(ftp, file, dest, callback)
2020-01-23 00:47:01 +03:00
def delete_picons(ftp, callback, dest=None, files_filter=None):
2020-01-23 00:47:01 +03:00
if dest:
try:
ftp.cwd(dest)
except error_perm as e:
callback(str(e))
return
for file in filter(picons_filter_function(files_filter), ftp.nlst()):
callback("Delete file: {}. Status: {}\n".format(file, ftp.delete(file)))
2020-01-23 00:47:01 +03:00
def remove_picons(*, settings, callback, done_callback=None, files_filter=None):
2020-01-23 00:47:01 +03:00
with FTP(host=settings.host, user=settings.user, passwd=settings.password) as ftp:
ftp.encoding = "utf-8"
callback("FTP OK.\n")
delete_picons(ftp, callback, settings.picons_path, files_filter)
2020-01-23 00:47:01 +03:00
if done_callback:
done_callback()
def picons_filter_function(files_filter=None):
return lambda f: f in files_filter if files_filter else f.endswith(PICONS_SUF)
2018-11-06 23:43:13 +03:00
def download_file(ftp, name, save_path, callback):
2018-02-12 13:34:00 +03:00
with open(save_path + name, "wb") as f:
2018-11-13 14:17:59 +03:00
callback("Downloading file: {}. Status: {}\n".format(name, str(ftp.retrbinary("RETR " + name, f.write))))
2018-02-12 13:34:00 +03:00
def send_file(file_name, path, ftp, callback):
2017-11-10 13:38:03 +03:00
""" Opens the file in binary mode and transfers into receiver """
2020-05-05 00:02:52 +03:00
file_src = path + file_name
if not os.path.isfile(file_src):
log("Uploading file: '{}'. File not found. Skipping.".format(file_src))
return
with open(file_src, "rb") as f:
2018-11-13 14:17:59 +03:00
callback("Uploading file: {}. Status: {}\n".format(file_name, str(ftp.storbinary("STOR " + file_name, f))))
2020-01-11 17:58:50 +03:00
def http(user, password, url, callback, use_ssl=False):
init_auth(user, password, url, use_ssl)
data = get_post_data(url, password, url)
2018-11-13 14:17:59 +03:00
while True:
2020-01-11 17:58:50 +03:00
url, message = yield
resp = get_response(HttpRequestType.TEST, url, data).get("e2statetext", None)
callback("HTTP: {} {}\n".format(message, "Successful." if resp and message else ""))
2017-11-10 13:38:03 +03:00
2018-01-18 00:57:58 +03:00
def telnet(host, port=23, user="", password="", timeout=5):
2017-11-12 22:23:12 +03:00
try:
tn = Telnet(host=host, port=port, timeout=timeout)
except socket.timeout:
2018-01-07 16:33:18 +03:00
log("telnet error: socket timeout")
2017-11-12 22:23:12 +03:00
else:
2017-11-14 19:20:16 +03:00
time.sleep(1)
2017-11-12 22:23:12 +03:00
command = yield
2018-01-18 00:57:58 +03:00
if user != "":
2018-01-07 16:33:18 +03:00
tn.read_until(b"login: ")
tn.write(user.encode("utf-8") + b"\n")
time.sleep(timeout)
2018-01-18 00:57:58 +03:00
if password != "":
2018-01-07 16:33:18 +03:00
tn.read_until(b"Password: ")
tn.write(password.encode("utf-8") + b"\n")
time.sleep(timeout)
2017-11-12 22:23:12 +03:00
tn.write("{}\r\n".format(command).encode("utf-8"))
2017-11-14 19:20:16 +03:00
time.sleep(timeout)
2017-11-12 22:23:12 +03:00
command = yield
time.sleep(timeout)
tn.write("{}\r\n".format(command).encode("utf-8"))
time.sleep(timeout)
yield
2019-11-03 18:11:49 +03:00
# ***************** HTTP API *******************#
2018-11-17 23:19:17 +03:00
2019-11-21 16:59:43 +03:00
class HttpAPI:
__MAX_WORKERS = 4
def __init__(self, settings):
self._settings = settings
2020-01-18 15:28:46 +03:00
self._shutdown = False
2020-01-11 17:58:50 +03:00
self._session_id = 0
2020-05-24 18:47:56 +03:00
self._main_url = None
2020-01-18 15:28:46 +03:00
self._base_url = None
2020-01-11 17:58:50 +03:00
self._data = None
2020-06-08 19:32:18 +03:00
self._is_owif = True
self.init()
2019-11-21 16:59:43 +03:00
2019-12-04 23:06:38 +03:00
from concurrent.futures import ThreadPoolExecutor as PoolExecutor
self._executor = PoolExecutor(max_workers=self.__MAX_WORKERS)
2019-11-21 16:59:43 +03:00
2020-01-28 14:51:23 +03:00
def send(self, req_type, ref, callback=print, ref_prefix=""):
2020-01-18 15:28:46 +03:00
if self._shutdown:
return
2019-11-21 16:59:43 +03:00
url = self._base_url + req_type.value
2020-06-08 19:32:18 +03:00
data = self._data
2019-11-03 18:11:49 +03:00
2020-01-14 18:26:05 +03:00
if req_type is HttpRequestType.ZAP or req_type is HttpRequestType.STREAM:
2019-11-03 18:11:49 +03:00
url += urllib.parse.quote(ref)
2020-01-28 14:51:23 +03:00
elif req_type is HttpRequestType.PLAY or req_type is HttpRequestType.PLAYER_REMOVE:
url += "{}{}".format(ref_prefix, urllib.parse.quote(ref).replace("%3A", "%253A"))
2020-05-24 18:47:56 +03:00
elif req_type is HttpRequestType.GRUB:
2020-06-08 19:32:18 +03:00
data = None # Must be disabled for token-based security.
2020-05-24 18:47:56 +03:00
url = "{}/{}{}".format(self._main_url, req_type.value, ref)
2018-11-17 23:19:17 +03:00
2020-03-22 23:26:01 +03:00
def done_callback(f):
callback(f.result())
2020-06-08 19:32:18 +03:00
future = self._executor.submit(get_response, req_type, url, data)
2020-03-22 23:26:01 +03:00
future.add_done_callback(done_callback)
2019-11-21 16:59:43 +03:00
2020-01-17 00:34:18 +03:00
@run_task
def init(self):
2020-01-11 17:58:50 +03:00
user, password = self._settings.http_user, self._settings.http_password
2020-01-03 23:26:55 +03:00
use_ssl = self._settings.http_use_ssl
2020-05-24 18:47:56 +03:00
self._main_url = "http{}://{}:{}".format("s" if use_ssl else "", self._settings.host, self._settings.http_port)
self._base_url = "{}/web/".format(self._main_url)
init_auth(user, password, self._main_url, use_ssl)
url = "{}/web/{}".format(self._main_url, HttpRequestType.TOKEN.value)
2020-01-11 17:58:50 +03:00
s_id = get_session_id(user, password, url)
if s_id != "0":
self._data = urllib.parse.urlencode({"user": user, "password": password, "sessionid": s_id}).encode("utf-8")
2020-06-08 19:32:18 +03:00
self.send(HttpRequestType.INFO, None, self.init_callback)
def init_callback(self, info):
if info:
version = info.get("e2webifversion", "").upper()
self._is_owif = "OWIF" in version
log("HTTP API initialized. Web Interface version: {}".format(version))
@property
def is_owif(self):
""" Returns true if the web interface is OpenWebif. """
return self._is_owif
2020-01-18 15:28:46 +03:00
@run_task
2019-11-21 16:59:43 +03:00
def close(self):
2020-01-18 15:28:46 +03:00
self._shutdown = True
self._executor.shutdown()
2018-11-17 23:19:17 +03:00
2018-11-20 22:21:26 +03:00
2020-01-11 17:58:50 +03:00
def get_response(req_type, url, data=None):
2019-09-28 21:57:41 +03:00
try:
2020-01-11 17:58:50 +03:00
with urlopen(Request(url, data=data), timeout=10) as f:
2020-01-14 18:26:05 +03:00
if req_type is HttpRequestType.STREAM or req_type is HttpRequestType.STREAM_CURRENT:
2020-03-22 23:26:01 +03:00
return {"m3u": f.read().decode("utf-8")}
2020-05-24 18:47:56 +03:00
elif req_type is HttpRequestType.GRUB:
return {"img_data": f.read()}
2020-01-08 21:33:24 +03:00
elif req_type is HttpRequestType.CURRENT:
2020-01-17 00:34:18 +03:00
for el in ETree.fromstring(f.read().decode("utf-8")).iter("e2event"):
return {el.tag: el.text for el in el.iter()} # return first[current] event from the list
2020-01-28 14:51:23 +03:00
elif req_type is HttpRequestType.PLAYER_LIST:
return [{el.tag: el.text for el in el.iter()} for el in
ETree.fromstring(f.read().decode("utf-8")).iter("e2file")]
2019-09-28 21:57:41 +03:00
else:
2020-01-17 00:34:18 +03:00
return {el.tag: el.text for el in ETree.fromstring(f.read().decode("utf-8")).iter()}
except HTTPError as e:
if req_type is HttpRequestType.TEST:
raise e
return {"error_code": e.code}
except (URLError, RemoteDisconnected, ConnectionResetError) as e:
2020-01-08 21:33:24 +03:00
if req_type is HttpRequestType.TEST:
raise e
except ETree.ParseError as e:
log("Parsing response error: {}".format(e))
2020-01-17 00:34:18 +03:00
return {"error_code": -1}
2019-09-28 21:57:41 +03:00
2018-11-08 13:07:24 +03:00
2020-01-11 17:58:50 +03:00
def init_auth(user, password, url, use_ssl=False):
""" Init authentication """
pass_mgr = HTTPPasswordMgrWithDefaultRealm()
pass_mgr.add_password(None, url, user, password)
auth_handler = HTTPBasicAuthHandler(pass_mgr)
if use_ssl:
import ssl
from urllib.request import HTTPSHandler
opener = build_opener(auth_handler, HTTPSHandler(context=ssl._create_unverified_context()))
else:
opener = build_opener(auth_handler)
install_opener(opener)
def get_session_id(user, password, url):
data = urllib.parse.urlencode(dict(user=user, password=password)).encode("utf-8")
return get_response(HttpRequestType.TOKEN, url, data=data).get("e2sessionid", "0")
def get_post_data(base_url, password, user):
s_id = get_session_id(user, password, "{}/web/{}".format(base_url, HttpRequestType.TOKEN.value))
data = None
if s_id != "0":
data = urllib.parse.urlencode({"user": user, "password": password, "sessionid": s_id}).encode("utf-8")
return data
2019-09-28 21:57:41 +03:00
# ***************** Connections testing *******************#
2018-11-08 13:07:24 +03:00
2018-11-09 12:41:36 +03:00
def test_ftp(host, port, user, password, timeout=5):
2018-11-08 13:07:24 +03:00
try:
with FTP(host=host, user=user, passwd=password, timeout=timeout) as ftp:
return ftp.getwelcome()
except (error_perm, ConnectionRefusedError, OSError) as e:
raise TestException(e)
2020-01-03 23:26:55 +03:00
def test_http(host, port, user, password, timeout=5, use_ssl=False, skip_message=False):
2020-01-06 13:17:56 +03:00
params = urlencode({"text": "Connection test", "type": 2, "timeout": timeout})
params = "statusinfo" if skip_message else "message?{}".format(params)
base_url = "http{}://{}:{}".format("s" if use_ssl else "", host, port)
# authentication
init_auth(user, password, base_url, use_ssl)
2020-01-11 17:58:50 +03:00
data = get_post_data(base_url, password, user)
2018-11-08 13:07:24 +03:00
try:
2020-01-11 17:58:50 +03:00
return get_response(HttpRequestType.TEST, "{}/web/{}".format(base_url, params), data).get("e2statetext", "")
2019-12-17 11:59:57 +03:00
except (RemoteDisconnected, URLError, HTTPError) as e:
2018-11-08 13:07:24 +03:00
raise TestException(e)
2018-11-09 12:41:36 +03:00
def test_telnet(host, port, user, password, timeout=5):
2018-11-08 13:07:24 +03:00
try:
gen = telnet_test(host, port, user, password, timeout)
res = next(gen)
2020-01-08 21:33:24 +03:00
msg = str(res, encoding="utf8").strip()
log(msg)
next(gen)
if re.search("password", msg, re.IGNORECASE):
raise TestException(msg)
return msg
2018-11-08 13:07:24 +03:00
except (socket.timeout, OSError) as e:
raise TestException(e)
def telnet_test(host, port, user, password, timeout):
2018-11-04 00:33:50 +03:00
tn = Telnet(host=host, port=port, timeout=timeout)
time.sleep(1)
tn.read_until(b"login: ", timeout=2)
2020-01-08 21:33:24 +03:00
tn.write(user.encode("utf-8") + b"\r")
2018-11-04 00:33:50 +03:00
time.sleep(timeout)
tn.read_until(b"Password: ", timeout=2)
2020-01-08 21:33:24 +03:00
tn.write(password.encode("utf-8") + b"\r")
2018-11-04 00:33:50 +03:00
time.sleep(timeout)
yield tn.read_very_eager()
tn.close()
2020-01-08 21:33:24 +03:00
yield
2018-11-04 00:33:50 +03:00
2017-11-12 22:23:12 +03:00
if __name__ == "__main__":
pass