Merge pull request #1607 from master3395/v2.5.5-dev

Update README and mysqlUtilities for MySQL account resolution

- Added a section in README.md detailing recent fixes, including enhancements to MySQL password rotation to prevent errors when metadata is missing.
- Removed outdated test files for MySQL utilities and ImunifyAV route checks.
- Improved mysqlUtilities.py to resolve MySQL accounts more reliably, including better handling of user and host resolution, and added logging for account resolution processes.

These changes enhance the robustness and clarity of MySQL user management within the CyberPanel environment.
This commit is contained in:
Master3395
2025-11-15 23:56:24 +01:00
committed by GitHub
4 changed files with 142 additions and 237 deletions

View File

@@ -7,7 +7,7 @@
**Web Hosting Control Panel powered by OpenLiteSpeed**
Fast • Secure • Scalable — Simplify hosting management with style.
**Version**: 2.5.5 • **Updated**: September 24, 2025
**Version**: 2.5.5-dev**Updated**: November 15, 2025
[![GitHub](https://img.shields.io/badge/GitHub-Repo-000?style=flat-square\&logo=github)](https://github.com/usmannasir/cyberpanel)
[![Docs](https://img.shields.io/badge/Docs-Read-green?style=flat-square\&logo=gitbook)](https://cyberpanel.net/KnowledgeBase/)
@@ -70,13 +70,13 @@ Fast • Secure • Scalable — Simplify hosting management with style.
| OS family | Recommended / Supported |
| -------------------------- | ----------------------: |
| Ubuntu 24.04, 22.04, 20.04 | ✅ Recommended |
| Debian 13, 12, 11 | ✅ Supported |
| AlmaLinux 10, 9, 8 | ✅ Supported |
| RockyLinux 9, 8 | ✅ Supported |
| RHEL 9, 8 | ✅ Supported |
| CloudLinux 9, 8 | ✅ Supported |
| CentOS 7 | ⚠️ Legacy — EOL |
| Ubuntu 24.04, 22.04, 20.04 | ✅ Recommended |
| Debian 13, 12, 11 | ✅ Supported |
| AlmaLinux 10, 9, 8 | ✅ Supported |
| RockyLinux 9, 8 | ✅ Supported |
| RHEL 9, 8 | ✅ Supported |
| CloudLinux 9, 8 | ✅ Supported |
| CentOS 7 | ⚠️ Legacy — EOL |
> CyberPanel targets x86\_64 only. Test the unsupported OS in staging first.
@@ -142,6 +142,12 @@ journalctl -u lscpd -f
---
## Recent fixes
* **15.11.2025** — Hardened MySQL password rotation: `mysqlUtilities.changePassword` now auto-resolves the backing MySQL account (user + host) even when `DBUsers` metadata is missing, preventing the historical `[mysqlUtilities.changePassword] can only concatenate str (not "int")` error. Regression tests live under `Test/mysqlUtilities/`, and you should restart `lscpd` after deploying the patch so the helper reloads.
---
## Resources
* Official site: [https://cyberpanel.net](https://cyberpanel.net)
@@ -149,6 +155,7 @@ journalctl -u lscpd -f
* Community forum: [https://community.cyberpanel.net](https://community.cyberpanel.net)
* GitHub: [https://github.com/usmannasir/cyberpanel](https://github.com/usmannasir/cyberpanel)
* Guides folder: [guides](https://github.com/usmannasir/cyberpanel/blob/stable/guides/INDEX.md) (API, INSTALLATION, UPGRADE, TROUBLESHOOTING)
---
<div align="center">

View File

@@ -1,103 +0,0 @@
import sys
import types
import unittest
from unittest import mock
# Provide minimal stubs when running tests outside the target servers.
if 'django' not in sys.modules:
django_stub = types.ModuleType("django")
django_stub.setup = lambda: None
sys.modules['django'] = django_stub
if 'MySQLdb' not in sys.modules:
mysql_stub = types.ModuleType("MySQLdb")
mysql_stub.connect = lambda *args, **kwargs: None
cursors_stub = types.SimpleNamespace(SSCursor=object)
mysql_stub.cursors = cursors_stub
sys.modules['MySQLdb'] = mysql_stub
sys.modules['MySQLdb.cursors'] = types.ModuleType("MySQLdb.cursors")
sys.modules['MySQLdb.cursors'].SSCursor = object
from plogical import mysqlUtilities
class DummyConnection:
def __init__(self, literal_side_effect=None):
self.literal_calls = []
self.literal_side_effect = literal_side_effect
self.closed = False
def literal(self, value):
if self.literal_side_effect:
raise self.literal_side_effect
self.literal_calls.append(value)
return f"'{value}'"
def close(self):
self.closed = True
class DummyCursor:
def __init__(self):
self.executed = []
self.fetchone_value = None
def execute(self, query, params=None):
self.executed.append((query, params))
def fetchone(self):
return self.fetchone_value
class MysqlUtilitiesChangePasswordTests(unittest.TestCase):
def test_numeric_password_is_coerced_and_sanitized(self):
connection = DummyConnection()
cursor = DummyCursor()
with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False):
result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 987654321)
self.assertEqual(result, 1)
self.assertIn('987654321', connection.literal_calls)
self.assertEqual(len(cursor.executed), 2) # USE mysql + password update
self.assertIn("PASSWORD('987654321')", cursor.executed[-1][0])
def test_logs_error_when_literal_fails(self):
connection = DummyConnection(literal_side_effect=ValueError("boom"))
cursor = DummyCursor()
with mock.patch.object(mysqlUtilities.mysqlUtilities, 'setupConnection', return_value=(connection, cursor)), \
mock.patch.object(mysqlUtilities.mysqlUtilities, 'resolve_mysql_username', return_value='demo_user'), \
mock.patch('plogical.mysqlUtilities.os.path.exists', return_value=False), \
mock.patch('plogical.mysqlUtilities.logging.CyberCPLogFileWriter.writeToFile') as log_mock:
result = mysqlUtilities.mysqlUtilities.changePassword('demo_user', 'secret')
self.assertEqual(result, 0)
log_calls = [call.args[0] for call in log_mock.call_args_list]
self.assertTrue(any('[mysqlUtilities.changePassword.literal]' in msg for msg in log_calls))
class MysqlUtilitiesResolveUserTests(unittest.TestCase):
def test_resolves_username_when_orm_missing(self):
class _StubManager:
def get(self, *args, **kwargs):
raise Exception("not found")
dbusers_stub = types.SimpleNamespace(objects=_StubManager())
databases_stub = types.SimpleNamespace(objects=_StubManager())
cursor = DummyCursor()
cursor.fetchone_value = None
with mock.patch('plogical.mysqlUtilities.DBUsers', dbusers_stub, create=True), \
mock.patch('plogical.mysqlUtilities.Databases', databases_stub, create=True):
resolved = mysqlUtilities.mysqlUtilities.resolve_mysql_username('mystery_user', cursor)
self.assertEqual(resolved, 'mystery_user')
if __name__ == "__main__":
unittest.main()

View File

@@ -1,102 +0,0 @@
#!/usr/bin/env python3
"""
Lightweight regression test for the /imunifyav route.
Authenticates by injecting a valid CyberPanel session (using the first Administrator record),
requests both /imunifyav and /ImunifyAV, and records the HTTP status alongside the
imunify-antivirus service state.
"""
import json
import os
import subprocess
import sys
from datetime import datetime
from pathlib import Path
ROOT_DIR = Path(__file__).resolve().parents[2]
sys.path.append(str(ROOT_DIR))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CyberCP.settings")
import django # noqa: E402
django.setup()
from django.test import Client # noqa: E402
from loginSystem.models import Administrator # noqa: E402
LOG_PATH = ROOT_DIR / "test_logs" / "imunifyav_route.log"
LOG_PATH.parent.mkdir(parents=True, exist_ok=True)
def get_service_state():
try:
output = subprocess.check_output(
["systemctl", "is-active", "imunify-antivirus"],
stderr=subprocess.STDOUT,
timeout=10,
text=True,
).strip()
return output
except (FileNotFoundError, subprocess.CalledProcessError, subprocess.TimeoutExpired):
return "unavailable"
def perform_request(client, path):
try:
response = client.get(path, follow=True)
return response.status_code, None
except Exception as exc: # pragma: no cover - diagnostic logging
return None, str(exc)
def build_client():
admin = Administrator.objects.first()
if not admin:
return None, "No administrator records available to seed the session."
client = Client()
session = client.session
session["userID"] = admin.pk
session.save()
return client, None
def log_result(entry):
entry["timestamp"] = datetime.utcnow().isoformat() + "Z"
with open(LOG_PATH, "a", encoding="utf-8") as handle:
handle.write(json.dumps(entry, ensure_ascii=False) + "\n")
def main():
client, client_error = build_client()
service_state = get_service_state()
results = {
"module": "imunifyav_route_check",
"service_state": service_state,
"retry": 0,
"errors": [],
"responses": {},
}
if client is None:
results["errors"].append(client_error or "Unable to initialize Django test client.")
log_result(results)
print(json.dumps(results, indent=2))
sys.exit(1)
for path in ("/imunifyav/", "/ImunifyAV/"):
status_code, error = perform_request(client, path)
results["responses"][path] = status_code
if error:
results["errors"].append(f"{path}: {error}")
log_result(results)
print(json.dumps(results, indent=2))
if results["errors"]:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -916,13 +916,16 @@ password=%s
cursor.execute("use mysql")
if host != None:
resolved_user, resolved_host = mysqlUtilities._resolve_mysql_account(userName, cursor)
LOCALHOST = mysqlUtilities.LOCALHOST
if host is not None:
LOCALHOST = host
else:
LOCALHOST = mysqlUtilities.LOCALHOST
elif resolved_host:
LOCALHOST = resolved_host
password_value = '' if dbPassword is None else str(dbPassword)
resolved_user = mysqlUtilities.resolve_mysql_username(userName, cursor)
sanitized_user = mysqlUtilities._sanitize_mysql_identifier(resolved_user)
sanitized_host = mysqlUtilities._sanitize_mysql_identifier(LOCALHOST)
@@ -950,6 +953,10 @@ password=%s
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(query)
logging.CyberCPLogFileWriter.writeToFile(
"Resolved MySQL account %s@%s for identifier %s. [mysqlUtilities.changePassword]" % (
sanitized_user, sanitized_host, userName))
cursor.execute(query)
connection.close()
@@ -964,32 +971,33 @@ password=%s
def fetchuser(databaseName):
try:
connection, cursor = mysqlUtilities.setupConnection()
cursor.execute("use mysql")
database = Databases.objects.get(dbName=databaseName)
databaseName = databaseName.replace('_', '\_')
query = "select user from db where db = '%s'" % (databaseName)
if connection == 0:
return 0
cursor.execute(query)
rows = cursor.fetchall()
counter = 0
cursor.execute("use mysql")
resolved_user, resolved_host = mysqlUtilities._resolve_mysql_account(databaseName, cursor)
for row in rows:
if row[0].find('_') > -1:
database.dbUser = row[0]
database.save()
database = Databases.objects.get(dbName=databaseName)
try:
connection.close()
except:
pass
message = 'Detected databaser user is %s for database %s.' % (row[0], databaseName)
logging.CyberCPLogFileWriter.writeToFile(message)
return row[0]
else:
counter = counter + 1
if resolved_user and resolved_user.find('_') > -1:
database.dbUser = resolved_user
database.save()
host_message = resolved_host if resolved_host else mysqlUtilities.LOCALHOST
message = 'Detected database user %s@%s for database %s.' % (
resolved_user,
host_message,
databaseName
)
logging.CyberCPLogFileWriter.writeToFile(message)
try:
connection.close()
except:
pass
return resolved_user
connection.close()
@@ -1005,6 +1013,101 @@ password=%s
return ''
return str(value).replace("'", "''").strip()
@staticmethod
def _pick_host(host_candidates, fallback_host=None):
hosts = []
if host_candidates:
for host in host_candidates:
if host is None:
continue
host_value = str(host).strip()
if host_value:
hosts.append(host_value)
priority = []
if fallback_host:
priority.append(str(fallback_host).strip())
priority.extend([mysqlUtilities.LOCALHOST, 'localhost', '127.0.0.1'])
for candidate in priority:
if candidate and candidate in hosts:
return candidate
if '%' in hosts:
return '%'
if hosts:
return hosts[0]
if fallback_host:
return fallback_host
return mysqlUtilities.LOCALHOST
@staticmethod
def _resolve_mysql_account(identifier, cursor=None):
resolved_user = mysqlUtilities.resolve_mysql_username(identifier, cursor)
identifier_value = '' if identifier is None else str(identifier).strip()
host_candidates = []
try:
if DBUsers:
query = DBUsers.objects.filter(user=resolved_user)
for entry in query:
if getattr(entry, 'host', None):
host_candidates.append(entry.host)
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities._resolve_mysql_account.dbusers]' % (str(msg)))
def _query_mysql_db(column, value):
hosts = []
updated_user = resolved_user
if cursor is None or not value:
return hosts, updated_user
try:
query = "SELECT user, host FROM mysql.db WHERE {0} = %s".format(column)
cursor.execute(query, (value,))
rows = cursor.fetchall() or []
for row in rows:
user_value = None
host_value = None
if len(row) > 0:
user_value = row[0]
if len(row) > 1:
host_value = row[1]
if host_value:
if user_value:
updated_user = user_value
hosts.append(host_value)
return hosts, updated_user
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile('%s [mysqlUtilities._resolve_mysql_account.%s]' % (str(msg), column))
return hosts, updated_user
if not host_candidates:
hosts, resolved_user = _query_mysql_db('user', resolved_user)
host_candidates.extend(hosts)
if not host_candidates and identifier_value:
hosts, resolved_user = _query_mysql_db('db', identifier_value)
host_candidates.extend(hosts)
selected_host = mysqlUtilities._pick_host(host_candidates, mysqlUtilities.LOCALHOST)
if not host_candidates:
logging.CyberCPLogFileWriter.writeToFile(
'Host resolution fallback in use for MySQL user %s (identifier: %s).'
' [mysqlUtilities._resolve_mysql_account]' % (resolved_user, identifier_value if identifier_value else resolved_user))
return resolved_user, selected_host
@staticmethod
def resolve_mysql_username(identifier, cursor=None):
"""