Clodoaldo Neto , , .
, Luca Fiaschi, . python3 psutil. , process.username process.cmdline process_iter() get_process_list().
Here is an example of a very slightly modified version of Luca Fiaschi code that works with python3 (requires psutil module). Hope this is at least basically correct!
import psutil
import psycopg2
import subprocess
import time
import os
SSH_HOST = "111.222.333.444"
SSH_USER = "user"
SSH_KEYFILE = "key.pem"
SSH_FOREIGN_PORT = 5432
SSH_INTERNAL_PORT = 5432
DB_HOST = "127.0.0.1"
DB_PORT = SSH_INTERNAL_PORT
DB_PASSWORD = "password"
DB_DATABASE = "postgres"
DB_USER = "user"
class SSHTunnel(object):
"""
A context manager implementation of an ssh tunnel opened from python
"""
def __init__(self, tunnel_command):
assert "-fN" in tunnel_command, "need to open the tunnel with -fN"
self._tunnel_command = tunnel_command
self._delay = 0.1
self.ssh_tunnel = None
def create_tunnel(self):
tunnel_cmd = self._tunnel_command
ssh_process = subprocess.Popen(tunnel_cmd, universal_newlines=True,
shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
stdin=subprocess.PIPE)
while True:
p = ssh_process.poll()
if p is not None: break
time.sleep(self._delay)
if p == 0:
current_username = psutil.Process(os.getpid()).username()
ssh_processes = [proc for proc in psutil.process_iter() if proc.cmdline() == tunnel_cmd.split() and proc.username() == current_username]
if len(ssh_processes) == 1:
self.ssh_tunnel = ssh_processes[0]
return ssh_processes[0]
else:
raise RuntimeError('multiple (or zero?) tunnel ssh processes found: ' + str(ssh_processes))
else:
raise RuntimeError('Error creating tunnel: ' + str(p) + ' :: ' + str(ssh_process.stdout.readlines()))
def release(self):
""" Get rid of the tunnel by killin the pid
"""
if self.ssh_tunnel:
self.ssh_tunnel.terminate()
def __enter__(self):
self.create_tunnel()
return self
def __exit__(self, type, value, traceback):
self.release()
def __del__(self):
self.release()
command = "ssh -i %s %s@%s -fNL %d:localhost:%d"\
% (SSH_KEYFILE, SSH_USER, SSH_HOST, SSH_INTERNAL_PORT, SSH_FOREIGN_PORT)
with SSHTunnel(command):
conn = psycopg2.connect(host = DB_HOST, password = DB_PASSWORD,
database = DB_DATABASE, user = DB_USER, port = DB_PORT)
curs = conn.cursor()
sql = "select * from table"
curs.execute(sql)
rows = curs.fetchall()
print(rows)
source
share