feature: add onboarding function for server and mail hostname

This commit is contained in:
usmannasir
2024-01-18 13:04:47 +05:00
parent b3de5abe10
commit d13ba6727f
7 changed files with 287 additions and 24 deletions

View File

@@ -54,6 +54,7 @@ class CyberCPLogFileWriter:
@staticmethod
def writeToFile(message, email=None):
try:
print("[" + time.strftime("%m.%d.%Y_%H-%M-%S") + "] "+ message)
file = open(CyberCPLogFileWriter.fileName,'a')
file.writelines("[" + time.strftime(
"%m.%d.%Y_%H-%M-%S") + "] "+ message + "\n")

View File

@@ -1131,6 +1131,7 @@ LogFile /var/log/clamav/clamav.log
str((msg) + " [changeclamavConfig]")
print(0, str(msg))
return [0, str(msg) + " [changeclamavConfig]"]
@staticmethod
def installMailScanner(install, SpamAssassin):
try:
@@ -1393,6 +1394,29 @@ LogFile /var/log/clamav/clamav.log
str(msg) + " [checkIfMailScannerInstalled]")
return 0
@staticmethod
def FetchPostfixHostname():
PostfixPath = '/etc/postfix/main.cf'
if os.path.exists(PostfixPath):
PostFixConf = open(PostfixPath, 'r').readlines()
for line in PostFixConf:
if line.find('myhostname') > -1:
hostname = line.split('=')[1].strip(' ').rstrip('\n')
return hostname
else:
return 'localhost'
@staticmethod
def reverse_dns_lookup(ip_address):
try:
import socket
host_name, _, _ = socket.gethostbyaddr(ip_address)
return host_name
except socket.herror as e:
# Handle errors, e.g., if reverse DNS lookup fails
return None
####### Imported below functions from mailserver/mailservermanager, need to refactor later
class MailServerManagerUtils(multi.Thread):

View File

@@ -406,6 +406,16 @@ context /.well-known/acme-challenge {
if retStatus == 1:
return retStatus
#### if website already have an SSL, better not issue again - need to check for wild-card
filePath = '/etc/letsencrypt/live/%s/fullchain.pem' % (virtualHostName)
if os.path.exists(filePath):
import OpenSSL
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open(filePath, 'r').read())
SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8')
if SSLProvider != 'Denial':
return 1
sender_email = 'root@%s' % (socket.gethostname())
sslUtilities.PatchVhostConf(virtualHostName)

View File

@@ -403,6 +403,16 @@ class sslUtilities:
Namecheck_Check = 0
CyberPanel_Check = 0
#### if website already have an SSL, better not issue again - need to check for wild-card
filePath = '/etc/letsencrypt/live/%s/fullchain.pem' % (virtualHostName)
if os.path.exists(filePath):
import OpenSSL
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open(filePath, 'r').read())
SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8')
if SSLProvider != 'Denial':
return 1, 'This domain already have a valid SSL.'
CF_Check, message = sslUtilities.FindIfDomainInCloudflare(virtualHostName)

View File

@@ -1 +1,40 @@
import imaplib
import getpass
from email import message_from_string
# IMAP server settings
imap_server = 'mail.wpmautic.net'
imap_port = 993
# User credentials
email_address = 'usman@wpmautic.net'
password = getpass.getpass("Enter your email password: ")
# Connect to the IMAP server
mail = imaplib.IMAP4_SSL(imap_server, imap_port)
# Log in to the mailbox
mail.login(email_address, password)
# Select the INBOX
mail.select("inbox")
# Search for all emails in the INBOX
result, data = mail.search(None, "ALL")
email_ids = data[0].split()
# Fetch and print header information for each email
for email_id in email_ids:
result, message_data = mail.fetch(email_id, "(BODY[HEADER.FIELDS (FROM TO SUBJECT DATE)])")
raw_email = message_data[0][1].decode('utf-8')
msg = message_from_string(raw_email)
print(f"Email ID: {email_id}")
print(f"From: {msg['From']}")
print(f"To: {msg['To']}")
print(f"Subject: {msg['Subject']}")
print(f"Date: {msg['Date']}")
print("-" * 30)
# Logout
mail.logout()

View File

@@ -0,0 +1,39 @@
import socket
def reverse_dns_lookup(ip_address):
try:
host_name, _, _ = socket.gethostbyaddr(ip_address)
return host_name
except socket.herror as e:
# Handle errors, e.g., if reverse DNS lookup fails
return None
# Example usage
ip_address_to_check = "95.217.248.69"
result = reverse_dns_lookup(ip_address_to_check)
if result:
print(f"Reverse DNS lookup for {ip_address_to_check}: {result}")
else:
print(f"Reverse DNS lookup failed for {ip_address_to_check}")
import socket
def reverse_dns_lookup_bypass_cache(ip_address):
try:
# Use getnameinfo to bypass DNS cache
host_name, _ = socket.getnameinfo((ip_address, 0), socket.NI_NAMEREQD)
return host_name
except socket.herror as e:
# Handle errors, e.g., if reverse DNS lookup fails
return None
# Example usage
ip_address_to_check = "95.217.248.69"
result = reverse_dns_lookup_bypass_cache(ip_address_to_check)
if result:
print(f"Reverse DNS lookup for {ip_address_to_check}: {result}")
else:
print(f"Reverse DNS lookup failed for {ip_address_to_check}")

View File

@@ -3,6 +3,7 @@ import os
import os.path
import sys
import django
# PACKAGE_PARENT = '..'
# SCRIPT_DIR = os.path.dirname(os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__))))
# sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, PACKAGE_PARENT)))
@@ -50,6 +51,143 @@ class virtualHostUtilities:
redisConf = '/usr/local/lsws/conf/dvhost_redis.conf'
vhostConfPath = '/usr/local/lsws/conf'
@staticmethod
def OnBoardingHostName(Domain, tempStatusPath):
import json
import OpenSSL
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, 'Setting up hostname,10')
admin = Administrator.objects.get(pk=1)
config = json.loads(admin.config)
### probably need to add temporary dns resolver nameserver here - pending
try:
CurrentHostName = config['hostname']
except:
CurrentHostName = ''
####
PostFixHostname = mailUtilities.FetchPostfixHostname()
serverIP = ACLManager.fetchIP()
rDNS = mailUtilities.reverse_dns_lookup(serverIP)
print(f'Postfix Hostname: {PostFixHostname}. Server IP {serverIP}. rDNS: {rDNS}')
### Case 1 if hostname already exists check if same hostname in postfix and rdns
filePath = '/etc/letsencrypt/live/%s/fullchain.pem' % (PostFixHostname)
if (CurrentHostName == PostFixHostname and CurrentHostName == rDNS) and os.path.exists(filePath):
# expireData = x509.get_notAfter().decode('ascii')
# finalDate = datetime.strptime(expireData, '%Y%m%d%H%M%SZ')
# now = datetime.now()
# diff = finalDate - now
message = 'Hostname is already set, the same hostname is also used with mail service and rDNS. Let see if valid SSL also exists for this hostname..,10'
print(message)
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, message)
logging.CyberCPLogFileWriter.writeToFile(message)
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open(filePath, 'r').read())
SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8')
try:
child = ChildDomains.objects.get(domain=CurrentHostName)
website = child.master
path = child.path
except:
website = Websites.objects.get(domain=CurrentHostName)
path = f'/home/{CurrentHostName}/public_html'
if SSLProvider == 'Denial':
message = 'It seems that the hostname used with mail service and rDNS does not have a valid SSL certificate, CyberPanel will try to issue valid SSL and restart related services,20'
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, message)
logging.CyberCPLogFileWriter.writeToFile(message)
virtualHostUtilities.issueSSL(CurrentHostName, path, website.adminEmail)
### once SSL is issued, re-read the SSL file and check if valid ssl got issued.
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open(filePath, 'r').read())
SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8')
if SSLProvider == 'Denial':
message = 'Hostname SSL was already issued, and same hostname was used in mail server SSL, rDNS was also configured but we found invalid SSL. However, we tried to issue SSL and it failed. [404]'
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, message)
logging.CyberCPLogFileWriter.writeToFile(message)
return 0
else:
message = "It looks like your current hostname is already the mail server hostname and rDNS is also set and there is a valid SSL, nothing needed to do."
print(message)
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, message)
logging.CyberCPLogFileWriter.writeToFile(message)
command = 'systemctl restart postfix && systemctl restart dovecot && postmap -F hash:/etc/postfix/vmail_ssl.map'
ProcessUtilities.executioner(command, 'root', True)
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, 'Completed. [200]')
else:
### Case 2 where postfix hostname either does not exist or does not match with server hostname or
### hostname does not exists at all
#first check if hostname is already configured as rDNS, if not return error
if Domain != rDNS:
message = 'Domain that you have provided is not configured as rDNS for your server IP. [404]'
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, message)
logging.CyberCPLogFileWriter.writeToFile(message)
return 0
### now issue hostname ssl
try:
website = Websites.objects.get(domain=Domain)
path = "/home/" + Domain + "/public_html"
except:
website = ChildDomains.objects.get(domain=Domain)
path = website.path
filePath = '/etc/letsencrypt/live/%s/fullchain.pem' % (Domain)
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open(filePath, 'r').read())
SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8')
if SSLProvider == 'Denial':
virtualHostUtilities.issueSSLForHostName(Domain, path)
if SSLProvider == 'Denial':
message = 'Failed to issue Hostname SSL, either its DNS record is not propagated or the domain ie behind Cloudflare. [404]'
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, message)
logging.CyberCPLogFileWriter.writeToFile(message)
return 0
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, 'Hostname SSL issued,50')
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, open(filePath, 'r').read())
SSLProvider = x509.get_issuer().get_components()[1][1].decode('utf-8')
if SSLProvider == 'Denial':
virtualHostUtilities.issueSSLForMailServer(Domain, path)
if SSLProvider == 'Denial':
message = 'Failed to issue Mail server SSL, either its DNS record is not propagated or the domain ie behind Cloudflare. [404]'
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, message)
logging.CyberCPLogFileWriter.writeToFile(message)
return 0
else:
config['hostname'] = Domain
admin.config = json.dumps(config)
admin.save()
command = 'systemctl restart postfix && systemctl restart dovecot && postmap -F hash:/etc/postfix/vmail_ssl.map'
ProcessUtilities.executioner(command, 'root', True)
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath, 'Completed. [200]')
@staticmethod
def setupAutoDiscover(mailDomain, tempStatusPath, virtualHostName, admin):
@@ -59,7 +197,8 @@ class virtualHostUtilities:
childPath = '/home/%s/%s' % (virtualHostName, childDomain)
result = virtualHostUtilities.createDomain(virtualHostName, childDomain, 'PHP 7.3', childPath, 1, 0, 0,
admin.userName, 0, "/home/cyberpanel/" + str(randint(1000, 9999)))
admin.userName, 0,
"/home/cyberpanel/" + str(randint(1000, 9999)))
if result[0] == 0:
sslUtilities.issueSSLForDomain(childDomain, admin.email, childPath)
@@ -102,7 +241,6 @@ class virtualHostUtilities:
postfixMapFileContent = ''
if postfixMapFileContent.find('/live/%s/' % (childDomain)) == -1:
mapContent = '%s /etc/letsencrypt/live/%s/privkey.pem /etc/letsencrypt/live/%s/fullchain.pem\n' % (
childDomain, childDomain, childDomain)
@@ -132,7 +270,6 @@ class virtualHostUtilities:
if LimitsCheck:
if ACLManager.websitesLimitCheck(admin, 1) == 0:
logging.CyberCPLogFileWriter.statusWriter(tempStatusPath,
'You\'ve reached maximum websites limit as a reseller. [404]')
return 0, 'You\'ve reached maximum websites limit as a reseller.'
@@ -265,7 +402,8 @@ class virtualHostUtilities:
CLPath = '/etc/sysconfig/cloudlinux'
if os.path.exists(CLPath):
command = '/usr/share/cloudlinux/hooks/post_modify_user.py create --username %s --owner %s' % (virtualHostUser, admin.userName)
command = '/usr/share/cloudlinux/hooks/post_modify_user.py create --username %s --owner %s' % (
virtualHostUser, admin.userName)
ProcessUtilities.executioner(command)
### For autodiscover of mail clients.
@@ -351,7 +489,8 @@ class virtualHostUtilities:
groupName = 'nobody'
numberOfTotalLines = int(ProcessUtilities.outputExecutioner('wc -l %s' % (fileName), groupName).split(" ")[0])
numberOfTotalLines = int(
ProcessUtilities.outputExecutioner('wc -l %s' % (fileName), groupName).split(" ")[0])
if numberOfTotalLines < 25:
data = ProcessUtilities.outputExecutioner('cat %s' % (fileName), groupName)
@@ -514,11 +653,9 @@ class virtualHostUtilities:
print("0," + str(retValues[1]))
return 0, retValues[1]
command = 'chmod 600 %s' % (destPrivKey)
ProcessUtilities.normalExecutioner(command)
## removing old certs for lscpd
if os.path.exists(destPrivKey):
os.remove(destPrivKey)
@@ -1056,7 +1193,6 @@ class virtualHostUtilities:
postFixPath = '/home/cyberpanel/postfix'
if os.path.exists(postFixPath):
retValues = mailUtilities.setupDKIM(virtualHostName)
if retValues[0] == 0:
@@ -1080,7 +1216,8 @@ class virtualHostUtilities:
## Now restart litespeed after initial configurations are done
if LimitsCheck:
website = ChildDomains(master=master, domain=virtualHostName, path=path, phpSelection=phpVersion, ssl=ssl)
website = ChildDomains(master=master, domain=virtualHostName, path=path, phpSelection=phpVersion,
ssl=ssl)
website.save()
if ssl == 1:
@@ -1258,7 +1395,8 @@ class virtualHostUtilities:
def getDiskUsage(path, totalAllowed):
try:
totalUsageInMB = subprocess.check_output('du -hs %s --block-size=1M' % (path), shell=True).decode("utf-8").split()[0]
totalUsageInMB = \
subprocess.check_output('du -hs %s --block-size=1M' % (path), shell=True).decode("utf-8").split()[0]
percentage = float(100) / float(totalAllowed)
@@ -1333,7 +1471,6 @@ class virtualHostUtilities:
else:
group = 'nobody'
confPath = f'{virtualHostUtilities.vhostConfPath}/vhosts/{vhostName}/vhost.conf'
htpassword = f'{vhostPassDir}/{wpid}'
htpasstemp = f'/usr/local/CyberCP/{wpid}'
@@ -1443,7 +1580,6 @@ class virtualHostUtilities:
hashed = bcrypt.hashpw(password, bcrypt.gensalt())
UserPass = f'{username}:{hashed.decode()}:{username}'
writeToFile = open(htpasstemp, 'w')
writeToFile.write(UserPass)
writeToFile.close()
@@ -1456,7 +1592,6 @@ class virtualHostUtilities:
command = f'chmod 640 {htpassword}'
ProcessUtilities.executioner(command, externalApp, True)
command = f'sudo -u {externalApp} -g {group} chown {externalApp}:{group} {htpassword}'
ProcessUtilities.executioner(command)
@@ -1469,7 +1604,6 @@ class virtualHostUtilities:
return 0, str(msg)
def main():
parser = argparse.ArgumentParser(description='CyberPanel Installer')
parser.add_argument('function', help='Specific a function to call!')
@@ -1567,7 +1701,8 @@ def main():
virtualHostUtilities.createVirtualHost(args.virtualHostName, args.administratorEmail, args.phpVersion,
args.virtualHostUser, int(args.ssl), dkimCheck, openBasedir,
args.websiteOwner, args.package, apache, tempStatusPath, int(args.mailDomain))
args.websiteOwner, args.package, apache, tempStatusPath,
int(args.mailDomain))
elif args.function == "setupAutoDiscover":
admin = Administrator.objects.get(userName=args.websiteOwner)
virtualHostUtilities.setupAutoDiscover(1, '/home/cyberpanel/templogs', args.virtualHostName, admin)
@@ -1642,7 +1777,12 @@ def main():
elif args.function == 'switchServer':
virtualHostUtilities.switchServer(args.virtualHostName, args.phpVersion, int(args.server), args.tempStatusPath)
elif args.function == 'EnableDisablePP':
virtualHostUtilities.EnableDisablePP(args.virtualHostName, args.username, args.password, args.path, args.wpid, args.virtualHostUser)
virtualHostUtilities.EnableDisablePP(args.virtualHostName, args.username, args.password, args.path, args.wpid,
args.virtualHostUser)
elif args.function == 'OnBoardingHostName':
# in virtualHostName pass domain for which hostname should be set up
# in path pass temporary path where status of the function will be stored
virtualHostUtilities.OnBoardingHostName(args.virtualHostName, args.path)
if __name__ == "__main__":