History support for console and telnet application, Fixes: #1083

Arrows UP/DOWN for telnet, Ref. #1083

Typo, Ref. #1083

Support async commands, Ref. #1083

Small refactor, Ref. #1083

Asyncio telnet server - connections support, Ref. #10831

Prompt-toolkit in dependencies, ref. #1083

Few comments, ref. #1083

Direct imports, ref. #1083

Windows size changed support in telnet server, ref. #1139

Fake termios

Fake termios - different approach

InputStream - copied source from prompt_toolkit
This commit is contained in:
ziajka 2017-07-13 14:36:37 +02:00
parent bb90c0ba52
commit ac508435c3
5 changed files with 751 additions and 53 deletions

View File

@ -30,6 +30,7 @@ import gns3server.utils.get_resource
import os import os
import sys import sys
import types
# To avoid strange bug later we switch the event loop before any other operation # To avoid strange bug later we switch the event loop before any other operation
if sys.platform.startswith("win"): if sys.platform.startswith("win"):
@ -38,6 +39,9 @@ if sys.platform.startswith("win"):
loop = asyncio.ProactorEventLoop() loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
if sys.platform.startswith("win"):
sys.modules['termios'] = types.ModuleType('termios')
def daemonize(): def daemonize():
""" """

View File

@ -20,7 +20,18 @@ import sys
import asyncio import asyncio
import inspect import inspect
from .telnet_server import AsyncioTelnetServer from prompt_toolkit import prompt
from prompt_toolkit.history import InMemoryHistory
from prompt_toolkit.contrib.completers import WordCompleter
from prompt_toolkit.enums import DEFAULT_BUFFER
from prompt_toolkit.eventloop.base import EventLoop
from prompt_toolkit.interface import CommandLineInterface
from prompt_toolkit.layout.screen import Size
from prompt_toolkit.shortcuts import create_prompt_application, create_asyncio_eventloop
from prompt_toolkit.terminal.vt100_output import Vt100_Output
from .telnet_server import AsyncioTelnetServer, TelnetConnection
from .input_stream import InputStream
class EmbedShell: class EmbedShell:
@ -60,6 +71,14 @@ class EmbedShell:
def prompt(self, val): def prompt(self, val):
self._prompt = val self._prompt = val
@property
def welcome_message(self):
return self._welcome_message
@welcome_message.setter
def welcome_message(self, welcome_message):
self._welcome_message = welcome_message
@asyncio.coroutine @asyncio.coroutine
def help(self, *args): def help(self, *args):
""" """
@ -90,6 +109,11 @@ class EmbedShell:
found = False found = False
if cmd[0] == '?': if cmd[0] == '?':
cmd[0] = 'help' cmd[0] = 'help'
# when there is no command specified just return empty result
if not cmd[0].strip():
return ""
for (name, meth) in inspect.getmembers(self): for (name, meth) in inspect.getmembers(self):
if name == cmd[0]: if name == cmd[0]:
cmd.pop(0) cmd.pop(0)
@ -97,7 +121,7 @@ class EmbedShell:
found = True found = True
break break
if not found: if not found:
res = ('Command not found {}'.format(cmd[0]) + (yield from self.help())) res = ('Command not found {}\n'.format(cmd[0]) + (yield from self.help()))
return res return res
@asyncio.coroutine @asyncio.coroutine
@ -111,29 +135,140 @@ class EmbedShell:
res = yield from self._parse_command(result) res = yield from self._parse_command(result)
self._writer.feed_data(res.encode()) self._writer.feed_data(res.encode())
def get_commands(self):
"""
Returns commands available to execute
:return: list of (name, doc) tuples
"""
commands = []
for name, value in inspect.getmembers(self):
if not inspect.isgeneratorfunction(value):
continue
if name.startswith('_') or name == 'run':
continue
doc = inspect.getdoc(value)
commands.append((name, doc))
return commands
class UnstoppableEventLoop(EventLoop):
"""
Partially fake event loop which cannot be stopped by CommandLineInterface
"""
def __init__(self, loop):
self._loop = loop
def close(self):
" Ignore. "
def stop(self):
" Ignore. "
def run_in_executor(self, *args, **kwargs):
return self._loop.run_in_executor(*args, **kwargs)
def call_from_executor(self, callback, **kwargs):
self._loop.call_from_executor(callback, **kwargs)
def add_reader(self, fd, callback):
raise NotImplementedError
def remove_reader(self, fd):
raise NotImplementedError
class ShellConnection(TelnetConnection):
def __init__(self, reader, writer, shell, loop):
super(ShellConnection, self).__init__(reader, writer)
self._shell = shell
self._loop = loop
self._cli = None
self._cb = None
self._size = Size(rows=40, columns=79)
self.encoding = 'utf-8'
@asyncio.coroutine
def connected(self):
def get_size():
return self._size
self._cli = CommandLineInterface(
application=create_prompt_application(self._shell.prompt),
eventloop=UnstoppableEventLoop(create_asyncio_eventloop(self._loop)),
output=Vt100_Output(self, get_size))
self._cb = self._cli.create_eventloop_callbacks()
self._inputstream = InputStream(self._cb.feed_key)
# Taken from prompt_toolkit telnet server
# https://github.com/jonathanslenders/python-prompt-toolkit/blob/99fa7fae61c9b4ed9767ead3b4f9b1318cfa875d/prompt_toolkit/contrib/telnet/server.py#L165
self._cli._is_running = True
if self._shell.welcome_message is not None:
self.send(self._shell.welcome_message.encode())
self._cli._redraw()
@asyncio.coroutine
def disconnected(self):
pass
def window_size_changed(self, columns, rows):
self._size = Size(rows=rows, columns=columns)
self._cb.terminal_size_changed()
@asyncio.coroutine
def feed(self, data):
data = data.decode()
self._inputstream.feed(data)
self._cli._redraw()
# Prompt toolkit has returned the command
if self._cli.is_returning:
try:
returned_value = self._cli.return_value()
except (EOFError, KeyboardInterrupt) as e:
# don't close terminal, just keep it alive
self.close()
return
command = returned_value.text
res = yield from self._shell._parse_command(command)
self.send(res.encode())
self.reset()
def reset(self):
""" Resets terminal screen"""
self._cli.reset()
self._cli.buffers[DEFAULT_BUFFER].reset()
self._cli.renderer.request_absolute_cursor_position()
self._cli._redraw()
def write(self, data):
""" Compat with CLI"""
self.send(data)
def flush(self):
""" Compat with CLI"""
pass
def create_telnet_shell(shell, loop=None): def create_telnet_shell(shell, loop=None):
""" """
Run a shell application with a telnet frontend Run a shell application with a telnet frontend
:param application: An EmbedShell instance :param application: An EmbedShell instance
:param loop: The event loop :param loop: The event loop
:returns: Telnet server :returns: Telnet server
""" """
class Stream(asyncio.StreamReader):
def write(self, data):
self.feed_data(data)
@asyncio.coroutine
def drain(self):
pass
shell.reader = Stream()
shell.writer = Stream()
if loop is None: if loop is None:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.create_task(shell.run())
return AsyncioTelnetServer(reader=shell.writer, writer=shell.reader, binary=False, echo=False) def factory(reader, writer):
return ShellConnection(reader, writer, shell, loop)
return AsyncioTelnetServer(binary=True, echo=True, naws=True, connection_factory=factory)
def create_stdin_shell(shell, loop=None): def create_stdin_shell(shell, loop=None):
@ -145,9 +280,13 @@ def create_stdin_shell(shell, loop=None):
:returns: Telnet server :returns: Telnet server
""" """
@asyncio.coroutine @asyncio.coroutine
def feed_stdin(loop, reader): def feed_stdin(loop, reader, shell):
history = InMemoryHistory()
completer = WordCompleter([name for name, _ in shell.get_commands()], ignore_case=True)
while True: while True:
line = yield from loop.run_in_executor(None, sys.stdin.readline) line = yield from prompt(
">", patch_stdout=True, return_asyncio_coroutine=True, history=history, completer=completer)
line += '\n'
reader.feed_data(line.encode()) reader.feed_data(line.encode())
@asyncio.coroutine @asyncio.coroutine
@ -164,7 +303,7 @@ def create_stdin_shell(shell, loop=None):
if loop is None: if loop is None:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
reader_task = loop.create_task(feed_stdin(loop, reader)) reader_task = loop.create_task(feed_stdin(loop, reader, shell))
writer_task = loop.create_task(read_stdout(writer)) writer_task = loop.create_task(read_stdout(writer))
shell_task = loop.create_task(shell.run()) shell_task = loop.create_task(shell.run())
return asyncio.gather(shell_task, writer_task, reader_task) return asyncio.gather(shell_task, writer_task, reader_task)
@ -181,20 +320,26 @@ if __name__ == '__main__':
This command accept arguments: hello tutu will display tutu This command accept arguments: hello tutu will display tutu
""" """
if len(args): @asyncio.coroutine
return ' '.join(args) def world():
else: yield from asyncio.sleep(2)
return 'world\n' if len(args):
return ' '.join(args)
else:
return 'world\n'
return (yield from world())
# Demo using telnet # Demo using telnet
# server = create_telnet_shell(Demo()) shell = Demo(welcome_message="Welcome!\n")
# coro = asyncio.start_server(server.run, '127.0.0.1', 4444, loop=loop) server = create_telnet_shell(shell, loop=loop)
# s = loop.run_until_complete(coro) coro = asyncio.start_server(server.run, '127.0.0.1', 4444, loop=loop)
# try: s = loop.run_until_complete(coro)
# loop.run_forever() try:
# except KeyboardInterrupt: loop.run_forever()
# pass except KeyboardInterrupt:
pass
# Demo using stdin # Demo using stdin
loop.run_until_complete(create_stdin_shell(Demo())) # loop.run_until_complete(create_stdin_shell(Demo()))
loop.close() # loop.close()

View File

@ -0,0 +1,419 @@
"""
Parser for VT100 input stream.
"""
# Copied from prompt_toolkit/terminal/vt100_input.py due to dependency on termios (which is not available on Windows)
from __future__ import unicode_literals
import re
import six
from six.moves import range
from prompt_toolkit.keys import Keys
from prompt_toolkit.key_binding.input_processor import KeyPress
__all__ = (
'InputStream',
'raw_mode',
'cooked_mode',
)
_DEBUG_RENDERER_INPUT = False
_DEBUG_RENDERER_INPUT_FILENAME = 'prompt-toolkit-render-input.log'
# Regex matching any CPR response
# (Note that we use '\Z' instead of '$', because '$' could include a trailing
# newline.)
_cpr_response_re = re.compile('^' + re.escape('\x1b[') + r'\d+;\d+R\Z')
# Mouse events:
# Typical: "Esc[MaB*" Urxvt: "Esc[96;14;13M" and for Xterm SGR: "Esc[<64;85;12M"
_mouse_event_re = re.compile('^' + re.escape('\x1b[') + r'(<?[\d;]+[mM]|M...)\Z')
# Regex matching any valid prefix of a CPR response.
# (Note that it doesn't contain the last character, the 'R'. The prefix has to
# be shorter.)
_cpr_response_prefix_re = re.compile('^' + re.escape('\x1b[') + r'[\d;]*\Z')
_mouse_event_prefix_re = re.compile('^' + re.escape('\x1b[') + r'(<?[\d;]*|M.{0,2})\Z')
class _Flush(object):
""" Helper object to indicate flush operation to the parser. """
pass
# Mapping of vt100 escape codes to Keys.
ANSI_SEQUENCES = {
'\x1b': Keys.Escape,
'\x00': Keys.ControlSpace, # Control-Space (Also for Ctrl-@)
'\x01': Keys.ControlA, # Control-A (home)
'\x02': Keys.ControlB, # Control-B (emacs cursor left)
'\x03': Keys.ControlC, # Control-C (interrupt)
'\x04': Keys.ControlD, # Control-D (exit)
'\x05': Keys.ControlE, # Contrel-E (end)
'\x06': Keys.ControlF, # Control-F (cursor forward)
'\x07': Keys.ControlG, # Control-G
'\x08': Keys.ControlH, # Control-H (8) (Identical to '\b')
'\x09': Keys.ControlI, # Control-I (9) (Identical to '\t')
'\x0a': Keys.ControlJ, # Control-J (10) (Identical to '\n')
'\x0b': Keys.ControlK, # Control-K (delete until end of line; vertical tab)
'\x0c': Keys.ControlL, # Control-L (clear; form feed)
'\x0d': Keys.ControlM, # Control-M (13) (Identical to '\r')
'\x0e': Keys.ControlN, # Control-N (14) (history forward)
'\x0f': Keys.ControlO, # Control-O (15)
'\x10': Keys.ControlP, # Control-P (16) (history back)
'\x11': Keys.ControlQ, # Control-Q
'\x12': Keys.ControlR, # Control-R (18) (reverse search)
'\x13': Keys.ControlS, # Control-S (19) (forward search)
'\x14': Keys.ControlT, # Control-T
'\x15': Keys.ControlU, # Control-U
'\x16': Keys.ControlV, # Control-V
'\x17': Keys.ControlW, # Control-W
'\x18': Keys.ControlX, # Control-X
'\x19': Keys.ControlY, # Control-Y (25)
'\x1a': Keys.ControlZ, # Control-Z
'\x1c': Keys.ControlBackslash, # Both Control-\ and Ctrl-|
'\x1d': Keys.ControlSquareClose, # Control-]
'\x1e': Keys.ControlCircumflex, # Control-^
'\x1f': Keys.ControlUnderscore, # Control-underscore (Also for Ctrl-hypen.)
'\x7f': Keys.Backspace, # (127) Backspace
'\x1b[A': Keys.Up,
'\x1b[B': Keys.Down,
'\x1b[C': Keys.Right,
'\x1b[D': Keys.Left,
'\x1b[H': Keys.Home,
'\x1bOH': Keys.Home,
'\x1b[F': Keys.End,
'\x1bOF': Keys.End,
'\x1b[3~': Keys.Delete,
'\x1b[3;2~': Keys.ShiftDelete, # xterm, gnome-terminal.
'\x1b[3;5~': Keys.ControlDelete, # xterm, gnome-terminal.
'\x1b[1~': Keys.Home, # tmux
'\x1b[4~': Keys.End, # tmux
'\x1b[5~': Keys.PageUp,
'\x1b[6~': Keys.PageDown,
'\x1b[7~': Keys.Home, # xrvt
'\x1b[8~': Keys.End, # xrvt
'\x1b[Z': Keys.BackTab, # shift + tab
'\x1b[2~': Keys.Insert,
'\x1bOP': Keys.F1,
'\x1bOQ': Keys.F2,
'\x1bOR': Keys.F3,
'\x1bOS': Keys.F4,
'\x1b[[A': Keys.F1, # Linux console.
'\x1b[[B': Keys.F2, # Linux console.
'\x1b[[C': Keys.F3, # Linux console.
'\x1b[[D': Keys.F4, # Linux console.
'\x1b[[E': Keys.F5, # Linux console.
'\x1b[11~': Keys.F1, # rxvt-unicode
'\x1b[12~': Keys.F2, # rxvt-unicode
'\x1b[13~': Keys.F3, # rxvt-unicode
'\x1b[14~': Keys.F4, # rxvt-unicode
'\x1b[15~': Keys.F5,
'\x1b[17~': Keys.F6,
'\x1b[18~': Keys.F7,
'\x1b[19~': Keys.F8,
'\x1b[20~': Keys.F9,
'\x1b[21~': Keys.F10,
'\x1b[23~': Keys.F11,
'\x1b[24~': Keys.F12,
'\x1b[25~': Keys.F13,
'\x1b[26~': Keys.F14,
'\x1b[28~': Keys.F15,
'\x1b[29~': Keys.F16,
'\x1b[31~': Keys.F17,
'\x1b[32~': Keys.F18,
'\x1b[33~': Keys.F19,
'\x1b[34~': Keys.F20,
# Xterm
'\x1b[1;2P': Keys.F13,
'\x1b[1;2Q': Keys.F14,
# '\x1b[1;2R': Keys.F15, # Conflicts with CPR response.
'\x1b[1;2S': Keys.F16,
'\x1b[15;2~': Keys.F17,
'\x1b[17;2~': Keys.F18,
'\x1b[18;2~': Keys.F19,
'\x1b[19;2~': Keys.F20,
'\x1b[20;2~': Keys.F21,
'\x1b[21;2~': Keys.F22,
'\x1b[23;2~': Keys.F23,
'\x1b[24;2~': Keys.F24,
'\x1b[1;5A': Keys.ControlUp, # Cursor Mode
'\x1b[1;5B': Keys.ControlDown, # Cursor Mode
'\x1b[1;5C': Keys.ControlRight, # Cursor Mode
'\x1b[1;5D': Keys.ControlLeft, # Cursor Mode
'\x1b[1;2A': Keys.ShiftUp,
'\x1b[1;2B': Keys.ShiftDown,
'\x1b[1;2C': Keys.ShiftRight,
'\x1b[1;2D': Keys.ShiftLeft,
# Tmux sends following keystrokes when control+arrow is pressed, but for
# Emacs ansi-term sends the same sequences for normal arrow keys. Consider
# it a normal arrow press, because that's more important.
'\x1bOA': Keys.Up,
'\x1bOB': Keys.Down,
'\x1bOC': Keys.Right,
'\x1bOD': Keys.Left,
'\x1b[5A': Keys.ControlUp,
'\x1b[5B': Keys.ControlDown,
'\x1b[5C': Keys.ControlRight,
'\x1b[5D': Keys.ControlLeft,
'\x1bOc': Keys.ControlRight, # rxvt
'\x1bOd': Keys.ControlLeft, # rxvt
'\x1b[200~': Keys.BracketedPaste, # Start of bracketed paste.
# Meta + arrow keys. Several terminals handle this differently.
# The following sequences are for xterm and gnome-terminal.
# (Iterm sends ESC followed by the normal arrow_up/down/left/right
# sequences, and the OSX Terminal sends ESCb and ESCf for "alt
# arrow_left" and "alt arrow_right." We don't handle these
# explicitely, in here, because would could not distinguesh between
# pressing ESC (to go to Vi navigation mode), followed by just the
# 'b' or 'f' key. These combinations are handled in
# the input processor.)
'\x1b[1;3D': (Keys.Escape, Keys.Left),
'\x1b[1;3C': (Keys.Escape, Keys.Right),
'\x1b[1;3A': (Keys.Escape, Keys.Up),
'\x1b[1;3B': (Keys.Escape, Keys.Down),
# Sequences generated by numpad 5. Not sure what it means. (It doesn't
# appear in 'infocmp'. Just ignore.
'\x1b[E': Keys.Ignore, # Xterm.
'\x1b[G': Keys.Ignore, # Linux console.
}
class _IsPrefixOfLongerMatchCache(dict):
"""
Dictiory that maps input sequences to a boolean indicating whether there is
any key that start with this characters.
"""
def __missing__(self, prefix):
# (hard coded) If this could be a prefix of a CPR response, return
# True.
if (_cpr_response_prefix_re.match(prefix) or _mouse_event_prefix_re.match(prefix)):
result = True
else:
# If this could be a prefix of anything else, also return True.
result = any(v for k, v in ANSI_SEQUENCES.items() if k.startswith(prefix) and k != prefix)
self[prefix] = result
return result
_IS_PREFIX_OF_LONGER_MATCH_CACHE = _IsPrefixOfLongerMatchCache()
class InputStream(object):
"""
Parser for VT100 input stream.
Feed the data through the `feed` method and the correct callbacks of the
`input_processor` will be called.
::
def callback(key):
pass
i = InputStream(callback)
i.feed('data\x01...')
:attr input_processor: :class:`~prompt_toolkit.key_binding.InputProcessor` instance.
"""
# Lookup table of ANSI escape sequences for a VT100 terminal
# Hint: in order to know what sequences your terminal writes to stdin, run
# "od -c" and start typing.
def __init__(self, feed_key_callback):
assert callable(feed_key_callback)
self.feed_key_callback = feed_key_callback
self.reset()
if _DEBUG_RENDERER_INPUT:
self.LOG = open(_DEBUG_RENDERER_INPUT_FILENAME, 'ab')
def reset(self, request=False):
self._in_bracketed_paste = False
self._start_parser()
def _start_parser(self):
"""
Start the parser coroutine.
"""
self._input_parser = self._input_parser_generator()
self._input_parser.send(None)
def _get_match(self, prefix):
"""
Return the key that maps to this prefix.
"""
# (hard coded) If we match a CPR response, return Keys.CPRResponse.
# (This one doesn't fit in the ANSI_SEQUENCES, because it contains
# integer variables.)
if _cpr_response_re.match(prefix):
return Keys.CPRResponse
elif _mouse_event_re.match(prefix):
return Keys.Vt100MouseEvent
# Otherwise, use the mappings.
try:
return ANSI_SEQUENCES[prefix]
except KeyError:
return None
def _input_parser_generator(self):
"""
Coroutine (state machine) for the input parser.
"""
prefix = ''
retry = False
flush = False
while True:
flush = False
if retry:
retry = False
else:
# Get next character.
c = yield
if c == _Flush:
flush = True
else:
prefix += c
# If we have some data, check for matches.
if prefix:
is_prefix_of_longer_match = _IS_PREFIX_OF_LONGER_MATCH_CACHE[prefix]
match = self._get_match(prefix)
# Exact matches found, call handlers..
if (flush or not is_prefix_of_longer_match) and match:
self._call_handler(match, prefix)
prefix = ''
# No exact match found.
elif (flush or not is_prefix_of_longer_match) and not match:
found = False
retry = True
# Loop over the input, try the longest match first and
# shift.
for i in range(len(prefix), 0, -1):
match= self._get_match(prefix[:i])
if match:
self._call_handler(match, prefix[:i])
prefix = prefix[i:]
found = True
if not found:
self._call_handler(prefix[0], prefix[0])
prefix = prefix[1:]
def _call_handler(self, key, insert_text):
"""
Callback to handler.
"""
if isinstance(key, tuple):
for k in key:
self._call_handler(k, insert_text)
else:
if key == Keys.BracketedPaste:
self._in_bracketed_paste = True
self._paste_buffer = ''
else:
self.feed_key_callback(KeyPress(key, insert_text))
def feed(self, data):
"""
Feed the input stream.
:param data: Input string (unicode).
"""
assert isinstance(data, six.text_type)
if _DEBUG_RENDERER_INPUT:
self.LOG.write(repr(data).encode('utf-8') + b'\n')
self.LOG.flush()
# Handle bracketed paste. (We bypass the parser that matches all other
# key presses and keep reading input until we see the end mark.)
# This is much faster then parsing character by character.
if self._in_bracketed_paste:
self._paste_buffer += data
end_mark = '\x1b[201~'
if end_mark in self._paste_buffer:
end_index = self._paste_buffer.index(end_mark)
# Feed content to key bindings.
paste_content = self._paste_buffer[:end_index]
self.feed_key_callback(KeyPress(Keys.BracketedPaste, paste_content))
# Quit bracketed paste mode and handle remaining input.
self._in_bracketed_paste = False
remaining = self._paste_buffer[end_index + len(end_mark):]
self._paste_buffer = ''
self.feed(remaining)
# Handle normal input character by character.
else:
for i, c in enumerate(data):
if self._in_bracketed_paste:
# Quit loop and process from this position when the parser
# entered bracketed paste.
self.feed(data[i:])
break
else:
# Replace \r by \n. (Some clients send \r instead of \n
# when enter is pressed. E.g. telnet and some other
# terminals.)
# XXX: We should remove this in a future version. It *is*
# now possible to recognise the difference.
# (We remove ICRNL/INLCR/IGNCR below.)
# However, this breaks IPython and maybe other applications,
# because they bind ControlJ (\n) for handling the Enter key.
# When this is removed, replace Enter=ControlJ by
# Enter=ControlM in keys.py.
if c == '\r':
c = '\n'
self._input_parser.send(c)
def flush(self):
"""
Flush the buffer of the input stream.
This will allow us to handle the escape key (or maybe meta) sooner.
The input received by the escape key is actually the same as the first
characters of e.g. Arrow-Up, so without knowing what follows the escape
sequence, we don't know whether escape has been pressed, or whether
it's something else. This flush function should be called after a
timeout, and processes everything that's still in the buffer as-is, so
without assuming any characters will folow.
"""
self._input_parser.send(_Flush)
def feed_and_flush(self, data):
"""
Wrapper around ``feed`` and ``flush``.
"""
self.feed(data)
self.flush()

View File

@ -18,6 +18,7 @@
import re import re
import asyncio import asyncio
import asyncio.subprocess import asyncio.subprocess
import struct
import logging import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -57,12 +58,73 @@ LINEMO = 34 # Line Mode
READ_SIZE = 1024 READ_SIZE = 1024
class AsyncioTelnetServer: class TelnetConnection(object):
"""Default implementation of telnet connection which may but may not be used."""
def __init__(self, reader=None, writer=None, binary=True, echo=False): def __init__(self, reader, writer):
self.is_closing = False
self._reader = reader self._reader = reader
self._writer = writer self._writer = writer
self._clients = set()
@property
def reader(self):
return self._reader
@property
def writer(self):
return self._writer
@asyncio.coroutine
def connected(self):
"""Method called when client is connected"""
pass
@asyncio.coroutine
def disconnected(self):
"""Method called when client is disconnecting"""
pass
def window_size_changed(self, columns, rows):
"""Method called when window size changed, only can occur when
`naws` flag is enable in server configuration."""
pass
def feed(self, data):
"""
Handles incoming data
:return:
"""
def send(self, data):
"""
Sending data back to client
:return:
"""
data = data.decode().replace("\n", "\r\n")
self.writer.write(data.encode())
def close(self):
"""
Closes current connection
:return:
"""
self.is_closing = True
class AsyncioTelnetServer:
MAX_NEGOTIATION_READ = 10
def __init__(self, reader=None, writer=None, binary=True, echo=False, naws=False, connection_factory=None):
"""
Initializes telnet server
:param naws when True make a window size negotiation
:param connection_factory: when set it's possible to inject own implementation of connection
"""
assert connection_factory is None or (connection_factory is not None and reader is None and writer is None), \
"Please use either reader and writer either connection_factory, otherwise duplicate data may be produced."
self._reader = reader
self._writer = writer
self._connections = dict()
self._lock = asyncio.Lock() self._lock = asyncio.Lock()
self._reader_process = None self._reader_process = None
self._current_read = None self._current_read = None
@ -72,6 +134,15 @@ class AsyncioTelnetServer:
# the data is echo on his terminal by telnet otherwise # the data is echo on his terminal by telnet otherwise
# it's our job (or the wrapped app) to send back the data # it's our job (or the wrapped app) to send back the data
self._echo = echo self._echo = echo
self._naws = naws
def default_connection_factory(reader, writer):
return TelnetConnection(reader, writer)
if connection_factory is None:
connection_factory = default_connection_factory
self._connection_factory = connection_factory
@staticmethod @staticmethod
@asyncio.coroutine @asyncio.coroutine
@ -86,7 +157,7 @@ class AsyncioTelnetServer:
yield from writer.drain() yield from writer.drain()
@asyncio.coroutine @asyncio.coroutine
def _write_intro(self, writer, binary=False, echo=False): def _write_intro(self, writer, binary=False, echo=False, naws=False):
# Send initial telnet session opening # Send initial telnet session opening
if echo: if echo:
writer.write(bytes([IAC, WILL, ECHO])) writer.write(bytes([IAC, WILL, ECHO]))
@ -106,17 +177,23 @@ class AsyncioTelnetServer:
IAC, DONT, SGA, IAC, DONT, SGA,
IAC, WONT, BINARY, IAC, WONT, BINARY,
IAC, DONT, BINARY])) IAC, DONT, BINARY]))
if naws:
writer.write(bytes([
IAC, DO, NAWS
]))
yield from writer.drain() yield from writer.drain()
@asyncio.coroutine @asyncio.coroutine
def run(self, network_reader, network_writer): def run(self, network_reader, network_writer):
# Keep track of connected clients # Keep track of connected clients
self._clients.add(network_writer) connection = self._connection_factory(network_reader, network_writer)
self._connections[network_writer] = connection
try: try:
yield from self._write_intro(network_writer, echo=self._echo, binary=self._binary) yield from self._write_intro(network_writer, echo=self._echo, binary=self._binary, naws=self._naws)
yield from connection.connected()
yield from self._process(network_reader, network_writer) yield from self._process(network_reader, network_writer, connection)
except ConnectionResetError: except ConnectionResetError:
with (yield from self._lock): with (yield from self._lock):
@ -125,8 +202,15 @@ class AsyncioTelnetServer:
if self._reader_process == network_reader: if self._reader_process == network_reader:
self._reader_process = None self._reader_process = None
# Cancel current read from this reader # Cancel current read from this reader
self._current_read.cancel() if self._current_read is not None:
self._clients.remove(network_writer) self._current_read.cancel()
yield from connection.disconnected()
del self._connections[network_writer]
@asyncio.coroutine
def client_connected_hook(self):
pass
@asyncio.coroutine @asyncio.coroutine
def _get_reader(self, network_reader): def _get_reader(self, network_reader):
@ -136,13 +220,14 @@ class AsyncioTelnetServer:
with (yield from self._lock): with (yield from self._lock):
if self._reader_process is None: if self._reader_process is None:
self._reader_process = network_reader self._reader_process = network_reader
if self._reader_process == network_reader: if self._reader:
self._current_read = asyncio.async(self._reader.read(READ_SIZE)) if self._reader_process == network_reader:
return self._current_read self._current_read = asyncio.async(self._reader.read(READ_SIZE))
return self._current_read
return None return None
@asyncio.coroutine @asyncio.coroutine
def _process(self, network_reader, network_writer): def _process(self, network_reader, network_writer, connection):
network_read = asyncio.async(network_reader.read(READ_SIZE)) network_read = asyncio.async(network_reader.read(READ_SIZE))
reader_read = yield from self._get_reader(network_reader) reader_read = yield from self._get_reader(network_reader)
@ -172,7 +257,8 @@ class AsyncioTelnetServer:
network_read = asyncio.async(network_reader.read(READ_SIZE)) network_read = asyncio.async(network_reader.read(READ_SIZE))
if IAC in data: if IAC in data:
data = yield from self._IAC_parser(data, network_reader, network_writer) data = yield from self._IAC_parser(data, network_reader, network_writer, connection)
if len(data) == 0: if len(data) == 0:
continue continue
@ -182,18 +268,49 @@ class AsyncioTelnetServer:
if self._writer: if self._writer:
self._writer.write(data) self._writer.write(data)
yield from self._writer.drain() yield from self._writer.drain()
yield from connection.feed(data)
if connection.is_closing:
raise ConnectionResetError()
elif coro == reader_read: elif coro == reader_read:
if self._reader.at_eof(): if self._reader and self._reader.at_eof():
raise ConnectionResetError() raise ConnectionResetError()
reader_read = yield from self._get_reader(network_reader) reader_read = yield from self._get_reader(network_reader)
# Replicate the output on all clients # Replicate the output on all clients
for writer in self._clients: for connection in self._connections.values():
writer.write(data) connection.writer.write(data)
yield from writer.drain() yield from connection.writer.drain()
def _IAC_parser(self, buf, network_reader, network_writer): @asyncio.coroutine
def _read(self, cmd, buffer, location, reader):
""" Reads next op from the buffer or reader"""
try:
op = buffer[location]
cmd.append(op)
return op
except IndexError:
op = yield from reader.read(1)
buffer.extend(op)
cmd.append(buffer[location])
return op
def _negotiate(self, data, connection):
""" Performs negotiation commands"""
command, payload = data[0], data[1:]
if command == NAWS:
if len(payload) == 4:
columns, rows = struct.unpack(str('!HH'), bytes(payload))
connection.window_size_changed(columns, rows)
else:
log.warning('Wrong number of NAWS bytes')
else:
log.debug("Not supported negotiation sequence, received {} bytes", len(data))
def _IAC_parser(self, buf, network_reader, network_writer, connection):
""" """
Processes and removes any Telnet commands from the buffer. Processes and removes any Telnet commands from the buffer.
@ -201,6 +318,7 @@ class AsyncioTelnetServer:
:returns: buffer minus Telnet commands :returns: buffer minus Telnet commands
""" """
skip_to = 0 skip_to = 0
while True: while True:
# Locate an IAC to process # Locate an IAC to process
@ -218,7 +336,7 @@ class AsyncioTelnetServer:
iac_cmd.append(buf[iac_loc + 1]) iac_cmd.append(buf[iac_loc + 1])
# Is this just a 2-byte TELNET command? # Is this just a 2-byte TELNET command?
if iac_cmd[1] not in [WILL, WONT, DO, DONT]: if iac_cmd[1] not in [WILL, WONT, DO, DONT, SB]:
if iac_cmd[1] == AYT: if iac_cmd[1] == AYT:
log.debug("Telnet server received Are-You-There (AYT)") log.debug("Telnet server received Are-You-There (AYT)")
network_writer.write(b'\r\nYour Are-You-There received. I am here.\r\n') network_writer.write(b'\r\nYour Are-You-There received. I am here.\r\n')
@ -234,6 +352,17 @@ class AsyncioTelnetServer:
else: else:
log.debug("Unhandled telnet command: " log.debug("Unhandled telnet command: "
"{0:#x} {1:#x}".format(*iac_cmd)) "{0:#x} {1:#x}".format(*iac_cmd))
elif iac_cmd[1] == SB: # starts negotiation commands
negotiation = []
for pos in range(2, self.MAX_NEGOTIATION_READ):
op = yield from self._read(iac_cmd, buf, iac_loc + pos, network_reader)
negotiation.append(op)
if op == SE:
# ends negotiation commands
break
# SE command is followed by IAC, remove the last two operations from stack
self._negotiate(negotiation[0:-2], connection)
# This must be a 3-byte TELNET command # This must be a 3-byte TELNET command
else: else:
@ -260,7 +389,7 @@ class AsyncioTelnetServer:
log.debug("Unhandled DONT telnet command: " log.debug("Unhandled DONT telnet command: "
"{0:#x} {1:#x} {2:#x}".format(*iac_cmd)) "{0:#x} {1:#x} {2:#x}".format(*iac_cmd))
elif iac_cmd[1] == WILL: elif iac_cmd[1] == WILL:
if iac_cmd[2] not in [BINARY]: if iac_cmd[2] not in [BINARY, NAWS]:
log.debug("Unhandled WILL telnet command: " log.debug("Unhandled WILL telnet command: "
"{0:#x} {1:#x} {2:#x}".format(*iac_cmd)) "{0:#x} {1:#x} {2:#x}".format(*iac_cmd))
elif iac_cmd[1] == WONT: elif iac_cmd[1] == WONT:

View File

@ -7,3 +7,4 @@ raven>=5.23.0
psutil>=3.0.0 psutil>=3.0.0
zipstream>=1.1.4 zipstream>=1.1.4
typing>=3.5.3.0 # Otherwise yarl fail with python 3.4 typing>=3.5.3.0 # Otherwise yarl fail with python 3.4
prompt-toolkit