1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 """\
21 X2GoTelekinesisClient class - Connect to Telekinesis Server on X2Go Server.
22
23 """
24 __NAME__ = 'x2gotelekinesisclient-pylib'
25
26
27 import gevent
28 import os
29 import copy
30 import threading
31 import socket
32
33
34 import x2go.forward as forward
35 import x2go.log as log
36 import x2go.utils as utils
37 import x2go.x2go_exceptions as x2go_exceptions
38
39 from x2go.defaults import X2GOCLIENT_OS as _X2GOCLIENT_OS
40 if _X2GOCLIENT_OS in ("Windows"):
41 import subprocess
42 else:
43 import x2go.gevent_subprocess as subprocess
44
45 from x2go.defaults import LOCAL_HOME as _LOCAL_HOME
46 from x2go.defaults import X2GO_SESSIONS_ROOTDIR as _X2GO_SESSIONS_ROOTDIR
47 from x2go.defaults import CURRENT_LOCAL_USER as _CURRENT_LOCAL_USER
48
49
51 """\
52 Telekinesis is a communication framework used by X2Go.
53
54 This class implements the startup of the telekinesis client used by
55 Python X2Go.
56
57 """
58 TEKICLIENT_CMD = 'telekinesis-client'
59 """Telekinesis client command. Might be OS specific."""
60 TEKICLIENT_ARGS = ['-setWORMHOLEPORT={port}', '-setX2GOSID={sid}', ]
61 """Arguments to be passed to the Telekinesis client."""
62 TEKICLIENT_ENV = {}
63 """Provide environment variables to the Telekinesis client command."""
64
65 - def __init__(self, session_info=None,
66 ssh_transport=None,
67 sessions_rootdir=os.path.join(_LOCAL_HOME, _X2GO_SESSIONS_ROOTDIR),
68 session_instance=None,
69 logger=None, loglevel=log.loglevel_DEFAULT, ):
70 """\
71 @param session_info: session information provided as an C{X2GoServerSessionInfo*} backend
72 instance
73 @type session_info: C{X2GoServerSessionInfo*} instance
74 @param ssh_transport: SSH transport object from C{paramiko.SSHClient}
75 @type ssh_transport: C{paramiko.Transport} instance
76 @param sessions_rootdir: base dir where X2Go session files are stored (by default: ~/.x2go)
77 @type sessions_rootdir: C{str}
78 @param logger: you can pass an L{X2GoLogger} object to the
79 L{X2GoTelekinesisClient} constructor
80 @param session_instance: the L{X2GoSession} instance this C{X2GoProxy*} instance belongs to
81 @type session_instance: L{X2GoSession} instance
82 @type logger: L{X2GoLogger} instance
83 @param loglevel: if no L{X2GoLogger} object has been supplied a new one will be
84 constructed with the given loglevel
85 @type loglevel: int
86
87 """
88 self.tekiclient_log_stdout = None
89 self.tekiclient_log_stderr = None
90 self.tekiclient_datalog_stdout = None
91 self.tekiclient_datalog_stderr = None
92 self.fw_ctrl_tunnel = None
93 self.fw_data_tunnel = None
94 self.telekinesis_client = None
95 self.telekinesis_sshfs = None
96
97 if ssh_transport is None:
98
99 raise x2go_exceptions.X2GoTelekinesisClientException('SSH transport not available')
100
101 if session_instance is None:
102
103 raise x2go_exceptions.X2GoTelekinesisClientException('X2GoSession instance not available')
104
105 if logger is None:
106 self.logger = log.X2GoLogger(loglevel=loglevel)
107 else:
108 self.logger = copy.deepcopy(logger)
109 self.logger.tag = __NAME__
110
111 if self.logger.get_loglevel() & log.loglevel_DEBUG:
112 self.TEKICLIENT_ARGS.extend(['-setDEBUG=1',])
113
114 self.sessions_rootdir = sessions_rootdir
115 self.session_info = session_info
116 self.session_name = self.session_info.name
117 self.ssh_transport = ssh_transport
118 self.session_instance = session_instance
119 self.tekiclient = None
120 self.tekiclient_log = 'telekinesis-client.log'
121 self.tekiclient_datalog = 'telekinesis-client-sshfs.log'
122 self.TEKICLIENT_ENV = os.environ.copy()
123 self.local_tekictrl_port = self.session_info.tekictrl_port
124 self.local_tekidata_port = self.session_info.tekidata_port
125
126 threading.Thread.__init__(self)
127 self.daemon = True
128
130 """\
131 On instance destruction make sure this telekinesis client thread is stopped properly.
132
133 """
134 self.stop_thread()
135
137 """\
138 Test if the Telekinesis client command is installed on this machine.
139
140 @return: C{True} if the Telekinesis client command is available
141 @rtype: C{bool}
142
143 """
144
145
146
147 if utils.which('telekinesis-client'):
148 return True
149 else:
150 return False
151
153 """\
154 Close any left open port forwarding tunnel, also close Telekinesis client's log file,
155 if left open.
156
157 """
158 if self.tekiclient:
159 self.logger('Shutting down Telekinesis client subprocess', loglevel=log.loglevel_DEBUG)
160 try:
161 self.tekiclient.kill()
162 except OSError, e:
163 self.logger('Telekinesis client shutdown gave a message that we may ignore: %s' % str(e), loglevel=log.loglevel_WARN)
164 self.tekiclient = None
165
166 if self.fw_ctrl_tunnel is not None:
167 self.logger('Shutting down Telekinesis wormhole', loglevel=log.loglevel_DEBUG)
168 forward.stop_forward_tunnel(self.fw_ctrl_tunnel)
169 self.fw_ctrl_tunnel = None
170
171 if self.telekinesis_sshfs is not None:
172 telekinesis_sshfs_command = ['fusermount', '-u', '/tmp/.x2go-{local_user}/telekinesis/S-{sid}/'.format(local_user=_CURRENT_LOCAL_USER, sid=self.session_name), ]
173 self.logger('Umounting SSHFS mount for Telekinesis via forking a threaded subprocess: %s' % " ".join(telekinesis_sshfs_command), loglevel=log.loglevel_DEBUG)
174 self.telekinesis_sshfs_umount = subprocess.Popen(telekinesis_sshfs_command,
175 env=self.TEKICLIENT_ENV,
176 stdin=None,
177 stdout=self.tekiclient_datalog_stdout,
178 stderr=self.tekiclient_datalog_stderr,
179 shell=False)
180 self.telekinesis_sshfs = None
181
182 if self.fw_data_tunnel is not None:
183 self.logger('Shutting down Telekinesis DATA tunnel', loglevel=log.loglevel_DEBUG)
184 forward.stop_forward_tunnel(self.fw_data_tunnel)
185 self.fw_data_tunnel = None
186 if self.tekiclient_log_stdout is not None:
187 self.tekiclient_log_stdout.close()
188 if self.tekiclient_log_stderr is not None:
189 self.tekiclient_log_stderr.close()
190 if self.tekiclient_datalog_stdout is not None:
191 self.tekiclient_datalog_stdout.close()
192 if self.tekiclient_datalog_stderr is not None:
193 self.tekiclient_datalog_stderr.close()
194
196 """\
197 End the thread runner and tidy up.
198
199 """
200 self._keepalive = False
201
202 _count = 0
203 _maxwait = 40
204 while self.tekiclient is not None and (_count < _maxwait):
205 _count += 1
206 self.logger('waiting for Telekinesis client to shut down: 0.5s x %s' % _count, loglevel=log.loglevel_DEBUG)
207 gevent.sleep(.5)
208
210 """\
211 Start the X2Go Telekinesis client command. The Telekinesis client command utilizes a
212 Paramiko/SSH based forwarding tunnel (openssh -L option). This tunnel
213 gets started here and is forked into background (Greenlet/gevent).
214
215 """
216 self._keepalive = True
217 self.tekiclient = None
218
219 try:
220 os.makedirs(self.session_info.local_container)
221 except OSError, e:
222 if e.errno == 17:
223
224 pass
225
226 try:
227 if self.ssh_transport.getpeername()[0] in ('::1', '127.0.0.1', 'localhost', 'localhost.localdomain'):
228 self.local_tekictrl_port += 10000
229 except socket.error:
230 raise x2go_exceptions.X2GoControlSessionException('The control session has died unexpectedly.')
231 self.local_tekictrl_port = utils.detect_unused_port(preferred_port=self.local_tekictrl_port)
232
233 self.fw_ctrl_tunnel = forward.start_forward_tunnel(local_port=self.local_tekictrl_port,
234 remote_port=self.session_info.tekictrl_port,
235 ssh_transport=self.ssh_transport,
236 session_instance=self.session_instance,
237 session_name=self.session_name,
238 subsystem='Telekinesis Wormhole',
239 logger=self.logger,
240 )
241
242 self._update_local_tekictrl_socket(self.local_tekictrl_port)
243
244 cmd_line = self._generate_cmdline()
245
246 self.tekiclient_log_stdout = open('%s/%s' % (self.session_info.local_container, self.tekiclient_log, ), 'a')
247 self.tekiclient_log_stderr = open('%s/%s' % (self.session_info.local_container, self.tekiclient_log, ), 'a')
248 self.logger('forking threaded subprocess: %s' % " ".join(cmd_line), loglevel=log.loglevel_DEBUG)
249
250 while not self.tekiclient:
251 gevent.sleep(.2)
252 p = self.tekiclient = subprocess.Popen(cmd_line,
253 env=self.TEKICLIENT_ENV,
254 stdin=None,
255 stdout=self.tekiclient_log_stdout,
256 stderr=self.tekiclient_log_stderr,
257 shell=False)
258
259 while self._keepalive:
260 gevent.sleep(1)
261
262 try:
263 p.terminate()
264 self.logger('terminating Telekinesis client: %s' % p, loglevel=log.loglevel_DEBUG)
265 except OSError, e:
266 if e.errno == 3:
267
268 pass
269
270
271 self._tidy_up()
272
274 for idx, a in enumerate(self.TEKICLIENT_ARGS):
275 if a.startswith('-setWORMHOLEPORT='):
276 self.TEKICLIENT_ARGS[idx] = '-setWORMHOLEPORT=%s' % port
277
279 """\
280 Generate the NX proxy command line for execution.
281
282 """
283 cmd_line = [ self.TEKICLIENT_CMD, ]
284 _tekiclient_args = " ".join(self.TEKICLIENT_ARGS).format(sid=self.session_name).split(' ')
285 cmd_line.extend(_tekiclient_args)
286 return cmd_line
287
289 """\
290 Start the thread runner and wait for the Telekinesis client to come up.
291
292 @return: a subprocess instance that knows about the externally started Telekinesis client command.
293 @rtype: C{obj}
294
295 """
296 self.logger('starting local Telekinesis client...', loglevel=log.loglevel_INFO)
297
298
299 self.logger('Connecting Telekinesis data channel first via SSHFS host=127.0.0.1, port=%s.' % (self.session_info.tekidata_port,), loglevel=log.loglevel_DEBUG)
300
301 if self.session_info is None or self.ssh_transport is None or not self.session_info.local_container:
302 return None, False
303
304 try:
305 if self.ssh_transport.getpeername()[0] in ('::1', '127.0.0.1', 'localhost', 'localhost.localdomain'):
306 self.local_tekidata_port += 10000
307 except socket.error:
308 raise x2go_exceptions.X2GoControlSessionException('The control session has died unexpectedly.')
309 self.local_tekidata_port = utils.detect_unused_port(preferred_port=self.local_tekidata_port)
310
311 self.fw_data_tunnel = forward.start_forward_tunnel(local_port=self.local_tekidata_port,
312 remote_port=self.session_info.tekidata_port,
313 ssh_transport=self.ssh_transport,
314 session_instance=self.session_instance,
315 session_name=self.session_name,
316 subsystem='Telekinesis Data',
317 logger=self.logger,
318 )
319 self.tekiclient_datalog_stdout = open('%s/%s' % (self.session_info.local_container, self.tekiclient_datalog, ), 'a')
320 self.tekiclient_datalog_stderr = open('%s/%s' % (self.session_info.local_container, self.tekiclient_datalog, ), 'a')
321 try:
322 os.makedirs(os.path.normpath('/tmp/.x2go-{local_user}/telekinesis/S-{sid}/'.format(local_user=_CURRENT_LOCAL_USER, sid=self.session_name)))
323 except OSError, e:
324 if e.errno == 17:
325
326 pass
327 if self.session_instance.has_server_feature('X2GO_TELEKINESIS_TEKISFTPSERVER'):
328
329
330 telekinesis_sshfs_command = ['sshfs',
331 '-o', 'compression=no',
332 '-o', 'follow_symlinks',
333 '-o', 'directport={tekidata_port}'.format(tekidata_port=self.local_tekidata_port),
334 '127.0.0.1:/',
335 '/tmp/.x2go-{local_user}/telekinesis/S-{sid}/'.format(local_user=_CURRENT_LOCAL_USER, sid=self.session_name),
336 ]
337 else:
338
339
340 telekinesis_sshfs_command = ['sshfs',
341 '-o', 'compression=no',
342 '-o', 'follow_symlinks',
343 '-o', 'directport={tekidata_port}'.format(tekidata_port=self.local_tekidata_port),
344 '127.0.0.1:{remote_home}/.x2go/C-{sid}/telekinesis/remote/'.format(remote_home=self.session_instance.get_remote_home(), sid=self.session_name),
345 '/tmp/.x2go-{local_user}/telekinesis/S-{sid}/'.format(local_user=_CURRENT_LOCAL_USER, sid=self.session_name),
346 ]
347 self.logger('forking threaded subprocess: %s' % " ".join(telekinesis_sshfs_command), loglevel=log.loglevel_DEBUG)
348 try:
349 self.telekinesis_sshfs = subprocess.Popen(telekinesis_sshfs_command,
350 env=self.TEKICLIENT_ENV,
351 stdin=None,
352 stdout=self.tekiclient_datalog_stdout,
353 stderr=self.tekiclient_datalog_stderr,
354 shell=False)
355 except OSError, e:
356 if e.errno == 2:
357 self.logger("The 'sshfs' command is not available on your client machine, please install it to get Telekinesis up and running!!!", loglevel=log.loglevel_WARN)
358 else:
359 self.logger("An error occurred while setting up the Telekinesis data stream (via SSHFS): %s (errno: %s)" % (str(e), e.errno), loglevel=log.loglevel_WARN)
360 return None, False
361
362
363 _count = 0
364 _maxwait = 40
365 while self.fw_data_tunnel and (not self.fw_data_tunnel.is_active) and (not self.fw_data_tunnel.failed) and (_count < _maxwait):
366 _count += 1
367 self.logger('waiting for Telekinesis data tunnel to come up: 0.5s x %s' % _count, loglevel=log.loglevel_DEBUG)
368 gevent.sleep(.5)
369
370
371 if self.fw_data_tunnel.is_active and self.telekinesis_sshfs:
372
373 gevent.sleep(1)
374 threading.Thread.start(self)
375
376 self.logger('Telekinesis client tries to connect to host=127.0.0.1, port=%s.' % (self.session_info.tekictrl_port,), loglevel=log.loglevel_DEBUG)
377 self.logger('Telekinesis client writes its log to %s.' % os.path.join(self.session_info.local_container, self.tekiclient_log), loglevel=log.loglevel_DEBUG)
378 while self.tekiclient is None and _count < _maxwait:
379 _count += 1
380 self.logger('waiting for Telekinesis client to come up: 0.4s x %s' % _count, loglevel=log.loglevel_DEBUG)
381 gevent.sleep(.4)
382
383
384 if self.tekiclient is not None:
385
386
387 _count = 0
388 _maxwait = 40
389 while self.fw_ctrl_tunnel and (not self.fw_ctrl_tunnel.is_active) and (not self.fw_ctrl_tunnel.failed) and (_count < _maxwait):
390 _count += 1
391 self.logger('waiting for Telekinesis wormhole to come up: 0.5s x %s' % _count, loglevel=log.loglevel_DEBUG)
392 gevent.sleep(.5)
393
394 else:
395 self.logger('Aborting Telekinesis client startup for session %s, because the Telekinesis data connection failed to be established.' % (self.session_name,), loglevel=log.loglevel_WARN)
396
397 return self.tekiclient, bool(self.tekiclient) and (self.fw_ctrl_tunnel and self.fw_ctrl_tunnel.is_active)
398
400 """\
401 Check if a proxy instance is up and running.
402
403 @return: Proxy state, C{True} for proxy being up-and-running, C{False} otherwise
404 @rtype C{bool}
405
406 """
407 return bool(self.tekiclient and self.tekiclient.poll() is None) and self.fw_ctrl_tunnel.is_active and self.fw_data_tunnel.is_active
408