Merge pull request #1606 from master3395/v2.5.5-dev

Implement ImunifyAV asset management and routing
This commit is contained in:
Master3395
2025-11-15 23:25:45 +01:00
committed by GitHub
8 changed files with 352 additions and 13 deletions

View File

@@ -249,6 +249,30 @@ pattern_to_watch = ^/home/.+?/(public_html|public_ftp|private_html)(/.*)?$
except BaseException as msg:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, str(msg) + ' [404].', 1)
@staticmethod
def _ensure_imunifyav_assets(statusFile):
try:
commands = [
'mkdir -p /etc/sysconfig/imunify360/generic',
'mkdir -p /usr/local/CyberCP/public/imunifyav',
'chown -R lscpd:lscpd /usr/local/CyberCP/public/imunifyav 2>/dev/null || true',
'chmod 755 /usr/local/CyberCP/public/imunifyav 2>/dev/null || true',
'chown -R lscpd:lscpd /etc/sysconfig/imunify360 2>/dev/null || true'
]
for command in commands:
ServerStatusUtil.executioner(command, statusFile)
if os.path.exists('/etc/redhat-release'):
pkg_cmd = 'yum install -y imunify-ui-generic imunify-antivirus || yum reinstall -y imunify-ui-generic'
else:
pkg_cmd = 'apt-get update -y >/dev/null 2>&1 && apt-get install -y imunify-antivirus || true'
ServerStatusUtil.executioner(pkg_cmd, statusFile)
except BaseException as msg:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
f"ImunifyAV asset verification warning: {str(msg)}\n", 1)
@staticmethod
def submitinstallImunifyAV():
try:
@@ -327,6 +351,8 @@ ui_path_owner = lscpd:lscpd
command = 'bash imav-deploy.sh --yes'
ServerStatusUtil.executioner(command, statusFile)
CageFS._ensure_imunifyav_assets(statusFile)
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
"ImunifyAV reinstalled..\n", 1)

View File

@@ -15,10 +15,13 @@ Including another URLconf
"""
from django.urls import path, re_path, include
from django.contrib import admin
from firewall import views as firewall_views
urlpatterns = [
path('base/', include('baseTemplate.urls')),
path('', include('loginSystem.urls')),
path('imunifyav/', firewall_views.imunifyAV, name='imunifyav_root'),
path('ImunifyAV/', firewall_views.imunifyAV, name='imunifyav_root_legacy'),
path('packages/', include('packages.urls')),
path('websites/', include('websiteFunctions.urls')),
path('tuning/', include('tuning.urls')),

View File

@@ -0,0 +1,103 @@
import sys
import types
import unittest
from unittest import mock
# Provide minimal stubs when running tests outside the target servers.
if 'django' not in sys.modules:
django_stub = types.ModuleType("django")
django_stub.setup = lambda: None
sys.modules['django'] = django_stub
if 'MySQLdb' not in sys.modules:
mysql_stub = types.ModuleType("MySQLdb")
mysql_stub.connect = lambda *args, **kwargs: None
cursors_stub = types.SimpleNamespace(SSCursor=object)
mysql_stub.cursors = cursors_stub
sys.modules['MySQLdb'] = mysql_stub
sys.modules['MySQLdb.cursors'] = types.ModuleType("MySQLdb.cursors")
sys.modules['MySQLdb.cursors'].SSCursor = object
from plogical import mysqlUtilities
class DummyConnection:
def __init__(self, literal_side_effect=None):
self.literal_calls = []
self.literal_side_effect = literal_side_effect
self.closed = False
def literal(self, value):
if self.literal_side_effect:
raise self.literal_side_effect
self.literal_calls.append(value)
return f"'{value}'"
def close(self):
self.closed = True
class DummyCursor:
def __init__(self):
self.executed = []
self.fetchone_value = None
def execute(self, query, params=None):
self.executed.append((query, params))
def fetchone(self):
return self.fetchone_value
class MysqlUtilitiesChangePasswordTests(unittest.TestCase):
def test_numeric_password_is_coerced_and_sanitized(self):
connection = DummyConnection()
cursor = DummyCursor()
with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False):
result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 987654321)
self.assertEqual(result, 1)
self.assertIn('987654321', connection.literal_calls)
self.assertEqual(len(cursor.executed), 2) # USE mysql + password update
self.assertIn("PASSWORD('987654321')", cursor.executed[-1][0])
def test_logs_error_when_literal_fails(self):
connection = DummyConnection(literal_side_effect=ValueError("boom"))
cursor = DummyCursor()
with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False), \
mock.patch('plogical.mysqlUtilities.logging.CyberCPLogFileWriter.writeToFile') as log_mock:
result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 'secret')
self.assertEqual(result, 0)
log_calls = [call.args[0] for call in log_mock.call_args_list]
self.assertTrue(any('[mysqlUtilities.changePassword.literal]' in msg for msg in log_calls))
class MysqlUtilitiesResolveUserTests(unittest.TestCase):
def test_resolves_username_when_orm_missing(self):
class _StubManager:
def get(self, *args, **kwargs):
raise Exception("not found")
dbusers_stub = types.SimpleNamespace(objects=_StubManager())
databases_stub = types.SimpleNamespace(objects=_StubManager())
cursor = DummyCursor()
cursor.fetchone_value = None
with mock.patch('plogical.mysqlUtilities.DBUsers', dbusers_stub, create=True), \
mock.patch('plogical.mysqlUtilities.Databases', databases_stub, create=True):
resolved = mysqlUtilities.mysqlUtilities.resolve_mysql_username('mystery_user', cursor)
self.assertEqual(resolved, 'mystery_user')
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""
Lightweight regression test for the /imunifyav route.
Authenticates by injecting a valid CyberPanel session (using the first Administrator record),
requests both /imunifyav and /ImunifyAV, and records the HTTP status alongside the
imunify-antivirus service state.
"""
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
sys.path.append(str(ROOT_DIR))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
import django # noqa: E402
django.setup()
from django.test import Client # noqa: E402
from loginSystem.models import Administrator # noqa: E402
LOG_PATH = ROOT_DIR / "test_logs" / "imunifyav_route.log"
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
def get_service_state():
try:
output = subprocess.check_output(
["systemctl", "is-active", "imunify-antivirus"],
stderr=subprocess.STDOUT,
timeout=10,
text=True,
).strip()
return output
except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
return "unavailable"
def perform_request(client, path):
try:
response = client.get(path, follow=True)
return response.status_code, None
except Exception as exc: # pragma: no cover - diagnostic logging
return None, str(exc)
def build_client():
admin = Administrator.objects.first()
if not admin:
return None, "No administrator records available to seed the session."
client = Client()
session = client.session
session["userID"] = admin.pk
session.save()
return client, None
def log_result(entry):
entry["timestamp"] = datetime.utcnow().isoformat() + "Z"
with open(LOG_PATH, "a", encoding="utf-8") as handle:
handle.write(json.dumps(entry, ensure_ascii=False) + "\n")
def main():
client, client_error = build_client()
service_state = get_service_state()
results = {
"module": "imunifyav_route_check",
"service_state": service_state,
"retry": 0,
"errors": [],
"responses": {},
}
if client is None:
results["errors"].append(client_error or "Unable to initialize Django test client.")
log_result(results)
print(json.dumps(results, indent=2))
sys.exit(1)
for path in ("/imunifyav/", "/ImunifyAV/"):
status_code, error = perform_request(client, path)
results["responses"][path] = status_code
if error:
results["errors"].append(f"{path}: {error}")
log_result(results)
print(json.dumps(results, indent=2))
if results["errors"]:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -8,6 +8,7 @@ sys.path.append('/usr/local/CyberCP')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
django.setup()
from django.http import HttpResponse
from django.db.models import Q
import json
from plogical.acl import ACLManager
import plogical.CyberCPLogFileWriter as logging
@@ -227,19 +228,26 @@ class DatabaseManager:
return ACLManager.loadErrorJson('changePasswordStatus', 0)
userName = data['dbUserName']
dbPassword = data['dbPassword']
dbPassword = str(data['dbPassword']) if data['dbPassword'] is not None else ''
db = Databases.objects.filter(dbUser=userName)
db_queryset = Databases.objects.filter(Q(dbUser=userName) | Q(dbName=userName))
if not db_queryset.exists():
data_ret = {'status': 0, 'changePasswordStatus': 0,
'error_message': "Database or database user could not be found."}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
database_obj = db_queryset.first()
admin = Administrator.objects.get(pk=userID)
if ACLManager.checkOwnership(db[0].website.domain, admin, currentACL) == 1:
if ACLManager.checkOwnership(database_obj.website.domain, admin, currentACL) == 1:
pass
else:
return ACLManager.loadErrorJson()
try:
meta = DBMeta.objects.get(database=db[0], key=DatabaseManager.REMOTE_ACCESS)
meta = DBMeta.objects.get(database=database_obj, key=DatabaseManager.REMOTE_ACCESS)
host = json.loads(meta.value)['remoteIP']
except:
host = None

View File

@@ -27,7 +27,7 @@
<p>{% trans "ImunifyAV is now integrated via their new API. You can manage Imunify by clicking below. You can use your server root credentials to access Imunify." %}</p>
<br>
<a target="_blank" href="/imunifyav">
<a target="_blank" href="{% url 'imunifyav_root' %}">
<button class="btn btn-primary">Access Now
</button>
</a>

View File

@@ -216,6 +216,34 @@ fix_missing_dependencies() {
print_status "$GREEN" "✅ Missing dependencies fixed"
}
ensure_imunify_av_assets() {
local package_manager=$1
print_status "$BLUE" "🔐 Ensuring ImunifyAV UI assets are installed..."
mkdir -p /etc/sysconfig/imunify360/generic 2>/dev/null || true
mkdir -p /usr/local/CyberCP/public/imunifyav 2>/dev/null || true
case $package_manager in
"yum"|"dnf")
if ! $package_manager install -y imunify-ui-generic imunify-antivirus >/dev/null 2>&1; then
print_status "$YELLOW" "⚠️ Unable to install Imunify packages via $package_manager (will continue)."
fi
;;
"apt")
apt-get update -y >/dev/null 2>&1 || true
if ! apt-get install -y imunify-antivirus >/dev/null 2>&1; then
print_status "$YELLOW" "⚠️ imunify-antivirus package not available in APT repositories."
fi
;;
esac
chown -R lscpd:lscpd /usr/local/CyberCP/public/imunifyav 2>/dev/null || true
chown -R lscpd:lscpd /etc/sysconfig/imunify360 2>/dev/null || true
print_status "$GREEN" "✅ ImunifyAV assets verified"
}
# Function to check service status
check_service_status() {
local service_name=$1
@@ -352,6 +380,9 @@ apply_cyberpanel_fixes() {
# Fix missing dependencies
fix_missing_dependencies "$package_manager"
# Ensure ImunifyAV UI assets are always present
ensure_imunify_av_assets "$package_manager"
print_status "$GREEN" "✅ All CyberPanel fixes applied successfully"

View File

@@ -921,15 +921,31 @@ password=%s
else:
LOCALHOST = mysqlUtilities.LOCALHOST
if encrypt == None:
try:
dbuser = DBUsers.objects.get(user=userName)
query = "SET PASSWORD FOR '" + userName + "'@'%s' = PASSWORD('" % (LOCALHOST) + dbPassword + "')"
except:
userName = mysqlUtilities.fetchuser(userName)
query = "SET PASSWORD FOR '" + userName + "'@'%s' = PASSWORD('" % (LOCALHOST) + dbPassword + "')"
password_value = '' if dbPassword is None else str(dbPassword)
resolved_user = mysqlUtilities.resolve_mysql_username(userName, cursor)
sanitized_user = mysqlUtilities._sanitize_mysql_identifier(resolved_user)
sanitized_host = mysqlUtilities._sanitize_mysql_identifier(LOCALHOST)
try:
literal_password = connection.literal(password_value)
if isinstance(literal_password, bytes):
literal_password = literal_password.decode('utf-8')
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities.changePassword.literal]' % (str(msg)))
return 0
if encrypt is None:
query = "SET PASSWORD FOR '{user}'@'{host}' = PASSWORD({password})".format(
user=sanitized_user,
host=sanitized_host,
password=literal_password
)
else:
query = "SET PASSWORD FOR '" + userName + "'@'%s' = '" % (LOCALHOST) + dbPassword + "'"
query = "SET PASSWORD FOR '{user}'@'{host}' = {password}".format(
user=sanitized_user,
host=sanitized_host,
password=literal_password
)
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(query)
@@ -983,6 +999,56 @@ password=%s
logging.CyberCPLogFileWriter.writeToFile(str(msg) + "[mysqlUtilities.fetchuser]")
return 0
@staticmethod
def _sanitize_mysql_identifier(value):
if value is None:
return ''
return str(value).replace("'", "''").strip()
@staticmethod
def resolve_mysql_username(identifier, cursor=None):
"""
Resolve the actual MySQL username backing the supplied identifier.
Handles DBUsers records, Databases ORM mappings and falls back to mysql.db lookups.
"""
candidate = '' if identifier is None else str(identifier).strip()
if len(candidate) == 0:
return candidate
try:
if DBUsers:
db_user = DBUsers.objects.get(user=candidate)
return db_user.user
except BaseException:
pass
try:
if Databases:
database = Databases.objects.get(dbUser=candidate)
return database.dbUser
except BaseException:
pass
try:
if Databases:
database = Databases.objects.get(dbName=candidate)
if database.dbUser:
return database.dbUser
except BaseException:
pass
if cursor is not None:
try:
cursor.execute("SELECT user FROM mysql.db WHERE db = %s LIMIT 1", (candidate,))
row = cursor.fetchone()
if row and row[0]:
return row[0]
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + "[mysqlUtilities.resolve_mysql_username]")
return candidate
@staticmethod
def allowRemoteAccess(dbName, userName, remoteIP):
try: