mirror of
https://github.com/usmannasir/cyberpanel.git
synced 2025-12-16 05:19:43 +01:00
add aiscanner file patcher
This commit is contained in:
@@ -17,40 +17,104 @@ class SecurityError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class AuthWrapper:
|
||||
"""
|
||||
Wrapper to provide consistent interface for both FileAccessToken and API Key authentication
|
||||
"""
|
||||
def __init__(self, domain, wp_path, auth_type, source_obj=None):
|
||||
self.domain = domain
|
||||
self.wp_path = wp_path
|
||||
self.auth_type = auth_type # 'file_token' or 'api_key'
|
||||
self.source_obj = source_obj # Original FileAccessToken or AIScannerSettings object
|
||||
|
||||
|
||||
def validate_access_token(token, scan_id):
|
||||
"""
|
||||
Implement proper token validation
|
||||
- Check token format
|
||||
- Verify token hasn't expired
|
||||
- Confirm token is for the correct scan
|
||||
- Log access attempts
|
||||
Validate authentication token - accepts BOTH file access tokens and API keys
|
||||
|
||||
Authentication Flow:
|
||||
1. Try FileAccessToken (temporary token for active scans)
|
||||
2. If not found, try API Key (for post-scan file operations)
|
||||
|
||||
Returns: (AuthWrapper object or None, error_message or None)
|
||||
"""
|
||||
try:
|
||||
if not token or not token.startswith('cp_'):
|
||||
logging.writeToFile(f'[API] Invalid token format: {token[:20] if token else "None"}...')
|
||||
return None, "Invalid token format"
|
||||
|
||||
# Find the token in database
|
||||
# OPTION 1: Try FileAccessToken first (for active scans)
|
||||
try:
|
||||
file_token = FileAccessToken.objects.get(
|
||||
token=token,
|
||||
scan_history__scan_id=scan_id,
|
||||
is_active=True
|
||||
)
|
||||
|
||||
|
||||
if file_token.is_expired():
|
||||
logging.writeToFile(f'[API] Token expired for scan {scan_id}')
|
||||
return None, "Token expired"
|
||||
|
||||
logging.writeToFile(f'[API] Token validated successfully for scan {scan_id}')
|
||||
return file_token, None
|
||||
|
||||
logging.writeToFile(f'[API] File token expired for scan {scan_id}, trying API key fallback...')
|
||||
# Don't return here - fall through to try API key
|
||||
else:
|
||||
logging.writeToFile(f'[API] File token validated successfully for scan {scan_id}')
|
||||
return AuthWrapper(
|
||||
domain=file_token.domain,
|
||||
wp_path=file_token.wp_path,
|
||||
auth_type='file_token',
|
||||
source_obj=file_token
|
||||
), None
|
||||
|
||||
except FileAccessToken.DoesNotExist:
|
||||
logging.writeToFile(f'[API] Token not found for scan {scan_id}')
|
||||
logging.writeToFile(f'[API] File token not found for scan {scan_id}, trying API key fallback...')
|
||||
# Fall through to try API key
|
||||
|
||||
# OPTION 2: Try API Key (for post-scan file operations)
|
||||
try:
|
||||
from .models import AIScannerSettings, ScanHistory
|
||||
|
||||
# Find API key in settings
|
||||
scanner_settings = AIScannerSettings.objects.get(
|
||||
api_key=token
|
||||
)
|
||||
|
||||
logging.writeToFile(f'[API] Found API key for admin: {scanner_settings.admin.userName}')
|
||||
|
||||
# Get the scan to verify it belongs to this admin and get domain/path
|
||||
try:
|
||||
scan = ScanHistory.objects.get(
|
||||
scan_id=scan_id,
|
||||
admin=scanner_settings.admin
|
||||
)
|
||||
|
||||
# Get wp_path from website
|
||||
try:
|
||||
website = Websites.objects.get(domain=scan.domain)
|
||||
wp_path = website.path
|
||||
|
||||
logging.writeToFile(f'[API] API key validated successfully for scan {scan_id}, domain {scan.domain}')
|
||||
|
||||
return AuthWrapper(
|
||||
domain=scan.domain,
|
||||
wp_path=wp_path,
|
||||
auth_type='api_key',
|
||||
source_obj=scanner_settings
|
||||
), None
|
||||
|
||||
except Websites.DoesNotExist:
|
||||
logging.writeToFile(f'[API] Website not found for domain: {scan.domain}')
|
||||
return None, "Website not found"
|
||||
|
||||
except ScanHistory.DoesNotExist:
|
||||
logging.writeToFile(f'[API] Scan {scan_id} not found or does not belong to API key owner')
|
||||
return None, "Scan not found or access denied"
|
||||
|
||||
except AIScannerSettings.DoesNotExist:
|
||||
logging.writeToFile(f'[API] API key not found in settings')
|
||||
return None, "Invalid token"
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logging.writeToFile(f'[API] Token validation error: {str(e)}')
|
||||
import traceback
|
||||
logging.writeToFile(f'[API] Traceback: {traceback.format_exc()}')
|
||||
return None, "Token validation failed"
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user