from django.core.management.base import BaseCommand from django.utils import timezone from datetime import datetime, timedelta import json import time class Command(BaseCommand): help = 'Run scheduled AI security scans' def add_arguments(self, parser): parser.add_argument( '--daemon', action='store_true', help='Run as daemon, checking for scheduled scans every minute', ) parser.add_argument( '--scan-id', type=int, help='Run a specific scheduled scan by ID', ) parser.add_argument( '--verbose', action='store_true', help='Show detailed information about all scheduled scans', ) parser.add_argument( '--force', action='store_true', help='Force run all active scheduled scans immediately, ignoring schedule', ) def handle(self, *args, **options): self.verbose = options.get('verbose', False) self.force = options.get('force', False) if options['daemon']: self.stdout.write('Starting scheduled scan daemon...') self.run_daemon() elif options['scan_id']: self.stdout.write(f'Running scheduled scan ID {options["scan_id"]}...') self.run_scheduled_scan_by_id(options['scan_id']) elif options['force']: self.stdout.write('Force running all active scheduled scans...') self.force_run_all_scans() else: self.stdout.write('Checking for scheduled scans to run...') self.check_and_run_scans() def run_daemon(self): """Run as daemon, checking for scans every minute""" while True: try: self.stdout.write(f'\n[{timezone.now().strftime("%Y-%m-%d %H:%M:%S UTC")}] Checking for scheduled scans...') self.check_and_run_scans() time.sleep(60) # Check every minute except KeyboardInterrupt: self.stdout.write('\nDaemon stopped by user') break except Exception as e: self.stderr.write(f'Error in daemon: {str(e)}') from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging logging.writeToFile(f'[Scheduled Scan Daemon] Error: {str(e)}') time.sleep(60) # Continue after error def force_run_all_scans(self): """Force run all active scheduled scans immediately""" from aiScanner.models import ScheduledScan from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging # Find all active scheduled scans active_scans = ScheduledScan.objects.filter(status='active') if active_scans.count() == 0: self.stdout.write('No active scheduled scans found') logging.writeToFile('[Scheduled Scan Force] No active scheduled scans found') return self.stdout.write(f'Found {active_scans.count()} active scheduled scans to force run') logging.writeToFile(f'[Scheduled Scan Force] Found {active_scans.count()} active scheduled scans to force run') for scan in active_scans: self.stdout.write(f'Force running scheduled scan: {scan.name} (ID: {scan.id})') logging.writeToFile(f'[Scheduled Scan Force] Force running scheduled scan: {scan.name} (ID: {scan.id})') self.execute_scheduled_scan(scan) def check_and_run_scans(self): """Check for scheduled scans that need to run""" from aiScanner.models import ScheduledScan from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging now = timezone.now() # Log all scheduled scans and their status all_scans = ScheduledScan.objects.all() self.stdout.write(f'Total scheduled scans: {all_scans.count()}') logging.writeToFile(f'[Scheduled Scan Check] Total scheduled scans: {all_scans.count()}') for scan in all_scans: if self.verbose: self.stdout.write(f'\n--- Scan Details: {scan.name} (ID: {scan.id}) ---') self.stdout.write(f' Owner: {scan.admin.userName}') self.stdout.write(f' Frequency: {scan.frequency}') self.stdout.write(f' Scan Type: {scan.scan_type}') self.stdout.write(f' Status: {scan.status}') self.stdout.write(f' Domains: {", ".join(scan.domain_list)}') self.stdout.write(f' Created: {scan.created_at.strftime("%Y-%m-%d %H:%M:%S UTC")}') if scan.last_run: self.stdout.write(f' Last Run: {scan.last_run.strftime("%Y-%m-%d %H:%M:%S UTC")}') else: self.stdout.write(f' Last Run: Never') if scan.status != 'active': reason = f'Scan "{scan.name}" (ID: {scan.id}) is not active (status: {scan.status})' self.stdout.write(f' ❌ {reason}') logging.writeToFile(f'[Scheduled Scan Check] {reason}') continue if scan.next_run is None: reason = f'Scan "{scan.name}" (ID: {scan.id}) has no next_run scheduled' self.stdout.write(f' ❌ {reason}') logging.writeToFile(f'[Scheduled Scan Check] {reason}') # Try to calculate next run if self.verbose: self.stdout.write(f' 🔧 Attempting to calculate next run time...') try: scan.next_run = scan.calculate_next_run() scan.save() self.stdout.write(f' ✅ Next run set to: {scan.next_run.strftime("%Y-%m-%d %H:%M:%S UTC")}') except Exception as e: self.stdout.write(f' ❌ Failed to calculate next run: {str(e)}') continue if scan.next_run > now: time_until_run = scan.next_run - now days = int(time_until_run.total_seconds() // 86400) hours = int((time_until_run.total_seconds() % 86400) // 3600) minutes = int((time_until_run.total_seconds() % 3600) // 60) time_str = "" if days > 0: time_str = f"{days}d {hours}h {minutes}m" else: time_str = f"{hours}h {minutes}m" reason = f'Scan "{scan.name}" (ID: {scan.id}) scheduled to run in {time_str} at {scan.next_run.strftime("%Y-%m-%d %H:%M:%S UTC")}' self.stdout.write(f' ⏰ {reason}') logging.writeToFile(f'[Scheduled Scan Check] {reason}') continue # Find scans that are due to run due_scans = ScheduledScan.objects.filter( status='active', next_run__lte=now ) if due_scans.count() == 0: self.stdout.write('No scheduled scans are due to run at this time') logging.writeToFile('[Scheduled Scan Check] No scheduled scans are due to run at this time') else: self.stdout.write(f'Found {due_scans.count()} scans due to run') logging.writeToFile(f'[Scheduled Scan Check] Found {due_scans.count()} scans due to run') for scan in due_scans: self.stdout.write(f'Running scheduled scan: {scan.name} (ID: {scan.id})') logging.writeToFile(f'[Scheduled Scan Check] Running scheduled scan: {scan.name} (ID: {scan.id})') self.execute_scheduled_scan(scan) def run_scheduled_scan_by_id(self, scan_id): """Run a specific scheduled scan by ID""" from aiScanner.models import ScheduledScan try: scan = ScheduledScan.objects.get(id=scan_id) self.stdout.write(f'Running scheduled scan: {scan.name}') self.execute_scheduled_scan(scan) except ScheduledScan.DoesNotExist: self.stderr.write(f'Scheduled scan with ID {scan_id} not found') def execute_scheduled_scan(self, scheduled_scan): """Execute a scheduled scan""" from aiScanner.models import ScheduledScanExecution, ScanHistory from aiScanner.aiScannerManager import AIScannerManager from loginSystem.models import Administrator from websiteFunctions.models import Websites from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging # Create execution record execution = ScheduledScanExecution.objects.create( scheduled_scan=scheduled_scan, status='running', started_at=timezone.now() ) try: # Update last run time scheduled_scan.last_run = timezone.now() scheduled_scan.next_run = scheduled_scan.calculate_next_run() scheduled_scan.save() # Get domains to scan domains_to_scan = [] admin = scheduled_scan.admin # Validate domains still exist and user has access for domain in scheduled_scan.domain_list: try: website = Websites.objects.get(domain=domain, admin=admin) domains_to_scan.append(domain) except Websites.DoesNotExist: logging.writeToFile(f'[Scheduled Scan] Domain {domain} no longer accessible for user {admin.userName}') continue if not domains_to_scan: execution.status = 'failed' execution.error_message = 'No accessible domains found for scanning' execution.completed_at = timezone.now() execution.save() self.stderr.write(f'No accessible domains for scheduled scan {scheduled_scan.name}') return execution.set_scanned_domains(domains_to_scan) execution.total_scans = len(domains_to_scan) execution.save() # Initialize scanner manager sm = AIScannerManager() scan_ids = [] successful_scans = 0 failed_scans = 0 total_cost = 0.0 # Execute scans for each domain for domain in domains_to_scan: try: self.stdout.write(f'Starting scan for domain: {domain}') # Create a fake request object for the scanner manager class FakeRequest: def __init__(self, admin_id, domain, scan_type): self.session = {'userID': admin_id} self.method = 'POST' self.POST = { 'domain': domain, 'scan_type': scan_type } # Create JSON body that startScan expects import json self.body = json.dumps({ 'domain': domain, 'scan_type': scan_type }).encode('utf-8') def get_host(self): # Get the hostname from CyberPanel settings try: from plogical.acl import ACLManager server_ip = ACLManager.fetchIP() return f"{server_ip}:8090" # Default CyberPanel port except: return "localhost:8090" # Fallback fake_request = FakeRequest(admin.pk, domain, scheduled_scan.scan_type) # Start the scan result = sm.startScan(fake_request, admin.pk) if hasattr(result, 'content'): # It's an HTTP response, parse the JSON import json response_data = json.loads(result.content.decode('utf-8')) else: # It's already a dict response_data = result if response_data.get('success'): scan_id = response_data.get('scan_id') if scan_id: scan_ids.append(scan_id) successful_scans += 1 # Get cost estimate if available if 'cost_estimate' in response_data: total_cost += float(response_data['cost_estimate']) logging.writeToFile(f'[Scheduled Scan] Successfully started scan {scan_id} for {domain}') else: failed_scans += 1 logging.writeToFile(f'[Scheduled Scan] Failed to get scan ID for {domain}') else: failed_scans += 1 error_msg = response_data.get('error', 'Unknown error') logging.writeToFile(f'[Scheduled Scan] Failed to start scan for {domain}: {error_msg}') except Exception as e: failed_scans += 1 error_msg = str(e) logging.writeToFile(f'[Scheduled Scan] Exception starting scan for {domain}: {error_msg}') # Small delay between scans to avoid overwhelming the system time.sleep(2) # Update execution record execution.successful_scans = successful_scans execution.failed_scans = failed_scans execution.total_cost = total_cost execution.set_scan_ids(scan_ids) execution.status = 'completed' if failed_scans == 0 else 'completed' # Always completed if we tried all execution.completed_at = timezone.now() execution.save() # Send notifications if configured if scheduled_scan.email_notifications: self.send_notifications(scheduled_scan, execution) self.stdout.write( f'Scheduled scan completed: {successful_scans} successful, {failed_scans} failed' ) except Exception as e: # Update execution record with error execution.status = 'failed' execution.error_message = str(e) execution.completed_at = timezone.now() execution.save() logging.writeToFile(f'[Scheduled Scan] Failed to execute scheduled scan {scheduled_scan.name}: {str(e)}') self.stderr.write(f'Failed to execute scheduled scan {scheduled_scan.name}: {str(e)}') # Send failure notification if scheduled_scan.email_notifications and scheduled_scan.notify_on_failure: self.send_failure_notification(scheduled_scan, str(e)) def send_notifications(self, scheduled_scan, execution): """Send email notifications for completed scan""" try: # Determine if we should send notification should_notify = False if execution.status == 'failed' and scheduled_scan.notify_on_failure: should_notify = True elif execution.status == 'completed': if scheduled_scan.notify_on_completion: should_notify = True elif scheduled_scan.notify_on_threats and execution.successful_scans > 0: # Check if any scans found threats # This would require checking the scan results, which might not be available immediately # For now, we'll just send completion notifications should_notify = scheduled_scan.notify_on_completion if should_notify: self.send_execution_notification(scheduled_scan, execution) except Exception as e: from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging logging.writeToFile(f'[Scheduled Scan] Failed to send notification: {str(e)}') def send_execution_notification(self, scheduled_scan, execution): """Send notification email for scan execution""" try: # Get notification emails notification_emails = scheduled_scan.notification_email_list if not notification_emails: # Use admin email as fallback notification_emails = [scheduled_scan.admin.email] if scheduled_scan.admin.email else [] if not notification_emails: return # Prepare email content subject = f'AI Scanner: Scheduled Scan "{scheduled_scan.name}" Completed' status_text = execution.status.title() if execution.status == 'completed': if execution.failed_scans == 0: status_text = 'Completed Successfully' else: status_text = f'Completed with {execution.failed_scans} failures' message = f""" Scheduled AI Security Scan Report Scan Name: {scheduled_scan.name} Status: {status_text} Execution Time: {execution.execution_time.strftime('%Y-%m-%d %H:%M:%S UTC')} Results: - Total Domains: {execution.total_scans} - Successful Scans: {execution.successful_scans} - Failed Scans: {execution.failed_scans} - Total Cost: ${execution.total_cost:.4f} Domains Scanned: {', '.join(execution.scanned_domains)} {f'Error Message: {execution.error_message}' if execution.error_message else ''} Scan IDs: {', '.join(execution.scan_id_list)} View detailed results in your CyberPanel AI Scanner dashboard. """ # Send email using CyberPanel's email system from plogical.mailUtilities import mailUtilities sender = 'noreply@cyberpanel.local' mailUtilities.SendEmail(sender, notification_emails, message) # Log notification sent from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging logging.writeToFile(f'[Scheduled Scan] Notification sent for {scheduled_scan.name} to {len(notification_emails)} recipients') except Exception as e: from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging logging.writeToFile(f'[Scheduled Scan] Failed to send notification email: {str(e)}') def send_failure_notification(self, scheduled_scan, error_message): """Send notification email for scan failure""" try: # Get notification emails notification_emails = scheduled_scan.notification_email_list if not notification_emails: # Use admin email as fallback notification_emails = [scheduled_scan.admin.email] if scheduled_scan.admin.email else [] if not notification_emails: return # Prepare email content subject = f'AI Scanner: Scheduled Scan "{scheduled_scan.name}" Failed' message = f""" Scheduled AI Security Scan Failure Scan Name: {scheduled_scan.name} Status: Failed Time: {timezone.now().strftime('%Y-%m-%d %H:%M:%S UTC')} Error: {error_message} Please check your CyberPanel AI Scanner configuration and try again. """ # Send email using CyberPanel's email system from plogical.mailUtilities import mailUtilities sender = 'noreply@cyberpanel.local' mailUtilities.SendEmail(sender, notification_emails, message) # Log notification sent from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging logging.writeToFile(f'[Scheduled Scan] Failure notification sent for {scheduled_scan.name}') except Exception as e: from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging logging.writeToFile(f'[Scheduled Scan] Failed to send failure notification email: {str(e)}')