bug fix: potential issue with sftp destination add https://github.com/usmannasir/cyberpanel/issues/1210;

;
This commit is contained in:
usmannasir
2024-02-10 20:07:52 +05:00
parent c7d300418b
commit 9de6243c26

View File

@@ -1,6 +1,8 @@
import os
import sys
import paramiko
sys.path.append('/usr/local/CyberCP')
import django
@@ -9,7 +11,7 @@ try:
django.setup()
except:
pass
import pysftp
from plogical.randomPassword import generate_pass
import pexpect
from plogical import CyberCPLogFileWriter as logging
@@ -1147,143 +1149,222 @@ class backupUtilities:
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [initiateRestore]")
# @staticmethod
# def sendKey(IPAddress, password, port='22', user='root'):
# try:
#
# expectation = []
# expectation.append("password:")
# expectation.append("Password:")
# expectation.append("Permission denied")
# expectation.append("100%")
#
# ## Temp changes
#
# command = 'chmod 600 %s' % ('/root/.ssh/cyberpanel.pub')
# ProcessUtilities.executioner(command)
#
# command = "scp -o StrictHostKeyChecking=no -P " + port + " /root/.ssh/cyberpanel.pub " + user + "@" + IPAddress + ":~/.ssh/authorized_keys"
# setupKeys = pexpect.spawn(command, timeout=3)
#
# if os.path.exists(ProcessUtilities.debugPath):
# logging.CyberCPLogFileWriter.writeToFile(command)
#
# index = setupKeys.expect(expectation)
#
# ## on first login attempt send password
#
# if index == 0:
# setupKeys.sendline(password)
# setupKeys.expect("100%")
# setupKeys.wait()
# elif index == 1:
# setupKeys.sendline(password)
# setupKeys.expect("100%")
# setupKeys.wait()
# elif index == 2:
# return [0, 'Please enable password authentication on your remote server.']
# elif index == 3:
# pass
# else:
# raise BaseException
#
# ## Temp changes
#
# command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
# ProcessUtilities.executioner(command)
#
# return [1, "None"]
#
# except pexpect.TIMEOUT as msg:
#
# command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
# ProcessUtilities.executioner(command)
#
# logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [sendKey]")
# return [0, "TIMEOUT [sendKey]"]
# except pexpect.EOF as msg:
#
# command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
# ProcessUtilities.executioner(command)
#
# logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [sendKey]")
# return [0, "EOF [sendKey]"]
# except BaseException as msg:
#
# command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
# ProcessUtilities.executioner(command)
#
# logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [sendKey]")
# return [0, str(msg) + " [sendKey]"]
# @staticmethod
# def setupSSHKeys(IPAddress, password, port='22', user='root'):
# try:
# ## Checking for host verification
#
# backupUtilities.host_key_verification(IPAddress)
#
# if backupUtilities.checkIfHostIsUp(IPAddress) == 1:
# pass
# else:
# logging.CyberCPLogFileWriter.writeToFile("Host is Down.")
# # return [0,"Host is Down."]
#
# expectation = []
# expectation.append("password:")
# expectation.append("Password:")
# expectation.append("Permission denied")
# expectation.append("File exists")
#
# command = "ssh -o StrictHostKeyChecking=no -p " + port + ' ' + user + "@" + IPAddress + ' "mkdir ~/.ssh || rm -f ~/.ssh/temp && rm -f ~/.ssh/authorized_temp && cp ~/.ssh/authorized_keys ~/.ssh/temp || chmod 700 ~/.ssh || chmod g-w ~"'
# setupKeys = pexpect.spawn(command, timeout=3)
#
# if os.path.exists(ProcessUtilities.debugPath):
# logging.CyberCPLogFileWriter.writeToFile(command)
#
# index = setupKeys.expect(expectation)
#
# ## on first login attempt send password
#
# if index == 0:
# setupKeys.sendline(password)
# elif index == 1:
# setupKeys.sendline(password)
# elif index == 2:
# return [0, 'Please enable password authentication on your remote server.']
# elif index == 3:
# pass
# else:
# raise BaseException
#
# ## if it again give you password, than provided password is wrong
#
# expectation = []
# expectation.append("please try again.")
# expectation.append("Password:")
# expectation.append(pexpect.EOF)
#
# index = setupKeys.expect(expectation)
#
# if index == 0:
# return [0, "Wrong Password!"]
# elif index == 1:
# return [0, "Wrong Password!"]
# elif index == 2:
# setupKeys.wait()
#
# sendKey = backupUtilities.sendKey(IPAddress, password, port, user)
#
# if sendKey[0] == 1:
# return [1, "None"]
# else:
# return [0, sendKey[1]]
#
#
# except pexpect.TIMEOUT as msg:
# return [0, str(msg) + " [TIMEOUT setupSSHKeys]"]
# except BaseException as msg:
# return [0, str(msg) + " [setupSSHKeys]"]
@staticmethod
def sendKey(IPAddress, password, port='22', user='root'):
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(IPAddress, port=int(port), username=user, password=password)
expectation = []
expectation.append("password:")
expectation.append("Password:")
expectation.append("Permission denied")
expectation.append("100%")
## Temp changes
if os.path.exists('/root/.ssh/cyberpanel.pub'):
pass
else:
command = "ssh-keygen -f /root/.ssh/cyberpanel -t rsa -N ''"
ProcessUtilities.executioner(command, 'root', True)
command = 'chmod 600 %s' % ('/root/.ssh/cyberpanel.pub')
ProcessUtilities.executioner(command)
command = "scp -o StrictHostKeyChecking=no -P " + port + " /root/.ssh/cyberpanel.pub " + user + "@" + IPAddress + ":~/.ssh/authorized_keys"
setupKeys = pexpect.spawn(command, timeout=3)
sftp = ssh.open_sftp()
sftp.put('/root/.ssh/cyberpanel.pub', '.ssh/authorized_keys')
sftp.close()
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(command)
ssh.exec_command('chmod 600 .ssh/authorized_keys')
index = setupKeys.expect(expectation)
## on first login attempt send password
if index == 0:
setupKeys.sendline(password)
setupKeys.expect("100%")
setupKeys.wait()
elif index == 1:
setupKeys.sendline(password)
setupKeys.expect("100%")
setupKeys.wait()
elif index == 2:
return [0, 'Please enable password authentication on your remote server.']
elif index == 3:
pass
else:
raise BaseException
## Temp changes
ssh.close()
command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
ProcessUtilities.executioner(command)
return [1, "None"]
except pexpect.TIMEOUT as msg:
command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
ProcessUtilities.executioner(command)
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [sendKey]")
return [0, "TIMEOUT [sendKey]"]
except pexpect.EOF as msg:
command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
ProcessUtilities.executioner(command)
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [sendKey]")
return [0, "EOF [sendKey]"]
except BaseException as msg:
command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
ProcessUtilities.executioner(command)
logging.CyberCPLogFileWriter.writeToFile(str(msg) + " [sendKey]")
return [0, str(msg) + " [sendKey]"]
except paramiko.AuthenticationException:
return [0, 'Authentication failed. [sendKey]']
except paramiko.SSHException as e:
return [0, f'SSH error: {str(e)} [sendKey]']
except Exception as e:
return [0, f'General Error: {str(e)} [sendKey]']
@staticmethod
def setupSSHKeys(IPAddress, password, port='22', user='root'):
try:
## Checking for host verification
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(IPAddress, port=int(port), username=user, password=password)
backupUtilities.host_key_verification(IPAddress)
commands = [
"mkdir -p .ssh",
"rm -f .ssh/temp",
"rm -f .ssh/authorized_temp",
"cp .ssh/authorized_keys .ssh/temp",
"chmod 700 .ssh",
"chmod g-w ~",
]
if backupUtilities.checkIfHostIsUp(IPAddress) == 1:
pass
for command in commands:
try:
ssh.exec_command(command)
except BaseException as msg:
logging.CyberCPLogFileWriter.writeToFile(f'Error executing remote command {command}. Error {str(msg)}')
ssh.close()
sendKey = backupUtilities.sendKey(IPAddress, password, port, user)
if sendKey[0] == 1:
command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
ProcessUtilities.executioner(command)
return [1, "None"]
else:
logging.CyberCPLogFileWriter.writeToFile("Host is Down.")
# return [0,"Host is Down."]
command = 'chmod 644 %s' % ('/root/.ssh/cyberpanel.pub')
ProcessUtilities.executioner(command)
return [0, sendKey[1]]
expectation = []
expectation.append("password:")
expectation.append("Password:")
expectation.append("Permission denied")
expectation.append("File exists")
except paramiko.AuthenticationException:
return [0, 'Authentication failed. [setupSSHKeys]']
except paramiko.SSHException as e:
return [0, f'SSH error: {str(e)} [setupSSHKeys]']
except Exception as e:
return [0, f'General Error: {str(e)} [setupSSHKeys]']
command = "ssh -o StrictHostKeyChecking=no -p " + port + ' ' + user + "@" + IPAddress + ' "mkdir ~/.ssh || rm -f ~/.ssh/temp && rm -f ~/.ssh/authorized_temp && cp ~/.ssh/authorized_keys ~/.ssh/temp || chmod 700 ~/.ssh || chmod g-w ~"'
setupKeys = pexpect.spawn(command, timeout=3)
if os.path.exists(ProcessUtilities.debugPath):
logging.CyberCPLogFileWriter.writeToFile(command)
index = setupKeys.expect(expectation)
## on first login attempt send password
if index == 0:
setupKeys.sendline(password)
elif index == 1:
setupKeys.sendline(password)
elif index == 2:
return [0, 'Please enable password authentication on your remote server.']
elif index == 3:
pass
else:
raise BaseException
## if it again give you password, than provided password is wrong
expectation = []
expectation.append("please try again.")
expectation.append("Password:")
expectation.append(pexpect.EOF)
index = setupKeys.expect(expectation)
if index == 0:
return [0, "Wrong Password!"]
elif index == 1:
return [0, "Wrong Password!"]
elif index == 2:
setupKeys.wait()
sendKey = backupUtilities.sendKey(IPAddress, password, port, user)
if sendKey[0] == 1:
return [1, "None"]
else:
return [0, sendKey[1]]
except pexpect.TIMEOUT as msg:
return [0, str(msg) + " [TIMEOUT setupSSHKeys]"]
except BaseException as msg:
return [0, str(msg) + " [setupSSHKeys]"]
@staticmethod
def checkIfHostIsUp(IPAddress):