custom acme implementation + file manager improvements

This commit is contained in:
usmannasir
2025-04-23 02:59:42 +05:00
parent 41ec3afdd7
commit a0629fae50
4 changed files with 1692 additions and 391 deletions

View File

@@ -1,176 +1,610 @@
#logo{ :root {
width: 25%; /* Modern color palette */
--primary-color: #4f46e5;
--primary-hover: #4338ca;
--secondary-color: #6b7280;
--background-light: #f9fafb;
--background-lighter: #ffffff;
--text-primary: #111827;
--text-secondary: #4b5563;
--border-color: #e5e7eb;
--success-color: #10b981;
--danger-color: #ef4444;
--warning-color: #f59e0b;
--hover-bg: #f3f4f6;
--logo-bg: #f8fafc;
/* Enhanced shadows */
--card-shadow: 0 1px 3px 0 rgb(0 0 0 / 0.1), 0 1px 2px -1px rgb(0 0 0 / 0.1);
--card-shadow-hover: 0 20px 25px -5px rgb(0 0 0 / 0.1), 0 8px 10px -6px rgb(0 0 0 / 0.1);
--button-shadow: 0 2px 4px 0 rgb(0 0 0 / 0.05);
--button-shadow-hover: 0 4px 6px -1px rgb(0 0 0 / 0.1), 0 2px 4px -2px rgb(0 0 0 / 0.1);
/* Modern transitions */
--transition-base: all 0.2s cubic-bezier(0.4, 0, 0.2, 1);
--transition-smooth: all 0.3s cubic-bezier(0.4, 0, 0.2, 1);
--transition-bounce: all 0.3s cubic-bezier(0.34, 1.56, 0.64, 1);
} }
/*#navBar{ body {
background: -moz-linear-gradient(#a4dbf5, #8cc5e0); color: var(--text-primary);
background: -webkit-linear-gradient(#a4dbf5, #8cc5e0); font-family: 'Inter', -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif;
background: -o-linear-gradient(#a4dbf5, #8cc5e0); background-color: var(--background-light);
}*/ line-height: 1.6;
}
/* Enhanced Navbar Styling */
#navBar { #navBar {
background: #0daeff; /* Old browsers */ background: var(--background-lighter);
background: -moz-linear-gradient(-45deg, #0daeff 0%,#3939ad 30%); /* FF3.6-15 */ box-shadow: var(--card-shadow);
background: -webkit-linear-gradient(-45deg, #0daeff 0%,#3939ad 30%); /* Chrome10-25,Safari5.1-6 */ padding: 1.25rem;
background: linear-gradient(-45deg, #0daeff 0%,#3939ad 30%); /* W3C, IE10+, FF16+, Chrome26+, Opera12+, Safari7+ */ border-bottom: 1px solid var(--border-color);
filter: progid:DXImageTransform.Microsoft.gradient( startColorstr='#3939ad', endColorstr='#0daeff',GradientType=1 ); /* IE6-9 fallback on horizontal gradient */ position: sticky;
} top: 0;
.navbar-brand { z-index: 100;
margin: 0 1rem 0 1rem; backdrop-filter: blur(12px);
} -webkit-backdrop-filter: blur(12px);
#mainRow{
margin: 1%;
} }
#tableHead{ /* Modern Header Logo */
background: -moz-linear-gradient(#a4dbf5, #8cc5e0); .header-logo {
background: -webkit-linear-gradient(#a4dbf5, #8cc5e0); display: flex;
background: -o-linear-gradient(#a4dbf5, #8cc5e0); align-items: center;
gap: 1rem;
padding: 0.875rem 1.25rem;
background: var(--logo-bg);
border-radius: 1.25rem;
transition: var(--transition-bounce);
} }
#uploadBoxLabel,#htmlEditorLable{ .header-logo:hover {
background: -moz-linear-gradient(#a4dbf5, #8cc5e0); transform: translateY(-2px);
background: -webkit-linear-gradient(#a4dbf5, #8cc5e0); box-shadow: var(--card-shadow-hover);
background: -o-linear-gradient(#a4dbf5, #8cc5e0);
} }
.my-drop-zone { .header-logo img {
border: dotted 3px lightgray; height: 32px;
margin-bottom: 2%; width: auto;
filter: brightness(0.8) contrast(1.2);
} }
#queueProg{ .header-logo a {
margin-bottom: 2%; display: flex;
align-items: center;
gap: 0.75rem;
text-decoration: none;
} }
#htmlEditorContent{ .header-logo span {
width: 100%; font-size: 1.125rem;
height: 500px; font-weight: 500;
color: var(--text-primary);
} }
#htmlEditorStyles{ /* Top Actions Bar */
margin-bottom: 2%;
margin-top: 2%;
}
.flex-wrap { .flex-wrap {
padding: 0.5rem;
background: var(--background-lighter);
border-radius: 0.5rem;
box-shadow: var(--card-shadow);
margin: 0.5rem 0;
}
.nav {
display: flex; display: flex;
flex-wrap: wrap; flex-wrap: wrap;
} gap: 0.5rem;
.mt-5 { padding: 0.5rem;
margin-top: 5px !important;
}
.mt-10 {
margin-top: 10px;
}
.mt-20 {
margin-top: 20px;
}
.mt-30 {
margin-top: 30px;
}
.mr-10 {
margin-right: 10px;
}
.mb-10 {
margin-bottom: 10px;
}
.ml-10 {
margin-left: 10px;
}
.my-10 {
margin-top: 10px;
margin-bottom: 10px;
} }
.mx-5 { /* Enhanced Action Buttons */
margin-left: 5px; .nav-item .nav-link {
margin-right: 5px; display: inline-flex;
} align-items: center;
.mx-10 { padding: 0.75rem 1.25rem;
margin-left: 10px; color: var(--text-secondary);
margin-right: 10px; background: var(--background-lighter);
} border: 1.5px solid var(--border-color);
.header-logo { border-radius: 0.75rem;
width: 315px; transition: var(--transition-bounce);
/* text-align: center;*/ font-size: 0.875rem;
font-size: 16px; font-weight: 600;
float: left;
position: relative;
}
a.nav-link {
color: #add8e6;
}
a.nav-link:hover {
color: #E4F2F7;
}
.point-events {
pointer-events: all;
}
.card-header {
padding: .75rem 1.25rem;
margin-bottom: 0;
background-color: transparent;
border-bottom: none;
}
.form-control {
padding: 0 .5rem;
border: 1px solid #eeeeee;
color: #777;
font-size: .95em;
}
.form-control[readonly] {
background-color: transparent;
}
a {
color: #6C6CA4;
text-decoration: none; text-decoration: none;
background-color: transparent; box-shadow: var(--button-shadow);
-webkit-text-decoration-skip: objects;
}
a:hover {
color: #8989B6;
text-decoration: none;
background-color: transparent;
-webkit-text-decoration-skip: objects;
} }
#tableHead { .nav-item .nav-link:hover {
background: #8989B6; color: var(--primary-color);
color: #E1E1EC; border-color: var(--primary-color);
background: var(--background-lighter);
transform: translateY(-2px);
box-shadow: var(--button-shadow-hover);
} }
.table td, .table th {
padding: .15em; .nav-item .nav-link i {
vertical-align: top; margin-right: 0.5rem;
border-top: 1px solid #e9ecef; font-size: 1rem;
} }
.table thead th {
vertical-align: bottom; /* File Manager Layout */
border-bottom: 1px solid #e9ecef; .row {
font-weight: 400; margin: 0;
background: var(--background-lighter);
}
/* Modern File List Section */
.table {
background: var(--background-lighter);
border-radius: 1.25rem;
box-shadow: var(--card-shadow);
overflow: hidden;
border: 1.5px solid var(--border-color);
margin-top: 1.5rem;
}
#tableHead th {
background: var(--background-light);
padding: 1.25rem 1.5rem;
font-size: 0.75rem;
font-weight: 600;
text-transform: uppercase;
letter-spacing: 0.1em;
color: var(--text-secondary);
border-bottom: 1.5px solid var(--border-color);
}
.table tbody tr {
transition: var(--transition-smooth);
}
.table tbody tr:hover {
background: var(--hover-bg);
transform: translateY(-1px);
box-shadow: var(--button-shadow);
} }
.table td { .table td {
font-size: 14px; padding: 1.25rem 1.5rem;
color: #666666; vertical-align: middle;
color: var(--text-secondary);
font-size: 0.875rem;
border-bottom: 1px solid var(--border-color);
} }
.list-group-item {
padding: .2em 1.25rem; /* Enhanced File Actions */
.file-actions {
display: flex;
gap: 0.75rem;
opacity: 0;
transition: var(--transition-smooth);
}
.file-action-btn {
padding: 0.625rem;
border-radius: 0.75rem;
border: 1.5px solid var(--border-color);
background: var(--background-lighter);
color: var(--text-secondary);
transition: var(--transition-bounce);
box-shadow: var(--button-shadow);
}
.file-action-btn:hover {
color: var(--primary-color);
border-color: var(--primary-color);
background: var(--background-lighter);
transform: translateY(-2px);
box-shadow: var(--button-shadow-hover);
}
/* Modern Tree View */
.tree-item {
display: flex;
align-items: center;
padding: 0.75rem 1rem;
border-radius: 0.75rem;
cursor: pointer;
transition: var(--transition-smooth);
margin: 0.25rem 0;
}
.tree-item:hover {
background: var(--hover-bg);
transform: translateX(4px);
}
.tree-item i {
margin-right: 0.5rem;
font-size: 1rem;
}
/* File List */
.file-list {
background: var(--background-lighter);
border-radius: 0.5rem;
margin: 1rem;
}
.file-list-header {
display: flex;
align-items: center;
padding: 0.75rem 1rem;
background: var(--background-light);
border-bottom: 1px solid var(--border-color);
border-top-left-radius: 0.5rem;
border-top-right-radius: 0.5rem;
}
.file-item {
display: flex;
align-items: center;
padding: 0.5rem 1rem;
border-bottom: 1px solid var(--border-color);
transition: all 0.15s ease;
}
.file-item:hover {
background: var(--hover-bg);
}
.file-item-icon {
margin-right: 0.75rem;
color: var(--secondary-color);
}
.file-item-name {
flex: 1;
font-size: 0.875rem;
color: var(--text-primary);
}
.file-item-size {
width: 100px;
text-align: right;
font-size: 0.875rem;
color: var(--text-secondary);
}
.file-item-date {
width: 150px;
text-align: right;
font-size: 0.875rem;
color: var(--text-secondary);
}
.file-item-permissions {
width: 100px;
text-align: right;
font-family: monospace;
font-size: 0.875rem;
color: var(--text-secondary);
}
/* Enhanced Path Navigation */
.path-nav {
display: flex;
align-items: center;
gap: 0.75rem;
padding: 1rem 1.5rem;
background: var(--background-light);
border-bottom: 1.5px solid var(--border-color);
border-radius: 1rem 1rem 0 0;
}
.path-nav-item {
display: inline-flex;
align-items: center;
padding: 0.5rem 1rem;
font-size: 0.875rem;
color: var(--text-secondary);
border-radius: 0.75rem;
transition: var(--transition-bounce);
font-weight: 500;
}
.path-nav-item:hover {
color: var(--primary-color);
background: var(--hover-bg);
transform: translateY(-1px);
}
.path-nav-separator {
color: var(--border-color);
}
/* Selection Styles */
.selected {
background: var(--hover-bg);
border-color: var(--primary-color);
}
/* Modern Current Path Display */
#currentRPath {
font-size: 0.875rem;
color: var(--text-secondary);
background-color: var(--background-light);
border: 1.5px solid var(--border-color);
padding: 1rem 1.25rem;
border-radius: 1rem;
margin-bottom: 1.5rem;
transition: var(--transition-bounce);
box-shadow: var(--button-shadow);
}
#currentRPath:hover {
border-color: var(--primary-color);
transform: translateY(-1px);
box-shadow: var(--button-shadow-hover);
}
/* Enhanced File Icons */
i.fa.fa-folder {
color: var(--warning-color) !important;
margin-right: 0.75rem;
font-size: 1.25rem;
transition: var(--transition-smooth);
} }
i.fa.fa-file { i.fa.fa-file {
color: #6C6CA4 !important; color: var(--secondary-color) !important;
margin-right: 0.75rem;
font-size: 1.25rem;
transition: var(--transition-smooth);
} }
i.fa.fa-minus {
color: #6C6CA4 !important; /* Modern Permissions Display */
.permissions {
font-family: 'SF Mono', 'Roboto Mono', monospace;
font-size: 0.875rem;
color: var(--text-secondary);
background: var(--background-light);
padding: 0.375rem 0.75rem;
border-radius: 0.5rem;
border: 1px solid var(--border-color);
} }
i.fa.fa-plus {
color: #6C6CA4 !important; /* Enhanced Navigation Actions */
.nav-actions {
display: flex;
align-items: center;
gap: 1rem;
padding: 1.25rem;
background: var(--background-lighter);
border-bottom: 1.5px solid var(--border-color);
border-radius: 1.25rem 1.25rem 0 0;
} }
.list-group-item {
background-color: transparent; .nav-action {
display: inline-flex;
align-items: center;
justify-content: center;
padding: 0.75rem 1.5rem;
color: var(--text-secondary);
font-size: 0.875rem;
text-decoration: none;
border-radius: 0.75rem;
transition: var(--transition-bounce);
border: 1.5px solid var(--border-color);
background: var(--background-lighter);
min-width: 130px;
font-weight: 600;
box-shadow: var(--button-shadow);
} }
.bg-lightgray {
background: #F9F9FA; .nav-action:hover {
color: var(--primary-color);
background: var(--background-lighter);
border-color: var(--primary-color);
transform: translateY(-2px);
box-shadow: var(--button-shadow-hover);
}
.nav-action i {
margin-right: 0.5rem;
font-size: 1rem;
}
.nav-action.home-action {
color: var(--primary-color);
background: var(--background-lighter);
border-color: var(--primary-color);
}
.nav-action.home-action:hover {
background: rgba(37, 99, 235, 0.1);
}
.nav-action.back-action i,
.nav-action.refresh-action i {
margin-right: 0.25rem;
}
.nav-action.select-action,
.nav-action.unselect-action {
background: var(--background-light);
border: 1px solid var(--border-color);
}
.nav-action.select-action:hover,
.nav-action.unselect-action:hover {
background: var(--hover-bg);
border-color: var(--primary-color);
color: var(--primary-color);
}
/* File Table Header */
.file-table-header {
display: grid;
grid-template-columns: minmax(200px, 2fr) minmax(100px, 1fr) minmax(150px, 1fr) minmax(100px, 1fr);
gap: 1rem;
padding: 0.75rem 1rem;
background: var(--background-light);
border-bottom: 1px solid var(--border-color);
}
.file-table-header th {
color: var(--text-secondary);
font-weight: 600;
font-size: 0.75rem;
text-transform: uppercase;
letter-spacing: 0.05em;
}
/* File Table Row */
.file-table-row {
display: grid;
grid-template-columns: minmax(200px, 2fr) minmax(100px, 1fr) minmax(150px, 1fr) minmax(100px, 1fr);
gap: 1rem;
padding: 0.75rem 1rem;
border-bottom: 1px solid var(--border-color);
transition: background-color 0.15s ease;
}
.file-table-row:hover {
background-color: var(--hover-bg);
}
.file-name-cell {
display: flex;
align-items: center;
gap: 0.5rem;
}
.file-icon {
color: var(--warning-color);
font-size: 1rem;
}
.file-name {
color: var(--text-primary);
font-size: 0.875rem;
}
.file-size,
.file-date,
.file-permissions {
color: var(--text-secondary);
font-size: 0.875rem;
}
.file-permissions {
font-family: monospace;
}
/* Responsive Enhancements */
@media (max-width: 768px) {
.nav-item .nav-link {
padding: 0.625rem 1rem;
}
#treeView {
height: auto;
max-height: 350px;
}
.table td, .table th {
padding: 1rem;
}
.file-actions {
opacity: 1;
}
.nav-action {
min-width: auto;
padding: 0.625rem 1rem;
}
}
/* File Manager Navigation */
.file-manager-nav {
display: flex;
align-items: center;
gap: 0.75rem;
padding: 0.75rem 1rem;
background: var(--background-lighter);
border-bottom: 1px solid var(--border-color);
}
.file-manager-nav a {
display: inline-flex;
align-items: center;
padding: 0.5rem 1rem;
font-size: 0.875rem;
color: var(--text-secondary);
text-decoration: none;
border: 1px solid var(--border-color);
border-radius: 0.375rem;
transition: all 0.15s ease;
background: var(--background-lighter);
}
.file-manager-nav a:hover {
color: var(--primary-color);
border-color: var(--primary-color);
background: var(--hover-bg);
}
.file-manager-nav a i {
margin-right: 0.5rem;
}
.file-manager-nav a.home-link {
color: var(--primary-color);
border-color: var(--primary-color);
}
.file-manager-nav a.home-link:hover {
background: rgba(37, 99, 235, 0.1);
}
/* Table Header */
.table-header {
display: grid;
grid-template-columns: minmax(300px, 2fr) minmax(100px, 1fr) minmax(150px, 1fr) minmax(120px, 1fr);
padding: 0.75rem 1rem;
background: var(--background-light);
border-bottom: 1px solid var(--border-color);
font-weight: 500;
color: var(--text-secondary);
}
.table-header div {
font-size: 0.875rem;
text-transform: uppercase;
letter-spacing: 0.05em;
}
.col-sm-9 .nav {
display: flex;
align-items: center;
gap: 0.5rem;
padding: 0.75rem;
background: var(--background-lighter);
border-bottom: 1px solid var(--border-color);
margin-bottom: 0 !important;
}
.col-sm-9 .nav .nav-item a {
display: inline-flex;
align-items: center;
padding: 0.5rem 1rem;
color: var(--text-secondary);
text-decoration: none;
border: 1px solid var(--border-color);
border-radius: 0.375rem;
transition: all 0.15s ease;
background: var(--background-lighter);
margin: 0 !important;
}
.col-sm-9 .nav .nav-item a:hover {
color: var(--primary-color);
border-color: var(--primary-color);
background: var(--hover-bg);
text-decoration: none;
}
.col-sm-9 .nav .nav-item a i {
margin-right: 0.5rem;
} }

938
plogical/customAcme.py Normal file
View File

@@ -0,0 +1,938 @@
import json
import os
import time
import requests
import base64
import hashlib
import logging
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding
import OpenSSL
from plogical import CyberCPLogFileWriter as logging
from plogical.processUtilities import ProcessUtilities
import socket
class CustomACME:
def __init__(self, domain, admin_email, staging=False):
"""Initialize CustomACME"""
logging.CyberCPLogFileWriter.writeToFile(
f'Initializing CustomACME for domain: {domain}, email: {admin_email}, staging: {staging}')
self.domain = domain
self.admin_email = admin_email
self.staging = staging
# Set the ACME directory URL based on staging flag
if staging:
self.acme_directory = "https://acme-staging-v02.api.letsencrypt.org/directory"
logging.CyberCPLogFileWriter.writeToFile('Using staging ACME directory')
else:
self.acme_directory = "https://acme-v02.api.letsencrypt.org/directory"
logging.CyberCPLogFileWriter.writeToFile('Using production ACME directory')
self.account_key = None
self.account_url = None
self.directory = None
self.nonce = None
self.order_url = None
self.authorizations = []
self.finalize_url = None
self.certificate_url = None
# Initialize paths
self.cert_path = f'/etc/letsencrypt/live/{domain}'
self.challenge_path = '/usr/local/lsws/Example/html/.well-known/acme-challenge'
self.account_key_path = f'/etc/letsencrypt/accounts/{domain}.key'
logging.CyberCPLogFileWriter.writeToFile(
f'Certificate path: {self.cert_path}, Challenge path: {self.challenge_path}')
# Create accounts directory if it doesn't exist
os.makedirs('/etc/letsencrypt/accounts', exist_ok=True)
def _generate_account_key(self):
"""Generate RSA account key"""
try:
logging.CyberCPLogFileWriter.writeToFile('Generating RSA account key...')
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
self.account_key = key
logging.CyberCPLogFileWriter.writeToFile('Successfully generated RSA account key')
return True
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error generating account key: {str(e)}')
return False
def _get_directory(self):
"""Get ACME directory"""
try:
logging.CyberCPLogFileWriter.writeToFile(f'Fetching ACME directory from {self.acme_directory}')
response = requests.get(self.acme_directory)
self.directory = response.json()
logging.CyberCPLogFileWriter.writeToFile(
f'Successfully fetched ACME directory: {json.dumps(self.directory)}')
return True
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error getting directory: {str(e)}')
return False
def _get_nonce(self):
"""Get new nonce from ACME server"""
try:
logging.CyberCPLogFileWriter.writeToFile('Getting new nonce...')
response = requests.head(self.directory['newNonce'])
self.nonce = response.headers['Replay-Nonce']
logging.CyberCPLogFileWriter.writeToFile(f'Successfully got nonce: {self.nonce}')
return True
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error getting nonce: {str(e)}')
return False
def _create_jws(self, payload, url):
"""Create JWS (JSON Web Signature)"""
try:
logging.CyberCPLogFileWriter.writeToFile(f'Creating JWS for URL: {url}')
if payload is not None:
logging.CyberCPLogFileWriter.writeToFile(f'Payload: {json.dumps(payload)}')
# Get a fresh nonce for this request
if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get fresh nonce')
return None
# Get the private key numbers
logging.CyberCPLogFileWriter.writeToFile('Getting private key numbers...')
private_numbers = self.account_key.private_numbers()
public_numbers = private_numbers.public_numbers
# Convert numbers to bytes
logging.CyberCPLogFileWriter.writeToFile('Converting RSA numbers to bytes...')
n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big')
e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big')
# Create JWK
logging.CyberCPLogFileWriter.writeToFile('Creating JWK...')
jwk_key = {
"kty": "RSA",
"n": base64.urlsafe_b64encode(n_bytes).decode('utf-8').rstrip('='),
"e": base64.urlsafe_b64encode(e_bytes).decode('utf-8').rstrip('='),
"alg": "RS256"
}
logging.CyberCPLogFileWriter.writeToFile(f'Created JWK: {json.dumps(jwk_key)}')
# Create protected header
protected = {
"alg": "RS256",
"url": url,
"nonce": self.nonce
}
# Add either JWK or Key ID based on whether we have an account URL
if self.account_url and url != self.directory['newAccount']:
protected["kid"] = self.account_url
logging.CyberCPLogFileWriter.writeToFile(f'Using Key ID: {self.account_url}')
else:
protected["jwk"] = jwk_key
logging.CyberCPLogFileWriter.writeToFile('Using JWK for new account')
# Encode protected header
logging.CyberCPLogFileWriter.writeToFile('Encoding protected header...')
protected_b64 = base64.urlsafe_b64encode(
json.dumps(protected).encode('utf-8')
).decode('utf-8').rstrip('=')
# For POST-as-GET requests, payload_b64 should be empty string
if payload is None:
payload_b64 = ""
logging.CyberCPLogFileWriter.writeToFile('Using empty payload for POST-as-GET request')
else:
# Encode payload
logging.CyberCPLogFileWriter.writeToFile('Encoding payload...')
payload_b64 = base64.urlsafe_b64encode(
json.dumps(payload).encode('utf-8')
).decode('utf-8').rstrip('=')
# Create signature input
logging.CyberCPLogFileWriter.writeToFile('Creating signature input...')
signature_input = f"{protected_b64}.{payload_b64}".encode('utf-8')
# Sign the input
logging.CyberCPLogFileWriter.writeToFile('Signing input...')
signature = self.account_key.sign(
signature_input,
padding.PKCS1v15(),
hashes.SHA256()
)
# Encode signature
logging.CyberCPLogFileWriter.writeToFile('Encoding signature...')
signature_b64 = base64.urlsafe_b64encode(signature).decode('utf-8').rstrip('=')
# Create final JWS
logging.CyberCPLogFileWriter.writeToFile('Creating final JWS...')
jws = {
"protected": protected_b64,
"signature": signature_b64
}
# Only add payload if it exists
if payload is not None:
jws["payload"] = payload_b64
# Ensure the JWS is properly formatted
jws_str = json.dumps(jws, separators=(',', ':'))
logging.CyberCPLogFileWriter.writeToFile(f'Final JWS: {jws_str}')
return jws_str
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error creating JWS: {str(e)}')
return None
def _load_account_key(self):
"""Load existing account key if available"""
try:
if os.path.exists(self.account_key_path):
logging.CyberCPLogFileWriter.writeToFile('Loading existing account key...')
with open(self.account_key_path, 'rb') as f:
key_data = f.read()
self.account_key = serialization.load_pem_private_key(
key_data,
password=None,
backend=default_backend()
)
logging.CyberCPLogFileWriter.writeToFile('Successfully loaded existing account key')
return True
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error loading account key: {str(e)}')
return False
def _save_account_key(self):
"""Save account key for future use"""
try:
logging.CyberCPLogFileWriter.writeToFile('Saving account key...')
key_data = self.account_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
with open(self.account_key_path, 'wb') as f:
f.write(key_data)
logging.CyberCPLogFileWriter.writeToFile('Successfully saved account key')
return True
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error saving account key: {str(e)}')
return False
def _create_account(self):
"""Create new ACME account"""
try:
logging.CyberCPLogFileWriter.writeToFile('Creating new ACME account...')
payload = {
"termsOfServiceAgreed": True,
"contact": [f"mailto:{self.admin_email}"]
}
jws = self._create_jws(payload, self.directory['newAccount'])
if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for account creation')
return False
logging.CyberCPLogFileWriter.writeToFile('Sending account creation request...')
headers = {
'Content-Type': 'application/jose+json'
}
response = requests.post(self.directory['newAccount'], data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Account creation response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Account creation response: {response.text}')
if response.status_code == 201:
self.account_url = response.headers['Location']
logging.CyberCPLogFileWriter.writeToFile(
f'Successfully created account. Account URL: {self.account_url}')
# Save the account key for future use
self._save_account_key()
return True
elif response.status_code == 429:
logging.CyberCPLogFileWriter.writeToFile(
'Rate limit hit for account creation. Using staging environment...')
self.staging = True
self.acme_directory = "https://acme-staging-v02.api.letsencrypt.org/directory"
# Get new directory and nonce for staging
if not self._get_directory():
return False
if not self._get_nonce():
return False
# Try one more time with staging
return self._create_account()
elif response.status_code == 400 and "badNonce" in response.text:
logging.CyberCPLogFileWriter.writeToFile('Bad nonce, getting new nonce and retrying...')
if not self._get_nonce():
return False
return self._create_account()
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error creating account: {str(e)}')
return False
def _create_order(self, domains):
"""Create new order for domains"""
try:
logging.CyberCPLogFileWriter.writeToFile(f'Creating new order for domains: {domains}')
identifiers = [{"type": "dns", "value": domain} for domain in domains]
payload = {
"identifiers": identifiers
}
jws = self._create_jws(payload, self.directory['newOrder'])
if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order creation')
return False
logging.CyberCPLogFileWriter.writeToFile('Sending order creation request...')
headers = {
'Content-Type': 'application/jose+json'
}
response = requests.post(self.directory['newOrder'], data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Order creation response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Order creation response: {response.text}')
if response.status_code == 201:
self.order_url = response.headers['Location']
self.authorizations = response.json()['authorizations']
self.finalize_url = response.json()['finalize']
logging.CyberCPLogFileWriter.writeToFile(f'Successfully created order. Order URL: {self.order_url}')
logging.CyberCPLogFileWriter.writeToFile(f'Authorizations: {self.authorizations}')
logging.CyberCPLogFileWriter.writeToFile(f'Finalize URL: {self.finalize_url}')
return True
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error creating order: {str(e)}')
return False
def _handle_http_challenge(self, challenge):
"""Handle HTTP-01 challenge"""
try:
logging.CyberCPLogFileWriter.writeToFile(f'Handling HTTP challenge: {json.dumps(challenge)}')
# Get key authorization
key_auth = self._get_key_authorization(challenge)
if not key_auth:
logging.CyberCPLogFileWriter.writeToFile('Failed to get key authorization')
return False
# Create challenge directory if it doesn't exist
if not os.path.exists(self.challenge_path):
logging.CyberCPLogFileWriter.writeToFile(f'Creating challenge directory: {self.challenge_path}')
os.makedirs(self.challenge_path)
# Write challenge file
challenge_file = os.path.join(self.challenge_path, challenge['token'])
logging.CyberCPLogFileWriter.writeToFile(f'Writing challenge file: {challenge_file}')
# Write only the key authorization to the file
with open(challenge_file, 'w') as f:
f.write(key_auth)
logging.CyberCPLogFileWriter.writeToFile('Successfully handled HTTP challenge')
return True
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error handling HTTP challenge: {str(e)}')
return False
def _handle_dns_challenge(self, challenge):
"""Handle DNS-01 challenge (Cloudflare)"""
try:
logging.CyberCPLogFileWriter.writeToFile(f'Handling DNS challenge: {json.dumps(challenge)}')
# This is a placeholder - implement Cloudflare API integration
# You'll need to add your Cloudflare API credentials and implementation
pass
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error handling DNS challenge: {str(e)}')
return False
def _get_key_authorization(self, challenge):
"""Get key authorization for challenge"""
try:
logging.CyberCPLogFileWriter.writeToFile('Getting key authorization...')
# Get the private key numbers
private_numbers = self.account_key.private_numbers()
public_numbers = private_numbers.public_numbers
# Convert numbers to bytes
n_bytes = public_numbers.n.to_bytes((public_numbers.n.bit_length() + 7) // 8, 'big')
e_bytes = public_numbers.e.to_bytes((public_numbers.e.bit_length() + 7) // 8, 'big')
# Create JWK without alg field
jwk_key = {
"kty": "RSA",
"n": base64.urlsafe_b64encode(n_bytes).decode('utf-8').rstrip('='),
"e": base64.urlsafe_b64encode(e_bytes).decode('utf-8').rstrip('=')
}
# Calculate the JWK thumbprint according to RFC 7638
# The thumbprint is a hash of the JWK (JSON Web Key) in a specific format
# First, we create a dictionary with the required JWK parameters
jwk = {
"e": base64.urlsafe_b64encode(public_numbers.e.to_bytes(3, 'big')).decode('utf-8').rstrip('='),
"kty": "RSA", # Key type
"n": base64.urlsafe_b64encode(public_numbers.n.to_bytes(256, 'big')).decode('utf-8').rstrip('=')
}
# Sort the JWK parameters alphabetically by key name
# This ensures consistent thumbprint calculation regardless of parameter order
sorted_jwk = json.dumps(jwk, sort_keys=True, separators=(',', ':'))
# Calculate the SHA-256 hash of the sorted JWK
# Example of what sorted_jwk might look like:
# {"e":"AQAB","kty":"RSA","n":"tVKUtcx_n9rt5afY_2WFNVAu9fjD4xqX4Xm3dJz3XYb"}
# The thumbprint will be a 32-byte SHA-256 hash of this string
# For example, it might look like: b'x\x9c\x1d\x8f\x8b\x1b\x1e\x8b\x1b\x1e\x8b\x1b\x1e\x8b\x1b\x1e'
thumbprint = hashlib.sha256(sorted_jwk.encode('utf-8')).digest()
# Encode the thumbprint in base64url format (RFC 4648)
# This removes padding characters (=) and replaces + and / with - and _
# Example final thumbprint: "xJ0dj8sbHosbHosbHosbHos"
thumbprint = base64.urlsafe_b64encode(thumbprint).decode('utf-8').rstrip('=')
# Combine token and key authorization
key_auth = f"{challenge['token']}.{thumbprint}"
logging.CyberCPLogFileWriter.writeToFile(f'Key authorization: {key_auth}')
return key_auth
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error getting key authorization: {str(e)}')
return None
def _verify_challenge(self, challenge_url):
"""Verify challenge completion with the ACME server
This function sends a POST request to the ACME server to verify that the challenge
has been completed successfully. The challenge URL is provided by the ACME server
when the challenge is created.
Example challenge_url:
"https://acme-v02.api.letsencrypt.org/acme/challenge/example.com/123456"
The verification process:
1. Creates an empty payload (POST-as-GET request)
2. Creates a JWS (JSON Web Signature) with the payload
3. Sends the request to the ACME server
4. Checks the response status
Returns:
bool: True if challenge is verified successfully, False otherwise
"""
try:
logging.CyberCPLogFileWriter.writeToFile(f'Verifying challenge at URL: {challenge_url}')
# Create empty payload for POST-as-GET request
# This is a special type of request where we want to GET a resource
# but need to include a signature, so we use POST with an empty payload
payload = {}
# Create JWS (JSON Web Signature) for the request
# Example JWS might look like:
# {
# "protected": "eyJhbGciOiJSUzI1NiIsIm5vbmNlIjoiMTIzNDU2Nzg5MCIsInVybCI6Imh0dHBzOi8vYWNtZS12MDIuYXBpLmxldHNlbmNyeXB0Lm9yZy9hY21lL2NoYWxsZW5nZS9leGFtcGxlLmNvbS8xMjM0NTYifQ",
# "signature": "c2lnbmF0dXJlX2hlcmU",
# "payload": ""
# }
jws = self._create_jws(payload, challenge_url)
if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for challenge verification')
return False
logging.CyberCPLogFileWriter.writeToFile('Sending challenge verification request...')
# Set headers for the request
# Content-Type: application/jose+json indicates we're sending a JWS
headers = {
'Content-Type': 'application/jose+json'
}
# Send the verification request to the ACME server
# Example response might look like:
# {
# "type": "http-01",
# "status": "valid",
# "validated": "2024-03-20T12:00:00Z",
# "url": "https://acme-v02.api.letsencrypt.org/acme/challenge/example.com/123456"
# }
response = requests.post(challenge_url, data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Challenge verification response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Challenge verification response: {response.text}')
# Check if the challenge was verified successfully
# Status code 200 indicates success
# The response will contain the challenge status and validation time
if response.status_code == 200:
logging.CyberCPLogFileWriter.writeToFile('Successfully verified challenge')
return True
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error verifying challenge: {str(e)}')
return False
def _finalize_order(self, csr):
"""Finalize order and get certificate"""
try:
logging.CyberCPLogFileWriter.writeToFile('Finalizing order...')
payload = {
"csr": base64.urlsafe_b64encode(csr).decode('utf-8').rstrip('=')
}
jws = self._create_jws(payload, self.finalize_url)
if not jws:
logging.CyberCPLogFileWriter.writeToFile('Failed to create JWS for order finalization')
return False
logging.CyberCPLogFileWriter.writeToFile('Sending order finalization request...')
headers = {
'Content-Type': 'application/jose+json'
}
response = requests.post(self.finalize_url, data=jws, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Order finalization response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Order finalization response: {response.text}')
if response.status_code == 200:
# Wait for order to be processed
max_attempts = 30
delay = 2
for attempt in range(max_attempts):
if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for order status check')
return False
response = requests.get(self.order_url, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Order status check response: {response.text}')
if response.status_code == 200:
order_status = response.json().get('status')
if order_status == 'valid':
self.certificate_url = response.json().get('certificate')
logging.CyberCPLogFileWriter.writeToFile(
f'Successfully finalized order. Certificate URL: {self.certificate_url}')
return True
elif order_status == 'invalid':
logging.CyberCPLogFileWriter.writeToFile('Order validation failed')
return False
elif order_status == 'processing':
logging.CyberCPLogFileWriter.writeToFile(
f'Order still processing, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay)
continue
logging.CyberCPLogFileWriter.writeToFile(
f'Order status check failed, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay)
logging.CyberCPLogFileWriter.writeToFile('Order processing timed out')
return False
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error finalizing order: {str(e)}')
return False
def _download_certificate(self):
"""Download certificate from ACME server"""
try:
logging.CyberCPLogFileWriter.writeToFile('Downloading certificate...')
logging.CyberCPLogFileWriter.writeToFile(f'Certificate URL: {self.certificate_url}')
# For certificate downloads, we can use a simple GET request
response = requests.get(self.certificate_url)
logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response headers: {response.headers}')
logging.CyberCPLogFileWriter.writeToFile(f'Certificate download response content: {response.text}')
if response.status_code == 200:
logging.CyberCPLogFileWriter.writeToFile('Successfully downloaded certificate')
return response.content
return None
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error downloading certificate: {str(e)}')
return None
def _wait_for_challenge_validation(self, challenge_url, max_attempts=30, delay=2):
"""Wait for challenge to be validated by the ACME server"""
try:
logging.CyberCPLogFileWriter.writeToFile(f'Waiting for challenge validation at URL: {challenge_url}')
for attempt in range(max_attempts):
if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for challenge status check')
return False
headers = {
'Content-Type': 'application/jose+json'
}
response = requests.get(challenge_url, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Challenge status check response: {response.text}')
if response.status_code == 200:
challenge_status = response.json().get('status')
if challenge_status == 'valid':
logging.CyberCPLogFileWriter.writeToFile('Challenge validated successfully')
return True
elif challenge_status == 'invalid':
logging.CyberCPLogFileWriter.writeToFile('Challenge validation failed')
return False
logging.CyberCPLogFileWriter.writeToFile(
f'Challenge still pending, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay)
logging.CyberCPLogFileWriter.writeToFile('Challenge validation timed out')
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error waiting for challenge validation: {str(e)}')
return False
def _check_dns_record(self, domain):
"""Check if a domain has valid DNS records
This function performs multiple DNS checks to ensure the domain has valid DNS records.
It includes:
1. A record (IPv4) check
2. AAAA record (IPv6) check
3. DNS caching prevention
4. Multiple DNS server checks
Args:
domain (str): The domain to check
Returns:
bool: True if valid DNS records are found, False otherwise
"""
try:
logging.CyberCPLogFileWriter.writeToFile(f'Checking DNS records for domain: {domain}')
# List of public DNS servers to check against
dns_servers = [
'8.8.8.8', # Google DNS
'1.1.1.1', # Cloudflare DNS
'208.67.222.222' # OpenDNS
]
# Function to check DNS record with specific DNS server
def check_with_dns_server(server, record_type='A'):
try:
# Create a new socket for each check
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(5) # 5 second timeout
# Set the DNS server
sock.connect((server, 53))
# Create DNS query
query = bytearray()
# DNS header
query += b'\x00\x01' # Transaction ID
query += b'\x01\x00' # Flags: Standard query
query += b'\x00\x01' # Questions: 1
query += b'\x00\x00' # Answer RRs: 0
query += b'\x00\x00' # Authority RRs: 0
query += b'\x00\x00' # Additional RRs: 0
# Domain name
for part in domain.split('.'):
query.append(len(part))
query.extend(part.encode())
query += b'\x00' # End of domain name
# Query type and class
if record_type == 'A':
query += b'\x00\x01' # Type: A
else: # AAAA
query += b'\x00\x1c' # Type: AAAA
query += b'\x00\x01' # Class: IN
# Send query
sock.send(query)
# Receive response
response = sock.recv(1024)
# Check if we got a valid response
if len(response) > 12: # Minimum DNS response size
# Check if there are answers in the response
answer_count = int.from_bytes(response[6:8], 'big')
if answer_count > 0:
return True
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS with server {server}: {str(e)}')
return False
finally:
sock.close()
# Check A records (IPv4) with multiple DNS servers
a_record_found = False
for server in dns_servers:
if check_with_dns_server(server, 'A'):
a_record_found = True
break
# Check AAAA records (IPv6) with multiple DNS servers
aaaa_record_found = False
for server in dns_servers:
if check_with_dns_server(server, 'AAAA'):
aaaa_record_found = True
break
# Also check with system's DNS resolver as a fallback
try:
# Try to resolve A record (IPv4)
socket.gethostbyname(domain)
a_record_found = True
except socket.gaierror:
pass
try:
# Try to resolve AAAA record (IPv6)
socket.getaddrinfo(domain, None, socket.AF_INET6)
aaaa_record_found = True
except socket.gaierror:
pass
# Log the results
if a_record_found:
logging.CyberCPLogFileWriter.writeToFile(f'IPv4 DNS record found for domain: {domain}')
if aaaa_record_found:
logging.CyberCPLogFileWriter.writeToFile(f'IPv6 DNS record found for domain: {domain}')
# Return True if either A or AAAA record is found
return a_record_found or aaaa_record_found
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error checking DNS records: {str(e)}')
return False
def _wait_for_order_processing(self, max_attempts=30, delay=2):
"""Wait for order to be processed"""
try:
logging.CyberCPLogFileWriter.writeToFile('Waiting for order processing...')
for attempt in range(max_attempts):
if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for order status check')
return False
headers = {
'Content-Type': 'application/jose+json'
}
response = requests.get(self.order_url, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Order status check response: {response.text}')
if response.status_code == 200:
order_status = response.json().get('status')
if order_status == 'valid':
self.certificate_url = response.json().get('certificate')
logging.CyberCPLogFileWriter.writeToFile('Order validated successfully')
return True
elif order_status == 'invalid':
logging.CyberCPLogFileWriter.writeToFile('Order validation failed')
return False
elif order_status == 'processing':
logging.CyberCPLogFileWriter.writeToFile(
f'Order still processing, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay)
continue
logging.CyberCPLogFileWriter.writeToFile(
f'Order status check failed, attempt {attempt + 1}/{max_attempts}')
time.sleep(delay)
logging.CyberCPLogFileWriter.writeToFile('Order processing timed out')
return False
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error waiting for order processing: {str(e)}')
return False
def issue_certificate(self, domains, use_dns=False):
"""Main method to issue certificate"""
try:
logging.CyberCPLogFileWriter.writeToFile(
f'Starting certificate issuance for domains: {domains}, use_dns: {use_dns}')
# Try to load existing account key first
if self._load_account_key():
logging.CyberCPLogFileWriter.writeToFile('Using existing account key')
else:
logging.CyberCPLogFileWriter.writeToFile('No existing account key found, will create new one')
# Filter domains to only include those with valid DNS records
valid_domains = []
for domain in domains:
if self._check_dns_record(domain):
valid_domains.append(domain)
else:
logging.CyberCPLogFileWriter.writeToFile(f'Skipping domain {domain} due to missing DNS records')
if not valid_domains:
logging.CyberCPLogFileWriter.writeToFile('No valid domains found with DNS records')
return False
# Initialize ACME
logging.CyberCPLogFileWriter.writeToFile('Step 1: Generating account key')
if not self._generate_account_key():
logging.CyberCPLogFileWriter.writeToFile('Failed to generate account key')
return False
logging.CyberCPLogFileWriter.writeToFile('Step 2: Getting ACME directory')
if not self._get_directory():
logging.CyberCPLogFileWriter.writeToFile('Failed to get ACME directory')
return False
logging.CyberCPLogFileWriter.writeToFile('Step 3: Getting nonce')
if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce')
return False
logging.CyberCPLogFileWriter.writeToFile('Step 4: Creating account')
if not self._create_account():
logging.CyberCPLogFileWriter.writeToFile('Failed to create account')
# If we failed to create account and we're not in staging, try staging
if not self.staging:
logging.CyberCPLogFileWriter.writeToFile('Switching to staging environment...')
self.staging = True
self.acme_directory = "https://acme-staging-v02.api.letsencrypt.org/directory"
if not self._get_directory():
return False
if not self._get_nonce():
return False
if not self._create_account():
return False
else:
return False
# Create order with only valid domains
logging.CyberCPLogFileWriter.writeToFile('Step 5: Creating order')
if not self._create_order(valid_domains):
logging.CyberCPLogFileWriter.writeToFile('Failed to create order')
return False
# Handle challenges
logging.CyberCPLogFileWriter.writeToFile('Step 6: Handling challenges')
for auth_url in self.authorizations:
logging.CyberCPLogFileWriter.writeToFile(f'Processing authorization URL: {auth_url}')
if not self._get_nonce():
logging.CyberCPLogFileWriter.writeToFile('Failed to get nonce for authorization')
return False
# Get authorization details with GET request
headers = {
'Content-Type': 'application/jose+json'
}
response = requests.get(auth_url, headers=headers)
logging.CyberCPLogFileWriter.writeToFile(f'Authorization response status: {response.status_code}')
logging.CyberCPLogFileWriter.writeToFile(f'Authorization response: {response.text}')
if response.status_code != 200:
logging.CyberCPLogFileWriter.writeToFile('Failed to get authorization')
return False
challenges = response.json()['challenges']
for challenge in challenges:
logging.CyberCPLogFileWriter.writeToFile(f'Processing challenge: {json.dumps(challenge)}')
# Only handle the challenge type we're using
if use_dns and challenge['type'] == 'dns-01':
if not self._handle_dns_challenge(challenge):
logging.CyberCPLogFileWriter.writeToFile('Failed to handle DNS challenge')
return False
if not self._verify_challenge(challenge['url']):
logging.CyberCPLogFileWriter.writeToFile('Failed to verify DNS challenge')
return False
if not self._wait_for_challenge_validation(challenge['url']):
logging.CyberCPLogFileWriter.writeToFile('DNS challenge validation failed')
return False
elif not use_dns and challenge['type'] == 'http-01':
if not self._handle_http_challenge(challenge):
logging.CyberCPLogFileWriter.writeToFile('Failed to handle HTTP challenge')
return False
if not self._verify_challenge(challenge['url']):
logging.CyberCPLogFileWriter.writeToFile('Failed to verify HTTP challenge')
return False
if not self._wait_for_challenge_validation(challenge['url']):
logging.CyberCPLogFileWriter.writeToFile('HTTP challenge validation failed')
return False
else:
logging.CyberCPLogFileWriter.writeToFile(f'Skipping {challenge["type"]} challenge')
# Generate CSR
logging.CyberCPLogFileWriter.writeToFile('Step 7: Generating CSR')
key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# Get the domain from the order response
order_response = requests.get(self.order_url, headers=headers).json()
order_domains = [identifier['value'] for identifier in order_response['identifiers']]
logging.CyberCPLogFileWriter.writeToFile(f'Order domains: {order_domains}')
# Create CSR with exactly the domains from the order
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name([
x509.NameAttribute(x509.NameOID.COMMON_NAME, order_domains[0])
])
).add_extension(
x509.SubjectAlternativeName([
x509.DNSName(domain) for domain in order_domains
]),
critical=False
).sign(key, hashes.SHA256(), default_backend())
# Finalize order
logging.CyberCPLogFileWriter.writeToFile('Step 8: Finalizing order')
if not self._finalize_order(csr.public_bytes(serialization.Encoding.DER)):
logging.CyberCPLogFileWriter.writeToFile('Failed to finalize order')
return False
# Wait for order processing
logging.CyberCPLogFileWriter.writeToFile('Step 9: Waiting for order processing')
if not self._wait_for_order_processing():
logging.CyberCPLogFileWriter.writeToFile('Failed to process order')
return False
# Download certificate
logging.CyberCPLogFileWriter.writeToFile('Step 10: Downloading certificate')
certificate = self._download_certificate()
if not certificate:
logging.CyberCPLogFileWriter.writeToFile('Failed to download certificate')
return False
# Save certificate
logging.CyberCPLogFileWriter.writeToFile('Step 11: Saving certificate')
if not os.path.exists(self.cert_path):
logging.CyberCPLogFileWriter.writeToFile(f'Creating certificate directory: {self.cert_path}')
os.makedirs(self.cert_path)
cert_file = os.path.join(self.cert_path, 'fullchain.pem')
key_file = os.path.join(self.cert_path, 'privkey.pem')
logging.CyberCPLogFileWriter.writeToFile(f'Saving certificate to: {cert_file}')
with open(cert_file, 'wb') as f:
f.write(certificate)
logging.CyberCPLogFileWriter.writeToFile(f'Saving private key to: {key_file}')
with open(key_file, 'wb') as f:
f.write(key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
logging.CyberCPLogFileWriter.writeToFile('Successfully completed certificate issuance')
return True
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error issuing certificate: {str(e)}')
return False

View File

@@ -6,6 +6,7 @@ import shlex
import subprocess import subprocess
import socket import socket
from plogical.processUtilities import ProcessUtilities from plogical.processUtilities import ProcessUtilities
try: try:
from websiteFunctions.models import ChildDomains, Websites from websiteFunctions.models import ChildDomains, Websites
except: except:
@@ -14,7 +15,6 @@ from plogical.acl import ACLManager
class sslUtilities: class sslUtilities:
Server_root = "/usr/local/lsws" Server_root = "/usr/local/lsws"
redisConf = '/usr/local/lsws/conf/dvhost_redis.conf' redisConf = '/usr/local/lsws/conf/dvhost_redis.conf'
@@ -48,7 +48,6 @@ class sslUtilities:
except BaseException as msg: except BaseException as msg:
return 0, str(msg) return 0, str(msg)
@staticmethod @staticmethod
def CheckIfSSLNeedsToBeIssued(virtualHostName): def CheckIfSSLNeedsToBeIssued(virtualHostName):
#### if website already have an SSL, better not issue again - need to check for wild-card #### if website already have an SSL, better not issue again - need to check for wild-card
@@ -61,7 +60,6 @@ class sslUtilities:
if os.path.exists(ProcessUtilities.debugPath): if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(f'SSL provider for {virtualHostName} is {SSLProvider}.') logging.CyberCPLogFileWriter.writeToFile(f'SSL provider for {virtualHostName} is {SSLProvider}.')
#### totally seprate check to see if both non-www and www are covered #### totally seprate check to see if both non-www and www are covered
if SSLProvider == "(STAGING) Let's Encrypt": if SSLProvider == "(STAGING) Let's Encrypt":
@@ -73,7 +71,8 @@ class sslUtilities:
if len(domains) > 1: if len(domains) > 1:
### need further checks here to see if ssl is valid for less then 15 days etc ### need further checks here to see if ssl is valid for less then 15 days etc
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
'[CheckIfSSLNeedsToBeIssued] SSL exists for %s and both versions are covered, just need to ensure if SSL is valid for less then 15 days.' % (virtualHostName), 0) '[CheckIfSSLNeedsToBeIssued] SSL exists for %s and both versions are covered, just need to ensure if SSL is valid for less then 15 days.' % (
virtualHostName), 0)
pass pass
else: else:
return sslUtilities.ISSUE_SSL return sslUtilities.ISSUE_SSL
@@ -86,7 +85,7 @@ class sslUtilities:
now = datetime.now() now = datetime.now()
diff = finalDate - now diff = finalDate - now
if int(diff.days) >= 15 and SSLProvider!='Denial': if int(diff.days) >= 15 and SSLProvider != 'Denial':
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
'[CheckIfSSLNeedsToBeIssued] SSL exists for %s and is not ready to fetch new SSL., skipping..' % ( '[CheckIfSSLNeedsToBeIssued] SSL exists for %s and is not ready to fetch new SSL., skipping..' % (
virtualHostName), 0) virtualHostName), 0)
@@ -145,7 +144,6 @@ class sslUtilities:
return str(msg) return str(msg)
return 0 return 0
@staticmethod @staticmethod
def checkSSLIPv6Listener(): def checkSSLIPv6Listener():
try: try:
@@ -155,7 +153,8 @@ class sslUtilities:
return 1 return 1
except BaseException as msg: except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [IO Error with main config file [checkSSLIPv6Listener]]") logging.CyberCPLogFileWriter.writeToFile(
str(msg) + " [IO Error with main config file [checkSSLIPv6Listener]]")
return str(msg) return str(msg)
return 0 return 0
@@ -173,57 +172,136 @@ class sslUtilities:
@staticmethod @staticmethod
def PatchVhostConf(virtualHostName): def PatchVhostConf(virtualHostName):
"""Patch the virtual host configuration to add ACME challenge support
This function adds the necessary configuration to handle ACME challenges
for both OpenLiteSpeed (OLS) and Apache configurations. It also checks
for potential configuration conflicts before making changes.
Args:
virtualHostName (str): The domain name to configure
Returns:
tuple: (status, message) where status is 1 for success, 0 for failure
"""
try: try:
confPath = sslUtilities.Server_root + "/conf/vhosts/" + virtualHostName # Construct paths
completePathToConfigFile = confPath + "/vhost.conf" confPath = os.path.join(sslUtilities.Server_root, "conf", "vhosts", virtualHostName)
completePathToConfigFile = os.path.join(confPath, "vhost.conf")
DataVhost = open(completePathToConfigFile, 'r').read() # Check if file exists
if not os.path.exists(completePathToConfigFile):
logging.CyberCPLogFileWriter.writeToFile(f'Configuration file not found: {completePathToConfigFile}')
return 0, f'Configuration file not found: {completePathToConfigFile}'
if DataVhost.find('/.well-known/acme-challenge') == -1: # Read current configuration
try:
with open(completePathToConfigFile, 'r') as f:
DataVhost = f.read()
except IOError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error reading configuration file: {str(e)}')
return 0, f'Error reading configuration file: {str(e)}'
# Check for potential conflicts
conflicts = []
# Check if ACME challenge is already configured
if DataVhost.find('/.well-known/acme-challenge') != -1:
logging.CyberCPLogFileWriter.writeToFile(f'ACME challenge already configured for {virtualHostName}')
return 1, 'ACME challenge already configured'
# Check for conflicting rewrite rules
if DataVhost.find('rewrite') != -1 and DataVhost.find('enable 1') != -1:
conflicts.append('Active rewrite rules found that might interfere with ACME challenges')
# Check for conflicting location blocks
if DataVhost.find('location /.well-known') != -1:
conflicts.append('Existing location block for /.well-known found')
# Check for conflicting aliases
if DataVhost.find('Alias /.well-known') != -1:
conflicts.append('Existing alias for /.well-known found')
# Check for conflicting context blocks
if DataVhost.find('context /.well-known') != -1:
conflicts.append('Existing context block for /.well-known found')
# Check for conflicting access controls
if DataVhost.find('deny from all') != -1 and DataVhost.find('location') != -1:
conflicts.append('Global deny rules found that might block ACME challenges')
# If conflicts found, log them and return
if conflicts:
conflict_message = 'Configuration conflicts found: ' + '; '.join(conflicts)
logging.CyberCPLogFileWriter.writeToFile(
f'Configuration conflicts for {virtualHostName}: {conflict_message}')
return 0, conflict_message
# Create challenge directory if it doesn't exist
challenge_dir = '/usr/local/lsws/Example/html/.well-known/acme-challenge'
try:
os.makedirs(challenge_dir, exist_ok=True)
# Set proper permissions
os.chmod(challenge_dir, 0o755)
except OSError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error creating challenge directory: {str(e)}')
return 0, f'Error creating challenge directory: {str(e)}'
# Handle configuration based on server type
if ProcessUtilities.decideServer() == ProcessUtilities.OLS: if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
WriteToFile = open(completePathToConfigFile, 'a') # OpenLiteSpeed configuration
try:
with open(completePathToConfigFile, 'a') as f:
content = ''' content = '''
context /.well-known/acme-challenge { context /.well-known/acme-challenge {
location /usr/local/lsws/Example/html/.well-known/acme-challenge location /usr/local/lsws/Example/html/.well-known/acme-challenge
allowBrowse 1 allowBrowse 1
rewrite { rewrite {
enable 0 enable 0
} }
addDefaultCharset off addDefaultCharset off
phpIniOverride { phpIniOverride {
} }
} }
''' '''
WriteToFile.write(content) f.write(content)
WriteToFile.close() except IOError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error writing OLS configuration: {str(e)}')
return 0, f'Error writing OLS configuration: {str(e)}'
else: else:
data = open(completePathToConfigFile, 'r').readlines() # Apache configuration
WriteToFile = open(completePathToConfigFile, 'w') try:
Check = 0 # Read current configuration
for items in data: with open(completePathToConfigFile, 'r') as f:
if items.find('DocumentRoot /home/')> -1: lines = f.readlines()
if Check == 0:
WriteToFile.write(items)
WriteToFile.write(' Alias /.well-known/acme-challenge /usr/local/lsws/Example/html/.well-known/acme-challenge\n')
Check = 1
else:
WriteToFile.write(items)
else:
WriteToFile.write(items)
WriteToFile.close() # Write new configuration
with open(completePathToConfigFile, 'w') as f:
check = 0
for line in lines:
f.write(line)
if line.find('DocumentRoot /home/') > -1 and check == 0:
f.write(
' Alias /.well-known/acme-challenge /usr/local/lsws/Example/html/.well-known/acme-challenge\n')
check = 1
except IOError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error writing Apache configuration: {str(e)}')
return 0, f'Error writing Apache configuration: {str(e)}'
# Restart LiteSpeed
try:
from plogical import installUtilities from plogical import installUtilities
installUtilities.installUtilities.reStartLiteSpeed() installUtilities.installUtilities.reStartLiteSpeed()
logging.CyberCPLogFileWriter.writeToFile(
f'Successfully configured ACME challenge for {virtualHostName}')
return 1, 'Successfully configured ACME challenge'
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f'Error restarting LiteSpeed: {str(e)}')
return 0, f'Error restarting LiteSpeed: {str(e)}'
except Exception as e:
except BaseException as msg: logging.CyberCPLogFileWriter.writeToFile(f'Unexpected error in PatchVhostConf: {str(e)}')
return 0, str(msg) return 0, f'Unexpected error: {str(e)}'
@staticmethod @staticmethod
def installSSLForDomain(virtualHostName, adminEmail='example@example.org'): def installSSLForDomain(virtualHostName, adminEmail='example@example.org'):
@@ -476,28 +554,14 @@ context /.well-known/acme-challenge {
@staticmethod @staticmethod
def obtainSSLForADomain(virtualHostName, adminEmail, sslpath, aliasDomain=None): def obtainSSLForADomain(virtualHostName, adminEmail, sslpath, aliasDomain=None):
from plogical.acl import ACLManager from plogical.acl import ACLManager
from plogical.sslv2 import sslUtilities as sslv2 from plogical.sslv2 import sslUtilities as sslv2
from plogical.customACME import CustomACME
import json import json
# import socket
# url = "https://platform.cyberpersons.com/CyberpanelAdOns/Adonpermission"
# data = {
# "name": "all",
# "IP": ACLManager.GetServerIP()
# }
#
# import requests
# response = requests.post(url, data=json.dumps(data))
#Status = response.json()['status']
Status = 1 Status = 1
# if (Status == 1) or ProcessUtilities.decideServer() == ProcessUtilities.ent:
# retStatus, message = sslv2.obtainSSLForADomain(virtualHostName, adminEmail, sslpath, aliasDomain)
# if retStatus == 1:
# return retStatus
if sslUtilities.CheckIfSSLNeedsToBeIssued(virtualHostName) == sslUtilities.ISSUE_SSL: if sslUtilities.CheckIfSSLNeedsToBeIssued(virtualHostName) == sslUtilities.ISSUE_SSL:
pass pass
else: else:
@@ -514,58 +578,40 @@ context /.well-known/acme-challenge {
command = f'chmod -R 755 /usr/local/lsws/Example/html' command = f'chmod -R 755 /usr/local/lsws/Example/html'
ProcessUtilities.executioner(command) ProcessUtilities.executioner(command)
CustomVerificationFile = f'/usr/local/lsws/Example/html/.well-known/acme-challenge/{virtualHostName}' # Try custom ACME implementation first
command = f'touch {CustomVerificationFile}'
ProcessUtilities.normalExecutioner(command)
URLFetchPathWWW = f'http://www.{virtualHostName}/.well-known/acme-challenge/{virtualHostName}'
URLFetchPathNONWWW = f'http://{virtualHostName}/.well-known/acme-challenge/{virtualHostName}'
try: try:
resp = requests.get(URLFetchPathWWW, timeout=5) domains = [virtualHostName, f'www.{virtualHostName}']
if aliasDomain:
if resp.status_code == 200: domains.extend([aliasDomain, f'www.{aliasDomain}'])
logging.CyberCPLogFileWriter.writeToFile(f'Status Code: 200 for: {URLFetchPathWWW}')
WWWStatus = 1
else:
logging.CyberCPLogFileWriter.writeToFile(
f'Status Code: {str(resp.status_code)} for: {URLFetchPathWWW}. Error: {resp.text}')
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(
f'Status Code: Unknown for: {URLFetchPathWWW}. Error: {str(msg)}')
# Check if Cloudflare is used
use_dns = False
try: try:
resp = requests.get(URLFetchPathNONWWW, timeout=5) website = Websites.objects.get(domain=virtualHostName)
if resp.status_code == 200: if website.externalApp == 'cloudflare':
logging.CyberCPLogFileWriter.writeToFile(f'Status Code: 200 for: {URLFetchPathNONWWW}') use_dns = True
NONWWWStatus = 1 except:
else: pass
logging.CyberCPLogFileWriter.writeToFile(f'Status Code: {str(resp.status_code)} for: {URLFetchPathNONWWW}. Error: {resp.text}')
except BaseException as msg: acme = CustomACME(virtualHostName, adminEmail, staging=False) # Force production environment
if acme.issue_certificate(domains, use_dns=use_dns):
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
f'Status Code: Unkown for: {URLFetchPathNONWWW}. Error: {str(msg)}') f"Successfully obtained SSL using custom ACME implementation for: {virtualHostName}")
return 1
WWWStatus = 1 except Exception as e:
NONWWWStatus = 1 logging.CyberCPLogFileWriter.writeToFile(
f"Custom ACME implementation failed: {str(e)}. Falling back to acme.sh")
# Fallback to acme.sh if custom implementation fails
try: try:
acmePath = '/root/.acme.sh/acme.sh' acmePath = '/root/.acme.sh/acme.sh'
### register account for zero ssl
command = '%s --register-account -m %s' % (acmePath, adminEmail) command = '%s --register-account -m %s' % (acmePath, adminEmail)
subprocess.call(shlex.split(command)) subprocess.call(shlex.split(command))
# if ProcessUtilities.decideDistro() == ProcessUtilities.ubuntu:
# acmePath = '/home/cyberpanel/.acme.sh/acme.sh'
command = '%s --set-default-ca --server letsencrypt' % (acmePath) command = '%s --set-default-ca --server letsencrypt' % (acmePath)
subprocess.call(shlex.split(command)) subprocess.call(shlex.split(command))
if aliasDomain is None: if aliasDomain is None:
existingCertPath = '/etc/letsencrypt/live/' + virtualHostName existingCertPath = '/etc/letsencrypt/live/' + virtualHostName
if not os.path.exists(existingCertPath): if not os.path.exists(existingCertPath):
command = 'mkdir -p ' + existingCertPath command = 'mkdir -p ' + existingCertPath
@@ -575,178 +621,52 @@ context /.well-known/acme-challenge {
command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \ command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \ + ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --staging' + ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --staging'
#ResultText = open(logging.CyberCPLogFileWriter.fileName, 'r').read()
#CurrentMessage = "Trying to obtain SSL for: " + virtualHostName + " and: www." + virtualHostName
if (WWWStatus and NONWWWStatus):
#logging.CyberCPLogFileWriter.writeToFile(CurrentMessage, 0)
logging.CyberCPLogFileWriter.writeToFile(command, 0)
#output = subprocess.check_output(shlex.split(command)).decode("utf-8")
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True,shell=True)
except:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
stdout = result.stdout
stderr = result.stderr
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
else:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
if result.returncode == 0: if result.returncode == 0:
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \ command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \ + ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt' + ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
logging.CyberCPLogFileWriter.writeToFile(command, 0) result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True,
shell=True)
except:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
stdout = result.stdout
stderr = result.stderr
if result.returncode == 0: if result.returncode == 0:
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
logging.CyberCPLogFileWriter.writeToFile( logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName + " and: www." + virtualHostName, 0) "Successfully obtained SSL for: " + virtualHostName + " and: www." + virtualHostName, 0)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, stdout, logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, result.stdout,
'SSL Notification for %s.' % (virtualHostName)) 'SSL Notification for %s.' % (virtualHostName))
return 1 return 1
else: return 0
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr) except Exception as e:
raise subprocess.CalledProcessError(0, '', '') logging.CyberCPLogFileWriter.writeToFile(str(e))
else:
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
raise subprocess.CalledProcessError(0, '', '')
else:
logging.CyberCPLogFileWriter.writeToFile(command, 0)
raise subprocess.CalledProcessError(0, '', '')
except subprocess.CalledProcessError:
logging.CyberCPLogFileWriter.writeToFile(
"Failed to obtain SSL for: " + virtualHostName + " and: www." + virtualHostName, 0)
finalText = "Failed to obtain SSL for: " + virtualHostName + " and: www." + virtualHostName
try:
command = acmePath + " --issue -d " + virtualHostName + ' --cert-file ' + existingCertPath \
+ '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --staging'
#ResultText = open(logging.CyberCPLogFileWriter.fileName, 'r').read()
CurrentMessage = '%s\nTrying to obtain SSL for: %s' % (finalText, virtualHostName)
if NONWWWStatus:
finalText = '%s\nTrying to obtain SSL for: %s' % (finalText, virtualHostName)
logging.CyberCPLogFileWriter.writeToFile("Trying to obtain SSL for: " + virtualHostName, 0)
logging.CyberCPLogFileWriter.writeToFile(command)
#output = subprocess.check_output(shlex.split(command)).decode("utf-8")
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True,
shell=True)
except:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
stdout = result.stdout
stderr = result.stderr
if result.returncode == 0:
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
command = acmePath + " --issue -d " + virtualHostName + ' --cert-file ' + existingCertPath \
+ '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
try:
result = subprocess.run(command, capture_output=True, universal_newlines=True,
shell=True)
except:
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
universal_newlines=True, shell=True)
stdout = result.stdout
stderr = result.stderr
if result.returncode == 0:
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName, 0)
finalText = '%s\nSuccessfully obtained SSL for: %s.' % (finalText, virtualHostName)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, finalText,
'SSL Notification for %s.' % (virtualHostName))
return 1
else:
logging.CyberCPLogFileWriter.writeToFile(command, 0)
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
return 0 return 0
else: else:
logging.CyberCPLogFileWriter.writeToFile(stdout + stderr)
return 0
else:
logging.CyberCPLogFileWriter.writeToFile(command, 0)
return 0
except subprocess.CalledProcessError:
logging.CyberCPLogFileWriter.writeToFile('Failed to obtain SSL, issuing self-signed SSL for: ' + virtualHostName, 0)
logging.CyberCPLogFileWriter.SendEmail(sender_email, adminEmail, 'Failed to obtain SSL, issuing self-signed SSL for: ' + virtualHostName,
'SSL Notification for %s.' % (virtualHostName))
return 0
else:
existingCertPath = '/etc/letsencrypt/live/' + virtualHostName existingCertPath = '/etc/letsencrypt/live/' + virtualHostName
if not os.path.exists(existingCertPath): if not os.path.exists(existingCertPath):
command = 'mkdir -p ' + existingCertPath command = 'mkdir -p ' + existingCertPath
subprocess.call(shlex.split(command)) subprocess.call(shlex.split(command))
try: try:
logging.CyberCPLogFileWriter.writeToFile(
"Trying to obtain SSL for: " + virtualHostName + ", www." + virtualHostName + ", " + aliasDomain + " and www." + aliasDomain + ",")
command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \ command = acmePath + " --issue -d " + virtualHostName + " -d www." + virtualHostName \
+ ' -d ' + aliasDomain + ' -d www.' + aliasDomain\ + ' -d ' + aliasDomain + ' -d www.' + aliasDomain \
+ ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \ + ' --cert-file ' + existingCertPath + '/cert.pem' + ' --key-file ' + existingCertPath + '/privkey.pem' \
+ ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt' + ' --fullchain-file ' + existingCertPath + '/fullchain.pem' + ' -w /usr/local/lsws/Example/html -k ec-256 --force --server letsencrypt'
output = subprocess.check_output(shlex.split(command)).decode("utf-8") result = subprocess.run(command, capture_output=True, universal_newlines=True, shell=True)
logging.CyberCPLogFileWriter.writeToFile(
"Successfully obtained SSL for: " + virtualHostName + ", www." + virtualHostName + ", " + aliasDomain + "and www." + aliasDomain + ",")
except subprocess.CalledProcessError: if result.returncode == 0:
logging.CyberCPLogFileWriter.writeToFile( return 1
"Failed to obtain SSL for: " + virtualHostName + ", www." + virtualHostName + ", " + aliasDomain + "and www." + aliasDomain + ",")
return 0 return 0
except Exception as e:
## logging.CyberCPLogFileWriter.writeToFile(str(e))
return 0 return 0
except Exception as e:
except BaseException as msg: logging.CyberCPLogFileWriter.writeToFile(str(e))
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [Failed to obtain SSL. [obtainSSLForADomain]]")
return 0 return 0
@@ -767,13 +687,16 @@ def issueSSLForDomain(domain, adminEmail, sslpath, aliasDomain=None):
if os.path.exists(pathToStoreSSLFullChain): if os.path.exists(pathToStoreSSLFullChain):
import OpenSSL import OpenSSL
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open(pathToStoreSSLFullChain, 'r').read()) x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
open(pathToStoreSSLFullChain, 'r').read())
SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8') SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8')
if SSLProvider != 'Denial': if SSLProvider != 'Denial':
if sslUtilities.installSSLForDomain(domain) == 1: if sslUtilities.installSSLForDomain(domain) == 1:
logging.CyberCPLogFileWriter.writeToFile("We are not able to get new SSL for " + domain + ". But there is an existing SSL, it might only be for the main domain (excluding www).") logging.CyberCPLogFileWriter.writeToFile(
return [1, "We are not able to get new SSL for " + domain + ". But there is an existing SSL, it might only be for the main domain (excluding www)." + " [issueSSLForDomain]"] "We are not able to get new SSL for " + domain + ". But there is an existing SSL, it might only be for the main domain (excluding www).")
return [1,
"We are not able to get new SSL for " + domain + ". But there is an existing SSL, it might only be for the main domain (excluding www)." + " [issueSSLForDomain]"]
command = 'openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -subj "/C=US/ST=Denial/L=Springfield/O=Dis/CN=' + domain + '" -keyout ' + pathToStoreSSLPrivKey + ' -out ' + pathToStoreSSLFullChain command = 'openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -subj "/C=US/ST=Denial/L=Springfield/O=Dis/CN=' + domain + '" -keyout ' + pathToStoreSSLPrivKey + ' -out ' + pathToStoreSSLFullChain
cmd = shlex.split(command) cmd = shlex.split(command)

View File

@@ -2726,7 +2726,13 @@ class WebsiteManager:
port = ProcessUtilities.fetchCurrentPort() port = ProcessUtilities.fetchCurrentPort()
webhookURL = 'https://' + ipAddress + ':%s/websites/' % (port) + self.domain + '/gitNotify' webhookURL = 'https://' + ipAddress + ':%s/websites/' % (port) + self.domain + '/gitNotify'
webhookURL = webhookURL.replace(' ', '%20')
if website.externalApp == 'github':
command = "ssh-keygen -f /home/%s/.ssh/%s -t rsa -N ''" % (self.domain, website.externalApp)
ProcessUtilities.executioner(command, website.externalApp)
configContent = """Host github.com
proc = httpProc(request, 'websiteFunctions/setupGit.html', proc = httpProc(request, 'websiteFunctions/setupGit.html',
{'domainName': self.domain, 'installed': 1, 'webhookURL': webhookURL}) {'domainName': self.domain, 'installed': 1, 'webhookURL': webhookURL})
return proc.render() return proc.render()