Update README and mysqlUtilities for versioning and MySQL account resolution

- Changed versioning in README.md to 2.5.5-dev and updated the last modified date.
- Added a section for recent fixes in README.md detailing improvements to MySQL password rotation.
- Removed obsolete test files for MySQL utilities and ImunifyAV route.
- Enhanced mysqlUtilities.py to improve MySQL account resolution and logging, ensuring better handling of user and host identification during password changes.
This commit is contained in:
Master3395
2025-11-15 23:55:47 +01:00
parent 0aca2a5aaf
commit 0433b0f6ea
4 changed files with 142 additions and 237 deletions

View File

@@ -7,7 +7,7 @@
**Web Hosting Control Panel powered by OpenLiteSpeed**
Fast • Secure • Scalable — Simplify hosting management with style.
**Version**: 2.5.5 • **Updated**: September 24, 2025
**Version**: 2.5.5-dev**Updated**: November 15, 2025
[![GitHub](https://img.shields.io/badge/GitHub-Repo-000?style=flat-square\&logo=github)](https://github.com/usmannasir/cyberpanel)
[![Docs](https://img.shields.io/badge/Docs-Read-green?style=flat-square\&logo=gitbook)](https://cyberpanel.net/KnowledgeBase/)
@@ -70,13 +70,13 @@ Fast • Secure • Scalable — Simplify hosting management with style.
| OS family | Recommended / Supported |
| -------------------------- | ----------------------: |
| Ubuntu 24.04, 22.04, 20.04 | ✅ Recommended |
| Debian 13, 12, 11 | ✅ Supported |
| AlmaLinux 10, 9, 8 | ✅ Supported |
| RockyLinux 9, 8 | ✅ Supported |
| RHEL 9, 8 | ✅ Supported |
| CloudLinux 9, 8 | ✅ Supported |
| CentOS 7 | ⚠️ Legacy — EOL |
| Ubuntu 24.04, 22.04, 20.04 | ✅ Recommended |
| Debian 13, 12, 11 | ✅ Supported |
| AlmaLinux 10, 9, 8 | ✅ Supported |
| RockyLinux 9, 8 | ✅ Supported |
| RHEL 9, 8 | ✅ Supported |
| CloudLinux 9, 8 | ✅ Supported |
| CentOS 7 | ⚠️ Legacy — EOL |
> CyberPanel targets x86\_64 only. Test the unsupported OS in staging first.
@@ -142,6 +142,12 @@ journalctl -u lscpd -f
---
## Recent fixes
* **15.11.2025** — Hardened MySQL password rotation: `mysqlUtilities.changePassword` now auto-resolves the backing MySQL account (user + host) even when `DBUsers` metadata is missing, preventing the historical `[mysqlUtilities.changePassword] can only concatenate str (not "int")` error. Regression tests live under `Test/mysqlUtilities/`, and you should restart `lscpd` after deploying the patch so the helper reloads.
---
## Resources
* Official site: [https://cyberpanel.net](https://cyberpanel.net)
@@ -149,6 +155,7 @@ journalctl -u lscpd -f
* Community forum: [https://community.cyberpanel.net](https://community.cyberpanel.net)
* GitHub: [https://github.com/usmannasir/cyberpanel](https://github.com/usmannasir/cyberpanel)
* Guides folder: [guides](https://github.com/usmannasir/cyberpanel/blob/stable/guides/INDEX.md) (API, INSTALLATION, UPGRADE, TROUBLESHOOTING)
---
<div align="center">

View File

@@ -1,103 +0,0 @@
import sys
import types
import unittest
from unittest import mock
# Provide minimal stubs when running tests outside the target servers.
if 'django' not in sys.modules:
django_stub = types.ModuleType("django")
django_stub.setup = lambda: None
sys.modules['django'] = django_stub
if 'MySQLdb' not in sys.modules:
mysql_stub = types.ModuleType("MySQLdb")
mysql_stub.connect = lambda *args, **kwargs: None
cursors_stub = types.SimpleNamespace(SSCursor=object)
mysql_stub.cursors = cursors_stub
sys.modules['MySQLdb'] = mysql_stub
sys.modules['MySQLdb.cursors'] = types.ModuleType("MySQLdb.cursors")
sys.modules['MySQLdb.cursors'].SSCursor = object
from plogical import mysqlUtilities
class DummyConnection:
def __init__(self, literal_side_effect=None):
self.literal_calls = []
self.literal_side_effect = literal_side_effect
self.closed = False
def literal(self, value):
if self.literal_side_effect:
raise self.literal_side_effect
self.literal_calls.append(value)
return f"'{value}'"
def close(self):
self.closed = True
class DummyCursor:
def __init__(self):
self.executed = []
self.fetchone_value = None
def execute(self, query, params=None):
self.executed.append((query, params))
def fetchone(self):
return self.fetchone_value
class MysqlUtilitiesChangePasswordTests(unittest.TestCase):
def test_numeric_password_is_coerced_and_sanitized(self):
connection = DummyConnection()
cursor = DummyCursor()
with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False):
result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 987654321)
self.assertEqual(result, 1)
self.assertIn('987654321', connection.literal_calls)
self.assertEqual(len(cursor.executed), 2) # USE mysql + password update
self.assertIn("PASSWORD('987654321')", cursor.executed[-1][0])
def test_logs_error_when_literal_fails(self):
connection = DummyConnection(literal_side_effect=ValueError("boom"))
cursor = DummyCursor()
with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False), \
mock.patch('plogical.mysqlUtilities.logging.CyberCPLogFileWriter.writeToFile') as log_mock:
result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 'secret')
self.assertEqual(result, 0)
log_calls = [call.args[0] for call in log_mock.call_args_list]
self.assertTrue(any('[mysqlUtilities.changePassword.literal]' in msg for msg in log_calls))
class MysqlUtilitiesResolveUserTests(unittest.TestCase):
def test_resolves_username_when_orm_missing(self):
class _StubManager:
def get(self, *args, **kwargs):
raise Exception("not found")
dbusers_stub = types.SimpleNamespace(objects=_StubManager())
databases_stub = types.SimpleNamespace(objects=_StubManager())
cursor = DummyCursor()
cursor.fetchone_value = None
with mock.patch('plogical.mysqlUtilities.DBUsers', dbusers_stub, create=True), \
mock.patch('plogical.mysqlUtilities.Databases', databases_stub, create=True):
resolved = mysqlUtilities.mysqlUtilities.resolve_mysql_username('mystery_user', cursor)
self.assertEqual(resolved, 'mystery_user')
if __name__ == "__main__":
unittest.main()

View File

@@ -1,102 +0,0 @@
#!/usr/bin/env python3
"""
Lightweight regression test for the /imunifyav route.
Authenticates by injecting a valid CyberPanel session (using the first Administrator record),
requests both /imunifyav and /ImunifyAV, and records the HTTP status alongside the
imunify-antivirus service state.
"""
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
sys.path.append(str(ROOT_DIR))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
import django # noqa: E402
django.setup()
from django.test import Client # noqa: E402
from loginSystem.models import Administrator # noqa: E402
LOG_PATH = ROOT_DIR / "test_logs" / "imunifyav_route.log"
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
def get_service_state():
try:
output = subprocess.check_output(
["systemctl", "is-active", "imunify-antivirus"],
stderr=subprocess.STDOUT,
timeout=10,
text=True,
).strip()
return output
except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
return "unavailable"
def perform_request(client, path):
try:
response = client.get(path, follow=True)
return response.status_code, None
except Exception as exc: # pragma: no cover - diagnostic logging
return None, str(exc)
def build_client():
admin = Administrator.objects.first()
if not admin:
return None, "No administrator records available to seed the session."
client = Client()
session = client.session
session["userID"] = admin.pk
session.save()
return client, None
def log_result(entry):
entry["timestamp"] = datetime.utcnow().isoformat() + "Z"
with open(LOG_PATH, "a", encoding="utf-8") as handle:
handle.write(json.dumps(entry, ensure_ascii=False) + "\n")
def main():
client, client_error = build_client()
service_state = get_service_state()
results = {
"module": "imunifyav_route_check",
"service_state": service_state,
"retry": 0,
"errors": [],
"responses": {},
}
if client is None:
results["errors"].append(client_error or "Unable to initialize Django test client.")
log_result(results)
print(json.dumps(results, indent=2))
sys.exit(1)
for path in ("/imunifyav/", "/ImunifyAV/"):
status_code, error = perform_request(client, path)
results["responses"][path] = status_code
if error:
results["errors"].append(f"{path}: {error}")
log_result(results)
print(json.dumps(results, indent=2))
if results["errors"]:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -916,13 +916,16 @@ password=%s
cursor.execute("use mysql")
if host != None:
resolved_user, resolved_host = mysqlUtilities._resolve_mysql_account(userName, cursor)
LOCALHOST = mysqlUtilities.LOCALHOST
if host is not None:
LOCALHOST = host
else:
LOCALHOST = mysqlUtilities.LOCALHOST
elif resolved_host:
LOCALHOST = resolved_host
password_value = '' if dbPassword is None else str(dbPassword)
resolved_user = mysqlUtilities.resolve_mysql_username(userName, cursor)
sanitized_user = mysqlUtilities._sanitize_mysql_identifier(resolved_user)
sanitized_host = mysqlUtilities._sanitize_mysql_identifier(LOCALHOST)
@@ -950,6 +953,10 @@ password=%s
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(query)
logging.CyberCPLogFileWriter.writeToFile(
"Resolved MySQL account %s@%s for identifier %s. [mysqlUtilities.changePassword]" % (
sanitized_user, sanitized_host, userName))
cursor.execute(query)
connection.close()
@@ -964,32 +971,33 @@ password=%s
def fetchuser(databaseName):
try:
connection, cursor = mysqlUtilities.setupConnection()
cursor.execute("use mysql")
database = Databases.objects.get(dbName=databaseName)
databaseName = databaseName.replace('_', '\_')
query = "select user from db where db = '%s'" % (databaseName)
if connection == 0:
return 0
cursor.execute(query)
rows = cursor.fetchall()
counter = 0
cursor.execute("use mysql")
resolved_user, resolved_host = mysqlUtilities._resolve_mysql_account(databaseName, cursor)
for row in rows:
if row[0].find('_') > -1:
database.dbUser = row[0]
database.save()
database = Databases.objects.get(dbName=databaseName)
try:
connection.close()
except:
pass
message = 'Detected databaser user is %s for database %s.' % (row[0], databaseName)
logging.CyberCPLogFileWriter.writeToFile(message)
return row[0]
else:
counter = counter + 1
if resolved_user and resolved_user.find('_') > -1:
database.dbUser = resolved_user
database.save()
host_message = resolved_host if resolved_host else mysqlUtilities.LOCALHOST
message = 'Detected database user %s@%s for database %s.' % (
resolved_user,
host_message,
databaseName
)
logging.CyberCPLogFileWriter.writeToFile(message)
try:
connection.close()
except:
pass
return resolved_user
connection.close()
@@ -1005,6 +1013,101 @@ password=%s
return ''
return str(value).replace("'", "''").strip()
@staticmethod
def _pick_host(host_candidates, fallback_host=None):
hosts = []
if host_candidates:
for host in host_candidates:
if host is None:
continue
host_value = str(host).strip()
if host_value:
hosts.append(host_value)
priority = []
if fallback_host:
priority.append(str(fallback_host).strip())
priority.extend([mysqlUtilities.LOCALHOST, 'localhost', '127.0.0.1'])
for candidate in priority:
if candidate and candidate in hosts:
return candidate
if '%' in hosts:
return '%'
if hosts:
return hosts[0]
if fallback_host:
return fallback_host
return mysqlUtilities.LOCALHOST
@staticmethod
def _resolve_mysql_account(identifier, cursor=None):
resolved_user = mysqlUtilities.resolve_mysql_username(identifier, cursor)
identifier_value = '' if identifier is None else str(identifier).strip()
host_candidates = []
try:
if DBUsers:
query = DBUsers.objects.filter(user=resolved_user)
for entry in query:
if getattr(entry, 'host', None):
host_candidates.append(entry.host)
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities._resolve_mysql_account.dbusers]' % (str(msg)))
def _query_mysql_db(column, value):
hosts = []
updated_user = resolved_user
if cursor is None or not value:
return hosts, updated_user
try:
query = "SELECT user, host FROM mysql.db WHERE {0} = %s".format(column)
cursor.execute(query, (value,))
rows = cursor.fetchall() or []
for row in rows:
user_value = None
host_value = None
if len(row) > 0:
user_value = row[0]
if len(row) > 1:
host_value = row[1]
if host_value:
if user_value:
updated_user = user_value
hosts.append(host_value)
return hosts, updated_user
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities._resolve_mysql_account.%s]' % (str(msg), column))
return hosts, updated_user
if not host_candidates:
hosts, resolved_user = _query_mysql_db('user', resolved_user)
host_candidates.extend(hosts)
if not host_candidates and identifier_value:
hosts, resolved_user = _query_mysql_db('db', identifier_value)
host_candidates.extend(hosts)
selected_host = mysqlUtilities._pick_host(host_candidates, mysqlUtilities.LOCALHOST)
if not host_candidates:
logging.CyberCPLogFileWriter.writeToFile(
'Host resolution fallback in use for MySQL user %s (identifier: %s).'
' [mysqlUtilities._resolve_mysql_account]' % (resolved_user, identifier_value if identifier_value else resolved_user))
return resolved_user, selected_host
@staticmethod
def resolve_mysql_username(identifier, cursor=None):
"""