mirror of
				https://github.com/usmannasir/cyberpanel.git
				synced 2025-10-31 02:15:55 +01:00 
			
		
		
		
	- Add error checking for mkdir command (check result == 1) - Add error checking for cp backup command (check result == 1) - Strip trailing slash from wp_path to avoid double slashes - Return proper error messages when backup fails - Prevents rename/delete if backup fails - Fixes: 'Failed to backup file before quarantine' error
		
			
				
	
	
		
			1706 lines
		
	
	
		
			72 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1706 lines
		
	
	
		
			72 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| import os
 | |
| import time
 | |
| import mimetypes
 | |
| import base64
 | |
| from django.http import JsonResponse
 | |
| from django.views.decorators.csrf import csrf_exempt
 | |
| from django.views.decorators.http import require_http_methods
 | |
| from websiteFunctions.models import Websites
 | |
| from loginSystem.models import Administrator
 | |
| from .models import FileAccessToken, ScanHistory
 | |
| from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging
 | |
| 
 | |
| 
 | |
| class SecurityError(Exception):
 | |
|     """Custom exception for security violations"""
 | |
|     pass
 | |
| 
 | |
| 
 | |
| class AuthWrapper:
 | |
|     """
 | |
|     Wrapper to provide consistent interface for both FileAccessToken and API Key authentication
 | |
|     """
 | |
|     def __init__(self, domain, wp_path, auth_type, external_app=None, source_obj=None):
 | |
|         self.domain = domain
 | |
|         self.wp_path = wp_path
 | |
|         self.auth_type = auth_type  # 'file_token' or 'api_key'
 | |
|         self.external_app = external_app  # The website's externalApp for command execution
 | |
|         self.source_obj = source_obj  # Original FileAccessToken or AIScannerSettings object
 | |
| 
 | |
| 
 | |
| def validate_access_token(token, scan_id):
 | |
|     """
 | |
|     Validate authentication token - accepts BOTH file access tokens and API keys
 | |
| 
 | |
|     Authentication Flow:
 | |
|     1. Try FileAccessToken (temporary token for active scans)
 | |
|     2. If not found, try API Key (for post-scan file operations)
 | |
| 
 | |
|     Returns: (AuthWrapper object or None, error_message or None)
 | |
|     """
 | |
|     try:
 | |
|         if not token or not token.startswith('cp_'):
 | |
|             logging.writeToFile(f'[API] Invalid token format: {token[:20] if token else "None"}...')
 | |
|             return None, "Invalid token format"
 | |
| 
 | |
|         # OPTION 1: Try FileAccessToken first (for active scans)
 | |
|         try:
 | |
|             file_token = FileAccessToken.objects.get(
 | |
|                 token=token,
 | |
|                 scan_history__scan_id=scan_id,
 | |
|                 is_active=True
 | |
|             )
 | |
| 
 | |
|             if file_token.is_expired():
 | |
|                 logging.writeToFile(f'[API] File token expired for scan {scan_id}, trying API key fallback...')
 | |
|                 # Don't return here - fall through to try API key
 | |
|             else:
 | |
|                 # Get externalApp from the website object
 | |
|                 from websiteFunctions.models import Websites
 | |
|                 try:
 | |
|                     website = Websites.objects.get(domain=file_token.domain)
 | |
|                     external_app = website.externalApp
 | |
|                 except Websites.DoesNotExist:
 | |
|                     logging.writeToFile(f'[API] Website not found for domain: {file_token.domain}')
 | |
|                     return None, "Website not found"
 | |
| 
 | |
|                 logging.writeToFile(f'[API] File token validated successfully for scan {scan_id}, user {external_app}')
 | |
|                 return AuthWrapper(
 | |
|                     domain=file_token.domain,
 | |
|                     wp_path=file_token.wp_path,
 | |
|                     auth_type='file_token',
 | |
|                     external_app=external_app,
 | |
|                     source_obj=file_token
 | |
|                 ), None
 | |
| 
 | |
|         except FileAccessToken.DoesNotExist:
 | |
|             logging.writeToFile(f'[API] File token not found for scan {scan_id}, trying API key fallback...')
 | |
|             # Fall through to try API key
 | |
| 
 | |
|         # OPTION 2: Try API Key (for post-scan file operations)
 | |
|         try:
 | |
|             from .models import AIScannerSettings, ScanHistory
 | |
| 
 | |
|             # Find API key in settings
 | |
|             scanner_settings = AIScannerSettings.objects.get(
 | |
|                 api_key=token
 | |
|             )
 | |
| 
 | |
|             logging.writeToFile(f'[API] Found API key for admin: {scanner_settings.admin.userName}')
 | |
| 
 | |
|             # Get the scan to verify it belongs to this admin and get domain/path
 | |
|             try:
 | |
|                 scan = ScanHistory.objects.get(
 | |
|                     scan_id=scan_id,
 | |
|                     admin=scanner_settings.admin
 | |
|                 )
 | |
| 
 | |
|                 # Get wp_path from WPSites (WordPress installations)
 | |
|                 try:
 | |
|                     from websiteFunctions.models import WPSites
 | |
| 
 | |
|                     # Try to find WordPress site by domain
 | |
|                     # FinalURL contains the full URL, so we use icontains to match domain
 | |
|                     wp_site = WPSites.objects.filter(
 | |
|                         FinalURL__icontains=scan.domain
 | |
|                     ).first()
 | |
| 
 | |
|                     if not wp_site:
 | |
|                         logging.writeToFile(f'[API] WordPress site not found for domain: {scan.domain}')
 | |
|                         return None, "WordPress site not found"
 | |
| 
 | |
|                     wp_path = wp_site.path
 | |
|                     external_app = wp_site.owner.externalApp  # Get externalApp from the website owner
 | |
| 
 | |
|                     logging.writeToFile(f'[API] API key validated successfully for scan {scan_id}, domain {scan.domain}, path {wp_path}, user {external_app}')
 | |
| 
 | |
|                     return AuthWrapper(
 | |
|                         domain=scan.domain,
 | |
|                         wp_path=wp_path,
 | |
|                         auth_type='api_key',
 | |
|                         external_app=external_app,
 | |
|                         source_obj=scanner_settings
 | |
|                     ), None
 | |
| 
 | |
|                 except Exception as e:
 | |
|                     logging.writeToFile(f'[API] Error getting WordPress path for domain {scan.domain}: {str(e)}')
 | |
|                     return None, "WordPress site not found"
 | |
| 
 | |
|             except ScanHistory.DoesNotExist:
 | |
|                 logging.writeToFile(f'[API] Scan {scan_id} not found or does not belong to API key owner')
 | |
|                 return None, "Scan not found or access denied"
 | |
| 
 | |
|         except AIScannerSettings.DoesNotExist:
 | |
|             logging.writeToFile(f'[API] API key not found in settings')
 | |
|             return None, "Invalid token"
 | |
| 
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Token validation error: {str(e)}')
 | |
|         import traceback
 | |
|         logging.writeToFile(f'[API] Traceback: {traceback.format_exc()}')
 | |
|         return None, "Token validation failed"
 | |
| 
 | |
| 
 | |
| def secure_path_check(base_path, requested_path):
 | |
|     """
 | |
|     Ensure requested path is within allowed directory
 | |
|     Prevent directory traversal attacks
 | |
|     """
 | |
|     try:
 | |
|         if requested_path:
 | |
|             full_path = os.path.join(base_path, requested_path.strip('/'))
 | |
|         else:
 | |
|             full_path = base_path
 | |
|             
 | |
|         full_path = os.path.abspath(full_path)
 | |
|         base_path = os.path.abspath(base_path)
 | |
| 
 | |
|         if not full_path.startswith(base_path):
 | |
|             raise SecurityError("Path outside allowed directory")
 | |
| 
 | |
|         return full_path
 | |
|     except Exception as e:
 | |
|         raise SecurityError(f"Path security check failed: {str(e)}")
 | |
| 
 | |
| 
 | |
| @csrf_exempt
 | |
| @require_http_methods(['POST'])
 | |
| def authenticate_worker(request):
 | |
|     """
 | |
|     POST /api/ai-scanner/authenticate
 | |
|     
 | |
|     Request Body:
 | |
|     {
 | |
|         "access_token": "cp_access_abc123...",
 | |
|         "scan_id": "550e8400-e29b-41d4-a716-446655440000",
 | |
|         "worker_id": "scanner-1.domain.com"
 | |
|     }
 | |
|     
 | |
|     Response:
 | |
|     {
 | |
|         "success": true,
 | |
|         "site_info": {
 | |
|             "domain": "client-domain.com",
 | |
|             "wp_path": "/home/client/public_html",
 | |
|             "php_version": "8.1",
 | |
|             "wp_version": "6.3.1"
 | |
|         },
 | |
|         "permissions": ["read_files", "list_directories"],
 | |
|         "expires_at": "2024-12-25T11:00:00Z"
 | |
|     }
 | |
|     """
 | |
|     try:
 | |
|         data = json.loads(request.body)
 | |
|         access_token = data.get('access_token')
 | |
|         scan_id = data.get('scan_id')
 | |
|         worker_id = data.get('worker_id', 'unknown')
 | |
|         
 | |
|         logging.writeToFile(f'[API] Authentication request from worker {worker_id} for scan {scan_id}')
 | |
|         
 | |
|         if not access_token or not scan_id:
 | |
|             return JsonResponse({'error': 'Missing access_token or scan_id'}, status=400)
 | |
| 
 | |
|         # Validate access token
 | |
|         file_token, error = validate_access_token(access_token, scan_id)
 | |
|         if error:
 | |
|             return JsonResponse({'error': error}, status=401)
 | |
|         
 | |
|         # Get website info
 | |
|         try:
 | |
|             website = Websites.objects.get(domain=file_token.domain)
 | |
|             
 | |
|             # Get WordPress info
 | |
|             wp_path = file_token.wp_path
 | |
|             wp_version = 'Unknown'
 | |
|             php_version = 'Unknown'
 | |
|             
 | |
|             # Try to get WP version from wp-includes/version.php using ProcessUtilities
 | |
|             version_file = os.path.join(wp_path, 'wp-includes', 'version.php')
 | |
|             try:
 | |
|                 from plogical.processUtilities import ProcessUtilities
 | |
|                 
 | |
|                 # Use ProcessUtilities to read file as the website user
 | |
|                 command = f'cat "{version_file}"'
 | |
|                 result = ProcessUtilities.outputExecutioner(command, user=website.externalApp, retRequired=True)
 | |
|                 
 | |
|                 if result[1]:  # Check if there's content (ignore return code)
 | |
|                     content = result[1]
 | |
|                     import re
 | |
|                     match = re.search(r'\$wp_version\s*=\s*[\'"]([^\'"]+)[\'"]', content)
 | |
|                     if match:
 | |
|                         wp_version = match.group(1)
 | |
|                         logging.writeToFile(f'[API] Detected WordPress version: {wp_version}')
 | |
|                 else:
 | |
|                     logging.writeToFile(f'[API] Could not read WP version file: {result[1] if len(result) > 1 else "No content returned"}')
 | |
|                     
 | |
|             except Exception as e:
 | |
|                 logging.writeToFile(f'[API] Error reading WP version: {str(e)}')
 | |
|             
 | |
|             # Try to detect PHP version (basic detection)
 | |
|             try:
 | |
|                 import subprocess
 | |
|                 result = subprocess.run(['php', '-v'], capture_output=True, text=True, timeout=5)
 | |
|                 if result.returncode == 0:
 | |
|                     import re
 | |
|                     match = re.search(r'PHP (\d+\.\d+)', result.stdout)
 | |
|                     if match:
 | |
|                         php_version = match.group(1)
 | |
|             except Exception:
 | |
|                 pass
 | |
|             
 | |
|             response_data = {
 | |
|                 'success': True,
 | |
|                 'site_info': {
 | |
|                     'domain': file_token.domain,
 | |
|                     'wp_path': wp_path,
 | |
|                     'php_version': php_version,
 | |
|                     'wp_version': wp_version,
 | |
|                     'scan_id': scan_id
 | |
|                 },
 | |
|                 'permissions': ['read_files', 'list_directories'],
 | |
|                 'expires_at': file_token.expires_at.strftime('%Y-%m-%dT%H:%M:%SZ')
 | |
|             }
 | |
|             
 | |
|             logging.writeToFile(f'[API] Authentication successful for {file_token.domain}')
 | |
|             return JsonResponse(response_data)
 | |
|             
 | |
|         except Websites.DoesNotExist:
 | |
|             logging.writeToFile(f'[API] Website not found: {file_token.domain}')
 | |
|             return JsonResponse({'error': 'Website not found'}, status=404)
 | |
|             
 | |
|     except json.JSONDecodeError:
 | |
|         return JsonResponse({'error': 'Invalid JSON'}, status=400)
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Authentication error: {str(e)}')
 | |
|         return JsonResponse({'error': 'Authentication failed'}, status=500)
 | |
| 
 | |
| 
 | |
| @csrf_exempt  
 | |
| @require_http_methods(['GET'])
 | |
| def list_files(request):
 | |
|     """
 | |
|     GET /api/ai-scanner/files/list?path=wp-content/plugins
 | |
|     
 | |
|     Headers:
 | |
|     Authorization: Bearer cp_access_abc123...
 | |
|     X-Scan-ID: 550e8400-e29b-41d4-a716-446655440000
 | |
|     
 | |
|     Response:
 | |
|     {
 | |
|         "path": "wp-content/plugins",
 | |
|         "items": [
 | |
|             {
 | |
|                 "name": "akismet",
 | |
|                 "type": "directory",
 | |
|                 "modified": "2024-12-20T10:30:00Z"
 | |
|             },
 | |
|             {
 | |
|                 "name": "suspicious-plugin.php",
 | |
|                 "type": "file",
 | |
|                 "size": 15420,
 | |
|                 "modified": "2024-12-24T15:20:00Z",
 | |
|                 "permissions": "644"
 | |
|             }
 | |
|         ]
 | |
|     }
 | |
|     """
 | |
|     try:
 | |
|         # Validate authorization
 | |
|         auth_header = request.META.get('HTTP_AUTHORIZATION', '')
 | |
|         if not auth_header.startswith('Bearer '):
 | |
|             return JsonResponse({'error': 'Missing or invalid Authorization header'}, status=401)
 | |
|         
 | |
|         access_token = auth_header.replace('Bearer ', '')
 | |
|         scan_id = request.META.get('HTTP_X_SCAN_ID', '')
 | |
|         
 | |
|         if not scan_id:
 | |
|             return JsonResponse({'error': 'X-Scan-ID header required'}, status=400)
 | |
| 
 | |
|         # Validate access token
 | |
|         file_token, error = validate_access_token(access_token, scan_id)
 | |
|         if error:
 | |
|             return JsonResponse({'error': error}, status=401)
 | |
| 
 | |
|         # Get parameters
 | |
|         path = request.GET.get('path', '').strip('/')
 | |
|         
 | |
|         try:
 | |
|             # Security check and get full path
 | |
|             full_path = secure_path_check(file_token.wp_path, path)
 | |
|             
 | |
|             # Path existence and type checking will be done by ProcessUtilities
 | |
| 
 | |
|             # List directory contents using ProcessUtilities
 | |
|             items = []
 | |
|             try:
 | |
|                 from plogical.processUtilities import ProcessUtilities
 | |
|                 from websiteFunctions.models import Websites
 | |
|                 
 | |
|                 # Get website object for user context
 | |
|                 try:
 | |
|                     website = Websites.objects.get(domain=file_token.domain)
 | |
|                     user = website.externalApp
 | |
|                 except Websites.DoesNotExist:
 | |
|                     return JsonResponse({'error': 'Website not found'}, status=404)
 | |
|                 
 | |
|                 # Use ls command with ProcessUtilities to list directory as website user
 | |
|                 ls_command = f'ls -la "{full_path}"'
 | |
|                 result = ProcessUtilities.outputExecutioner(ls_command, user=user, retRequired=True)
 | |
|                 
 | |
|                 if result[1]:  # Check if there's content (ignore return code)
 | |
|                     lines = result[1].strip().split('\n')
 | |
|                     for line in lines[1:]:  # Skip the 'total' line
 | |
|                         if not line.strip():
 | |
|                             continue
 | |
|                             
 | |
|                         parts = line.split()
 | |
|                         if len(parts) < 9:
 | |
|                             continue
 | |
|                             
 | |
|                         permissions = parts[0]
 | |
|                         size = parts[4] if parts[4].isdigit() else 0
 | |
|                         name = ' '.join(parts[8:])  # Handle filenames with spaces
 | |
|                         
 | |
|                         # Skip hidden files, current/parent directory entries
 | |
|                         if name.startswith('.') or name in ['.', '..'] or name in ['__pycache__', 'node_modules']:
 | |
|                             continue
 | |
|                         
 | |
|                         item_data = {
 | |
|                             'name': name,
 | |
|                             'type': 'directory' if permissions.startswith('d') else 'file',
 | |
|                             'permissions': permissions[1:4] if len(permissions) >= 4 else '644'
 | |
|                         }
 | |
|                         
 | |
|                         if permissions.startswith('-'):  # Regular file
 | |
|                             try:
 | |
|                                 item_data['size'] = int(size)
 | |
|                             except ValueError:
 | |
|                                 item_data['size'] = 0
 | |
|                                 
 | |
|                             # Only include certain file types
 | |
|                             if name.endswith(('.php', '.js', '.html', '.htm', '.css', '.txt', '.md', '.json', '.xml', '.sql', '.log', '.conf', '.ini', '.yml', '.yaml')):
 | |
|                                 items.append(item_data)
 | |
|                         elif permissions.startswith('d'):  # Directory
 | |
|                             # Directories don't have a size in the same way
 | |
|                             item_data['size'] = 0
 | |
|                             items.append(item_data)
 | |
|                         else:
 | |
|                             # Other file types (links, etc.) - include with size 0
 | |
|                             item_data['size'] = 0
 | |
|                             items.append(item_data)
 | |
|                 else:
 | |
|                     logging.writeToFile(f'[API] Directory listing failed: {result[1] if len(result) > 1 else "No content returned"}')
 | |
|                     return JsonResponse({'error': 'Directory access failed'}, status=403)
 | |
| 
 | |
|             except Exception as e:
 | |
|                 logging.writeToFile(f'[API] Directory listing error: {str(e)}')
 | |
|                 return JsonResponse({'error': 'Directory access failed'}, status=403)
 | |
| 
 | |
|             logging.writeToFile(f'[API] Listed {len(items)} items in {path or "root"} for scan {scan_id}')
 | |
|             
 | |
|             return JsonResponse({
 | |
|                 'path': path,
 | |
|                 'items': sorted(items, key=lambda x: (x['type'] == 'file', x['name'].lower()))
 | |
|             })
 | |
|             
 | |
|         except SecurityError as e:
 | |
|             logging.writeToFile(f'[API] Security violation: {str(e)}')
 | |
|             return JsonResponse({'error': 'Path not allowed'}, status=403)
 | |
| 
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] List files error: {str(e)}')
 | |
|         return JsonResponse({'error': 'Internal server error'}, status=500)
 | |
| 
 | |
| 
 | |
| @csrf_exempt
 | |
| @require_http_methods(['GET']) 
 | |
| def get_file_content(request):
 | |
|     """
 | |
|     GET /api/ai-scanner/files/content?path=wp-content/plugins/plugin.php
 | |
|     
 | |
|     Headers:
 | |
|     Authorization: Bearer cp_access_abc123...
 | |
|     X-Scan-ID: 550e8400-e29b-41d4-a716-446655440000
 | |
|     
 | |
|     Response:
 | |
|     {
 | |
|         "path": "wp-content/plugins/plugin.php",
 | |
|         "content": "<?php\n// Plugin code here...",
 | |
|         "size": 15420,
 | |
|         "encoding": "utf-8",
 | |
|         "mime_type": "text/x-php"
 | |
|     }
 | |
|     """
 | |
|     try:
 | |
|         # Validate authorization
 | |
|         auth_header = request.META.get('HTTP_AUTHORIZATION', '')
 | |
|         if not auth_header.startswith('Bearer '):
 | |
|             return JsonResponse({'error': 'Missing or invalid Authorization header'}, status=401)
 | |
|         
 | |
|         access_token = auth_header.replace('Bearer ', '')
 | |
|         scan_id = request.META.get('HTTP_X_SCAN_ID', '')
 | |
|         
 | |
|         if not scan_id:
 | |
|             return JsonResponse({'error': 'X-Scan-ID header required'}, status=400)
 | |
| 
 | |
|         # Get file path
 | |
|         path = request.GET.get('path', '').strip('/')
 | |
|         if not path:
 | |
|             return JsonResponse({'error': 'File path required'}, status=400)
 | |
| 
 | |
|         # Validate access token
 | |
|         file_token, error = validate_access_token(access_token, scan_id)
 | |
|         if error:
 | |
|             return JsonResponse({'error': error}, status=401)
 | |
| 
 | |
|         try:
 | |
|             # Security check and get full path
 | |
|             full_path = secure_path_check(file_token.wp_path, path)
 | |
| 
 | |
|             # File existence, type, and size checking will be done by ProcessUtilities
 | |
| 
 | |
|             # Only allow specific file types for security
 | |
|             allowed_extensions = {
 | |
|                 '.php', '.js', '.html', '.htm', '.css', '.txt', '.md',
 | |
|                 '.json', '.xml', '.sql', '.log', '.conf', '.ini', '.yml', '.yaml'
 | |
|             }
 | |
| 
 | |
|             file_ext = os.path.splitext(full_path)[1].lower()
 | |
|             if file_ext not in allowed_extensions:
 | |
|                 return JsonResponse({'error': f'File type not allowed: {file_ext}'}, status=403)
 | |
| 
 | |
|             # Read file content using ProcessUtilities
 | |
|             try:
 | |
|                 from plogical.processUtilities import ProcessUtilities
 | |
|                 from websiteFunctions.models import Websites
 | |
|                 
 | |
|                 # Get website object for user context
 | |
|                 try:
 | |
|                     website = Websites.objects.get(domain=file_token.domain)
 | |
|                     user = website.externalApp
 | |
|                 except Websites.DoesNotExist:
 | |
|                     return JsonResponse({'error': 'Website not found'}, status=404)
 | |
|                 
 | |
|                 # Check file size first using stat command
 | |
|                 stat_command = f'stat -c %s "{full_path}"'
 | |
|                 stat_result = ProcessUtilities.outputExecutioner(stat_command, user=user, retRequired=True)
 | |
|                 
 | |
|                 if stat_result[1]:  # Check if there's content (ignore return code)
 | |
|                     try:
 | |
|                         file_size = int(stat_result[1].strip())
 | |
|                         if file_size > 10 * 1024 * 1024:  # 10MB limit
 | |
|                             return JsonResponse({'error': 'File too large (max 10MB)'}, status=400)
 | |
|                     except ValueError:
 | |
|                         logging.writeToFile(f'[API] Could not parse file size: {stat_result[1]}')
 | |
|                         file_size = 0
 | |
|                 else:
 | |
|                     logging.writeToFile(f'[API] Could not get file size: {stat_result[1] if len(stat_result) > 1 else "No content returned"}')
 | |
|                     return JsonResponse({'error': 'File not found or inaccessible'}, status=404)
 | |
|                 
 | |
|                 # Use cat command with ProcessUtilities to read file as website user
 | |
|                 cat_command = f'cat "{full_path}"'
 | |
|                 result = ProcessUtilities.outputExecutioner(cat_command, user=user, retRequired=True)
 | |
|                 
 | |
|                 # Check if content was returned (file might be empty, which is valid)
 | |
|                 if len(result) > 1:  # We got a tuple back
 | |
|                     content = result[1] if result[1] is not None else ''
 | |
|                     encoding = 'utf-8'
 | |
|                 else:
 | |
|                     logging.writeToFile(f'[API] File read failed: No result returned')
 | |
|                     return JsonResponse({'error': 'Unable to read file'}, status=400)
 | |
| 
 | |
|             except Exception as e:
 | |
|                 logging.writeToFile(f'[API] File read error: {str(e)}')
 | |
|                 return JsonResponse({'error': 'Unable to read file'}, status=400)
 | |
| 
 | |
|             # Detect MIME type
 | |
|             mime_type, _ = mimetypes.guess_type(full_path)
 | |
|             if not mime_type:
 | |
|                 if file_ext == '.php':
 | |
|                     mime_type = 'text/x-php'
 | |
|                 elif file_ext == '.js':
 | |
|                     mime_type = 'application/javascript'
 | |
|                 else:
 | |
|                     mime_type = 'text/plain'
 | |
| 
 | |
|             # Base64 encode the content for safe transport
 | |
|             try:
 | |
|                 content_base64 = base64.b64encode(content.encode('utf-8')).decode('utf-8')
 | |
|             except UnicodeEncodeError:
 | |
|                 # Handle binary files or encoding issues
 | |
|                 try:
 | |
|                     content_base64 = base64.b64encode(content.encode('latin-1')).decode('utf-8')
 | |
|                     encoding = 'latin-1'
 | |
|                 except:
 | |
|                     logging.writeToFile(f'[API] Failed to encode file content for {path}')
 | |
|                     return JsonResponse({'error': 'File encoding not supported'}, status=400)
 | |
| 
 | |
|             logging.writeToFile(f'[API] File content retrieved: {path} ({file_size} bytes) for scan {scan_id}')
 | |
| 
 | |
|             return JsonResponse({
 | |
|                 'path': path,
 | |
|                 'content': content_base64,
 | |
|                 'size': file_size,
 | |
|                 'encoding': encoding,
 | |
|                 'mime_type': mime_type
 | |
|             })
 | |
|             
 | |
|         except SecurityError as e:
 | |
|             logging.writeToFile(f'[API] Security violation: {str(e)}')
 | |
|             return JsonResponse({'error': 'Path not allowed'}, status=403)
 | |
| 
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Get file content error: {str(e)}')
 | |
|         return JsonResponse({'error': 'Internal server error'}, status=500)
 | |
| 
 | |
| 
 | |
| @csrf_exempt
 | |
| @require_http_methods(['POST'])
 | |
| def scan_callback(request):
 | |
|     """
 | |
|     Receive scan completion callbacks from AI Scanner platform
 | |
| 
 | |
|     POST /api/ai-scanner/callback
 | |
|     Content-Type: application/json
 | |
| 
 | |
|     Expected payload:
 | |
|     {
 | |
|         "scan_id": "uuid",
 | |
|         "status": "completed",
 | |
|         "summary": {
 | |
|             "threat_level": "HIGH|MEDIUM|LOW",
 | |
|             "total_findings": 3,
 | |
|             "files_scanned": 25,
 | |
|             "cost": "$0.0456"
 | |
|         },
 | |
|         "findings": [
 | |
|             {
 | |
|                 "file_path": "wp-content/plugins/file.php",
 | |
|                 "severity": "CRITICAL|HIGH|MEDIUM|LOW",
 | |
|                 "title": "Issue title",
 | |
|                 "description": "Detailed description",
 | |
|                 "ai_confidence": 95
 | |
|             }
 | |
|         ],
 | |
|         "ai_analysis": "AI summary text",
 | |
|         "completed_at": "2025-06-23T11:40:12Z"
 | |
|     }
 | |
|     """
 | |
|     try:
 | |
|         # Parse JSON payload
 | |
|         data = json.loads(request.body)
 | |
| 
 | |
|         scan_id = data.get('scan_id')
 | |
|         status = data.get('status')
 | |
|         summary = data.get('summary', {})
 | |
|         findings = data.get('findings', [])
 | |
|         ai_analysis = data.get('ai_analysis', '')
 | |
|         completed_at = data.get('completed_at')
 | |
| 
 | |
|         logging.writeToFile(f"[API] Received callback for scan {scan_id}: {status}")
 | |
| 
 | |
|         # Update scan status in CyberPanel database
 | |
|         try:
 | |
|             from .models import ScanHistory
 | |
|             from django.utils import timezone
 | |
|             import datetime
 | |
| 
 | |
|             # Find the scan record
 | |
|             scan_record = ScanHistory.objects.get(scan_id=scan_id)
 | |
| 
 | |
|             # Update scan record
 | |
|             scan_record.status = status
 | |
|             scan_record.issues_found = summary.get('total_findings', 0)
 | |
|             scan_record.files_scanned = summary.get('files_scanned', 0)
 | |
| 
 | |
|             # Parse and store cost
 | |
|             cost_str = summary.get('cost', '$0.00')
 | |
|             try:
 | |
|                 # Remove '$' and convert to float
 | |
|                 cost_value = float(cost_str.replace('$', '').replace(',', ''))
 | |
|                 scan_record.cost_usd = cost_value
 | |
|             except (ValueError, AttributeError):
 | |
|                 scan_record.cost_usd = 0.0
 | |
| 
 | |
|             # Store findings and AI analysis
 | |
|             scan_record.set_findings(findings)
 | |
| 
 | |
|             # Build summary dict
 | |
|             summary_dict = {
 | |
|                 'threat_level': summary.get('threat_level', 'UNKNOWN'),
 | |
|                 'total_findings': summary.get('total_findings', 0),
 | |
|                 'files_scanned': summary.get('files_scanned', 0),
 | |
|                 'ai_analysis': ai_analysis
 | |
|             }
 | |
|             scan_record.set_summary(summary_dict)
 | |
| 
 | |
|             # Set completion time
 | |
|             if completed_at:
 | |
|                 try:
 | |
|                     # Parse ISO format datetime
 | |
|                     completed_datetime = datetime.datetime.fromisoformat(completed_at.replace('Z', '+00:00'))
 | |
|                     scan_record.completed_at = completed_datetime
 | |
|                 except ValueError:
 | |
|                     scan_record.completed_at = timezone.now()
 | |
|             else:
 | |
|                 scan_record.completed_at = timezone.now()
 | |
| 
 | |
|             scan_record.save()
 | |
| 
 | |
|             # Also update the ScanStatusUpdate record with final statistics
 | |
|             try:
 | |
|                 from .status_models import ScanStatusUpdate
 | |
|                 status_update, _ = ScanStatusUpdate.objects.get_or_create(scan_id=scan_id)
 | |
|                 status_update.phase = 'completed'
 | |
|                 status_update.progress = 100
 | |
|                 status_update.files_discovered = summary.get('files_scanned', 0)  # Use files_scanned as approximation
 | |
|                 status_update.files_scanned = summary.get('files_scanned', 0)
 | |
|                 status_update.files_remaining = 0
 | |
|                 status_update.threats_found = summary.get('total_findings', 0)
 | |
|                 # Extract critical and high threats from findings if available
 | |
|                 critical_count = 0
 | |
|                 high_count = 0
 | |
|                 for finding in findings:
 | |
|                     severity = finding.get('severity', '').lower()
 | |
|                     if severity == 'critical':
 | |
|                         critical_count += 1
 | |
|                     elif severity == 'high':
 | |
|                         high_count += 1
 | |
|                 status_update.critical_threats = critical_count
 | |
|                 status_update.high_threats = high_count
 | |
|                 status_update.activity_description = f"Scan completed - {summary.get('total_findings', 0)} threats found"
 | |
|                 status_update.save()
 | |
|                 logging.writeToFile(f"[API] Updated ScanStatusUpdate for completed scan {scan_id}")
 | |
|             except Exception as e:
 | |
|                 logging.writeToFile(f"[API] Error updating ScanStatusUpdate: {str(e)}")
 | |
| 
 | |
|             # Update user balance if scan cost money
 | |
|             if scan_record.cost_usd > 0:
 | |
|                 try:
 | |
|                     scanner_settings = scan_record.admin.ai_scanner_settings
 | |
|                     if scanner_settings.balance >= scan_record.cost_usd:
 | |
|                         # Convert to same type to avoid Decimal/float issues
 | |
|                         scanner_settings.balance = float(scanner_settings.balance) - float(scan_record.cost_usd)
 | |
|                         scanner_settings.save()
 | |
|                         logging.writeToFile(f"[API] Deducted ${scan_record.cost_usd} from {scan_record.admin.userName} balance")
 | |
|                     else:
 | |
|                         logging.writeToFile(f"[API] Insufficient balance for scan cost: ${scan_record.cost_usd}")
 | |
|                 except Exception as e:
 | |
|                     logging.writeToFile(f"[API] Error updating balance: {str(e)}")
 | |
| 
 | |
|             # Deactivate file access tokens for this scan
 | |
|             try:
 | |
|                 from .models import FileAccessToken
 | |
|                 FileAccessToken.objects.filter(scan_history=scan_record).update(is_active=False)
 | |
|                 logging.writeToFile(f"[API] Deactivated file access tokens for scan {scan_id}")
 | |
|             except Exception as e:
 | |
|                 logging.writeToFile(f"[API] Error deactivating tokens: {str(e)}")
 | |
| 
 | |
|             logging.writeToFile(f"[API] Scan {scan_id} completed successfully:")
 | |
|             logging.writeToFile(f"[API]   Status: {status}")
 | |
|             logging.writeToFile(f"[API]   Threat Level: {summary.get('threat_level')}")
 | |
|             logging.writeToFile(f"[API]   Findings: {summary.get('total_findings')}")
 | |
|             logging.writeToFile(f"[API]   Files Scanned: {summary.get('files_scanned')}")
 | |
|             logging.writeToFile(f"[API]   Cost: {summary.get('cost')}")
 | |
| 
 | |
|         except ScanHistory.DoesNotExist:
 | |
|             logging.writeToFile(f"[API] Scan record not found: {scan_id}")
 | |
|             return JsonResponse({
 | |
|                 'status': 'error',
 | |
|                 'message': 'Scan record not found',
 | |
|                 'scan_id': scan_id
 | |
|             }, status=404)
 | |
| 
 | |
|         except Exception as e:
 | |
|             logging.writeToFile(f"[API] Failed to update scan record: {str(e)}")
 | |
|             return JsonResponse({
 | |
|                 'status': 'error',
 | |
|                 'message': 'Failed to update scan record',
 | |
|                 'scan_id': scan_id
 | |
|             }, status=500)
 | |
| 
 | |
|         # Return success response
 | |
|         return JsonResponse({
 | |
|             'status': 'success',
 | |
|             'message': 'Callback received successfully',
 | |
|             'scan_id': scan_id
 | |
|         })
 | |
| 
 | |
|     except json.JSONDecodeError:
 | |
|         logging.writeToFile("[API] Invalid JSON in callback request")
 | |
|         return JsonResponse({
 | |
|             'status': 'error',
 | |
|             'message': 'Invalid JSON payload'
 | |
|         }, status=400)
 | |
| 
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f"[API] Callback processing error: {str(e)}")
 | |
|         return JsonResponse({
 | |
|             'status': 'error',
 | |
|             'message': 'Internal server error'
 | |
|         }, status=500)
 | |
| 
 | |
| 
 | |
| # =============================================================================
 | |
| # File Operation Helper Functions
 | |
| # =============================================================================
 | |
| 
 | |
| def log_file_operation(scan_id, operation, file_path, success, error_message=None, backup_path=None, request=None):
 | |
|     """
 | |
|     Log file operations to the audit log
 | |
|     """
 | |
|     try:
 | |
|         from .models import ScannerFileOperation
 | |
| 
 | |
|         ip_address = None
 | |
|         user_agent = None
 | |
| 
 | |
|         if request:
 | |
|             ip_address = request.META.get('REMOTE_ADDR', '')[:45]
 | |
|             user_agent = request.META.get('HTTP_USER_AGENT', '')[:255]
 | |
| 
 | |
|         ScannerFileOperation.objects.create(
 | |
|             scan_id=scan_id,
 | |
|             operation=operation,
 | |
|             file_path=file_path,
 | |
|             backup_path=backup_path,
 | |
|             success=success,
 | |
|             error_message=error_message,
 | |
|             ip_address=ip_address,
 | |
|             user_agent=user_agent
 | |
|         )
 | |
| 
 | |
|         logging.writeToFile(f'[API] Logged {operation} operation for {file_path}: {"success" if success else "failed"}')
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Failed to log operation: {str(e)}')
 | |
| 
 | |
| 
 | |
| def check_rate_limit(scan_id, endpoint, max_requests):
 | |
|     """
 | |
|     Check if rate limit is exceeded for a scan/endpoint combination
 | |
|     Returns (is_allowed, current_count)
 | |
|     """
 | |
|     try:
 | |
|         from .models import ScannerAPIRateLimit
 | |
| 
 | |
|         rate_limit, created = ScannerAPIRateLimit.objects.get_or_create(
 | |
|             scan_id=scan_id,
 | |
|             endpoint=endpoint,
 | |
|             defaults={'request_count': 0}
 | |
|         )
 | |
| 
 | |
|         if rate_limit.request_count >= max_requests:
 | |
|             logging.writeToFile(f'[API] Rate limit exceeded for scan {scan_id} on endpoint {endpoint}: {rate_limit.request_count}/{max_requests}')
 | |
|             return False, rate_limit.request_count
 | |
| 
 | |
|         rate_limit.request_count += 1
 | |
|         rate_limit.save()
 | |
| 
 | |
|         return True, rate_limit.request_count
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Rate limit check error: {str(e)}')
 | |
|         # On error, allow the request
 | |
|         return True, 0
 | |
| 
 | |
| 
 | |
| def get_website_user(domain):
 | |
|     """
 | |
|     Get the system user for a website domain
 | |
|     """
 | |
|     try:
 | |
|         website = Websites.objects.get(domain=domain)
 | |
|         return website.externalApp
 | |
|     except Websites.DoesNotExist:
 | |
|         raise SecurityError(f"Website not found: {domain}")
 | |
| 
 | |
| 
 | |
| # =============================================================================
 | |
| # File Operation API Endpoints
 | |
| # =============================================================================
 | |
| 
 | |
| @csrf_exempt
 | |
| @require_http_methods(['POST'])
 | |
| def scanner_backup_file(request):
 | |
|     """
 | |
|     POST /api/scanner/backup-file
 | |
| 
 | |
|     Create a backup copy of a file before modification
 | |
| 
 | |
|     Headers:
 | |
|         Authorization: Bearer {file_access_token}
 | |
|         X-Scan-ID: {scan_job_id}
 | |
| 
 | |
|     Request Body:
 | |
|         {
 | |
|             "file_path": "wp-content/plugins/example/plugin.php",
 | |
|             "scan_id": "550e8400-e29b-41d4-a716-446655440000"
 | |
|         }
 | |
| 
 | |
|     Response:
 | |
|         {
 | |
|             "success": true,
 | |
|             "backup_path": "/home/username/public_html/.ai-scanner-backups/2025-10-25/plugin.php.1730000000.bak",
 | |
|             "original_path": "wp-content/plugins/example/plugin.php",
 | |
|             "backup_size": 15420,
 | |
|             "timestamp": "2025-10-25T20:30:00Z"
 | |
|         }
 | |
|     """
 | |
|     try:
 | |
|         # Parse request
 | |
|         data = json.loads(request.body)
 | |
|         file_path = data.get('file_path', '').strip('/')
 | |
|         scan_id = data.get('scan_id', '')
 | |
| 
 | |
|         # Validate authorization
 | |
|         auth_header = request.META.get('HTTP_AUTHORIZATION', '')
 | |
|         if not auth_header.startswith('Bearer '):
 | |
|             return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
 | |
| 
 | |
|         access_token = auth_header.replace('Bearer ', '')
 | |
|         header_scan_id = request.META.get('HTTP_X_SCAN_ID', '')
 | |
| 
 | |
|         if not scan_id or not header_scan_id or scan_id != header_scan_id:
 | |
|             return JsonResponse({'success': False, 'error': 'Scan ID mismatch'}, status=400)
 | |
| 
 | |
|         # Validate access token
 | |
|         file_token, error = validate_access_token(access_token, scan_id)
 | |
|         if error:
 | |
|             log_file_operation(scan_id, 'backup', file_path, False, error, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error}, status=401)
 | |
| 
 | |
|         # Rate limiting
 | |
|         is_allowed, count = check_rate_limit(scan_id, 'backup-file', 100)
 | |
|         if not is_allowed:
 | |
|             return JsonResponse({'success': False, 'error': 'Rate limit exceeded (max 100 backups per scan)'}, status=429)
 | |
| 
 | |
|         # Security check and get full path
 | |
|         try:
 | |
|             full_path = secure_path_check(file_token.wp_path, file_path)
 | |
|         except SecurityError as e:
 | |
|             log_file_operation(scan_id, 'backup', file_path, False, str(e), request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Path not allowed'}, status=403)
 | |
| 
 | |
|         # Get website user from auth wrapper (already validated during authentication)
 | |
|         user = file_token.external_app
 | |
|         if not user:
 | |
|             error_msg = 'External app not available in auth context'
 | |
|             log_file_operation(scan_id, 'backup', file_path, False, error_msg, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error_msg}, status=500)
 | |
| 
 | |
|         # Check file exists
 | |
|         from plogical.processUtilities import ProcessUtilities
 | |
| 
 | |
|         check_cmd = f'test -f "{full_path}" && echo "exists"'
 | |
|         result = ProcessUtilities.outputExecutioner(check_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         if not result[1] or 'exists' not in result[1]:
 | |
|             log_file_operation(scan_id, 'backup', file_path, False, 'File not found', request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'File not found', 'error_code': 'FILE_NOT_FOUND'}, status=404)
 | |
| 
 | |
|         # Create backup directory
 | |
|         import datetime
 | |
|         # Remove trailing slash from wp_path to avoid double slashes
 | |
|         wp_path_clean = file_token.wp_path.rstrip('/')
 | |
|         backup_dir_name = f'{wp_path_clean}/.ai-scanner-backups/{datetime.datetime.now().strftime("%Y-%m-%d")}'
 | |
| 
 | |
|         logging.writeToFile(f'[API] Creating backup directory: {backup_dir_name}')
 | |
|         mkdir_cmd = f'mkdir -p "{backup_dir_name}"'
 | |
|         mkdir_result = ProcessUtilities.executioner(mkdir_cmd, user=user)
 | |
| 
 | |
|         # executioner returns 1 for success, 0 for failure
 | |
|         if mkdir_result != 1:
 | |
|             error_msg = f'Failed to create backup directory: {backup_dir_name}'
 | |
|             logging.writeToFile(f'[API] {error_msg}, mkdir_result={mkdir_result}')
 | |
|             log_file_operation(scan_id, 'backup', file_path, False, error_msg, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error_msg, 'error_code': 'BACKUP_DIR_FAILED'}, status=500)
 | |
| 
 | |
|         # Create backup filename with timestamp
 | |
|         timestamp = int(time.time())
 | |
|         basename = os.path.basename(full_path)
 | |
|         backup_filename = f'{basename}.{timestamp}.bak'
 | |
|         backup_path = os.path.join(backup_dir_name, backup_filename)
 | |
| 
 | |
|         logging.writeToFile(f'[API] Backing up {full_path} to {backup_path}')
 | |
| 
 | |
|         # Copy file to backup
 | |
|         cp_cmd = f'cp "{full_path}" "{backup_path}"'
 | |
|         cp_result = ProcessUtilities.outputExecutioner(cp_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         # outputExecutioner returns (1, output) for success, (0, output) for failure
 | |
|         # Also check output for error messages as additional safety
 | |
|         if cp_result[0] != 1 or (cp_result[1] and 'error' in cp_result[1].lower()):
 | |
|             error_output = cp_result[1] if len(cp_result) > 1 else 'Unknown error'
 | |
|             error_msg = f'Failed to create backup: {error_output}'
 | |
|             logging.writeToFile(f'[API] Backup failed: cp returned {cp_result[0]}, output: {error_output}')
 | |
|             log_file_operation(scan_id, 'backup', file_path, False, error_msg, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error_msg, 'error_code': 'BACKUP_FAILED'}, status=500)
 | |
| 
 | |
|         # Get file size
 | |
|         stat_cmd = f'stat -c %s "{backup_path}"'
 | |
|         stat_result = ProcessUtilities.outputExecutioner(stat_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         backup_size = 0
 | |
|         if stat_result[1]:
 | |
|             try:
 | |
|                 backup_size = int(stat_result[1].strip())
 | |
|             except ValueError:
 | |
|                 pass
 | |
| 
 | |
|         # Log success
 | |
|         log_file_operation(scan_id, 'backup', file_path, True, backup_path=backup_path, request=request)
 | |
| 
 | |
|         logging.writeToFile(f'[API] Backup created for {file_path}: {backup_path}')
 | |
| 
 | |
|         return JsonResponse({
 | |
|             'success': True,
 | |
|             'backup_path': backup_path,
 | |
|             'original_path': file_path,
 | |
|             'backup_size': backup_size,
 | |
|             'timestamp': datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
 | |
|         })
 | |
| 
 | |
|     except json.JSONDecodeError:
 | |
|         return JsonResponse({'success': False, 'error': 'Invalid JSON'}, status=400)
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Backup file error: {str(e)}')
 | |
|         log_file_operation(scan_id if 'scan_id' in locals() else 'unknown', 'backup',
 | |
|                           file_path if 'file_path' in locals() else 'unknown', False, str(e), request=request)
 | |
|         return JsonResponse({'success': False, 'error': 'Internal server error'}, status=500)@csrf_exempt
 | |
| @require_http_methods(['GET'])
 | |
| def scanner_get_file(request):
 | |
|     """
 | |
|     GET /api/scanner/get-file?file_path=wp-content/plugins/plugin.php
 | |
| 
 | |
|     Read the contents of a file for analysis or verification
 | |
| 
 | |
|     Headers:
 | |
|         Authorization: Bearer {file_access_token}
 | |
|         X-Scan-ID: {scan_job_id}
 | |
| 
 | |
|     Response:
 | |
|         {
 | |
|             "success": true,
 | |
|             "file_path": "wp-content/plugins/example/plugin.php",
 | |
|             "content": "<?php\n/*\nPlugin Name: Example Plugin\n*/\n...",
 | |
|             "size": 15420,
 | |
|             "encoding": "utf-8",
 | |
|             "mime_type": "text/x-php",
 | |
|             "last_modified": "2025-10-25T20:30:00Z",
 | |
|             "hash": {
 | |
|                 "md5": "5d41402abc4b2a76b9719d911017c592",
 | |
|                 "sha256": "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"
 | |
|             }
 | |
|         }
 | |
|     """
 | |
|     try:
 | |
|         # Validate authorization
 | |
|         auth_header = request.META.get('HTTP_AUTHORIZATION', '')
 | |
|         if not auth_header.startswith('Bearer '):
 | |
|             return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
 | |
| 
 | |
|         access_token = auth_header.replace('Bearer ', '')
 | |
|         scan_id = request.META.get('HTTP_X_SCAN_ID', '')
 | |
| 
 | |
|         if not scan_id:
 | |
|             return JsonResponse({'success': False, 'error': 'X-Scan-ID header required'}, status=400)
 | |
| 
 | |
|         # Get file path
 | |
|         file_path = request.GET.get('file_path', '').strip('/')
 | |
|         if not file_path:
 | |
|             return JsonResponse({'success': False, 'error': 'File path required'}, status=400)
 | |
| 
 | |
|         # Validate access token
 | |
|         file_token, error = validate_access_token(access_token, scan_id)
 | |
|         if error:
 | |
|             log_file_operation(scan_id, 'read', file_path, False, error, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error}, status=401)
 | |
| 
 | |
|         # Rate limiting
 | |
|         is_allowed, count = check_rate_limit(scan_id, 'get-file', 500)
 | |
|         if not is_allowed:
 | |
|             return JsonResponse({'success': False, 'error': 'Rate limit exceeded (max 500 file reads per scan)'}, status=429)
 | |
| 
 | |
|         # Security check and get full path
 | |
|         try:
 | |
|             full_path = secure_path_check(file_token.wp_path, file_path)
 | |
|         except SecurityError as e:
 | |
|             log_file_operation(scan_id, 'read', file_path, False, str(e), request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Path not allowed'}, status=403)
 | |
| 
 | |
|         # Only allow specific file types for security
 | |
|         allowed_extensions = {
 | |
|             '.php', '.js', '.html', '.htm', '.css', '.txt', '.md',
 | |
|             '.json', '.xml', '.sql', '.log', '.conf', '.ini', '.yml', '.yaml'
 | |
|         }
 | |
| 
 | |
|         file_ext = os.path.splitext(full_path)[1].lower()
 | |
|         if file_ext not in allowed_extensions:
 | |
|             log_file_operation(scan_id, 'read', file_path, False, f'File type not allowed: {file_ext}', request=request)
 | |
|             return JsonResponse({'success': False, 'error': f'File type not allowed: {file_ext}'}, status=403)
 | |
| 
 | |
|         # Get website user from auth wrapper (already validated during authentication)
 | |
|         user = file_token.external_app
 | |
|         if not user:
 | |
|             error_msg = 'External app not available in auth context'
 | |
|             log_file_operation(scan_id, 'read', file_path, False, error_msg, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error_msg}, status=500)
 | |
| 
 | |
|         # Check file size
 | |
|         from plogical.processUtilities import ProcessUtilities
 | |
|         import hashlib
 | |
| 
 | |
|         stat_cmd = f'stat -c "%s %Y" "{full_path}"'
 | |
|         stat_result = ProcessUtilities.outputExecutioner(stat_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         if not stat_result[1]:
 | |
|             log_file_operation(scan_id, 'read', file_path, False, 'File not found', request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'File not found', 'error_code': 'FILE_NOT_FOUND'}, status=404)
 | |
| 
 | |
|         try:
 | |
|             parts = stat_result[1].strip().split()
 | |
|             file_size = int(parts[0])
 | |
|             last_modified_timestamp = int(parts[1])
 | |
| 
 | |
|             if file_size > 10 * 1024 * 1024:  # 10MB limit
 | |
|                 log_file_operation(scan_id, 'read', file_path, False, 'File too large (max 10MB)', request=request)
 | |
|                 return JsonResponse({'success': False, 'error': 'File too large (max 10MB)'}, status=400)
 | |
|         except (ValueError, IndexError):
 | |
|             log_file_operation(scan_id, 'read', file_path, False, 'Could not get file size', request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Could not get file size'}, status=500)
 | |
| 
 | |
|         # Read file content
 | |
|         cat_cmd = f'cat "{full_path}"'
 | |
|         result = ProcessUtilities.outputExecutioner(cat_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         if len(result) < 2:
 | |
|             log_file_operation(scan_id, 'read', file_path, False, 'Unable to read file', request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Unable to read file'}, status=400)
 | |
| 
 | |
|         content = result[1] if result[1] is not None else ''
 | |
| 
 | |
|         # Calculate hashes
 | |
|         try:
 | |
|             content_bytes = content.encode('utf-8')
 | |
|             md5_hash = hashlib.md5(content_bytes).hexdigest()
 | |
|             sha256_hash = hashlib.sha256(content_bytes).hexdigest()
 | |
|         except UnicodeEncodeError:
 | |
|             try:
 | |
|                 content_bytes = content.encode('latin-1')
 | |
|                 md5_hash = hashlib.md5(content_bytes).hexdigest()
 | |
|                 sha256_hash = hashlib.sha256(content_bytes).hexdigest()
 | |
|             except:
 | |
|                 md5_hash = ''
 | |
|                 sha256_hash = ''
 | |
| 
 | |
|         # Detect MIME type
 | |
|         mime_type, _ = mimetypes.guess_type(full_path)
 | |
|         if not mime_type:
 | |
|             if file_ext == '.php':
 | |
|                 mime_type = 'text/x-php'
 | |
|             elif file_ext == '.js':
 | |
|                 mime_type = 'application/javascript'
 | |
|             else:
 | |
|                 mime_type = 'text/plain'
 | |
| 
 | |
|         # Format last modified time
 | |
|         import datetime
 | |
|         last_modified = datetime.datetime.fromtimestamp(last_modified_timestamp).strftime('%Y-%m-%dT%H:%M:%SZ')
 | |
| 
 | |
|         # Log success
 | |
|         log_file_operation(scan_id, 'read', file_path, True, request=request)
 | |
| 
 | |
|         logging.writeToFile(f'[API] File content retrieved: {file_path} ({file_size} bytes)')
 | |
| 
 | |
|         return JsonResponse({
 | |
|             'success': True,
 | |
|             'file_path': file_path,
 | |
|             'content': content,
 | |
|             'size': file_size,
 | |
|             'encoding': 'utf-8',
 | |
|             'mime_type': mime_type,
 | |
|             'last_modified': last_modified,
 | |
|             'hash': {
 | |
|                 'md5': md5_hash,
 | |
|                 'sha256': sha256_hash
 | |
|             }
 | |
|         })
 | |
| 
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Get file error: {str(e)}')
 | |
|         log_file_operation(scan_id if 'scan_id' in locals() else 'unknown', 'read',
 | |
|                           file_path if 'file_path' in locals() else 'unknown', False, str(e), request=request)
 | |
|         return JsonResponse({'success': False, 'error': 'Internal server error'}, status=500)
 | |
| 
 | |
| 
 | |
| @csrf_exempt
 | |
| @require_http_methods(['POST'])
 | |
| def scanner_replace_file(request):
 | |
|     """
 | |
|     POST /api/scanner/replace-file
 | |
| 
 | |
|     Overwrite a file with new content (after backup)
 | |
| 
 | |
|     Headers:
 | |
|         Authorization: Bearer {file_access_token}
 | |
|         X-Scan-ID: {scan_job_id}
 | |
| 
 | |
|     Request Body:
 | |
|         {
 | |
|             "file_path": "wp-content/plugins/example/plugin.php",
 | |
|             "content": "<?php\n/*\nPlugin Name: Example Plugin (Clean Version)\n*/\n...",
 | |
|             "backup_before_replace": true,
 | |
|             "verify_hash": "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae"
 | |
|         }
 | |
| 
 | |
|     Response:
 | |
|         {
 | |
|             "success": true,
 | |
|             "file_path": "wp-content/plugins/example/plugin.php",
 | |
|             "backup_path": "/home/username/public_html/.ai-scanner-backups/2025-10-25/plugin.php.1730000000.bak",
 | |
|             "bytes_written": 14850,
 | |
|             "new_hash": {
 | |
|                 "md5": "abc123...",
 | |
|                 "sha256": "def456..."
 | |
|             },
 | |
|             "timestamp": "2025-10-25T20:35:00Z"
 | |
|         }
 | |
|     """
 | |
|     try:
 | |
|         # Parse request
 | |
|         data = json.loads(request.body)
 | |
|         file_path = data.get('file_path', '').strip('/')
 | |
|         content = data.get('content', '')
 | |
|         backup_before_replace = data.get('backup_before_replace', True)
 | |
|         verify_hash = data.get('verify_hash', '')
 | |
| 
 | |
|         # Validate authorization
 | |
|         auth_header = request.META.get('HTTP_AUTHORIZATION', '')
 | |
|         if not auth_header.startswith('Bearer '):
 | |
|             return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
 | |
| 
 | |
|         access_token = auth_header.replace('Bearer ', '')
 | |
|         scan_id = request.META.get('HTTP_X_SCAN_ID', '')
 | |
| 
 | |
|         if not scan_id:
 | |
|             return JsonResponse({'success': False, 'error': 'X-Scan-ID header required'}, status=400)
 | |
| 
 | |
|         # Validate access token
 | |
|         file_token, error = validate_access_token(access_token, scan_id)
 | |
|         if error:
 | |
|             log_file_operation(scan_id, 'replace', file_path, False, error, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error}, status=401)
 | |
| 
 | |
|         # Rate limiting
 | |
|         is_allowed, count = check_rate_limit(scan_id, 'replace-file', 100)
 | |
|         if not is_allowed:
 | |
|             return JsonResponse({'success': False, 'error': 'Rate limit exceeded (max 100 replacements per scan)'}, status=429)
 | |
| 
 | |
|         # Security check and get full path
 | |
|         try:
 | |
|             full_path = secure_path_check(file_token.wp_path, file_path)
 | |
|         except SecurityError as e:
 | |
|             log_file_operation(scan_id, 'replace', file_path, False, str(e), request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Path not allowed'}, status=403)
 | |
| 
 | |
|         # Get website user from auth wrapper (already validated during authentication)
 | |
|         user = file_token.external_app
 | |
|         if not user:
 | |
|             error_msg = 'External app not available in auth context'
 | |
|             log_file_operation(scan_id, 'replace', file_path, False, error_msg, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error_msg}, status=500)
 | |
| 
 | |
|         # Verify hash if provided
 | |
|         from plogical.processUtilities import ProcessUtilities
 | |
|         import hashlib
 | |
|         import datetime
 | |
| 
 | |
|         if verify_hash:
 | |
|             cat_cmd = f'cat "{full_path}"'
 | |
|             result = ProcessUtilities.outputExecutioner(cat_cmd, user=user, retRequired=True)
 | |
| 
 | |
|             if result[1]:
 | |
|                 current_hash = hashlib.sha256(result[1].encode('utf-8')).hexdigest()
 | |
|                 if current_hash != verify_hash:
 | |
|                     log_file_operation(scan_id, 'replace', file_path, False, 'Hash verification failed - file was modified', request=request)
 | |
|                     return JsonResponse({
 | |
|                         'success': False,
 | |
|                         'error': 'Hash verification failed - file was modified during scan',
 | |
|                         'error_code': 'HASH_MISMATCH',
 | |
|                         'expected_hash': verify_hash,
 | |
|                         'actual_hash': current_hash
 | |
|                     }, status=400)
 | |
| 
 | |
|         backup_path = None
 | |
| 
 | |
|         # Create backup if requested
 | |
|         if backup_before_replace:
 | |
|             wp_path_clean = file_token.wp_path.rstrip('/')
 | |
|             backup_dir_name = f'{wp_path_clean}/.ai-scanner-backups/{datetime.datetime.now().strftime("%Y-%m-%d")}'
 | |
|             mkdir_cmd = f'mkdir -p "{backup_dir_name}"'
 | |
|             ProcessUtilities.executioner(mkdir_cmd, user=user)
 | |
| 
 | |
|             timestamp = int(time.time())
 | |
|             basename = os.path.basename(full_path)
 | |
|             backup_filename = f'{basename}.{timestamp}.bak'
 | |
|             backup_path = os.path.join(backup_dir_name, backup_filename)
 | |
| 
 | |
|             cp_cmd = f'cp "{full_path}" "{backup_path}"'
 | |
|             cp_result = ProcessUtilities.executioner(cp_cmd, user=user)
 | |
| 
 | |
|             # executioner returns 1 for success, 0 for failure
 | |
|             if cp_result != 1:
 | |
|                 log_file_operation(scan_id, 'replace', file_path, False, 'Failed to create backup', backup_path=backup_path, request=request)
 | |
|                 return JsonResponse({'success': False, 'error': 'Failed to create backup', 'error_code': 'BACKUP_FAILED'}, status=500)
 | |
| 
 | |
|         # Write new content to temp file first (atomic write)
 | |
|         # Write to /tmp (accessible by all users, no permission issues)
 | |
|         tmp_file = f'/tmp/scanner_temp_{scan_id}_{int(time.time())}.tmp'
 | |
| 
 | |
|         try:
 | |
|             # Write content directly to /tmp directory
 | |
|             with open(tmp_file, 'w', encoding='utf-8') as f:
 | |
|                 f.write(content)
 | |
|             logging.writeToFile(f'[API] Wrote {len(content)} bytes to {tmp_file}')
 | |
|         except Exception as e:
 | |
|             error_msg = f'Failed to write temp file: {str(e)}'
 | |
|             logging.writeToFile(f'[API] {error_msg}')
 | |
|             log_file_operation(scan_id, 'replace', file_path, False, error_msg, backup_path=backup_path, request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Failed to write file', 'error_code': 'WRITE_FAILED'}, status=500)
 | |
| 
 | |
|         # Get original file permissions
 | |
|         stat_cmd = f'stat -c %a "{full_path}"'
 | |
|         stat_result = ProcessUtilities.outputExecutioner(stat_cmd, user=user, retRequired=True)
 | |
|         permissions = '644'  # Default
 | |
|         if stat_result[1]:
 | |
|             permissions = stat_result[1].strip()
 | |
| 
 | |
|         # Set permissions on temp file
 | |
|         chmod_cmd = f'chmod {permissions} "{tmp_file}"'
 | |
|         ProcessUtilities.executioner(chmod_cmd)
 | |
| 
 | |
|         # Verify temp file has content before replacing
 | |
|         check_cmd = f'wc -c "{tmp_file}"'
 | |
|         check_result = ProcessUtilities.outputExecutioner(check_cmd, retRequired=True)
 | |
|         logging.writeToFile(f'[API] Temp file size check: {check_result}')
 | |
| 
 | |
|         # Replace file using cat redirection (more reliable than cp for overwriting)
 | |
|         # This ensures the file contents are actually replaced
 | |
|         replace_cmd = f'cat "{tmp_file}" > "{full_path}"'
 | |
|         logging.writeToFile(f'[API] Executing replace command: {replace_cmd}')
 | |
|         replace_result = ProcessUtilities.executioner(replace_cmd, user=user, shell=True)
 | |
|         logging.writeToFile(f'[API] Replace command result: {replace_result}')
 | |
| 
 | |
|         # Clean up temp file
 | |
|         try:
 | |
|             os.remove(tmp_file)
 | |
|         except:
 | |
|             pass
 | |
| 
 | |
|         # executioner returns 1 for success, 0 for failure
 | |
|         if replace_result != 1:
 | |
|             error_msg = 'Failed to replace file contents'
 | |
|             logging.writeToFile(f'[API] {error_msg}, replace_result={replace_result}')
 | |
|             log_file_operation(scan_id, 'replace', file_path, False, error_msg, backup_path=backup_path, request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Failed to replace file', 'error_code': 'REPLACE_FAILED'}, status=500)
 | |
| 
 | |
|         logging.writeToFile(f'[API] Successfully replaced {full_path} with new content')
 | |
| 
 | |
|         # Calculate new hash
 | |
|         cat_cmd = f'cat "{full_path}"'
 | |
|         result = ProcessUtilities.outputExecutioner(cat_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         new_md5 = ''
 | |
|         new_sha256 = ''
 | |
|         if result[1]:
 | |
|             try:
 | |
|                 content_bytes = result[1].encode('utf-8')
 | |
|                 new_md5 = hashlib.md5(content_bytes).hexdigest()
 | |
|                 new_sha256 = hashlib.sha256(content_bytes).hexdigest()
 | |
|             except:
 | |
|                 pass
 | |
| 
 | |
|         bytes_written = len(content.encode('utf-8'))
 | |
| 
 | |
|         # Log success
 | |
|         log_file_operation(scan_id, 'replace', file_path, True, backup_path=backup_path, request=request)
 | |
| 
 | |
|         logging.writeToFile(f'[API] File replaced: {file_path} ({bytes_written} bytes)')
 | |
| 
 | |
|         return JsonResponse({
 | |
|             'success': True,
 | |
|             'file_path': file_path,
 | |
|             'backup_path': backup_path,
 | |
|             'bytes_written': bytes_written,
 | |
|             'new_hash': {
 | |
|                 'md5': new_md5,
 | |
|                 'sha256': new_sha256
 | |
|             },
 | |
|             'timestamp': datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
 | |
|         })
 | |
| 
 | |
|     except json.JSONDecodeError:
 | |
|         return JsonResponse({'success': False, 'error': 'Invalid JSON'}, status=400)
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Replace file error: {str(e)}')
 | |
|         log_file_operation(scan_id if 'scan_id' in locals() else 'unknown', 'replace',
 | |
|                           file_path if 'file_path' in locals() else 'unknown', False, str(e), request=request)
 | |
|         return JsonResponse({'success': False, 'error': 'Internal server error'}, status=500)
 | |
| 
 | |
| 
 | |
| @csrf_exempt
 | |
| @require_http_methods(['POST'])
 | |
| def scanner_rename_file(request):
 | |
|     """
 | |
|     POST /api/scanner/rename-file
 | |
| 
 | |
|     Rename a file (used for quarantining malicious files)
 | |
| 
 | |
|     Headers:
 | |
|         Authorization: Bearer {file_access_token}
 | |
|         X-Scan-ID: {scan_job_id}
 | |
| 
 | |
|     Request Body:
 | |
|         {
 | |
|             "old_path": "wp-content/uploads/malicious.php",
 | |
|             "new_path": "wp-content/uploads/malicious.php.quarantined.1730000000",
 | |
|             "backup_before_rename": true
 | |
|         }
 | |
| 
 | |
|     Response:
 | |
|         {
 | |
|             "success": true,
 | |
|             "old_path": "wp-content/uploads/malicious.php",
 | |
|             "new_path": "wp-content/uploads/malicious.php.quarantined.1730000000",
 | |
|             "backup_path": "/home/username/public_html/.ai-scanner-backups/2025-10-25/malicious.php.1730000000.bak",
 | |
|             "timestamp": "2025-10-25T20:40:00Z"
 | |
|         }
 | |
|     """
 | |
|     try:
 | |
|         # Parse request
 | |
|         data = json.loads(request.body)
 | |
|         old_path = data.get('old_path', '').strip('/')
 | |
|         new_path = data.get('new_path', '').strip('/')
 | |
|         backup_before_rename = data.get('backup_before_rename', True)
 | |
| 
 | |
|         # Validate authorization
 | |
|         auth_header = request.META.get('HTTP_AUTHORIZATION', '')
 | |
|         if not auth_header.startswith('Bearer '):
 | |
|             return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
 | |
| 
 | |
|         access_token = auth_header.replace('Bearer ', '')
 | |
|         scan_id = request.META.get('HTTP_X_SCAN_ID', '')
 | |
| 
 | |
|         if not scan_id:
 | |
|             return JsonResponse({'success': False, 'error': 'X-Scan-ID header required'}, status=400)
 | |
| 
 | |
|         # Validate access token
 | |
|         file_token, error = validate_access_token(access_token, scan_id)
 | |
|         if error:
 | |
|             log_file_operation(scan_id, 'rename', old_path, False, error, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error}, status=401)
 | |
| 
 | |
|         # Rate limiting
 | |
|         is_allowed, count = check_rate_limit(scan_id, 'rename-file', 50)
 | |
|         if not is_allowed:
 | |
|             return JsonResponse({'success': False, 'error': 'Rate limit exceeded (max 50 renames per scan)'}, status=429)
 | |
| 
 | |
|         # Security check for both paths
 | |
|         try:
 | |
|             full_old_path = secure_path_check(file_token.wp_path, old_path)
 | |
|             full_new_path = secure_path_check(file_token.wp_path, new_path)
 | |
|         except SecurityError as e:
 | |
|             log_file_operation(scan_id, 'rename', old_path, False, str(e), request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Path not allowed'}, status=403)
 | |
| 
 | |
|         # Get website user from auth wrapper (already validated during authentication)
 | |
|         user = file_token.external_app
 | |
|         if not user:
 | |
|             error_msg = 'External app not available in auth context'
 | |
|             log_file_operation(scan_id, 'rename', old_path, False, error_msg, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error_msg}, status=500)
 | |
| 
 | |
|         # Check source file exists
 | |
|         from plogical.processUtilities import ProcessUtilities
 | |
|         import datetime
 | |
| 
 | |
|         check_cmd = f'test -f "{full_old_path}" && echo "exists"'
 | |
|         result = ProcessUtilities.outputExecutioner(check_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         if not result[1] or 'exists' not in result[1]:
 | |
|             log_file_operation(scan_id, 'rename', old_path, False, 'Source file not found', request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Source file not found', 'error_code': 'FILE_NOT_FOUND'}, status=404)
 | |
| 
 | |
|         # Check destination doesn't exist
 | |
|         check_cmd = f'test -f "{full_new_path}" && echo "exists"'
 | |
|         result = ProcessUtilities.outputExecutioner(check_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         if result[1] and 'exists' in result[1]:
 | |
|             log_file_operation(scan_id, 'rename', old_path, False, 'Destination file already exists', request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Destination file already exists', 'error_code': 'FILE_EXISTS'}, status=409)
 | |
| 
 | |
|         backup_path = None
 | |
| 
 | |
|         # Create backup if requested
 | |
|         if backup_before_rename:
 | |
|             wp_path_clean = file_token.wp_path.rstrip('/')
 | |
|             backup_dir_name = f'{wp_path_clean}/.ai-scanner-backups/{datetime.datetime.now().strftime("%Y-%m-%d")}'
 | |
|             mkdir_cmd = f'mkdir -p "{backup_dir_name}"'
 | |
|             mkdir_result = ProcessUtilities.executioner(mkdir_cmd, user=user)
 | |
| 
 | |
|             # executioner returns 1 for success, 0 for failure
 | |
|             if mkdir_result != 1:
 | |
|                 error_msg = f'Failed to create backup directory: {backup_dir_name}'
 | |
|                 logging.writeToFile(f'[API] {error_msg}')
 | |
|                 log_file_operation(scan_id, 'rename', old_path, False, error_msg, request=request)
 | |
|                 return JsonResponse({'success': False, 'error': 'Failed to create backup directory', 'error_code': 'BACKUP_DIR_FAILED'}, status=500)
 | |
| 
 | |
|             timestamp = int(time.time())
 | |
|             basename = os.path.basename(full_old_path)
 | |
|             backup_filename = f'{basename}.{timestamp}.bak'
 | |
|             backup_path = os.path.join(backup_dir_name, backup_filename)
 | |
| 
 | |
|             cp_cmd = f'cp "{full_old_path}" "{backup_path}"'
 | |
|             cp_result = ProcessUtilities.executioner(cp_cmd, user=user)
 | |
| 
 | |
|             # executioner returns 1 for success, 0 for failure
 | |
|             if cp_result != 1:
 | |
|                 error_msg = f'Failed to backup file before rename'
 | |
|                 logging.writeToFile(f'[API] {error_msg}, cp_result={cp_result}')
 | |
|                 log_file_operation(scan_id, 'rename', old_path, False, error_msg, request=request)
 | |
|                 return JsonResponse({'success': False, 'error': 'Failed to backup file before quarantine', 'error_code': 'BACKUP_FAILED'}, status=500)
 | |
| 
 | |
|         # Perform rename
 | |
|         mv_cmd = f'mv "{full_old_path}" "{full_new_path}"'
 | |
|         mv_result = ProcessUtilities.executioner(mv_cmd, user=user)
 | |
| 
 | |
|         # executioner returns 1 for success, 0 for failure
 | |
|         if mv_result != 1:
 | |
|             log_file_operation(scan_id, 'rename', old_path, False, 'Failed to rename file', backup_path=backup_path, request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Failed to rename file', 'error_code': 'RENAME_FAILED'}, status=500)
 | |
| 
 | |
|         # Verify rename
 | |
|         check_cmd = f'test -f "{full_new_path}" && echo "exists"'
 | |
|         result = ProcessUtilities.outputExecutioner(check_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         if not result[1] or 'exists' not in result[1]:
 | |
|             log_file_operation(scan_id, 'rename', old_path, False, 'Rename verification failed', backup_path=backup_path, request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Rename verification failed'}, status=500)
 | |
| 
 | |
|         # Log success
 | |
|         log_file_operation(scan_id, 'rename', old_path, True, backup_path=backup_path, request=request)
 | |
| 
 | |
|         logging.writeToFile(f'[API] File renamed: {old_path} -> {new_path}')
 | |
| 
 | |
|         return JsonResponse({
 | |
|             'success': True,
 | |
|             'old_path': old_path,
 | |
|             'new_path': new_path,
 | |
|             'backup_path': backup_path,
 | |
|             'timestamp': datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
 | |
|         })
 | |
| 
 | |
|     except json.JSONDecodeError:
 | |
|         return JsonResponse({'success': False, 'error': 'Invalid JSON'}, status=400)
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Rename file error: {str(e)}')
 | |
|         log_file_operation(scan_id if 'scan_id' in locals() else 'unknown', 'rename',
 | |
|                           old_path if 'old_path' in locals() else 'unknown', False, str(e), request=request)
 | |
|         return JsonResponse({'success': False, 'error': 'Internal server error'}, status=500)
 | |
| 
 | |
| 
 | |
| @csrf_exempt
 | |
| @require_http_methods(['POST'])
 | |
| def scanner_delete_file(request):
 | |
|     """
 | |
|     POST /api/scanner/delete-file
 | |
| 
 | |
|     Permanently delete a malicious file (after backup)
 | |
| 
 | |
|     Headers:
 | |
|         Authorization: Bearer {file_access_token}
 | |
|         X-Scan-ID: {scan_job_id}
 | |
| 
 | |
|     Request Body:
 | |
|         {
 | |
|             "file_path": "wp-content/uploads/shell.php",
 | |
|             "backup_before_delete": true,
 | |
|             "confirm_deletion": true
 | |
|         }
 | |
| 
 | |
|     Response:
 | |
|         {
 | |
|             "success": true,
 | |
|             "file_path": "wp-content/uploads/shell.php",
 | |
|             "backup_path": "/home/username/public_html/.ai-scanner-backups/2025-10-25/shell.php.1730000000.bak",
 | |
|             "deleted_at": "2025-10-25T20:45:00Z",
 | |
|             "file_info": {
 | |
|                 "size": 2048,
 | |
|                 "last_modified": "2025-10-20T14:30:00Z",
 | |
|                 "hash": "abc123..."
 | |
|             }
 | |
|         }
 | |
|     """
 | |
|     try:
 | |
|         # Parse request
 | |
|         data = json.loads(request.body)
 | |
|         file_path = data.get('file_path', '').strip('/')
 | |
|         backup_before_delete = data.get('backup_before_delete', True)
 | |
|         confirm_deletion = data.get('confirm_deletion', False)
 | |
| 
 | |
|         # Require explicit confirmation
 | |
|         if not confirm_deletion:
 | |
|             return JsonResponse({
 | |
|                 'success': False,
 | |
|                 'error': 'Deletion not confirmed',
 | |
|                 'error_code': 'CONFIRMATION_REQUIRED',
 | |
|                 'message': 'Set confirm_deletion: true to proceed'
 | |
|             }, status=400)
 | |
| 
 | |
|         # Validate authorization
 | |
|         auth_header = request.META.get('HTTP_AUTHORIZATION', '')
 | |
|         if not auth_header.startswith('Bearer '):
 | |
|             return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
 | |
| 
 | |
|         access_token = auth_header.replace('Bearer ', '')
 | |
|         scan_id = request.META.get('HTTP_X_SCAN_ID', '')
 | |
| 
 | |
|         if not scan_id:
 | |
|             return JsonResponse({'success': False, 'error': 'X-Scan-ID header required'}, status=400)
 | |
| 
 | |
|         # Validate access token
 | |
|         file_token, error = validate_access_token(access_token, scan_id)
 | |
|         if error:
 | |
|             log_file_operation(scan_id, 'delete', file_path, False, error, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error}, status=401)
 | |
| 
 | |
|         # Rate limiting
 | |
|         is_allowed, count = check_rate_limit(scan_id, 'delete-file', 50)
 | |
|         if not is_allowed:
 | |
|             return JsonResponse({'success': False, 'error': 'Rate limit exceeded (max 50 deletions per scan)'}, status=429)
 | |
| 
 | |
|         # Security check and get full path
 | |
|         try:
 | |
|             full_path = secure_path_check(file_token.wp_path, file_path)
 | |
|         except SecurityError as e:
 | |
|             log_file_operation(scan_id, 'delete', file_path, False, str(e), request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Path not allowed'}, status=403)
 | |
| 
 | |
|         # Check for protected files
 | |
|         protected_files = ['wp-config.php', '.htaccess', 'index.php']
 | |
|         if os.path.basename(full_path) in protected_files:
 | |
|             log_file_operation(scan_id, 'delete', file_path, False, 'Cannot delete protected system file', request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Cannot delete protected system file', 'error_code': 'PROTECTED_FILE'}, status=403)
 | |
| 
 | |
|         # Get website user from auth wrapper (already validated during authentication)
 | |
|         user = file_token.external_app
 | |
|         if not user:
 | |
|             error_msg = 'External app not available in auth context'
 | |
|             log_file_operation(scan_id, 'delete', file_path, False, error_msg, request=request)
 | |
|             return JsonResponse({'success': False, 'error': error_msg}, status=500)
 | |
| 
 | |
|         # Get file info before deletion
 | |
|         from plogical.processUtilities import ProcessUtilities
 | |
|         import hashlib
 | |
|         import datetime
 | |
| 
 | |
|         stat_cmd = f'stat -c "%s %Y" "{full_path}"'
 | |
|         stat_result = ProcessUtilities.outputExecutioner(stat_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         if not stat_result[1]:
 | |
|             log_file_operation(scan_id, 'delete', file_path, False, 'File not found', request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'File not found', 'error_code': 'FILE_NOT_FOUND'}, status=404)
 | |
| 
 | |
|         file_size = 0
 | |
|         last_modified = ''
 | |
|         try:
 | |
|             parts = stat_result[1].strip().split()
 | |
|             file_size = int(parts[0])
 | |
|             last_modified_timestamp = int(parts[1])
 | |
|             last_modified = datetime.datetime.fromtimestamp(last_modified_timestamp).strftime('%Y-%m-%dT%H:%M:%SZ')
 | |
|         except (ValueError, IndexError):
 | |
|             pass
 | |
| 
 | |
|         # Get file hash
 | |
|         cat_cmd = f'cat "{full_path}"'
 | |
|         result = ProcessUtilities.outputExecutioner(cat_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         file_hash = ''
 | |
|         if result[1]:
 | |
|             try:
 | |
|                 file_hash = hashlib.sha256(result[1].encode('utf-8')).hexdigest()
 | |
|             except:
 | |
|                 pass
 | |
| 
 | |
|         backup_path = None
 | |
| 
 | |
|         # ALWAYS create backup before deletion
 | |
|         wp_path_clean = file_token.wp_path.rstrip('/')
 | |
|         backup_dir_name = f'{wp_path_clean}/.ai-scanner-backups/{datetime.datetime.now().strftime("%Y-%m-%d")}'
 | |
|         mkdir_cmd = f'mkdir -p "{backup_dir_name}"'
 | |
|         mkdir_result = ProcessUtilities.executioner(mkdir_cmd, user=user)
 | |
| 
 | |
|         # executioner returns 1 for success, 0 for failure
 | |
|         if mkdir_result != 1:
 | |
|             error_msg = f'Failed to create backup directory: {backup_dir_name}'
 | |
|             logging.writeToFile(f'[API] {error_msg}')
 | |
|             log_file_operation(scan_id, 'delete', file_path, False, error_msg, request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Failed to create backup directory', 'error_code': 'BACKUP_DIR_FAILED'}, status=500)
 | |
| 
 | |
|         timestamp = int(time.time())
 | |
|         basename = os.path.basename(full_path)
 | |
|         backup_filename = f'{basename}.{timestamp}.bak'
 | |
|         backup_path = os.path.join(backup_dir_name, backup_filename)
 | |
| 
 | |
|         cp_cmd = f'cp "{full_path}" "{backup_path}"'
 | |
|         cp_result = ProcessUtilities.executioner(cp_cmd, user=user)
 | |
| 
 | |
|         # executioner returns 1 for success, 0 for failure
 | |
|         if cp_result != 1:
 | |
|             error_msg = f'Failed to backup file before deletion'
 | |
|             logging.writeToFile(f'[API] {error_msg}, cp_result={cp_result}')
 | |
|             log_file_operation(scan_id, 'delete', file_path, False, error_msg, backup_path=backup_path, request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Backup creation failed - deletion blocked', 'error_code': 'BACKUP_FAILED'}, status=500)
 | |
| 
 | |
|         # Delete file
 | |
|         rm_cmd = f'rm -f "{full_path}"'
 | |
|         rm_result = ProcessUtilities.executioner(rm_cmd, user=user)
 | |
| 
 | |
|         # executioner returns 1 for success, 0 for failure
 | |
|         if rm_result != 1:
 | |
|             log_file_operation(scan_id, 'delete', file_path, False, 'Failed to delete file', backup_path=backup_path, request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Failed to delete file', 'error_code': 'DELETE_FAILED'}, status=500)
 | |
| 
 | |
|         # Verify deletion
 | |
|         check_cmd = f'test -f "{full_path}" && echo "exists"'
 | |
|         result = ProcessUtilities.outputExecutioner(check_cmd, user=user, retRequired=True)
 | |
| 
 | |
|         if result[1] and 'exists' in result[1]:
 | |
|             log_file_operation(scan_id, 'delete', file_path, False, 'Deletion verification failed', backup_path=backup_path, request=request)
 | |
|             return JsonResponse({'success': False, 'error': 'Deletion verification failed'}, status=500)
 | |
| 
 | |
|         # Log success
 | |
|         log_file_operation(scan_id, 'delete', file_path, True, backup_path=backup_path, request=request)
 | |
| 
 | |
|         logging.writeToFile(f'[API] File deleted: {file_path} (backup: {backup_path})')
 | |
| 
 | |
|         return JsonResponse({
 | |
|             'success': True,
 | |
|             'file_path': file_path,
 | |
|             'backup_path': backup_path,
 | |
|             'deleted_at': datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'),
 | |
|             'file_info': {
 | |
|                 'size': file_size,
 | |
|                 'last_modified': last_modified,
 | |
|                 'hash': file_hash
 | |
|             }
 | |
|         })
 | |
| 
 | |
|     except json.JSONDecodeError:
 | |
|         return JsonResponse({'success': False, 'error': 'Invalid JSON'}, status=400)
 | |
|     except Exception as e:
 | |
|         logging.writeToFile(f'[API] Delete file error: {str(e)}')
 | |
|         log_file_operation(scan_id if 'scan_id' in locals() else 'unknown', 'delete',
 | |
|                           file_path if 'file_path' in locals() else 'unknown', False, str(e), request=request)
 | |
|         return JsonResponse({'success': False, 'error': 'Internal server error'}, status=500)
 |