comprehensive permissions check in ai scans

This commit is contained in:
usmannasir
2025-06-26 18:14:24 +05:00
parent 286a2ffb46
commit bb9c7f2ecc
2 changed files with 109 additions and 20 deletions

View File

@@ -26,12 +26,14 @@ class AIScannerManager:
try:
admin = Administrator.objects.get(pk=userID)
# Load ACL permissions
currentACL = ACLManager.loadedACL(userID)
# Check ACL permissions (with fallback for new field)
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL.aiScannerAccess == 0:
if currentACL.get('aiScannerAccess', 1) == 0:
return ACLManager.loadError()
except AttributeError:
except (AttributeError, KeyError):
# Field doesn't exist yet, allow access for now
self.logger.writeToFile(f'[AIScannerManager.scannerHome] aiScannerAccess field not found, allowing access')
pass
@@ -45,8 +47,14 @@ class AIScannerManager:
# Get current pricing from API
pricing_data = self.get_ai_scanner_pricing()
# Get recent scan history
recent_scans = ScanHistory.objects.filter(admin=admin)[:10]
# Get recent scan history with ACL respect
if currentACL['admin'] == 1:
# Admin can see all scans
recent_scans = ScanHistory.objects.all().order_by('-started_at')[:10]
else:
# Users can only see their own scans and their sub-users' scans
user_admins = ACLManager.loadUserObjects(userID)
recent_scans = ScanHistory.objects.filter(admin__in=user_admins).order_by('-started_at')[:10]
# Get current balance if payment is configured
current_balance = scanner_settings.balance
@@ -70,10 +78,9 @@ class AIScannerManager:
server_ip = ACLManager.fetchIP()
vps_info = self.check_vps_free_scans(server_ip)
# Get user's websites for scan selection
from websiteFunctions.models import Websites
# Get user's websites for scan selection using ACL-aware method
try:
websites = Websites.objects.filter(admin=admin)
websites = ACLManager.findWebsiteObjects(currentACL, userID)
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Found {websites.count()} websites for {admin.userName}')
except Exception as e:
self.logger.writeToFile(f'[AIScannerManager.scannerHome] Error fetching websites: {str(e)}')
@@ -120,12 +127,14 @@ class AIScannerManager:
admin = Administrator.objects.get(pk=userID)
# Load ACL permissions
currentACL = ACLManager.loadedACL(userID)
# Check ACL permissions (with fallback for new field)
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL.aiScannerAccess == 0:
if currentACL.get('aiScannerAccess', 1) == 0:
return JsonResponse({'success': False, 'error': 'Access denied'})
except AttributeError:
except (AttributeError, KeyError):
# Field doesn't exist yet, allow access for now
pass
@@ -217,12 +226,14 @@ class AIScannerManager:
admin = Administrator.objects.get(pk=userID)
# Load ACL permissions
currentACL = ACLManager.loadedACL(userID)
# Check ACL permissions (with fallback for new field)
try:
currentACL = ACLManager.loadedACL(userID)
if currentACL.aiScannerAccess == 0:
if currentACL.get('aiScannerAccess', 1) == 0:
return JsonResponse({'success': False, 'error': 'Access denied'})
except AttributeError:
except (AttributeError, KeyError):
# Field doesn't exist yet, allow access for now
pass
@@ -242,12 +253,17 @@ class AIScannerManager:
if not domain:
return JsonResponse({'success': False, 'error': 'Domain is required'})
# Validate domain belongs to user
# Validate domain belongs to user using ACL-aware method
from websiteFunctions.models import Websites
try:
website = Websites.objects.get(domain=domain, admin=admin)
# Check if user has access to this domain through ACL system
if not ACLManager.checkOwnership(domain, admin, currentACL):
return JsonResponse({'success': False, 'error': 'Access denied to this domain'})
# Get the website object (we know it exists due to checkOwnership)
website = Websites.objects.get(domain=domain)
except Websites.DoesNotExist:
return JsonResponse({'success': False, 'error': 'Domain not found or access denied'})
return JsonResponse({'success': False, 'error': 'Domain not found'})
# Generate scan ID and file access token
scan_id = f'cp_{uuid.uuid4().hex[:12]}'
@@ -314,6 +330,17 @@ class AIScannerManager:
admin = Administrator.objects.get(pk=userID)
# Load ACL permissions
currentACL = ACLManager.loadedACL(userID)
# Check ACL permissions (with fallback for new field)
try:
if currentACL.get('aiScannerAccess', 1) == 0:
return JsonResponse({'success': False, 'error': 'Access denied'})
except (AttributeError, KeyError):
# Field doesn't exist yet, allow access for now
pass
# Get scanner settings
try:
scanner_settings = AIScannerSettings.objects.get(admin=admin)
@@ -352,6 +379,17 @@ class AIScannerManager:
admin = Administrator.objects.get(pk=userID)
# Load ACL permissions
currentACL = ACLManager.loadedACL(userID)
# Check ACL permissions (with fallback for new field)
try:
if currentACL.get('aiScannerAccess', 1) == 0:
return JsonResponse({'success': False, 'error': 'Access denied'})
except (AttributeError, KeyError):
# Field doesn't exist yet, allow access for now
pass
# Check if user has scanner configured
try:
scanner_settings = AIScannerSettings.objects.get(admin=admin)
@@ -392,6 +430,19 @@ class AIScannerManager:
"""Handle return from adding payment method"""
try:
admin = Administrator.objects.get(pk=userID)
# Load ACL permissions
currentACL = ACLManager.loadedACL(userID)
# Check ACL permissions (with fallback for new field)
try:
if currentACL.get('aiScannerAccess', 1) == 0:
messages.error(request, 'Access denied to AI Scanner')
return redirect('dashboard')
except (AttributeError, KeyError):
# Field doesn't exist yet, allow access for now
pass
status = request.GET.get('status')
# Log all URL parameters for debugging