user level ssh terminal

This commit is contained in:
usmannasir
2025-05-18 13:43:51 +05:00
parent be923247c0
commit 9cd851c4e6
8 changed files with 773 additions and 50 deletions

View File

@@ -19,6 +19,10 @@ from .dockerviews import startContainer as docker_startContainer
from .dockerviews import stopContainer as docker_stopContainer
from .dockerviews import restartContainer as docker_restartContainer
from .resource_monitoring import get_website_resource_usage
import jwt
from datetime import datetime, timedelta
import OpenSSL
from plogical.processUtilities import ProcessUtilities
def loadWebsitesHome(request):
val = request.session['userID']
@@ -1454,13 +1458,63 @@ def prestaShopInstall(request):
def sshAccess(request, domain):
try:
# from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter
# # Ensure FastAPI SSH server systemd service file is in place
# try:
# service_path = '/etc/systemd/system/fastapi_ssh_server.service'
# local_service_path = 'fastapi_ssh_server.service'
# check_service = ProcessUtilities.outputExecutioner(f'test -f {service_path} && echo exists || echo missing')
# if 'missing' in check_service:
# ProcessUtilities.outputExecutioner(f'cp /usr/local/CyberCP/fastapi_ssh_server.service {service_path}')
# ProcessUtilities.outputExecutioner('systemctl daemon-reload')
# except Exception as e:
# CyberCPLogFileWriter.writeLog(f"Failed to copy or reload fastapi_ssh_server.service: {e}")
# # Ensure FastAPI SSH server is running using ProcessUtilities
# try:
# ProcessUtilities.outputExecutioner('systemctl is-active --quiet fastapi_ssh_server')
# ProcessUtilities.outputExecutioner('systemctl enable --now fastapi_ssh_server')
# ProcessUtilities.outputExecutioner('systemctl start fastapi_ssh_server')
# except Exception as e:
# CyberCPLogFileWriter.writeLog(f"Failed to ensure fastapi_ssh_server is running: {e}")
# # Add-on check logic
# url = "https://platform.cyberpersons.com/CyberpanelAdOns/Adonpermission"
# data = {
# "name": "all",
# "IP": ACLManager.GetServerIP()
# }
# import requests
# import json
# try:
# response = requests.post(url, data=json.dumps(data))
# Status = response.json().get('status', 0)
# except Exception:
# Status = 0
# has_addons = (Status == 1) or ProcessUtilities.decideServer() == ProcessUtilities.ent
# from plogical.CyberCPLogFileWriter import CyberCPLogFileWriter
# CyberCPLogFileWriter.writeToFile(f"has_addons: {has_addons}")
# userID = request.session['userID']
# wm = WebsiteManager(domain)
# # SSL check
# cert_path = '/usr/local/lscp/conf/cert.pem'
# is_selfsigned = False
# ssl_issue_link = '/manageSSL/sslForHostName'
# try:
# cert_content = ProcessUtilities.outputExecutioner(f'cat {cert_path}')
# cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert_content)
# is_selfsigned = cert.get_issuer().der() == cert.get_subject().der()
# except Exception:
# is_selfsigned = True # If cert missing or unreadable, treat as self-signed
userID = request.session['userID']
wm = WebsiteManager(domain)
return wm.sshAccess(request, userID)
except KeyError:
return redirect(loadLoginPage)
def saveSSHAccessChanges(request):
try:
userID = request.session['userID']
@@ -1922,4 +1976,54 @@ def get_website_resources(request):
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(f'Error in get_website_resources: {str(msg)}')
return JsonResponse({'status': 0, 'error_message': str(msg)})
return JsonResponse({'status': 0, 'error_message': str(msg)})
@csrf_exempt
def get_terminal_jwt(request):
import logging
logger = logging.getLogger("cyberpanel.ssh.jwt")
try:
logger.error("get_terminal_jwt called")
logger.error(f"Request body: {request.body}")
data = json.loads(request.body)
domain = data.get('domain')
logger.error(f"Domain: {domain}")
if not domain:
logger.error("No domain provided")
return JsonResponse({'status': 0, 'error_message': 'Domain required'})
user_id = request.session.get('userID')
logger.error(f"User ID from session: {user_id}")
if not user_id:
logger.error("User not authenticated")
return JsonResponse({'status': 0, 'error_message': 'Not authenticated'})
from websiteFunctions.models import Websites
from plogical.acl import ACLManager
from loginSystem.models import Administrator
admin = Administrator.objects.get(pk=user_id)
currentACL = ACLManager.loadedACL(user_id)
if ACLManager.checkOwnership(domain, admin, currentACL) != 1:
logger.error("User not authorized for domain")
return JsonResponse({'status': 0, 'error_message': 'Not authorized'})
try:
website = Websites.objects.get(domain=domain)
except Websites.DoesNotExist:
logger.error("Website not found")
return JsonResponse({'status': 0, 'error_message': 'Website not found'})
ssh_user = website.externalApp
logger.error(f"SSH user: {ssh_user}")
if not ssh_user:
logger.error("SSH user is empty or not set for this website.")
return JsonResponse({'status': 0, 'error_message': 'SSH user not configured for this website.'})
from datetime import datetime, timedelta
import jwt as pyjwt
payload = {
'user_id': user_id,
'ssh_user': ssh_user,
'exp': datetime.utcnow() + timedelta(minutes=10)
}
token = pyjwt.encode(payload, 'YOUR_SECRET_KEY', algorithm='HS256')
logger.error(f"JWT generated: {token}")
return JsonResponse({'status': 1, 'token': token, 'ssh_user': ssh_user})
except Exception as e:
logger.error(f"Exception in get_terminal_jwt: {str(e)}")
return JsonResponse({'status': 0, 'error_message': str(e)})