Add firewall rule management features and enhance repository setup

- Implemented functionality to edit existing firewall rules, including validation and error handling.
- Added endpoints for exporting and importing firewall rules in JSON format, allowing users to manage rules more efficiently.
- Enhanced the user interface with modals for editing rules and buttons for exporting/importing rules.
- Updated the `cyberpanel.sh` script to support AlmaLinux 10 and improved LiteSpeed GPG key import with fallback options.
- Refactored repository setup to accommodate different OS versions, ensuring compatibility with CentOS and AlmaLinux.
This commit is contained in:
Master3395
2025-09-17 00:32:07 +02:00
parent 42428bfc6a
commit 694cb03c80
8 changed files with 977 additions and 33 deletions

View File

@@ -168,6 +168,79 @@ class FirewallManager:
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def editRule(self, userID = None, data = None):
"""
Edit an existing firewall rule
"""
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('edit_status', 0)
ruleID = data['id']
newRuleName = data['ruleName']
newRuleProtocol = data['ruleProtocol']
newRulePort = data['rulePort']
newRuleIP = data['ruleIP']
# Get the existing rule
try:
existingRule = FirewallRules.objects.get(id=ruleID)
except FirewallRules.DoesNotExist:
final_dic = {'status': 0, 'edit_status': 0, 'error_message': 'Rule not found'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Store old values for system firewall update
oldProtocol = existingRule.proto
oldPort = existingRule.port
oldIP = existingRule.ipAddress
# Check if any values actually changed
if (existingRule.name == newRuleName and
existingRule.proto == newRuleProtocol and
existingRule.port == newRulePort and
existingRule.ipAddress == newRuleIP):
final_dic = {'status': 1, 'edit_status': 1, 'error_message': "No changes detected"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Check if another rule with the same name already exists (excluding current rule)
if existingRule.name != newRuleName:
duplicateRule = FirewallRules.objects.filter(name=newRuleName).exclude(id=ruleID).first()
if duplicateRule:
final_dic = {'status': 0, 'edit_status': 0, 'error_message': f'A rule with name "{newRuleName}" already exists'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Update the rule in the system firewall
# First remove the old rule
FirewallUtilities.deleteRule(oldProtocol, oldPort, oldIP)
# Then add the new rule
FirewallUtilities.addRule(newRuleProtocol, newRulePort, newRuleIP)
# Update the database record
existingRule.name = newRuleName
existingRule.proto = newRuleProtocol
existingRule.port = newRulePort
existingRule.ipAddress = newRuleIP
existingRule.save()
logging.CyberCPLogFileWriter.writeToFile(f"Firewall rule edited successfully. ID: {ruleID}, Name: {newRuleName}")
final_dic = {'status': 1, 'edit_status': 1, 'error_message': "None"}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'edit_status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def reloadFirewall(self, userID = None, data = None):
try:
@@ -1755,6 +1828,164 @@ class FirewallManager:
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def exportFirewallRules(self, userID = None):
"""
Export all custom firewall rules to a JSON file, excluding default CyberPanel rules
"""
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('exportStatus', 0)
# Get all firewall rules
rules = FirewallRules.objects.all()
# Default CyberPanel rules to exclude
default_rules = ['CyberPanel Admin', 'SSHCustom']
# Filter out default rules
custom_rules = []
for rule in rules:
if rule.name not in default_rules:
custom_rules.append({
'name': rule.name,
'proto': rule.proto,
'port': rule.port,
'ipAddress': rule.ipAddress
})
# Create export data with metadata
export_data = {
'version': '1.0',
'exported_at': time.strftime('%Y-%m-%d %H:%M:%S'),
'total_rules': len(custom_rules),
'rules': custom_rules
}
# Create JSON response with file download
json_content = json.dumps(export_data, indent=2)
logging.CyberCPLogFileWriter.writeToFile(f"Firewall rules exported successfully. Total rules: {len(custom_rules)}")
# Return file as download
response = HttpResponse(json_content, content_type='application/json')
response['Content-Disposition'] = f'attachment; filename="firewall_rules_export_{int(time.time())}.json"'
return response
except BaseException as msg:
final_dic = {'exportStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def importFirewallRules(self, userID = None, data = None):
"""
Import firewall rules from a JSON file
"""
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL['admin'] == 1:
pass
else:
return ACLManager.loadErrorJson('importStatus', 0)
# Handle file upload
if hasattr(self.request, 'FILES') and 'import_file' in self.request.FILES:
import_file = self.request.FILES['import_file']
# Read file content
import_data = json.loads(import_file.read().decode('utf-8'))
else:
# Fallback to file path method
import_file_path = data.get('import_file_path', '')
if not import_file_path or not os.path.exists(import_file_path):
final_dic = {'importStatus': 0, 'error_message': 'Import file not found or invalid path'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Read and parse the import file
with open(import_file_path, 'r') as f:
import_data = json.load(f)
# Validate the import data structure
if 'rules' not in import_data:
final_dic = {'importStatus': 0, 'error_message': 'Invalid import file format. Missing rules array.'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
imported_count = 0
skipped_count = 0
error_count = 0
errors = []
# Default CyberPanel rules to exclude from import
default_rules = ['CyberPanel Admin', 'SSHCustom']
for rule_data in import_data['rules']:
try:
# Skip default rules
if rule_data.get('name', '') in default_rules:
skipped_count += 1
continue
# Check if rule already exists
existing_rule = FirewallRules.objects.filter(
name=rule_data['name'],
proto=rule_data['proto'],
port=rule_data['port'],
ipAddress=rule_data['ipAddress']
).first()
if existing_rule:
skipped_count += 1
continue
# Add the rule to the system firewall
FirewallUtilities.addRule(
rule_data['proto'],
rule_data['port'],
rule_data['ipAddress']
)
# Add the rule to the database
new_rule = FirewallRules(
name=rule_data['name'],
proto=rule_data['proto'],
port=rule_data['port'],
ipAddress=rule_data['ipAddress']
)
new_rule.save()
imported_count += 1
except Exception as e:
error_count += 1
errors.append(f"Rule '{rule_data.get('name', 'Unknown')}': {str(e)}")
logging.CyberCPLogFileWriter.writeToFile(f"Error importing rule {rule_data.get('name', 'Unknown')}: {str(e)}")
logging.CyberCPLogFileWriter.writeToFile(f"Firewall rules import completed. Imported: {imported_count}, Skipped: {skipped_count}, Errors: {error_count}")
final_dic = {
'importStatus': 1,
'error_message': "None",
'imported_count': imported_count,
'skipped_count': skipped_count,
'error_count': error_count,
'errors': errors
}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'importStatus': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)