Fix file operation endpoints to use website externalApp correctly

- Add external_app field to AuthWrapper class
- Get externalApp from website object during authentication
  - For FileAccessToken: Use Websites.externalApp
  - For API Key: Use WPSites.owner.externalApp
- Update all 5 file operation endpoints to use file_token.external_app
  - scanner_backup_file
  - scanner_get_file
  - scanner_replace_file
  - scanner_rename_file
  - scanner_delete_file
- Ensures externalApp matches wp_path since they come from same source
- Fixes backup API failing due to incorrect user context
This commit is contained in:
usmannasir
2025-10-26 14:12:42 +05:00
parent f870494bcb
commit 8ffd11fa08

View File

@@ -21,10 +21,11 @@ class AuthWrapper:
"""
Wrapper to provide consistent interface for both FileAccessToken and API Key authentication
"""
def __init__(self, domain, wp_path, auth_type, source_obj=None):
def __init__(self, domain, wp_path, auth_type, external_app=None, source_obj=None):
self.domain = domain
self.wp_path = wp_path
self.auth_type = auth_type # 'file_token' or 'api_key'
self.external_app = external_app # The website's externalApp for command execution
self.source_obj = source_obj # Original FileAccessToken or AIScannerSettings object
@@ -55,11 +56,21 @@ def validate_access_token(token, scan_id):
logging.writeToFile(f'[API] File token expired for scan {scan_id}, trying API key fallback...')
# Don't return here - fall through to try API key
else:
logging.writeToFile(f'[API] File token validated successfully for scan {scan_id}')
# Get externalApp from the website object
from websiteFunctions.models import Websites
try:
website = Websites.objects.get(domain=file_token.domain)
external_app = website.externalApp
except Websites.DoesNotExist:
logging.writeToFile(f'[API] Website not found for domain: {file_token.domain}')
return None, "Website not found"
logging.writeToFile(f'[API] File token validated successfully for scan {scan_id}, user {external_app}')
return AuthWrapper(
domain=file_token.domain,
wp_path=file_token.wp_path,
auth_type='file_token',
external_app=external_app,
source_obj=file_token
), None
@@ -100,13 +111,15 @@ def validate_access_token(token, scan_id):
return None, "WordPress site not found"
wp_path = wp_site.path
external_app = wp_site.owner.externalApp # Get externalApp from the website owner
logging.writeToFile(f'[API] API key validated successfully for scan {scan_id}, domain {scan.domain}, path {wp_path}')
logging.writeToFile(f'[API] API key validated successfully for scan {scan_id}, domain {scan.domain}, path {wp_path}, user {external_app}')
return AuthWrapper(
domain=scan.domain,
wp_path=wp_path,
auth_type='api_key',
external_app=external_app,
source_obj=scanner_settings
), None
@@ -868,12 +881,12 @@ def scanner_backup_file(request):
log_file_operation(scan_id, 'backup', file_path, False, str(e), request=request)
return JsonResponse({'success': False, 'error': 'Path not allowed'}, status=403)
# Get website user
try:
user = get_website_user(file_token.domain)
except SecurityError as e:
log_file_operation(scan_id, 'backup', file_path, False, str(e), request=request)
return JsonResponse({'success': False, 'error': str(e)}, status=404)
# Get website user from auth wrapper (already validated during authentication)
user = file_token.external_app
if not user:
error_msg = 'External app not available in auth context'
log_file_operation(scan_id, 'backup', file_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': error_msg}, status=500)
# Check file exists
from plogical.processUtilities import ProcessUtilities
@@ -888,8 +901,16 @@ def scanner_backup_file(request):
# Create backup directory
import datetime
backup_dir_name = f'{file_token.wp_path}/.ai-scanner-backups/{datetime.datetime.now().strftime("%Y-%m-%d")}'
logging.writeToFile(f'[API] Creating backup directory: {backup_dir_name}')
mkdir_cmd = f'mkdir -p "{backup_dir_name}"'
ProcessUtilities.executioner(mkdir_cmd, user=user)
mkdir_result = ProcessUtilities.executioner(mkdir_cmd, user=user)
if mkdir_result != 0:
error_msg = f'Failed to create backup directory: {backup_dir_name}'
logging.writeToFile(f'[API] {error_msg}')
log_file_operation(scan_id, 'backup', file_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': error_msg, 'error_code': 'BACKUP_DIR_FAILED'}, status=500)
# Create backup filename with timestamp
timestamp = int(time.time())
@@ -897,13 +918,19 @@ def scanner_backup_file(request):
backup_filename = f'{basename}.{timestamp}.bak'
backup_path = os.path.join(backup_dir_name, backup_filename)
logging.writeToFile(f'[API] Backing up {full_path} to {backup_path}')
# Copy file to backup
cp_cmd = f'cp "{full_path}" "{backup_path}"'
cp_result = ProcessUtilities.executioner(cp_cmd, user=user)
cp_result = ProcessUtilities.outputExecutioner(cp_cmd, user=user, retRequired=True)
if cp_result != 0:
log_file_operation(scan_id, 'backup', file_path, False, 'Failed to create backup', request=request)
return JsonResponse({'success': False, 'error': 'Failed to create backup', 'error_code': 'BACKUP_FAILED'}, status=500)
# Check both return code and output for errors
if cp_result[0] != 0 or (cp_result[1] and 'error' in cp_result[1].lower()):
error_output = cp_result[1] if len(cp_result) > 1 else 'Unknown error'
error_msg = f'Failed to create backup: {error_output}'
logging.writeToFile(f'[API] Backup failed: cp returned {cp_result[0]}, output: {error_output}')
log_file_operation(scan_id, 'backup', file_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': error_msg, 'error_code': 'BACKUP_FAILED'}, status=500)
# Get file size
stat_cmd = f'stat -c %s "{backup_path}"'
@@ -1008,12 +1035,12 @@ def scanner_get_file(request):
log_file_operation(scan_id, 'read', file_path, False, f'File type not allowed: {file_ext}', request=request)
return JsonResponse({'success': False, 'error': f'File type not allowed: {file_ext}'}, status=403)
# Get website user
try:
user = get_website_user(file_token.domain)
except SecurityError as e:
log_file_operation(scan_id, 'read', file_path, False, str(e), request=request)
return JsonResponse({'success': False, 'error': str(e)}, status=404)
# Get website user from auth wrapper (already validated during authentication)
user = file_token.external_app
if not user:
error_msg = 'External app not available in auth context'
log_file_operation(scan_id, 'read', file_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': error_msg}, status=500)
# Check file size
from plogical.processUtilities import ProcessUtilities
@@ -1172,12 +1199,12 @@ def scanner_replace_file(request):
log_file_operation(scan_id, 'replace', file_path, False, str(e), request=request)
return JsonResponse({'success': False, 'error': 'Path not allowed'}, status=403)
# Get website user
try:
user = get_website_user(file_token.domain)
except SecurityError as e:
log_file_operation(scan_id, 'replace', file_path, False, str(e), request=request)
return JsonResponse({'success': False, 'error': str(e)}, status=404)
# Get website user from auth wrapper (already validated during authentication)
user = file_token.external_app
if not user:
error_msg = 'External app not available in auth context'
log_file_operation(scan_id, 'replace', file_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': error_msg}, status=500)
# Verify hash if provided
from plogical.processUtilities import ProcessUtilities
@@ -1359,12 +1386,12 @@ def scanner_rename_file(request):
log_file_operation(scan_id, 'rename', old_path, False, str(e), request=request)
return JsonResponse({'success': False, 'error': 'Path not allowed'}, status=403)
# Get website user
try:
user = get_website_user(file_token.domain)
except SecurityError as e:
log_file_operation(scan_id, 'rename', old_path, False, str(e), request=request)
return JsonResponse({'success': False, 'error': str(e)}, status=404)
# Get website user from auth wrapper (already validated during authentication)
user = file_token.external_app
if not user:
error_msg = 'External app not available in auth context'
log_file_operation(scan_id, 'rename', old_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': error_msg}, status=500)
# Check source file exists
from plogical.processUtilities import ProcessUtilities
@@ -1522,12 +1549,12 @@ def scanner_delete_file(request):
log_file_operation(scan_id, 'delete', file_path, False, 'Cannot delete protected system file', request=request)
return JsonResponse({'success': False, 'error': 'Cannot delete protected system file', 'error_code': 'PROTECTED_FILE'}, status=403)
# Get website user
try:
user = get_website_user(file_token.domain)
except SecurityError as e:
log_file_operation(scan_id, 'delete', file_path, False, str(e), request=request)
return JsonResponse({'success': False, 'error': str(e)}, status=404)
# Get website user from auth wrapper (already validated during authentication)
user = file_token.external_app
if not user:
error_msg = 'External app not available in auth context'
log_file_operation(scan_id, 'delete', file_path, False, error_msg, request=request)
return JsonResponse({'success': False, 'error': error_msg}, status=500)
# Get file info before deletion
from plogical.processUtilities import ProcessUtilities