Add X-API-Key header support for AI Scanner file operations

- Added extract_auth_token() function to handle both Bearer and X-API-Key authentication
- Updated all file operation endpoints to support X-API-Key headers:
  - list_files()
  - get_file_content()
  - scanner_backup_file()
  - scanner_get_file()
  - scanner_replace_file()
  - scanner_rename_file()
  - scanner_delete_file()
- Maintains backward compatibility with existing Bearer token authentication
- Added test script to verify both authentication methods work correctly
- Enables permanent API key authentication for file fixes (no more expired token issues)

This change allows the platform to fix files using the permanent CyberPanel API key
instead of temporary file access tokens that expire after ~1 hour.
This commit is contained in:
usmannasir
2025-10-27 13:27:37 +05:00
parent a60c48276d
commit cfeac42527
2 changed files with 183 additions and 39 deletions

View File

@@ -29,6 +29,27 @@ class AuthWrapper:
self.source_obj = source_obj # Original FileAccessToken or AIScannerSettings object
def extract_auth_token(request):
"""
Extract authentication token from either Bearer or X-API-Key header
Returns: (token, auth_type) where auth_type is 'bearer' or 'api_key'
"""
# Check for X-API-Key header first (preferred for permanent auth)
api_key_header = request.META.get('HTTP_X_API_KEY', '')
if api_key_header:
logging.writeToFile(f'[API] Using X-API-Key authentication')
return api_key_header, 'api_key'
# Check for Bearer token (backward compatibility)
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if auth_header.startswith('Bearer '):
logging.writeToFile(f'[API] Using Bearer token authentication')
return auth_header.replace('Bearer ', ''), 'bearer'
return None, None
def validate_access_token(token, scan_id):
"""
Validate authentication token - accepts BOTH file access tokens and API keys
@@ -309,14 +330,13 @@ def list_files(request):
}
"""
try:
# Validate authorization
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Missing or invalid Authorization header'}, status=401)
access_token = auth_header.replace('Bearer ', '')
# Validate authorization (supports both Bearer token and X-API-Key)
access_token, auth_type = extract_auth_token(request)
if not access_token:
return JsonResponse({'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401)
scan_id = request.META.get('HTTP_X_SCAN_ID', '')
if not scan_id:
return JsonResponse({'error': 'X-Scan-ID header required'}, status=400)
@@ -436,14 +456,13 @@ def get_file_content(request):
}
"""
try:
# Validate authorization
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Missing or invalid Authorization header'}, status=401)
access_token = auth_header.replace('Bearer ', '')
# Validate authorization (supports both Bearer token and X-API-Key)
access_token, auth_type = extract_auth_token(request)
if not access_token:
return JsonResponse({'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401)
scan_id = request.META.get('HTTP_X_SCAN_ID', '')
if not scan_id:
return JsonResponse({'error': 'X-Scan-ID header required'}, status=400)
@@ -855,12 +874,11 @@ def scanner_backup_file(request):
file_path = data.get('file_path', '').strip('/')
scan_id = data.get('scan_id', '')
# Validate authorization
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
# Validate authorization (supports both Bearer token and X-API-Key)
access_token, auth_type = extract_auth_token(request)
if not access_token:
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401)
access_token = auth_header.replace('Bearer ', '')
header_scan_id = request.META.get('HTTP_X_SCAN_ID', '')
if not scan_id or not header_scan_id or scan_id != header_scan_id:
@@ -997,12 +1015,11 @@ def scanner_get_file(request):
}
"""
try:
# Validate authorization
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
# Validate authorization (supports both Bearer token and X-API-Key)
access_token, auth_type = extract_auth_token(request)
if not access_token:
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401)
access_token = auth_header.replace('Bearer ', '')
scan_id = request.META.get('HTTP_X_SCAN_ID', '')
if not scan_id:
@@ -1177,12 +1194,11 @@ def scanner_replace_file(request):
backup_before_replace = data.get('backup_before_replace', True)
verify_hash = data.get('verify_hash', '')
# Validate authorization
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
# Validate authorization (supports both Bearer token and X-API-Key)
access_token, auth_type = extract_auth_token(request)
if not access_token:
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401)
access_token = auth_header.replace('Bearer ', '')
scan_id = request.META.get('HTTP_X_SCAN_ID', '')
if not scan_id:
@@ -1386,12 +1402,11 @@ def scanner_rename_file(request):
new_path = data.get('new_path', '').strip('/')
backup_before_rename = data.get('backup_before_rename', True)
# Validate authorization
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
# Validate authorization (supports both Bearer token and X-API-Key)
access_token, auth_type = extract_auth_token(request)
if not access_token:
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401)
access_token = auth_header.replace('Bearer ', '')
scan_id = request.META.get('HTTP_X_SCAN_ID', '')
if not scan_id:
@@ -1560,12 +1575,11 @@ def scanner_delete_file(request):
'message': 'Set confirm_deletion: true to proceed'
}, status=400)
# Validate authorization
auth_header = request.META.get('HTTP_AUTHORIZATION', '')
if not auth_header.startswith('Bearer '):
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header'}, status=401)
# Validate authorization (supports both Bearer token and X-API-Key)
access_token, auth_type = extract_auth_token(request)
if not access_token:
return JsonResponse({'success': False, 'error': 'Missing or invalid Authorization header. Use Bearer token or X-API-Key header'}, status=401)
access_token = auth_header.replace('Bearer ', '')
scan_id = request.META.get('HTTP_X_SCAN_ID', '')
if not scan_id:

130
test_api_auth.py Normal file
View File

@@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""
Test script to verify both Bearer token and X-API-Key authentication work
for CyberPanel AI Scanner file operations.
"""
import requests
import json
import sys
# Test configuration
BASE_URL = "http://localhost:8001" # Adjust if needed
SCAN_ID = "test-scan-123"
FILE_PATH = "wp-content/plugins/test.php"
def test_bearer_auth(token):
"""Test with Bearer token authentication"""
print("Testing Bearer token authentication...")
headers = {
"Authorization": f"Bearer {token}",
"X-Scan-ID": SCAN_ID,
"Content-Type": "application/json"
}
# Test get-file endpoint
url = f"{BASE_URL}/api/scanner/get-file"
params = {"file_path": FILE_PATH}
response = requests.get(url, params=params, headers=headers)
print(f"Bearer auth response: {response.status_code}")
if response.status_code != 200:
print(f"Response: {response.text}")
return response.status_code == 200 or response.status_code == 404 # 404 is ok if file doesn't exist
def test_api_key_auth(api_key):
"""Test with X-API-Key authentication"""
print("\nTesting X-API-Key authentication...")
headers = {
"X-API-Key": api_key,
"X-Scan-ID": SCAN_ID,
"Content-Type": "application/json"
}
# Test get-file endpoint
url = f"{BASE_URL}/api/scanner/get-file"
params = {"file_path": FILE_PATH}
response = requests.get(url, params=params, headers=headers)
print(f"X-API-Key auth response: {response.status_code}")
if response.status_code != 200:
print(f"Response: {response.text}")
return response.status_code == 200 or response.status_code == 404 # 404 is ok if file doesn't exist
def test_mixed_endpoints():
"""Test different endpoints with both authentication methods"""
print("\n" + "="*50)
print("Testing multiple endpoints with both auth methods")
print("="*50)
# You would need real tokens for this to work
test_token = "cp_test_token_12345"
test_api_key = "cp_test_api_key_67890"
endpoints = [
("GET", "/api/ai-scanner/files/list", {"path": "wp-content"}),
("GET", "/api/ai-scanner/files/content", {"path": FILE_PATH}),
("GET", "/api/scanner/get-file", {"file_path": FILE_PATH}),
]
for method, endpoint, params in endpoints:
print(f"\nTesting {method} {endpoint}")
# Test with Bearer token
headers_bearer = {
"Authorization": f"Bearer {test_token}",
"X-Scan-ID": SCAN_ID
}
# Test with X-API-Key
headers_api_key = {
"X-API-Key": test_api_key,
"X-Scan-ID": SCAN_ID
}
url = f"{BASE_URL}{endpoint}"
# Make requests (will fail without valid tokens, but shows the headers work)
if method == "GET":
response_bearer = requests.get(url, params=params, headers=headers_bearer)
response_api_key = requests.get(url, params=params, headers=headers_api_key)
print(f" Bearer auth: {response_bearer.status_code}")
print(f" X-API-Key auth: {response_api_key.status_code}")
def main():
"""Main test function"""
print("CyberPanel AI Scanner Authentication Test")
print("="*50)
if len(sys.argv) > 1:
# If token provided as argument, use it
token = sys.argv[1]
# Test both authentication methods with the same token
# (assumes token is valid for both methods)
bearer_success = test_bearer_auth(token)
api_key_success = test_api_key_auth(token)
print("\n" + "="*50)
print("Test Results:")
print(f" Bearer authentication: {'✓ PASS' if bearer_success else '✗ FAIL'}")
print(f" X-API-Key authentication: {'✓ PASS' if api_key_success else '✗ FAIL'}")
print("="*50)
else:
# Run mock tests to show the endpoints accept both header formats
test_mixed_endpoints()
print("\n" + "="*50)
print("Note: To run real tests, provide a valid token:")
print(f" python {sys.argv[0]} cp_your_token_here")
print("="*50)
if __name__ == "__main__":
main()