Implement ImunifyAV asset management and routing

- Added a static method to ensure ImunifyAV assets are created and permissions set correctly in CageFS.py.
- Updated the URL routing in urls.py to include paths for ImunifyAV, supporting both legacy and new routes.
- Modified the ImunifyAV HTML template to use Django's URL template tag for better maintainability.
- Enhanced the cyberpanel_fixes.sh script to ensure ImunifyAV UI assets are installed during fixes.
- Improved database user resolution and password handling in mysqlUtilities.py for better security and reliability.

This update enhances the integration and management of ImunifyAV within the CyberPanel environment.
This commit is contained in:
Master3395
2025-11-15 23:25:13 +01:00
parent 34f10cebe3
commit 0aca2a5aaf
8 changed files with 352 additions and 13 deletions

View File

@@ -249,6 +249,30 @@ pattern_to_watch = ^/home/.+?/(public_html|public_ftp|private_html)(/.*)?$
except BaseException as msg:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, str(msg) + ' [404].', 1)
@staticmethod
def _ensure_imunifyav_assets(statusFile):
try:
commands = [
'mkdir -p /etc/sysconfig/imunify360/generic',
'mkdir -p /usr/local/CyberCP/public/imunifyav',
'chown -R lscpd:lscpd /usr/local/CyberCP/public/imunifyav 2>/dev/null || true',
'chmod 755 /usr/local/CyberCP/public/imunifyav 2>/dev/null || true',
'chown -R lscpd:lscpd /etc/sysconfig/imunify360 2>/dev/null || true'
]
for command in commands:
ServerStatusUtil.executioner(command, statusFile)
if os.path.exists('/etc/redhat-release'):
pkg_cmd = 'yum install -y imunify-ui-generic imunify-antivirus || yum reinstall -y imunify-ui-generic'
else:
pkg_cmd = 'apt-get update -y >/dev/null 2>&1 && apt-get install -y imunify-antivirus || true'
ServerStatusUtil.executioner(pkg_cmd, statusFile)
except BaseException as msg:
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
f"ImunifyAV asset verification warning: {str(msg)}\n", 1)
@staticmethod
def submitinstallImunifyAV():
try:
@@ -327,6 +351,8 @@ ui_path_owner = lscpd:lscpd
command = 'bash imav-deploy.sh --yes'
ServerStatusUtil.executioner(command, statusFile)
CageFS._ensure_imunifyav_assets(statusFile)
logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath,
"ImunifyAV reinstalled..\n", 1)

View File

@@ -15,10 +15,13 @@ Including another URLconf
"""
from django.urls import path, re_path, include
from django.contrib import admin
from firewall import views as firewall_views
urlpatterns = [
path('base/', include('baseTemplate.urls')),
path('', include('loginSystem.urls')),
path('imunifyav/', firewall_views.imunifyAV, name='imunifyav_root'),
path('ImunifyAV/', firewall_views.imunifyAV, name='imunifyav_root_legacy'),
path('packages/', include('packages.urls')),
path('websites/', include('websiteFunctions.urls')),
path('tuning/', include('tuning.urls')),

View File

@@ -0,0 +1,103 @@
import sys
import types
import unittest
from unittest import mock
# Provide minimal stubs when running tests outside the target servers.
if 'django' not in sys.modules:
django_stub = types.ModuleType("django")
django_stub.setup = lambda: None
sys.modules['django'] = django_stub
if 'MySQLdb' not in sys.modules:
mysql_stub = types.ModuleType("MySQLdb")
mysql_stub.connect = lambda *args, **kwargs: None
cursors_stub = types.SimpleNamespace(SSCursor=object)
mysql_stub.cursors = cursors_stub
sys.modules['MySQLdb'] = mysql_stub
sys.modules['MySQLdb.cursors'] = types.ModuleType("MySQLdb.cursors")
sys.modules['MySQLdb.cursors'].SSCursor = object
from plogical import mysqlUtilities
class DummyConnection:
def __init__(self, literal_side_effect=None):
self.literal_calls = []
self.literal_side_effect = literal_side_effect
self.closed = False
def literal(self, value):
if self.literal_side_effect:
raise self.literal_side_effect
self.literal_calls.append(value)
return f"'{value}'"
def close(self):
self.closed = True
class DummyCursor:
def __init__(self):
self.executed = []
self.fetchone_value = None
def execute(self, query, params=None):
self.executed.append((query, params))
def fetchone(self):
return self.fetchone_value
class MysqlUtilitiesChangePasswordTests(unittest.TestCase):
def test_numeric_password_is_coerced_and_sanitized(self):
connection = DummyConnection()
cursor = DummyCursor()
with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False):
result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 987654321)
self.assertEqual(result, 1)
self.assertIn('987654321', connection.literal_calls)
self.assertEqual(len(cursor.executed), 2) # USE mysql + password update
self.assertIn("PASSWORD('987654321')", cursor.executed[-1][0])
def test_logs_error_when_literal_fails(self):
connection = DummyConnection(literal_side_effect=ValueError("boom"))
cursor = DummyCursor()
with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False), \
mock.patch('plogical.mysqlUtilities.logging.CyberCPLogFileWriter.writeToFile') as log_mock:
result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 'secret')
self.assertEqual(result, 0)
log_calls = [call.args[0] for call in log_mock.call_args_list]
self.assertTrue(any('[mysqlUtilities.changePassword.literal]' in msg for msg in log_calls))
class MysqlUtilitiesResolveUserTests(unittest.TestCase):
def test_resolves_username_when_orm_missing(self):
class _StubManager:
def get(self, *args, **kwargs):
raise Exception("not found")
dbusers_stub = types.SimpleNamespace(objects=_StubManager())
databases_stub = types.SimpleNamespace(objects=_StubManager())
cursor = DummyCursor()
cursor.fetchone_value = None
with mock.patch('plogical.mysqlUtilities.DBUsers', dbusers_stub, create=True), \
mock.patch('plogical.mysqlUtilities.Databases', databases_stub, create=True):
resolved = mysqlUtilities.mysqlUtilities.resolve_mysql_username('mystery_user', cursor)
self.assertEqual(resolved, 'mystery_user')
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,102 @@
#!/usr/bin/env python3
"""
Lightweight regression test for the /imunifyav route.
Authenticates by injecting a valid CyberPanel session (using the first Administrator record),
requests both /imunifyav and /ImunifyAV, and records the HTTP status alongside the
imunify-antivirus service state.
"""
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
sys.path.append(str(ROOT_DIR))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
import django # noqa: E402
django.setup()
from django.test import Client # noqa: E402
from loginSystem.models import Administrator # noqa: E402
LOG_PATH = ROOT_DIR / "test_logs" / "imunifyav_route.log"
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
def get_service_state():
try:
output = subprocess.check_output(
["systemctl", "is-active", "imunify-antivirus"],
stderr=subprocess.STDOUT,
timeout=10,
text=True,
).strip()
return output
except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
return "unavailable"
def perform_request(client, path):
try:
response = client.get(path, follow=True)
return response.status_code, None
except Exception as exc: # pragma: no cover - diagnostic logging
return None, str(exc)
def build_client():
admin = Administrator.objects.first()
if not admin:
return None, "No administrator records available to seed the session."
client = Client()
session = client.session
session["userID"] = admin.pk
session.save()
return client, None
def log_result(entry):
entry["timestamp"] = datetime.utcnow().isoformat() + "Z"
with open(LOG_PATH, "a", encoding="utf-8") as handle:
handle.write(json.dumps(entry, ensure_ascii=False) + "\n")
def main():
client, client_error = build_client()
service_state = get_service_state()
results = {
"module": "imunifyav_route_check",
"service_state": service_state,
"retry": 0,
"errors": [],
"responses": {},
}
if client is None:
results["errors"].append(client_error or "Unable to initialize Django test client.")
log_result(results)
print(json.dumps(results, indent=2))
sys.exit(1)
for path in ("/imunifyav/", "/ImunifyAV/"):
status_code, error = perform_request(client, path)
results["responses"][path] = status_code
if error:
results["errors"].append(f"{path}: {error}")
log_result(results)
print(json.dumps(results, indent=2))
if results["errors"]:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -8,6 +8,7 @@ sys.path.append('/usr/local/CyberCP')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
django.setup()
from django.http import HttpResponse
from django.db.models import Q
import json
from plogical.acl import ACLManager
import plogical.CyberCPLogFileWriter as logging
@@ -227,19 +228,26 @@ class DatabaseManager:
return ACLManager.loadErrorJson('changePasswordStatus', 0)
userName = data['dbUserName']
dbPassword = data['dbPassword']
dbPassword = str(data['dbPassword']) if data['dbPassword'] is not None else ''
db = Databases.objects.filter(dbUser=userName)
db_queryset = Databases.objects.filter(Q(dbUser=userName) | Q(dbName=userName))
if not db_queryset.exists():
data_ret = {'status': 0, 'changePasswordStatus': 0,
'error_message': "Database or database user could not be found."}
json_data = json.dumps(data_ret)
return HttpResponse(json_data)
database_obj = db_queryset.first()
admin = Administrator.objects.get(pk=userID)
if ACLManager.checkOwnership(db[0].website.domain, admin, currentACL) == 1:
if ACLManager.checkOwnership(database_obj.website.domain, admin, currentACL) == 1:
pass
else:
return ACLManager.loadErrorJson()
try:
meta = DBMeta.objects.get(database=db[0], key=DatabaseManager.REMOTE_ACCESS)
meta = DBMeta.objects.get(database=database_obj, key=DatabaseManager.REMOTE_ACCESS)
host = json.loads(meta.value)['remoteIP']
except:
host = None

View File

@@ -27,7 +27,7 @@
<p>{% trans "ImunifyAV is now integrated via their new API. You can manage Imunify by clicking below. You can use your server root credentials to access Imunify." %}</p>
<br>
<a target="_blank" href="/imunifyav">
<a target="_blank" href="{% url 'imunifyav_root' %}">
<button class="btn btn-primary">Access Now
</button>
</a>

View File

@@ -216,6 +216,34 @@ fix_missing_dependencies() {
print_status "$GREEN" "✅ Missing dependencies fixed"
}
ensure_imunify_av_assets() {
local package_manager=$1
print_status "$BLUE" "🔐 Ensuring ImunifyAV UI assets are installed..."
mkdir -p /etc/sysconfig/imunify360/generic 2>/dev/null || true
mkdir -p /usr/local/CyberCP/public/imunifyav 2>/dev/null || true
case $package_manager in
"yum"|"dnf")
if ! $package_manager install -y imunify-ui-generic imunify-antivirus >/dev/null 2>&1; then
print_status "$YELLOW" "⚠️ Unable to install Imunify packages via $package_manager (will continue)."
fi
;;
"apt")
apt-get update -y >/dev/null 2>&1 || true
if ! apt-get install -y imunify-antivirus >/dev/null 2>&1; then
print_status "$YELLOW" "⚠️ imunify-antivirus package not available in APT repositories."
fi
;;
esac
chown -R lscpd:lscpd /usr/local/CyberCP/public/imunifyav 2>/dev/null || true
chown -R lscpd:lscpd /etc/sysconfig/imunify360 2>/dev/null || true
print_status "$GREEN" "✅ ImunifyAV assets verified"
}
# Function to check service status
check_service_status() {
local service_name=$1
@@ -353,6 +381,9 @@ apply_cyberpanel_fixes() {
# Fix missing dependencies
fix_missing_dependencies "$package_manager"
# Ensure ImunifyAV UI assets are always present
ensure_imunify_av_assets "$package_manager"
print_status "$GREEN" "✅ All CyberPanel fixes applied successfully"
# Generate status summary

View File

@@ -921,15 +921,31 @@ password=%s
else:
LOCALHOST = mysqlUtilities.LOCALHOST
if encrypt == None:
try:
dbuser = DBUsers.objects.get(user=userName)
query = "SET PASSWORD FOR '" + userName + "'@'%s' = PASSWORD('" % (LOCALHOST) + dbPassword + "')"
except:
userName = mysqlUtilities.fetchuser(userName)
query = "SET PASSWORD FOR '" + userName + "'@'%s' = PASSWORD('" % (LOCALHOST) + dbPassword + "')"
password_value = '' if dbPassword is None else str(dbPassword)
resolved_user = mysqlUtilities.resolve_mysql_username(userName, cursor)
sanitized_user = mysqlUtilities._sanitize_mysql_identifier(resolved_user)
sanitized_host = mysqlUtilities._sanitize_mysql_identifier(LOCALHOST)
try:
literal_password = connection.literal(password_value)
if isinstance(literal_password, bytes):
literal_password = literal_password.decode('utf-8')
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities.changePassword.literal]' % (str(msg)))
return 0
if encrypt is None:
query = "SET PASSWORD FOR '{user}'@'{host}' = PASSWORD({password})".format(
user=sanitized_user,
host=sanitized_host,
password=literal_password
)
else:
query = "SET PASSWORD FOR '" + userName + "'@'%s' = '" % (LOCALHOST) + dbPassword + "'"
query = "SET PASSWORD FOR '{user}'@'{host}' = {password}".format(
user=sanitized_user,
host=sanitized_host,
password=literal_password
)
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(query)
@@ -983,6 +999,56 @@ password=%s
logging.CyberCPLogFileWriter.writeToFile(str(msg) + "[mysqlUtilities.fetchuser]")
return 0
@staticmethod
def _sanitize_mysql_identifier(value):
if value is None:
return ''
return str(value).replace("'", "''").strip()
@staticmethod
def resolve_mysql_username(identifier, cursor=None):
"""
Resolve the actual MySQL username backing the supplied identifier.
Handles DBUsers records, Databases ORM mappings and falls back to mysql.db lookups.
"""
candidate = '' if identifier is None else str(identifier).strip()
if len(candidate) == 0:
return candidate
try:
if DBUsers:
db_user = DBUsers.objects.get(user=candidate)
return db_user.user
except BaseException:
pass
try:
if Databases:
database = Databases.objects.get(dbUser=candidate)
return database.dbUser
except BaseException:
pass
try:
if Databases:
database = Databases.objects.get(dbName=candidate)
if database.dbUser:
return database.dbUser
except BaseException:
pass
if cursor is not None:
try:
cursor.execute("SELECT user FROM mysql.db WHERE db = %s LIMIT 1", (candidate,))
row = cursor.fetchone()
if row and row[0]:
return row[0]
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + "[mysqlUtilities.resolve_mysql_username]")
return candidate
@staticmethod
def allowRemoteAccess(dbName, userName, remoteIP):
try: