Files
CyberPanel/plogical/test_backup_compatibility.py

300 lines
11 KiB
Python
Raw Normal View History

#!/usr/local/CyberCP/bin/python
"""
Test script to verify backward compatibility of database backup improvements
Tests both legacy and new backup/restore paths
"""
import os
import sys
import json
import tempfile
import shutil
sys.path.append('/usr/local/CyberCP')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
from plogical.mysqlUtilities import mysqlUtilities
from plogical.processUtilities import ProcessUtilities
class BackupCompatibilityTests:
"""Test suite for backup compatibility"""
@staticmethod
def setup_test_environment():
"""Create a test directory for backups"""
test_dir = tempfile.mkdtemp(prefix="cyberpanel_backup_test_")
print(f"Created test directory: {test_dir}")
return test_dir
@staticmethod
def cleanup_test_environment(test_dir):
"""Clean up test directory"""
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
print(f"Cleaned up test directory: {test_dir}")
@staticmethod
def test_config_file():
"""Test configuration file reading"""
print("\n=== Testing Configuration File ===")
config_file = '/usr/local/CyberCP/plogical/backup_config.json'
if os.path.exists(config_file):
with open(config_file, 'r') as f:
config = json.load(f)
print(f"Configuration loaded successfully")
print(f"Use compression: {config['database_backup']['use_compression']}")
print(f"Use new features: {config['database_backup']['use_new_features']}")
print(f"Auto-detect restore: {config['compatibility']['auto_detect_restore']}")
return True
else:
print(f"Configuration file not found at {config_file}")
return False
@staticmethod
def test_helper_functions():
"""Test helper functions"""
print("\n=== Testing Helper Functions ===")
# Test checkNewBackupFeatures
new_features = mysqlUtilities.checkNewBackupFeatures()
print(f"New backup features enabled: {new_features}")
# Test shouldUseCompression
use_compression = mysqlUtilities.shouldUseCompression()
print(f"Compression enabled: {use_compression}")
# Test supportParallelDump
parallel_support = mysqlUtilities.supportParallelDump()
print(f"Parallel dump supported: {parallel_support}")
# Test getNumberOfCores
cores = ProcessUtilities.getNumberOfCores()
print(f"Number of CPU cores: {cores}")
return True
@staticmethod
def test_legacy_backup(test_db="test_legacy_db", test_dir="/tmp"):
"""Test that legacy backups still work"""
print("\n=== Testing Legacy Backup (No Compression, No New Features) ===")
try:
# Create backup with old method
print(f"Creating legacy backup for {test_db}...")
result = mysqlUtilities.createDatabaseBackup(
test_db, test_dir, use_compression=False, use_new_features=False
)
if result == 1:
print(f"✓ Legacy backup created successfully")
# Check that .sql file exists (not .sql.gz)
legacy_file = f"{test_dir}/{test_db}.sql"
if os.path.exists(legacy_file):
file_size = os.path.getsize(legacy_file)
print(f"✓ Legacy backup file exists: {legacy_file}")
print(f" File size: {file_size} bytes")
# Check metadata file
metadata_file = f"{test_dir}/{test_db}.backup.json"
if os.path.exists(metadata_file):
with open(metadata_file, 'r') as f:
metadata = json.load(f)
print(f"✓ Metadata file exists")
print(f" Backup version: {metadata['backup_version']}")
print(f" Compressed: {metadata['compressed']}")
print(f" New features: {metadata['new_features']}")
return True
else:
print(f"✗ Legacy backup file not found: {legacy_file}")
return False
else:
print(f"✗ Legacy backup failed")
return False
except Exception as e:
print(f"✗ Error during legacy backup test: {str(e)}")
return False
@staticmethod
def test_new_backup(test_db="test_new_db", test_dir="/tmp"):
"""Test new compressed backups"""
print("\n=== Testing New Backup (With Compression and New Features) ===")
try:
# Create backup with new method
print(f"Creating compressed backup for {test_db}...")
result = mysqlUtilities.createDatabaseBackup(
test_db, test_dir, use_compression=True, use_new_features=True
)
if result == 1:
print(f"✓ New backup created successfully")
# Check that .sql.gz file exists
compressed_file = f"{test_dir}/{test_db}.sql.gz"
if os.path.exists(compressed_file):
file_size = os.path.getsize(compressed_file)
print(f"✓ Compressed backup file exists: {compressed_file}")
print(f" File size: {file_size} bytes")
# Check metadata file
metadata_file = f"{test_dir}/{test_db}.backup.json"
if os.path.exists(metadata_file):
with open(metadata_file, 'r') as f:
metadata = json.load(f)
print(f"✓ Metadata file exists")
print(f" Backup version: {metadata['backup_version']}")
print(f" Compressed: {metadata['compressed']}")
print(f" New features: {metadata['new_features']}")
return True
else:
print(f"✗ Compressed backup file not found: {compressed_file}")
# Check if legacy file was created instead
legacy_file = f"{test_dir}/{test_db}.sql"
if os.path.exists(legacy_file):
print(f" Note: Legacy file exists instead: {legacy_file}")
return False
else:
print(f"✗ New backup failed")
return False
except Exception as e:
print(f"✗ Error during new backup test: {str(e)}")
return False
@staticmethod
def test_format_detection(test_dir="/tmp"):
"""Test backup format auto-detection"""
print("\n=== Testing Format Detection ===")
# Test detection of compressed backup
test_db = "test_detect"
# Create a dummy compressed backup
compressed_file = f"{test_dir}/{test_db}.sql.gz"
with open(compressed_file, 'wb') as f:
f.write(b'\x1f\x8b\x08\x00\x00\x00\x00\x00') # gzip header
# Create metadata
metadata = {
'database': test_db,
'compressed': True,
'new_features': True,
'backup_version': '2.0'
}
metadata_file = f"{test_dir}/{test_db}.backup.json"
with open(metadata_file, 'w') as f:
json.dump(metadata, f)
# Test detection
detected_format = mysqlUtilities.detectBackupFormat(test_dir, test_db)
print(f"Detected format for compressed backup:")
print(f" Compressed: {detected_format['compressed']}")
print(f" New features: {detected_format['new_features']}")
print(f" Version: {detected_format['backup_version']}")
# Clean up test files
os.remove(compressed_file)
os.remove(metadata_file)
# Create a dummy uncompressed backup
uncompressed_file = f"{test_dir}/{test_db}.sql"
with open(uncompressed_file, 'w') as f:
f.write("-- MySQL dump\n")
# Test detection without metadata
detected_format = mysqlUtilities.detectBackupFormat(test_dir, test_db)
print(f"\nDetected format for uncompressed backup (no metadata):")
print(f" Compressed: {detected_format['compressed']}")
print(f" New features: {detected_format['new_features']}")
print(f" Version: {detected_format['backup_version']}")
# Clean up
os.remove(uncompressed_file)
return True
@staticmethod
def test_mysqldump_command():
"""Test mysqldump command building"""
print("\n=== Testing MySQL Dump Command Building ===")
# Test legacy command
legacy_cmd = mysqlUtilities.buildMysqldumpCommand(
"root", "localhost", "3306", "test_db",
use_new_features=False, use_compression=False
)
print(f"Legacy command: {legacy_cmd}")
# Test new command with features
new_cmd = mysqlUtilities.buildMysqldumpCommand(
"root", "localhost", "3306", "test_db",
use_new_features=True, use_compression=True
)
print(f"New command: {new_cmd}")
return True
@staticmethod
def run_all_tests():
"""Run all compatibility tests"""
print("=" * 60)
print("CyberPanel Database Backup Compatibility Test Suite")
print("=" * 60)
all_passed = True
# Test configuration
if not BackupCompatibilityTests.test_config_file():
all_passed = False
# Test helper functions
if not BackupCompatibilityTests.test_helper_functions():
all_passed = False
# Test mysqldump command building
if not BackupCompatibilityTests.test_mysqldump_command():
all_passed = False
# Setup test environment
test_dir = BackupCompatibilityTests.setup_test_environment()
try:
# Test format detection
if not BackupCompatibilityTests.test_format_detection(test_dir):
all_passed = False
# Note: Actual backup/restore tests would require a real database
# These are commented out but show the structure
# # Test legacy backup
# if not BackupCompatibilityTests.test_legacy_backup("test_db", test_dir):
# all_passed = False
# # Test new backup
# if not BackupCompatibilityTests.test_new_backup("test_db", test_dir):
# all_passed = False
finally:
# Cleanup
BackupCompatibilityTests.cleanup_test_environment(test_dir)
print("\n" + "=" * 60)
if all_passed:
print("✓ All tests passed successfully!")
print("The backup system is fully backward compatible.")
else:
print("✗ Some tests failed. Please check the output above.")
print("=" * 60)
return all_passed
if __name__ == "__main__":
# Run the test suite
success = BackupCompatibilityTests.run_all_tests()
sys.exit(0 if success else 1)