Fix backup file operation failures for API key authentication

Problem: "Failed to backup file before replacement" error when using API key authentication.

Root Cause:
- The externalApp field (which contains the system user for file operations) was sometimes None
- When using API key authentication, the code couldn't determine which user to run commands as

Solution:
- Added fallback logic to ensure we always have a valid user for file operations:
  1. First try wp_site.owner.externalApp
  2. If not set, try Websites.objects.get(domain).externalApp
  3. If still not set, fall back to admin username
- Added detailed error messages and logging throughout backup operations
- Enhanced error reporting to include user context and operation details
- Added validation for backup directory creation with proper error handling

Changes:
- Modified validate_access_token() OPTION 2 and OPTION 3 to ensure external_app is always set
- Enhanced backup and replace operations with better error messages
- Added detailed logging for debugging file operation failures
- Include user context in error messages for easier troubleshooting

This ensures file operations work correctly even when externalApp field is not properly configured.
This commit is contained in:
usmannasir
2025-10-27 14:16:01 +05:00
parent af35aadb92
commit 2cd361a837
2 changed files with 62 additions and 13 deletions

View File

@@ -142,6 +142,20 @@ def validate_access_token(token, scan_id):
wp_path = wp_site.path
external_app = wp_site.owner.externalApp # Get externalApp from the website owner
# If no external app, try to get it from the website directly
if not external_app:
try:
from websiteFunctions.models import Websites
website = Websites.objects.get(domain=scan.domain)
external_app = website.externalApp
except Websites.DoesNotExist:
pass
# If still no external app, use the admin username as fallback
if not external_app:
external_app = scanner_settings.admin.userName
logging.writeToFile(f'[API] Warning: No externalApp for {scan.domain}, using admin username: {external_app}')
logging.writeToFile(f'[API] API key validated successfully for scan {scan_id}, domain {scan.domain}, path {wp_path}, user {external_app}')
return AuthWrapper(
@@ -179,18 +193,34 @@ def validate_access_token(token, scan_id):
scan = ScanHistory.objects.get(scan_id=scan_id)
# Get WordPress site info
from websiteFunctions.models import WPSites
from websiteFunctions.models import WPSites, Websites
wp_site = WPSites.objects.filter(
FinalURL__icontains=scan.domain
).first()
if wp_site:
logging.writeToFile(f'[API] Platform callback validated: API key exists, scan {scan_id} found')
# Get the external app (user) for this website
external_app = wp_site.owner.externalApp
# If no external app, try to get it from the website directly
if not external_app:
try:
website = Websites.objects.get(domain=scan.domain)
external_app = website.externalApp
except Websites.DoesNotExist:
pass
# If still no external app, use the admin username as fallback
if not external_app:
external_app = wp_site.owner.admin.userName
logging.writeToFile(f'[API] Warning: No externalApp for {scan.domain}, using admin username: {external_app}')
logging.writeToFile(f'[API] Platform callback validated: API key exists, scan {scan_id} found, user {external_app}')
return AuthWrapper(
domain=scan.domain,
wp_path=wp_site.path,
auth_type='api_key',
external_app=wp_site.owner.externalApp,
external_app=external_app,
source_obj=None
), None
else:
@@ -953,9 +983,10 @@ def scanner_backup_file(request):
# Get website user from auth wrapper (already validated during authentication)
user = file_token.external_app
if not user:
error_msg = 'External app not available in auth context'
error_msg = f'External app (user) not available in auth context for domain {file_token.domain}'
logging.writeToFile(f'[API] Backup error: {error_msg}, auth_type={file_token.auth_type}')
log_file_operation(scan_id, 'backup', file_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': error_msg}, status=500)
return JsonResponse({'success': False, 'error': error_msg, 'error_code': 'NO_USER'}, status=500)
# Check file exists
from plogical.processUtilities import ProcessUtilities
@@ -1273,9 +1304,10 @@ def scanner_replace_file(request):
# Get website user from auth wrapper (already validated during authentication)
user = file_token.external_app
if not user:
error_msg = 'External app not available in auth context'
error_msg = f'External app (user) not available in auth context for domain {file_token.domain}'
logging.writeToFile(f'[API] Replace error: {error_msg}, auth_type={file_token.auth_type}')
log_file_operation(scan_id, 'replace', file_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': error_msg}, status=500)
return JsonResponse({'success': False, 'error': error_msg, 'error_code': 'NO_USER'}, status=500)
# Verify hash if provided
from plogical.processUtilities import ProcessUtilities
@@ -1305,7 +1337,16 @@ def scanner_replace_file(request):
wp_path_clean = file_token.wp_path.rstrip('/')
backup_dir_name = f'{wp_path_clean}/.ai-scanner-backups/{datetime.datetime.now().strftime("%Y-%m-%d")}'
mkdir_cmd = f'mkdir -p "{backup_dir_name}"'
ProcessUtilities.executioner(mkdir_cmd, user=user)
logging.writeToFile(f'[API] Creating backup dir with user {user}: {backup_dir_name}')
mkdir_result = ProcessUtilities.executioner(mkdir_cmd, user=user)
# Check if directory creation failed
if mkdir_result != 1:
error_msg = f'Failed to create backup directory: {backup_dir_name} for user {user}'
logging.writeToFile(f'[API] {error_msg}, mkdir_result={mkdir_result}')
log_file_operation(scan_id, 'replace', file_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': 'Failed to create backup directory', 'error_code': 'BACKUP_DIR_FAILED', 'details': error_msg}, status=500)
timestamp = int(time.time())
basename = os.path.basename(full_path)
@@ -1317,8 +1358,10 @@ def scanner_replace_file(request):
# executioner returns 1 for success, 0 for failure
if cp_result != 1:
log_file_operation(scan_id, 'replace', file_path, False, 'Failed to create backup', backup_path=backup_path, request=request)
return JsonResponse({'success': False, 'error': 'Failed to create backup', 'error_code': 'BACKUP_FAILED'}, status=500)
error_msg = f'Failed to create backup: cp command failed for user {user}'
logging.writeToFile(f'[API] {error_msg}, cp_result={cp_result}, backup_path={backup_path}')
log_file_operation(scan_id, 'replace', file_path, False, error_msg, backup_path=backup_path, request=request)
return JsonResponse({'success': False, 'error': 'Failed to backup file before replacement', 'error_code': 'BACKUP_FAILED', 'details': error_msg}, status=500)
# Write new content to temp file first (atomic write)
# Write to /tmp (accessible by all users, no permission issues)