Implement Banned IPs management system: Add functionality to list, add, remove, and delete banned IPs. Update UI components for managing banned IPs, including a new tab in the firewall section and enhanced user notifications. Refactor existing code for better organization and maintainability.

This commit is contained in:
Master3395
2025-09-20 18:52:07 +02:00
parent 1823978d95
commit 7512a486e0
11 changed files with 1388 additions and 21 deletions

View File

@@ -1755,6 +1755,267 @@ class FirewallManager:
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def getBannedIPs(self, userID=None):
"""
Get list of banned IP addresses
"""
try:
admin = Administrator.objects.get(pk=userID)
if admin.acl.adminStatus != 1:
return ACLManager.loadError()
# For now, we'll use a simple file-based storage
# In production, you might want to use a database
banned_ips_file = '/etc/cyberpanel/banned_ips.json'
banned_ips = []
if os.path.exists(banned_ips_file):
try:
with open(banned_ips_file, 'r') as f:
banned_ips = json.load(f)
except:
banned_ips = []
# Filter out expired bans
current_time = time.time()
active_banned_ips = []
for banned_ip in banned_ips:
if banned_ip.get('expires') == 'Never' or banned_ip.get('expires', 0) > current_time:
banned_ip['active'] = True
if banned_ip.get('expires') != 'Never':
banned_ip['expires'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(banned_ip['expires']))
else:
banned_ip['expires'] = 'Never'
banned_ip['banned_on'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(banned_ip.get('banned_on', current_time)))
else:
banned_ip['active'] = False
banned_ip['expires'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(banned_ip.get('expires', current_time)))
banned_ip['banned_on'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(banned_ip.get('banned_on', current_time)))
active_banned_ips.append(banned_ip)
final_dic = {'status': 1, 'bannedIPs': active_banned_ips}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def addBannedIP(self, userID=None, data=None):
"""
Add a banned IP address
"""
try:
admin = Administrator.objects.get(pk=userID)
if admin.acl.adminStatus != 1:
return ACLManager.loadError()
ip = data.get('ip', '').strip()
reason = data.get('reason', '').strip()
duration = data.get('duration', '24h')
if not ip or not reason:
final_dic = {'status': 0, 'error_message': 'IP address and reason are required'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Validate IP address format
import ipaddress
try:
ipaddress.ip_address(ip.split('/')[0]) # Handle CIDR notation
except ValueError:
final_dic = {'status': 0, 'error_message': 'Invalid IP address format'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Calculate expiration time
current_time = time.time()
if duration == 'permanent':
expires = 'Never'
else:
duration_map = {
'1h': 3600,
'24h': 86400,
'7d': 604800,
'30d': 2592000
}
duration_seconds = duration_map.get(duration, 86400)
expires = current_time + duration_seconds
# Load existing banned IPs
banned_ips_file = '/etc/cyberpanel/banned_ips.json'
banned_ips = []
if os.path.exists(banned_ips_file):
try:
with open(banned_ips_file, 'r') as f:
banned_ips = json.load(f)
except:
banned_ips = []
# Check if IP is already banned
for banned_ip in banned_ips:
if banned_ip.get('ip') == ip and banned_ip.get('active', True):
final_dic = {'status': 0, 'error_message': f'IP address {ip} is already banned'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Add new banned IP
new_banned_ip = {
'id': int(time.time()),
'ip': ip,
'reason': reason,
'duration': duration,
'banned_on': current_time,
'expires': expires,
'active': True
}
banned_ips.append(new_banned_ip)
# Ensure directory exists
os.makedirs(os.path.dirname(banned_ips_file), exist_ok=True)
# Save to file
with open(banned_ips_file, 'w') as f:
json.dump(banned_ips, f, indent=2)
# Apply firewall rule to block the IP
try:
# Add iptables rule to block the IP
if '/' in ip:
# CIDR notation
subprocess.run(['iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'], check=True)
else:
# Single IP
subprocess.run(['iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'], check=True)
logging.CyberCPLogFileWriter.writeToFile(f'Banned IP {ip} with reason: {reason}')
except subprocess.CalledProcessError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to add iptables rule for {ip}: {str(e)}')
final_dic = {'status': 1, 'message': f'IP address {ip} has been banned successfully'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def removeBannedIP(self, userID=None, data=None):
"""
Remove/unban an IP address
"""
try:
admin = Administrator.objects.get(pk=userID)
if admin.acl.adminStatus != 1:
return ACLManager.loadError()
banned_ip_id = data.get('id')
# Load existing banned IPs
banned_ips_file = '/etc/cyberpanel/banned_ips.json'
banned_ips = []
if os.path.exists(banned_ips_file):
try:
with open(banned_ips_file, 'r') as f:
banned_ips = json.load(f)
except:
banned_ips = []
# Find and update the banned IP
ip_to_unban = None
for banned_ip in banned_ips:
if banned_ip.get('id') == banned_ip_id:
banned_ip['active'] = False
banned_ip['unbanned_on'] = time.time()
ip_to_unban = banned_ip['ip']
break
if not ip_to_unban:
final_dic = {'status': 0, 'error_message': 'Banned IP not found'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Save updated banned IPs
with open(banned_ips_file, 'w') as f:
json.dump(banned_ips, f, indent=2)
# Remove iptables rule
try:
# Remove iptables rule to unblock the IP
if '/' in ip_to_unban:
# CIDR notation
subprocess.run(['iptables', '-D', 'INPUT', '-s', ip_to_unban, '-j', 'DROP'], check=False)
else:
# Single IP
subprocess.run(['iptables', '-D', 'INPUT', '-s', ip_to_unban, '-j', 'DROP'], check=False)
logging.CyberCPLogFileWriter.writeToFile(f'Unbanned IP {ip_to_unban}')
except subprocess.CalledProcessError as e:
logging.CyberCPLogFileWriter.writeToFile(f'Failed to remove iptables rule for {ip_to_unban}: {str(e)}')
final_dic = {'status': 1, 'message': f'IP address {ip_to_unban} has been unbanned successfully'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
def deleteBannedIP(self, userID=None, data=None):
"""
Permanently delete a banned IP record
"""
try:
admin = Administrator.objects.get(pk=userID)
if admin.acl.adminStatus != 1:
return ACLManager.loadError()
banned_ip_id = data.get('id')
# Load existing banned IPs
banned_ips_file = '/etc/cyberpanel/banned_ips.json'
banned_ips = []
if os.path.exists(banned_ips_file):
try:
with open(banned_ips_file, 'r') as f:
banned_ips = json.load(f)
except:
banned_ips = []
# Find and remove the banned IP
ip_to_delete = None
updated_banned_ips = []
for banned_ip in banned_ips:
if banned_ip.get('id') == banned_ip_id:
ip_to_delete = banned_ip['ip']
else:
updated_banned_ips.append(banned_ip)
if not ip_to_delete:
final_dic = {'status': 0, 'error_message': 'Banned IP record not found'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
# Save updated banned IPs
with open(banned_ips_file, 'w') as f:
json.dump(updated_banned_ips, f, indent=2)
logging.CyberCPLogFileWriter.writeToFile(f'Deleted banned IP record for {ip_to_delete}')
final_dic = {'status': 1, 'message': f'Banned IP record for {ip_to_delete} has been deleted successfully'}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)
except BaseException as msg:
final_dic = {'status': 0, 'error_message': str(msg)}
final_json = json.dumps(final_dic)
return HttpResponse(final_json)