12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042 |
- #!/usr/bin/env python
- #
- # Very simple serial terminal
- #
- # This file is part of pySerial. https://github.com/pyserial/pyserial
- # (C)2002-2020 Chris Liechti <cliechti@gmx.net>
- #
- # SPDX-License-Identifier: BSD-3-Clause
- from __future__ import absolute_import
- import codecs
- import os
- import sys
- import threading
- import serial
- from serial.tools.list_ports import comports
- from serial.tools import hexlify_codec
- # pylint: disable=wrong-import-order,wrong-import-position
def _find_hexlify_codec(codec_name):
    """Codec search function that only knows the 'hexlify' codec."""
    if codec_name == 'hexlify':
        return hexlify_codec.getregentry()
    return None


# make the 'hexlify' encoding available to codecs.lookup() and friends
codecs.register(_find_hexlify_codec)

try:
    # Python 2 provides these names as builtins
    raw_input
except NameError:
    # Python 3: provide the Python 2 names used throughout this module
    # pylint: disable=redefined-builtin,invalid-name
    raw_input = input   # in python3 it's "raw"
    unichr = chr
def key_description(character):
    """Generate a readable description for a key.

    A single control character (code point below 32) is described as
    ``Ctrl+<letter>``. Everything else is described by its ``repr``; this
    includes strings that are not exactly one character long (for example
    the multi-character escape sequences a console may return for function
    or navigation keys, or an empty string), for which ``ord()`` would
    raise ``TypeError``.
    """
    if len(character) != 1:
        # not a single key code, ord() is not applicable
        return repr(character)
    ascii_code = ord(character)
    if ascii_code < 32:
        # control codes 0..31 correspond to '@', 'A', 'B', ...
        return 'Ctrl+{:c}'.format(ord('@') + ascii_code)
    return repr(character)
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class ConsoleBase(object):
    """Platform independent part of the console abstraction.

    Provides text and byte output on top of ``sys.stdout``. The terminal mode
    handling (``setup``, ``cleanup``, ``getkey``, ``cancel``) consists of
    do-nothing placeholders here.
    """

    def __init__(self):
        stdout = sys.stdout
        # on Python 3 the bytes interface of stdout is its underlying buffer
        self.byte_output = stdout.buffer if sys.version_info >= (3, 0) else stdout
        self.output = stdout

    def setup(self):
        """Switch the console to single character input without echo"""

    def cleanup(self):
        """Put the console back into its default mode"""

    def getkey(self):
        """Return one key read from the console (always None here)"""
        return None

    def write_bytes(self, byte_string):
        """Output already encoded data and flush immediately"""
        sink = self.byte_output
        sink.write(byte_string)
        sink.flush()

    def write(self, text):
        """Output text and flush immediately"""
        sink = self.output
        sink.write(text)
        sink.flush()

    def cancel(self):
        """Abort a pending getkey call"""

    # - - - - - - - - - - - - - - - - - - - - - - - -
    # context manager:
    # inside the "with" block the terminal is in normal mode (e.g. to read a
    # whole line from the user), afterwards it is set up for key input again

    def __enter__(self):
        self.cleanup()
        return self

    def __exit__(self, *args, **kwargs):
        self.setup()
if os.name == 'nt':  # noqa
    import msvcrt
    import ctypes
    import platform

    class Out(object):
        """file-like wrapper that uses os.write"""

        def __init__(self, fd):
            self.fd = fd

        def flush(self):
            pass

        def write(self, s):
            os.write(self.fd, s)

    class Console(ConsoleBase):
        """Windows console: key input through msvcrt, UTF-8 output."""

        # second character returned by getwch() after a NUL prefix, mapped to
        # the escape sequence that is passed on for that function key
        fncodes = {
            ';': '\x1bOP',  # F1
            '<': '\x1bOQ',  # F2
            '=': '\x1bOR',  # F3
            '>': '\x1bOS',  # F4
            '?': '\x1b[15~',  # F5
            '@': '\x1b[17~',  # F6
            'A': '\x1b[18~',  # F7
            'B': '\x1b[19~',  # F8
            'C': '\x1b[20~',  # F9
            'D': '\x1b[21~',  # F10
        }
        # second character returned by getwch() after a 0xe0 prefix, mapped to
        # the escape sequence that is passed on for that navigation key
        navcodes = {
            'H': '\x1b[A',  # UP
            'P': '\x1b[B',  # DOWN
            'K': '\x1b[D',  # LEFT
            'M': '\x1b[C',  # RIGHT
            'G': '\x1b[H',  # HOME
            'O': '\x1b[F',  # END
            'R': '\x1b[2~',  # INSERT
            'S': '\x1b[3~',  # DELETE
            'I': '\x1b[5~',  # PGUP
            'Q': '\x1b[6~',  # PGDN
        }

        def __init__(self):
            super(Console, self).__init__()
            # remember the code pages so that __del__ can restore them, then
            # switch input and output to code page 65001
            self._saved_ocp = ctypes.windll.kernel32.GetConsoleOutputCP()
            self._saved_icp = ctypes.windll.kernel32.GetConsoleCP()
            ctypes.windll.kernel32.SetConsoleOutputCP(65001)
            ctypes.windll.kernel32.SetConsoleCP(65001)
            # ANSI handling available through SetConsoleMode since Windows 10 v1511
            # https://en.wikipedia.org/wiki/ANSI_escape_code#cite_note-win10th2-1
            # NOTE(review): releases reported as something other than '10'
            # are skipped by this check - confirm whether that is intended.
            if platform.release() == '10' and int(platform.version().split('.')[2]) > 10586:
                ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004
                import ctypes.wintypes as wintypes
                if not hasattr(wintypes, 'LPDWORD'):  # PY2
                    wintypes.LPDWORD = ctypes.POINTER(wintypes.DWORD)
                SetConsoleMode = ctypes.windll.kernel32.SetConsoleMode
                GetConsoleMode = ctypes.windll.kernel32.GetConsoleMode
                GetStdHandle = ctypes.windll.kernel32.GetStdHandle
                mode = wintypes.DWORD()
                GetConsoleMode(GetStdHandle(-11), ctypes.byref(mode))
                if (mode.value & ENABLE_VIRTUAL_TERMINAL_PROCESSING) == 0:
                    SetConsoleMode(GetStdHandle(-11), mode.value | ENABLE_VIRTUAL_TERMINAL_PROCESSING)
                    # only stored when the mode was changed, see __del__
                    self._saved_cm = mode
            self.output = codecs.getwriter('UTF-8')(Out(sys.stdout.fileno()), 'replace')
            # the change of the code page is not propagated to Python, manually fix it
            sys.stderr = codecs.getwriter('UTF-8')(Out(sys.stderr.fileno()), 'replace')
            sys.stdout = self.output
            self.output.encoding = 'UTF-8'  # needed for input

        def __del__(self):
            # restore the code pages (and console mode, if changed) saved in __init__
            ctypes.windll.kernel32.SetConsoleOutputCP(self._saved_ocp)
            ctypes.windll.kernel32.SetConsoleCP(self._saved_icp)
            try:
                ctypes.windll.kernel32.SetConsoleMode(ctypes.windll.kernel32.GetStdHandle(-11), self._saved_cm)
            except AttributeError:  # in case no _saved_cm
                pass

        def getkey(self):
            """Read one key; special keys are returned as escape sequences.

            ENTER (CR) is returned as LF. Special keys arrive as a prefix
            character (NUL or 0xe0) followed by a code character; codes that
            are not in the tables are dropped and the next key is read.
            """
            while True:
                z = msvcrt.getwch()
                if z == unichr(13):
                    return unichr(10)
                # compare by value: identity ("is") of equal strings is an
                # implementation detail and must not be relied upon
                elif z == unichr(0) or z == unichr(0xe0):
                    try:
                        code = msvcrt.getwch()
                        if z == unichr(0):
                            return self.fncodes[code]
                        else:
                            return self.navcodes[code]
                    except KeyError:
                        pass
                else:
                    return z

        def cancel(self):
            # CancelIo, CancelSynchronousIo do not seem to work when using
            # getwch, so instead, send a key to the window with the console
            hwnd = ctypes.windll.kernel32.GetConsoleWindow()
            ctypes.windll.user32.PostMessageA(hwnd, 0x100, 0x0d, 0)

elif os.name == 'posix':
    import atexit
    import termios
    import fcntl

    class Console(ConsoleBase):
        """POSIX console: key input through termios on stdin."""

        def __init__(self):
            super(Console, self).__init__()
            self.fd = sys.stdin.fileno()
            # terminal attributes to go back to in cleanup()
            self.old = termios.tcgetattr(self.fd)
            atexit.register(self.cleanup)
            if sys.version_info < (3, 0):
                self.enc_stdin = codecs.getreader(sys.stdin.encoding)(sys.stdin)
            else:
                self.enc_stdin = sys.stdin

        def setup(self):
            new = termios.tcgetattr(self.fd)
            # no line buffering, no echo, no signal generating keys
            new[3] = new[3] & ~termios.ICANON & ~termios.ECHO & ~termios.ISIG
            # a read returns as soon as one byte is there, without timeout
            new[6][termios.VMIN] = 1
            new[6][termios.VTIME] = 0
            termios.tcsetattr(self.fd, termios.TCSANOW, new)

        def getkey(self):
            c = self.enc_stdin.read(1)
            if c == unichr(0x7f):
                c = unichr(8)    # map the BS key (which yields DEL) to backspace
            return c

        def cancel(self):
            # put a NUL byte into the terminal's input so that a getkey call
            # that is waiting for input returns
            fcntl.ioctl(self.fd, termios.TIOCSTI, b'\0')

        def cleanup(self):
            termios.tcsetattr(self.fd, termios.TCSAFLUSH, self.old)

else:
    raise NotImplementedError(
        'Sorry no implementation for your platform ({}) available.'.format(sys.platform))
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
class Transform(object):
    """Base class of all filters: every direction passes text through as is"""

    def rx(self, text):
        """Filter text that came in from the serial port"""
        return text

    def tx(self, text):
        """Filter text that is going out to the serial port"""
        return text

    def echo(self, text):
        """Filter outgoing text that is shown on the console (local echo)"""
        return text
class CRLF(Transform):
    """ENTER sends CR+LF"""

    def tx(self, text):
        # expand every LF in the outgoing text to CR+LF
        line_end = '\r\n'
        return text.replace('\n', line_end)
class CR(Transform):
    """ENTER sends CR"""

    def rx(self, text):
        # incoming CR is shown as a line feed
        carriage_return, line_feed = '\r', '\n'
        return text.replace(carriage_return, line_feed)

    def tx(self, text):
        # outgoing LF (ENTER) is sent as CR
        carriage_return, line_feed = '\r', '\n'
        return text.replace(line_feed, carriage_return)
class LF(Transform):
    """ENTER sends LF"""
    # nothing to override: the methods inherited from Transform are used
class NoTerminal(Transform):
    """remove typical terminal control codes from input"""

    # code point -> replacement code point, in the form str.translate expects;
    # control codes are moved into the block starting at U+2400, except for
    # CR, LF, BS and TAB which are left alone
    REPLACEMENT_MAP = {
        code: 0x2400 + code
        for code in range(32)
        if unichr(code) not in '\r\n\b\t'
    }
    REPLACEMENT_MAP[0x7F] = 0x2421  # DEL
    REPLACEMENT_MAP[0x9B] = 0x2425  # CSI

    def rx(self, text):
        replacements = self.REPLACEMENT_MAP
        return text.translate(replacements)

    # locally echoed text is filtered the same way as received text
    echo = rx
class NoControls(NoTerminal):
    """Remove all control codes, incl. CR+LF"""

    # same kind of table as in the base class, but without exceptions for
    # CR, LF, BS and TAB and with a replacement for the space character
    REPLACEMENT_MAP = {code: 0x2400 + code for code in range(32)}
    REPLACEMENT_MAP[0x20] = 0x2423  # visual space
    REPLACEMENT_MAP[0x7F] = 0x2421  # DEL
    REPLACEMENT_MAP[0x9B] = 0x2425  # CSI
class Printable(Transform):
    """Show decimal code for all non-ASCII characters and replace most control codes"""

    def rx(self, text):
        pieces = []
        for char in text:
            code = ord(char)
            if 0x20 <= code < 0x7f or char in '\r\n\b\t':
                # printable ASCII and the common whitespace controls: keep
                pieces.append(char)
            elif code < 0x20:
                # other control codes: character from the block at U+2400
                pieces.append(unichr(0x2400 + code))
            else:
                # anything else: decimal code written with the digit
                # characters starting at U+2080, followed by a space
                for digit in '{:d}'.format(code):
                    pieces.append(unichr(0x2080 + int(digit)))
                pieces.append(' ')
        return ''.join(pieces)

    # locally echoed text is filtered the same way as received text
    echo = rx
class Colorize(Transform):
    """Apply different colors for received and echo"""

    def __init__(self):
        # XXX make it configurable, use colorama?
        # escape sequences that are put in front of the respective text
        self.input_color, self.echo_color = '\x1b[37m', '\x1b[31m'

    def rx(self, text):
        return '{}{}'.format(self.input_color, text) if False else self.input_color + text

    def echo(self, text):
        prefix = self.echo_color
        return prefix + text
class DebugIO(Transform):
    """Print what is sent and received"""

    def rx(self, text):
        # log the repr of the received text on stderr, data passes unchanged
        log = sys.stderr
        log.write(' [RX:{!r}] '.format(text))
        log.flush()
        return text

    def tx(self, text):
        # log the repr of the text to be sent on stderr, data passes unchanged
        log = sys.stderr
        log.write(' [TX:{!r}] '.format(text))
        log.flush()
        return text
- # other ideas:
- # - add date/time for each newline
- # - insert newline after: a) timeout b) packet end character
# name of the end-of-line mode -> class implementing it
EOL_TRANSFORMATIONS = dict([
    ('crlf', CRLF),
    ('cr', CR),
    ('lf', LF),
])

# name of the filter -> class implementing it
TRANSFORMATIONS = dict([
    ('direct', Transform),    # no transformation
    ('default', NoTerminal),
    ('nocontrol', NoControls),
    ('printable', Printable),
    ('colorize', Colorize),
    ('debug', DebugIO),
])
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def ask_for_port():
    """\
    List the available ports on stderr and let the user pick one.

    The answer may be the index shown in the list or any other text, which is
    then taken as the port name. The question is repeated as long as a number
    outside of the list is entered. Returns the chosen port name.
    """
    sys.stderr.write('\n--- Available ports:\n')
    names = []
    for number, info in enumerate(sorted(comports()), 1):
        device, description, _hwid = info
        sys.stderr.write('--- {:2}: {:20} {!r}\n'.format(number, device, description))
        names.append(device)
    while True:
        answer = raw_input('--- Enter port index or full name: ')
        try:
            position = int(answer) - 1
        except ValueError:
            # not a number, so it is a port name
            return answer
        if 0 <= position < len(names):
            return names[position]
        sys.stderr.write('--- Invalid index!\n')
class Miniterm(object):
    """\
    Terminal application. Copy data from serial port to console and vice versa.
    Handle special keys from the console to show menu etc.
    """

    def __init__(self, serial_instance, echo=False, eol='crlf', filters=()):
        """\
        serial_instance: the port object to talk to
        echo: show typed text locally
        eol: key of EOL_TRANSFORMATIONS
        filters: sequence of keys of TRANSFORMATIONS
        """
        self.console = Console()
        self.serial = serial_instance
        self.echo = echo
        self.raw = False
        self.input_encoding = 'UTF-8'
        self.output_encoding = 'UTF-8'
        self.eol = eol
        self.filters = filters
        self.update_transformations()
        self.exit_character = unichr(0x1d)  # GS/CTRL+]
        self.menu_character = unichr(0x14)  # Menu: CTRL+T
        self.alive = None
        self._reader_alive = None
        self.receiver_thread = None
        self.transmitter_thread = None
        # never used by this class, kept so that the attribute still exists
        self.tx_decoder = None
        # create the codecs for the default encodings so that reader() and
        # writer() also work when set_rx_encoding()/set_tx_encoding() were
        # not called explicitly
        self.rx_decoder = None
        self.tx_encoder = None
        self.set_rx_encoding(self.input_encoding)
        self.set_tx_encoding(self.output_encoding)

    def _start_reader(self):
        """Start reader thread"""
        self._reader_alive = True
        # start serial->console thread
        self.receiver_thread = threading.Thread(target=self.reader, name='rx')
        self.receiver_thread.daemon = True
        self.receiver_thread.start()

    def _stop_reader(self):
        """Stop reader thread only, wait for clean exit of thread"""
        self._reader_alive = False
        if hasattr(self.serial, 'cancel_read'):
            self.serial.cancel_read()
        self.receiver_thread.join()

    def start(self):
        """start worker threads"""
        self.alive = True
        self._start_reader()
        # enter console->serial loop
        self.transmitter_thread = threading.Thread(target=self.writer, name='tx')
        self.transmitter_thread.daemon = True
        self.transmitter_thread.start()
        self.console.setup()

    def stop(self):
        """set flag to stop worker threads"""
        self.alive = False

    def join(self, transmit_only=False):
        """wait for worker threads to terminate"""
        self.transmitter_thread.join()
        if not transmit_only:
            if hasattr(self.serial, 'cancel_read'):
                self.serial.cancel_read()
            self.receiver_thread.join()

    def close(self):
        """close the serial port"""
        self.serial.close()

    def update_transformations(self):
        """take list of transformation classes and instantiate them for rx and tx"""
        transformations = [EOL_TRANSFORMATIONS[self.eol]] + [TRANSFORMATIONS[f]
                                                             for f in self.filters]
        self.tx_transformations = [t() for t in transformations]
        # received data runs through the same objects in opposite order
        self.rx_transformations = list(reversed(self.tx_transformations))

    def set_rx_encoding(self, encoding, errors='replace'):
        """set encoding for received data"""
        self.input_encoding = encoding
        self.rx_decoder = codecs.getincrementaldecoder(encoding)(errors)

    def set_tx_encoding(self, encoding, errors='replace'):
        """set encoding for transmitted data"""
        self.output_encoding = encoding
        self.tx_encoder = codecs.getincrementalencoder(encoding)(errors)

    def dump_port_settings(self):
        """Write current settings to sys.stderr"""
        sys.stderr.write("\n--- Settings: {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits}\n".format(
            p=self.serial))
        sys.stderr.write('--- RTS: {:8} DTR: {:8} BREAK: {:8}\n'.format(
            ('active' if self.serial.rts else 'inactive'),
            ('active' if self.serial.dtr else 'inactive'),
            ('active' if self.serial.break_condition else 'inactive')))
        try:
            sys.stderr.write('--- CTS: {:8} DSR: {:8} RI: {:8} CD: {:8}\n'.format(
                ('active' if self.serial.cts else 'inactive'),
                ('active' if self.serial.dsr else 'inactive'),
                ('active' if self.serial.ri else 'inactive'),
                ('active' if self.serial.cd else 'inactive')))
        except serial.SerialException:
            # on RFC 2217 ports, it can happen if no modem state notification was
            # yet received. ignore this error.
            pass
        sys.stderr.write('--- software flow control: {}\n'.format('active' if self.serial.xonxoff else 'inactive'))
        sys.stderr.write('--- hardware flow control: {}\n'.format('active' if self.serial.rtscts else 'inactive'))
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))
        sys.stderr.write('--- EOL: {}\n'.format(self.eol.upper()))
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))

    def reader(self):
        """loop and copy serial->console"""
        try:
            while self.alive and self._reader_alive:
                # read all that is there or wait for one byte
                data = self.serial.read(self.serial.in_waiting or 1)
                if data:
                    if self.raw:
                        self.console.write_bytes(data)
                    else:
                        text = self.rx_decoder.decode(data)
                        for transformation in self.rx_transformations:
                            text = transformation.rx(text)
                        self.console.write(text)
        except serial.SerialException:
            self.alive = False
            # get the writer out of its getkey call so that it sees the flag
            self.console.cancel()
            raise       # XXX handle instead of re-raise?

    def writer(self):
        """\
        Loop and copy console->serial until self.exit_character character is
        found. When self.menu_character is found, interpret the next key
        locally.
        """
        menu_active = False
        try:
            while self.alive:
                try:
                    c = self.console.getkey()
                except KeyboardInterrupt:
                    c = '\x03'
                if not self.alive:
                    break
                if menu_active:
                    self.handle_menu_key(c)
                    menu_active = False
                elif c == self.menu_character:
                    menu_active = True      # next char will be for menu
                elif c == self.exit_character:
                    self.stop()             # exit app
                    break
                else:
                    #~ if self.raw:
                    text = c
                    for transformation in self.tx_transformations:
                        text = transformation.tx(text)
                    self.serial.write(self.tx_encoder.encode(text))
                    if self.echo:
                        echo_text = c
                        for transformation in self.tx_transformations:
                            echo_text = transformation.echo(echo_text)
                        self.console.write(echo_text)
        except:
            # whatever goes wrong: flag the other thread to stop, then let
            # the exception propagate
            self.alive = False
            raise

    def handle_menu_key(self, c):
        """Implement a simple menu / settings"""
        if c == self.menu_character or c == self.exit_character:
            # Menu/exit character again -> send itself
            self.serial.write(self.tx_encoder.encode(c))
            if self.echo:
                self.console.write(c)
        elif c == '\x15':                       # CTRL+U -> upload file
            self.upload_file()
        elif c in '\x08hH?':                    # CTRL+H, h, H, ? -> Show help
            sys.stderr.write(self.get_help_text())
        elif c == '\x12':                       # CTRL+R -> Toggle RTS
            self.serial.rts = not self.serial.rts
            sys.stderr.write('--- RTS {} ---\n'.format('active' if self.serial.rts else 'inactive'))
        elif c == '\x04':                       # CTRL+D -> Toggle DTR
            self.serial.dtr = not self.serial.dtr
            sys.stderr.write('--- DTR {} ---\n'.format('active' if self.serial.dtr else 'inactive'))
        elif c == '\x02':                       # CTRL+B -> toggle BREAK condition
            self.serial.break_condition = not self.serial.break_condition
            sys.stderr.write('--- BREAK {} ---\n'.format('active' if self.serial.break_condition else 'inactive'))
        elif c == '\x05':                       # CTRL+E -> toggle local echo
            self.echo = not self.echo
            sys.stderr.write('--- local echo {} ---\n'.format('active' if self.echo else 'inactive'))
        elif c == '\x06':                       # CTRL+F -> edit filters
            self.change_filter()
        elif c == '\x0c':                       # CTRL+L -> EOL mode
            modes = list(EOL_TRANSFORMATIONS)   # keys
            # advance to the next mode, wrapping around at the end
            eol = modes.index(self.eol) + 1
            if eol >= len(modes):
                eol = 0
            self.eol = modes[eol]
            sys.stderr.write('--- EOL: {} ---\n'.format(self.eol.upper()))
            self.update_transformations()
        elif c == '\x01':                       # CTRL+A -> set encoding
            self.change_encoding()
        elif c == '\x09':                       # CTRL+I -> info
            self.dump_port_settings()
        #~ elif c == '\x01':                       # CTRL+A -> cycle escape mode
        #~ elif c == '\x0c':                       # CTRL+L -> cycle linefeed mode
        elif c in 'pP':                         # P -> change port
            self.change_port()
        elif c in 'zZ':                         # Z -> suspend / open port temporarily
            self.suspend_port()
        elif c in 'bB':                         # B -> change baudrate
            self.change_baudrate()
        elif c == '8':                          # 8 -> change to 8 bits
            self.serial.bytesize = serial.EIGHTBITS
            self.dump_port_settings()
        elif c == '7':                          # 7 -> change to 7 bits
            self.serial.bytesize = serial.SEVENBITS
            self.dump_port_settings()
        elif c in 'eE':                         # E -> change to even parity
            self.serial.parity = serial.PARITY_EVEN
            self.dump_port_settings()
        elif c in 'oO':                         # O -> change to odd parity
            self.serial.parity = serial.PARITY_ODD
            self.dump_port_settings()
        elif c in 'mM':                         # M -> change to mark parity
            self.serial.parity = serial.PARITY_MARK
            self.dump_port_settings()
        elif c in 'sS':                         # S -> change to space parity
            self.serial.parity = serial.PARITY_SPACE
            self.dump_port_settings()
        elif c in 'nN':                         # N -> change to no parity
            self.serial.parity = serial.PARITY_NONE
            self.dump_port_settings()
        elif c == '1':                          # 1 -> change to 1 stop bits
            self.serial.stopbits = serial.STOPBITS_ONE
            self.dump_port_settings()
        elif c == '2':                          # 2 -> change to 2 stop bits
            self.serial.stopbits = serial.STOPBITS_TWO
            self.dump_port_settings()
        elif c == '3':                          # 3 -> change to 1.5 stop bits
            self.serial.stopbits = serial.STOPBITS_ONE_POINT_FIVE
            self.dump_port_settings()
        elif c in 'xX':                         # X -> change software flow control
            self.serial.xonxoff = (c == 'X')
            self.dump_port_settings()
        elif c in 'rR':                         # R -> change hardware flow control
            self.serial.rtscts = (c == 'R')
            self.dump_port_settings()
        elif c in 'qQ':
            self.stop()                         # Q -> exit app
        else:
            sys.stderr.write('--- unknown menu character {} --\n'.format(key_description(c)))

    def upload_file(self):
        """Ask user for filename and send its contents"""
        sys.stderr.write('\n--- File to upload: ')
        sys.stderr.flush()
        with self.console:
            filename = sys.stdin.readline().rstrip('\r\n')
            if filename:
                try:
                    with open(filename, 'rb') as f:
                        sys.stderr.write('--- Sending file {} ---\n'.format(filename))
                        while True:
                            block = f.read(1024)
                            if not block:
                                break
                            self.serial.write(block)
                            # Wait for output buffer to drain.
                            self.serial.flush()
                            sys.stderr.write('.')   # Progress indicator.
                    sys.stderr.write('\n--- File {} sent ---\n'.format(filename))
                except IOError as e:
                    sys.stderr.write('--- ERROR opening file {}: {} ---\n'.format(filename, e))

    def change_filter(self):
        """change the i/o transformations"""
        sys.stderr.write('\n--- Available Filters:\n')
        sys.stderr.write('\n'.join(
            '--- {:<10} = {.__doc__}'.format(k, v)
            for k, v in sorted(TRANSFORMATIONS.items())))
        sys.stderr.write('\n--- Enter new filter name(s) [{}]: '.format(' '.join(self.filters)))
        with self.console:
            new_filters = sys.stdin.readline().lower().split()
        if new_filters:
            for f in new_filters:
                if f not in TRANSFORMATIONS:
                    sys.stderr.write('--- unknown filter: {!r}\n'.format(f))
                    break
            else:
                # only applied when every entered name is known
                self.filters = new_filters
                self.update_transformations()
        sys.stderr.write('--- filters: {}\n'.format(' '.join(self.filters)))

    def change_encoding(self):
        """change encoding on the serial port"""
        sys.stderr.write('\n--- Enter new encoding name [{}]: '.format(self.input_encoding))
        with self.console:
            new_encoding = sys.stdin.readline().strip()
        if new_encoding:
            try:
                codecs.lookup(new_encoding)
            except LookupError:
                sys.stderr.write('--- invalid encoding name: {}\n'.format(new_encoding))
            else:
                self.set_rx_encoding(new_encoding)
                self.set_tx_encoding(new_encoding)
        sys.stderr.write('--- serial input encoding: {}\n'.format(self.input_encoding))
        sys.stderr.write('--- serial output encoding: {}\n'.format(self.output_encoding))

    def change_baudrate(self):
        """change the baudrate"""
        sys.stderr.write('\n--- Baudrate: ')
        sys.stderr.flush()
        with self.console:
            backup = self.serial.baudrate
            try:
                self.serial.baudrate = int(sys.stdin.readline().strip())
            except ValueError as e:
                sys.stderr.write('--- ERROR setting baudrate: {} ---\n'.format(e))
                self.serial.baudrate = backup
            else:
                self.dump_port_settings()

    def change_port(self):
        """Have a conversation with the user to change the serial port"""
        with self.console:
            try:
                port = ask_for_port()
            except KeyboardInterrupt:
                port = None
        if port and port != self.serial.port:
            # reader thread needs to be shut down
            self._stop_reader()
            # save settings
            settings = self.serial.getSettingsDict()
            # stays None when already the creation of the port object fails
            new_serial = None
            try:
                new_serial = serial.serial_for_url(port, do_not_open=True)
                # restore settings and open
                new_serial.applySettingsDict(settings)
                new_serial.rts = self.serial.rts
                new_serial.dtr = self.serial.dtr
                new_serial.open()
                new_serial.break_condition = self.serial.break_condition
            except Exception as e:
                sys.stderr.write('--- ERROR opening new port: {} ---\n'.format(e))
                if new_serial is not None:
                    new_serial.close()
            else:
                self.serial.close()
                self.serial = new_serial
                sys.stderr.write('--- Port changed to: {} ---\n'.format(self.serial.port))
            # and restart the reader thread
            self._start_reader()

    def suspend_port(self):
        """\
        open port temporarily, allow reconnect, exit and port change to get
        out of the loop
        """
        # reader thread needs to be shut down
        self._stop_reader()
        self.serial.close()
        sys.stderr.write('\n--- Port closed: {} ---\n'.format(self.serial.port))
        do_change_port = False
        while not self.serial.is_open:
            sys.stderr.write('--- Quit: {exit} | p: port change | any other key to reconnect ---\n'.format(
                exit=key_description(self.exit_character)))
            k = self.console.getkey()
            if k == self.exit_character:
                self.stop()             # exit app
                break
            elif k in 'pP':
                do_change_port = True
                break
            try:
                self.serial.open()
            except Exception as e:
                sys.stderr.write('--- ERROR opening port: {} ---\n'.format(e))
        if do_change_port:
            self.change_port()
        elif self.serial.is_open:
            # and restart the reader thread
            # (not done when the loop was left to exit the application, the
            # port is still closed in that case)
            self._start_reader()
            sys.stderr.write('--- Port opened: {} ---\n'.format(self.serial.port))

    def get_help_text(self):
        """return the help text"""
        # help text, starts with blank line!
        return """
--- pySerial ({version}) - miniterm - help
---
--- {exit:8} Exit program (alias {menu} Q)
--- {menu:8} Menu escape key, followed by:
--- Menu keys:
--- {menu:7} Send the menu character itself to remote
--- {exit:7} Send the exit character itself to remote
--- {info:7} Show info
--- {upload:7} Upload file (prompt will be shown)
--- {repr:7} encoding
--- {filter:7} edit filters
--- Toggles:
--- {rts:7} RTS {dtr:7} DTR {brk:7} BREAK
--- {echo:7} echo {eol:7} EOL
---
--- Port settings ({menu} followed by the following):
--- p change port
--- 7 8 set data bits
--- N E O S M change parity (None, Even, Odd, Space, Mark)
--- 1 2 3 set stop bits (1, 2, 1.5)
--- b change baud rate
--- x X disable/enable software flow control
--- r R disable/enable hardware flow control
""".format(version=getattr(serial, 'VERSION', 'unknown version'),
           exit=key_description(self.exit_character),
           menu=key_description(self.menu_character),
           rts=key_description('\x12'),
           dtr=key_description('\x04'),
           brk=key_description('\x02'),
           echo=key_description('\x05'),
           info=key_description('\x09'),
           upload=key_description('\x15'),
           repr=key_description('\x01'),
           filter=key_description('\x06'),
           eol=key_description('\x0c'))
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- # default args can be used to override when calling main() from an other script
- # e.g to create a miniterm-my-device.py
def _create_argument_parser(default_port, default_baudrate, default_rts, default_dtr):
    """Build the command line parser of miniterm with the given defaults"""
    import argparse

    def to_upper(value):
        return value.upper()

    parser = argparse.ArgumentParser(
        description='Miniterm - A simple terminal program for the serial port.')

    parser.add_argument(
        'port', nargs='?', default=default_port,
        help='serial port name ("-" to show port list)')
    parser.add_argument(
        'baudrate', nargs='?', type=int, default=default_baudrate,
        help='set baud rate, default: %(default)s')

    port_group = parser.add_argument_group('port settings')
    port_group.add_argument(
        '--parity', choices=['N', 'E', 'O', 'S', 'M'], type=to_upper, default='N',
        help='set parity, one of {N E O S M}, default: N')
    port_group.add_argument(
        '--rtscts', action='store_true', default=False,
        help='enable RTS/CTS flow control (default off)')
    port_group.add_argument(
        '--xonxoff', action='store_true', default=False,
        help='enable software flow control (default off)')
    port_group.add_argument(
        '--rts', type=int, default=default_rts,
        help='set initial RTS line state (possible values: 0, 1)')
    port_group.add_argument(
        '--dtr', type=int, default=default_dtr,
        help='set initial DTR line state (possible values: 0, 1)')
    port_group.add_argument(
        '--non-exclusive', dest='exclusive', action='store_false', default=True,
        help='disable locking for native ports')
    port_group.add_argument(
        '--ask', action='store_true', default=False,
        help='ask again for port when open fails')

    data_group = parser.add_argument_group('data handling')
    data_group.add_argument(
        '-e', '--echo', action='store_true', default=False,
        help='enable local echo (default off)')
    data_group.add_argument(
        '--encoding', dest='serial_port_encoding', metavar='CODEC', default='UTF-8',
        help='set the encoding for the serial port (e.g. hexlify, Latin1, UTF-8), default: %(default)s')
    data_group.add_argument(
        '-f', '--filter', action='append', metavar='NAME', default=[],
        help='add text transformation')
    data_group.add_argument(
        '--eol', choices=['CR', 'LF', 'CRLF'], type=to_upper, default='CRLF',
        help='end of line mode')
    data_group.add_argument(
        '--raw', action='store_true', default=False,
        help='Do no apply any encodings/transformations')

    hotkey_group = parser.add_argument_group('hotkeys')
    hotkey_group.add_argument(
        '--exit-char', type=int, metavar='NUM', default=0x1d,  # GS/CTRL+]
        help='Unicode of special character that is used to exit the application, default: %(default)s')
    hotkey_group.add_argument(
        '--menu-char', type=int, metavar='NUM', default=0x14,  # Menu: CTRL+T
        help='Unicode code of special character that is used to control miniterm (menu), default: %(default)s')

    diagnostics_group = parser.add_argument_group('diagnostics')
    diagnostics_group.add_argument(
        '-q', '--quiet', action='store_true', default=False,
        help='suppress non-error messages')
    diagnostics_group.add_argument(
        '--develop', action='store_true', default=False,
        help='show Python traceback on error')
    return parser


# default args can be used to override when calling main() from an other script
# e.g to create a miniterm-my-device.py
def main(default_port=None, default_baudrate=9600, default_rts=None, default_dtr=None):
    """\
    Entry point of the command line tool: parse the arguments, open the
    port (asking the user for it where needed) and run the terminal until
    it is exited.
    """
    parser = _create_argument_parser(default_port, default_baudrate, default_rts, default_dtr)
    args = parser.parse_args()

    if args.menu_char == args.exit_char:
        parser.error('--exit-char can not be the same as --menu-char')

    if not args.filter:
        filters = ['default']
    elif 'help' in args.filter:
        # "-f help" only lists the filters and terminates
        sys.stderr.write('Available filters:\n')
        sys.stderr.write('\n'.join(
            '{:<10} = {.__doc__}'.format(name, transformation)
            for name, transformation in sorted(TRANSFORMATIONS.items())))
        sys.stderr.write('\n')
        sys.exit(1)
    else:
        filters = args.filter

    # repeat until a port could be opened
    while True:
        # no port given on command line -> ask user now
        if args.port is None or args.port == '-':
            try:
                args.port = ask_for_port()
            except KeyboardInterrupt:
                sys.stderr.write('\n')
                parser.error('user aborted and port is not given')
            else:
                if not args.port:
                    parser.error('port is not given')
        try:
            serial_instance = serial.serial_for_url(
                args.port,
                args.baudrate,
                parity=args.parity,
                rtscts=args.rtscts,
                xonxoff=args.xonxoff,
                do_not_open=True)

            if not hasattr(serial_instance, 'cancel_read'):
                # enable timeout for alive flag polling if cancel_read is not available
                serial_instance.timeout = 1

            # initial state of the control lines, only touched when requested
            if args.dtr is not None:
                if not args.quiet:
                    sys.stderr.write('--- forcing DTR {}\n'.format('active' if args.dtr else 'inactive'))
                serial_instance.dtr = args.dtr
            if args.rts is not None:
                if not args.quiet:
                    sys.stderr.write('--- forcing RTS {}\n'.format('active' if args.rts else 'inactive'))
                serial_instance.rts = args.rts

            if isinstance(serial_instance, serial.Serial):
                serial_instance.exclusive = args.exclusive

            serial_instance.open()
        except serial.SerialException as error:
            sys.stderr.write('could not open port {!r}: {}\n'.format(args.port, error))
            if args.develop:
                raise
            if not args.ask:
                sys.exit(1)
            # next round of the loop asks for the port
            args.port = '-'
        else:
            break

    miniterm = Miniterm(
        serial_instance,
        echo=args.echo,
        eol=args.eol.lower(),
        filters=filters)
    miniterm.exit_character = unichr(args.exit_char)
    miniterm.menu_character = unichr(args.menu_char)
    miniterm.raw = args.raw
    miniterm.set_rx_encoding(args.serial_port_encoding)
    miniterm.set_tx_encoding(args.serial_port_encoding)

    if not args.quiet:
        sys.stderr.write('--- Miniterm on {p.name} {p.baudrate},{p.bytesize},{p.parity},{p.stopbits} ---\n'.format(
            p=miniterm.serial))
        menu_key = key_description(miniterm.menu_character)
        sys.stderr.write('--- Quit: {} | Menu: {} | Help: {} followed by {} ---\n'.format(
            key_description(miniterm.exit_character),
            menu_key,
            menu_key,
            key_description('\x08')))

    miniterm.start()
    try:
        # wait for the console->serial thread only
        miniterm.join(True)
    except KeyboardInterrupt:
        pass
    if not args.quiet:
        sys.stderr.write('\n--- exit ---\n')
    miniterm.join()
    miniterm.close()
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# start the terminal only when this file is run as a script (not on import)
if __name__ == '__main__':
    main()
|