123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- #!/usr/bin/env python3
- #
- # Working with threading and pySerial
- #
- # This file is part of pySerial. https://github.com/pyserial/pyserial
- # (C) 2015-2016 Chris Liechti <cliechti@gmx.net>
- #
- # SPDX-License-Identifier: BSD-3-Clause
- """\
- Support threading with serial ports.
- """
- from __future__ import absolute_import
- import serial
- import threading
class Protocol(object):
    """\
    Base protocol for use with the ReaderThread. Every callback has a
    do-nothing default, except that ``connection_lost`` re-raises an error
    that is handed to it.
    """

    def connection_made(self, transport):
        """Invoked when the reader thread has been started"""
        return None

    def data_received(self, data):
        """Invoked with each chunk of data read from the serial port"""
        return None

    def connection_lost(self, exc):
        """\
        Invoked when the serial port was closed or the reader loop ended for
        some other reason.

        ``exc`` is re-raised when it is an Exception instance; any other value
        (e.g. None) is ignored.
        """
        if not isinstance(exc, Exception):
            return
        raise exc
class Packetizer(Protocol):
    """
    Split the bytes received from the serial port into packets. A packet ends
    at TERMINATOR (a null byte unless a subclass overrides it).

    The transport is remembered while the connection exists.
    """

    TERMINATOR = b'\0'

    def __init__(self):
        self.transport = None
        self.buffer = bytearray()

    def connection_made(self, transport):
        """Remember the transport"""
        self.transport = transport

    def connection_lost(self, exc):
        """Drop the transport reference, then let the base class handle exc"""
        self.transport = None
        super(Packetizer, self).connection_lost(exc)

    def data_received(self, data):
        """Append data to the buffer and hand every complete packet to handle_packet"""
        self.buffer.extend(data)
        while self.TERMINATOR in self.buffer:
            # cut off everything up to the first terminator; the remainder
            # becomes the new buffer before the packet is dispatched
            pieces = self.buffer.split(self.TERMINATOR, 1)
            self.buffer = pieces[1]
            self.handle_packet(pieces[0])

    def handle_packet(self, packet):
        """Handle one packet (terminator removed) - subclasses must override this"""
        raise NotImplementedError('please implement functionality in handle_packet')
class FramedPacket(Protocol):
    """
    Extract binary packets that are enclosed by a START and a STOP marker
    byte from the received data.

    The transport is remembered while the connection exists.
    """

    START = b'('
    STOP = b')'

    def __init__(self):
        self.transport = None
        self.in_packet = False
        self.packet = bytearray()

    def connection_made(self, transport):
        """Remember the transport"""
        self.transport = transport

    def connection_lost(self, exc):
        """Drop the transport and any partial packet, then let the base class handle exc"""
        self.transport = None
        self.in_packet = False
        del self.packet[:]
        super(FramedPacket, self).connection_lost(exc)

    def data_received(self, data):
        """Scan data byte by byte and call handle_packet for each START...STOP frame"""
        for byte in serial.iterbytes(data):
            if byte == self.START:
                self.in_packet = True
                continue
            if byte == self.STOP:
                self.in_packet = False
                # pass an immutable copy, the collecting buffer is reused
                self.handle_packet(bytes(self.packet))
                del self.packet[:]
                continue
            if self.in_packet:
                self.packet.extend(byte)
            else:
                self.handle_out_of_packet_data(byte)

    def handle_packet(self, packet):
        """Handle one packet (markers removed) - subclasses must override this"""
        raise NotImplementedError('please implement functionality in handle_packet')

    def handle_out_of_packet_data(self, data):
        """Handle data received outside of a frame; ignored by default"""
class LineReader(Packetizer):
    """
    Read and write (Unicode) lines from/to the serial port.

    Received packets are decoded and sent text is encoded using ENCODING,
    with UNICODE_HANDLING as the error handling scheme.
    """

    TERMINATOR = b'\r\n'
    ENCODING = 'utf-8'
    UNICODE_HANDLING = 'replace'

    def handle_packet(self, packet):
        """Decode the packet and pass the resulting text to handle_line"""
        line = packet.decode(self.ENCODING, self.UNICODE_HANDLING)
        self.handle_line(line)

    def handle_line(self, line):
        """Handle one decoded line - subclasses must override this"""
        raise NotImplementedError('please implement functionality in handle_line')

    def write_line(self, text):
        """
        Send one line over the transport. ``text`` is a Unicode string; it is
        encoded and the TERMINATOR is appended before sending.
        """
        send = self.transport.write
        # concatenate first so that the line goes out in a single write call
        # (bytes does not support % or .format in py3)
        send(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)
class ReaderThread(threading.Thread):
    """\
    Implement a serial port read loop and dispatch to a Protocol instance (like
    the asyncio.Protocol) but do it with threads.

    Calls to close() will close the serial port but it is also possible to just
    stop() this thread and continue to use the serial port instance otherwise.
    """

    def __init__(self, serial_instance, protocol_factory):
        """\
        Initialize thread (it is created as daemon thread and not started yet).

        Note that the timeout of serial_instance is set to one second when the
        thread starts running, unless the port object provides cancel_read().
        Other settings are not changed.

        :param serial_instance: serial port object used for reading/writing
        :param protocol_factory: callable without arguments that returns the
            protocol instance; it is called in the reader thread
        """
        super(ReaderThread, self).__init__()
        self.daemon = True
        self.serial = serial_instance
        self.protocol_factory = protocol_factory
        self.alive = True
        self._lock = threading.Lock()
        self._connection_made = threading.Event()
        self.protocol = None

    def stop(self):
        """\
        Stop the reader thread. Waits up to two seconds for the thread to
        terminate, except when called from the reader thread itself.
        """
        self.alive = False
        if hasattr(self.serial, 'cancel_read'):
            self.serial.cancel_read()
        # A thread can not join itself (RuntimeError). That situation occurs
        # when a protocol callback calls stop()/close() on its transport; the
        # read loop then terminates on its own once the callback returns.
        if threading.current_thread() is not self:
            self.join(2)

    def run(self):
        """\
        Reader loop: create the protocol, call its connection_made() and then
        pass received data to data_received() until the thread is stopped, the
        port is closed or an error occurs. connection_lost() is called at the
        end with the error (or None).
        """
        if not hasattr(self.serial, 'cancel_read'):
            # stop() can not cancel a pending read on such ports; the timeout
            # lets read() return so that the loop condition is checked again
            self.serial.timeout = 1
        try:
            self.protocol = self.protocol_factory()
        except Exception:
            # release threads waiting in connect()/__enter__ before the
            # exception terminates this thread
            self.alive = False
            self._connection_made.set()
            raise
        try:
            self.protocol.connection_made(self)
        except Exception as e:
            self.alive = False
            try:
                self.protocol.connection_lost(e)
            finally:
                # connection_lost() may re-raise the error (the Protocol base
                # class does); waiters must be released in that case too
                self._connection_made.set()
            return
        error = None
        self._connection_made.set()
        while self.alive and self.serial.is_open:
            try:
                # read all that is there or wait for one byte (blocking)
                data = self.serial.read(self.serial.in_waiting or 1)
            except serial.SerialException as e:
                # probably some I/O problem such as disconnected USB serial
                # adapters -> exit
                error = e
                break
            else:
                if data:
                    # make a separated try-except for called user code
                    try:
                        self.protocol.data_received(data)
                    except Exception as e:
                        error = e
                        break
        self.alive = False
        self.protocol.connection_lost(error)
        self.protocol = None

    def write(self, data):
        """Thread safe writing (uses lock). Returns the result of the port's write()."""
        with self._lock:
            return self.serial.write(data)

    def close(self):
        """Close the serial port and exit reader thread (uses lock)"""
        # use the lock to let other threads finish writing
        with self._lock:
            # first stop reading, so that closing can be done on idle port
            self.stop()
            self.serial.close()

    def connect(self):
        """
        Wait until connection is set up and return the transport and protocol
        instances.

        :returns: tuple (transport, protocol), the transport is this thread
        :raises RuntimeError: if the thread was stopped already or the
            connection was lost while it was set up
        """
        if self.alive:
            self._connection_made.wait()
            if not self.alive:
                raise RuntimeError('connection_lost already called')
            return (self, self.protocol)
        else:
            raise RuntimeError('already stopped')

    # - -  context manager, returns protocol

    def __enter__(self):
        """\
        Enter context handler: start the thread and return the protocol
        instance. May raise RuntimeError in case the connection could not be
        created.
        """
        self.start()
        self._connection_made.wait()
        if not self.alive:
            raise RuntimeError('connection_lost already called')
        return self.protocol

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave context: close port"""
        self.close()
- # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
- # test
if __name__ == '__main__':
    # Small self test / usage example: lines written to a loop back port are
    # received again and printed.
    # pylint: disable=wrong-import-position
    import sys
    import time
    import traceback

    #~ PORT = 'spy:///dev/ttyUSB0'
    PORT = 'loop://'

    class PrintLines(LineReader):
        """Example protocol that prints connection events and received lines"""

        def connection_made(self, transport):
            super(PrintLines, self).connection_made(transport)
            sys.stdout.write('port opened\n')
            self.write_line('hello world')

        def handle_line(self, data):
            sys.stdout.write('line received: {!r}\n'.format(data))

        def connection_lost(self, exc):
            if exc:
                # This is not called from within an except block, so the
                # exception has to be passed explicitly. print_exc() would
                # take the argument as "limit" and not print the error.
                traceback.print_exception(type(exc), exc, getattr(exc, '__traceback__', None))
            sys.stdout.write('port closed\n')

    # usage as context manager: the port is closed when the block is left
    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
    with ReaderThread(ser, PrintLines) as protocol:
        protocol.write_line('hello')
        time.sleep(2)

    # alternative usage
    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
    t = ReaderThread(ser, PrintLines)
    t.start()
    transport, protocol = t.connect()
    protocol.write_line('hello')
    time.sleep(2)
    t.close()
|