usmannasir
2025-08-08 00:56:41 +05:00
parent 8fcf18279b
commit 651324b464
7 changed files with 495 additions and 299 deletions

9
.idea/workspace.xml generated
View File

@@ -6,6 +6,12 @@
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="5251c5c9-f2a1-41f2-bc76-10b517091df1" name="Changes" comment=""> <list default="true" id="5251c5c9-f2a1-41f2-bc76-10b517091df1" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/loginSystem/views.py" beforeDir="false" afterPath="$PROJECT_DIR$/loginSystem/views.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/plogical/adminPass.py" beforeDir="false" afterPath="$PROJECT_DIR$/plogical/adminPass.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/plogical/customACME.py" beforeDir="false" afterPath="$PROJECT_DIR$/plogical/customACME.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/plogical/sslUtilities.py" beforeDir="false" afterPath="$PROJECT_DIR$/plogical/sslUtilities.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/plogical/sslv2.py" beforeDir="false" afterPath="$PROJECT_DIR$/plogical/sslv2.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/websiteFunctions/views.py" beforeDir="false" afterPath="$PROJECT_DIR$/websiteFunctions/views.py" afterDir="false" />
</list> </list>
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" /> <option name="HIGHLIGHT_CONFLICTS" value="true" />
@@ -48,7 +54,7 @@
"RunOnceActivity.TerminalTabsStorage.copyFrom.TerminalArrangementManager": "true", "RunOnceActivity.TerminalTabsStorage.copyFrom.TerminalArrangementManager": "true",
"RunOnceActivity.git.unshallow": "true", "RunOnceActivity.git.unshallow": "true",
"SHELLCHECK.PATH": "/Users/cyberpersons/Library/Application Support/JetBrains/PyCharm2025.1/plugins/Shell Script/shellcheck", "SHELLCHECK.PATH": "/Users/cyberpersons/Library/Application Support/JetBrains/PyCharm2025.1/plugins/Shell Script/shellcheck",
"git-widget-placeholder": "stable", "git-widget-placeholder": "v2.4.3",
"last_opened_file_path": "/Users/cyberpersons/cyberpanel", "last_opened_file_path": "/Users/cyberpersons/cyberpanel",
"node.js.detected.package.eslint": "true", "node.js.detected.package.eslint": "true",
"node.js.detected.package.tslint": "true", "node.js.detected.package.tslint": "true",
@@ -117,6 +123,7 @@
<workItem from="1754429757112" duration="3503000" /> <workItem from="1754429757112" duration="3503000" />
<workItem from="1754433799097" duration="517000" /> <workItem from="1754433799097" duration="517000" />
<workItem from="1754448353513" duration="2970000" /> <workItem from="1754448353513" duration="2970000" />
<workItem from="1754511414251" duration="12135000" />
</task> </task>
<servers /> <servers />
</component> </component>

View File

@@ -222,7 +222,7 @@ def loadLoginPage(request):
token = hashPassword.generateToken('admin', '1234567') token = hashPassword.generateToken('admin', '1234567')
email = 'example@example.org' email = 'admin@cyberpanel.net'
admin = Administrator(userName="admin", password=password, type=1, email=email, admin = Administrator(userName="admin", password=password, type=1, email=email,
firstName="Cyber", lastName="Panel", acl=acl, token=token) firstName="Cyber", lastName="Panel", acl=acl, token=token)
admin.save() admin.save()

View File

@@ -47,7 +47,7 @@ def main():
acl = ACL.objects.get(name='admin') acl = ACL.objects.get(name='admin')
token = hashPassword.generateToken('admin', adminPass) token = hashPassword.generateToken('admin', adminPass)
email = 'example@example.org' email = 'admin@cyberpanel.net'
admin = Administrator(userName="admin", password=hashPassword.hash_password(adminPass), type=1, email=email, admin = Administrator(userName="admin", password=hashPassword.hash_password(adminPass), type=1, email=email,
firstName="Cyber", lastName="Panel", acl=acl, token=token) firstName="Cyber", lastName="Panel", acl=acl, token=token)
admin.save() admin.save()

View File

@@ -17,15 +17,17 @@ from plogical import CyberCPLogFileWriter as logging
from plogical.processUtilities import ProcessUtilities from plogical.processUtilities import ProcessUtilities
import socket import socket
class CustomACME: class CustomACME:
def __init__(self, domain, admin_email, staging=False, provider='letsencrypt'): def __init__(self, domain, admin_email, staging=False, provider='letsencrypt'):
"""Initialize CustomACME""" """Initialize CustomACME"""
logging.CyberCPLogFileWriter.writeToFile(f'Initializing CustomACME for domain: {domain}, email: {admin_email}, staging: {staging}, provider: {provider}') logging.CyberCPLogFileWriter.writeToFile(
f'Initializing CustomACME for domain: {domain}, email: {admin_email}, staging: {staging}, provider: {provider}')
self.domain = domain self.domain = domain
self.admin_email = admin_email self.admin_email = admin_email
self.staging = staging self.staging = staging
self.provider = provider self.provider = provider
# Set the ACME directory URL based on provider and staging flag # Set the ACME directory URL based on provider and staging flag
if provider == 'zerossl': if provider == 'zerossl':
if staging: if staging:
@@ -41,7 +43,7 @@ class CustomACME:
else: else:
self.acme_directory = "https://acme-v02.api.letsencrypt.org/directory" self.acme_directory = "https://acme-v02.api.letsencrypt.org/directory"
logging.CyberCPLogFileWriter.writeToFile('Using Let\'s Encrypt production ACME directory') logging.CyberCPLogFileWriter.writeToFile('Using Let\'s Encrypt production ACME directory')
self.account_key = None self.account_key = None
self.account_url = None self.account_url = None
self.directory = None self.directory = None
@@ -50,16 +52,17 @@ class CustomACME:
self.authorizations = [] self.authorizations = []
self.finalize_url = None self.finalize_url = None
self.certificate_url = None self.certificate_url = None
# Initialize paths # Initialize paths
self.cert_path = f'/etc/letsencrypt/live/{domain}' self.cert_path = f'/etc/letsencrypt/live/{domain}'
self.challenge_path = '/usr/local/lsws/Example/html/.well-known/acme-challenge' self.challenge_path = '/usr/local/lsws/Example/html/.well-known/acme-challenge'
self.account_key_path = f'/etc/letsencrypt/accounts/{domain}.key' self.account_key_path = f'/etc/letsencrypt/accounts/{domain}.key'
logging.CyberCPLogFileWriter.writeToFile(f'Certificate path: {self.cert_path}, Challenge path: {self.challenge_path}') logging.CyberCPLogFileWriter.writeToFile(
f'Certificate path: {self.cert_path}, Challenge path: {self.challenge_path}')
# Create accounts directory if it doesn't exist # Create accounts directory if it doesn't exist
os.makedirs('/etc/letsencrypt/accounts', exist_ok=True) os.makedirs('/etc/letsencrypt/accounts', exist_ok=True)
def _generate_account_key(self): def _generate_account_key(self):
"""Generate RSA account key""" """Generate RSA account key"""
try: try:
@@ -82,7 +85,8 @@ class CustomACME:
logging.CyberCPLogFileWriter.writeToFile(f'Fetching ACME directory from {self.acme_directory}') logging.CyberCPLogFileWriter.writeToFile(f'Fetching ACME directory from {self.acme_directory}')
response = requests.get(self.acme_directory) response = requests.get(self.acme_directory)
self.directory = response.json() self.directory = response.json()
logging.CyberCPLogFileWriter.writeToFile(f'Successfully fetched ACME directory: {json.dumps(self.directory)}') logging.CyberCPLogFileWriter.writeToFile(
f'Successfully fetched ACME directory: {json.dumps(self.directory)}')
return True return True
except Exception as e: except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error getting directory: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error getting directory: {str(e)}')
@@ -93,19 +97,19 @@ class CustomACME:
try: try:
logging.CyberCPLogFileWriter.writeToFile('Getting new nonce...') logging.CyberCPLogFileWriter.writeToFile('Getting new nonce...')
response = requests.head(self.directory['newNonce']) response = requests.head(self.directory['newNonce'])
# Check for nonce in headers (case-insensitive) # Check for nonce in headers (case-insensitive)
nonce_header = None nonce_header = None
for header_name in ['Replay-Nonce', 'replay-nonce', 'REPLAY-NONCE']: for header_name in ['Replay-Nonce', 'replay-nonce', 'REPLAY-NONCE']:
if header_name in response.headers: if header_name in response.headers:
nonce_header = header_name nonce_header = header_name
break break
if not nonce_header: if not nonce_header:
# Log all available headers for debugging # Log all available headers for debugging
logging.CyberCPLogFileWriter.writeToFile(f'Available headers: {list(response.headers.keys())}') logging.CyberCPLogFileWriter.writeToFile(f'Available headers: {list(response.headers.keys())}')
raise KeyError('Replay-Nonce header not found in response') raise KeyError('Replay-Nonce header not found in response')
self.nonce = response.headers[nonce_header] self.nonce = response.headers[nonce_header]
logging.CyberCPLogFileWriter.writeToFile(f'Successfully got nonce: {self.nonce}') logging.CyberCPLogFileWriter.writeToFile(f'Successfully got nonce: {self.nonce}')
return True return True
@@ -119,22 +123,22 @@ class CustomACME:
logging.CyberCPLogFileWriter.writeToFile(f'Creating JWS for URL: {url}') logging.CyberCPLogFileWriter.writeToFile(f'Creating JWS for URL: {url}')
if payload is not None: if payload is not None:
logging.CyberCPLogFileWriter.writeToFile(f'Payload: {json.dumps(payload)}') logging.CyberCPLogFileWriter.writeToFile(f'Payload: {json.dumps(payload)}')
# Get a fresh nonce for this request # Get a fresh nonce for this request
if not self._get_nonce(): if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get fresh nonce') logging.CyberCPLogFileWriter.writeToFile('Failed to get fresh nonce')
return None return None
# Get the private key numbers # Get the private key numbers
logging.CyberCPLogFileWriter.writeToFile('Getting private key numbers...') logging.CyberCPLogFileWriter.writeToFile('Getting private key numbers...')
private_numbers = self.account_key.private_numbers() private_numbers = self.account_key.private_numbers()
public_numbers = private_numbers.public_numbers public_numbers = private_numbers.public_numbers
# Convert numbers to bytes # Convert numbers to bytes
logging.CyberCPLogFileWriter.writeToFile('Converting RSA numbers to bytes...') logging.CyberCPLogFileWriter.writeToFile('Converting RSA numbers to bytes...')
n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big') n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big')
e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big') e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big')
# Create JWK # Create JWK
logging.CyberCPLogFileWriter.writeToFile('Creating JWK...') logging.CyberCPLogFileWriter.writeToFile('Creating JWK...')
jwk_key = { jwk_key = {
@@ -144,14 +148,14 @@ class CustomACME:
"alg": "RS256" "alg": "RS256"
} }
logging.CyberCPLogFileWriter.writeToFile(f'Created JWK: {json.dumps(jwk_key)}') logging.CyberCPLogFileWriter.writeToFile(f'Created JWK: {json.dumps(jwk_key)}')
# Create protected header # Create protected header
protected = { protected = {
"alg": "RS256", "alg": "RS256",
"url": url, "url": url,
"nonce": self.nonce "nonce": self.nonce
} }
# Add either JWK or Key ID based on whether we have an account URL # Add either JWK or Key ID based on whether we have an account URL
if self.account_url and url != self.directory['newAccount']: if self.account_url and url != self.directory['newAccount']:
protected["kid"] = self.account_url protected["kid"] = self.account_url
@@ -159,13 +163,13 @@ class CustomACME:
else: else:
protected["jwk"] = jwk_key protected["jwk"] = jwk_key
logging.CyberCPLogFileWriter.writeToFile('Using JWK for new account') logging.CyberCPLogFileWriter.writeToFile('Using JWK for new account')
# Encode protected header # Encode protected header
logging.CyberCPLogFileWriter.writeToFile('Encoding protected header...') logging.CyberCPLogFileWriter.writeToFile('Encoding protected header...')
protected_b64 = base64.urlsafe_b64encode( protected_b64 = base64.urlsafe_b64encode(
json.dumps(protected).encode('utf-8') json.dumps(protected).encode('utf-8')
).decode('utf-8').rstrip('=') ).decode('utf-8').rstrip('=')
# For POST-as-GET requests, payload_b64 should be empty string # For POST-as-GET requests, payload_b64 should be empty string
if payload is None: if payload is None:
payload_b64 = "" payload_b64 = ""
@@ -176,11 +180,11 @@ class CustomACME:
payload_b64 = base64.urlsafe_b64encode( payload_b64 = base64.urlsafe_b64encode(
json.dumps(payload).encode('utf-8') json.dumps(payload).encode('utf-8')
).decode('utf-8').rstrip('=') ).decode('utf-8').rstrip('=')
# Create signature input # Create signature input
logging.CyberCPLogFileWriter.writeToFile('Creating signature input...') logging.CyberCPLogFileWriter.writeToFile('Creating signature input...')
signature_input = f"{protected_b64}.{payload_b64}".encode('utf-8') signature_input = f"{protected_b64}.{payload_b64}".encode('utf-8')
# Sign the input # Sign the input
logging.CyberCPLogFileWriter.writeToFile('Signing input...') logging.CyberCPLogFileWriter.writeToFile('Signing input...')
signature = self.account_key.sign( signature = self.account_key.sign(
@@ -188,26 +192,23 @@ class CustomACME:
padding.PKCS1v15(), padding.PKCS1v15(),
hashes.SHA256() hashes.SHA256()
) )
# Encode signature # Encode signature
logging.CyberCPLogFileWriter.writeToFile('Encoding signature...') logging.CyberCPLogFileWriter.writeToFile('Encoding signature...')
signature_b64 = base64.urlsafe_b64encode(signature).decode('utf-8').rstrip('=') signature_b64 = base64.urlsafe_b64encode(signature).decode('utf-8').rstrip('=')
# Create final JWS # Create final JWS
logging.CyberCPLogFileWriter.writeToFile('Creating final JWS...') logging.CyberCPLogFileWriter.writeToFile('Creating final JWS...')
jws = { jws = {
"protected": protected_b64, "protected": protected_b64,
"payload": payload_b64, # Always include payload field, even if empty
"signature": signature_b64 "signature": signature_b64
} }
# Only add payload if it exists
if payload is not None:
jws["payload"] = payload_b64
# Ensure the JWS is properly formatted # Ensure the JWS is properly formatted
jws_str = json.dumps(jws, separators=(',', ':')) jws_str = json.dumps(jws, separators=(',', ':'))
logging.CyberCPLogFileWriter.writeToFile(f'Final JWS: {jws_str}') logging.CyberCPLogFileWriter.writeToFile(f'Final JWS: {jws_str}')
return jws_str return jws_str
except Exception as e: except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error creating JWS: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error creating JWS: {str(e)}')
@@ -257,16 +258,19 @@ class CustomACME:
"termsOfServiceAgreed": True, "termsOfServiceAgreed": True,
"contact": [f"mailto:{self.admin_email}"] "contact": [f"mailto:{self.admin_email}"]
} }
# Check if External Account Binding is required (for ZeroSSL) # Check if External Account Binding is required (for ZeroSSL)
if self.provider == 'zerossl' and 'meta' in self.directory and 'externalAccountRequired' in self.directory['meta']: if self.provider == 'zerossl' and 'meta' in self.directory and 'externalAccountRequired' in self.directory[
'meta']:
if self.directory['meta']['externalAccountRequired']: if self.directory['meta']['externalAccountRequired']:
logging.CyberCPLogFileWriter.writeToFile('ZeroSSL requires External Account Binding, getting EAB credentials...') logging.CyberCPLogFileWriter.writeToFile(
'ZeroSSL requires External Account Binding, getting EAB credentials...')
# Get EAB credentials from ZeroSSL # Get EAB credentials from ZeroSSL
eab_kid, eab_hmac_key = self._get_zerossl_eab_credentials() eab_kid, eab_hmac_key = self._get_zerossl_eab_credentials()
if not eab_kid or not eab_hmac_key: if not eab_kid or not eab_hmac_key:
logging.CyberCPLogFileWriter.writeToFile('Failed to get ZeroSSL EAB credentials, falling back to Let\'s Encrypt') logging.CyberCPLogFileWriter.writeToFile(
'Failed to get ZeroSSL EAB credentials, falling back to Let\'s Encrypt')
# Fallback to Let's Encrypt # Fallback to Let's Encrypt
self.provider = 'letsencrypt' self.provider = 'letsencrypt'
self.acme_directory = "https://acme-v02.api.letsencrypt.org/directory" self.acme_directory = "https://acme-v02.api.letsencrypt.org/directory"
@@ -275,15 +279,15 @@ class CustomACME:
if not self._get_nonce(): if not self._get_nonce():
return False return False
return self._create_account() return self._create_account()
# Add EAB to payload # Add EAB to payload
payload['externalAccountBinding'] = self._create_eab(eab_kid, eab_hmac_key) payload['externalAccountBinding'] = self._create_eab(eab_kid, eab_hmac_key)
jws = self._create_jws(payload, self.directory['newAccount']) jws = self._create_jws(payload, self.directory['newAccount'])
if not jws: if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for account creation') logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for account creation')
return False return False
logging.CyberCPLogFileWriter.writeToFile('Sending account creation request...') logging.CyberCPLogFileWriter.writeToFile('Sending account creation request...')
headers = { headers = {
'Content-Type': 'application/jose+json' 'Content-Type': 'application/jose+json'
@@ -291,15 +295,17 @@ class CustomACME:
response = requests.post(self.directory['newAccount'], data=jws, headers=headers) response = requests.post(self.directory['newAccount'], data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Account creation response status: {response.status_code}') logging.CyberCPLogFileWriter.writeToFile(f'Account creation response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Account creation response: {response.text}') logging.CyberCPLogFileWriter.writeToFile(f'Account creation response: {response.text}')
if response.status_code == 201: if response.status_code == 201:
self.account_url = response.headers['Location'] self.account_url = response.headers['Location']
logging.CyberCPLogFileWriter.writeToFile(f'Successfully created account. Account URL: {self.account_url}') logging.CyberCPLogFileWriter.writeToFile(
f'Successfully created account. Account URL: {self.account_url}')
# Save the account key for future use # Save the account key for future use
self._save_account_key() self._save_account_key()
return True return True
elif response.status_code == 429: elif response.status_code == 429:
logging.CyberCPLogFileWriter.writeToFile('Rate limit hit for account creation. Using staging environment...') logging.CyberCPLogFileWriter.writeToFile(
'Rate limit hit for account creation. Using staging environment...')
self.staging = True self.staging = True
self.acme_directory = "https://acme-staging-v02.api.letsencrypt.org/directory" self.acme_directory = "https://acme-staging-v02.api.letsencrypt.org/directory"
# Get new directory and nonce for staging # Get new directory and nonce for staging
@@ -323,7 +329,7 @@ class CustomACME:
"""Get External Account Binding credentials from ZeroSSL""" """Get External Account Binding credentials from ZeroSSL"""
try: try:
logging.CyberCPLogFileWriter.writeToFile('Getting ZeroSSL EAB credentials...') logging.CyberCPLogFileWriter.writeToFile('Getting ZeroSSL EAB credentials...')
# Request EAB credentials from ZeroSSL API # Request EAB credentials from ZeroSSL API
eab_url = 'https://api.zerossl.com/acme/eab-credentials-email' eab_url = 'https://api.zerossl.com/acme/eab-credentials-email'
headers = { headers = {
@@ -332,16 +338,16 @@ class CustomACME:
data = { data = {
'email': self.admin_email 'email': self.admin_email
} }
response = requests.post(eab_url, headers=headers, data=data) response = requests.post(eab_url, headers=headers, data=data)
logging.CyberCPLogFileWriter.writeToFile(f'ZeroSSL EAB response status: {response.status_code}') logging.CyberCPLogFileWriter.writeToFile(f'ZeroSSL EAB response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'ZeroSSL EAB response: {response.text}') logging.CyberCPLogFileWriter.writeToFile(f'ZeroSSL EAB response: {response.text}')
if response.status_code == 200: if response.status_code == 200:
eab_data = response.json() eab_data = response.json()
if 'eab_kid' in eab_data and 'eab_hmac_key' in eab_data: if 'eab_kid' in eab_data and 'eab_hmac_key' in eab_data:
return eab_data['eab_kid'], eab_data['eab_hmac_key'] return eab_data['eab_kid'], eab_data['eab_hmac_key']
return None, None return None, None
except Exception as e: except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error getting ZeroSSL EAB credentials: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error getting ZeroSSL EAB credentials: {str(e)}')
@@ -351,52 +357,52 @@ class CustomACME:
"""Create External Account Binding for ZeroSSL""" """Create External Account Binding for ZeroSSL"""
try: try:
logging.CyberCPLogFileWriter.writeToFile('Creating External Account Binding...') logging.CyberCPLogFileWriter.writeToFile('Creating External Account Binding...')
# Get the private key numbers # Get the private key numbers
private_numbers = self.account_key.private_numbers() private_numbers = self.account_key.private_numbers()
public_numbers = private_numbers.public_numbers public_numbers = private_numbers.public_numbers
# Convert numbers to bytes # Convert numbers to bytes
n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big') n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big')
e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big') e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big')
# Create JWK # Create JWK
jwk = { jwk = {
"kty": "RSA", "kty": "RSA",
"n": base64.urlsafe_b64encode(n_bytes).decode('utf-8').rstrip('='), "n": base64.urlsafe_b64encode(n_bytes).decode('utf-8').rstrip('='),
"e": base64.urlsafe_b64encode(e_bytes).decode('utf-8').rstrip('=') "e": base64.urlsafe_b64encode(e_bytes).decode('utf-8').rstrip('=')
} }
# Create protected header for EAB # Create protected header for EAB
protected = { protected = {
"alg": "HS256", "alg": "HS256",
"kid": eab_kid, "kid": eab_kid,
"url": self.directory['newAccount'] "url": self.directory['newAccount']
} }
# Encode protected header # Encode protected header
protected_b64 = base64.urlsafe_b64encode( protected_b64 = base64.urlsafe_b64encode(
json.dumps(protected).encode('utf-8') json.dumps(protected).encode('utf-8')
).decode('utf-8').rstrip('=') ).decode('utf-8').rstrip('=')
# Encode JWK payload # Encode JWK payload
payload_b64 = base64.urlsafe_b64encode( payload_b64 = base64.urlsafe_b64encode(
json.dumps(jwk).encode('utf-8') json.dumps(jwk).encode('utf-8')
).decode('utf-8').rstrip('=') ).decode('utf-8').rstrip('=')
# Create signature using HMAC-SHA256 # Create signature using HMAC-SHA256
signature_input = f"{protected_b64}.{payload_b64}".encode('utf-8') signature_input = f"{protected_b64}.{payload_b64}".encode('utf-8')
hmac_key = base64.urlsafe_b64decode(eab_hmac_key + '==') # Add padding if needed hmac_key = base64.urlsafe_b64decode(eab_hmac_key + '==') # Add padding if needed
signature = hmac.new(hmac_key, signature_input, hashlib.sha256).digest() signature = hmac.new(hmac_key, signature_input, hashlib.sha256).digest()
signature_b64 = base64.urlsafe_b64encode(signature).decode('utf-8').rstrip('=') signature_b64 = base64.urlsafe_b64encode(signature).decode('utf-8').rstrip('=')
# Create EAB object # Create EAB object
eab = { eab = {
"protected": protected_b64, "protected": protected_b64,
"payload": payload_b64, "payload": payload_b64,
"signature": signature_b64 "signature": signature_b64
} }
logging.CyberCPLogFileWriter.writeToFile('Successfully created External Account Binding') logging.CyberCPLogFileWriter.writeToFile('Successfully created External Account Binding')
return eab return eab
except Exception as e: except Exception as e:
@@ -411,12 +417,12 @@ class CustomACME:
payload = { payload = {
"identifiers": identifiers "identifiers": identifiers
} }
jws = self._create_jws(payload, self.directory['newOrder']) jws = self._create_jws(payload, self.directory['newOrder'])
if not jws: if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order creation') logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order creation')
return False return False
logging.CyberCPLogFileWriter.writeToFile('Sending order creation request...') logging.CyberCPLogFileWriter.writeToFile('Sending order creation request...')
headers = { headers = {
'Content-Type': 'application/jose+json' 'Content-Type': 'application/jose+json'
@@ -424,7 +430,7 @@ class CustomACME:
response = requests.post(self.directory['newOrder'], data=jws, headers=headers) response = requests.post(self.directory['newOrder'], data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Order creation response status: {response.status_code}') logging.CyberCPLogFileWriter.writeToFile(f'Order creation response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Order creation response: {response.text}') logging.CyberCPLogFileWriter.writeToFile(f'Order creation response: {response.text}')
if response.status_code == 201: if response.status_code == 201:
self.order_url = response.headers['Location'] self.order_url = response.headers['Location']
self.authorizations = response.json()['authorizations'] self.authorizations = response.json()['authorizations']
@@ -442,26 +448,26 @@ class CustomACME:
"""Handle HTTP-01 challenge""" """Handle HTTP-01 challenge"""
try: try:
logging.CyberCPLogFileWriter.writeToFile(f'Handling HTTP challenge: {json.dumps(challenge)}') logging.CyberCPLogFileWriter.writeToFile(f'Handling HTTP challenge: {json.dumps(challenge)}')
# Get key authorization # Get key authorization
key_auth = self._get_key_authorization(challenge) key_auth = self._get_key_authorization(challenge)
if not key_auth: if not key_auth:
logging.CyberCPLogFileWriter.writeToFile('Failed to get key authorization') logging.CyberCPLogFileWriter.writeToFile('Failed to get key authorization')
return False return False
# Create challenge directory if it doesn't exist # Create challenge directory if it doesn't exist
if not os.path.exists(self.challenge_path): if not os.path.exists(self.challenge_path):
logging.CyberCPLogFileWriter.writeToFile(f'Creating challenge directory: {self.challenge_path}') logging.CyberCPLogFileWriter.writeToFile(f'Creating challenge directory: {self.challenge_path}')
os.makedirs(self.challenge_path) os.makedirs(self.challenge_path)
# Write challenge file # Write challenge file
challenge_file = os.path.join(self.challenge_path, challenge['token']) challenge_file = os.path.join(self.challenge_path, challenge['token'])
logging.CyberCPLogFileWriter.writeToFile(f'Writing challenge file: {challenge_file}') logging.CyberCPLogFileWriter.writeToFile(f'Writing challenge file: {challenge_file}')
# Write only the key authorization to the file # Write only the key authorization to the file
with open(challenge_file, 'w') as f: with open(challenge_file, 'w') as f:
f.write(key_auth) f.write(key_auth)
logging.CyberCPLogFileWriter.writeToFile('Successfully handled HTTP challenge') logging.CyberCPLogFileWriter.writeToFile('Successfully handled HTTP challenge')
return True return True
except Exception as e: except Exception as e:
@@ -483,22 +489,22 @@ class CustomACME:
"""Get key authorization for challenge""" """Get key authorization for challenge"""
try: try:
logging.CyberCPLogFileWriter.writeToFile('Getting key authorization...') logging.CyberCPLogFileWriter.writeToFile('Getting key authorization...')
# Get the private key numbers # Get the private key numbers
private_numbers = self.account_key.private_numbers() private_numbers = self.account_key.private_numbers()
public_numbers = private_numbers.public_numbers public_numbers = private_numbers.public_numbers
# Convert numbers to bytes # Convert numbers to bytes
n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big') n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big')
e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big') e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big')
# Create JWK without alg field # Create JWK without alg field
jwk_key = { jwk_key = {
"kty": "RSA", "kty": "RSA",
"n": base64.urlsafe_b64encode(n_bytes).decode('utf-8').rstrip('='), "n": base64.urlsafe_b64encode(n_bytes).decode('utf-8').rstrip('='),
"e": base64.urlsafe_b64encode(e_bytes).decode('utf-8').rstrip('=') "e": base64.urlsafe_b64encode(e_bytes).decode('utf-8').rstrip('=')
} }
# Calculate the JWK thumbprint according to RFC 7638 # Calculate the JWK thumbprint according to RFC 7638
# The thumbprint is a hash of the JWK (JSON Web Key) in a specific format # The thumbprint is a hash of the JWK (JSON Web Key) in a specific format
# First, we create a dictionary with the required JWK parameters # First, we create a dictionary with the required JWK parameters
@@ -507,23 +513,23 @@ class CustomACME:
"kty": "RSA", # Key type "kty": "RSA", # Key type
"n": base64.urlsafe_b64encode(public_numbers.n.to_bytes(256, 'big')).decode('utf-8').rstrip('=') "n": base64.urlsafe_b64encode(public_numbers.n.to_bytes(256, 'big')).decode('utf-8').rstrip('=')
} }
# Sort the JWK parameters alphabetically by key name # Sort the JWK parameters alphabetically by key name
# This ensures consistent thumbprint calculation regardless of parameter order # This ensures consistent thumbprint calculation regardless of parameter order
sorted_jwk = json.dumps(jwk, sort_keys=True, separators=(',', ':')) sorted_jwk = json.dumps(jwk, sort_keys=True, separators=(',', ':'))
# Calculate the SHA-256 hash of the sorted JWK # Calculate the SHA-256 hash of the sorted JWK
# Example of what sorted_jwk might look like: # Example of what sorted_jwk might look like:
# {"e":"AQAB","kty":"RSA","n":"tVKUtcx_n9rt5afY_2WFNVAu9fjD4xqX4Xm3dJz3XYb"} # {"e":"AQAB","kty":"RSA","n":"tVKUtcx_n9rt5afY_2WFNVAu9fjD4xqX4Xm3dJz3XYb"}
# The thumbprint will be a 32-byte SHA-256 hash of this string # The thumbprint will be a 32-byte SHA-256 hash of this string
# For example, it might look like: b'x\x9c\x1d\x8f\x8b\x1b\x1e\x8b\x1b\x1e\x8b\x1b\x1e\x8b\x1b\x1e' # For example, it might look like: b'x\x9c\x1d\x8f\x8b\x1b\x1e\x8b\x1b\x1e\x8b\x1b\x1e\x8b\x1b\x1e'
thumbprint = hashlib.sha256(sorted_jwk.encode('utf-8')).digest() thumbprint = hashlib.sha256(sorted_jwk.encode('utf-8')).digest()
# Encode the thumbprint in base64url format (RFC 4648) # Encode the thumbprint in base64url format (RFC 4648)
# This removes padding characters (=) and replaces + and / with - and _ # This removes padding characters (=) and replaces + and / with - and _
# Example final thumbprint: "xJ0dj8sbHosbHosbHosbHos" # Example final thumbprint: "xJ0dj8sbHosbHosbHosbHos"
thumbprint = base64.urlsafe_b64encode(thumbprint).decode('utf-8').rstrip('=') thumbprint = base64.urlsafe_b64encode(thumbprint).decode('utf-8').rstrip('=')
# Combine token and key authorization # Combine token and key authorization
key_auth = f"{challenge['token']}.{thumbprint}" key_auth = f"{challenge['token']}.{thumbprint}"
logging.CyberCPLogFileWriter.writeToFile(f'Key authorization: {key_auth}') logging.CyberCPLogFileWriter.writeToFile(f'Key authorization: {key_auth}')
@@ -534,31 +540,31 @@ class CustomACME:
def _verify_challenge(self, challenge_url): def _verify_challenge(self, challenge_url):
"""Verify challenge completion with the ACME server """Verify challenge completion with the ACME server
This function sends a POST request to the ACME server to verify that the challenge This function sends a POST request to the ACME server to verify that the challenge
has been completed successfully. The challenge URL is provided by the ACME server has been completed successfully. The challenge URL is provided by the ACME server
when the challenge is created. when the challenge is created.
Example challenge_url: Example challenge_url:
"https://acme-v02.api.letsencrypt.org/acme/challenge/example.com/123456" "https://acme-v02.api.letsencrypt.org/acme/challenge/example.com/123456"
The verification process: The verification process:
1. Creates an empty payload (POST-as-GET request) 1. Creates an empty payload (POST-as-GET request)
2. Creates a JWS (JSON Web Signature) with the payload 2. Creates a JWS (JSON Web Signature) with the payload
3. Sends the request to the ACME server 3. Sends the request to the ACME server
4. Checks the response status 4. Checks the response status
Returns: Returns:
bool: True if challenge is verified successfully, False otherwise bool: True if challenge is verified successfully, False otherwise
""" """
try: try:
logging.CyberCPLogFileWriter.writeToFile(f'Verifying challenge at URL: {challenge_url}') logging.CyberCPLogFileWriter.writeToFile(f'Verifying challenge at URL: {challenge_url}')
# Create empty payload for POST-as-GET request # Create empty payload for POST-as-GET request
# This is a special type of request where we want to GET a resource # This is a special type of request where we want to GET a resource
# but need to include a signature, so we use POST with an empty payload # but need to include a signature, so we use POST with an empty payload
payload = {} payload = {}
# Create JWS (JSON Web Signature) for the request # Create JWS (JSON Web Signature) for the request
# Example JWS might look like: # Example JWS might look like:
# { # {
@@ -570,15 +576,15 @@ class CustomACME:
if not jws: if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for challenge verification') logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for challenge verification')
return False return False
logging.CyberCPLogFileWriter.writeToFile('Sending challenge verification request...') logging.CyberCPLogFileWriter.writeToFile('Sending challenge verification request...')
# Set headers for the request # Set headers for the request
# Content-Type: application/jose+json indicates we're sending a JWS # Content-Type: application/jose+json indicates we're sending a JWS
headers = { headers = {
'Content-Type': 'application/jose+json' 'Content-Type': 'application/jose+json'
} }
# Send the verification request to the ACME server # Send the verification request to the ACME server
# Example response might look like: # Example response might look like:
# { # {
@@ -590,7 +596,7 @@ class CustomACME:
response = requests.post(challenge_url, data=jws, headers=headers) response = requests.post(challenge_url, data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Challenge verification response status: {response.status_code}') logging.CyberCPLogFileWriter.writeToFile(f'Challenge verification response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Challenge verification response: {response.text}') logging.CyberCPLogFileWriter.writeToFile(f'Challenge verification response: {response.text}')
# Check if the challenge was verified successfully # Check if the challenge was verified successfully
# Status code 200 indicates success # Status code 200 indicates success
# The response will contain the challenge status and validation time # The response will contain the challenge status and validation time
@@ -609,12 +615,12 @@ class CustomACME:
payload = { payload = {
"csr": base64.urlsafe_b64encode(csr).decode('utf-8').rstrip('=') "csr": base64.urlsafe_b64encode(csr).decode('utf-8').rstrip('=')
} }
jws = self._create_jws(payload, self.finalize_url) jws = self._create_jws(payload, self.finalize_url)
if not jws: if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order finalization') logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order finalization')
return False return False
logging.CyberCPLogFileWriter.writeToFile('Sending order finalization request...') logging.CyberCPLogFileWriter.writeToFile('Sending order finalization request...')
headers = { headers = {
'Content-Type': 'application/jose+json' 'Content-Type': 'application/jose+json'
@@ -622,7 +628,7 @@ class CustomACME:
response = requests.post(self.finalize_url, data=jws, headers=headers) response = requests.post(self.finalize_url, data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Order finalization response status: {response.status_code}') logging.CyberCPLogFileWriter.writeToFile(f'Order finalization response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Order finalization response: {response.text}') logging.CyberCPLogFileWriter.writeToFile(f'Order finalization response: {response.text}')
if response.status_code == 200: if response.status_code == 200:
# Wait for order to be processed # Wait for order to be processed
max_attempts = 30 max_attempts = 30
@@ -631,33 +637,36 @@ class CustomACME:
if not self._get_nonce(): if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for order status check') logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for order status check')
return False return False
# Use POST-as-GET for order status check # Use POST-as-GET for order status check
jws = self._create_jws(None, self.order_url) jws = self._create_jws(None, self.order_url)
if not jws: if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order status check') logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order status check')
return False return False
response = requests.post(self.order_url, data=jws, headers=headers) response = requests.post(self.order_url, data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Order status check response: {response.text}') logging.CyberCPLogFileWriter.writeToFile(f'Order status check response: {response.text}')
if response.status_code == 200: if response.status_code == 200:
order_status = response.json().get('status') order_status = response.json().get('status')
if order_status == 'valid': if order_status == 'valid':
self.certificate_url = response.json().get('certificate') self.certificate_url = response.json().get('certificate')
logging.CyberCPLogFileWriter.writeToFile(f'Successfully finalized order. Certificate URL: {self.certificate_url}') logging.CyberCPLogFileWriter.writeToFile(
f'Successfully finalized order. Certificate URL: {self.certificate_url}')
return True return True
elif order_status == 'invalid': elif order_status == 'invalid':
logging.CyberCPLogFileWriter.writeToFile('Order validation failed') logging.CyberCPLogFileWriter.writeToFile('Order validation failed')
return False return False
elif order_status == 'processing': elif order_status == 'processing':
logging.CyberCPLogFileWriter.writeToFile(f'Order still processing, attempt {attempt + 1}/{max_attempts}') logging.CyberCPLogFileWriter.writeToFile(
f'Order still processing, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay) time.sleep(delay)
continue continue
logging.CyberCPLogFileWriter.writeToFile(f'Order status check failed, attempt {attempt + 1}/{max_attempts}') logging.CyberCPLogFileWriter.writeToFile(
f'Order status check failed, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay) time.sleep(delay)
logging.CyberCPLogFileWriter.writeToFile('Order processing timed out') logging.CyberCPLogFileWriter.writeToFile('Order processing timed out')
return False return False
return False return False
@@ -670,16 +679,31 @@ class CustomACME:
try: try:
logging.CyberCPLogFileWriter.writeToFile('Downloading certificate...') logging.CyberCPLogFileWriter.writeToFile('Downloading certificate...')
logging.CyberCPLogFileWriter.writeToFile(f'Certificate URL: {self.certificate_url}') logging.CyberCPLogFileWriter.writeToFile(f'Certificate URL: {self.certificate_url}')
# For certificate downloads, we can use a simple GET request # Get a fresh nonce for the request
response = requests.get(self.certificate_url) if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for certificate download')
return None
# Use POST-as-GET for certificate download (ACME v2 requirement)
jws = self._create_jws(None, self.certificate_url)
if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for certificate download')
return None
headers = {
'Content-Type': 'application/jose+json'
}
response = requests.post(self.certificate_url, data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response status: {response.status_code}') logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response headers: {response.headers}') logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response headers: {response.headers}')
logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response content: {response.text}')
if response.status_code == 200: if response.status_code == 200:
logging.CyberCPLogFileWriter.writeToFile('Successfully downloaded certificate') logging.CyberCPLogFileWriter.writeToFile('Successfully downloaded certificate')
return response.content # The response should be the PEM-encoded certificate chain
return response.text.encode('utf-8') if isinstance(response.text, str) else response.content
else:
logging.CyberCPLogFileWriter.writeToFile(f'Certificate download failed: {response.text}')
return None return None
except Exception as e: except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error downloading certificate: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error downloading certificate: {str(e)}')
@@ -693,19 +717,19 @@ class CustomACME:
if not self._get_nonce(): if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for challenge status check') logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for challenge status check')
return False return False
# Use POST-as-GET for challenge status check # Use POST-as-GET for challenge status check
jws = self._create_jws(None, challenge_url) jws = self._create_jws(None, challenge_url)
if not jws: if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for challenge status check') logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for challenge status check')
return False return False
headers = { headers = {
'Content-Type': 'application/jose+json' 'Content-Type': 'application/jose+json'
} }
response = requests.post(challenge_url, data=jws, headers=headers) response = requests.post(challenge_url, data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Challenge status check response: {response.text}') logging.CyberCPLogFileWriter.writeToFile(f'Challenge status check response: {response.text}')
if response.status_code == 200: if response.status_code == 200:
challenge_status = response.json().get('status') challenge_status = response.json().get('status')
if challenge_status == 'valid': if challenge_status == 'valid':
@@ -714,10 +738,11 @@ class CustomACME:
elif challenge_status == 'invalid': elif challenge_status == 'invalid':
logging.CyberCPLogFileWriter.writeToFile('Challenge validation failed') logging.CyberCPLogFileWriter.writeToFile('Challenge validation failed')
return False return False
logging.CyberCPLogFileWriter.writeToFile(f'Challenge still pending, attempt {attempt + 1}/{max_attempts}') logging.CyberCPLogFileWriter.writeToFile(
f'Challenge still pending, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay) time.sleep(delay)
logging.CyberCPLogFileWriter.writeToFile('Challenge validation timed out') logging.CyberCPLogFileWriter.writeToFile('Challenge validation timed out')
return False return False
except Exception as e: except Exception as e:
@@ -726,40 +751,40 @@ class CustomACME:
def _check_dns_record(self, domain): def _check_dns_record(self, domain):
"""Check if a domain has valid DNS records """Check if a domain has valid DNS records
This function performs multiple DNS checks to ensure the domain has valid DNS records. This function performs multiple DNS checks to ensure the domain has valid DNS records.
It includes: It includes:
1. A record (IPv4) check 1. A record (IPv4) check
2. AAAA record (IPv6) check 2. AAAA record (IPv6) check
3. DNS caching prevention 3. DNS caching prevention
4. Multiple DNS server checks 4. Multiple DNS server checks
Args: Args:
domain (str): The domain to check domain (str): The domain to check
Returns: Returns:
bool: True if valid DNS records are found, False otherwise bool: True if valid DNS records are found, False otherwise
""" """
try: try:
logging.CyberCPLogFileWriter.writeToFile(f'Checking DNS records for domain: {domain}') logging.CyberCPLogFileWriter.writeToFile(f'Checking DNS records for domain: {domain}')
# List of public DNS servers to check against # List of public DNS servers to check against
dns_servers = [ dns_servers = [
'8.8.8.8', # Google DNS '8.8.8.8', # Google DNS
'1.1.1.1', # Cloudflare DNS '1.1.1.1', # Cloudflare DNS
'208.67.222.222' # OpenDNS '208.67.222.222' # OpenDNS
] ]
# Function to check DNS record with specific DNS server # Function to check DNS record with specific DNS server
def check_with_dns_server(server, record_type='A'): def check_with_dns_server(server, record_type='A'):
try: try:
# Create a new socket for each check # Create a new socket for each check
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5) # 5 second timeout sock.settimeout(5) # 5 second timeout
# Set the DNS server # Set the DNS server
sock.connect((server, 53)) sock.connect((server, 53))
# Create DNS query # Create DNS query
query = bytearray() query = bytearray()
# DNS header # DNS header
@@ -769,54 +794,54 @@ class CustomACME:
query += b'\x00\x00' # Answer RRs: 0 query += b'\x00\x00' # Answer RRs: 0
query += b'\x00\x00' # Authority RRs: 0 query += b'\x00\x00' # Authority RRs: 0
query += b'\x00\x00' # Additional RRs: 0 query += b'\x00\x00' # Additional RRs: 0
# Domain name # Domain name
for part in domain.split('.'): for part in domain.split('.'):
query.append(len(part)) query.append(len(part))
query.extend(part.encode()) query.extend(part.encode())
query += b'\x00' # End of domain name query += b'\x00' # End of domain name
# Query type and class # Query type and class
if record_type == 'A': if record_type == 'A':
query += b'\x00\x01' # Type: A query += b'\x00\x01' # Type: A
else: # AAAA else: # AAAA
query += b'\x00\x1c' # Type: AAAA query += b'\x00\x1c' # Type: AAAA
query += b'\x00\x01' # Class: IN query += b'\x00\x01' # Class: IN
# Send query # Send query
sock.send(query) sock.send(query)
# Receive response # Receive response
response = sock.recv(1024) response = sock.recv(1024)
# Check if we got a valid response # Check if we got a valid response
if len(response) > 12: # Minimum DNS response size if len(response) > 12: # Minimum DNS response size
# Check if there are answers in the response # Check if there are answers in the response
answer_count = int.from_bytes(response[6:8], 'big') answer_count = int.from_bytes(response[6:8], 'big')
if answer_count > 0: if answer_count > 0:
return True return True
return False return False
except Exception as e: except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS with server {server}: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS with server {server}: {str(e)}')
return False return False
finally: finally:
sock.close() sock.close()
# Check A records (IPv4) with multiple DNS servers # Check A records (IPv4) with multiple DNS servers
a_record_found = False a_record_found = False
for server in dns_servers: for server in dns_servers:
if check_with_dns_server(server, 'A'): if check_with_dns_server(server, 'A'):
a_record_found = True a_record_found = True
break break
# Check AAAA records (IPv6) with multiple DNS servers # Check AAAA records (IPv6) with multiple DNS servers
aaaa_record_found = False aaaa_record_found = False
for server in dns_servers: for server in dns_servers:
if check_with_dns_server(server, 'AAAA'): if check_with_dns_server(server, 'AAAA'):
aaaa_record_found = True aaaa_record_found = True
break break
# Also check with system's DNS resolver as a fallback # Also check with system's DNS resolver as a fallback
try: try:
# Try to resolve A record (IPv4) # Try to resolve A record (IPv4)
@@ -824,23 +849,23 @@ class CustomACME:
a_record_found = True a_record_found = True
except socket.gaierror: except socket.gaierror:
pass pass
try: try:
# Try to resolve AAAA record (IPv6) # Try to resolve AAAA record (IPv6)
socket.getaddrinfo(domain, None, socket.AF_INET6) socket.getaddrinfo(domain, None, socket.AF_INET6)
aaaa_record_found = True aaaa_record_found = True
except socket.gaierror: except socket.gaierror:
pass pass
# Log the results # Log the results
if a_record_found: if a_record_found:
logging.CyberCPLogFileWriter.writeToFile(f'IPv4 DNS record found for domain: {domain}') logging.CyberCPLogFileWriter.writeToFile(f'IPv4 DNS record found for domain: {domain}')
if aaaa_record_found: if aaaa_record_found:
logging.CyberCPLogFileWriter.writeToFile(f'IPv6 DNS record found for domain: {domain}') logging.CyberCPLogFileWriter.writeToFile(f'IPv6 DNS record found for domain: {domain}')
# Return True if either A or AAAA record is found # Return True if either A or AAAA record is found
return a_record_found or aaaa_record_found return a_record_found or aaaa_record_found
except Exception as e: except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS records: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS records: {str(e)}')
return False return False
@@ -853,19 +878,19 @@ class CustomACME:
if not self._get_nonce(): if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for order status check') logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for order status check')
return False return False
# Use POST-as-GET for order status check # Use POST-as-GET for order status check
jws = self._create_jws(None, self.order_url) jws = self._create_jws(None, self.order_url)
if not jws: if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order status check') logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order status check')
return False return False
headers = { headers = {
'Content-Type': 'application/jose+json' 'Content-Type': 'application/jose+json'
} }
response = requests.post(self.order_url, data=jws, headers=headers) response = requests.post(self.order_url, data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Order status check response: {response.text}') logging.CyberCPLogFileWriter.writeToFile(f'Order status check response: {response.text}')
if response.status_code == 200: if response.status_code == 200:
order_status = response.json().get('status') order_status = response.json().get('status')
if order_status == 'valid': if order_status == 'valid':
@@ -876,13 +901,15 @@ class CustomACME:
logging.CyberCPLogFileWriter.writeToFile('Order validation failed') logging.CyberCPLogFileWriter.writeToFile('Order validation failed')
return False return False
elif order_status == 'processing': elif order_status == 'processing':
logging.CyberCPLogFileWriter.writeToFile(f'Order still processing, attempt {attempt + 1}/{max_attempts}') logging.CyberCPLogFileWriter.writeToFile(
f'Order still processing, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay) time.sleep(delay)
continue continue
logging.CyberCPLogFileWriter.writeToFile(f'Order status check failed, attempt {attempt + 1}/{max_attempts}') logging.CyberCPLogFileWriter.writeToFile(
f'Order status check failed, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay) time.sleep(delay)
logging.CyberCPLogFileWriter.writeToFile('Order processing timed out') logging.CyberCPLogFileWriter.writeToFile('Order processing timed out')
return False return False
except Exception as e: except Exception as e:
@@ -892,14 +919,15 @@ class CustomACME:
def issue_certificate(self, domains, use_dns=False): def issue_certificate(self, domains, use_dns=False):
"""Main method to issue certificate""" """Main method to issue certificate"""
try: try:
logging.CyberCPLogFileWriter.writeToFile(f'Starting certificate issuance for domains: {domains}, use_dns: {use_dns}') logging.CyberCPLogFileWriter.writeToFile(
f'Starting certificate issuance for domains: {domains}, use_dns: {use_dns}')
# Try to load existing account key first # Try to load existing account key first
if self._load_account_key(): if self._load_account_key():
logging.CyberCPLogFileWriter.writeToFile('Using existing account key') logging.CyberCPLogFileWriter.writeToFile('Using existing account key')
else: else:
logging.CyberCPLogFileWriter.writeToFile('No existing account key found, will create new one') logging.CyberCPLogFileWriter.writeToFile('No existing account key found, will create new one')
# Filter domains to only include those with valid DNS records # Filter domains to only include those with valid DNS records
valid_domains = [] valid_domains = []
for domain in domains: for domain in domains:
@@ -907,27 +935,27 @@ class CustomACME:
valid_domains.append(domain) valid_domains.append(domain)
else: else:
logging.CyberCPLogFileWriter.writeToFile(f'Skipping domain {domain} due to missing DNS records') logging.CyberCPLogFileWriter.writeToFile(f'Skipping domain {domain} due to missing DNS records')
if not valid_domains: if not valid_domains:
logging.CyberCPLogFileWriter.writeToFile('No valid domains found with DNS records') logging.CyberCPLogFileWriter.writeToFile('No valid domains found with DNS records')
return False return False
# Initialize ACME # Initialize ACME
logging.CyberCPLogFileWriter.writeToFile('Step 1: Generating account key') logging.CyberCPLogFileWriter.writeToFile('Step 1: Generating account key')
if not self._generate_account_key(): if not self._generate_account_key():
logging.CyberCPLogFileWriter.writeToFile('Failed to generate account key') logging.CyberCPLogFileWriter.writeToFile('Failed to generate account key')
return False return False
logging.CyberCPLogFileWriter.writeToFile('Step 2: Getting ACME directory') logging.CyberCPLogFileWriter.writeToFile('Step 2: Getting ACME directory')
if not self._get_directory(): if not self._get_directory():
logging.CyberCPLogFileWriter.writeToFile('Failed to get ACME directory') logging.CyberCPLogFileWriter.writeToFile('Failed to get ACME directory')
return False return False
logging.CyberCPLogFileWriter.writeToFile('Step 3: Getting nonce') logging.CyberCPLogFileWriter.writeToFile('Step 3: Getting nonce')
if not self._get_nonce(): if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce') logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce')
return False return False
logging.CyberCPLogFileWriter.writeToFile('Step 4: Creating account') logging.CyberCPLogFileWriter.writeToFile('Step 4: Creating account')
if not self._create_account(): if not self._create_account():
logging.CyberCPLogFileWriter.writeToFile('Failed to create account') logging.CyberCPLogFileWriter.writeToFile('Failed to create account')
@@ -944,13 +972,13 @@ class CustomACME:
return False return False
else: else:
return False return False
# Create order with only valid domains # Create order with only valid domains
logging.CyberCPLogFileWriter.writeToFile('Step 5: Creating order') logging.CyberCPLogFileWriter.writeToFile('Step 5: Creating order')
if not self._create_order(valid_domains): if not self._create_order(valid_domains):
logging.CyberCPLogFileWriter.writeToFile('Failed to create order') logging.CyberCPLogFileWriter.writeToFile('Failed to create order')
return False return False
# Handle challenges # Handle challenges
logging.CyberCPLogFileWriter.writeToFile('Step 6: Handling challenges') logging.CyberCPLogFileWriter.writeToFile('Step 6: Handling challenges')
for auth_url in self.authorizations: for auth_url in self.authorizations:
@@ -958,7 +986,7 @@ class CustomACME:
if not self._get_nonce(): if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for authorization') logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for authorization')
return False return False
# Get authorization details with POST-as-GET request # Get authorization details with POST-as-GET request
# ACME protocol requires POST with empty payload for fetching resources # ACME protocol requires POST with empty payload for fetching resources
logging.CyberCPLogFileWriter.writeToFile(f'Fetching authorization details for: {auth_url}') logging.CyberCPLogFileWriter.writeToFile(f'Fetching authorization details for: {auth_url}')
@@ -966,22 +994,22 @@ class CustomACME:
if not jws: if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for authorization request') logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for authorization request')
return False return False
headers = { headers = {
'Content-Type': 'application/jose+json' 'Content-Type': 'application/jose+json'
} }
response = requests.post(auth_url, data=jws, headers=headers) response = requests.post(auth_url, data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Authorization response status: {response.status_code}') logging.CyberCPLogFileWriter.writeToFile(f'Authorization response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Authorization response: {response.text}') logging.CyberCPLogFileWriter.writeToFile(f'Authorization response: {response.text}')
if response.status_code != 200: if response.status_code != 200:
logging.CyberCPLogFileWriter.writeToFile('Failed to get authorization') logging.CyberCPLogFileWriter.writeToFile('Failed to get authorization')
return False return False
challenges = response.json()['challenges'] challenges = response.json()['challenges']
for challenge in challenges: for challenge in challenges:
logging.CyberCPLogFileWriter.writeToFile(f'Processing challenge: {json.dumps(challenge)}') logging.CyberCPLogFileWriter.writeToFile(f'Processing challenge: {json.dumps(challenge)}')
# Only handle the challenge type we're using # Only handle the challenge type we're using
if use_dns and challenge['type'] == 'dns-01': if use_dns and challenge['type'] == 'dns-01':
if not self._handle_dns_challenge(challenge): if not self._handle_dns_challenge(challenge):
@@ -1005,7 +1033,7 @@ class CustomACME:
return False return False
else: else:
logging.CyberCPLogFileWriter.writeToFile(f'Skipping {challenge["type"]} challenge') logging.CyberCPLogFileWriter.writeToFile(f'Skipping {challenge["type"]} challenge')
# Generate CSR # Generate CSR
logging.CyberCPLogFileWriter.writeToFile('Step 7: Generating CSR') logging.CyberCPLogFileWriter.writeToFile('Step 7: Generating CSR')
key = rsa.generate_private_key( key = rsa.generate_private_key(
@@ -1013,7 +1041,7 @@ class CustomACME:
key_size=2048, key_size=2048,
backend=default_backend() backend=default_backend()
) )
# Get the domain from the order response # Get the domain from the order response
# Use POST-as-GET to get order details # Use POST-as-GET to get order details
jws = self._create_jws(None, self.order_url) jws = self._create_jws(None, self.order_url)
@@ -1023,7 +1051,7 @@ class CustomACME:
order_response = requests.post(self.order_url, data=jws, headers=headers).json() order_response = requests.post(self.order_url, data=jws, headers=headers).json()
order_domains = [identifier['value'] for identifier in order_response['identifiers']] order_domains = [identifier['value'] for identifier in order_response['identifiers']]
logging.CyberCPLogFileWriter.writeToFile(f'Order domains: {order_domains}') logging.CyberCPLogFileWriter.writeToFile(f'Order domains: {order_domains}')
# Create CSR with exactly the domains from the order # Create CSR with exactly the domains from the order
csr = x509.CertificateSigningRequestBuilder().subject_name( csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([ x509.Name([
@@ -1035,39 +1063,39 @@ class CustomACME:
]), ]),
critical=False critical=False
).sign(key, hashes.SHA256(), default_backend()) ).sign(key, hashes.SHA256(), default_backend())
# Finalize order # Finalize order
logging.CyberCPLogFileWriter.writeToFile('Step 8: Finalizing order') logging.CyberCPLogFileWriter.writeToFile('Step 8: Finalizing order')
if not self._finalize_order(csr.public_bytes(serialization.Encoding.DER)): if not self._finalize_order(csr.public_bytes(serialization.Encoding.DER)):
logging.CyberCPLogFileWriter.writeToFile('Failed to finalize order') logging.CyberCPLogFileWriter.writeToFile('Failed to finalize order')
return False return False
# Wait for order processing # Wait for order processing
logging.CyberCPLogFileWriter.writeToFile('Step 9: Waiting for order processing') logging.CyberCPLogFileWriter.writeToFile('Step 9: Waiting for order processing')
if not self._wait_for_order_processing(): if not self._wait_for_order_processing():
logging.CyberCPLogFileWriter.writeToFile('Failed to process order') logging.CyberCPLogFileWriter.writeToFile('Failed to process order')
return False return False
# Download certificate # Download certificate
logging.CyberCPLogFileWriter.writeToFile('Step 10: Downloading certificate') logging.CyberCPLogFileWriter.writeToFile('Step 10: Downloading certificate')
certificate = self._download_certificate() certificate = self._download_certificate()
if not certificate: if not certificate:
logging.CyberCPLogFileWriter.writeToFile('Failed to download certificate') logging.CyberCPLogFileWriter.writeToFile('Failed to download certificate')
return False return False
# Save certificate # Save certificate
logging.CyberCPLogFileWriter.writeToFile('Step 11: Saving certificate') logging.CyberCPLogFileWriter.writeToFile('Step 11: Saving certificate')
if not os.path.exists(self.cert_path): if not os.path.exists(self.cert_path):
logging.CyberCPLogFileWriter.writeToFile(f'Creating certificate directory: {self.cert_path}') logging.CyberCPLogFileWriter.writeToFile(f'Creating certificate directory: {self.cert_path}')
os.makedirs(self.cert_path) os.makedirs(self.cert_path)
cert_file = os.path.join(self.cert_path, 'fullchain.pem') cert_file = os.path.join(self.cert_path, 'fullchain.pem')
key_file = os.path.join(self.cert_path, 'privkey.pem') key_file = os.path.join(self.cert_path, 'privkey.pem')
logging.CyberCPLogFileWriter.writeToFile(f'Saving certificate to: {cert_file}') logging.CyberCPLogFileWriter.writeToFile(f'Saving certificate to: {cert_file}')
with open(cert_file, 'wb') as f: with open(cert_file, 'wb') as f:
f.write(certificate) f.write(certificate)
logging.CyberCPLogFileWriter.writeToFile(f'Saving private key to: {key_file}') logging.CyberCPLogFileWriter.writeToFile(f'Saving private key to: {key_file}')
with open(key_file, 'wb') as f: with open(key_file, 'wb') as f:
f.write(key.private_bytes( f.write(key.private_bytes(
@@ -1075,7 +1103,7 @@ class CustomACME:
format=serialization.PrivateFormat.PKCS8, format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=serialization.NoEncryption()
)) ))
logging.CyberCPLogFileWriter.writeToFile('Successfully completed certificate issuance') logging.CyberCPLogFileWriter.writeToFile('Successfully completed certificate issuance')
return True return True
except Exception as e: except Exception as e:

View File

@@ -6,6 +6,7 @@ import shlex
import subprocess import subprocess
import socket import socket
from plogical.processUtilities import ProcessUtilities from plogical.processUtilities import ProcessUtilities
try: try:
from websiteFunctions.models import ChildDomains, Websites from websiteFunctions.models import ChildDomains, Websites
except: except:
@@ -14,18 +15,17 @@ from plogical.acl import ACLManager
class sslUtilities: class sslUtilities:
Server_root = "/usr/local/lsws" Server_root = "/usr/local/lsws"
redisConf = '/usr/local/lsws/conf/dvhost_redis.conf' redisConf = '/usr/local/lsws/conf/dvhost_redis.conf'
@staticmethod @staticmethod
def parseACMEError(error_output): def parseACMEError(error_output):
"""Parse ACME error output to extract meaningful error messages""" """Parse ACME error output to extract meaningful error messages"""
if not error_output: if not error_output:
return "Unknown error occurred" return "Unknown error occurred"
error_output = str(error_output) error_output = str(error_output)
# Common ACME/Let's Encrypt errors # Common ACME/Let's Encrypt errors
error_patterns = { error_patterns = {
r"rateLimited": "Rate limit exceeded. Too many certificates issued for this domain. Please wait before retrying.", r"rateLimited": "Rate limit exceeded. Too many certificates issued for this domain. Please wait before retrying.",
@@ -61,7 +61,7 @@ class sslUtilities:
r"blacklisted": "Domain is blacklisted by the certificate authority.", r"blacklisted": "Domain is blacklisted by the certificate authority.",
r"PolicyForbids": "Certificate authority policy forbids issuance for this domain." r"PolicyForbids": "Certificate authority policy forbids issuance for this domain."
} }
# Check each pattern # Check each pattern
import re import re
for pattern, message in error_patterns.items(): for pattern, message in error_patterns.items():
@@ -73,7 +73,7 @@ class sslUtilities:
message += f" Detail: {line.split('Detail:')[1].strip()}" message += f" Detail: {line.split('Detail:')[1].strip()}"
break break
return message return message
# Try to extract specific error details from acme.sh output # Try to extract specific error details from acme.sh output
if "[" in error_output and "]" in error_output: if "[" in error_output and "]" in error_output:
# Extract content between brackets which often contains the error # Extract content between brackets which often contains the error
@@ -84,20 +84,20 @@ class sslUtilities:
potential_error = bracket_content[-1] potential_error = bracket_content[-1]
if len(potential_error) > 10: # Make sure it's meaningful if len(potential_error) > 10: # Make sure it's meaningful
return f"SSL issuance failed: {potential_error}" return f"SSL issuance failed: {potential_error}"
# Look for lines starting with "Error:" or containing "error:" # Look for lines starting with "Error:" or containing "error:"
lines = error_output.split('\n') lines = error_output.split('\n')
for line in lines: for line in lines:
if line.strip().startswith('Error:') or 'error:' in line.lower(): if line.strip().startswith('Error:') or 'error:' in line.lower():
return line.strip() return line.strip()
# If we can't parse a specific error, return a portion of the output # If we can't parse a specific error, return a portion of the output
if len(error_output) > 200: if len(error_output) > 200:
# Get the last 200 characters which likely contain the error # Get the last 200 characters which likely contain the error
return f"SSL issuance failed: ...{error_output[-200:]}" return f"SSL issuance failed: ...{error_output[-200:]}"
return f"SSL issuance failed: {error_output}" return f"SSL issuance failed: {error_output}"
@staticmethod @staticmethod
def checkDNSRecords(domain): def checkDNSRecords(domain):
"""Check if domain has valid DNS records using external DNS query""" """Check if domain has valid DNS records using external DNS query"""
@@ -108,23 +108,25 @@ class sslUtilities:
result = subprocess.run(command, shell=True, capture_output=True, text=True) result = subprocess.run(command, shell=True, capture_output=True, text=True)
except TypeError: except TypeError:
# Fallback for Python < 3.7 # Fallback for Python < 3.7
result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
# If there's any output, the domain has A records # If there's any output, the domain has A records
if result.stdout.strip(): if result.stdout.strip():
return True return True
# Also check AAAA records # Also check AAAA records
command = f"dig +short {domain} AAAA @8.8.8.8" command = f"dig +short {domain} AAAA @8.8.8.8"
try: try:
result = subprocess.run(command, shell=True, capture_output=True, text=True) result = subprocess.run(command, shell=True, capture_output=True, text=True)
except TypeError: except TypeError:
# Fallback for Python < 3.7 # Fallback for Python < 3.7
result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True)
if result.stdout.strip(): if result.stdout.strip():
return True return True
return False return False
except: except:
# Fallback to socket method if dig fails # Fallback to socket method if dig fails
@@ -164,7 +166,6 @@ class sslUtilities:
except BaseException as msg: except BaseException as msg:
return 0, str(msg) return 0, str(msg)
@staticmethod @staticmethod
def CheckIfSSLNeedsToBeIssued(virtualHostName): def CheckIfSSLNeedsToBeIssued(virtualHostName):
#### if website already have an SSL, better not issue again - need to check for wild-card #### if website already have an SSL, better not issue again - need to check for wild-card
@@ -177,7 +178,6 @@ class sslUtilities:
if os.path.exists(ProcessUtilities.debugPath): if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(f'SSL provider for {virtualHostName} is {SSLProvider}.') logging.CyberCPLogFileWriter.writeToFile(f'SSL provider for {virtualHostName} is {SSLProvider}.')
#### totally seprate check to see if both non-www and www are covered #### totally seprate check to see if both non-www and www are covered
if SSLProvider == "(STAGING) Let's Encrypt": if SSLProvider == "(STAGING) Let's Encrypt":
@@ -189,7 +189,8 @@ class sslUtilities:
if len(domains) > 1: if len(domains) > 1:
### need further checks here to see if ssl is valid for less then 15 days etc ### need further checks here to see if ssl is valid for less then 15 days etc
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
'[CheckIfSSLNeedsToBeIssued] SSL exists for %s and both versions are covered, just need to ensure if SSL is valid for less then 15 days.' % (virtualHostName), 0) '[CheckIfSSLNeedsToBeIssued] SSL exists for %s and both versions are covered, just need to ensure if SSL is valid for less then 15 days.' % (
virtualHostName), 0)
pass pass
else: else:
return sslUtilities.ISSUE_SSL return sslUtilities.ISSUE_SSL
@@ -202,7 +203,7 @@ class sslUtilities:
now = datetime.now() now = datetime.now()
diff = finalDate - now diff = finalDate - now
if int(diff.days) >= 15 and SSLProvider!='Denial': if int(diff.days) >= 15 and SSLProvider != 'Denial':
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
'[CheckIfSSLNeedsToBeIssued] SSL exists for %s and is not ready to fetch new SSL., skipping..' % ( '[CheckIfSSLNeedsToBeIssued] SSL exists for %s and is not ready to fetch new SSL., skipping..' % (
virtualHostName), 0) virtualHostName), 0)
@@ -260,8 +261,7 @@ class sslUtilities:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [IO Error with main config file [checkSSLListener]]") logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [IO Error with main config file [checkSSLListener]]")
return str(msg) return str(msg)
return 0 return 0
@staticmethod @staticmethod
def checkSSLIPv6Listener(): def checkSSLIPv6Listener():
try: try:
@@ -271,7 +271,8 @@ class sslUtilities:
return 1 return 1
except BaseException as msg: except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [IO Error with main config file [checkSSLIPv6Listener]]") logging.CyberCPLogFileWriter.writeToFile(
str(msg) + " [IO Error with main config file [checkSSLIPv6Listener]]")
return str(msg) return str(msg)
return 0 return 0
@@ -290,14 +291,14 @@ class sslUtilities:
@staticmethod @staticmethod
def PatchVhostConf(virtualHostName): def PatchVhostConf(virtualHostName):
"""Patch the virtual host configuration to add ACME challenge support """Patch the virtual host configuration to add ACME challenge support
This function adds the necessary configuration to handle ACME challenges This function adds the necessary configuration to handle ACME challenges
for both OpenLiteSpeed (OLS) and Apache configurations. It also checks for both OpenLiteSpeed (OLS) and Apache configurations. It also checks
for potential configuration conflicts before making changes. for potential configuration conflicts before making changes.
Args: Args:
virtualHostName (str): The domain name to configure virtualHostName (str): The domain name to configure
Returns: Returns:
tuple: (status, message) where status is 1 for success, 0 for failure tuple: (status, message) where status is 1 for success, 0 for failure
""" """
@@ -305,12 +306,12 @@ class sslUtilities:
# Construct paths # Construct paths
confPath = os.path.join(sslUtilities.Server_root, "conf", "vhosts", virtualHostName) confPath = os.path.join(sslUtilities.Server_root, "conf", "vhosts", virtualHostName)
completePathToConfigFile = os.path.join(confPath, "vhost.conf") completePathToConfigFile = os.path.join(confPath, "vhost.conf")
# Check if file exists # Check if file exists
if not os.path.exists(completePathToConfigFile): if not os.path.exists(completePathToConfigFile):
logging.CyberCPLogFileWriter.writeToFile(f'Configuration file not found: {completePathToConfigFile}') logging.CyberCPLogFileWriter.writeToFile(f'Configuration file not found: {completePathToConfigFile}')
return 0, f'Configuration file not found: {completePathToConfigFile}' return 0, f'Configuration file not found: {completePathToConfigFile}'
# Read current configuration # Read current configuration
try: try:
with open(completePathToConfigFile, 'r') as f: with open(completePathToConfigFile, 'r') as f:
@@ -318,41 +319,42 @@ class sslUtilities:
except IOError as e: except IOError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error reading configuration file: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error reading configuration file: {str(e)}')
return 0, f'Error reading configuration file: {str(e)}' return 0, f'Error reading configuration file: {str(e)}'
# Check for potential conflicts # Check for potential conflicts
conflicts = [] conflicts = []
# Check if ACME challenge is already configured # Check if ACME challenge is already configured
if DataVhost.find('/.well-known/acme-challenge') != -1: if DataVhost.find('/.well-known/acme-challenge') != -1:
logging.CyberCPLogFileWriter.writeToFile(f'ACME challenge already configured for {virtualHostName}') logging.CyberCPLogFileWriter.writeToFile(f'ACME challenge already configured for {virtualHostName}')
return 1, 'ACME challenge already configured' return 1, 'ACME challenge already configured'
# Check for conflicting rewrite rules # Check for conflicting rewrite rules
if DataVhost.find('rewrite') != -1 and DataVhost.find('enable 1') != -1: if DataVhost.find('rewrite') != -1 and DataVhost.find('enable 1') != -1:
conflicts.append('Active rewrite rules found that might interfere with ACME challenges') conflicts.append('Active rewrite rules found that might interfere with ACME challenges')
# Check for conflicting location blocks # Check for conflicting location blocks
if DataVhost.find('location /.well-known') != -1: if DataVhost.find('location /.well-known') != -1:
conflicts.append('Existing location block for /.well-known found') conflicts.append('Existing location block for /.well-known found')
# Check for conflicting aliases # Check for conflicting aliases
if DataVhost.find('Alias /.well-known') != -1: if DataVhost.find('Alias /.well-known') != -1:
conflicts.append('Existing alias for /.well-known found') conflicts.append('Existing alias for /.well-known found')
# Check for conflicting context blocks # Check for conflicting context blocks
if DataVhost.find('context /.well-known') != -1: if DataVhost.find('context /.well-known') != -1:
conflicts.append('Existing context block for /.well-known found') conflicts.append('Existing context block for /.well-known found')
# Check for conflicting access controls # Check for conflicting access controls
if DataVhost.find('deny from all') != -1 and DataVhost.find('location') != -1: if DataVhost.find('deny from all') != -1 and DataVhost.find('location') != -1:
conflicts.append('Global deny rules found that might block ACME challenges') conflicts.append('Global deny rules found that might block ACME challenges')
# If conflicts found, log them and return # If conflicts found, log them and return
if conflicts: if conflicts:
conflict_message = 'Configuration conflicts found: ' + '; '.join(conflicts) conflict_message = 'Configuration conflicts found: ' + '; '.join(conflicts)
logging.CyberCPLogFileWriter.writeToFile(f'Configuration conflicts for {virtualHostName}: {conflict_message}') logging.CyberCPLogFileWriter.writeToFile(
f'Configuration conflicts for {virtualHostName}: {conflict_message}')
return 0, conflict_message return 0, conflict_message
# Create challenge directory if it doesn't exist # Create challenge directory if it doesn't exist
challenge_dir = '/usr/local/lsws/Example/html/.well-known/acme-challenge' challenge_dir = '/usr/local/lsws/Example/html/.well-known/acme-challenge'
try: try:
@@ -362,7 +364,7 @@ class sslUtilities:
except OSError as e: except OSError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error creating challenge directory: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error creating challenge directory: {str(e)}')
return 0, f'Error creating challenge directory: {str(e)}' return 0, f'Error creating challenge directory: {str(e)}'
# Handle configuration based on server type # Handle configuration based on server type
if ProcessUtilities.decideServer() == ProcessUtilities.OLS: if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
# OpenLiteSpeed configuration # OpenLiteSpeed configuration
@@ -390,35 +392,37 @@ context /.well-known/acme-challenge {
# Read current configuration # Read current configuration
with open(completePathToConfigFile, 'r') as f: with open(completePathToConfigFile, 'r') as f:
lines = f.readlines() lines = f.readlines()
# Write new configuration # Write new configuration
with open(completePathToConfigFile, 'w') as f: with open(completePathToConfigFile, 'w') as f:
check = 0 check = 0
for line in lines: for line in lines:
f.write(line) f.write(line)
if line.find('DocumentRoot /home/') > -1 and check == 0: if line.find('DocumentRoot /home/') > -1 and check == 0:
f.write(' Alias /.well-known/acme-challenge /usr/local/lsws/Example/html/.well-known/acme-challenge\n') f.write(
' Alias /.well-known/acme-challenge /usr/local/lsws/Example/html/.well-known/acme-challenge\n')
check = 1 check = 1
except IOError as e: except IOError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error writing Apache configuration: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error writing Apache configuration: {str(e)}')
return 0, f'Error writing Apache configuration: {str(e)}' return 0, f'Error writing Apache configuration: {str(e)}'
# Restart LiteSpeed # Restart LiteSpeed
try: try:
from plogical import installUtilities from plogical import installUtilities
installUtilities.installUtilities.reStartLiteSpeed() installUtilities.installUtilities.reStartLiteSpeed()
logging.CyberCPLogFileWriter.writeToFile(f'Successfully configured ACME challenge for {virtualHostName}') logging.CyberCPLogFileWriter.writeToFile(
f'Successfully configured ACME challenge for {virtualHostName}')
return 1, 'Successfully configured ACME challenge' return 1, 'Successfully configured ACME challenge'
except Exception as e: except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error restarting LiteSpeed: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Error restarting LiteSpeed: {str(e)}')
return 0, f'Error restarting LiteSpeed: {str(e)}' return 0, f'Error restarting LiteSpeed: {str(e)}'
except Exception as e: except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Unexpected error in PatchVhostConf: {str(e)}') logging.CyberCPLogFileWriter.writeToFile(f'Unexpected error in PatchVhostConf: {str(e)}')
return 0, f'Unexpected error: {str(e)}' return 0, f'Unexpected error: {str(e)}'
@staticmethod @staticmethod
def installSSLForDomain(virtualHostName, adminEmail='example@example.org'): def installSSLForDomain(virtualHostName, adminEmail='domain@cyberpanel.net'):
try: try:
website = Websites.objects.get(domain=virtualHostName) website = Websites.objects.get(domain=virtualHostName)
@@ -461,7 +465,7 @@ context /.well-known/acme-challenge {
writeDataToFile.writelines(certFile) writeDataToFile.writelines(certFile)
writeDataToFile.writelines(certChain) writeDataToFile.writelines(certChain)
writeDataToFile.writelines(sslProtocol) writeDataToFile.writelines(sslProtocol)
writeDataToFile.writelines(enableECDHE) writeDataToFile.writelines(enableECDHE)
writeDataToFile.writelines(renegProtection) writeDataToFile.writelines(renegProtection)
writeDataToFile.writelines(sslSessionCache) writeDataToFile.writelines(sslSessionCache)
writeDataToFile.writelines(enableSpdy) writeDataToFile.writelines(enableSpdy)
@@ -500,7 +504,7 @@ context /.well-known/acme-challenge {
writeDataToFile.writelines(certFile) writeDataToFile.writelines(certFile)
writeDataToFile.writelines(certChain) writeDataToFile.writelines(certChain)
writeDataToFile.writelines(sslProtocol) writeDataToFile.writelines(sslProtocol)
writeDataToFile.writelines(enableECDHE) writeDataToFile.writelines(enableECDHE)
writeDataToFile.writelines(renegProtection) writeDataToFile.writelines(renegProtection)
writeDataToFile.writelines(sslSessionCache) writeDataToFile.writelines(sslSessionCache)
writeDataToFile.writelines(enableSpdy) writeDataToFile.writelines(enableSpdy)
@@ -674,6 +678,14 @@ context /.well-known/acme-challenge {
import json import json
import socket import socket
# Replace example.org emails with domain-specific email
if adminEmail and ('example.org' in adminEmail or 'example.com' in adminEmail):
import re
# Remove special characters and create domain-based email
clean_domain = re.sub(r'[^a-zA-Z0-9]', '', virtualHostName)
adminEmail = f'{clean_domain}@cyberpanel.net'
logging.CyberCPLogFileWriter.writeToFile(f'Replacing invalid email with {adminEmail}')
Status = 1 Status = 1
if sslUtilities.CheckIfSSLNeedsToBeIssued(virtualHostName) == sslUtilities.ISSUE_SSL: if sslUtilities.CheckIfSSLNeedsToBeIssued(virtualHostName) == sslUtilities.ISSUE_SSL:
@@ -696,23 +708,27 @@ context /.well-known/acme-challenge {
try: try:
# Start with just the main domain # Start with just the main domain
domains = [virtualHostName] domains = [virtualHostName]
# Check if www subdomain has DNS records before adding it (skip for hostnames) # Check if www subdomain has DNS records before adding it (skip for hostnames)
if not isHostname and sslUtilities.checkDNSRecords(f'www.{virtualHostName}'): if not isHostname and sslUtilities.checkDNSRecords(f'www.{virtualHostName}'):
domains.append(f'www.{virtualHostName}') domains.append(f'www.{virtualHostName}')
logging.CyberCPLogFileWriter.writeToFile(f"www.{virtualHostName} has DNS records, including in SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{virtualHostName} has DNS records, including in SSL request")
elif not isHostname: elif not isHostname:
logging.CyberCPLogFileWriter.writeToFile(f"www.{virtualHostName} has no DNS records, excluding from SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{virtualHostName} has no DNS records, excluding from SSL request")
if aliasDomain: if aliasDomain:
domains.append(aliasDomain) domains.append(aliasDomain)
# Check if www.aliasDomain has DNS records # Check if www.aliasDomain has DNS records
if sslUtilities.checkDNSRecords(f'www.{aliasDomain}'): if sslUtilities.checkDNSRecords(f'www.{aliasDomain}'):
domains.append(f'www.{aliasDomain}') domains.append(f'www.{aliasDomain}')
logging.CyberCPLogFileWriter.writeToFile(f"www.{aliasDomain} has DNS records, including in SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{aliasDomain} has DNS records, including in SSL request")
else: else:
logging.CyberCPLogFileWriter.writeToFile(f"www.{aliasDomain} has no DNS records, excluding from SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{aliasDomain} has no DNS records, excluding from SSL request")
# Check if Cloudflare is used # Check if Cloudflare is used
use_dns = False use_dns = False
try: try:
@@ -721,7 +737,7 @@ context /.well-known/acme-challenge {
use_dns = True use_dns = True
except: except:
pass pass
acme = CustomACME(virtualHostName, adminEmail, staging=False, provider='letsencrypt') acme = CustomACME(virtualHostName, adminEmail, staging=False, provider='letsencrypt')
if acme.issue_certificate(domains, use_dns=use_dns): if acme.issue_certificate(domains, use_dns=use_dns):
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
@@ -734,7 +750,7 @@ context /.well-known/acme-challenge {
error_details = str(e.__dict__) error_details = str(e.__dict__)
else: else:
error_details = error_msg error_details = error_msg
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
f"Let's Encrypt failed for {virtualHostName}: {error_msg}" f"Let's Encrypt failed for {virtualHostName}: {error_msg}"
) )
@@ -746,23 +762,27 @@ context /.well-known/acme-challenge {
try: try:
# Start with just the main domain # Start with just the main domain
domains = [virtualHostName] domains = [virtualHostName]
# Check if www subdomain has DNS records before adding it (skip for hostnames) # Check if www subdomain has DNS records before adding it (skip for hostnames)
if not isHostname and sslUtilities.checkDNSRecords(f'www.{virtualHostName}'): if not isHostname and sslUtilities.checkDNSRecords(f'www.{virtualHostName}'):
domains.append(f'www.{virtualHostName}') domains.append(f'www.{virtualHostName}')
logging.CyberCPLogFileWriter.writeToFile(f"www.{virtualHostName} has DNS records, including in SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{virtualHostName} has DNS records, including in SSL request")
elif not isHostname: elif not isHostname:
logging.CyberCPLogFileWriter.writeToFile(f"www.{virtualHostName} has no DNS records, excluding from SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{virtualHostName} has no DNS records, excluding from SSL request")
if aliasDomain: if aliasDomain:
domains.append(aliasDomain) domains.append(aliasDomain)
# Check if www.aliasDomain has DNS records # Check if www.aliasDomain has DNS records
if sslUtilities.checkDNSRecords(f'www.{aliasDomain}'): if sslUtilities.checkDNSRecords(f'www.{aliasDomain}'):
domains.append(f'www.{aliasDomain}') domains.append(f'www.{aliasDomain}')
logging.CyberCPLogFileWriter.writeToFile(f"www.{aliasDomain} has DNS records, including in SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{aliasDomain} has DNS records, including in SSL request")
else: else:
logging.CyberCPLogFileWriter.writeToFile(f"www.{aliasDomain} has no DNS records, excluding from SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{aliasDomain} has no DNS records, excluding from SSL request")
acme = CustomACME(virtualHostName, adminEmail, staging=False, provider='zerossl') acme = CustomACME(virtualHostName, adminEmail, staging=False, provider='zerossl')
if acme.issue_certificate(domains, use_dns=use_dns): if acme.issue_certificate(domains, use_dns=use_dns):
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
@@ -790,37 +810,41 @@ context /.well-known/acme-challenge {
try: try:
# Build domain list for acme.sh # Build domain list for acme.sh
domain_list = " -d " + virtualHostName domain_list = " -d " + virtualHostName
# Check if www subdomain has DNS records (skip for hostnames) # Check if www subdomain has DNS records (skip for hostnames)
if not isHostname and sslUtilities.checkDNSRecords(f'www.{virtualHostName}'): if not isHostname and sslUtilities.checkDNSRecords(f'www.{virtualHostName}'):
domain_list += " -d www." + virtualHostName domain_list += " -d www." + virtualHostName
logging.CyberCPLogFileWriter.writeToFile(f"www.{virtualHostName} has DNS records, including in acme.sh SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{virtualHostName} has DNS records, including in acme.sh SSL request")
elif not isHostname: elif not isHostname:
logging.CyberCPLogFileWriter.writeToFile(f"www.{virtualHostName} has no DNS records, excluding from acme.sh SSL request") logging.CyberCPLogFileWriter.writeToFile(
f"www.{virtualHostName} has no DNS records, excluding from acme.sh SSL request")
command = acmePath + " --issue" + domain_list \ command = acmePath + " --issue" + domain_list \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \ + ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --staging' \ + ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --staging' \
+ ' --webroot-path /usr/local/lsws/Example/html' + ' --webroot-path /usr/local/lsws/Example/html'
try: try:
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True) result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
except TypeError: except TypeError:
# Fallback for Python < 3.7 # Fallback for Python < 3.7
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True) result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if result.returncode == 0: if result.returncode == 0:
command = acmePath + " --issue" + domain_list \ command = acmePath + " --issue" + domain_list \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \ + ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt' \ + ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt' \
+ ' --webroot-path /usr/local/lsws/Example/html' + ' --webroot-path /usr/local/lsws/Example/html'
try: try:
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True) result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
except TypeError: except TypeError:
# Fallback for Python < 3.7 # Fallback for Python < 3.7
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True) result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if result.returncode == 0: if result.returncode == 0:
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName + " and: www." + virtualHostName, 0) "Successfully obtained SSL for: " + virtualHostName + " and: www." + virtualHostName, 0)
@@ -840,18 +864,18 @@ context /.well-known/acme-challenge {
try: try:
# Build domain list for acme.sh with alias domains # Build domain list for acme.sh with alias domains
domain_list = " -d " + virtualHostName domain_list = " -d " + virtualHostName
# Check if www subdomain has DNS records # Check if www subdomain has DNS records
if sslUtilities.checkDNSRecords(f'www.{virtualHostName}'): if sslUtilities.checkDNSRecords(f'www.{virtualHostName}'):
domain_list += " -d www." + virtualHostName domain_list += " -d www." + virtualHostName
# Add alias domain # Add alias domain
domain_list += " -d " + aliasDomain domain_list += " -d " + aliasDomain
# Check if www.aliasDomain has DNS records # Check if www.aliasDomain has DNS records
if sslUtilities.checkDNSRecords(f'www.{aliasDomain}'): if sslUtilities.checkDNSRecords(f'www.{aliasDomain}'):
domain_list += " -d www." + aliasDomain domain_list += " -d www." + aliasDomain
command = acmePath + " --issue" + domain_list \ command = acmePath + " --issue" + domain_list \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \ + ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt' + ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
@@ -860,8 +884,9 @@ context /.well-known/acme-challenge {
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True) result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
except TypeError: except TypeError:
# Fallback for Python < 3.7 # Fallback for Python < 3.7
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True) result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if result.returncode == 0: if result.returncode == 0:
return 1 return 1
return 0 return 0
@@ -893,35 +918,37 @@ def issueSSLForDomain(domain, adminEmail, sslpath, aliasDomain=None, isHostname=
logging.CyberCPLogFileWriter.writeToFile(f"Certificate for {domain} expires in {diff.days} days") logging.CyberCPLogFileWriter.writeToFile(f"Certificate for {domain} expires in {diff.days} days")
except Exception as e: except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f"Could not check certificate expiry: {str(e)}") logging.CyberCPLogFileWriter.writeToFile(f"Could not check certificate expiry: {str(e)}")
logging.CyberCPLogFileWriter.writeToFile(f"Certificate exists for {domain}, attempting renewal...") logging.CyberCPLogFileWriter.writeToFile(f"Certificate exists for {domain}, attempting renewal...")
# Try to renew using acme.sh # Try to renew using acme.sh
acmePath = '/root/.acme.sh/acme.sh' acmePath = '/root/.acme.sh/acme.sh'
if os.path.exists(acmePath): if os.path.exists(acmePath):
# First set the webroot path for the domain # First set the webroot path for the domain
command = f'{acmePath} --update-account --accountemail {adminEmail}' command = f'{acmePath} --update-account --accountemail {adminEmail}'
subprocess.call(command, shell=True) subprocess.call(command, shell=True)
# Build domain list for renewal # Build domain list for renewal
renewal_domains = f'-d {domain}' renewal_domains = f'-d {domain}'
if not isHostname and sslUtilities.checkDNSRecords(f'www.{domain}'): if not isHostname and sslUtilities.checkDNSRecords(f'www.{domain}'):
renewal_domains += f' -d www.{domain}' renewal_domains += f' -d www.{domain}'
# For expired certificates, use --issue --force instead of --renew # For expired certificates, use --issue --force instead of --renew
if is_expired: if is_expired:
logging.CyberCPLogFileWriter.writeToFile(f"Certificate is expired, using --issue --force for {domain}") logging.CyberCPLogFileWriter.writeToFile(
f"Certificate is expired, using --issue --force for {domain}")
command = f'{acmePath} --issue {renewal_domains} --webroot /usr/local/lsws/Example/html --force' command = f'{acmePath} --issue {renewal_domains} --webroot /usr/local/lsws/Example/html --force'
else: else:
# Try to renew with explicit webroot # Try to renew with explicit webroot
command = f'{acmePath} --renew {renewal_domains} --webroot /usr/local/lsws/Example/html --force' command = f'{acmePath} --renew {renewal_domains} --webroot /usr/local/lsws/Example/html --force'
try: try:
result = subprocess.run(command, capture_output=True, text=True, shell=True) result = subprocess.run(command, capture_output=True, text=True, shell=True)
except TypeError: except TypeError:
# Fallback for Python < 3.7 # Fallback for Python < 3.7
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True) result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if result.returncode == 0: if result.returncode == 0:
logging.CyberCPLogFileWriter.writeToFile(f"Successfully renewed SSL for {domain}") logging.CyberCPLogFileWriter.writeToFile(f"Successfully renewed SSL for {domain}")
if sslUtilities.installSSLForDomain(domain, adminEmail) == 1: if sslUtilities.installSSLForDomain(domain, adminEmail) == 1:
@@ -932,7 +959,7 @@ def issueSSLForDomain(domain, adminEmail, sslpath, aliasDomain=None, isHostname=
error_details = sslUtilities.parseACMEError(error_output) error_details = sslUtilities.parseACMEError(error_output)
logging.CyberCPLogFileWriter.writeToFile(f"Renewal failed for {domain}. Error: {error_details}") logging.CyberCPLogFileWriter.writeToFile(f"Renewal failed for {domain}. Error: {error_details}")
logging.CyberCPLogFileWriter.writeToFile(f"Full error output: {error_output}") logging.CyberCPLogFileWriter.writeToFile(f"Full error output: {error_output}")
if sslUtilities.obtainSSLForADomain(domain, adminEmail, sslpath, aliasDomain, isHostname) == 1: if sslUtilities.obtainSSLForADomain(domain, adminEmail, sslpath, aliasDomain, isHostname) == 1:
if sslUtilities.installSSLForDomain(domain, adminEmail) == 1: if sslUtilities.installSSLForDomain(domain, adminEmail) == 1:
return [1, "None"] return [1, "None"]
@@ -948,13 +975,16 @@ def issueSSLForDomain(domain, adminEmail, sslpath, aliasDomain=None, isHostname=
if os.path.exists(pathToStoreSSLFullChain): if os.path.exists(pathToStoreSSLFullChain):
import OpenSSL import OpenSSL
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open(pathToStoreSSLFullChain, 'r').read()) x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
open(pathToStoreSSLFullChain, 'r').read())
SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8') SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8')
if SSLProvider != 'Denial': if SSLProvider != 'Denial':
if sslUtilities.installSSLForDomain(domain) == 1: if sslUtilities.installSSLForDomain(domain) == 1:
logging.CyberCPLogFileWriter.writeToFile("We are not able to get new SSL for " + domain + ". But there is an existing SSL, it might only be for the main domain (excluding www).") logging.CyberCPLogFileWriter.writeToFile(
return [1, "We are not able to get new SSL for " + domain + ". But there is an existing SSL, it might only be for the main domain (excluding www)." + " [issueSSLForDomain]"] "We are not able to get new SSL for " + domain + ". But there is an existing SSL, it might only be for the main domain (excluding www).")
return [1,
"We are not able to get new SSL for " + domain + ". But there is an existing SSL, it might only be for the main domain (excluding www)." + " [issueSSLForDomain]"]
command = 'openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -subj "/C=US/ST=Denial/L=Springfield/O=Dis/CN=' + domain + '" -keyout ' + pathToStoreSSLPrivKey + ' -out ' + pathToStoreSSLFullChain command = 'openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -subj "/C=US/ST=Denial/L=Springfield/O=Dis/CN=' + domain + '" -keyout ' + pathToStoreSSLPrivKey + ' -out ' + pathToStoreSSLFullChain
cmd = shlex.split(command) cmd = shlex.split(command)

View File

@@ -10,6 +10,7 @@ import socket
from plogical.acl import ACLManager from plogical.acl import ACLManager
from plogical.processUtilities import ProcessUtilities from plogical.processUtilities import ProcessUtilities
try: try:
from websiteFunctions.models import ChildDomains, Websites from websiteFunctions.models import ChildDomains, Websites
except: except:
@@ -17,7 +18,6 @@ except:
class sslUtilities: class sslUtilities:
Server_root = "/usr/local/lsws" Server_root = "/usr/local/lsws"
redisConf = '/usr/local/lsws/conf/dvhost_redis.conf' redisConf = '/usr/local/lsws/conf/dvhost_redis.conf'
@@ -56,8 +56,7 @@ class sslUtilities:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [IO Error with main config file [checkSSLListener]]") logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [IO Error with main config file [checkSSLListener]]")
return str(msg) return str(msg)
return 0 return 0
@staticmethod @staticmethod
def checkSSLIPv6Listener(): def checkSSLIPv6Listener():
try: try:
@@ -67,7 +66,8 @@ class sslUtilities:
return 1 return 1
except BaseException as msg: except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [IO Error with main config file [checkSSLIPv6Listener]]") logging.CyberCPLogFileWriter.writeToFile(
str(msg) + " [IO Error with main config file [checkSSLIPv6Listener]]")
return str(msg) return str(msg)
return 0 return 0
@@ -84,7 +84,7 @@ class sslUtilities:
return [0, "347 " + str(msg) + " [issueSSLForDomain]"] return [0, "347 " + str(msg) + " [issueSSLForDomain]"]
@staticmethod @staticmethod
def installSSLForDomain(virtualHostName, adminEmail='example@example.org'): def installSSLForDomain(virtualHostName, adminEmail='domain@cyberpanel.net'):
try: try:
website = Websites.objects.get(domain=virtualHostName) website = Websites.objects.get(domain=virtualHostName)
@@ -127,7 +127,7 @@ class sslUtilities:
writeDataToFile.writelines(certFile) writeDataToFile.writelines(certFile)
writeDataToFile.writelines(certChain) writeDataToFile.writelines(certChain)
writeDataToFile.writelines(sslProtocol) writeDataToFile.writelines(sslProtocol)
writeDataToFile.writelines(enableECDHE) writeDataToFile.writelines(enableECDHE)
writeDataToFile.writelines(renegProtection) writeDataToFile.writelines(renegProtection)
writeDataToFile.writelines(sslSessionCache) writeDataToFile.writelines(sslSessionCache)
writeDataToFile.writelines(enableSpdy) writeDataToFile.writelines(enableSpdy)
@@ -166,7 +166,7 @@ class sslUtilities:
writeDataToFile.writelines(certFile) writeDataToFile.writelines(certFile)
writeDataToFile.writelines(certChain) writeDataToFile.writelines(certChain)
writeDataToFile.writelines(sslProtocol) writeDataToFile.writelines(sslProtocol)
writeDataToFile.writelines(enableECDHE) writeDataToFile.writelines(enableECDHE)
writeDataToFile.writelines(renegProtection) writeDataToFile.writelines(renegProtection)
writeDataToFile.writelines(sslSessionCache) writeDataToFile.writelines(sslSessionCache)
writeDataToFile.writelines(enableSpdy) writeDataToFile.writelines(enableSpdy)
@@ -332,7 +332,6 @@ class sslUtilities:
ProcessUtilities.executioner(command) ProcessUtilities.executioner(command)
return 1 return 1
@staticmethod @staticmethod
def FindIfDomainInCloudflare(virtualHostName): def FindIfDomainInCloudflare(virtualHostName):
try: try:
@@ -403,6 +402,14 @@ class sslUtilities:
@staticmethod @staticmethod
def obtainSSLForADomain(virtualHostName, adminEmail, sslpath, aliasDomain=None): def obtainSSLForADomain(virtualHostName, adminEmail, sslpath, aliasDomain=None):
# Replace example.org emails with domain-specific email
if adminEmail and ('example.org' in adminEmail or 'example.com' in adminEmail):
import re
# Remove special characters and create domain-based email
clean_domain = re.sub(r'[^a-zA-Z0-9]', '', virtualHostName)
adminEmail = f'{clean_domain}@cyberpanel.net'
logging.CyberCPLogFileWriter.writeToFile(f'Replacing invalid email with {adminEmail}')
sender_email = 'root@%s' % (socket.gethostname()) sender_email = 'root@%s' % (socket.gethostname())
CF_Check = 0 CF_Check = 0
@@ -419,7 +426,6 @@ class sslUtilities:
if SSLProvider != 'Denial': if SSLProvider != 'Denial':
return 1, 'This domain already have a valid SSL.' return 1, 'This domain already have a valid SSL.'
CF_Check, message = sslUtilities.FindIfDomainInCloudflare(virtualHostName) CF_Check, message = sslUtilities.FindIfDomainInCloudflare(virtualHostName)
DNS_TO_USE = '' DNS_TO_USE = ''
@@ -456,8 +462,8 @@ class sslUtilities:
command = acmePath + f" --issue -d {virtualHostName} -d *.{virtualHostName}" \ command = acmePath + f" --issue -d {virtualHostName} -d *.{virtualHostName}" \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \ + ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + f' --dns {DNS_TO_USE} -k ec-256 --force --server letsencrypt --dnssleep 20' + ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + f' --dns {DNS_TO_USE} -k ec-256 --force --server letsencrypt --dnssleep 20'
#ResultText = open(logging.CyberCPLogFileWriter.fileName, 'r').read() # ResultText = open(logging.CyberCPLogFileWriter.fileName, 'r').read()
#CurrentMessage = "Trying to obtain SSL for: " + virtualHostName + " and: www." + virtualHostName # CurrentMessage = "Trying to obtain SSL for: " + virtualHostName + " and: www." + virtualHostName
# logging.CyberCPLogFileWriter.writeToFile(CurrentMessage, 0) # logging.CyberCPLogFileWriter.writeToFile(CurrentMessage, 0)
logging.CyberCPLogFileWriter.writeToFile(command, 0) logging.CyberCPLogFileWriter.writeToFile(command, 0)
@@ -480,7 +486,7 @@ class sslUtilities:
+ '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \ + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + f' --dns {DNS_TO_USE} -k ec-256 --force --server letsencrypt --dnssleep 20' + ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + f' --dns {DNS_TO_USE} -k ec-256 --force --server letsencrypt --dnssleep 20'
#ResultText = open(logging.CyberCPLogFileWriter.fileName, 'r').read() # ResultText = open(logging.CyberCPLogFileWriter.fileName, 'r').read()
CurrentMessage = '%s\nTrying to obtain SSL for: %s' % (finalText, virtualHostName) CurrentMessage = '%s\nTrying to obtain SSL for: %s' % (finalText, virtualHostName)
finalText = '%s\nTrying to obtain SSL for: %s' % (finalText, virtualHostName) finalText = '%s\nTrying to obtain SSL for: %s' % (finalText, virtualHostName)
@@ -494,8 +500,10 @@ class sslUtilities:
'SSL Notification for %s.' % (virtualHostName)) 'SSL Notification for %s.' % (virtualHostName))
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
logging.CyberCPLogFileWriter.writeToFile('Failed to obtain SSL, issuing self-signed SSL for: ' + virtualHostName, 0) logging.CyberCPLogFileWriter.writeToFile(
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, 'Failed to obtain SSL, issuing self-signed SSL for: ' + virtualHostName, 'Failed to obtain SSL, issuing self-signed SSL for: ' + virtualHostName, 0)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail,
'Failed to obtain SSL, issuing self-signed SSL for: ' + virtualHostName,
'SSL Notification for %s.' % (virtualHostName)) 'SSL Notification for %s.' % (virtualHostName))
return 0, output return 0, output
else: else:
@@ -510,7 +518,7 @@ class sslUtilities:
"Trying to obtain SSL for: " + virtualHostName + ", www." + virtualHostName + ", " + aliasDomain + " and www." + aliasDomain + ",") "Trying to obtain SSL for: " + virtualHostName + ", www." + virtualHostName + ", " + aliasDomain + " and www." + aliasDomain + ",")
command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \ command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \
+ ' -d ' + aliasDomain + ' -d www.' + aliasDomain\ + ' -d ' + aliasDomain + ' -d www.' + aliasDomain \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \ + ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + f' --dns {DNS_TO_USE} -k ec-256 --force --server letsencrypt --dnssleep 20' + ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + f' --dns {DNS_TO_USE} -k ec-256 --force --server letsencrypt --dnssleep 20'

File diff suppressed because it is too large Load Diff