diff --git a/CLManager/CageFS.py b/CLManager/CageFS.py index f5e2790ca..31b38a5bd 100644 --- a/CLManager/CageFS.py +++ b/CLManager/CageFS.py @@ -249,6 +249,30 @@ pattern_to_watch = ^/home/.+?/(public_html|public_ftp|private_html)(/.*)?$ except BaseException as msg: logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, str(msg) + ' [404].', 1) + @staticmethod + def _ensure_imunifyav_assets(statusFile): + try: + commands = [ + 'mkdir -p /etc/sysconfig/imunify360/generic', + 'mkdir -p /usr/local/CyberCP/public/imunifyav', + 'chown -R lscpd:lscpd /usr/local/CyberCP/public/imunifyav 2>/dev/null || true', + 'chmod 755 /usr/local/CyberCP/public/imunifyav 2>/dev/null || true', + 'chown -R lscpd:lscpd /etc/sysconfig/imunify360 2>/dev/null || true' + ] + + for command in commands: + ServerStatusUtil.executioner(command, statusFile) + + if os.path.exists('/etc/redhat-release'): + pkg_cmd = 'yum install -y imunify-ui-generic imunify-antivirus || yum reinstall -y imunify-ui-generic' + else: + pkg_cmd = 'apt-get update -y >/dev/null 2>&1 && apt-get install -y imunify-antivirus || true' + + ServerStatusUtil.executioner(pkg_cmd, statusFile) + except BaseException as msg: + logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, + f"ImunifyAV asset verification warning: {str(msg)}\n", 1) + @staticmethod def submitinstallImunifyAV(): try: @@ -327,6 +351,8 @@ ui_path_owner = lscpd:lscpd command = 'bash imav-deploy.sh --yes' ServerStatusUtil.executioner(command, statusFile) + CageFS._ensure_imunifyav_assets(statusFile) + logging.CyberCPLogFileWriter.statusWriter(ServerStatusUtil.lswsInstallStatusPath, "ImunifyAV reinstalled..\n", 1) diff --git a/CyberCP/urls.py b/CyberCP/urls.py index da7ab903a..9aebb6674 100644 --- a/CyberCP/urls.py +++ b/CyberCP/urls.py @@ -15,10 +15,13 @@ Including another URLconf """ from django.urls import path, re_path, include from django.contrib import admin +from firewall import views as firewall_views urlpatterns = [ path('base/', include('baseTemplate.urls')), path('', include('loginSystem.urls')), + path('imunifyav/', firewall_views.imunifyAV, name='imunifyav_root'), + path('ImunifyAV/', firewall_views.imunifyAV, name='imunifyav_root_legacy'), path('packages/', include('packages.urls')), path('websites/', include('websiteFunctions.urls')), path('tuning/', include('tuning.urls')), diff --git a/Test/databases/test_mysql_utilities.py b/Test/databases/test_mysql_utilities.py new file mode 100644 index 000000000..449268aed --- /dev/null +++ b/Test/databases/test_mysql_utilities.py @@ -0,0 +1,103 @@ +import sys +import types +import unittest +from unittest import mock + +# Provide minimal stubs when running tests outside the target servers. +if 'django' not in sys.modules: + django_stub = types.ModuleType("django") + django_stub.setup = lambda: None + sys.modules['django'] = django_stub + +if 'MySQLdb' not in sys.modules: + mysql_stub = types.ModuleType("MySQLdb") + mysql_stub.connect = lambda *args, **kwargs: None + cursors_stub = types.SimpleNamespace(SSCursor=object) + mysql_stub.cursors = cursors_stub + sys.modules['MySQLdb'] = mysql_stub + sys.modules['MySQLdb.cursors'] = types.ModuleType("MySQLdb.cursors") + sys.modules['MySQLdb.cursors'].SSCursor = object + +from plogical import mysqlUtilities + + +class DummyConnection: + def __init__(self, literal_side_effect=None): + self.literal_calls = [] + self.literal_side_effect = literal_side_effect + self.closed = False + + def literal(self, value): + if self.literal_side_effect: + raise self.literal_side_effect + self.literal_calls.append(value) + return f"'{value}'" + + def close(self): + self.closed = True + + +class DummyCursor: + def __init__(self): + self.executed = [] + self.fetchone_value = None + + def execute(self, query, params=None): + self.executed.append((query, params)) + + def fetchone(self): + return self.fetchone_value + + +class MysqlUtilitiesChangePasswordTests(unittest.TestCase): + def test_numeric_password_is_coerced_and_sanitized(self): + connection = DummyConnection() + cursor = DummyCursor() + + with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \ + mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \ + mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False): + result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 987654321) + + self.assertEqual(result, 1) + self.assertIn('987654321', connection.literal_calls) + self.assertEqual(len(cursor.executed), 2) # USE mysql + password update + self.assertIn("PASSWORD('987654321')", cursor.executed[-1][0]) + + def test_logs_error_when_literal_fails(self): + connection = DummyConnection(literal_side_effect=ValueError("boom")) + cursor = DummyCursor() + + with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \ + mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \ + mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False), \ + mock.patch('plogical.mysqlUtilities.logging.CyberCPLogFileWriter.writeToFile') as log_mock: + result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 'secret') + + self.assertEqual(result, 0) + log_calls = [call.args[0] for call in log_mock.call_args_list] + self.assertTrue(any('[mysqlUtilities.changePassword.literal]' in msg for msg in log_calls)) + + +class MysqlUtilitiesResolveUserTests(unittest.TestCase): + def test_resolves_username_when_orm_missing(self): + class _StubManager: + def get(self, *args, **kwargs): + raise Exception("not found") + + dbusers_stub = types.SimpleNamespace(objects=_StubManager()) + databases_stub = types.SimpleNamespace(objects=_StubManager()) + + cursor = DummyCursor() + cursor.fetchone_value = None + + with mock.patch('plogical.mysqlUtilities.DBUsers', dbusers_stub, create=True), \ + mock.patch('plogical.mysqlUtilities.Databases', databases_stub, create=True): + resolved = mysqlUtilities.mysqlUtilities.resolve_mysql_username('mystery_user', cursor) + + self.assertEqual(resolved, 'mystery_user') + + +if __name__ == "__main__": + unittest.main() + diff --git a/Test/install/check_imunifyav_route.py b/Test/install/check_imunifyav_route.py new file mode 100644 index 000000000..5fefcdbd1 --- /dev/null +++ b/Test/install/check_imunifyav_route.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +""" +Lightweight regression test for the /imunifyav route. + +Authenticates by injecting a valid CyberPanel session (using the first Administrator record), +requests both /imunifyav and /ImunifyAV, and records the HTTP status alongside the +imunify-antivirus service state. +""" +import json +import os +import subprocess +import sys +from datetime import datetime +from pathlib import Path + +ROOT_DIR = Path(__file__).resolve().parents[2] +sys.path.append(str(ROOT_DIR)) + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings") + +import django # noqa: E402 +django.setup() + +from django.test import Client # noqa: E402 +from loginSystem.models import Administrator # noqa: E402 + +LOG_PATH = ROOT_DIR / "test_logs" / "imunifyav_route.log" +LOG_PATH.parent.mkdir(parents=True, exist_ok=True) + + +def get_service_state(): + try: + output = subprocess.check_output( + ["systemctl", "is-active", "imunify-antivirus"], + stderr=subprocess.STDOUT, + timeout=10, + text=True, + ).strip() + return output + except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired): + return "unavailable" + + +def perform_request(client, path): + try: + response = client.get(path, follow=True) + return response.status_code, None + except Exception as exc: # pragma: no cover - diagnostic logging + return None, str(exc) + + +def build_client(): + admin = Administrator.objects.first() + if not admin: + return None, "No administrator records available to seed the session." + + client = Client() + session = client.session + session["userID"] = admin.pk + session.save() + return client, None + + +def log_result(entry): + entry["timestamp"] = datetime.utcnow().isoformat() + "Z" + with open(LOG_PATH, "a", encoding="utf-8") as handle: + handle.write(json.dumps(entry, ensure_ascii=False) + "\n") + + +def main(): + client, client_error = build_client() + service_state = get_service_state() + results = { + "module": "imunifyav_route_check", + "service_state": service_state, + "retry": 0, + "errors": [], + "responses": {}, + } + + if client is None: + results["errors"].append(client_error or "Unable to initialize Django test client.") + log_result(results) + print(json.dumps(results, indent=2)) + sys.exit(1) + + for path in ("/imunifyav/", "/ImunifyAV/"): + status_code, error = perform_request(client, path) + results["responses"][path] = status_code + if error: + results["errors"].append(f"{path}: {error}") + + log_result(results) + print(json.dumps(results, indent=2)) + + if results["errors"]: + sys.exit(1) + + +if __name__ == "__main__": + main() + diff --git a/databases/databaseManager.py b/databases/databaseManager.py index 0f09ea0cc..5bf124fec 100644 --- a/databases/databaseManager.py +++ b/databases/databaseManager.py @@ -8,6 +8,7 @@ sys.path.append('/usr/local/CyberCP') os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings") django.setup() from django.http import HttpResponse +from django.db.models import Q import json from plogical.acl import ACLManager import plogical.CyberCPLogFileWriter as logging @@ -227,19 +228,26 @@ class DatabaseManager: return ACLManager.loadErrorJson('changePasswordStatus', 0) userName = data['dbUserName'] - dbPassword = data['dbPassword'] + dbPassword = str(data['dbPassword']) if data['dbPassword'] is not None else '' - db = Databases.objects.filter(dbUser=userName) + db_queryset = Databases.objects.filter(Q(dbUser=userName) | Q(dbName=userName)) + if not db_queryset.exists(): + data_ret = {'status': 0, 'changePasswordStatus': 0, + 'error_message': "Database or database user could not be found."} + json_data = json.dumps(data_ret) + return HttpResponse(json_data) + + database_obj = db_queryset.first() admin = Administrator.objects.get(pk=userID) - if ACLManager.checkOwnership(db[0].website.domain, admin, currentACL) == 1: + if ACLManager.checkOwnership(database_obj.website.domain, admin, currentACL) == 1: pass else: return ACLManager.loadErrorJson() try: - meta = DBMeta.objects.get(database=db[0], key=DatabaseManager.REMOTE_ACCESS) + meta = DBMeta.objects.get(database=database_obj, key=DatabaseManager.REMOTE_ACCESS) host = json.loads(meta.value)['remoteIP'] except: host = None diff --git a/firewall/templates/firewall/imunifyAV.html b/firewall/templates/firewall/imunifyAV.html index 1c5b61fe8..e90941412 100644 --- a/firewall/templates/firewall/imunifyAV.html +++ b/firewall/templates/firewall/imunifyAV.html @@ -27,7 +27,7 @@
{% trans "ImunifyAV is now integrated via their new API. You can manage Imunify by clicking below. You can use your server root credentials to access Imunify." %}