mirror of
				https://github.com/usmannasir/cyberpanel.git
				synced 2025-10-31 02:15:55 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			572 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			572 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging
 | |
| import subprocess
 | |
| import shlex
 | |
| import os
 | |
| import socket
 | |
| import threading as multi
 | |
| import time
 | |
| import getpass
 | |
| import codecs
 | |
| 
 | |
| class ProcessUtilities(multi.Thread):
 | |
|     debugPath = '/usr/local/CyberCP/debug'
 | |
|     litespeedProcess = "litespeed"
 | |
|     ent = 1
 | |
|     OLS = 0
 | |
|     centos = 1
 | |
|     cent8 = 2
 | |
|     cent9 = 3
 | |
|     ubuntu = 0
 | |
|     ubuntu20 = 3
 | |
|     ubuntu22Check = 0
 | |
|     alma9check = 0
 | |
|     ubuntu24Check = 0  # New flag for Ubuntu 24.04 specific handling
 | |
|     server_address = '/usr/local/lscpd/admin/comm.sock'
 | |
|     token = "unset"
 | |
|     portPath = '/usr/local/lscp/conf/bind.conf'
 | |
| 
 | |
|     def __init__(self, function, extraArgs):
 | |
|         multi.Thread.__init__(self)
 | |
|         self.function = function
 | |
|         self.extraArgs = extraArgs
 | |
| 
 | |
|     def run(self):
 | |
|         try:
 | |
|             if self.function == 'popen':
 | |
|                 self.customPoen()
 | |
|         except BaseException as msg:
 | |
|             logging.writeToFile( str(msg) + ' [ApplicationInstaller.run]')
 | |
| 
 | |
|     @staticmethod
 | |
|     def fetchCurrentPort():
 | |
|         command = 'cat %s' % (ProcessUtilities.portPath)
 | |
|         port = ProcessUtilities.outputExecutioner(command)
 | |
| 
 | |
|         if port.find('*') > -1:
 | |
|             return port.split(':')[1].rstrip('\n')
 | |
|         else:
 | |
|             return '8090'
 | |
| 
 | |
|     @staticmethod
 | |
|     def getLitespeedProcessNumber():
 | |
|         finalListOfProcesses = []
 | |
| 
 | |
|         try:
 | |
|             import psutil
 | |
|             for proc in psutil.process_iter():
 | |
|                 if proc.name().find(ProcessUtilities.litespeedProcess) > -1:
 | |
|                     finalListOfProcesses.append(proc.pid)
 | |
| 
 | |
|         except BaseException as msg:
 | |
|             logging.writeToFile(
 | |
|                 str(msg) + " [getLitespeedProcessNumber]")
 | |
|             return 0
 | |
| 
 | |
|         if len(finalListOfProcesses) > 0:
 | |
|          return finalListOfProcesses
 | |
|         else:
 | |
|             return 0
 | |
| 
 | |
|     @staticmethod
 | |
|     def restartLitespeed():
 | |
|         try:
 | |
|             if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
 | |
|                 command = "systemctl restart lsws"
 | |
|             else:
 | |
|                 command = "/usr/local/lsws/bin/lswsctrl restart"
 | |
| 
 | |
|             cmd = shlex.split(command)
 | |
|             res = subprocess.call(cmd)
 | |
| 
 | |
|             if res == 0:
 | |
|                 return 1
 | |
|             else:
 | |
|                 return 0
 | |
| 
 | |
|         except subprocess.CalledProcessError as msg:
 | |
|             logging.writeToFile(str(msg) + "[restartLitespeed]")
 | |
| 
 | |
|     @staticmethod
 | |
|     def stopLitespeed():
 | |
|         try:
 | |
|             if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
 | |
|                 command = "systemctl stop lsws"
 | |
|             else:
 | |
|                 command = "/usr/local/lsws/bin/lswsctrl stop"
 | |
| 
 | |
|             cmd = shlex.split(command)
 | |
|             res = subprocess.call(cmd)
 | |
| 
 | |
|             if res == 0:
 | |
|                 return 1
 | |
|             else:
 | |
|                 return 0
 | |
| 
 | |
|         except subprocess.CalledProcessError as msg:
 | |
|             logging.writeToFile(str(msg) + "[stopLitespeed]")
 | |
| 
 | |
|     @staticmethod
 | |
|     def normalExecutioner(command, shell=False, User=None):
 | |
|         try:
 | |
| 
 | |
|             f = open(os.devnull, 'w')
 | |
| 
 | |
|             if User == None:
 | |
|                 if shell == False:
 | |
|                     res = subprocess.call(shlex.split(command), stdout=f, stderr=f)
 | |
|                 else:
 | |
|                     res = subprocess.call(command, shell=shell, stdout=f, stderr=f)
 | |
|             else:
 | |
|                 if command.find('export') > -1:
 | |
|                     pass
 | |
|                 elif command.find('sudo') == -1:
 | |
|                     command = 'sudo -u %s %s' % (User, command)
 | |
| 
 | |
|                 if shell == False:
 | |
|                     res = subprocess.call(shlex.split(command), stdout=f, stderr=f)
 | |
|                 else:
 | |
|                     res = subprocess.call(command, shell=shell, stdout=f, stderr=f)
 | |
| 
 | |
|             if os.path.exists(ProcessUtilities.debugPath):
 | |
|                 logging.writeToFile(f"{command} [Exit Code: {res}]")
 | |
| 
 | |
|             if res == 0:
 | |
|                 return 1
 | |
|             else:
 | |
|                 return 0
 | |
|         except subprocess.CalledProcessError as msg:
 | |
|             logging.writeToFile('%s. [ProcessUtilities.normalExecutioner]' % (str(msg)))
 | |
|             return 0
 | |
|         except BaseException as msg:
 | |
|             logging.writeToFile('%s. [ProcessUtilities.normalExecutioner.Base]' % (str(msg)))
 | |
|             return 0
 | |
| 
 | |
|     @staticmethod
 | |
|     def killLiteSpeed():
 | |
|         try:
 | |
|             command = 'systemctl stop lsws'
 | |
|             ProcessUtilities.normalExecutioner(command)
 | |
|         except:
 | |
|             pass
 | |
| 
 | |
|         pids = ProcessUtilities.getLitespeedProcessNumber()
 | |
|         if pids !=0:
 | |
|             for items in pids:
 | |
|                 try:
 | |
|                     command = 'sudo kill -9 ' + str(items)
 | |
|                     ProcessUtilities.normalExecutioner(command)
 | |
|                 except:
 | |
|                     pass
 | |
| 
 | |
|     @staticmethod
 | |
|     def decideServer():
 | |
|         if os.path.exists('/usr/local/lsws/bin/openlitespeed'):
 | |
|             return ProcessUtilities.OLS
 | |
|         else:
 | |
|             return ProcessUtilities.ent
 | |
| 
 | |
|     @staticmethod
 | |
|     def decideDistro():
 | |
|         distroPath = '/etc/lsb-release'
 | |
|         distroPathAlma = '/etc/redhat-release'
 | |
| 
 | |
|         # First check if we're on Ubuntu
 | |
|         if os.path.exists('/etc/os-release'):
 | |
|             with open('/etc/os-release', 'r') as f:
 | |
|                 content = f.read()
 | |
|                 if 'Ubuntu' in content:
 | |
|                     if '24.04' in content:
 | |
|                         ProcessUtilities.ubuntu22Check = 1
 | |
|                         ProcessUtilities.ubuntu24Check = 1  # Specific flag for Ubuntu 24.04
 | |
|                         # Ubuntu 24.04 uses newer package versions, set flag for compatibility
 | |
|                         ProcessUtilities.alma9check = 1  # Reuse flag to indicate Ubuntu 24.04
 | |
|                         return ProcessUtilities.ubuntu20
 | |
|                     elif '22.04' in content:
 | |
|                         ProcessUtilities.ubuntu22Check = 1
 | |
|                         return ProcessUtilities.ubuntu20
 | |
|                     elif '20.04' in content:
 | |
|                         return ProcessUtilities.ubuntu20
 | |
|                     return ProcessUtilities.ubuntu
 | |
| 
 | |
|         # Check for RedHat-based distributions
 | |
|         if os.path.exists(distroPathAlma):
 | |
|             with open(distroPathAlma, 'r') as f:
 | |
|                 content = f.read()
 | |
|                 if any(x in content for x in ['CentOS Linux release 8', 'AlmaLinux release 8', 'Rocky Linux release 8', 
 | |
|                                             'Rocky Linux release 9', 'AlmaLinux release 9', 'CloudLinux release 9', 
 | |
|                                             'CloudLinux release 8']):
 | |
|                     if any(x in content for x in ['AlmaLinux release 9', 'Rocky Linux release 9']):
 | |
|                         ProcessUtilities.alma9check = 1
 | |
|                     return ProcessUtilities.cent8
 | |
| 
 | |
|         # Default to Ubuntu if no other distribution is detected
 | |
|         return ProcessUtilities.ubuntu
 | |
| 
 | |
|     @staticmethod
 | |
|     def containerCheck():
 | |
|         try:
 | |
|             command = 'cat /etc/cgrules.conf'
 | |
|             output = ProcessUtilities.outputExecutioner(command)
 | |
|             if output.find('No such') > -1:
 | |
|                 return 0
 | |
|             else:
 | |
|                 return 1
 | |
|         except BaseException:
 | |
|             return 0
 | |
| 
 | |
|     @staticmethod
 | |
|     def setupUDSConnection():
 | |
|         count = 0
 | |
|         while 1:
 | |
|             try:
 | |
|                 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
 | |
|                 sock.connect(ProcessUtilities.server_address)
 | |
|                 return [sock, "None"]
 | |
|             except BaseException as msg:
 | |
|                 if count == 3:
 | |
|                     logging.writeToFile("Failed to connect to LSCPD socket, run 'systemctl restart lscpd' on command line to fix this issue.")
 | |
|                     return [-1, str(msg)]
 | |
|                 else:
 | |
|                     count = count + 1
 | |
| 
 | |
|                 logging.writeToFile("Failed to connect to LSCPD UDS, error message:" + str(msg) + ". Attempt " + str(count) + ", we will attempt again in 2 seconds. [setupUDSConnection:138]")
 | |
|                 time.sleep(2)
 | |
| 
 | |
|     @staticmethod
 | |
|     def sendCommand(command, user=None, dir=None, retries=3):
 | |
|         """
 | |
|         Send command to lscpd with retry mechanism
 | |
|         
 | |
|         :param command: Command to execute
 | |
|         :param user: User to run command as
 | |
|         :param dir: Directory to run command in
 | |
|         :param retries: Number of retry attempts if connection fails
 | |
|         """
 | |
|         attempt = 0
 | |
|         last_error = None
 | |
|         ret = None
 | |
|         
 | |
|         while attempt < retries:
 | |
|             try:
 | |
|                 ret = ProcessUtilities.setupUDSConnection()
 | |
| 
 | |
|                 if ret[0] == -1:
 | |
|                     attempt += 1
 | |
|                     last_error = ret[1]
 | |
|                     if attempt < retries:
 | |
|                         logging.writeToFile(f"[sendCommand] Connection failed, attempt {attempt}/{retries}. Retrying in 2 seconds...")
 | |
|                         time.sleep(2)
 | |
|                         # Try to restart lscpd if this is the second attempt
 | |
|                         if attempt == 2:
 | |
|                             logging.writeToFile("[sendCommand] Attempting to restart lscpd service...")
 | |
|                             try:
 | |
|                                 subprocess.run(['systemctl', 'restart', 'lscpd'], capture_output=True, text=True)
 | |
|                                 time.sleep(3)  # Give lscpd time to start
 | |
|                             except Exception as e:
 | |
|                                 logging.writeToFile(f"[sendCommand] Failed to restart lscpd: {str(e)}")
 | |
|                         continue
 | |
|                     else:
 | |
|                         logging.writeToFile(f"[sendCommand] All connection attempts failed. Last error: {last_error}")
 | |
|                         return f"-1Connection failed after {retries} attempts: {last_error}"
 | |
|                 
 | |
|                 # If we get here, connection succeeded
 | |
|                 break
 | |
|                 
 | |
|             except Exception as e:
 | |
|                 attempt += 1
 | |
|                 last_error = str(e)
 | |
|                 if attempt < retries:
 | |
|                     logging.writeToFile(f"[sendCommand] Unexpected error, attempt {attempt}/{retries}: {last_error}")
 | |
|                     time.sleep(2)
 | |
|                     continue
 | |
|                 else:
 | |
|                     return f"-1Error after {retries} attempts: {last_error}"
 | |
|         
 | |
|         try:
 | |
|             # At this point, we have a successful connection
 | |
|             if ret is None:
 | |
|                 return "-1Internal error: connection result is None"
 | |
|             sock = ret[0]
 | |
|             
 | |
|             if ProcessUtilities.token == "unset":
 | |
|                 ProcessUtilities.token = os.environ.get('TOKEN')
 | |
|                 del os.environ['TOKEN']
 | |
| 
 | |
|             if user == None:
 | |
|                 if command.find('export') > -1:
 | |
|                     pass
 | |
|                 elif command.find('sudo') == -1:
 | |
|                     command = 'sudo %s' % (command)
 | |
| 
 | |
|                 if os.path.exists(ProcessUtilities.debugPath):
 | |
|                     # Log all commands for debugging
 | |
|                     logging.writeToFile(command)
 | |
| 
 | |
|                 if dir == None:
 | |
|                     sock.sendall((ProcessUtilities.token + command).encode('utf-8'))
 | |
|                 else:
 | |
|                     command = '%s-d %s %s' % (ProcessUtilities.token, dir, command)
 | |
|                     sock.sendall(command.encode('utf-8'))
 | |
|             else:
 | |
|                 if command.startswith('sudo'):
 | |
|                     command = command.replace('sudo', '', 1)  # Replace 'sudo' with an empty string, only once
 | |
| 
 | |
|                 if dir == None:
 | |
|                     command = '%s-u %s %s' % (ProcessUtilities.token, user, command)
 | |
|                 else:
 | |
|                     command = '%s-u %s -d %s %s' % (ProcessUtilities.token, user, dir, command)
 | |
| 
 | |
| 
 | |
| 
 | |
|                 if os.path.exists(ProcessUtilities.debugPath):
 | |
|                     # Log all commands for debugging
 | |
|                     logging.writeToFile(command)
 | |
| 
 | |
|                 sock.sendall(command.encode('utf-8'))
 | |
| 
 | |
|             # Collect all raw bytes first, then decode as a complete unit
 | |
|             raw_data = b""
 | |
| 
 | |
|             while (1):
 | |
|                 currentData = sock.recv(32)
 | |
|                 if len(currentData) == 0 or currentData == None:
 | |
|                     break
 | |
|                 raw_data += currentData
 | |
| 
 | |
|             # Decode all data at once to prevent UTF-8 character boundary issues
 | |
|             try:
 | |
|                 data = raw_data.decode('utf-8', errors='replace')
 | |
|             except BaseException as msg:
 | |
|                 logging.writeToFile('Some data could not be decoded to str, error message: %s' % str(msg))
 | |
|                 data = ""
 | |
| 
 | |
|             sock.close()
 | |
|             
 | |
|             # Log exit code if debug is enabled
 | |
|             if os.path.exists(ProcessUtilities.debugPath):
 | |
|                 if len(data) == 0:
 | |
|                     logging.writeToFile(f"    └─ Empty response from lscpd")
 | |
|                 else:
 | |
|                     try:
 | |
|                         exit_char = data[-1]
 | |
|                         # Log raw data for debugging
 | |
|                         logging.writeToFile(f"    └─ Response length: {len(data)}, last char: {repr(exit_char)}")
 | |
|                         
 | |
|                         if isinstance(exit_char, str):
 | |
|                             exit_code = ord(exit_char)
 | |
|                         else:
 | |
|                             exit_code = "unknown"
 | |
|                         # Log the actual command that was executed (without token)
 | |
|                         clean_command = command.replace(ProcessUtilities.token, '').replace('-u %s ' % user if user else '', '').replace('-d %s ' % dir if dir else '', '').strip()
 | |
|                         logging.writeToFile(f"    └─ {clean_command} [Exit Code: {exit_code}]")
 | |
|                     except Exception as e:
 | |
|                         logging.writeToFile(f"    └─ Failed to log exit code: {str(e)}")
 | |
|             
 | |
|             #logging.writeToFile('Final data: %s.' % (str(data)))
 | |
| 
 | |
|             return data
 | |
|         except BaseException as msg:
 | |
|             logging.writeToFile(str(msg) + " [hey:sendCommand]")
 | |
|             return "0" + str(msg)
 | |
| 
 | |
|     @staticmethod
 | |
|     def executioner(command, user=None, shell=False):
 | |
|         try:
 | |
|             if os.path.exists(ProcessUtilities.debugPath):
 | |
|                 logging.writeToFile(f"[executioner] Called with command: {command}, user: {user}, shell: {shell}")
 | |
|             
 | |
|             if getpass.getuser() == 'root':
 | |
|                 if os.path.exists(ProcessUtilities.debugPath):
 | |
|                     logging.writeToFile(f"[executioner] Running as root, using normalExecutioner")
 | |
|                 ProcessUtilities.normalExecutioner(command, shell, user)
 | |
|                 return 1
 | |
| 
 | |
|             if os.path.exists(ProcessUtilities.debugPath):
 | |
|                 logging.writeToFile(f"[executioner] Not root, using sendCommand via lscpd")
 | |
|             
 | |
|             ret = ProcessUtilities.sendCommand(command, user)
 | |
| 
 | |
|             # Check if we got any response
 | |
|             if not ret or len(ret) == 0:
 | |
|                 logging.writeToFile("Empty response from lscpd for command: %s" % command)
 | |
|                 return 0
 | |
|             
 | |
|             # Extract exit code from last character
 | |
|             try:
 | |
|                 exitCode = ret[-1]
 | |
|                 # Convert the last character to its ASCII value
 | |
|                 if isinstance(exitCode, str):
 | |
|                     exitCode = ord(exitCode)
 | |
|                 elif isinstance(exitCode, bytes):
 | |
|                     exitCode = exitCode[0] if len(exitCode) > 0 else 1
 | |
|                 else:
 | |
|                     # Try the original hex encoding method as fallback
 | |
|                     exitCode = int(codecs.encode(exitCode.encode(), 'hex'))
 | |
|                 
 | |
|                 if os.path.exists(ProcessUtilities.debugPath):
 | |
|                     logging.writeToFile(f'Exit code from lscpd: {exitCode} for command: {command}')
 | |
|                 
 | |
|                 if exitCode == 0:
 | |
|                     return 1
 | |
|                 else:
 | |
|                     return 0
 | |
|             except Exception as e:
 | |
|                 logging.writeToFile(f"Failed to parse exit code: {str(e)} for command: {command}")
 | |
|                 return 0
 | |
| 
 | |
|         except BaseException as msg:
 | |
|             logging.writeToFile(str(msg) + " [executioner]")
 | |
|             return 0
 | |
| 
 | |
|     @staticmethod
 | |
|     def outputExecutioner(command, user=None, shell = None, dir = None, retRequired = None):
 | |
|         try:
 | |
|             if getpass.getuser() == 'root':
 | |
|                 if os.path.exists(ProcessUtilities.debugPath):
 | |
|                     logging.writeToFile(command)
 | |
| 
 | |
|                 if user!=None:
 | |
|                     if not command.startswith('sudo'):
 | |
|                         command = f'sudo -u {user} {command}'
 | |
|                 # Ensure UTF-8 environment for proper character handling
 | |
|                 env = os.environ.copy()
 | |
|                 env['LC_ALL'] = 'C.UTF-8'
 | |
|                 env['LANG'] = 'C.UTF-8'
 | |
|                 env['PYTHONIOENCODING'] = 'utf-8'
 | |
|                 
 | |
|                 if shell == None or shell == True:
 | |
|                     p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, encoding='utf-8', errors='replace')
 | |
|                 else:
 | |
|                     p = subprocess.Popen(shlex.split(command),  stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=env, encoding='utf-8', errors='replace')
 | |
| 
 | |
|                 if retRequired:
 | |
|                     output, _ = p.communicate()
 | |
|                     exit_code = p.returncode
 | |
|                     if os.path.exists(ProcessUtilities.debugPath):
 | |
|                         logging.writeToFile(f"    └─ [Exit Code: {exit_code}]")
 | |
|                     if exit_code == 0:
 | |
|                         return 1, output
 | |
|                     else:
 | |
|                         return 0, output
 | |
|                 else:
 | |
|                     output = p.communicate()[0]
 | |
|                     exit_code = p.returncode
 | |
|                     if os.path.exists(ProcessUtilities.debugPath):
 | |
|                         logging.writeToFile(f"    └─ [Exit Code: {exit_code}]")
 | |
|                     return output
 | |
| 
 | |
|             if type(command) == list:
 | |
|                 command = " ".join(command)
 | |
| 
 | |
|             if retRequired:
 | |
|                 ret = ProcessUtilities.sendCommand(command, user)
 | |
| 
 | |
|                 # Check if we got any response
 | |
|                 if not ret or len(ret) == 0:
 | |
|                     logging.writeToFile("Empty response from lscpd in outputExecutioner for command: %s" % command)
 | |
|                     return 0, ""
 | |
|                 
 | |
|                 # Extract exit code from last character
 | |
|                 try:
 | |
|                     exitCode = ret[-1]
 | |
|                     
 | |
|                     if os.path.exists(ProcessUtilities.debugPath):
 | |
|                         logging.writeToFile(f'Raw exit code character in outputExecutioner: {repr(exitCode)}')
 | |
|                     
 | |
|                     # Convert the last character to its ASCII value
 | |
|                     if isinstance(exitCode, str):
 | |
|                         exitCode = ord(exitCode)
 | |
|                     elif isinstance(exitCode, bytes):
 | |
|                         exitCode = exitCode[0] if len(exitCode) > 0 else 1
 | |
|                     else:
 | |
|                         # Try the original hex encoding method as fallback
 | |
|                         exitCode = int(codecs.encode(exitCode.encode(), 'hex'))
 | |
|                     
 | |
|                     if os.path.exists(ProcessUtilities.debugPath):
 | |
|                         logging.writeToFile(f'Parsed exit code in outputExecutioner: {exitCode} for command: {command}')
 | |
|                     
 | |
|                     if exitCode == 0:
 | |
|                         return 1, ret[:-1]
 | |
|                     else:
 | |
|                         return 0, ret[:-1]
 | |
|                 except Exception as e:
 | |
|                     logging.writeToFile(f"Failed to parse exit code in outputExecutioner: {str(e)} for command: {command}")
 | |
|                     return 0, ret[:-1] if len(ret) > 1 else ""
 | |
|             else:
 | |
|                 return ProcessUtilities.sendCommand(command, user, dir)[:-1]
 | |
|         except BaseException as msg:
 | |
|             logging.writeToFile(str(msg) + "[outputExecutioner:188]")
 | |
| 
 | |
|     def customPoen(self):
 | |
|         try:
 | |
| 
 | |
|             if type(self.extraArgs['command']) == str or type(self.extraArgs['command']) == bytes:
 | |
|                 command = self.extraArgs['command']
 | |
|             else:
 | |
|                 command = " ".join(self.extraArgs['command'])
 | |
| 
 | |
|             if getpass.getuser() == 'root':
 | |
|                 subprocess.call(command, shell=True)
 | |
|             else:
 | |
|                 ProcessUtilities.sendCommand(command, self.extraArgs['user'])
 | |
| 
 | |
|             return 1
 | |
|         except BaseException as msg:
 | |
|             logging.writeToFile(str(msg) + " [customPoen]")
 | |
| 
 | |
|     @staticmethod
 | |
|     def popenExecutioner(command, user=None):
 | |
|         try:
 | |
|             extraArgs = {}
 | |
|             extraArgs['command'] = command
 | |
|             extraArgs['user'] = user
 | |
|             pu = ProcessUtilities("popen", extraArgs)
 | |
|             pu.start()
 | |
|         except BaseException as msg:
 | |
|             logging.writeToFile(str(msg) + " [popenExecutioner]")
 | |
| 
 | |
|     @staticmethod
 | |
|     def BuildCommand(path, functionName, parameters):
 | |
| 
 | |
|         execPath = "/usr/local/CyberCP/bin/python %s %s " % (path, functionName)
 | |
|         for key, value in parameters.items():
 | |
|             execPath = execPath + ' --%s %s' % (key, value)
 | |
| 
 | |
|         return execPath
 | |
| 
 | |
| 
 | |
|     @staticmethod
 | |
|     def fetch_latest_lts_version_for_node():
 | |
|         import requests
 | |
|         url = "https://api.github.com/repos/nodejs/node/releases"
 | |
|         try:
 | |
|             response = requests.get(url)
 | |
|             if response.status_code == 200:
 | |
|                 releases = response.json()
 | |
|                 for release in releases:
 | |
|                     if release.get('prerelease') == False and 'LTS' in release.get('name'):
 | |
|                         lts_version = release.get('tag_name')
 | |
|                         return lts_version
 | |
|             else:
 | |
|                 print("Failed to fetch releases. Status code:", response.status_code)
 | |
|         except Exception as e:
 | |
|             print("An error occurred:", e)
 | |
|         return None
 | |
| 
 | |
|     @staticmethod
 | |
|     def fetch_latest_prestashop_version():
 | |
|         import requests
 | |
|         url = "https://api.github.com/repos/PrestaShop/PrestaShop/releases"
 | |
|         try:
 | |
|             response = requests.get(url)
 | |
|             if response.status_code == 200:
 | |
|                 releases = response.json()
 | |
|                 return releases[0].get('tag_name')
 | |
|             else:
 | |
|                 logging.writeToFile(f"Failed to fetch releases. Status code: {response.status_code}"  )
 | |
|                 print("[fetch_latest_prestashop_version] Failed to fetch releases. Status code:", response.status_code)
 | |
|         except Exception as e:
 | |
|             print("An error occurred:", e)
 | |
|             logging.writeToFile(f"[fetch_latest_prestashop_version] An error occurred: {str(e)}")
 | |
|         return None
 | |
|      |