Files
CyberPanel/firewall/firewallManager.py
Master3395 694cb03c80 Add firewall rule management features and enhance repository setup
- Implemented functionality to edit existing firewall rules, including validation and error handling.
- Added endpoints for exporting and importing firewall rules in JSON format, allowing users to manage rules more efficiently.
- Enhanced the user interface with modals for editing rules and buttons for exporting/importing rules.
- Updated the `cyberpanel.sh` script to support AlmaLinux 10 and improved LiteSpeed GPG key import with fallback options.
- Refactored repository setup to accommodate different OS versions, ensuring compatibility with CentOS and AlmaLinux.
2025-09-17 00:32:07 +02:00

1996 lines
75 KiB
Python

#!/usr/local/CyberCP/bin/python
import os
import os.path
import sys
import django
from loginSystem.models import Administrator
from plogical.httpProc import httpProc
sys.path.append('/usr/local/CyberCP')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
django.setup()
import json
from plogical.acl import ACLManager
import plogical.CyberCPLogFileWriter as logging
from plogical.virtualHostUtilities import virtualHostUtilities
import subprocess
from django.shortcuts import HttpResponse, render, redirect
from random import randint
import time
from plogical.firewallUtilities import FirewallUtilities
from firewall.models import FirewallRules
from plogical.modSec import modSec
from plogical.csf import CSF
from plogical.processUtilities import ProcessUtilities
from serverStatus.serverStatusUtil import ServerStatusUtil
class FirewallManager:
imunifyPath = '/usr/bin/imunify360-agent'
CLPath = '/etc/sysconfig/cloudlinux'
imunifyAVPath = '/etc/sysconfig/imunify360/integration.conf'
def __init__(self, request = None):
self.request = request
def securityHome(self, request = None, userID = None):
proc = httpProc(request, 'firewall/index.html',
None, 'admin')
return proc.render()
def firewallHome(self, request = None, userID = None):
csfPath = '/etc/csf'
if os.path.exists(csfPath):
return redirect('/configservercsf/')
else:
proc = httpProc(request, 'firewall/firewall.html',
None, 'admin')
return proc.render()
def getCurrentRules(self, userID = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('fetchStatus', 0)
rules = FirewallRules.objects.all()
# Ensure CyberPanel port 7080 rule exists in database for visibility
cyberpanel_rule_exists = False
for rule in rules:
if rule.port == '7080':
cyberpanel_rule_exists = True
break
if not cyberpanel_rule_exists:
# Create database entry for port 7080 (already enabled in system firewall)
try:
cyberpanel_rule = FirewallRules(
name="CyberPanel Admin",
proto="tcp",
port="7080",
ipAddress="0.0.0.0/0"
)
cyberpanel_rule.save()
logging.CyberCPLogFileWriter.writeToFile("Added CyberPanel port 7080 to firewall database for UI visibility")
except Exception as e:
logging.CyberCPLogFileWriter.writeToFile(f"Failed to add CyberPanel port 7080 to database: {str(e)}")
# Refresh rules after potential creation
rules = FirewallRules.objects.all()
json_data = "["
checker = 0
for items in rules:
dic = {
'id': items.id,
'name': items.name,
'proto': items.proto,
'port': items.port,
'ipAddress': items.ipAddress,
}
if checker == 0:
json_data = json_data + json.dumps(dic)
checker = 1
else:
json_data = json_data + ',' + json.dumps(dic)
json_data = json_data + ']'
final_json = json.dumps({'status': 1, 'fetchStatus': 1, 'error_message': "None", "data": json_data})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'fetchStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def addRule(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('add_status', 0)
ruleName = data['ruleName']
ruleProtocol = data['ruleProtocol']
rulePort = data['rulePort']
ruleIP = data['ruleIP']
FirewallUtilities.addRule(ruleProtocol, rulePort, ruleIP)
newFWRule = FirewallRules(name=ruleName, proto=ruleProtocol, port=rulePort, ipAddress=ruleIP)
newFWRule.save()
final_dic = {'status': 1, 'add_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'add_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def deleteRule(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('delete_status', 0)
ruleID = data['id']
ruleProtocol = data['proto']
rulePort = data['port']
ruleIP = data['ruleIP']
FirewallUtilities.deleteRule(ruleProtocol, rulePort, ruleIP)
delRule = FirewallRules.objects.get(id=ruleID)
delRule.delete()
final_dic = {'status': 1, 'delete_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'delete_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def editRule(self, userID = None, data = None):
"""
Edit an existing firewall rule
"""
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('edit_status', 0)
ruleID = data['id']
newRuleName = data['ruleName']
newRuleProtocol = data['ruleProtocol']
newRulePort = data['rulePort']
newRuleIP = data['ruleIP']
# Get the existing rule
try:
existingRule = FirewallRules.objects.get(id=ruleID)
except FirewallRules.DoesNotExist:
final_dic = {'status': 0, 'edit_status': 0, 'error_message': 'Rule not found'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Store old values for system firewall update
oldProtocol = existingRule.proto
oldPort = existingRule.port
oldIP = existingRule.ipAddress
# Check if any values actually changed
if (existingRule.name == newRuleName and
existingRule.proto == newRuleProtocol and
existingRule.port == newRulePort and
existingRule.ipAddress == newRuleIP):
final_dic = {'status': 1, 'edit_status': 1, 'error_message': "No changes detected"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Check if another rule with the same name already exists (excluding current rule)
if existingRule.name != newRuleName:
duplicateRule = FirewallRules.objects.filter(name=newRuleName).exclude(id=ruleID).first()
if duplicateRule:
final_dic = {'status': 0, 'edit_status': 0, 'error_message': f'A rule with name "{newRuleName}" already exists'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Update the rule in the system firewall
# First remove the old rule
FirewallUtilities.deleteRule(oldProtocol, oldPort, oldIP)
# Then add the new rule
FirewallUtilities.addRule(newRuleProtocol, newRulePort, newRuleIP)
# Update the database record
existingRule.name = newRuleName
existingRule.proto = newRuleProtocol
existingRule.port = newRulePort
existingRule.ipAddress = newRuleIP
existingRule.save()
logging.CyberCPLogFileWriter.writeToFile(f"Firewall rule edited successfully. ID: {ruleID}, Name: {newRuleName}")
final_dic = {'status': 1, 'edit_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'edit_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def reloadFirewall(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('reload_status', 0)
command = 'sudo firewall-cmd --reload'
res = ProcessUtilities.executioner(command)
if res == 1:
final_dic = {'reload_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'reload_status': 0,
'error_message': "Can not reload firewall, see CyberCP main log file."}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'reload_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def startFirewall(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('start_status', 0)
command = 'sudo systemctl start firewalld'
res = ProcessUtilities.executioner(command)
if res == 1:
final_dic = {'start_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'start_status': 0,
'error_message': "Can not start firewall, see CyberCP main log file."}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'start_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def stopFirewall(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('stop_status', 0)
command = 'sudo systemctl stop firewalld'
res = ProcessUtilities.executioner(command)
if res == 1:
final_dic = {'stop_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'stop_status': 0,
'error_message': "Can not stop firewall, see CyberCP main log file."}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'stop_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def firewallStatus(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
command = 'systemctl status firewalld'
status = ProcessUtilities.outputExecutioner(command)
if status.find("dead") > -1:
final_dic = {'status': 1, 'error_message': "none", 'firewallStatus': 0}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'status': 1, 'error_message': "none", 'firewallStatus': 1}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def secureSSH(self, request = None, userID = None):
proc = httpProc(request, 'firewall/secureSSH.html',
None, 'admin')
return proc.render()
def getSSHConfigs(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
type = data['type']
if type == "1":
## temporarily changing permission for sshd files
pathToSSH = "/etc/ssh/sshd_config"
cat = "sudo cat " + pathToSSH
data = ProcessUtilities.outputExecutioner(cat).split('\n')
permitRootLogin = 0
sshPort = "22"
for items in data:
if items.find("PermitRootLogin") > -1:
if items.find("Yes") > -1 or items.find("yes") > -1:
permitRootLogin = 1
continue
if items.find("Port") > -1 and not items.find("GatewayPorts") > -1:
sshPort = items.split(" ")[1].strip("\n")
final_dic = {'status': 1, 'permitRootLogin': permitRootLogin, 'sshPort': sshPort}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
pathToKeyFile = "/root/.ssh/authorized_keys"
cat = "sudo cat " + pathToKeyFile
data = ProcessUtilities.outputExecutioner(cat).split('\n')
json_data = "["
checker = 0
for items in data:
if items.find("ssh-rsa") > -1:
keydata = items.split(" ")
try:
key = "ssh-rsa " + keydata[1][:50] + " .. " + keydata[2]
try:
userName = keydata[2][:keydata[2].index("@")]
except:
userName = keydata[2]
except:
key = "ssh-rsa " + keydata[1][:50]
userName = ''
dic = {'userName': userName,
'key': key,
}
if checker == 0:
json_data = json_data + json.dumps(dic)
checker = 1
else:
json_data = json_data + ',' + json.dumps(dic)
json_data = json_data + ']'
final_json = json.dumps({'status': 1, 'error_message': "None", "data": json_data})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def saveSSHConfigs(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('saveStatus', 0)
type = data['type']
sshPort = data['sshPort']
rootLogin = data['rootLogin']
if rootLogin == True:
rootLogin = "1"
else:
rootLogin = "0"
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/firewallUtilities.py"
execPath = execPath + " saveSSHConfigs --type " + str(type) + " --sshPort " + sshPort + " --rootLogin " + rootLogin
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
csfPath = '/etc/csf'
if os.path.exists(csfPath):
dataIn = {'protocol': 'TCP_IN', 'ports': sshPort}
self.modifyPorts(dataIn)
dataIn = {'protocol': 'TCP_OUT', 'ports': sshPort}
self.modifyPorts(dataIn)
else:
try:
updateFW = FirewallRules.objects.get(name="SSHCustom")
FirewallUtilities.deleteRule("tcp", updateFW.port, "0.0.0.0/0")
updateFW.port = sshPort
updateFW.save()
FirewallUtilities.addRule('tcp', sshPort, "0.0.0.0/0")
except:
try:
newFireWallRule = FirewallRules(name="SSHCustom", port=sshPort, proto="tcp")
newFireWallRule.save()
FirewallUtilities.addRule('tcp', sshPort, "0.0.0.0/0")
command = 'firewall-cmd --permanent --remove-service=ssh'
ProcessUtilities.executioner(command)
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg))
final_dic = {'status': 1, 'saveStatus': 1}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'status': 0, 'saveStatus': 0, "error_message": output}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0 ,'saveStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def deleteSSHKey(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('delete_status', 0)
key = data['key']
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/firewallUtilities.py"
execPath = execPath + " deleteSSHKey --key '" + key + "'"
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
final_dic = {'status': 1, 'delete_status': 1}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'status': 1, 'delete_status': 1, "error_mssage": output}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'delete_status': 0, 'error_mssage': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def addSSHKey(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('add_status', 0)
key = data['key']
tempPath = "/home/cyberpanel/" + str(randint(1000, 9999))
writeToFile = open(tempPath, "w")
writeToFile.write(key)
writeToFile.close()
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/firewallUtilities.py"
execPath = execPath + " addSSHKey --tempPath " + tempPath
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
final_dic = {'status': 1, 'add_status': 1}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'status': 0, 'add_status': 0, "error_mssage": output}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'add_status': 0, 'error_mssage': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def loadModSecurityHome(self, request = None, userID = None):
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
OLS = 1
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
else:
OLS = 0
modSecInstalled = 1
proc = httpProc(request, 'firewall/modSecurity.html',
{'modSecInstalled': modSecInstalled, 'OLS': OLS}, 'admin')
return proc.render()
def installModSec(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('installModSec', 0)
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " installModSec"
ProcessUtilities.popenExecutioner(execPath)
time.sleep(3)
final_json = json.dumps({'installModSec': 1, 'error_message': "None"})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'installModSec': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def installStatusModSec(self, userID = None, data = None):
try:
command = "sudo cat " + modSec.installLogPath
installStatus = ProcessUtilities.outputExecutioner(command)
if installStatus.find("[200]") > -1:
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " installModSecConfigs"
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
pass
else:
final_json = json.dumps({
'error_message': "Failed to install ModSecurity configurations.",
'requestStatus': installStatus,
'abort': 1,
'installed': 0,
})
return HttpResponse(final_json)
final_json = json.dumps({
'error_message': "None",
'requestStatus': installStatus,
'abort': 1,
'installed': 1,
})
return HttpResponse(final_json)
elif installStatus.find("[404]") > -1:
final_json = json.dumps({
'abort': 1,
'installed': 0,
'error_message': "None",
'requestStatus': installStatus,
})
return HttpResponse(final_json)
else:
final_json = json.dumps({
'abort': 0,
'error_message': "None",
'requestStatus': installStatus,
})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'abort': 1, 'installed': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def fetchModSecSettings(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('fetchStatus', 0)
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
modsecurity = 0
SecAuditEngine = 0
SecRuleEngine = 0
SecDebugLogLevel = "9"
SecAuditLogRelevantStatus = '^(?:5|4(?!04))'
SecAuditLogParts = 'ABIJDEFHZ'
SecAuditLogType = 'Serial'
confPath = os.path.join(virtualHostUtilities.Server_root, 'conf/httpd_config.conf')
modSecPath = os.path.join(virtualHostUtilities.Server_root, 'modules', 'mod_security.so')
if os.path.exists(modSecPath):
command = "sudo cat " + confPath
data = ProcessUtilities.outputExecutioner(command).split('\n')
for items in data:
if items.find('modsecurity ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
modsecurity = 1
continue
if items.find('SecAuditEngine ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
SecAuditEngine = 1
continue
if items.find('SecRuleEngine ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
SecRuleEngine = 1
continue
if items.find('SecDebugLogLevel') > -1:
result = items.split(' ')
if result[0] == 'SecDebugLogLevel':
SecDebugLogLevel = result[1]
continue
if items.find('SecAuditLogRelevantStatus') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogRelevantStatus':
SecAuditLogRelevantStatus = result[1]
continue
if items.find('SecAuditLogParts') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogParts':
SecAuditLogParts = result[1]
continue
if items.find('SecAuditLogType') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogType':
SecAuditLogType = result[1]
continue
final_dic = {'fetchStatus': 1,
'installed': 1,
'SecRuleEngine': SecRuleEngine,
'modsecurity': modsecurity,
'SecAuditEngine': SecAuditEngine,
'SecDebugLogLevel': SecDebugLogLevel,
'SecAuditLogParts': SecAuditLogParts,
'SecAuditLogRelevantStatus': SecAuditLogRelevantStatus,
'SecAuditLogType': SecAuditLogType,
}
else:
final_dic = {'fetchStatus': 1,
'installed': 0}
else:
SecAuditEngine = 0
SecRuleEngine = 0
SecDebugLogLevel = "9"
SecAuditLogRelevantStatus = '^(?:5|4(?!04))'
SecAuditLogParts = 'ABIJDEFHZ'
SecAuditLogType = 'Serial'
confPath = os.path.join(virtualHostUtilities.Server_root, 'conf/modsec.conf')
command = "sudo cat " + confPath
data = ProcessUtilities.outputExecutioner(command).split('\n')
for items in data:
if items.find('SecAuditEngine ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
SecAuditEngine = 1
continue
if items.find('SecRuleEngine ') > -1:
if items.find('on') > -1 or items.find('On') > -1:
SecRuleEngine = 1
continue
if items.find('SecDebugLogLevel') > -1:
result = items.split(' ')
if result[0] == 'SecDebugLogLevel':
SecDebugLogLevel = result[1]
continue
if items.find('SecAuditLogRelevantStatus') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogRelevantStatus':
SecAuditLogRelevantStatus = result[1]
continue
if items.find('SecAuditLogParts') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogParts':
SecAuditLogParts = result[1]
continue
if items.find('SecAuditLogType') > -1:
result = items.split(' ')
if result[0] == 'SecAuditLogType':
SecAuditLogType = result[1]
continue
final_dic = {'fetchStatus': 1,
'installed': 1,
'SecRuleEngine': SecRuleEngine,
'SecAuditEngine': SecAuditEngine,
'SecDebugLogLevel': SecDebugLogLevel,
'SecAuditLogParts': SecAuditLogParts,
'SecAuditLogRelevantStatus': SecAuditLogRelevantStatus,
'SecAuditLogType': SecAuditLogType,
}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'fetchStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def saveModSecConfigurations(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('saveStatus', 0)
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
modsecurity = data['modsecurity_status']
SecAuditEngine = data['SecAuditEngine']
SecRuleEngine = data['SecRuleEngine']
SecDebugLogLevel = data['SecDebugLogLevel']
SecAuditLogParts = data['SecAuditLogParts']
SecAuditLogRelevantStatus = data['SecAuditLogRelevantStatus']
SecAuditLogType = data['SecAuditLogType']
if modsecurity == True:
modsecurity = "modsecurity on"
else:
modsecurity = "modsecurity off"
if SecAuditEngine == True:
SecAuditEngine = "SecAuditEngine on"
else:
SecAuditEngine = "SecAuditEngine off"
if SecRuleEngine == True:
SecRuleEngine = "SecRuleEngine On"
else:
SecRuleEngine = "SecRuleEngine off"
SecDebugLogLevel = "SecDebugLogLevel " + str(SecDebugLogLevel)
SecAuditLogParts = "SecAuditLogParts " + str(SecAuditLogParts)
SecAuditLogRelevantStatus = "SecAuditLogRelevantStatus " + SecAuditLogRelevantStatus
SecAuditLogType = "SecAuditLogType " + SecAuditLogType
## writing data temporary to file
tempConfigPath = "/home/cyberpanel/" + str(randint(1000, 9999))
confPath = open(tempConfigPath, "w")
confPath.writelines(modsecurity + "\n")
confPath.writelines(SecAuditEngine + "\n")
confPath.writelines(SecRuleEngine + "\n")
confPath.writelines(SecDebugLogLevel + "\n")
confPath.writelines(SecAuditLogParts + "\n")
confPath.writelines(SecAuditLogRelevantStatus + "\n")
confPath.writelines(SecAuditLogType + "\n")
confPath.close()
## save configuration data
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " saveModSecConfigs --tempConfigPath " + tempConfigPath
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'saveStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'saveStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
SecAuditEngine = data['SecAuditEngine']
SecRuleEngine = data['SecRuleEngine']
SecDebugLogLevel = data['SecDebugLogLevel']
SecAuditLogParts = data['SecAuditLogParts']
SecAuditLogRelevantStatus = data['SecAuditLogRelevantStatus']
SecAuditLogType = data['SecAuditLogType']
if SecAuditEngine == True:
SecAuditEngine = "SecAuditEngine on"
else:
SecAuditEngine = "SecAuditEngine off"
if SecRuleEngine == True:
SecRuleEngine = "SecRuleEngine On"
else:
SecRuleEngine = "SecRuleEngine off"
SecDebugLogLevel = "SecDebugLogLevel " + str(SecDebugLogLevel)
SecAuditLogParts = "SecAuditLogParts " + str(SecAuditLogParts)
SecAuditLogRelevantStatus = "SecAuditLogRelevantStatus " + SecAuditLogRelevantStatus
SecAuditLogType = "SecAuditLogType " + SecAuditLogType
## writing data temporary to file
tempConfigPath = "/home/cyberpanel/" + str(randint(1000, 9999))
confPath = open(tempConfigPath, "w")
confPath.writelines(SecAuditEngine + "\n")
confPath.writelines(SecRuleEngine + "\n")
confPath.writelines(SecDebugLogLevel + "\n")
confPath.writelines(SecAuditLogParts + "\n")
confPath.writelines(SecAuditLogRelevantStatus + "\n")
confPath.writelines(SecAuditLogType + "\n")
confPath.close()
## save configuration data
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " saveModSecConfigs --tempConfigPath " + tempConfigPath
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'saveStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'saveStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'saveStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def modSecRules(self, request = None, userID = None):
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).split('\n')
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
else:
modSecInstalled = 1
proc = httpProc(request, 'firewall/modSecurityRules.html',
{'modSecInstalled': modSecInstalled}, 'admin')
return proc.render()
def fetchModSecRules(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('modSecInstalled', 0)
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).split('\n')
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
rulesPath = os.path.join(virtualHostUtilities.Server_root + "/conf/modsec/rules.conf")
if modSecInstalled:
command = "sudo cat " + rulesPath
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'modSecInstalled': 1,
'currentModSecRules': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'modSecInstalled': 0}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
rulesPath = os.path.join(virtualHostUtilities.Server_root + "/conf/rules.conf")
command = "sudo cat " + rulesPath
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'modSecInstalled': 1,
'currentModSecRules': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'modSecInstalled': 0,
'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def saveModSecRules(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('saveStatus', 0)
newModSecRules = data['modSecRules']
## writing data temporary to file
rulesPath = open(modSec.tempRulesFile, "w")
rulesPath.write(newModSecRules)
rulesPath.close()
## save configuration data
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " saveModSecRules"
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'saveStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'saveStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'saveStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def modSecRulesPacks(self, request = None, userID = None):
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).split('\n')
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
else:
modSecInstalled = 1
proc = httpProc(request, 'firewall/modSecurityRulesPacks.html',
{'modSecInstalled': modSecInstalled}, 'admin')
return proc.render()
def getOWASPAndComodoStatus(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('modSecInstalled', 0)
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
confPath = os.path.join(virtualHostUtilities.Server_root, "conf/httpd_config.conf")
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
modSecInstalled = 0
for items in httpdConfig:
if items.find('module mod_security') > -1:
modSecInstalled = 1
break
comodoInstalled = 0
owaspInstalled = 0
if modSecInstalled:
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
for items in httpdConfig:
if items.find('modsec/comodo') > -1:
comodoInstalled = 1
elif items.find('modsec/owasp') > -1:
owaspInstalled = 1
if owaspInstalled == 1 and comodoInstalled == 1:
break
final_dic = {
'modSecInstalled': 1,
'owaspInstalled': owaspInstalled,
'comodoInstalled': comodoInstalled
}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
final_dic = {'modSecInstalled': 0}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
comodoInstalled = 0
owaspInstalled = 0
try:
command = 'sudo ls /usr/local/lsws/conf/comodo_litespeed/'
output = ProcessUtilities.outputExecutioner(command)
if output.find('No such') > -1:
comodoInstalled = 0
else:
comodoInstalled = 1
except subprocess.CalledProcessError:
pass
try:
command = 'cat /usr/local/lsws/conf/modsec.conf'
output = ProcessUtilities.outputExecutioner(command)
if output.find('modsec/owasp') > -1:
owaspInstalled = 1
except:
pass
final_dic = {
'modSecInstalled': 1,
'owaspInstalled': owaspInstalled,
'comodoInstalled': comodoInstalled
}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'modSecInstalled': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def installModSecRulesPack(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('installStatus', 0)
packName = data['packName']
if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " " + packName
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'installStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'installStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
# if packName == 'disableOWASP' or packName == 'installOWASP':
# final_json = json.dumps({'installStatus': 0, 'error_message': "OWASP will be available later.", })
# return HttpResponse(final_json)
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " " + packName
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'installStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'installStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'installStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def getRulesFiles(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('fetchStatus', 0)
packName = data['packName']
confPath = os.path.join('/usr/local/lsws/conf/modsec/owasp-modsecurity-crs-3.0-master/owasp-master.conf')
command = "sudo cat " + confPath
httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
json_data = "["
checker = 0
counter = 0
for items in httpdConfig:
if items.find('modsec/' + packName) > -1:
counter = counter + 1
if items[0] == '#':
status = False
else:
status = True
fileName = items.lstrip('#')
fileName = fileName.split('/')[-1]
dic = {
'id': counter,
'fileName': fileName,
'packName': packName,
'status': status,
}
if checker == 0:
json_data = json_data + json.dumps(dic)
checker = 1
else:
json_data = json_data + ',' + json.dumps(dic)
json_data = json_data + ']'
final_json = json.dumps({'fetchStatus': 1, 'error_message': "None", "data": json_data})
return HttpResponse(final_json)
# if ProcessUtilities.decideServer() == ProcessUtilities.OLS:
# confPath = os.path.join('/usr/local/lsws/conf/modsec/owasp-modsecurity-crs-3.0-master/owasp-master.conf')
#
# command = "sudo cat " + confPath
# httpdConfig = ProcessUtilities.outputExecutioner(command).splitlines()
#
# json_data = "["
# checker = 0
# counter = 0
#
# for items in httpdConfig:
#
# if items.find('modsec/' + packName) > -1:
# counter = counter + 1
# if items[0] == '#':
# status = False
# else:
# status = True
#
# fileName = items.lstrip('#')
# fileName = fileName.split('/')[-1]
#
# dic = {
# 'id': counter,
# 'fileName': fileName,
# 'packName': packName,
# 'status': status,
#
# }
#
# if checker == 0:
# json_data = json_data + json.dumps(dic)
# checker = 1
# else:
# json_data = json_data + ',' + json.dumps(dic)
#
# json_data = json_data + ']'
# final_json = json.dumps({'fetchStatus': 1, 'error_message': "None", "data": json_data})
# return HttpResponse(final_json)
# else:
#
# command = 'cat /usr/local/lsws/conf/modsec/owasp-modsecurity-crs-3.0-master/owasp-master.conf'
# files = ProcessUtilities.outputExecutioner(command).splitlines()
#
# json_data = "["
#
# counter = 0
# checker = 0
# for fileName in files:
#
# if fileName == 'categories.conf':
# continue
#
# if fileName.endswith('bak'):
# status = 0
# fileName = fileName.rstrip('.bak')
# elif fileName.endswith('conf'):
# status = 1
# else:
# continue
#
# dic = {
# 'id': counter,
# 'fileName': fileName,
# 'packName': packName,
# 'status': status,
#
# }
#
# counter = counter + 1
#
# if checker == 0:
# json_data = json_data + json.dumps(dic)
# checker = 1
# else:
# json_data = json_data + ',' + json.dumps(dic)
#
# json_data = json_data + ']'
# final_json = json.dumps({'fetchStatus': 1, 'error_message': "None", "data": json_data})
# return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'fetchStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def enableDisableRuleFile(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('saveStatus', 0)
packName = data['packName']
fileName = data['fileName']
currentStatus = data['status']
if currentStatus == True:
functionName = 'disableRuleFile'
else:
functionName = 'enableRuleFile'
execPath = "/usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/modSec.py"
execPath = execPath + " " + functionName + ' --packName ' + packName + ' --fileName "%s"' % (fileName)
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {'saveStatus': 1, 'error_message': "None"}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'saveStatus': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
data_ret = {'saveStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
def csf(self):
csfInstalled = 1
try:
command = 'csf -h'
output = ProcessUtilities.outputExecutioner(command)
if output.find("command not found") > -1:
csfInstalled = 0
except subprocess.CalledProcessError:
csfInstalled = 0
proc = httpProc(self.request, 'firewall/csf.html',
{'csfInstalled': csfInstalled}, 'admin')
return proc.render()
def installCSF(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('installStatus', 0)
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
execPath = execPath + " installCSF"
ProcessUtilities.popenExecutioner(execPath)
time.sleep(2)
data_ret = {"installStatus": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'installStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def installStatusCSF(self):
try:
userID = self.request.session['userID']
installStatus = ProcessUtilities.outputExecutioner("sudo cat " + CSF.installLogPath)
if installStatus.find("[200]")>-1:
command = 'sudo rm -f ' + CSF.installLogPath
ProcessUtilities.executioner(command)
final_json = json.dumps({
'error_message': "None",
'requestStatus': installStatus,
'abort':1,
'installed': 1,
})
return HttpResponse(final_json)
elif installStatus.find("[404]") > -1:
command = 'sudo rm -f ' + CSF.installLogPath
ProcessUtilities.executioner(command)
final_json = json.dumps({
'abort':1,
'installed':0,
'error_message': "None",
'requestStatus': installStatus,
})
return HttpResponse(final_json)
else:
final_json = json.dumps({
'abort':0,
'error_message': "None",
'requestStatus': installStatus,
})
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'abort':1, 'installed':0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def removeCSF(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('installStatus', 0)
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
execPath = execPath + " removeCSF"
ProcessUtilities.popenExecutioner(execPath)
time.sleep(2)
data_ret = {"installStatus": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'installStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def fetchCSFSettings(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('fetchStatus', 0)
currentSettings = CSF.fetchCSFSettings()
data_ret = {"fetchStatus": 1, 'testingMode' : currentSettings['TESTING'],
'tcpIN' : currentSettings['tcpIN'],
'tcpOUT': currentSettings['tcpOUT'],
'udpIN': currentSettings['udpIN'],
'udpOUT': currentSettings['udpOUT'],
'firewallStatus': currentSettings['firewallStatus']
}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'fetchStatus': 0, 'error_message': 'CSF is not installed.'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def changeStatus(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
data = json.loads(self.request.body)
controller = data['controller']
status = data['status']
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
execPath = execPath + " changeStatus --controller " + controller + " --status " + status
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {"status": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'status': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def modifyPorts(self, data = None):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
protocol = data['protocol']
ports = data['ports']
portsPath = '/home/cyberpanel/' + str(randint(1000, 9999))
if os.path.exists(portsPath):
os.remove(portsPath)
writeToFile = open(portsPath, 'w')
writeToFile.write(ports)
writeToFile.close()
command = 'chmod 600 %s' % (portsPath)
ProcessUtilities.executioner(command)
execPath = "sudo /usr/local/CyberCP/bin/python " + virtualHostUtilities.cyberPanel + "/plogical/csf.py"
execPath = execPath + " modifyPorts --protocol " + protocol + " --ports " + portsPath
output = ProcessUtilities.outputExecutioner(execPath)
if output.find("1,None") > -1:
data_ret = {"status": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
else:
data_ret = {'status': 0, 'error_message': output}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def modifyIPs(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson()
data = json.loads(self.request.body)
mode = data['mode']
ipAddress = data['ipAddress']
if mode == 'allowIP':
CSF.allowIP(ipAddress)
elif mode == 'blockIP':
CSF.blockIP(ipAddress)
data_ret = {"status": 1}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def imunify(self):
ipFile = "/etc/cyberpanel/machineIP"
f = open(ipFile)
ipData = f.read()
ipAddress = ipData.split('\n', 1)[0]
fullAddress = '%s:%s' % (ipAddress, ProcessUtilities.fetchCurrentPort())
data = {}
data['ipAddress'] = fullAddress
data['CL'] = 1
if os.path.exists(FirewallManager.imunifyPath):
data['imunify'] = 1
else:
data['imunify'] = 0
if data['CL'] == 0:
proc = httpProc(self.request, 'firewall/notAvailable.html',
data, 'admin')
return proc.render()
elif data['imunify'] == 0:
proc = httpProc(self.request, 'firewall/notAvailable.html',
data, 'admin')
return proc.render()
else:
proc = httpProc(self.request, 'firewall/imunify.html',
data, 'admin')
return proc.render()
def submitinstallImunify(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
'Not authorized to install container packages. [404].',
1)
return 0
data = json.loads(self.request.body)
execPath = "/usr/local/CyberCP/bin/python /usr/local/CyberCP/CLManager/CageFS.py"
execPath = execPath + " --function submitinstallImunify --key %s" % (data['key'])
ProcessUtilities.popenExecutioner(execPath)
data_ret = {'status': 1, 'error_message': 'None'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, str(msg) + ' [404].', 1)
def imunifyAV(self):
ipFile = "/etc/cyberpanel/machineIP"
f = open(ipFile)
ipData = f.read()
ipAddress = ipData.split('\n', 1)[0]
fullAddress = '%s:%s' % (ipAddress, ProcessUtilities.fetchCurrentPort())
data = {}
data['ipAddress'] = fullAddress
if os.path.exists(FirewallManager.imunifyAVPath):
data['imunify'] = 1
else:
data['imunify'] = 0
if data['imunify'] == 0:
proc = httpProc(self.request, 'firewall/notAvailableAV.html',
data, 'admin')
return proc.render()
else:
proc = httpProc(self.request, 'firewall/imunifyAV.html',
data, 'admin')
return proc.render()
def submitinstallImunifyAV(self):
try:
userID = self.request.session['userID']
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
'Not authorized to install container packages. [404].',
1)
return 0
execPath = "/usr/local/CyberCP/bin/python /usr/local/CyberCP/CLManager/CageFS.py"
execPath = execPath + " --function submitinstallImunifyAV"
ProcessUtilities.popenExecutioner(execPath)
data_ret = {'status': 1, 'error_message': 'None'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except BaseException as msg:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, str(msg) + ' [404].', 1)
def litespeed_ent_conf(self, request = None, userID = None):
proc = httpProc(request, 'firewall/litespeed_ent_conf.html',
None, 'admin')
return proc.render()
def fetchlitespeed_Conf(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('modSecInstalled', 0)
file_path = "/usr/local/lsws/conf/pre_main_global.conf"
if not os.path.exists(file_path):
command = "touch /usr/local/lsws/conf/pre_main_global.conf"
ProcessUtilities.executioner(command)
command = f'cat {file_path}'
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'status': 1,
'currentLitespeed_conf': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
else:
command = f'cat {file_path}'
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'status': 1,
'currentLitespeed_conf': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def saveLitespeed_conf(self, userID = None, data = None):
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('modSecInstalled', 0)
file_path = "/usr/local/lsws/conf/pre_main_global.conf"
command = f'rm -f {file_path}'
ProcessUtilities.executioner(command)
currentLitespeed_conf = data['modSecRules']
tempRulesPath = '/home/cyberpanel/pre_main_global.conf'
WriteToFile = open(tempRulesPath, 'w')
WriteToFile.write(currentLitespeed_conf)
WriteToFile.close()
command = f'mv {tempRulesPath} {file_path}'
ProcessUtilities.executioner(command)
command = f'chmod 644 {file_path} && chown lsadm:lsadm {file_path}'
ProcessUtilities.executioner(command, None, True)
command = f'cat {file_path}'
currentModSecRules = ProcessUtilities.outputExecutioner(command)
final_dic = {'status': 1,
'currentLitespeed_conf': currentModSecRules}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def exportFirewallRules(self, userID = None):
"""
Export all custom firewall rules to a JSON file, excluding default CyberPanel rules
"""
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('exportStatus', 0)
# Get all firewall rules
rules = FirewallRules.objects.all()
# Default CyberPanel rules to exclude
default_rules = ['CyberPanel Admin', 'SSHCustom']
# Filter out default rules
custom_rules = []
for rule in rules:
if rule.name not in default_rules:
custom_rules.append({
'name': rule.name,
'proto': rule.proto,
'port': rule.port,
'ipAddress': rule.ipAddress
})
# Create export data with metadata
export_data = {
'version': '1.0',
'exported_at': time.strftime('%Y-%m-%d %H:%M:%S'),
'total_rules': len(custom_rules),
'rules': custom_rules
}
# Create JSON response with file download
json_content = json.dumps(export_data, indent=2)
logging.CyberCPLogFileWriter.writeToFile(f"Firewall rules exported successfully. Total rules: {len(custom_rules)}")
# Return file as download
response = HttpResponse(json_content, content_type='application/json')
response['Content-Disposition'] = f'attachment; filename="firewall_rules_export_{int(time.time())}.json"'
return response
except BaseException as msg:
final_dic = {'exportStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def importFirewallRules(self, userID = None, data = None):
"""
Import firewall rules from a JSON file
"""
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('importStatus', 0)
# Handle file upload
if hasattr(self.request, 'FILES') and 'import_file' in self.request.FILES:
import_file = self.request.FILES['import_file']
# Read file content
import_data = json.loads(import_file.read().decode('utf-8'))
else:
# Fallback to file path method
import_file_path = data.get('import_file_path', '')
if not import_file_path or not os.path.exists(import_file_path):
final_dic = {'importStatus': 0, 'error_message': 'Import file not found or invalid path'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Read and parse the import file
with open(import_file_path, 'r') as f:
import_data = json.load(f)
# Validate the import data structure
if 'rules' not in import_data:
final_dic = {'importStatus': 0, 'error_message': 'Invalid import file format. Missing rules array.'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
imported_count = 0
skipped_count = 0
error_count = 0
errors = []
# Default CyberPanel rules to exclude from import
default_rules = ['CyberPanel Admin', 'SSHCustom']
for rule_data in import_data['rules']:
try:
# Skip default rules
if rule_data.get('name', '') in default_rules:
skipped_count += 1
continue
# Check if rule already exists
existing_rule = FirewallRules.objects.filter(
name=rule_data['name'],
proto=rule_data['proto'],
port=rule_data['port'],
ipAddress=rule_data['ipAddress']
).first()
if existing_rule:
skipped_count += 1
continue
# Add the rule to the system firewall
FirewallUtilities.addRule(
rule_data['proto'],
rule_data['port'],
rule_data['ipAddress']
)
# Add the rule to the database
new_rule = FirewallRules(
name=rule_data['name'],
proto=rule_data['proto'],
port=rule_data['port'],
ipAddress=rule_data['ipAddress']
)
new_rule.save()
imported_count += 1
except Exception as e:
error_count += 1
errors.append(f"Rule '{rule_data.get('name', 'Unknown')}': {str(e)}")
logging.CyberCPLogFileWriter.writeToFile(f"Error importing rule {rule_data.get('name', 'Unknown')}: {str(e)}")
logging.CyberCPLogFileWriter.writeToFile(f"Firewall rules import completed. Imported: {imported_count}, Skipped: {skipped_count}, Errors: {error_count}")
final_dic = {
'importStatus': 1,
'error_message': "None",
'imported_count': imported_count,
'skipped_count': skipped_count,
'error_count': error_count,
'errors': errors
}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'importStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)