bug fix: to wp install, improved file manager and custom ssl implementation

This commit is contained in:
usmannasir
2025-04-27 15:28:50 +05:00
parent a2e4e4eca5
commit 031a8e7851
11 changed files with 1578 additions and 459 deletions

View File

@@ -173,57 +173,133 @@ class sslUtilities:
@staticmethod
def PatchVhostConf(virtualHostName):
"""Patch the virtual host configuration to add ACME challenge support
This function adds the necessary configuration to handle ACME challenges
for both OpenLiteSpeed (OLS) and Apache configurations. It also checks
for potential configuration conflicts before making changes.
Args:
virtualHostName (str): The domain name to configure
Returns:
tuple: (status, message) where status is 1 for success, 0 for failure
"""
try:
confPath = sslUtilities.Server_root + "/conf/vhosts/" + virtualHostName
completePathToConfigFile = confPath + "/vhost.conf"
DataVhost = open(completePathToConfigFile, 'r').read()
if DataVhost.find('/.well-known/acme-challenge') == -1:
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
WriteToFile = open(completePathToConfigFile, 'a')
content = '''
# Construct paths
confPath = os.path.join(sslUtilities.Server_root, "conf", "vhosts", virtualHostName)
completePathToConfigFile = os.path.join(confPath, "vhost.conf")
# Check if file exists
if not os.path.exists(completePathToConfigFile):
logging.CyberCPLogFileWriter.writeToFile(f'Configuration file not found: {completePathToConfigFile}')
return 0, f'Configuration file not found: {completePathToConfigFile}'
# Read current configuration
try:
with open(completePathToConfigFile, 'r') as f:
DataVhost = f.read()
except IOError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error reading configuration file: {str(e)}')
return 0, f'Error reading configuration file: {str(e)}'
# Check for potential conflicts
conflicts = []
# Check if ACME challenge is already configured
if DataVhost.find('/.well-known/acme-challenge') != -1:
logging.CyberCPLogFileWriter.writeToFile(f'ACME challenge already configured for {virtualHostName}')
return 1, 'ACME challenge already configured'
# Check for conflicting rewrite rules
if DataVhost.find('rewrite') != -1 and DataVhost.find('enable 1') != -1:
conflicts.append('Active rewrite rules found that might interfere with ACME challenges')
# Check for conflicting location blocks
if DataVhost.find('location /.well-known') != -1:
conflicts.append('Existing location block for /.well-known found')
# Check for conflicting aliases
if DataVhost.find('Alias /.well-known') != -1:
conflicts.append('Existing alias for /.well-known found')
# Check for conflicting context blocks
if DataVhost.find('context /.well-known') != -1:
conflicts.append('Existing context block for /.well-known found')
# Check for conflicting access controls
if DataVhost.find('deny from all') != -1 and DataVhost.find('location') != -1:
conflicts.append('Global deny rules found that might block ACME challenges')
# If conflicts found, log them and return
if conflicts:
conflict_message = 'Configuration conflicts found: ' + '; '.join(conflicts)
logging.CyberCPLogFileWriter.writeToFile(f'Configuration conflicts for {virtualHostName}: {conflict_message}')
return 0, conflict_message
# Create challenge directory if it doesn't exist
challenge_dir = '/usr/local/lsws/Example/html/.well-known/acme-challenge'
try:
os.makedirs(challenge_dir, exist_ok=True)
# Set proper permissions
os.chmod(challenge_dir, 0o755)
except OSError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error creating challenge directory: {str(e)}')
return 0, f'Error creating challenge directory: {str(e)}'
# Handle configuration based on server type
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
# OpenLiteSpeed configuration
try:
with open(completePathToConfigFile, 'a') as f:
content = '''
context /.well-known/acme-challenge {
location /usr/local/lsws/Example/html/.well-known/acme-challenge
allowBrowse 1
rewrite {
enable 0
}
addDefaultCharset off
phpIniOverride {
}
}
'''
WriteToFile.write(content)
WriteToFile.close()
else:
data = open(completePathToConfigFile, 'r').readlines()
WriteToFile = open(completePathToConfigFile, 'w')
Check = 0
for items in data:
if items.find('DocumentRoot /home/')> -1:
if Check == 0:
WriteToFile.write(items)
WriteToFile.write(' Alias /.well-known/acme-challenge /usr/local/lsws/Example/html/.well-known/acme-challenge\n')
Check = 1
else:
WriteToFile.write(items)
else:
WriteToFile.write(items)
WriteToFile.close()
f.write(content)
except IOError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error writing OLS configuration: {str(e)}')
return 0, f'Error writing OLS configuration: {str(e)}'
else:
# Apache configuration
try:
# Read current configuration
with open(completePathToConfigFile, 'r') as f:
lines = f.readlines()
# Write new configuration
with open(completePathToConfigFile, 'w') as f:
check = 0
for line in lines:
f.write(line)
if line.find('DocumentRoot /home/') > -1 and check == 0:
f.write(' Alias /.well-known/acme-challenge /usr/local/lsws/Example/html/.well-known/acme-challenge\n')
check = 1
except IOError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error writing Apache configuration: {str(e)}')
return 0, f'Error writing Apache configuration: {str(e)}'
# Restart LiteSpeed
try:
from plogical import installUtilities
installUtilities.installUtilities.reStartLiteSpeed()
except BaseException as msg:
return 0, str(msg)
logging.CyberCPLogFileWriter.writeToFile(f'Successfully configured ACME challenge for {virtualHostName}')
return 1, 'Successfully configured ACME challenge'
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error restarting LiteSpeed: {str(e)}')
return 0, f'Error restarting LiteSpeed: {str(e)}'
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Unexpected error in PatchVhostConf: {str(e)}')
return 0, f'Unexpected error: {str(e)}'
@staticmethod
def installSSLForDomain(virtualHostName, adminEmail='example@example.org'):
@@ -476,28 +552,14 @@ context /.well-known/acme-challenge {
@staticmethod
def obtainSSLForADomain(virtualHostName, adminEmail, sslpath, aliasDomain=None):
from plogical.acl import ACLManager
from plogical.sslv2 import sslUtilities as sslv2
from plogical.customACME import CustomACME
import json
#
# url = "https://platform.cyberpersons.com/CyberpanelAdOns/Adonpermission"
# data = {
# "name": "all",
# "IP": ACLManager.GetServerIP()
# }
#
# import requests
# response = requests.post(url, data=json.dumps(data))
#Status = response.json()['status']
import socket
Status = 1
# if (Status == 1) or ProcessUtilities.decideServer() == ProcessUtilities.ent:
# retStatus, message = sslv2.obtainSSLForADomain(virtualHostName, adminEmail, sslpath, aliasDomain)
# if retStatus == 1:
# return retStatus
if sslUtilities.CheckIfSSLNeedsToBeIssued(virtualHostName) == sslUtilities.ISSUE_SSL:
pass
else:
@@ -514,58 +576,55 @@ context /.well-known/acme-challenge {
command = f'chmod -R 755 /usr/local/lsws/Example/html'
ProcessUtilities.executioner(command)
CustomVerificationFile = f'/usr/local/lsws/Example/html/.well-known/acme-challenge/{virtualHostName}'
command = f'touch {CustomVerificationFile}'
ProcessUtilities.normalExecutioner(command)
URLFetchPathWWW = f'http://www.{virtualHostName}/.well-known/acme-challenge/{virtualHostName}'
URLFetchPathNONWWW = f'http://{virtualHostName}/.well-known/acme-challenge/{virtualHostName}'
# Try Let's Encrypt first
try:
resp = requests.get(URLFetchPathWWW, timeout=5)
if resp.status_code == 200:
logging.CyberCPLogFileWriter.writeToFile(f'Status Code: 200 for: {URLFetchPathWWW}')
WWWStatus = 1
else:
domains = [virtualHostName, f'www.{virtualHostName}']
if aliasDomain:
domains.extend([aliasDomain, f'www.{aliasDomain}'])
# Check if Cloudflare is used
use_dns = False
try:
website = Websites.objects.get(domain=virtualHostName)
if website.externalApp == 'cloudflare':
use_dns = True
except:
pass
acme = CustomACME(virtualHostName, adminEmail, staging=False, provider='letsencrypt')
if acme.issue_certificate(domains, use_dns=use_dns):
logging.CyberCPLogFileWriter.writeToFile(
f'Status Code: {str(resp.status_code)} for: {URLFetchPathWWW}. Error: {resp.text}')
except BaseException as msg:
f"Successfully obtained SSL using Let's Encrypt for: {virtualHostName}")
return 1
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(
f'Status Code: Unknown for: {URLFetchPathWWW}. Error: {str(msg)}')
f"Let's Encrypt failed: {str(e)}. Trying ZeroSSL...")
# Try ZeroSSL if Let's Encrypt fails
try:
resp = requests.get(URLFetchPathNONWWW, timeout=5)
if resp.status_code == 200:
logging.CyberCPLogFileWriter.writeToFile(f'Status Code: 200 for: {URLFetchPathNONWWW}')
NONWWWStatus = 1
else:
logging.CyberCPLogFileWriter.writeToFile(f'Status Code: {str(resp.status_code)} for: {URLFetchPathNONWWW}. Error: {resp.text}')
except BaseException as msg:
domains = [virtualHostName, f'www.{virtualHostName}']
if aliasDomain:
domains.extend([aliasDomain, f'www.{aliasDomain}'])
acme = CustomACME(virtualHostName, adminEmail, staging=False, provider='zerossl')
if acme.issue_certificate(domains, use_dns=use_dns):
logging.CyberCPLogFileWriter.writeToFile(
f"Successfully obtained SSL using ZeroSSL for: {virtualHostName}")
return 1
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(
f'Status Code: Unkown for: {URLFetchPathNONWWW}. Error: {str(msg)}')
WWWStatus = 1
NONWWWStatus = 1
f"ZeroSSL failed: {str(e)}. Falling back to acme.sh")
# Fallback to acme.sh if both ACME providers fail
try:
acmePath = '/root/.acme.sh/acme.sh'
### register account for zero ssl
command = '%s --register-account -m %s' % (acmePath, adminEmail)
subprocess.call(shlex.split(command))
# if ProcessUtilities.decideDistro() == ProcessUtilities.ubuntu:
# acmePath = '/home/cyberpanel/.acme.sh/acme.sh'
command = '%s --set-default-ca --server letsencrypt' % (acmePath)
subprocess.call(shlex.split(command))
if aliasDomain is None:
existingCertPath = '/etc/letsencrypt/live/' + virtualHostName
if not os.path.exists(existingCertPath):
command = 'mkdir -p ' + existingCertPath
@@ -575,178 +634,51 @@ context /.well-known/acme-challenge {
command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --staging'
#ResultText = open(logging.CyberCPLogFileWriter.fileName, 'r').read()
#CurrentMessage = "Trying to obtain SSL for: " + virtualHostName + " and: www." + virtualHostName
if (WWWStatus and NONWWWStatus):
#logging.CyberCPLogFileWriter.writeToFile(CurrentMessage, 0)
logging.CyberCPLogFileWriter.writeToFile(command, 0)
#output = subprocess.check_output(shlex.split(command)).decode("utf-8")
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True,shell=True)
except:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
stdout = result.stdout
stderr = result.stderr
if result.returncode == 0:
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
logging.CyberCPLogFileWriter.writeToFile(command, 0)
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True,
shell=True)
except:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
stdout = result.stdout
stderr = result.stderr
if result.returncode == 0:
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName + " and: www." + virtualHostName, 0)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, stdout,
'SSL Notification for %s.' % (virtualHostName))
return 1
else:
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
raise subprocess.CalledProcessError(0, '', '')
else:
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
raise subprocess.CalledProcessError(0, '', '')
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
else:
logging.CyberCPLogFileWriter.writeToFile(command, 0)
raise subprocess.CalledProcessError(0, '', '')
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
except subprocess.CalledProcessError:
logging.CyberCPLogFileWriter.writeToFile(
"Failed to obtain SSL for: " + virtualHostName + " and: www." + virtualHostName, 0)
finalText = "Failed to obtain SSL for: " + virtualHostName + " and: www." + virtualHostName
try:
command = acmePath + " --issue -d " + virtualHostName + ' --cert-file ' + existingCertPath \
+ '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --staging'
#ResultText = open(logging.CyberCPLogFileWriter.fileName, 'r').read()
CurrentMessage = '%s\nTrying to obtain SSL for: %s' % (finalText, virtualHostName)
if NONWWWStatus:
finalText = '%s\nTrying to obtain SSL for: %s' % (finalText, virtualHostName)
logging.CyberCPLogFileWriter.writeToFile("Trying to obtain SSL for: " + virtualHostName, 0)
logging.CyberCPLogFileWriter.writeToFile(command)
#output = subprocess.check_output(shlex.split(command)).decode("utf-8")
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True,
shell=True)
except:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
stdout = result.stdout
stderr = result.stderr
if result.returncode == 0:
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
command = acmePath + " --issue -d " + virtualHostName + ' --cert-file ' + existingCertPath \
+ '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True,
shell=True)
except:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
stdout = result.stdout
stderr = result.stderr
if result.returncode == 0:
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName, 0)
finalText = '%s\nSuccessfully obtained SSL for: %s.' % (finalText, virtualHostName)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, finalText,
'SSL Notification for %s.' % (virtualHostName))
return 1
else:
logging.CyberCPLogFileWriter.writeToFile(command, 0)
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
return 0
else:
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
return 0
else:
logging.CyberCPLogFileWriter.writeToFile(command, 0)
return 0
except subprocess.CalledProcessError:
logging.CyberCPLogFileWriter.writeToFile('Failed to obtain SSL, issuing self-signed SSL for: ' + virtualHostName, 0)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, 'Failed to obtain SSL, issuing self-signed SSL for: ' + virtualHostName,
'SSL Notification for %s.' % (virtualHostName))
return 0
if result.returncode == 0:
command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
if result.returncode == 0:
logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName + " and: www." + virtualHostName, 0)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, result.stdout,
'SSL Notification for %s.' % (virtualHostName))
return 1
return 0
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(str(e))
return 0
else:
existingCertPath = '/etc/letsencrypt/live/' + virtualHostName
if not os.path.exists(existingCertPath):
command = 'mkdir -p ' + existingCertPath
subprocess.call(shlex.split(command))
try:
logging.CyberCPLogFileWriter.writeToFile(
"Trying to obtain SSL for: " + virtualHostName + ", www." + virtualHostName + ", " + aliasDomain + " and www." + aliasDomain + ",")
command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \
+ ' -d ' + aliasDomain + ' -d www.' + aliasDomain\
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
output = subprocess.check_output(shlex.split(command)).decode("utf-8")
logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName + ", www." + virtualHostName + ", " + aliasDomain + "and www." + aliasDomain + ",")
except subprocess.CalledProcessError:
logging.CyberCPLogFileWriter.writeToFile(
"Failed to obtain SSL for: " + virtualHostName + ", www." + virtualHostName + ", " + aliasDomain + "and www." + aliasDomain + ",")
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
if result.returncode == 0:
return 1
return 0
##
return 0
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [Failed to obtain SSL. [obtainSSLForADomain]]")
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(str(e))
return 0
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(str(e))
return 0