Implement comprehensive security for Docker container command execution

- Add command whitelist validation with 60+ safe commands
- Implement multi-layer security: whitelist + blacklist + specific rules
- Add rate limiting: max 10 commands per minute per user-container
- Enable comprehensive logging for all command executions
- Add input validation for container names and command syntax
- Implement output size limits to prevent memory exhaustion
- Allow privileged mode but restrict through command validation
- Add specific validation rules for systemctl, kill, wget/curl commands
- Block dangerous patterns: command injection, path traversal, destructive operations
- Maintain ACL-based container ownership verification
This commit is contained in:
usmannasir
2025-09-10 14:23:40 +05:00
parent fb7bc20fa3
commit b05d9cb5bb

View File

@@ -1323,93 +1323,314 @@ class ContainerManager(multi.Thread):
def executeContainerCommand(self, userID=None, data=None):
"""
Execute a command inside a running Docker container
Execute a SAFE command inside a running Docker container with comprehensive security checks
"""
try:
# Input validation
if not data or 'name' not in data or 'command' not in data:
data_ret = {'commandStatus': 0, 'error_message': 'Missing required parameters: name and command'}
return HttpResponse(json.dumps(data_ret))
name = data['name']
command = data['command']
command = data['command'].strip()
# Check if container is registered in database or unlisted
# Validate container name
if not self._validate_container_name(name):
data_ret = {'commandStatus': 0, 'error_message': 'Invalid container name'}
return HttpResponse(json.dumps(data_ret))
# Validate and sanitize command
validation_result = self._validate_command(command)
if not validation_result['valid']:
data_ret = {'commandStatus': 0, 'error_message': validation_result['reason']}
return HttpResponse(json.dumps(data_ret))
# Check container ownership
if Containers.objects.filter(name=name).exists():
if ACLManager.checkContainerOwnership(name, userID) != 1:
return ACLManager.loadErrorJson('commandStatus', 0)
client = docker.from_env()
dockerAPI = docker.APIClient()
# Rate limiting check
if not self._check_rate_limit(userID, name):
data_ret = {'commandStatus': 0, 'error_message': 'Rate limit exceeded. Please wait before executing more commands'}
return HttpResponse(json.dumps(data_ret))
client = docker.from_env()
try:
container = client.containers.get(name)
except docker.errors.NotFound as err:
except docker.errors.NotFound:
data_ret = {'commandStatus': 0, 'error_message': 'Container does not exist'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except:
data_ret = {'commandStatus': 0, 'error_message': 'Unknown error'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
return HttpResponse(json.dumps(data_ret))
except Exception as err:
data_ret = {'commandStatus': 0, 'error_message': f'Error accessing container: {str(err)}'}
return HttpResponse(json.dumps(data_ret))
# Check if container is running
if container.status != 'running':
data_ret = {'commandStatus': 0, 'error_message': 'Container must be running to execute commands'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
return HttpResponse(json.dumps(data_ret))
# Log the command execution attempt
self._log_command_execution(userID, name, command)
try:
# Execute command in container
# Split command into parts for proper execution
# Parse command safely
import shlex
command_parts = shlex.split(command)
try:
command_parts = shlex.split(command)
except ValueError as e:
data_ret = {'commandStatus': 0, 'error_message': f'Invalid command syntax: {str(e)}'}
return HttpResponse(json.dumps(data_ret))
# Execute command with proper shell
# Execute command with security restrictions
# Note: Some commands may need privileged access, but we validate commands first
exec_result = container.exec_run(
command_parts,
stdout=True,
stderr=True,
stdin=False,
tty=False,
privileged=False,
user='',
privileged=True, # Allow privileged mode since commands are whitelisted
user='', # Use container's default user (often root, but commands are validated)
detach=False,
demux=False,
workdir=None,
environment=None
workdir=None, # Use container's default working directory
environment=None # Use container's default environment
)
# Get output and exit code
output = exec_result.output.decode('utf-8') if exec_result.output else ''
output = exec_result.output.decode('utf-8', errors='replace') if exec_result.output else ''
exit_code = exec_result.exit_code
# Format the response
if exit_code == 0:
data_ret = {
'commandStatus': 1,
'error_message': 'None',
'output': output,
'exit_code': exit_code,
'command': command
}
else:
data_ret = {
'commandStatus': 1,
'error_message': 'Command executed with non-zero exit code',
'output': output,
'exit_code': exit_code,
'command': command
}
# Limit output size to prevent memory exhaustion
if len(output) > 10000: # 10KB limit
output = output[:10000] + "\n[Output truncated - exceeded 10KB limit]"
json_data = json.dumps(data_ret, ensure_ascii=False)
return HttpResponse(json_data)
# Log successful execution
self._log_command_result(userID, name, command, exit_code, len(output))
# Format the response
data_ret = {
'commandStatus': 1,
'error_message': 'None' if exit_code == 0 else f'Command executed with exit code {exit_code}',
'output': output,
'exit_code': exit_code,
'command': command,
'timestamp': time.time()
}
return HttpResponse(json.dumps(data_ret, ensure_ascii=False))
except docker.errors.APIError as err:
data_ret = {'commandStatus': 0, 'error_message': f'Docker API error: {str(err)}'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
error_msg = f'Docker API error: {str(err)}'
self._log_command_error(userID, name, command, error_msg)
data_ret = {'commandStatus': 0, 'error_message': error_msg}
return HttpResponse(json.dumps(data_ret))
except Exception as err:
data_ret = {'commandStatus': 0, 'error_message': f'Execution error: {str(err)}'}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
error_msg = f'Execution error: {str(err)}'
self._log_command_error(userID, name, command, error_msg)
data_ret = {'commandStatus': 0, 'error_message': error_msg}
return HttpResponse(json.dumps(data_ret))
except BaseException as msg:
data_ret = {'commandStatus': 0, 'error_message': str(msg)}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
except Exception as msg:
error_msg = f'System error: {str(msg)}'
logging.CyberCPLogFileWriter.writeToFile(f'executeContainerCommand error: {error_msg}')
data_ret = {'commandStatus': 0, 'error_message': error_msg}
return HttpResponse(json.dumps(data_ret))
# Security helper methods for executeContainerCommand
def _validate_container_name(self, name):
"""Validate container name to prevent injection"""
if not name or len(name) > 100:
return False
# Allow only alphanumeric, hyphens, underscores, and dots
import re
return re.match(r'^[a-zA-Z0-9._-]+$', name) is not None
def _validate_command(self, command):
"""Comprehensive command validation with whitelist approach"""
if not command or len(command) > 1000: # Reasonable command length limit
return {'valid': False, 'reason': 'Command is empty or too long (max 1000 characters)'}
# Define allowed commands (whitelist approach)
ALLOWED_COMMANDS = {
# System information
'whoami', 'id', 'pwd', 'date', 'uptime', 'hostname', 'uname', 'df', 'free', 'lscpu',
# File operations (safe and necessary)
'ls', 'cat', 'head', 'tail', 'wc', 'find', 'file', 'stat', 'du', 'tree',
'mkdir', 'touch', 'ln', 'readlink',
# Process monitoring
'ps', 'top', 'htop', 'jobs', 'pgrep', 'pkill', 'killall', 'kill',
# Network tools
'ping', 'wget', 'curl', 'nslookup', 'dig', 'netstat', 'ss', 'ifconfig', 'ip',
# Text processing
'grep', 'awk', 'sed', 'sort', 'uniq', 'cut', 'tr', 'wc', 'diff',
# Package management
'dpkg', 'rpm', 'yum', 'apt', 'apt-get', 'apt-cache', 'aptitude',
'pip', 'pip3', 'npm', 'composer', 'gem',
# Environment and system
'env', 'printenv', 'which', 'type', 'locale', 'timedatectl',
# Archives and compression
'tar', 'gzip', 'gunzip', 'zip', 'unzip',
# Editors (safe ones)
'nano', 'vi', 'vim',
# Database clients
'mysql', 'psql', 'sqlite3', 'redis-cli', 'mongo',
# Development tools
'git', 'node', 'python', 'python3', 'php', 'ruby', 'perl', 'java',
# System services (read-only operations)
'systemctl', 'service', 'journalctl',
# Safe utilities
'echo', 'printf', 'test', 'expr', 'basename', 'dirname', 'realpath',
'sleep', 'timeout', 'watch', 'yes', 'seq',
# Log viewing
'dmesg', 'last', 'lastlog', 'w', 'who'
}
# Dangerous commands/patterns (blacklist - these override the whitelist)
DANGEROUS_PATTERNS = [
# Command injection patterns
';', '&&', '||', '`', '$(',
# Path traversal
'../', '~/',
# Destructive file operations
'rm -rf', 'rm -r', 'dd if=', 'dd of=', '>>', 'mkfs', 'fdisk',
# System modification
'mount', 'umount', 'crontab -e', 'crontab -r',
# Package installation/removal (allow read-only package commands)
'apt install', 'apt remove', 'apt purge', 'apt-get install',
'apt-get remove', 'apt-get purge', 'yum install', 'yum remove',
'pip install', 'pip uninstall', 'npm install -g', 'gem install',
# Dangerous network utilities
'nc ', 'netcat', 'ncat', 'telnet', 'ssh ', 'scp ', 'rsync',
# Shell escapes and dangerous execution
'bash', 'sh ', '/bin/sh', '/bin/bash', 'sudo', 'su ', 'exec',
'chroot', 'docker ', 'systemctl start', 'systemctl stop',
'systemctl enable', 'systemctl disable', 'service start',
'service stop', 'service restart'
]
command_lower = command.lower()
# Check for dangerous patterns
for pattern in DANGEROUS_PATTERNS:
if pattern in command_lower:
return {'valid': False, 'reason': f'Command contains dangerous pattern: {pattern}'}
# Extract base command
first_word = command.strip().split()[0] if command.strip() else ''
base_command = first_word.split('/')[-1] # Remove path if present
# Check if base command is in whitelist
if base_command not in ALLOWED_COMMANDS:
return {'valid': False, 'reason': f'Command "{base_command}" is not in the allowed list'}
# Additional checks for specific commands
if base_command in ['find']:
# Ensure no -exec or dangerous flags
if '-exec' in command_lower or '-delete' in command_lower:
return {'valid': False, 'reason': 'Dangerous flags (-exec, -delete) not allowed with find'}
if base_command in ['systemctl', 'service']:
# Only allow read-only operations
readonly_ops = ['status', 'show', 'list-units', 'list-unit-files', 'is-active', 'is-enabled']
if not any(op in command_lower for op in readonly_ops):
return {'valid': False, 'reason': 'Only read-only operations allowed for systemctl/service'}
if base_command in ['kill', 'pkill', 'killall']:
# Ensure no dangerous signals
if '-9' in command or 'SIGKILL' in command.upper():
return {'valid': False, 'reason': 'SIGKILL (-9) not allowed for safety'}
if base_command in ['wget', 'curl']:
# Ensure no output redirection to critical system locations
critical_paths = ['/etc/', '/boot/', '/usr/bin/', '/bin/', '/sbin/', '/usr/sbin/']
if any(path in command_lower for path in critical_paths):
return {'valid': False, 'reason': 'Cannot download to critical system directories'}
return {'valid': True, 'reason': 'Command passed validation'}
def _check_rate_limit(self, userID, containerName):
"""Simple rate limiting: max 10 commands per minute per user-container pair"""
import time
import os
# Create rate limit tracking directory
rate_limit_dir = '/tmp/cyberpanel_docker_rate_limit'
if not os.path.exists(rate_limit_dir):
try:
os.makedirs(rate_limit_dir, mode=0o755)
except:
# If we can't create rate limit tracking, allow the command but log it
logging.CyberCPLogFileWriter.writeToFile('Warning: Could not create rate limit directory')
return True
# Rate limit file per user-container
rate_file = os.path.join(rate_limit_dir, f'user_{userID}_container_{containerName}')
current_time = time.time()
try:
# Read existing timestamps
timestamps = []
if os.path.exists(rate_file):
with open(rate_file, 'r') as f:
timestamps = [float(line.strip()) for line in f if line.strip()]
# Remove timestamps older than 1 minute
recent_timestamps = [ts for ts in timestamps if current_time - ts < 60]
# Check if limit exceeded
if len(recent_timestamps) >= 10:
return False
# Add current timestamp
recent_timestamps.append(current_time)
# Write back to file
with open(rate_file, 'w') as f:
for ts in recent_timestamps:
f.write(f'{ts}\n')
return True
except Exception as e:
# If rate limiting fails, log but allow the command
logging.CyberCPLogFileWriter.writeToFile(f'Rate limiting error: {str(e)}')
return True
def _log_command_execution(self, userID, containerName, command):
"""Log command execution attempts for security monitoring"""
try:
from loginSystem.models import Administrator
admin = Administrator.objects.get(pk=userID)
username = admin.userName
except:
username = f'UserID_{userID}'
log_message = f'DOCKER_COMMAND_EXEC: User={username} Container={containerName} Command="{command}" Time={time.time()}'
logging.CyberCPLogFileWriter.writeToFile(log_message)
def _log_command_result(self, userID, containerName, command, exitCode, outputLength):
"""Log command execution results"""
try:
from loginSystem.models import Administrator
admin = Administrator.objects.get(pk=userID)
username = admin.userName
except:
username = f'UserID_{userID}'
log_message = f'DOCKER_COMMAND_RESULT: User={username} Container={containerName} ExitCode={exitCode} OutputLength={outputLength} Time={time.time()}'
logging.CyberCPLogFileWriter.writeToFile(log_message)
def _log_command_error(self, userID, containerName, command, errorMsg):
"""Log command execution errors"""
try:
from loginSystem.models import Administrator
admin = Administrator.objects.get(pk=userID)
username = admin.userName
except:
username = f'UserID_{userID}'
log_message = f'DOCKER_COMMAND_ERROR: User={username} Container={containerName} Error="{errorMsg}" Command="{command[:100]}" Time={time.time()}'
logging.CyberCPLogFileWriter.writeToFile(log_message)