from django.shortcuts import render, redirect from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from loginSystem.views import loadLoginPage from .aiScannerManager import AIScannerManager import json import os def aiScannerHome(request): """Main AI Scanner page""" try: userID = request.session['userID'] sm = AIScannerManager() return sm.scannerHome(request, userID) except KeyError: return redirect(loadLoginPage) def setupPayment(request): """Setup payment method for AI scanner""" try: userID = request.session['userID'] sm = AIScannerManager() return sm.setupPayment(request, userID) except KeyError: return JsonResponse({'success': False, 'error': 'Not authenticated'}) def setupComplete(request): """Handle return from payment setup""" try: userID = request.session['userID'] sm = AIScannerManager() return sm.setupComplete(request, userID) except KeyError: return redirect(loadLoginPage) def startScan(request): """Start a new AI security scan""" try: userID = request.session['userID'] sm = AIScannerManager() return sm.startScan(request, userID) except KeyError: return JsonResponse({'success': False, 'error': 'Not authenticated'}) def refreshBalance(request): """Refresh account balance from API""" try: userID = request.session['userID'] sm = AIScannerManager() return sm.refreshBalance(request, userID) except KeyError: return JsonResponse({'success': False, 'error': 'Not authenticated'}) def addPaymentMethod(request): """Add a new payment method""" try: userID = request.session['userID'] sm = AIScannerManager() return sm.addPaymentMethod(request, userID) except KeyError: return JsonResponse({'success': False, 'error': 'Not authenticated'}) def paymentMethodComplete(request): """Handle return from adding payment method""" try: userID = request.session['userID'] sm = AIScannerManager() return sm.paymentMethodComplete(request, userID) except KeyError: return redirect(loadLoginPage) @csrf_exempt def scanCallback(request): """Handle scan results callback from AI Scanner API""" sm = AIScannerManager() return sm.scanCallback(request) def getScanHistory(request): """Get scan history for user""" try: userID = request.session['userID'] from loginSystem.models import Administrator from .models import ScanHistory from plogical.acl import ACLManager admin = Administrator.objects.get(pk=userID) currentACL = ACLManager.loadedACL(userID) # Get scan history with ACL respect if currentACL['admin'] == 1: # Admin can see all scans scans = ScanHistory.objects.all().order_by('-started_at')[:20] else: # Users can only see their own scans and their sub-users' scans user_admins = ACLManager.loadUserObjects(userID) scans = ScanHistory.objects.filter(admin__in=user_admins).order_by('-started_at')[:20] scan_data = [] for scan in scans: scan_data.append({ 'scan_id': scan.scan_id, 'domain': scan.domain, 'status': scan.status, 'scan_type': scan.scan_type, 'started_at': scan.started_at.strftime('%Y-%m-%d %H:%M:%S'), 'completed_at': scan.completed_at.strftime('%Y-%m-%d %H:%M:%S') if scan.completed_at else None, 'cost_usd': float(scan.cost_usd) if scan.cost_usd else 0, 'files_scanned': scan.files_scanned, 'issues_found': scan.issues_found, 'findings': scan.findings[:5] if scan.findings else [], # First 5 findings 'summary': scan.summary }) return JsonResponse({'success': True, 'scans': scan_data}) except KeyError: return JsonResponse({'success': False, 'error': 'Not authenticated'}) except Exception as e: return JsonResponse({'success': False, 'error': str(e)}) @require_http_methods(['GET']) def getScanDetails(request, scan_id): """Get detailed scan results""" try: userID = request.session['userID'] from loginSystem.models import Administrator from .models import ScanHistory from .status_models import ScanStatusUpdate from plogical.acl import ACLManager admin = Administrator.objects.get(pk=userID) currentACL = ACLManager.loadedACL(userID) # Get scan with ACL respect try: scan = ScanHistory.objects.get(scan_id=scan_id) # Check if user has access to this scan if currentACL['admin'] != 1: # Non-admin users can only see their own scans and their sub-users' scans user_admins = ACLManager.loadUserObjects(userID) if scan.admin not in user_admins: return JsonResponse({'success': False, 'error': 'Access denied to this scan'}) except ScanHistory.DoesNotExist: return JsonResponse({'success': False, 'error': 'Scan not found'}) # Get the status update for more detailed information try: status_update = ScanStatusUpdate.objects.get(scan_id=scan_id) # Use detailed information from status update if available files_scanned = status_update.files_scanned if status_update.files_scanned > 0 else scan.files_scanned files_discovered = status_update.files_discovered threats_found = status_update.threats_found critical_threats = status_update.critical_threats high_threats = status_update.high_threats except ScanStatusUpdate.DoesNotExist: # Fall back to basic information from scan history files_scanned = scan.files_scanned files_discovered = scan.files_scanned # Approximate threats_found = scan.issues_found critical_threats = 0 high_threats = 0 scan_data = { 'scan_id': scan.scan_id, 'domain': scan.domain, 'status': scan.status, 'scan_type': scan.scan_type, 'started_at': scan.started_at.strftime('%Y-%m-%d %H:%M:%S'), 'completed_at': scan.completed_at.strftime('%Y-%m-%d %H:%M:%S') if scan.completed_at else None, 'cost_usd': float(scan.cost_usd) if scan.cost_usd else 0, 'files_scanned': files_scanned, 'files_discovered': files_discovered, 'issues_found': scan.issues_found, 'threats_found': threats_found, 'critical_threats': critical_threats, 'high_threats': high_threats, 'findings': scan.findings, 'summary': scan.summary, 'error_message': scan.error_message } return JsonResponse({'success': True, 'scan': scan_data}) except KeyError: return JsonResponse({'success': False, 'error': 'Not authenticated'}) except ScanHistory.DoesNotExist: return JsonResponse({'success': False, 'error': 'Scan not found'}) except Exception as e: return JsonResponse({'success': False, 'error': str(e)}) @require_http_methods(['GET']) def getPlatformMonitorUrl(request, scan_id): """Get the platform monitor URL for a scan""" try: userID = request.session['userID'] from loginSystem.models import Administrator from .models import ScanHistory, AIScannerSettings from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging import requests # Get scan to verify ownership try: scan = ScanHistory.objects.get(scan_id=scan_id) admin = Administrator.objects.get(pk=userID) # Verify access from plogical.acl import ACLManager currentACL = ACLManager.loadedACL(userID) if currentACL['admin'] != 1: user_admins = ACLManager.loadUserObjects(userID) if scan.admin not in user_admins: return JsonResponse({'success': False, 'error': 'Access denied'}) except ScanHistory.DoesNotExist: return JsonResponse({'success': False, 'error': 'Scan not found'}) # Get API key - first try current user, then scan owner api_key = None # Try current user's API key first try: current_user_settings = admin.ai_scanner_settings if current_user_settings and current_user_settings.api_key: api_key = current_user_settings.api_key except AIScannerSettings.DoesNotExist: pass except Exception as e: pass # If current user doesn't have API key, try scan owner's if not api_key: try: scanner_settings = scan.admin.ai_scanner_settings if scanner_settings and scanner_settings.api_key: api_key = scanner_settings.api_key except AIScannerSettings.DoesNotExist: pass except Exception as e: pass # If still no API key, check if this might be a VPS free scan if not api_key: try: from plogical.acl import ACLManager from .aiScannerManager import AIScannerManager server_ip = ACLManager.fetchIP() sm = AIScannerManager() vps_info = sm.check_vps_free_scans(server_ip) if (vps_info.get('success') and vps_info.get('is_vps') and vps_info.get('free_scans_available', 0) > 0): vps_key_data = sm.get_or_create_vps_api_key(server_ip) if vps_key_data and vps_key_data.get('api_key'): api_key = vps_key_data.get('api_key') except Exception as e: pass # If still no API key, return error if not api_key: logging.writeToFile(f"[AI Scanner] No API key found for scan {scan_id}") return JsonResponse({'success': False, 'error': 'API key not configured. Please configure your AI Scanner API key.'}) # Call platform API to get monitor URL try: url = f"https://platform.cyberpersons.com/ai-scanner/api/scan/{scan_id}/monitor-url/" headers = { 'X-API-Key': api_key, 'Content-Type': 'application/json' } logging.writeToFile(f"[AI Scanner] Fetching platform monitor URL for scan {scan_id}") response = requests.get(url, headers=headers, timeout=10) try: data = response.json() except json.JSONDecodeError as e: logging.writeToFile(f"[AI Scanner] JSON decode error: {str(e)}") return JsonResponse({'success': False, 'error': 'Invalid response from platform'}) if response.status_code == 200 and data.get('success'): logging.writeToFile(f"[AI Scanner] Got monitor URL: {data.get('monitor_url')}") return JsonResponse({ 'success': True, 'monitor_url': data.get('monitor_url'), 'platform_scan_id': data.get('platform_scan_id') }) else: error_msg = data.get('error', 'Failed to get monitor URL') logging.writeToFile(f"[AI Scanner] Failed to get monitor URL: {error_msg}") return JsonResponse({ 'success': False, 'error': error_msg, 'scan_exists': data.get('scan_exists', False) }) except requests.exceptions.Timeout: logging.writeToFile(f"[AI Scanner] Platform request timeout for scan {scan_id}") return JsonResponse({'success': False, 'error': 'Platform request timeout'}) except requests.exceptions.RequestException as e: logging.writeToFile(f"[AI Scanner] Platform request error: {str(e)}") return JsonResponse({'success': False, 'error': f'Platform error: {str(e)}'}) except Exception as e: logging.writeToFile(f"[AI Scanner] Unexpected error: {str(e)}") return JsonResponse({'success': False, 'error': f'Error: {str(e)}'}) except KeyError: return JsonResponse({'success': False, 'error': 'Not authenticated'}) except Exception as e: logging.writeToFile(f"[AI Scanner] getPlatformMonitorUrl error: {str(e)}") return JsonResponse({'success': False, 'error': str(e)}) def getPlatformScanStatus(request, scan_id): """Get real-time scan status from AI Scanner platform""" try: userID = request.session['userID'] from loginSystem.models import Administrator from .models import ScanHistory from plogical.acl import ACLManager admin = Administrator.objects.get(pk=userID) currentACL = ACLManager.loadedACL(userID) # Get scan with ACL respect try: scan = ScanHistory.objects.get(scan_id=scan_id) # Check if user has access to this scan if currentACL['admin'] != 1: # Non-admin users can only see their own scans and their sub-users' scans user_admins = ACLManager.loadUserObjects(userID) if scan.admin not in user_admins: return JsonResponse({'success': False, 'error': 'Access denied to this scan'}) except ScanHistory.DoesNotExist: return JsonResponse({'success': False, 'error': 'Scan not found'}) scanner_settings = admin.ai_scanner_settings if not scanner_settings.api_key: return JsonResponse({'success': False, 'error': 'API key not configured'}) # Get real-time status from platform sm = AIScannerManager() platform_status = sm.get_scan_status(scanner_settings.api_key, scan_id) if platform_status and platform_status.get('success'): status_data = platform_status.get('data', {}) # Return formatted status data for frontend return JsonResponse({ 'success': True, 'scan_id': scan_id, 'phase': status_data.get('status', 'unknown'), 'progress': status_data.get('progress', 0), 'current_file': status_data.get('current_file', ''), 'files_discovered': status_data.get('files_discovered', 0), 'files_scanned': status_data.get('files_scanned', 0), 'files_remaining': status_data.get('files_remaining', 0), 'threats_found': status_data.get('findings_count', 0), 'critical_threats': status_data.get('critical_threats', 0), 'high_threats': status_data.get('high_threats', 0), 'activity_description': status_data.get('activity_description', ''), 'last_updated': status_data.get('last_updated', ''), 'is_active': status_data.get('status') in ['scanning', 'discovering_files', 'starting'], 'cost': status_data.get('cost', '$0.00') }) else: # No live status available, return scan database status return JsonResponse({ 'success': True, 'scan_id': scan_id, 'phase': scan.status, 'progress': 100 if scan.status == 'completed' else 0, 'current_file': '', 'files_discovered': scan.files_scanned, 'files_scanned': scan.files_scanned, 'files_remaining': 0, 'threats_found': scan.issues_found, 'critical_threats': 0, 'high_threats': 0, 'activity_description': scan.error_message if scan.status == 'failed' else 'Scan completed', 'last_updated': scan.completed_at.isoformat() if scan.completed_at else scan.started_at.isoformat(), 'is_active': False, 'cost': f'${scan.cost_usd:.4f}' if scan.cost_usd else '$0.00' }) except KeyError: return JsonResponse({'success': False, 'error': 'Not authenticated'}) except ScanHistory.DoesNotExist: return JsonResponse({'success': False, 'error': 'Scan not found'}) except Exception as e: return JsonResponse({'success': False, 'error': str(e)}) # File Access API for AI Scanner @csrf_exempt @require_http_methods(['POST']) def aiScannerAuthenticate(request): """Authenticate AI scanner access""" try: data = json.loads(request.body) access_token = data.get('access_token') scan_id = data.get('scan_id') if not access_token or not scan_id: return JsonResponse({'success': False, 'error': 'Missing parameters'}) from .models import FileAccessToken, ScanHistory # Validate token try: file_token = FileAccessToken.objects.get( token=access_token, scan_history__scan_id=scan_id, is_active=True ) if file_token.is_expired(): return JsonResponse({'success': False, 'error': 'Token expired'}) # Get WordPress info from websiteFunctions.models import Websites try: website = Websites.objects.get(domain=file_token.domain) # Detect WordPress path and version wp_path = file_token.wp_path wp_version = 'Unknown' php_version = 'Unknown' # Try to get WP version from wp-includes/version.php version_file = os.path.join(wp_path, 'wp-includes', 'version.php') if os.path.exists(version_file): try: with open(version_file, 'r') as f: content = f.read() import re match = re.search(r'\$wp_version\s*=\s*[\'"]([^\'"]+)[\'"]', content) if match: wp_version = match.group(1) except: pass return JsonResponse({ 'success': True, 'site_info': { 'domain': file_token.domain, 'wp_path': wp_path, 'php_version': php_version, 'wp_version': wp_version, 'scan_id': scan_id } }) except Websites.DoesNotExist: return JsonResponse({'success': False, 'error': 'Website not found'}) except FileAccessToken.DoesNotExist: return JsonResponse({'success': False, 'error': 'Invalid token'}) except Exception as e: return JsonResponse({'success': False, 'error': str(e)}) @csrf_exempt @require_http_methods(['GET']) def aiScannerListFiles(request): """List directory contents for AI scanner""" try: path = request.GET.get('path', '') access_token = request.headers.get('Authorization', '').replace('Bearer ', '') if not access_token: return JsonResponse({'success': False, 'error': 'No authorization token'}) from .models import FileAccessToken # Validate token try: file_token = FileAccessToken.objects.get(token=access_token, is_active=True) if file_token.is_expired(): return JsonResponse({'success': False, 'error': 'Token expired'}) # Construct full path full_path = os.path.join(file_token.wp_path, path) # Security check - ensure path is within WordPress directory if not os.path.abspath(full_path).startswith(os.path.abspath(file_token.wp_path)): return JsonResponse({'success': False, 'error': 'Path not allowed'}) if not os.path.exists(full_path): return JsonResponse({'success': False, 'error': 'Path not found'}) if not os.path.isdir(full_path): return JsonResponse({'success': False, 'error': 'Path is not a directory'}) # List directory contents items = [] try: for item in os.listdir(full_path): item_path = os.path.join(full_path, item) # Skip hidden files and certain directories if item.startswith('.') or item in ['__pycache__', 'node_modules']: continue if os.path.isdir(item_path): items.append({ 'name': item, 'type': 'directory', 'path': os.path.join(path, item).replace('\\', '/') if path else item }) else: # Only include certain file types if item.endswith(('.php', '.js', '.html', '.htm', '.css', '.txt', '.md', '.json', '.xml')): items.append({ 'name': item, 'type': 'file', 'path': os.path.join(path, item).replace('\\', '/') if path else item, 'size': os.path.getsize(item_path) }) return JsonResponse({ 'success': True, 'path': path, 'items': items }) except PermissionError: return JsonResponse({'success': False, 'error': 'Permission denied'}) except FileAccessToken.DoesNotExist: return JsonResponse({'success': False, 'error': 'Invalid token'}) except Exception as e: return JsonResponse({'success': False, 'error': str(e)}) @csrf_exempt @require_http_methods(['GET']) def aiScannerGetFile(request): """Get file content for AI scanner""" try: file_path = request.GET.get('path') access_token = request.headers.get('Authorization', '').replace('Bearer ', '') if not access_token or not file_path: return JsonResponse({'success': False, 'error': 'Missing parameters'}) from .models import FileAccessToken # Validate token try: file_token = FileAccessToken.objects.get(token=access_token, is_active=True) if file_token.is_expired(): return JsonResponse({'success': False, 'error': 'Token expired'}) # Construct full path full_path = os.path.join(file_token.wp_path, file_path) # Security check - ensure path is within WordPress directory if not os.path.abspath(full_path).startswith(os.path.abspath(file_token.wp_path)): return JsonResponse({'success': False, 'error': 'Path not allowed'}) if not os.path.exists(full_path): return JsonResponse({'success': False, 'error': 'File not found'}) if not os.path.isfile(full_path): return JsonResponse({'success': False, 'error': 'Path is not a file'}) # Check file size (max 10MB as per API limits) file_size = os.path.getsize(full_path) if file_size > 10 * 1024 * 1024: # 10MB return JsonResponse({'success': False, 'error': 'File too large'}) # Read file content try: with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() return JsonResponse({ 'success': True, 'path': file_path, 'content': content, 'size': file_size }) except UnicodeDecodeError: # Try binary mode for non-text files with open(full_path, 'rb') as f: content = f.read() # Return base64 encoded for binary files import base64 return JsonResponse({ 'success': True, 'path': file_path, 'content': base64.b64encode(content).decode('utf-8'), 'encoding': 'base64', 'size': file_size }) except FileAccessToken.DoesNotExist: return JsonResponse({'success': False, 'error': 'Invalid token'}) except Exception as e: return JsonResponse({'success': False, 'error': str(e)})