ai scanner

This commit is contained in:
usmannasir
2025-06-25 19:27:36 +05:00
parent 35d7102c48
commit 098e6d5227
16 changed files with 3122 additions and 1 deletions

View File

@@ -65,6 +65,7 @@ INSTALLED_APPS = [
'containerization',
'CLManager',
'IncBackups',
'aiScanner',
# 'WebTerminal'
]

View File

@@ -44,5 +44,6 @@ urlpatterns = [
path('container/', include('containerization.urls')),
path('CloudLinux/', include('CLManager.urls')),
path('IncrementalBackups/', include('IncBackups.urls')),
path('aiscanner/', include('aiScanner.urls')),
# path('Terminal/', include('WebTerminal.urls')),
]

0
aiScanner/__init__.py Normal file
View File

3
aiScanner/admin.py Normal file
View File

@@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

View File

@@ -0,0 +1,679 @@
import requests
import json
import uuid
import secrets
from datetime import datetime, timedelta
from django.shortcuts import render, redirect
from django.http import JsonResponse
from django.utils import timezone
from django.conf import settings
from django.urls import reverse
from django.contrib import messages
from loginSystem.models import Administrator
from .models import AIScannerSettings, ScanHistory, FileAccessToken
from plogical.acl import ACLManager
from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging
class AIScannerManager:
AI_SCANNER_API_BASE = 'https://platform.cyberpersons.com/ai-scanner'
def __init__(self):
self.logger = logging
def scannerHome(self, request, userID):
"""Main AI Scanner page"""
try:
admin = Administrator.objects.get(pk=userID)
# Check ACL permissions (with fallback for new field)
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL.aiScannerAccess == 0:
return ACLManager.loadError()
except AttributeError:
# Field doesn't exist yet, allow access for now
self.logger.writeToFile(f'[AIScannerManager.scannerHome] aiScannerAccess field not found, allowing access')
pass
# Get or create scanner settings
scanner_settings, created = AIScannerSettings.objects.get_or_create(
admin=admin,
defaults={'balance': 0.0000, 'is_payment_configured': False}
)
# Get current pricing from API
pricing_data = self.get_ai_scanner_pricing()
# Get recent scan history
recent_scans = ScanHistory.objects.filter(admin=admin)[:10]
# Get current balance if payment is configured
current_balance = scanner_settings.balance
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Stored balance: {current_balance}')
if scanner_settings.is_payment_configured:
# Try to fetch latest balance from API (now supports flexible auth)
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Fetching balance from API...')
api_balance = self.get_account_balance(scanner_settings.api_key)
self.logger.writeToFile(f'[AIScannerManager.scannerHome] API returned balance: {api_balance}')
if api_balance is not None:
scanner_settings.balance = api_balance
scanner_settings.save()
current_balance = api_balance
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Updated balance to: {current_balance}')
else:
self.logger.writeToFile(f'[AIScannerManager.scannerHome] API balance call failed, keeping stored balance: {current_balance}')
# Get user's websites for scan selection
from websiteFunctions.models import Websites
try:
websites = Websites.objects.filter(admin=admin)
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Found {websites.count()} websites for {admin.userName}')
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Error fetching websites: {str(e)}')
websites = []
# Build context safely
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Building context for {admin.userName}')
context = {
'admin': admin,
'scanner_settings': scanner_settings,
'pricing_data': pricing_data,
'recent_scans': recent_scans,
'current_balance': current_balance,
'websites': websites,
'is_payment_configured': scanner_settings.is_payment_configured,
}
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Context built successfully, rendering template')
return render(request, 'aiScanner/scanner.html', context)
except Exception as e:
import traceback
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Error: {str(e)}')
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Traceback: {traceback.format_exc()}')
return render(request, 'aiScanner/scanner.html', {
'error': f'Failed to load AI Scanner page: {str(e)}',
'is_payment_configured': False,
'websites': [],
'recent_scans': [],
'current_balance': 0,
'pricing_data': None
})
def setupPayment(self, request, userID):
"""Setup payment method for AI scanner"""
try:
if request.method != 'POST':
return JsonResponse({'success': False, 'error': 'Invalid request method'})
admin = Administrator.objects.get(pk=userID)
# Check ACL permissions (with fallback for new field)
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL.aiScannerAccess == 0:
return JsonResponse({'success': False, 'error': 'Access denied'})
except AttributeError:
# Field doesn't exist yet, allow access for now
pass
# Get admin email and domain
cyberpanel_host = request.get_host() # Keep full host including port
cyberpanel_domain = cyberpanel_host.split(':')[0] # Domain only for email fallback
admin_email = admin.email if hasattr(admin, 'email') and admin.email else f'{admin.userName}@{cyberpanel_domain}'
self.logger.writeToFile(f'[AIScannerManager.setupPayment] Admin: {admin.userName}, Email: {admin_email}, Host: {cyberpanel_host}')
# Setup payment with AI Scanner API
self.logger.writeToFile(f'[AIScannerManager.setupPayment] Attempting payment setup for {admin_email} on {cyberpanel_host}')
setup_data = self.setup_ai_scanner_payment(admin_email, cyberpanel_host)
if setup_data:
self.logger.writeToFile(f'[AIScannerManager.setupPayment] Payment setup successful for {admin_email}')
return JsonResponse({
'success': True,
'payment_url': setup_data['payment_url'],
'token': setup_data['token']
})
else:
self.logger.writeToFile(f'[AIScannerManager.setupPayment] Payment setup failed for {admin_email}')
return JsonResponse({
'success': False,
'error': 'Failed to setup payment. Please check the logs and try again.'
})
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.setupPayment] Error: {str(e)}')
return JsonResponse({'success': False, 'error': 'Internal server error'})
def setupComplete(self, request, userID):
"""Handle return from payment setup"""
try:
admin = Administrator.objects.get(pk=userID)
status = request.GET.get('status')
# Log all URL parameters for debugging
self.logger.writeToFile(f'[AIScannerManager.setupComplete] All URL params: {dict(request.GET)}')
if status == 'success':
api_key = request.GET.get('api_key')
balance = request.GET.get('balance', '0.00')
self.logger.writeToFile(f'[AIScannerManager.setupComplete] API Key: {api_key[:20] if api_key else "None"}...')
self.logger.writeToFile(f'[AIScannerManager.setupComplete] Balance from URL: {balance}')
if api_key:
# Update scanner settings
scanner_settings, created = AIScannerSettings.objects.get_or_create(
admin=admin,
defaults={
'api_key': api_key,
'balance': float(balance),
'is_payment_configured': True
}
)
if not created:
scanner_settings.api_key = api_key
scanner_settings.balance = float(balance)
scanner_settings.is_payment_configured = True
scanner_settings.save()
self.logger.writeToFile(f'[AIScannerManager.setupComplete] Saved balance: {scanner_settings.balance}')
messages.success(request, f'Payment setup successful! You have ${balance} credit.')
self.logger.writeToFile(f'[AIScannerManager] Payment setup completed for {admin.userName} with balance ${balance}')
else:
messages.error(request, 'Payment setup completed but API key not received.')
elif status in ['failed', 'cancelled', 'error']:
error = request.GET.get('error', 'Payment setup failed')
messages.error(request, f'Payment setup failed: {error}')
self.logger.writeToFile(f'[AIScannerManager] Payment setup failed for {admin.userName}: {error}')
return redirect('aiScannerHome')
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.setupComplete] Error: {str(e)}')
messages.error(request, 'An error occurred during payment setup.')
return redirect('aiScannerHome')
def startScan(self, request, userID):
"""Start a new AI security scan"""
try:
if request.method != 'POST':
return JsonResponse({'success': False, 'error': 'Invalid request method'})
admin = Administrator.objects.get(pk=userID)
# Check ACL permissions (with fallback for new field)
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL.aiScannerAccess == 0:
return JsonResponse({'success': False, 'error': 'Access denied'})
except AttributeError:
# Field doesn't exist yet, allow access for now
pass
# Get scanner settings
try:
scanner_settings = AIScannerSettings.objects.get(admin=admin)
if not scanner_settings.is_payment_configured or not scanner_settings.api_key:
return JsonResponse({'success': False, 'error': 'Payment not configured'})
except AIScannerSettings.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scanner not configured'})
# Parse request data
data = json.loads(request.body)
domain = data.get('domain')
scan_type = data.get('scan_type', 'full')
if not domain:
return JsonResponse({'success': False, 'error': 'Domain is required'})
# Validate domain belongs to user
from websiteFunctions.models import Websites
try:
website = Websites.objects.get(domain=domain, admin=admin)
except Websites.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Domain not found or access denied'})
# Generate scan ID and file access token
scan_id = f'cp_{uuid.uuid4().hex[:12]}'
file_access_token = self.generate_file_access_token()
# Create scan history record
scan_history = ScanHistory.objects.create(
admin=admin,
scan_id=scan_id,
domain=domain,
scan_type=scan_type,
status='pending'
)
# Create file access token
FileAccessToken.objects.create(
token=file_access_token,
scan_history=scan_history,
domain=domain,
wp_path=f'/home/{domain}/public_html', # Adjust path as needed
expires_at=timezone.now() + timedelta(hours=2)
)
# Submit scan to AI Scanner API
callback_url = f"https://{request.get_host()}/api/ai-scanner/callback"
file_access_base_url = f"https://{request.get_host()}/api/ai-scanner/"
scan_response = self.submit_wordpress_scan(
scanner_settings.api_key,
domain,
scan_type,
callback_url,
file_access_token,
file_access_base_url,
scan_id
)
if scan_response:
scan_history.status = 'running'
scan_history.save()
return JsonResponse({
'success': True,
'scan_id': scan_id,
'message': 'Scan started successfully'
})
else:
scan_history.status = 'failed'
scan_history.error_message = 'Failed to submit scan to AI Scanner API'
scan_history.save()
return JsonResponse({'success': False, 'error': 'Failed to start scan'})
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.startScan] Error: {str(e)}')
return JsonResponse({'success': False, 'error': 'Internal server error'})
def refreshBalance(self, request, userID):
"""Refresh account balance from API"""
try:
if request.method != 'POST':
return JsonResponse({'success': False, 'error': 'Invalid request method'})
admin = Administrator.objects.get(pk=userID)
# Get scanner settings
try:
scanner_settings = AIScannerSettings.objects.get(admin=admin)
if not scanner_settings.is_payment_configured or not scanner_settings.api_key:
return JsonResponse({'success': False, 'error': 'Payment not configured'})
except AIScannerSettings.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scanner not configured'})
# Fetch balance from API
api_balance = self.get_account_balance(scanner_settings.api_key)
if api_balance is not None:
old_balance = scanner_settings.balance
scanner_settings.balance = api_balance
scanner_settings.save()
self.logger.writeToFile(f'[AIScannerManager.refreshBalance] Updated balance from ${old_balance} to ${api_balance} for {admin.userName}')
return JsonResponse({
'success': True,
'balance': float(api_balance),
'message': f'Balance refreshed: ${api_balance:.4f}'
})
else:
return JsonResponse({'success': False, 'error': 'Failed to fetch balance from API'})
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.refreshBalance] Error: {str(e)}')
return JsonResponse({'success': False, 'error': 'Internal server error'})
def addPaymentMethod(self, request, userID):
"""Add a new payment method for the user"""
try:
if request.method != 'POST':
return JsonResponse({'success': False, 'error': 'Invalid request method'})
admin = Administrator.objects.get(pk=userID)
# Check if user has scanner configured
try:
scanner_settings = AIScannerSettings.objects.get(admin=admin)
if not scanner_settings.is_payment_configured or not scanner_settings.api_key:
return JsonResponse({'success': False, 'error': 'Initial payment setup required first'})
except AIScannerSettings.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scanner not configured'})
# Get admin email and domain
cyberpanel_host = request.get_host() # Keep full host including port
cyberpanel_domain = cyberpanel_host.split(':')[0] # Domain only for email fallback
admin_email = admin.email if hasattr(admin, 'email') and admin.email else f'{admin.userName}@{cyberpanel_domain}'
self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Setting up new payment method for {admin.userName} (API key authentication)')
# Call platform API to add payment method
setup_data = self.setup_add_payment_method(scanner_settings.api_key, admin_email, cyberpanel_host)
if setup_data:
self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Payment method setup successful for {admin_email}')
return JsonResponse({
'success': True,
'setup_url': setup_data['setup_url'],
'token': setup_data.get('token', '')
})
else:
self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Payment method setup failed for {admin_email}')
return JsonResponse({
'success': False,
'error': 'Failed to setup payment method. Please try again.'
})
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.addPaymentMethod] Error: {str(e)}')
return JsonResponse({'success': False, 'error': 'Internal server error'})
def paymentMethodComplete(self, request, userID):
"""Handle return from adding payment method"""
try:
admin = Administrator.objects.get(pk=userID)
status = request.GET.get('status')
# Log all URL parameters for debugging
self.logger.writeToFile(f'[AIScannerManager.paymentMethodComplete] All URL params: {dict(request.GET)}')
if status == 'success':
payment_method_id = request.GET.get('payment_method_id')
card_last4 = request.GET.get('card_last4')
card_brand = request.GET.get('card_brand')
self.logger.writeToFile(f'[AIScannerManager.paymentMethodComplete] Payment method added: {payment_method_id} ({card_brand} ****{card_last4})')
if payment_method_id:
messages.success(request, f'Payment method added successfully! New {card_brand} card ending in {card_last4}.')
self.logger.writeToFile(f'[AIScannerManager] Payment method added for {admin.userName}: {card_brand} ****{card_last4}')
else:
messages.success(request, 'Payment method added successfully!')
elif status in ['failed', 'cancelled', 'error']:
error = request.GET.get('error', 'Failed to add payment method')
messages.error(request, f'Failed to add payment method: {error}')
self.logger.writeToFile(f'[AIScannerManager] Payment method add failed for {admin.userName}: {error}')
return redirect('aiScannerHome')
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.paymentMethodComplete] Error: {str(e)}')
messages.error(request, 'An error occurred while adding payment method.')
return redirect('aiScannerHome')
def scanCallback(self, request):
"""Handle scan results callback from AI Scanner API"""
try:
if request.method != 'POST':
return JsonResponse({'success': False, 'error': 'Invalid request method'})
data = json.loads(request.body)
scan_id = data.get('scan_id')
status = data.get('status')
if not scan_id:
return JsonResponse({'success': False, 'error': 'Scan ID required'})
# Find scan history record
try:
scan_history = ScanHistory.objects.get(scan_id=scan_id)
except ScanHistory.DoesNotExist:
self.logger.writeToFile(f'[AIScannerManager.scanCallback] Scan not found: {scan_id}')
return JsonResponse({'success': False, 'error': 'Scan not found'})
# Update scan status and results
scan_history.status = status
scan_history.completed_at = timezone.now()
if status == 'completed':
findings = data.get('findings', [])
summary = data.get('summary', {})
cost_usd = data.get('cost_usd', 0)
files_scanned = data.get('files_scanned', 0)
scan_history.set_findings(findings)
scan_history.set_summary(summary)
scan_history.cost_usd = cost_usd
scan_history.files_scanned = files_scanned
scan_history.issues_found = len(findings)
# Update user balance
scanner_settings = scan_history.admin.ai_scanner_settings
if cost_usd and scanner_settings.balance >= cost_usd:
scanner_settings.balance -= cost_usd
scanner_settings.save()
self.logger.writeToFile(f'[AIScannerManager] Scan completed: {scan_id}, Cost: ${cost_usd}, Issues: {len(findings)}')
elif status == 'failed':
error_message = data.get('error', 'Scan failed')
scan_history.error_message = error_message
self.logger.writeToFile(f'[AIScannerManager] Scan failed: {scan_id}, Error: {error_message}')
scan_history.save()
# Deactivate file access tokens
FileAccessToken.objects.filter(scan_history=scan_history).update(is_active=False)
return JsonResponse({'success': True})
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.scanCallback] Error: {str(e)}')
return JsonResponse({'success': False, 'error': 'Internal server error'})
# API Helper Methods
def get_ai_scanner_pricing(self):
"""Get current pricing from AI Scanner API"""
try:
response = requests.get(f'{self.AI_SCANNER_API_BASE}/api/plan/', timeout=10)
if response.status_code == 200:
return response.json()
return None
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.get_ai_scanner_pricing] Error: {str(e)}')
return None
def setup_ai_scanner_payment(self, user_email, cyberpanel_host):
"""Setup payment method with AI Scanner API"""
try:
payload = {
'email': user_email,
'domain': cyberpanel_host.split(':')[0], # Send domain without port
'return_url': f'https://{cyberpanel_host}/aiscanner/setup-complete/' # Include port in URL
}
self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Sending request to: {self.AI_SCANNER_API_BASE}/cyberpanel/setup-payment/')
self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Payload: {payload}')
response = requests.post(
f'{self.AI_SCANNER_API_BASE}/cyberpanel/setup-payment/',
json=payload,
timeout=10
)
self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Response status: {response.status_code}')
self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Response content: {response.text}')
if response.status_code == 200:
data = response.json()
if data.get('success'):
return {
'payment_url': data['payment_url'],
'token': data['token']
}
else:
self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] API returned success=false: {data.get("error", "Unknown error")}')
else:
self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Non-200 status code: {response.status_code}')
return None
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.setup_ai_scanner_payment] Exception: {str(e)}')
return None
def get_account_balance(self, api_key):
"""Get current account balance"""
try:
self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Requesting balance from: {self.AI_SCANNER_API_BASE}/api/account/balance/')
response = requests.get(
f'{self.AI_SCANNER_API_BASE}/api/account/balance/',
headers={'X-API-Key': api_key},
timeout=10
)
self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Response status: {response.status_code}')
self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Response content: {response.text}')
if response.status_code == 200:
data = response.json()
if data.get('success'):
# Use the new balance_usd field from flexible API
balance = float(data.get('balance_usd', data.get('balance', 0)))
auth_method = data.get('authenticated_via', 'unknown')
self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Parsed balance: {balance} (auth: {auth_method})')
return balance
else:
# Even failed responses now include balance_usd hint
balance_hint = data.get('balance_usd', 0)
self.logger.writeToFile(f'[AIScannerManager.get_account_balance] API returned success=false: {data.get("error", "Unknown error")} (balance hint: {balance_hint})')
# Return the balance hint if available, even on auth failure
if balance_hint > 0:
return float(balance_hint)
else:
self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Non-200 status code: {response.status_code}')
return None
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.get_account_balance] Exception: {str(e)}')
return None
def submit_wordpress_scan(self, api_key, domain, scan_type, callback_url, file_access_token, file_access_base_url, scan_id):
"""Submit scan request to AI Scanner API"""
try:
payload = {
'site_url': domain,
'scan_type': scan_type,
'cyberpanel_callback': callback_url,
'file_access_token': file_access_token,
'file_access_base_url': file_access_base_url,
'scan_id': scan_id
}
self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Submitting scan {scan_id} for {domain}')
self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Payload: {payload}')
response = requests.post(
f'{self.AI_SCANNER_API_BASE}/api/scan/submit-v2/',
headers={'X-API-Key': api_key},
json=payload,
timeout=10
)
self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Response status: {response.status_code}')
self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Response content: {response.text}')
if response.status_code == 200:
data = response.json()
if data.get('success'):
platform_scan_id = data.get('scan_id')
self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Platform assigned scan ID: {platform_scan_id}')
return platform_scan_id
else:
self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Platform returned success=false: {data.get("error", "Unknown error")}')
else:
self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Non-200 status code: {response.status_code}')
return None
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.submit_wordpress_scan] Error: {str(e)}')
return None
def get_scan_status(self, api_key, scan_id):
"""Get scan status from AI Scanner API"""
try:
response = requests.get(
f'{self.AI_SCANNER_API_BASE}/api/scan/{scan_id}/status/',
headers={'X-API-Key': api_key},
timeout=10
)
if response.status_code == 200:
return response.json()
return None
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.get_scan_status] Error: {str(e)}')
return None
def get_scan_results(self, api_key, scan_id):
"""Get scan results from AI Scanner API"""
try:
response = requests.get(
f'{self.AI_SCANNER_API_BASE}/api/scan/{scan_id}/results/',
headers={'X-API-Key': api_key},
timeout=10
)
if response.status_code == 200:
return response.json()
return None
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.get_scan_results] Error: {str(e)}')
return None
def setup_add_payment_method(self, api_key, user_email, cyberpanel_host):
"""Setup additional payment method with AI Scanner API"""
try:
payload = {
'domain': cyberpanel_host.split(':')[0], # Send domain without port
'return_url': f'https://{cyberpanel_host}/aiscanner/payment-method-complete/', # Include port in URL
'action': 'add_payment_method' # Indicate this is adding a payment method, not initial setup
}
self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Sending request to: {self.AI_SCANNER_API_BASE}/cyberpanel/add-payment-method/')
self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Payload: {payload}')
response = requests.post(
f'{self.AI_SCANNER_API_BASE}/cyberpanel/add-payment-method/',
headers={'X-API-Key': api_key},
json=payload,
timeout=10
)
self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Response status: {response.status_code}')
self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Response content: {response.text}')
if response.status_code == 200:
data = response.json()
if data.get('success'):
return {
'setup_url': data['setup_url'],
'token': data.get('token', '')
}
else:
self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] API returned success=false: {data.get("error", "Unknown error")}')
else:
self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Non-200 status code: {response.status_code}')
return None
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.setup_add_payment_method] Exception: {str(e)}')
return None
def generate_file_access_token(self):
"""Generate secure file access token"""
return f'cp_{secrets.token_urlsafe(32)}'

627
aiScanner/api.py Normal file
View File

@@ -0,0 +1,627 @@
import json
import os
import time
import mimetypes
import base64
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from websiteFunctions.models import Websites
from loginSystem.models import Administrator
from .models import FileAccessToken, ScanHistory
from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging
class SecurityError(Exception):
"""Custom exception for security violations"""
pass
def validate_access_token(token, scan_id):
"""
Implement proper token validation
- Check token format
- Verify token hasn't expired
- Confirm token is for the correct scan
- Log access attempts
"""
try:
if not token or not token.startswith('cp_'):
logging.writeToFile(f'[API] Invalid token format: {token[:20] if token else "None"}...')
return None, "Invalid token format"
# Find the token in database
try:
file_token = FileAccessToken.objects.get(
token=token,
scan_history__scan_id=scan_id,
is_active=True
)
if file_token.is_expired():
logging.writeToFile(f'[API] Token expired for scan {scan_id}')
return None, "Token expired"
logging.writeToFile(f'[API] Token validated successfully for scan {scan_id}')
return file_token, None
except FileAccessToken.DoesNotExist:
logging.writeToFile(f'[API] Token not found for scan {scan_id}')
return None, "Invalid token"
except Exception as e:
logging.writeToFile(f'[API] Token validation error: {str(e)}')
return None, "Token validation failed"
def secure_path_check(base_path, requested_path):
"""
Ensure requested path is within allowed directory
Prevent directory traversal attacks
"""
try:
if requested_path:
full_path = os.path.join(base_path, requested_path.strip('/'))
else:
full_path = base_path
full_path = os.path.abspath(full_path)
base_path = os.path.abspath(base_path)
if not full_path.startswith(base_path):
raise SecurityError("Path outside allowed directory")
return full_path
except Exception as e:
raise SecurityError(f"Path security check failed: {str(e)}")
@csrf_exempt
@require_http_methods(['POST'])
def authenticate_worker(request):
"""
POST /api/ai-scanner/authenticate
Request Body:
{
"access_token": "cp_access_abc123...",
"scan_id": "550e8400-e29b-41d4-a716-446655440000",
"worker_id": "scanner-1.domain.com"
}
Response:
{
"success": true,
"site_info": {
"domain": "client-domain.com",
"wp_path": "/home/client/public_html",
"php_version": "8.1",
"wp_version": "6.3.1"
},
"permissions": ["read_files", "list_directories"],
"expires_at": "2024-12-25T11:00:00Z"
}
"""
try:
data = json.loads(request.body)
access_token = data.get('access_token')
scan_id = data.get('scan_id')
worker_id = data.get('worker_id', 'unknown')
logging.writeToFile(f'[API] Authentication request from worker {worker_id} for scan {scan_id}')
if not access_token or not scan_id:
return JsonResponse({'error': 'Missing access_token or scan_id'}, status=400)
# Validate access token
file_token, error = validate_access_token(access_token, scan_id)
if error:
return JsonResponse({'error': error}, status=401)
# Get website info
try:
website = Websites.objects.get(domain=file_token.domain)
# Get WordPress info
wp_path = file_token.wp_path
wp_version = 'Unknown'
php_version = 'Unknown'
# Try to get WP version from wp-includes/version.php using ProcessUtilities
version_file = os.path.join(wp_path, 'wp-includes', 'version.php')
try:
from plogical.processUtilities import ProcessUtilities
# Use ProcessUtilities to read file as the website user
command = f'cat "{version_file}"'
result = ProcessUtilities.outputExecutioner(command, user=website.externalApp, retRequired=True)
if result[1]: # Check if there's content (ignore return code)
content = result[1]
import re
match = re.search(r'\$wp_version\s*=\s*[\'"]([^\'"]+)[\'"]', content)
if match:
wp_version = match.group(1)
logging.writeToFile(f'[API] Detected WordPress version: {wp_version}')
else:
logging.writeToFile(f'[API] Could not read WP version file: {result[1] if len(result) > 1 else "No content returned"}')
except Exception as e:
logging.writeToFile(f'[API] Error reading WP version: {str(e)}')
# Try to detect PHP version (basic detection)
try:
import subprocess
result = subprocess.run(['php', '-v'], capture_output=True, text=True, timeout=5)
if result.returncode == 0:
import re
match = re.search(r'PHP (\d+\.\d+)', result.stdout)
if match:
php_version = match.group(1)
except Exception:
pass
response_data = {
'success': True,
'site_info': {
'domain': file_token.domain,
'wp_path': wp_path,
'php_version': php_version,
'wp_version': wp_version,
'scan_id': scan_id
},
'permissions': ['read_files', 'list_directories'],
'expires_at': file_token.expires_at.strftime('%Y-%m-%dT%H:%M:%SZ')
}
logging.writeToFile(f'[API] Authentication successful for {file_token.domain}')
return JsonResponse(response_data)
except Websites.DoesNotExist:
logging.writeToFile(f'[API] Website not found: {file_token.domain}')
return JsonResponse({'error': 'Website not found'}, status=404)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
except Exception as e:
logging.writeToFile(f'[API] Authentication error: {str(e)}')
return JsonResponse({'error': 'Authentication failed'}, status=500)
@csrf_exempt
@require_http_methods(['GET'])
def list_files(request):
"""
GET /api/ai-scanner/files/list?path=wp-content/plugins
Headers:
Authorization: Bearer cp_access_abc123...
X-Scan-ID: 550e8400-e29b-41d4-a716-446655440000
Response:
{
"path": "wp-content/plugins",
"items": [
{
"name": "akismet",
"type": "directory",
"modified": "2024-12-20T10:30:00Z"
},
{
"name": "suspicious-plugin.php",
"type": "file",
"size": 15420,
"modified": "2024-12-24T15:20:00Z",
"permissions": "644"
}
]
}
"""
try:
# Validate authorization
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Missing or invalid Authorization header'}, status=401)
access_token = auth_header.replace('Bearer ', '')
scan_id = request.META.get('HTTP_X_SCAN_ID', '')
if not scan_id:
return JsonResponse({'error': 'X-Scan-ID header required'}, status=400)
# Validate access token
file_token, error = validate_access_token(access_token, scan_id)
if error:
return JsonResponse({'error': error}, status=401)
# Get parameters
path = request.GET.get('path', '').strip('/')
try:
# Security check and get full path
full_path = secure_path_check(file_token.wp_path, path)
# Path existence and type checking will be done by ProcessUtilities
# List directory contents using ProcessUtilities
items = []
try:
from plogical.processUtilities import ProcessUtilities
from websiteFunctions.models import Websites
# Get website object for user context
try:
website = Websites.objects.get(domain=file_token.domain)
user = website.externalApp
except Websites.DoesNotExist:
return JsonResponse({'error': 'Website not found'}, status=404)
# Use ls command with ProcessUtilities to list directory as website user
ls_command = f'ls -la "{full_path}"'
result = ProcessUtilities.outputExecutioner(ls_command, user=user, retRequired=True)
if result[1]: # Check if there's content (ignore return code)
lines = result[1].strip().split('\n')
for line in lines[1:]: # Skip the 'total' line
if not line.strip():
continue
parts = line.split()
if len(parts) < 9:
continue
permissions = parts[0]
size = parts[4] if parts[4].isdigit() else 0
name = ' '.join(parts[8:]) # Handle filenames with spaces
# Skip hidden files, current/parent directory entries
if name.startswith('.') or name in ['.', '..'] or name in ['__pycache__', 'node_modules']:
continue
item_data = {
'name': name,
'type': 'directory' if permissions.startswith('d') else 'file',
'permissions': permissions[1:4] if len(permissions) >= 4 else '644'
}
if permissions.startswith('-'): # Regular file
try:
item_data['size'] = int(size)
except ValueError:
item_data['size'] = 0
# Only include certain file types
if name.endswith(('.php', '.js', '.html', '.htm', '.css', '.txt', '.md', '.json', '.xml', '.sql', '.log', '.conf', '.ini', '.yml', '.yaml')):
items.append(item_data)
elif permissions.startswith('d'): # Directory
# Directories don't have a size in the same way
item_data['size'] = 0
items.append(item_data)
else:
# Other file types (links, etc.) - include with size 0
item_data['size'] = 0
items.append(item_data)
else:
logging.writeToFile(f'[API] Directory listing failed: {result[1] if len(result) > 1 else "No content returned"}')
return JsonResponse({'error': 'Directory access failed'}, status=403)
except Exception as e:
logging.writeToFile(f'[API] Directory listing error: {str(e)}')
return JsonResponse({'error': 'Directory access failed'}, status=403)
logging.writeToFile(f'[API] Listed {len(items)} items in {path or "root"} for scan {scan_id}')
return JsonResponse({
'path': path,
'items': sorted(items, key=lambda x: (x['type'] == 'file', x['name'].lower()))
})
except SecurityError as e:
logging.writeToFile(f'[API] Security violation: {str(e)}')
return JsonResponse({'error': 'Path not allowed'}, status=403)
except Exception as e:
logging.writeToFile(f'[API] List files error: {str(e)}')
return JsonResponse({'error': 'Internal server error'}, status=500)
@csrf_exempt
@require_http_methods(['GET'])
def get_file_content(request):
"""
GET /api/ai-scanner/files/content?path=wp-content/plugins/plugin.php
Headers:
Authorization: Bearer cp_access_abc123...
X-Scan-ID: 550e8400-e29b-41d4-a716-446655440000
Response:
{
"path": "wp-content/plugins/plugin.php",
"content": "<?php\n// Plugin code here...",
"size": 15420,
"encoding": "utf-8",
"mime_type": "text/x-php"
}
"""
try:
# Validate authorization
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Missing or invalid Authorization header'}, status=401)
access_token = auth_header.replace('Bearer ', '')
scan_id = request.META.get('HTTP_X_SCAN_ID', '')
if not scan_id:
return JsonResponse({'error': 'X-Scan-ID header required'}, status=400)
# Get file path
path = request.GET.get('path', '').strip('/')
if not path:
return JsonResponse({'error': 'File path required'}, status=400)
# Validate access token
file_token, error = validate_access_token(access_token, scan_id)
if error:
return JsonResponse({'error': error}, status=401)
try:
# Security check and get full path
full_path = secure_path_check(file_token.wp_path, path)
# File existence, type, and size checking will be done by ProcessUtilities
# Only allow specific file types for security
allowed_extensions = {
'.php', '.js', '.html', '.htm', '.css', '.txt', '.md',
'.json', '.xml', '.sql', '.log', '.conf', '.ini', '.yml', '.yaml'
}
file_ext = os.path.splitext(full_path)[1].lower()
if file_ext not in allowed_extensions:
return JsonResponse({'error': f'File type not allowed: {file_ext}'}, status=403)
# Read file content using ProcessUtilities
try:
from plogical.processUtilities import ProcessUtilities
from websiteFunctions.models import Websites
# Get website object for user context
try:
website = Websites.objects.get(domain=file_token.domain)
user = website.externalApp
except Websites.DoesNotExist:
return JsonResponse({'error': 'Website not found'}, status=404)
# Check file size first using stat command
stat_command = f'stat -c %s "{full_path}"'
stat_result = ProcessUtilities.outputExecutioner(stat_command, user=user, retRequired=True)
if stat_result[1]: # Check if there's content (ignore return code)
try:
file_size = int(stat_result[1].strip())
if file_size > 10 * 1024 * 1024: # 10MB limit
return JsonResponse({'error': 'File too large (max 10MB)'}, status=400)
except ValueError:
logging.writeToFile(f'[API] Could not parse file size: {stat_result[1]}')
file_size = 0
else:
logging.writeToFile(f'[API] Could not get file size: {stat_result[1] if len(stat_result) > 1 else "No content returned"}')
return JsonResponse({'error': 'File not found or inaccessible'}, status=404)
# Use cat command with ProcessUtilities to read file as website user
cat_command = f'cat "{full_path}"'
result = ProcessUtilities.outputExecutioner(cat_command, user=user, retRequired=True)
# Check if content was returned (file might be empty, which is valid)
if len(result) > 1: # We got a tuple back
content = result[1] if result[1] is not None else ''
encoding = 'utf-8'
else:
logging.writeToFile(f'[API] File read failed: No result returned')
return JsonResponse({'error': 'Unable to read file'}, status=400)
except Exception as e:
logging.writeToFile(f'[API] File read error: {str(e)}')
return JsonResponse({'error': 'Unable to read file'}, status=400)
# Detect MIME type
mime_type, _ = mimetypes.guess_type(full_path)
if not mime_type:
if file_ext == '.php':
mime_type = 'text/x-php'
elif file_ext == '.js':
mime_type = 'application/javascript'
else:
mime_type = 'text/plain'
# Base64 encode the content for safe transport
try:
content_base64 = base64.b64encode(content.encode('utf-8')).decode('utf-8')
except UnicodeEncodeError:
# Handle binary files or encoding issues
try:
content_base64 = base64.b64encode(content.encode('latin-1')).decode('utf-8')
encoding = 'latin-1'
except:
logging.writeToFile(f'[API] Failed to encode file content for {path}')
return JsonResponse({'error': 'File encoding not supported'}, status=400)
logging.writeToFile(f'[API] File content retrieved: {path} ({file_size} bytes) for scan {scan_id}')
return JsonResponse({
'path': path,
'content': content_base64,
'size': file_size,
'encoding': encoding,
'mime_type': mime_type
})
except SecurityError as e:
logging.writeToFile(f'[API] Security violation: {str(e)}')
return JsonResponse({'error': 'Path not allowed'}, status=403)
except Exception as e:
logging.writeToFile(f'[API] Get file content error: {str(e)}')
return JsonResponse({'error': 'Internal server error'}, status=500)
@csrf_exempt
@require_http_methods(['POST'])
def scan_callback(request):
"""
Receive scan completion callbacks from AI Scanner platform
POST /api/ai-scanner/callback
Content-Type: application/json
Expected payload:
{
"scan_id": "uuid",
"status": "completed",
"summary": {
"threat_level": "HIGH|MEDIUM|LOW",
"total_findings": 3,
"files_scanned": 25,
"cost": "$0.0456"
},
"findings": [
{
"file_path": "wp-content/plugins/file.php",
"severity": "CRITICAL|HIGH|MEDIUM|LOW",
"title": "Issue title",
"description": "Detailed description",
"ai_confidence": 95
}
],
"ai_analysis": "AI summary text",
"completed_at": "2025-06-23T11:40:12Z"
}
"""
try:
# Parse JSON payload
data = json.loads(request.body)
scan_id = data.get('scan_id')
status = data.get('status')
summary = data.get('summary', {})
findings = data.get('findings', [])
ai_analysis = data.get('ai_analysis', '')
completed_at = data.get('completed_at')
logging.writeToFile(f"[API] Received callback for scan {scan_id}: {status}")
# Update scan status in CyberPanel database
try:
from .models import ScanHistory
from django.utils import timezone
import datetime
# Find the scan record
scan_record = ScanHistory.objects.get(scan_id=scan_id)
# Update scan record
scan_record.status = status
scan_record.issues_found = summary.get('total_findings', 0)
scan_record.files_scanned = summary.get('files_scanned', 0)
# Parse and store cost
cost_str = summary.get('cost', '$0.00')
try:
# Remove '$' and convert to float
cost_value = float(cost_str.replace('$', '').replace(',', ''))
scan_record.cost_usd = cost_value
except (ValueError, AttributeError):
scan_record.cost_usd = 0.0
# Store findings and AI analysis
scan_record.set_findings(findings)
# Build summary dict
summary_dict = {
'threat_level': summary.get('threat_level', 'UNKNOWN'),
'total_findings': summary.get('total_findings', 0),
'files_scanned': summary.get('files_scanned', 0),
'ai_analysis': ai_analysis
}
scan_record.set_summary(summary_dict)
# Set completion time
if completed_at:
try:
# Parse ISO format datetime
completed_datetime = datetime.datetime.fromisoformat(completed_at.replace('Z', '+00:00'))
scan_record.completed_at = completed_datetime
except ValueError:
scan_record.completed_at = timezone.now()
else:
scan_record.completed_at = timezone.now()
scan_record.save()
# Update user balance if scan cost money
if scan_record.cost_usd > 0:
try:
scanner_settings = scan_record.admin.ai_scanner_settings
if scanner_settings.balance >= scan_record.cost_usd:
# Convert to same type to avoid Decimal/float issues
scanner_settings.balance = float(scanner_settings.balance) - float(scan_record.cost_usd)
scanner_settings.save()
logging.writeToFile(f"[API] Deducted ${scan_record.cost_usd} from {scan_record.admin.userName} balance")
else:
logging.writeToFile(f"[API] Insufficient balance for scan cost: ${scan_record.cost_usd}")
except Exception as e:
logging.writeToFile(f"[API] Error updating balance: {str(e)}")
# Deactivate file access tokens for this scan
try:
from .models import FileAccessToken
FileAccessToken.objects.filter(scan_history=scan_record).update(is_active=False)
logging.writeToFile(f"[API] Deactivated file access tokens for scan {scan_id}")
except Exception as e:
logging.writeToFile(f"[API] Error deactivating tokens: {str(e)}")
logging.writeToFile(f"[API] Scan {scan_id} completed successfully:")
logging.writeToFile(f"[API] Status: {status}")
logging.writeToFile(f"[API] Threat Level: {summary.get('threat_level')}")
logging.writeToFile(f"[API] Findings: {summary.get('total_findings')}")
logging.writeToFile(f"[API] Files Scanned: {summary.get('files_scanned')}")
logging.writeToFile(f"[API] Cost: {summary.get('cost')}")
except ScanHistory.DoesNotExist:
logging.writeToFile(f"[API] Scan record not found: {scan_id}")
return JsonResponse({
'status': 'error',
'message': 'Scan record not found',
'scan_id': scan_id
}, status=404)
except Exception as e:
logging.writeToFile(f"[API] Failed to update scan record: {str(e)}")
return JsonResponse({
'status': 'error',
'message': 'Failed to update scan record',
'scan_id': scan_id
}, status=500)
# Return success response
return JsonResponse({
'status': 'success',
'message': 'Callback received successfully',
'scan_id': scan_id
})
except json.JSONDecodeError:
logging.writeToFile("[API] Invalid JSON in callback request")
return JsonResponse({
'status': 'error',
'message': 'Invalid JSON payload'
}, status=400)
except Exception as e:
logging.writeToFile(f"[API] Callback processing error: {str(e)}")
return JsonResponse({
'status': 'error',
'message': 'Internal server error'
}, status=500)

6
aiScanner/apps.py Normal file
View File

@@ -0,0 +1,6 @@
from django.apps import AppConfig
class AiscannerConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'aiScanner'

View File

109
aiScanner/models.py Normal file
View File

@@ -0,0 +1,109 @@
from django.db import models
from loginSystem.models import Administrator
import json
# Import the status update model
from .status_models import ScanStatusUpdate
class AIScannerSettings(models.Model):
"""Store AI scanner configuration and API keys for administrators"""
admin = models.OneToOneField(Administrator, on_delete=models.CASCADE, related_name='ai_scanner_settings')
api_key = models.CharField(max_length=255, blank=True, null=True)
balance = models.DecimalField(max_digits=10, decimal_places=4, default=0.0000)
is_payment_configured = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'ai_scanner_settings'
def __str__(self):
return f"AI Scanner Settings for {self.admin.userName}"
class ScanHistory(models.Model):
"""Store scan history and results"""
SCAN_STATUS_CHOICES = [
('pending', 'Pending'),
('running', 'Running'),
('completed', 'Completed'),
('failed', 'Failed'),
('cancelled', 'Cancelled'),
]
SCAN_TYPE_CHOICES = [
('full', 'Full Scan'),
('quick', 'Quick Scan'),
('custom', 'Custom Scan'),
]
admin = models.ForeignKey(Administrator, on_delete=models.CASCADE, related_name='scan_history')
scan_id = models.CharField(max_length=100, unique=True)
domain = models.CharField(max_length=255)
scan_type = models.CharField(max_length=20, choices=SCAN_TYPE_CHOICES, default='full')
status = models.CharField(max_length=20, choices=SCAN_STATUS_CHOICES, default='pending')
cost_usd = models.DecimalField(max_digits=10, decimal_places=6, null=True, blank=True)
files_scanned = models.IntegerField(default=0)
issues_found = models.IntegerField(default=0)
findings_json = models.TextField(blank=True, null=True) # Store JSON findings
summary_json = models.TextField(blank=True, null=True) # Store JSON summary
error_message = models.TextField(blank=True, null=True)
started_at = models.DateTimeField(auto_now_add=True)
completed_at = models.DateTimeField(null=True, blank=True)
class Meta:
db_table = 'ai_scanner_history'
ordering = ['-started_at']
def __str__(self):
return f"Scan {self.scan_id} - {self.domain} ({self.status})"
@property
def findings(self):
"""Parse findings JSON"""
if self.findings_json:
try:
return json.loads(self.findings_json)
except json.JSONDecodeError:
return []
return []
@property
def summary(self):
"""Parse summary JSON"""
if self.summary_json:
try:
return json.loads(self.summary_json)
except json.JSONDecodeError:
return {}
return {}
def set_findings(self, findings_list):
"""Set findings from list/dict"""
self.findings_json = json.dumps(findings_list)
def set_summary(self, summary_dict):
"""Set summary from dict"""
self.summary_json = json.dumps(summary_dict)
class FileAccessToken(models.Model):
"""Temporary tokens for file access during scans"""
token = models.CharField(max_length=100, unique=True)
scan_history = models.ForeignKey(ScanHistory, on_delete=models.CASCADE, related_name='access_tokens')
domain = models.CharField(max_length=255)
wp_path = models.CharField(max_length=500)
expires_at = models.DateTimeField()
created_at = models.DateTimeField(auto_now_add=True)
is_active = models.BooleanField(default=True)
class Meta:
db_table = 'ai_scanner_file_tokens'
def __str__(self):
return f"Access token {self.token} for {self.domain}"
def is_expired(self):
from django.utils import timezone
return timezone.now() > self.expires_at

145
aiScanner/status_api.py Normal file
View File

@@ -0,0 +1,145 @@
import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.utils import timezone
from .status_models import ScanStatusUpdate
from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter as logging
@csrf_exempt
@require_http_methods(['POST'])
def receive_status_update(request):
"""
Receive real-time scan status updates from platform
POST /api/ai-scanner/status-webhook
Expected payload:
{
"scan_id": "550e8400-e29b-41d4-a716-446655440000",
"phase": "scanning_files",
"progress": 75,
"current_file": "wp-content/plugins/suspicious/malware.php",
"files_discovered": 1247,
"files_scanned": 935,
"files_remaining": 312,
"threats_found": 5,
"critical_threats": 2,
"high_threats": 3,
"activity_description": "Scanning file 935/1247: wp-content/plugins/suspicious/malware.php"
}
"""
try:
data = json.loads(request.body)
scan_id = data.get('scan_id')
if not scan_id:
logging.writeToFile('[Status API] Missing scan_id in status update')
return JsonResponse({'error': 'scan_id required'}, status=400)
# Update or create status record
status_update, created = ScanStatusUpdate.objects.update_or_create(
scan_id=scan_id,
defaults={
'phase': data.get('phase', ''),
'progress': int(data.get('progress', 0)),
'current_file': data.get('current_file', ''),
'files_discovered': int(data.get('files_discovered', 0)),
'files_scanned': int(data.get('files_scanned', 0)),
'files_remaining': int(data.get('files_remaining', 0)),
'threats_found': int(data.get('threats_found', 0)),
'critical_threats': int(data.get('critical_threats', 0)),
'high_threats': int(data.get('high_threats', 0)),
'activity_description': data.get('activity_description', ''),
'last_updated': timezone.now()
}
)
action = "Created" if created else "Updated"
# Extended logging for debugging
logging.writeToFile(f'[Status API] ✅ {action} status update for scan {scan_id}')
logging.writeToFile(f'[Status API] Phase: {data.get("phase")} → Progress: {data.get("progress", 0)}%')
logging.writeToFile(f'[Status API] Files: {data.get("files_scanned", 0)}/{data.get("files_discovered", 0)} ({data.get("files_remaining", 0)} remaining)')
logging.writeToFile(f'[Status API] Threats: {data.get("threats_found", 0)} total (Critical: {data.get("critical_threats", 0)}, High: {data.get("high_threats", 0)})')
if data.get('current_file'):
logging.writeToFile(f'[Status API] Current File: {data.get("current_file")}')
if data.get('activity_description'):
logging.writeToFile(f'[Status API] Activity: {data.get("activity_description")}')
return JsonResponse({'success': True})
except json.JSONDecodeError:
logging.writeToFile('[Status API] Invalid JSON in status update')
return JsonResponse({'error': 'Invalid JSON'}, status=400)
except ValueError as e:
logging.writeToFile(f'[Status API] Value error in status update: {str(e)}')
return JsonResponse({'error': 'Invalid data types'}, status=400)
except Exception as e:
logging.writeToFile(f'[Status API] Status update error: {str(e)}')
return JsonResponse({'error': 'Internal server error'}, status=500)
@require_http_methods(['GET'])
def get_live_scan_progress(request, scan_id):
"""
Get current scan progress for real-time UI updates
GET /api/ai-scanner/scan/{scan_id}/live-progress
Response:
{
"success": true,
"scan_id": "550e8400-e29b-41d4-a716-446655440000",
"phase": "scanning_files",
"progress": 75,
"current_file": "wp-content/plugins/suspicious/malware.php",
"files_discovered": 1247,
"files_scanned": 935,
"files_remaining": 312,
"threats_found": 5,
"critical_threats": 2,
"high_threats": 3,
"activity_description": "Scanning file 935/1247: wp-content/plugins/suspicious/malware.php",
"last_updated": "2024-12-25T10:34:30Z",
"is_active": true
}
"""
try:
# Get latest status update
try:
status_update = ScanStatusUpdate.objects.get(scan_id=scan_id)
except ScanStatusUpdate.DoesNotExist:
logging.writeToFile(f'[Status API] Status not found for scan {scan_id}')
return JsonResponse({'success': False, 'error': 'Scan not found'}, status=404)
response_data = {
'success': True,
'scan_id': scan_id,
'phase': status_update.phase,
'progress': status_update.progress,
'current_file': status_update.current_file,
'files_discovered': status_update.files_discovered,
'files_scanned': status_update.files_scanned,
'files_remaining': status_update.files_remaining,
'threats_found': status_update.threats_found,
'critical_threats': status_update.critical_threats,
'high_threats': status_update.high_threats,
'activity_description': status_update.activity_description,
'last_updated': status_update.last_updated.isoformat(),
'is_active': status_update.is_active
}
# Extended logging for frontend polling
logging.writeToFile(f'[Status API] 📊 Frontend polling scan {scan_id}')
logging.writeToFile(f'[Status API] Status: {status_update.phase} ({status_update.progress}%) - Active: {status_update.is_active}')
logging.writeToFile(f'[Status API] Progress: {status_update.files_scanned}/{status_update.files_discovered} files, {status_update.threats_found} threats')
if status_update.current_file:
logging.writeToFile(f'[Status API] Currently scanning: {status_update.current_file}')
return JsonResponse(response_data)
except Exception as e:
logging.writeToFile(f'[Status API] Get progress error: {str(e)}')
return JsonResponse({'success': False, 'error': 'Internal server error'}, status=500)

View File

@@ -0,0 +1,42 @@
from django.db import models
from django.utils import timezone
class ScanStatusUpdate(models.Model):
"""Real-time scan progress updates"""
scan_id = models.CharField(max_length=100, db_index=True, primary_key=True)
phase = models.CharField(max_length=50) # starting, discovering_files, scanning_files, completing, completed
progress = models.IntegerField(default=0) # 0-100
# File tracking
current_file = models.TextField(blank=True)
files_discovered = models.IntegerField(default=0)
files_scanned = models.IntegerField(default=0)
files_remaining = models.IntegerField(default=0)
# Threat tracking
threats_found = models.IntegerField(default=0)
critical_threats = models.IntegerField(default=0)
high_threats = models.IntegerField(default=0)
# Activity description
activity_description = models.TextField(blank=True)
# Timestamps
last_updated = models.DateTimeField(auto_now=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'ai_scanner_status_updates'
ordering = ['-last_updated']
indexes = [
models.Index(fields=['scan_id', '-last_updated']),
]
def __str__(self):
return f"Status update for {self.scan_id} - {self.phase} ({self.progress}%)"
@property
def is_active(self):
"""Check if scan is still active"""
return self.phase not in ['completed', 'failed', 'cancelled']

File diff suppressed because it is too large Load Diff

3
aiScanner/tests.py Normal file
View File

@@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

26
aiScanner/urls.py Normal file
View File

@@ -0,0 +1,26 @@
from django.urls import path
from . import views, api
urlpatterns = [
# Main AI Scanner pages
path('', views.aiScannerHome, name='aiScannerHome'),
path('setup-payment/', views.setupPayment, name='aiScannerSetupPayment'),
path('setup-complete/', views.setupComplete, name='aiScannerSetupComplete'),
path('start-scan/', views.startScan, name='aiScannerStartScan'),
path('refresh-balance/', views.refreshBalance, name='aiScannerRefreshBalance'),
path('add-payment-method/', views.addPaymentMethod, name='aiScannerAddPaymentMethod'),
path('payment-method-complete/', views.paymentMethodComplete, name='aiScannerPaymentMethodComplete'),
path('callback/', views.scanCallback, name='aiScannerCallback'),
# Scan management
path('scan-history/', views.getScanHistory, name='aiScannerHistory'),
path('scan-details/<str:scan_id>/', views.getScanDetails, name='aiScannerDetails'),
path('platform-status/<str:scan_id>/', views.getPlatformScanStatus, name='aiScannerPlatformStatus'),
# Note: RESTful API endpoints are in /api/urls.py for external access
# Legacy API endpoints (for backward compatibility)
path('api/authenticate/', views.aiScannerAuthenticate, name='aiScannerAuthenticate'),
path('api/list-files/', views.aiScannerListFiles, name='aiScannerListFiles'),
path('api/get-file/', views.aiScannerGetFile, name='aiScannerGetFile'),
]

439
aiScanner/views.py Normal file
View File

@@ -0,0 +1,439 @@
from django.shortcuts import render, redirect
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from loginSystem.views import loadLoginPage
from .aiScannerManager import AIScannerManager
import json
import os
def aiScannerHome(request):
"""Main AI Scanner page"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.scannerHome(request, userID)
except KeyError:
return redirect(loadLoginPage)
def setupPayment(request):
"""Setup payment method for AI scanner"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.setupPayment(request, userID)
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
def setupComplete(request):
"""Handle return from payment setup"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.setupComplete(request, userID)
except KeyError:
return redirect(loadLoginPage)
def startScan(request):
"""Start a new AI security scan"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.startScan(request, userID)
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
def refreshBalance(request):
"""Refresh account balance from API"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.refreshBalance(request, userID)
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
def addPaymentMethod(request):
"""Add a new payment method"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.addPaymentMethod(request, userID)
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
def paymentMethodComplete(request):
"""Handle return from adding payment method"""
try:
userID = request.session['userID']
sm = AIScannerManager()
return sm.paymentMethodComplete(request, userID)
except KeyError:
return redirect(loadLoginPage)
@csrf_exempt
def scanCallback(request):
"""Handle scan results callback from AI Scanner API"""
sm = AIScannerManager()
return sm.scanCallback(request)
def getScanHistory(request):
"""Get scan history for user"""
try:
userID = request.session['userID']
from loginSystem.models import Administrator
from .models import ScanHistory
admin = Administrator.objects.get(pk=userID)
scans = ScanHistory.objects.filter(admin=admin).order_by('-started_at')[:20]
scan_data = []
for scan in scans:
scan_data.append({
'scan_id': scan.scan_id,
'domain': scan.domain,
'status': scan.status,
'scan_type': scan.scan_type,
'started_at': scan.started_at.strftime('%Y-%m-%d %H:%M:%S'),
'completed_at': scan.completed_at.strftime('%Y-%m-%d %H:%M:%S') if scan.completed_at else None,
'cost_usd': float(scan.cost_usd) if scan.cost_usd else 0,
'files_scanned': scan.files_scanned,
'issues_found': scan.issues_found,
'findings': scan.findings[:5] if scan.findings else [], # First 5 findings
'summary': scan.summary
})
return JsonResponse({'success': True, 'scans': scan_data})
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
def getScanDetails(request, scan_id):
"""Get detailed scan results"""
try:
userID = request.session['userID']
from loginSystem.models import Administrator
from .models import ScanHistory
admin = Administrator.objects.get(pk=userID)
scan = ScanHistory.objects.get(scan_id=scan_id, admin=admin)
scan_data = {
'scan_id': scan.scan_id,
'domain': scan.domain,
'status': scan.status,
'scan_type': scan.scan_type,
'started_at': scan.started_at.strftime('%Y-%m-%d %H:%M:%S'),
'completed_at': scan.completed_at.strftime('%Y-%m-%d %H:%M:%S') if scan.completed_at else None,
'cost_usd': float(scan.cost_usd) if scan.cost_usd else 0,
'files_scanned': scan.files_scanned,
'issues_found': scan.issues_found,
'findings': scan.findings,
'summary': scan.summary,
'error_message': scan.error_message
}
return JsonResponse({'success': True, 'scan': scan_data})
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
except ScanHistory.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scan not found'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
def getPlatformScanStatus(request, scan_id):
"""Get real-time scan status from AI Scanner platform"""
try:
userID = request.session['userID']
from loginSystem.models import Administrator
from .models import ScanHistory
admin = Administrator.objects.get(pk=userID)
scan = ScanHistory.objects.get(scan_id=scan_id, admin=admin)
scanner_settings = admin.ai_scanner_settings
if not scanner_settings.api_key:
return JsonResponse({'success': False, 'error': 'API key not configured'})
# Get real-time status from platform
sm = AIScannerManager()
platform_status = sm.get_scan_status(scanner_settings.api_key, scan_id)
if platform_status and platform_status.get('success'):
status_data = platform_status.get('data', {})
# Return formatted status data for frontend
return JsonResponse({
'success': True,
'scan_id': scan_id,
'phase': status_data.get('status', 'unknown'),
'progress': status_data.get('progress', 0),
'current_file': status_data.get('current_file', ''),
'files_discovered': status_data.get('files_discovered', 0),
'files_scanned': status_data.get('files_scanned', 0),
'files_remaining': status_data.get('files_remaining', 0),
'threats_found': status_data.get('findings_count', 0),
'critical_threats': status_data.get('critical_threats', 0),
'high_threats': status_data.get('high_threats', 0),
'activity_description': status_data.get('activity_description', ''),
'last_updated': status_data.get('last_updated', ''),
'is_active': status_data.get('status') in ['scanning', 'discovering_files', 'starting'],
'cost': status_data.get('cost', '$0.00')
})
else:
# No live status available, return scan database status
return JsonResponse({
'success': True,
'scan_id': scan_id,
'phase': scan.status,
'progress': 100 if scan.status == 'completed' else 0,
'current_file': '',
'files_discovered': scan.files_scanned,
'files_scanned': scan.files_scanned,
'files_remaining': 0,
'threats_found': scan.issues_found,
'critical_threats': 0,
'high_threats': 0,
'activity_description': scan.error_message if scan.status == 'failed' else 'Scan completed',
'last_updated': scan.completed_at.isoformat() if scan.completed_at else scan.started_at.isoformat(),
'is_active': False,
'cost': f'${scan.cost_usd:.4f}' if scan.cost_usd else '$0.00'
})
except KeyError:
return JsonResponse({'success': False, 'error': 'Not authenticated'})
except ScanHistory.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Scan not found'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
# File Access API for AI Scanner
@csrf_exempt
@require_http_methods(['POST'])
def aiScannerAuthenticate(request):
"""Authenticate AI scanner access"""
try:
data = json.loads(request.body)
access_token = data.get('access_token')
scan_id = data.get('scan_id')
if not access_token or not scan_id:
return JsonResponse({'success': False, 'error': 'Missing parameters'})
from .models import FileAccessToken, ScanHistory
# Validate token
try:
file_token = FileAccessToken.objects.get(
token=access_token,
scan_history__scan_id=scan_id,
is_active=True
)
if file_token.is_expired():
return JsonResponse({'success': False, 'error': 'Token expired'})
# Get WordPress info
from websiteFunctions.models import Websites
try:
website = Websites.objects.get(domain=file_token.domain)
# Detect WordPress path and version
wp_path = file_token.wp_path
wp_version = 'Unknown'
php_version = 'Unknown'
# Try to get WP version from wp-includes/version.php
version_file = os.path.join(wp_path, 'wp-includes', 'version.php')
if os.path.exists(version_file):
try:
with open(version_file, 'r') as f:
content = f.read()
import re
match = re.search(r'\$wp_version\s*=\s*[\'"]([^\'"]+)[\'"]', content)
if match:
wp_version = match.group(1)
except:
pass
return JsonResponse({
'success': True,
'site_info': {
'domain': file_token.domain,
'wp_path': wp_path,
'php_version': php_version,
'wp_version': wp_version,
'scan_id': scan_id
}
})
except Websites.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Website not found'})
except FileAccessToken.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Invalid token'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
@csrf_exempt
@require_http_methods(['GET'])
def aiScannerListFiles(request):
"""List directory contents for AI scanner"""
try:
path = request.GET.get('path', '')
access_token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not access_token:
return JsonResponse({'success': False, 'error': 'No authorization token'})
from .models import FileAccessToken
# Validate token
try:
file_token = FileAccessToken.objects.get(token=access_token, is_active=True)
if file_token.is_expired():
return JsonResponse({'success': False, 'error': 'Token expired'})
# Construct full path
full_path = os.path.join(file_token.wp_path, path)
# Security check - ensure path is within WordPress directory
if not os.path.abspath(full_path).startswith(os.path.abspath(file_token.wp_path)):
return JsonResponse({'success': False, 'error': 'Path not allowed'})
if not os.path.exists(full_path):
return JsonResponse({'success': False, 'error': 'Path not found'})
if not os.path.isdir(full_path):
return JsonResponse({'success': False, 'error': 'Path is not a directory'})
# List directory contents
items = []
try:
for item in os.listdir(full_path):
item_path = os.path.join(full_path, item)
# Skip hidden files and certain directories
if item.startswith('.') or item in ['__pycache__', 'node_modules']:
continue
if os.path.isdir(item_path):
items.append({
'name': item,
'type': 'directory',
'path': os.path.join(path, item).replace('\\', '/') if path else item
})
else:
# Only include certain file types
if item.endswith(('.php', '.js', '.html', '.htm', '.css', '.txt', '.md', '.json', '.xml')):
items.append({
'name': item,
'type': 'file',
'path': os.path.join(path, item).replace('\\', '/') if path else item,
'size': os.path.getsize(item_path)
})
return JsonResponse({
'success': True,
'path': path,
'items': items
})
except PermissionError:
return JsonResponse({'success': False, 'error': 'Permission denied'})
except FileAccessToken.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Invalid token'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})
@csrf_exempt
@require_http_methods(['GET'])
def aiScannerGetFile(request):
"""Get file content for AI scanner"""
try:
file_path = request.GET.get('path')
access_token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not access_token or not file_path:
return JsonResponse({'success': False, 'error': 'Missing parameters'})
from .models import FileAccessToken
# Validate token
try:
file_token = FileAccessToken.objects.get(token=access_token, is_active=True)
if file_token.is_expired():
return JsonResponse({'success': False, 'error': 'Token expired'})
# Construct full path
full_path = os.path.join(file_token.wp_path, file_path)
# Security check - ensure path is within WordPress directory
if not os.path.abspath(full_path).startswith(os.path.abspath(file_token.wp_path)):
return JsonResponse({'success': False, 'error': 'Path not allowed'})
if not os.path.exists(full_path):
return JsonResponse({'success': False, 'error': 'File not found'})
if not os.path.isfile(full_path):
return JsonResponse({'success': False, 'error': 'Path is not a file'})
# Check file size (max 10MB as per API limits)
file_size = os.path.getsize(full_path)
if file_size > 10 * 1024 * 1024: # 10MB
return JsonResponse({'success': False, 'error': 'File too large'})
# Read file content
try:
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
return JsonResponse({
'success': True,
'path': file_path,
'content': content,
'size': file_size
})
except UnicodeDecodeError:
# Try binary mode for non-text files
with open(full_path, 'rb') as f:
content = f.read()
# Return base64 encoded for binary files
import base64
return JsonResponse({
'success': True,
'path': file_path,
'content': base64.b64encode(content).decode('utf-8'),
'encoding': 'base64',
'size': file_size
})
except FileAccessToken.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Invalid token'})
except Exception as e:
return JsonResponse({'success': False, 'error': str(e)})

View File

@@ -1339,6 +1339,9 @@
<a href="{% url 'imunifyAV' %}" class="menu-item">
<span>ImunifyAV</span>
</a>
<a href="{% url 'aiScannerHome' %}" class="menu-item">
<span>AI Scanner</span>
</a>
</div>
<a href="#" class="menu-item" onclick="toggleSubmenu('mail-settings-submenu', this); return false;">