2025-08-01 14:56:30 +05:00
from plogical . CyberCPLogFileWriter import CyberCPLogFileWriter as logging
import subprocess
import shlex
import os
import socket
import threading as multi
import time
import getpass
import codecs
class ProcessUtilities ( multi . Thread ) :
debugPath = ' /usr/local/CyberCP/debug '
litespeedProcess = " litespeed "
ent = 1
OLS = 0
centos = 1
cent8 = 2
cent9 = 3
ubuntu = 0
ubuntu20 = 3
ubuntu22Check = 0
alma9check = 0
2025-08-29 22:24:13 +05:00
ubuntu24Check = 0 # New flag for Ubuntu 24.04 specific handling
2025-08-01 14:56:30 +05:00
server_address = ' /usr/local/lscpd/admin/comm.sock '
token = " unset "
portPath = ' /usr/local/lscp/conf/bind.conf '
def __init__ ( self , function , extraArgs ) :
multi . Thread . __init__ ( self )
self . function = function
self . extraArgs = extraArgs
def run ( self ) :
try :
if self . function == ' popen ' :
self . customPoen ( )
except BaseException as msg :
logging . writeToFile ( str ( msg ) + ' [ApplicationInstaller.run] ' )
@staticmethod
def fetchCurrentPort ( ) :
command = ' cat %s ' % ( ProcessUtilities . portPath )
port = ProcessUtilities . outputExecutioner ( command )
if port . find ( ' * ' ) > - 1 :
return port . split ( ' : ' ) [ 1 ] . rstrip ( ' \n ' )
else :
return ' 8090 '
@staticmethod
def getLitespeedProcessNumber ( ) :
finalListOfProcesses = [ ]
try :
import psutil
for proc in psutil . process_iter ( ) :
if proc . name ( ) . find ( ProcessUtilities . litespeedProcess ) > - 1 :
finalListOfProcesses . append ( proc . pid )
except BaseException as msg :
logging . writeToFile (
str ( msg ) + " [getLitespeedProcessNumber] " )
return 0
if len ( finalListOfProcesses ) > 0 :
return finalListOfProcesses
else :
return 0
@staticmethod
def restartLitespeed ( ) :
try :
if ProcessUtilities . decideServer ( ) == ProcessUtilities . OLS :
command = " systemctl restart lsws "
else :
command = " /usr/local/lsws/bin/lswsctrl restart "
cmd = shlex . split ( command )
res = subprocess . call ( cmd )
if res == 0 :
return 1
else :
return 0
except subprocess . CalledProcessError as msg :
logging . writeToFile ( str ( msg ) + " [restartLitespeed] " )
@staticmethod
def stopLitespeed ( ) :
try :
if ProcessUtilities . decideServer ( ) == ProcessUtilities . OLS :
command = " systemctl stop lsws "
else :
command = " /usr/local/lsws/bin/lswsctrl stop "
cmd = shlex . split ( command )
res = subprocess . call ( cmd )
if res == 0 :
return 1
else :
return 0
except subprocess . CalledProcessError as msg :
logging . writeToFile ( str ( msg ) + " [stopLitespeed] " )
@staticmethod
def normalExecutioner ( command , shell = False , User = None ) :
try :
f = open ( os . devnull , ' w ' )
if User == None :
if shell == False :
res = subprocess . call ( shlex . split ( command ) , stdout = f , stderr = f )
else :
res = subprocess . call ( command , shell = shell , stdout = f , stderr = f )
else :
if command . find ( ' export ' ) > - 1 :
pass
elif command . find ( ' sudo ' ) == - 1 :
command = ' sudo -u %s %s ' % ( User , command )
if shell == False :
res = subprocess . call ( shlex . split ( command ) , stdout = f , stderr = f )
else :
res = subprocess . call ( command , shell = shell , stdout = f , stderr = f )
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( f " { command } [Exit Code: { res } ] " )
if res == 0 :
return 1
else :
return 0
except subprocess . CalledProcessError as msg :
logging . writeToFile ( ' %s . [ProcessUtilities.normalExecutioner] ' % ( str ( msg ) ) )
return 0
except BaseException as msg :
logging . writeToFile ( ' %s . [ProcessUtilities.normalExecutioner.Base] ' % ( str ( msg ) ) )
return 0
@staticmethod
def killLiteSpeed ( ) :
try :
command = ' systemctl stop lsws '
ProcessUtilities . normalExecutioner ( command )
except :
pass
pids = ProcessUtilities . getLitespeedProcessNumber ( )
if pids != 0 :
for items in pids :
try :
command = ' sudo kill -9 ' + str ( items )
ProcessUtilities . normalExecutioner ( command )
except :
pass
@staticmethod
def decideServer ( ) :
if os . path . exists ( ' /usr/local/lsws/bin/openlitespeed ' ) :
return ProcessUtilities . OLS
else :
return ProcessUtilities . ent
@staticmethod
def decideDistro ( ) :
distroPath = ' /etc/lsb-release '
distroPathAlma = ' /etc/redhat-release '
# First check if we're on Ubuntu
if os . path . exists ( ' /etc/os-release ' ) :
with open ( ' /etc/os-release ' , ' r ' ) as f :
content = f . read ( )
if ' Ubuntu ' in content :
2025-08-29 22:24:13 +05:00
if ' 24.04 ' in content :
ProcessUtilities . ubuntu22Check = 1
ProcessUtilities . ubuntu24Check = 1 # Specific flag for Ubuntu 24.04
# Ubuntu 24.04 uses newer package versions, set flag for compatibility
ProcessUtilities . alma9check = 1 # Reuse flag to indicate Ubuntu 24.04
return ProcessUtilities . ubuntu20
elif ' 22.04 ' in content :
2025-08-01 14:56:30 +05:00
ProcessUtilities . ubuntu22Check = 1
return ProcessUtilities . ubuntu20
elif ' 20.04 ' in content :
return ProcessUtilities . ubuntu20
return ProcessUtilities . ubuntu
# Check for RedHat-based distributions
if os . path . exists ( distroPathAlma ) :
with open ( distroPathAlma , ' r ' ) as f :
content = f . read ( )
if any ( x in content for x in [ ' CentOS Linux release 8 ' , ' AlmaLinux release 8 ' , ' Rocky Linux release 8 ' ,
' Rocky Linux release 9 ' , ' AlmaLinux release 9 ' , ' CloudLinux release 9 ' ,
2025-09-07 22:57:53 +02:00
' CloudLinux release 8 ' , ' AlmaLinux release 10 ' ] ) :
if any ( x in content for x in [ ' AlmaLinux release 9 ' , ' Rocky Linux release 9 ' , ' AlmaLinux release 10 ' ] ) :
2025-08-01 14:56:30 +05:00
ProcessUtilities . alma9check = 1
return ProcessUtilities . cent8
# Default to Ubuntu if no other distribution is detected
return ProcessUtilities . ubuntu
@staticmethod
def containerCheck ( ) :
try :
command = ' cat /etc/cgrules.conf '
output = ProcessUtilities . outputExecutioner ( command )
if output . find ( ' No such ' ) > - 1 :
return 0
else :
return 1
except BaseException :
return 0
@staticmethod
def setupUDSConnection ( ) :
count = 0
while 1 :
try :
sock = socket . socket ( socket . AF_UNIX , socket . SOCK_STREAM )
sock . connect ( ProcessUtilities . server_address )
return [ sock , " None " ]
except BaseException as msg :
if count == 3 :
logging . writeToFile ( " Failed to connect to LSCPD socket, run ' systemctl restart lscpd ' on command line to fix this issue. " )
return [ - 1 , str ( msg ) ]
else :
count = count + 1
logging . writeToFile ( " Failed to connect to LSCPD UDS, error message: " + str ( msg ) + " . Attempt " + str ( count ) + " , we will attempt again in 2 seconds. [setupUDSConnection:138] " )
time . sleep ( 2 )
@staticmethod
def sendCommand ( command , user = None , dir = None , retries = 3 ) :
"""
Send command to lscpd with retry mechanism
: param command : Command to execute
: param user : User to run command as
: param dir : Directory to run command in
: param retries : Number of retry attempts if connection fails
"""
attempt = 0
last_error = None
ret = None
while attempt < retries :
try :
ret = ProcessUtilities . setupUDSConnection ( )
if ret [ 0 ] == - 1 :
attempt + = 1
last_error = ret [ 1 ]
if attempt < retries :
logging . writeToFile ( f " [sendCommand] Connection failed, attempt { attempt } / { retries } . Retrying in 2 seconds... " )
time . sleep ( 2 )
# Try to restart lscpd if this is the second attempt
if attempt == 2 :
logging . writeToFile ( " [sendCommand] Attempting to restart lscpd service... " )
try :
subprocess . run ( [ ' systemctl ' , ' restart ' , ' lscpd ' ] , capture_output = True , text = True )
time . sleep ( 3 ) # Give lscpd time to start
except Exception as e :
logging . writeToFile ( f " [sendCommand] Failed to restart lscpd: { str ( e ) } " )
continue
else :
logging . writeToFile ( f " [sendCommand] All connection attempts failed. Last error: { last_error } " )
return f " -1Connection failed after { retries } attempts: { last_error } "
# If we get here, connection succeeded
break
except Exception as e :
attempt + = 1
last_error = str ( e )
if attempt < retries :
logging . writeToFile ( f " [sendCommand] Unexpected error, attempt { attempt } / { retries } : { last_error } " )
time . sleep ( 2 )
continue
else :
return f " -1Error after { retries } attempts: { last_error } "
try :
# At this point, we have a successful connection
if ret is None :
return " -1Internal error: connection result is None "
sock = ret [ 0 ]
if ProcessUtilities . token == " unset " :
ProcessUtilities . token = os . environ . get ( ' TOKEN ' )
del os . environ [ ' TOKEN ' ]
if user == None :
if command . find ( ' export ' ) > - 1 :
pass
elif command . find ( ' sudo ' ) == - 1 :
command = ' sudo %s ' % ( command )
if os . path . exists ( ProcessUtilities . debugPath ) :
# Log all commands for debugging
logging . writeToFile ( command )
if dir == None :
sock . sendall ( ( ProcessUtilities . token + command ) . encode ( ' utf-8 ' ) )
else :
command = ' %s -d %s %s ' % ( ProcessUtilities . token , dir , command )
sock . sendall ( command . encode ( ' utf-8 ' ) )
else :
if command . startswith ( ' sudo ' ) :
command = command . replace ( ' sudo ' , ' ' , 1 ) # Replace 'sudo' with an empty string, only once
if dir == None :
command = ' %s -u %s %s ' % ( ProcessUtilities . token , user , command )
else :
command = ' %s -u %s -d %s %s ' % ( ProcessUtilities . token , user , dir , command )
if os . path . exists ( ProcessUtilities . debugPath ) :
# Log all commands for debugging
logging . writeToFile ( command )
sock . sendall ( command . encode ( ' utf-8 ' ) )
# Collect all raw bytes first, then decode as a complete unit
raw_data = b " "
while ( 1 ) :
currentData = sock . recv ( 32 )
if len ( currentData ) == 0 or currentData == None :
break
raw_data + = currentData
# Decode all data at once to prevent UTF-8 character boundary issues
try :
data = raw_data . decode ( ' utf-8 ' , errors = ' replace ' )
except BaseException as msg :
logging . writeToFile ( ' Some data could not be decoded to str, error message: %s ' % str ( msg ) )
data = " "
sock . close ( )
# Log exit code if debug is enabled
if os . path . exists ( ProcessUtilities . debugPath ) :
if len ( data ) == 0 :
logging . writeToFile ( f " └─ Empty response from lscpd " )
else :
try :
exit_char = data [ - 1 ]
# Log raw data for debugging
logging . writeToFile ( f " └─ Response length: { len ( data ) } , last char: { repr ( exit_char ) } " )
if isinstance ( exit_char , str ) :
exit_code = ord ( exit_char )
else :
exit_code = " unknown "
# Log the actual command that was executed (without token)
clean_command = command . replace ( ProcessUtilities . token , ' ' ) . replace ( ' -u %s ' % user if user else ' ' , ' ' ) . replace ( ' -d %s ' % dir if dir else ' ' , ' ' ) . strip ( )
logging . writeToFile ( f " └─ { clean_command } [Exit Code: { exit_code } ] " )
except Exception as e :
logging . writeToFile ( f " └─ Failed to log exit code: { str ( e ) } " )
#logging.writeToFile('Final data: %s.' % (str(data)))
return data
except BaseException as msg :
logging . writeToFile ( str ( msg ) + " [hey:sendCommand] " )
return " 0 " + str ( msg )
@staticmethod
def executioner ( command , user = None , shell = False ) :
try :
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( f " [executioner] Called with command: { command } , user: { user } , shell: { shell } " )
if getpass . getuser ( ) == ' root ' :
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( f " [executioner] Running as root, using normalExecutioner " )
ProcessUtilities . normalExecutioner ( command , shell , user )
return 1
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( f " [executioner] Not root, using sendCommand via lscpd " )
ret = ProcessUtilities . sendCommand ( command , user )
# Check if we got any response
if not ret or len ( ret ) == 0 :
logging . writeToFile ( " Empty response from lscpd for command: %s " % command )
return 0
# Extract exit code from last character
try :
exitCode = ret [ - 1 ]
# Convert the last character to its ASCII value
if isinstance ( exitCode , str ) :
exitCode = ord ( exitCode )
elif isinstance ( exitCode , bytes ) :
exitCode = exitCode [ 0 ] if len ( exitCode ) > 0 else 1
else :
# Try the original hex encoding method as fallback
exitCode = int ( codecs . encode ( exitCode . encode ( ) , ' hex ' ) )
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( f ' Exit code from lscpd: { exitCode } for command: { command } ' )
if exitCode == 0 :
return 1
else :
return 0
except Exception as e :
logging . writeToFile ( f " Failed to parse exit code: { str ( e ) } for command: { command } " )
return 0
except BaseException as msg :
logging . writeToFile ( str ( msg ) + " [executioner] " )
return 0
@staticmethod
def outputExecutioner ( command , user = None , shell = None , dir = None , retRequired = None ) :
try :
if getpass . getuser ( ) == ' root ' :
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( command )
if user != None :
if not command . startswith ( ' sudo ' ) :
command = f ' sudo -u { user } { command } '
# Ensure UTF-8 environment for proper character handling
env = os . environ . copy ( )
env [ ' LC_ALL ' ] = ' C.UTF-8 '
env [ ' LANG ' ] = ' C.UTF-8 '
env [ ' PYTHONIOENCODING ' ] = ' utf-8 '
if shell == None or shell == True :
p = subprocess . Popen ( command , shell = True , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , env = env , encoding = ' utf-8 ' , errors = ' replace ' )
else :
p = subprocess . Popen ( shlex . split ( command ) , stdout = subprocess . PIPE , stderr = subprocess . STDOUT , env = env , encoding = ' utf-8 ' , errors = ' replace ' )
if retRequired :
output , _ = p . communicate ( )
exit_code = p . returncode
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( f " └─ [Exit Code: { exit_code } ] " )
if exit_code == 0 :
return 1 , output
else :
return 0 , output
else :
output = p . communicate ( ) [ 0 ]
exit_code = p . returncode
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( f " └─ [Exit Code: { exit_code } ] " )
return output
if type ( command ) == list :
command = " " . join ( command )
if retRequired :
ret = ProcessUtilities . sendCommand ( command , user )
# Check if we got any response
if not ret or len ( ret ) == 0 :
logging . writeToFile ( " Empty response from lscpd in outputExecutioner for command: %s " % command )
return 0 , " "
# Extract exit code from last character
try :
exitCode = ret [ - 1 ]
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( f ' Raw exit code character in outputExecutioner: { repr ( exitCode ) } ' )
# Convert the last character to its ASCII value
if isinstance ( exitCode , str ) :
exitCode = ord ( exitCode )
elif isinstance ( exitCode , bytes ) :
exitCode = exitCode [ 0 ] if len ( exitCode ) > 0 else 1
else :
# Try the original hex encoding method as fallback
exitCode = int ( codecs . encode ( exitCode . encode ( ) , ' hex ' ) )
if os . path . exists ( ProcessUtilities . debugPath ) :
logging . writeToFile ( f ' Parsed exit code in outputExecutioner: { exitCode } for command: { command } ' )
if exitCode == 0 :
return 1 , ret [ : - 1 ]
else :
return 0 , ret [ : - 1 ]
except Exception as e :
logging . writeToFile ( f " Failed to parse exit code in outputExecutioner: { str ( e ) } for command: { command } " )
return 0 , ret [ : - 1 ] if len ( ret ) > 1 else " "
else :
return ProcessUtilities . sendCommand ( command , user , dir ) [ : - 1 ]
except BaseException as msg :
logging . writeToFile ( str ( msg ) + " [outputExecutioner:188] " )
def customPoen ( self ) :
try :
if type ( self . extraArgs [ ' command ' ] ) == str or type ( self . extraArgs [ ' command ' ] ) == bytes :
command = self . extraArgs [ ' command ' ]
else :
command = " " . join ( self . extraArgs [ ' command ' ] )
if getpass . getuser ( ) == ' root ' :
subprocess . call ( command , shell = True )
else :
ProcessUtilities . sendCommand ( command , self . extraArgs [ ' user ' ] )
return 1
except BaseException as msg :
logging . writeToFile ( str ( msg ) + " [customPoen] " )
@staticmethod
def popenExecutioner ( command , user = None ) :
try :
extraArgs = { }
extraArgs [ ' command ' ] = command
extraArgs [ ' user ' ] = user
pu = ProcessUtilities ( " popen " , extraArgs )
pu . start ( )
except BaseException as msg :
logging . writeToFile ( str ( msg ) + " [popenExecutioner] " )
@staticmethod
def BuildCommand ( path , functionName , parameters ) :
execPath = " /usr/local/CyberCP/bin/python %s %s " % ( path , functionName )
for key , value in parameters . items ( ) :
execPath = execPath + ' -- %s %s ' % ( key , value )
return execPath
@staticmethod
def fetch_latest_lts_version_for_node ( ) :
import requests
url = " https://api.github.com/repos/nodejs/node/releases "
try :
response = requests . get ( url )
if response . status_code == 200 :
releases = response . json ( )
for release in releases :
if release . get ( ' prerelease ' ) == False and ' LTS ' in release . get ( ' name ' ) :
lts_version = release . get ( ' tag_name ' )
return lts_version
else :
print ( " Failed to fetch releases. Status code: " , response . status_code )
except Exception as e :
print ( " An error occurred: " , e )
return None
@staticmethod
def fetch_latest_prestashop_version ( ) :
import requests
url = " https://api.github.com/repos/PrestaShop/PrestaShop/releases "
try :
response = requests . get ( url )
if response . status_code == 200 :
releases = response . json ( )
return releases [ 0 ] . get ( ' tag_name ' )
else :
logging . writeToFile ( f " Failed to fetch releases. Status code: { response . status_code } " )
print ( " [fetch_latest_prestashop_version] Failed to fetch releases. Status code: " , response . status_code )
except Exception as e :
print ( " An error occurred: " , e )
logging . writeToFile ( f " [fetch_latest_prestashop_version] An error occurred: { str ( e ) } " )
return None