mirror of
https://github.com/usmannasir/cyberpanel.git
synced 2025-10-26 07:46:35 +01:00
323 lines
12 KiB
Python
323 lines
12 KiB
Python
from django.db import models
|
|
from loginSystem.models import Administrator
|
|
import json
|
|
|
|
# Import the status update model
|
|
from .status_models import ScanStatusUpdate
|
|
|
|
|
|
class AIScannerSettings(models.Model):
|
|
"""Store AI scanner configuration and API keys for administrators"""
|
|
admin = models.OneToOneField(Administrator, on_delete=models.CASCADE, related_name='ai_scanner_settings')
|
|
api_key = models.CharField(max_length=255, blank=True, null=True)
|
|
balance = models.DecimalField(max_digits=10, decimal_places=4, default=0.0000)
|
|
is_payment_configured = models.BooleanField(default=False)
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
updated_at = models.DateTimeField(auto_now=True)
|
|
|
|
class Meta:
|
|
db_table = 'ai_scanner_settings'
|
|
|
|
def __str__(self):
|
|
return f"AI Scanner Settings for {self.admin.userName}"
|
|
|
|
|
|
class ScanHistory(models.Model):
|
|
"""Store scan history and results"""
|
|
SCAN_STATUS_CHOICES = [
|
|
('pending', 'Pending'),
|
|
('running', 'Running'),
|
|
('completed', 'Completed'),
|
|
('failed', 'Failed'),
|
|
('cancelled', 'Cancelled'),
|
|
]
|
|
|
|
SCAN_TYPE_CHOICES = [
|
|
('full', 'Full Scan'),
|
|
('quick', 'Quick Scan'),
|
|
('custom', 'Custom Scan'),
|
|
]
|
|
|
|
admin = models.ForeignKey(Administrator, on_delete=models.CASCADE, related_name='scan_history')
|
|
scan_id = models.CharField(max_length=100, unique=True)
|
|
domain = models.CharField(max_length=255)
|
|
scan_type = models.CharField(max_length=20, choices=SCAN_TYPE_CHOICES, default='full')
|
|
status = models.CharField(max_length=20, choices=SCAN_STATUS_CHOICES, default='pending')
|
|
cost_usd = models.DecimalField(max_digits=10, decimal_places=6, null=True, blank=True)
|
|
files_scanned = models.IntegerField(default=0)
|
|
issues_found = models.IntegerField(default=0)
|
|
findings_json = models.TextField(blank=True, null=True) # Store JSON findings
|
|
summary_json = models.TextField(blank=True, null=True) # Store JSON summary
|
|
error_message = models.TextField(blank=True, null=True)
|
|
started_at = models.DateTimeField(auto_now_add=True)
|
|
completed_at = models.DateTimeField(null=True, blank=True)
|
|
|
|
class Meta:
|
|
db_table = 'ai_scanner_history'
|
|
ordering = ['-started_at']
|
|
|
|
def __str__(self):
|
|
return f"Scan {self.scan_id} - {self.domain} ({self.status})"
|
|
|
|
@property
|
|
def findings(self):
|
|
"""Parse findings JSON"""
|
|
if self.findings_json:
|
|
try:
|
|
return json.loads(self.findings_json)
|
|
except json.JSONDecodeError:
|
|
return []
|
|
return []
|
|
|
|
@property
|
|
def summary(self):
|
|
"""Parse summary JSON"""
|
|
if self.summary_json:
|
|
try:
|
|
return json.loads(self.summary_json)
|
|
except json.JSONDecodeError:
|
|
return {}
|
|
return {}
|
|
|
|
def set_findings(self, findings_list):
|
|
"""Set findings from list/dict"""
|
|
self.findings_json = json.dumps(findings_list)
|
|
|
|
def set_summary(self, summary_dict):
|
|
"""Set summary from dict"""
|
|
self.summary_json = json.dumps(summary_dict)
|
|
|
|
|
|
class FileAccessToken(models.Model):
|
|
"""Temporary tokens for file access during scans"""
|
|
token = models.CharField(max_length=100, unique=True)
|
|
scan_history = models.ForeignKey(ScanHistory, on_delete=models.CASCADE, related_name='access_tokens')
|
|
domain = models.CharField(max_length=255)
|
|
wp_path = models.CharField(max_length=500)
|
|
expires_at = models.DateTimeField()
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
is_active = models.BooleanField(default=True)
|
|
|
|
class Meta:
|
|
db_table = 'ai_scanner_file_tokens'
|
|
|
|
def __str__(self):
|
|
return f"Access token {self.token} for {self.domain}"
|
|
|
|
def is_expired(self):
|
|
from django.utils import timezone
|
|
return timezone.now() > self.expires_at
|
|
|
|
|
|
class ScheduledScan(models.Model):
|
|
"""Store scheduled scan configurations"""
|
|
FREQUENCY_CHOICES = [
|
|
('daily', 'Daily'),
|
|
('weekly', 'Weekly'),
|
|
('monthly', 'Monthly'),
|
|
('quarterly', 'Quarterly'),
|
|
]
|
|
|
|
SCAN_TYPE_CHOICES = [
|
|
('full', 'Full Scan'),
|
|
('quick', 'Quick Scan'),
|
|
('custom', 'Custom Scan'),
|
|
]
|
|
|
|
STATUS_CHOICES = [
|
|
('active', 'Active'),
|
|
('paused', 'Paused'),
|
|
('disabled', 'Disabled'),
|
|
]
|
|
|
|
admin = models.ForeignKey(Administrator, on_delete=models.CASCADE, related_name='scheduled_scans')
|
|
name = models.CharField(max_length=200, help_text="Name for this scheduled scan")
|
|
domains = models.TextField(help_text="JSON array of domains to scan")
|
|
frequency = models.CharField(max_length=20, choices=FREQUENCY_CHOICES, default='weekly')
|
|
scan_type = models.CharField(max_length=20, choices=SCAN_TYPE_CHOICES, default='full')
|
|
time_of_day = models.TimeField(help_text="Time of day to run the scan (UTC)")
|
|
day_of_week = models.IntegerField(null=True, blank=True, help_text="Day of week for weekly scans (0=Monday, 6=Sunday)")
|
|
day_of_month = models.IntegerField(null=True, blank=True, help_text="Day of month for monthly scans (1-31)")
|
|
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='active')
|
|
last_run = models.DateTimeField(null=True, blank=True)
|
|
next_run = models.DateTimeField(null=True, blank=True)
|
|
created_at = models.DateTimeField(auto_now_add=True)
|
|
updated_at = models.DateTimeField(auto_now=True)
|
|
|
|
# Notification settings
|
|
email_notifications = models.BooleanField(default=True)
|
|
notification_emails = models.TextField(blank=True, help_text="JSON array of email addresses")
|
|
notify_on_threats = models.BooleanField(default=True)
|
|
notify_on_completion = models.BooleanField(default=False)
|
|
notify_on_failure = models.BooleanField(default=True)
|
|
|
|
class Meta:
|
|
db_table = 'ai_scanner_scheduled_scans'
|
|
ordering = ['-created_at']
|
|
|
|
def __str__(self):
|
|
return f"Scheduled Scan: {self.name} ({self.frequency})"
|
|
|
|
@property
|
|
def domain_list(self):
|
|
"""Parse domains JSON"""
|
|
if self.domains:
|
|
try:
|
|
return json.loads(self.domains)
|
|
except json.JSONDecodeError:
|
|
return []
|
|
return []
|
|
|
|
@property
|
|
def notification_email_list(self):
|
|
"""Parse notification emails JSON"""
|
|
if self.notification_emails:
|
|
try:
|
|
return json.loads(self.notification_emails)
|
|
except json.JSONDecodeError:
|
|
return []
|
|
return []
|
|
|
|
def set_domains(self, domain_list):
|
|
"""Set domains from list"""
|
|
self.domains = json.dumps(domain_list)
|
|
|
|
def set_notification_emails(self, email_list):
|
|
"""Set notification emails from list"""
|
|
self.notification_emails = json.dumps(email_list)
|
|
|
|
def calculate_next_run(self):
|
|
"""Calculate next run time based on frequency"""
|
|
from django.utils import timezone
|
|
from datetime import datetime, timedelta
|
|
import calendar
|
|
|
|
now = timezone.now()
|
|
|
|
if self.frequency == 'daily':
|
|
# Daily: next run is tomorrow at specified time
|
|
next_run = now.replace(hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0)
|
|
if next_run <= now:
|
|
next_run += timedelta(days=1)
|
|
|
|
elif self.frequency == 'weekly':
|
|
# Weekly: next run is on specified day of week at specified time
|
|
days_ahead = self.day_of_week - now.weekday()
|
|
if days_ahead <= 0: # Target day already happened this week
|
|
days_ahead += 7
|
|
next_run = now + timedelta(days=days_ahead)
|
|
next_run = next_run.replace(hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0)
|
|
|
|
elif self.frequency == 'monthly':
|
|
# Monthly: next run is on specified day of month at specified time
|
|
year = now.year
|
|
month = now.month
|
|
day = min(self.day_of_month, calendar.monthrange(year, month)[1])
|
|
|
|
next_run = now.replace(day=day, hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0)
|
|
|
|
if next_run <= now:
|
|
# Move to next month
|
|
if month == 12:
|
|
year += 1
|
|
month = 1
|
|
else:
|
|
month += 1
|
|
day = min(self.day_of_month, calendar.monthrange(year, month)[1])
|
|
next_run = next_run.replace(year=year, month=month, day=day)
|
|
|
|
elif self.frequency == 'quarterly':
|
|
# Quarterly: next run is 3 months from now
|
|
next_run = now.replace(hour=self.time_of_day.hour, minute=self.time_of_day.minute, second=0, microsecond=0)
|
|
month = now.month
|
|
year = now.year
|
|
|
|
# Add 3 months
|
|
month += 3
|
|
if month > 12:
|
|
year += 1
|
|
month -= 12
|
|
|
|
day = min(self.day_of_month or 1, calendar.monthrange(year, month)[1])
|
|
next_run = next_run.replace(year=year, month=month, day=day)
|
|
|
|
if next_run <= now:
|
|
# Add another 3 months
|
|
month += 3
|
|
if month > 12:
|
|
year += 1
|
|
month -= 12
|
|
day = min(self.day_of_month or 1, calendar.monthrange(year, month)[1])
|
|
next_run = next_run.replace(year=year, month=month, day=day)
|
|
|
|
else:
|
|
# Default to weekly
|
|
next_run = now + timedelta(weeks=1)
|
|
|
|
return next_run
|
|
|
|
def save(self, *args, **kwargs):
|
|
"""Override save to calculate next_run"""
|
|
if not self.next_run or self.status == 'active':
|
|
self.next_run = self.calculate_next_run()
|
|
super().save(*args, **kwargs)
|
|
|
|
|
|
class ScheduledScanExecution(models.Model):
|
|
"""Track individual executions of scheduled scans"""
|
|
STATUS_CHOICES = [
|
|
('pending', 'Pending'),
|
|
('running', 'Running'),
|
|
('completed', 'Completed'),
|
|
('failed', 'Failed'),
|
|
('cancelled', 'Cancelled'),
|
|
]
|
|
|
|
scheduled_scan = models.ForeignKey(ScheduledScan, on_delete=models.CASCADE, related_name='executions')
|
|
execution_time = models.DateTimeField(auto_now_add=True)
|
|
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
|
|
domains_scanned = models.TextField(blank=True, help_text="JSON array of domains that were scanned")
|
|
total_scans = models.IntegerField(default=0)
|
|
successful_scans = models.IntegerField(default=0)
|
|
failed_scans = models.IntegerField(default=0)
|
|
total_cost = models.DecimalField(max_digits=10, decimal_places=6, default=0.0)
|
|
scan_ids = models.TextField(blank=True, help_text="JSON array of scan IDs created")
|
|
error_message = models.TextField(blank=True, null=True)
|
|
started_at = models.DateTimeField(null=True, blank=True)
|
|
completed_at = models.DateTimeField(null=True, blank=True)
|
|
|
|
class Meta:
|
|
db_table = 'ai_scanner_scheduled_executions'
|
|
ordering = ['-execution_time']
|
|
|
|
def __str__(self):
|
|
return f"Execution of {self.scheduled_scan.name} at {self.execution_time}"
|
|
|
|
@property
|
|
def scanned_domains(self):
|
|
"""Parse domains scanned JSON"""
|
|
if self.domains_scanned:
|
|
try:
|
|
return json.loads(self.domains_scanned)
|
|
except json.JSONDecodeError:
|
|
return []
|
|
return []
|
|
|
|
@property
|
|
def scan_id_list(self):
|
|
"""Parse scan IDs JSON"""
|
|
if self.scan_ids:
|
|
try:
|
|
return json.loads(self.scan_ids)
|
|
except json.JSONDecodeError:
|
|
return []
|
|
return []
|
|
|
|
def set_scanned_domains(self, domain_list):
|
|
"""Set scanned domains from list"""
|
|
self.domains_scanned = json.dumps(domain_list)
|
|
|
|
def set_scan_ids(self, scan_id_list):
|
|
"""Set scan IDs from list"""
|
|
self.scan_ids = json.dumps(scan_id_list)
|