123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900 |
- #!/usr/bin/env python
- #
- # backend for serial IO for POSIX compatible systems, like Linux, OSX
- #
- # This file is part of pySerial. https://github.com/pyserial/pyserial
- # (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
- #
- # SPDX-License-Identifier: BSD-3-Clause
- #
- # parts based on code from Grant B. Edwards <grante@visi.com>:
- # ftp://ftp.visi.com/users/grante/python/PosixSerial.py
- #
- # references: http://www.easysw.com/~mike/serial/serial.html
- # Collection of port names (was previously used by number_to_device which was
- # removed.
- # - Linux /dev/ttyS%d (confirmed)
- # - cygwin/win32 /dev/com%d (confirmed)
- # - openbsd (OpenBSD) /dev/cua%02d
- # - bsd*, freebsd* /dev/cuad%d
- # - darwin (OS X) /dev/cuad%d
- # - netbsd /dev/dty%02d (NetBSD 1.6 testing by Erk)
- # - irix (IRIX) /dev/ttyf%d (partially tested) names depending on flow control
- # - hp (HP-UX) /dev/tty%dp0 (not tested)
- # - sunos (Solaris/SunOS) /dev/tty%c (letters, 'a'..'z') (confirmed)
- # - aix (AIX) /dev/tty%d
- from __future__ import absolute_import
- # pylint: disable=abstract-method
- import errno
- import fcntl
- import os
- import select
- import struct
- import sys
- import termios
- import serial
- from serial.serialutil import SerialBase, SerialException, to_bytes, \
- PortNotOpenError, SerialTimeoutException, Timeout
- class PlatformSpecificBase(object):
- BAUDRATE_CONSTANTS = {}
- def _set_special_baudrate(self, baudrate):
- raise NotImplementedError('non-standard baudrates are not supported on this platform')
- def _set_rs485_mode(self, rs485_settings):
- raise NotImplementedError('RS485 not supported on this platform')
- def set_low_latency_mode(self, low_latency_settings):
- raise NotImplementedError('Low latency not supported on this platform')
- def _update_break_state(self):
- """\
- Set break: Controls TXD. When active, no transmitting is possible.
- """
- if self._break_state:
- fcntl.ioctl(self.fd, TIOCSBRK)
- else:
- fcntl.ioctl(self.fd, TIOCCBRK)
-
- # some systems support an extra flag to enable the two in POSIX unsupported
- # paritiy settings for MARK and SPACE
- CMSPAR = 0 # default, for unsupported platforms, override below
- # try to detect the OS so that a device can be selected...
- # this code block should supply a device() and set_special_baudrate() function
- # for the platform
- plat = sys.platform.lower()
- if plat[:5] == 'linux': # Linux (confirmed) # noqa
- import array
- # extra termios flags
- CMSPAR = 0o10000000000 # Use "stick" (mark/space) parity
- # baudrate ioctls
- TCGETS2 = 0x802C542A
- TCSETS2 = 0x402C542B
- BOTHER = 0o010000
- # RS485 ioctls
- TIOCGRS485 = 0x542E
- TIOCSRS485 = 0x542F
- SER_RS485_ENABLED = 0b00000001
- SER_RS485_RTS_ON_SEND = 0b00000010
- SER_RS485_RTS_AFTER_SEND = 0b00000100
- SER_RS485_RX_DURING_TX = 0b00010000
- class PlatformSpecific(PlatformSpecificBase):
- BAUDRATE_CONSTANTS = {
- 0: 0o000000, # hang up
- 50: 0o000001,
- 75: 0o000002,
- 110: 0o000003,
- 134: 0o000004,
- 150: 0o000005,
- 200: 0o000006,
- 300: 0o000007,
- 600: 0o000010,
- 1200: 0o000011,
- 1800: 0o000012,
- 2400: 0o000013,
- 4800: 0o000014,
- 9600: 0o000015,
- 19200: 0o000016,
- 38400: 0o000017,
- 57600: 0o010001,
- 115200: 0o010002,
- 230400: 0o010003,
- 460800: 0o010004,
- 500000: 0o010005,
- 576000: 0o010006,
- 921600: 0o010007,
- 1000000: 0o010010,
- 1152000: 0o010011,
- 1500000: 0o010012,
- 2000000: 0o010013,
- 2500000: 0o010014,
- 3000000: 0o010015,
- 3500000: 0o010016,
- 4000000: 0o010017
- }
- def set_low_latency_mode(self, low_latency_settings):
- buf = array.array('i', [0] * 32)
- try:
- # get serial_struct
- fcntl.ioctl(self.fd, termios.TIOCGSERIAL, buf)
- # set or unset ASYNC_LOW_LATENCY flag
- if low_latency_settings:
- buf[4] |= 0x2000
- else:
- buf[4] &= ~0x2000
- # set serial_struct
- fcntl.ioctl(self.fd, termios.TIOCSSERIAL, buf)
- except IOError as e:
- raise ValueError('Failed to update ASYNC_LOW_LATENCY flag to {}: {}'.format(low_latency_settings, e))
- def _set_special_baudrate(self, baudrate):
- # right size is 44 on x86_64, allow for some growth
- buf = array.array('i', [0] * 64)
- try:
- # get serial_struct
- fcntl.ioctl(self.fd, TCGETS2, buf)
- # set custom speed
- buf[2] &= ~termios.CBAUD
- buf[2] |= BOTHER
- buf[9] = buf[10] = baudrate
- # set serial_struct
- fcntl.ioctl(self.fd, TCSETS2, buf)
- except IOError as e:
- raise ValueError('Failed to set custom baud rate ({}): {}'.format(baudrate, e))
- def _set_rs485_mode(self, rs485_settings):
- buf = array.array('i', [0] * 8) # flags, delaytx, delayrx, padding
- try:
- fcntl.ioctl(self.fd, TIOCGRS485, buf)
- buf[0] |= SER_RS485_ENABLED
- if rs485_settings is not None:
- if rs485_settings.loopback:
- buf[0] |= SER_RS485_RX_DURING_TX
- else:
- buf[0] &= ~SER_RS485_RX_DURING_TX
- if rs485_settings.rts_level_for_tx:
- buf[0] |= SER_RS485_RTS_ON_SEND
- else:
- buf[0] &= ~SER_RS485_RTS_ON_SEND
- if rs485_settings.rts_level_for_rx:
- buf[0] |= SER_RS485_RTS_AFTER_SEND
- else:
- buf[0] &= ~SER_RS485_RTS_AFTER_SEND
- if rs485_settings.delay_before_tx is not None:
- buf[1] = int(rs485_settings.delay_before_tx * 1000)
- if rs485_settings.delay_before_rx is not None:
- buf[2] = int(rs485_settings.delay_before_rx * 1000)
- else:
- buf[0] = 0 # clear SER_RS485_ENABLED
- fcntl.ioctl(self.fd, TIOCSRS485, buf)
- except IOError as e:
- raise ValueError('Failed to set RS485 mode: {}'.format(e))
- elif plat == 'cygwin': # cygwin/win32 (confirmed)
- class PlatformSpecific(PlatformSpecificBase):
- BAUDRATE_CONSTANTS = {
- 128000: 0x01003,
- 256000: 0x01005,
- 500000: 0x01007,
- 576000: 0x01008,
- 921600: 0x01009,
- 1000000: 0x0100a,
- 1152000: 0x0100b,
- 1500000: 0x0100c,
- 2000000: 0x0100d,
- 2500000: 0x0100e,
- 3000000: 0x0100f
- }
- elif plat[:6] == 'darwin': # OS X
- import array
- IOSSIOSPEED = 0x80045402 # _IOW('T', 2, speed_t)
- class PlatformSpecific(PlatformSpecificBase):
- osx_version = os.uname()[2].split('.')
- TIOCSBRK = 0x2000747B # _IO('t', 123)
- TIOCCBRK = 0x2000747A # _IO('t', 122)
- # Tiger or above can support arbitrary serial speeds
- if int(osx_version[0]) >= 8:
- def _set_special_baudrate(self, baudrate):
- # use IOKit-specific call to set up high speeds
- buf = array.array('i', [baudrate])
- fcntl.ioctl(self.fd, IOSSIOSPEED, buf, 1)
- def _update_break_state(self):
- """\
- Set break: Controls TXD. When active, no transmitting is possible.
- """
- if self._break_state:
- fcntl.ioctl(self.fd, PlatformSpecific.TIOCSBRK)
- else:
- fcntl.ioctl(self.fd, PlatformSpecific.TIOCCBRK)
- elif plat[:3] == 'bsd' or \
- plat[:7] == 'freebsd' or \
- plat[:6] == 'netbsd' or \
- plat[:7] == 'openbsd':
- class ReturnBaudrate(object):
- def __getitem__(self, key):
- return key
- class PlatformSpecific(PlatformSpecificBase):
- # Only tested on FreeBSD:
- # The baud rate may be passed in as
- # a literal value.
- BAUDRATE_CONSTANTS = ReturnBaudrate()
- TIOCSBRK = 0x2000747B # _IO('t', 123)
- TIOCCBRK = 0x2000747A # _IO('t', 122)
-
- def _update_break_state(self):
- """\
- Set break: Controls TXD. When active, no transmitting is possible.
- """
- if self._break_state:
- fcntl.ioctl(self.fd, PlatformSpecific.TIOCSBRK)
- else:
- fcntl.ioctl(self.fd, PlatformSpecific.TIOCCBRK)
- else:
- class PlatformSpecific(PlatformSpecificBase):
- pass
- # load some constants for later use.
- # try to use values from termios, use defaults from linux otherwise
- TIOCMGET = getattr(termios, 'TIOCMGET', 0x5415)
- TIOCMBIS = getattr(termios, 'TIOCMBIS', 0x5416)
- TIOCMBIC = getattr(termios, 'TIOCMBIC', 0x5417)
- TIOCMSET = getattr(termios, 'TIOCMSET', 0x5418)
- # TIOCM_LE = getattr(termios, 'TIOCM_LE', 0x001)
- TIOCM_DTR = getattr(termios, 'TIOCM_DTR', 0x002)
- TIOCM_RTS = getattr(termios, 'TIOCM_RTS', 0x004)
- # TIOCM_ST = getattr(termios, 'TIOCM_ST', 0x008)
- # TIOCM_SR = getattr(termios, 'TIOCM_SR', 0x010)
- TIOCM_CTS = getattr(termios, 'TIOCM_CTS', 0x020)
- TIOCM_CAR = getattr(termios, 'TIOCM_CAR', 0x040)
- TIOCM_RNG = getattr(termios, 'TIOCM_RNG', 0x080)
- TIOCM_DSR = getattr(termios, 'TIOCM_DSR', 0x100)
- TIOCM_CD = getattr(termios, 'TIOCM_CD', TIOCM_CAR)
- TIOCM_RI = getattr(termios, 'TIOCM_RI', TIOCM_RNG)
- # TIOCM_OUT1 = getattr(termios, 'TIOCM_OUT1', 0x2000)
- # TIOCM_OUT2 = getattr(termios, 'TIOCM_OUT2', 0x4000)
- if hasattr(termios, 'TIOCINQ'):
- TIOCINQ = termios.TIOCINQ
- else:
- TIOCINQ = getattr(termios, 'FIONREAD', 0x541B)
- TIOCOUTQ = getattr(termios, 'TIOCOUTQ', 0x5411)
- TIOCM_zero_str = struct.pack('I', 0)
- TIOCM_RTS_str = struct.pack('I', TIOCM_RTS)
- TIOCM_DTR_str = struct.pack('I', TIOCM_DTR)
- TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427)
- TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428)
- class Serial(SerialBase, PlatformSpecific):
- """\
- Serial port class POSIX implementation. Serial port configuration is
- done with termios and fcntl. Runs on Linux and many other Un*x like
- systems.
- """
- def open(self):
- """\
- Open port with current settings. This may throw a SerialException
- if the port cannot be opened."""
- if self._port is None:
- raise SerialException("Port must be configured before it can be used.")
- if self.is_open:
- raise SerialException("Port is already open.")
- self.fd = None
- # open
- try:
- self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
- except OSError as msg:
- self.fd = None
- raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
- #~ fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # set blocking
- self.pipe_abort_read_r, self.pipe_abort_read_w = None, None
- self.pipe_abort_write_r, self.pipe_abort_write_w = None, None
- try:
- self._reconfigure_port(force_update=True)
- try:
- if not self._dsrdtr:
- self._update_dtr_state()
- if not self._rtscts:
- self._update_rts_state()
- except IOError as e:
- # ignore Invalid argument and Inappropriate ioctl
- if e.errno not in (errno.EINVAL, errno.ENOTTY):
- raise
- self._reset_input_buffer()
- self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
- self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
- fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
- fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
- except BaseException:
- try:
- os.close(self.fd)
- except Exception:
- # ignore any exception when closing the port
- # also to keep original exception that happened when setting up
- pass
- self.fd = None
- if self.pipe_abort_read_w is not None:
- os.close(self.pipe_abort_read_w)
- self.pipe_abort_read_w = None
- if self.pipe_abort_read_r is not None:
- os.close(self.pipe_abort_read_r)
- self.pipe_abort_read_r = None
- if self.pipe_abort_write_w is not None:
- os.close(self.pipe_abort_write_w)
- self.pipe_abort_write_w = None
- if self.pipe_abort_write_r is not None:
- os.close(self.pipe_abort_write_r)
- self.pipe_abort_write_r = None
- raise
- self.is_open = True
- def _reconfigure_port(self, force_update=False):
- """Set communication parameters on opened port."""
- if self.fd is None:
- raise SerialException("Can only operate on a valid file descriptor")
- # if exclusive lock is requested, create it before we modify anything else
- if self._exclusive is not None:
- if self._exclusive:
- try:
- fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
- except IOError as msg:
- raise SerialException(msg.errno, "Could not exclusively lock port {}: {}".format(self._port, msg))
- else:
- fcntl.flock(self.fd, fcntl.LOCK_UN)
- custom_baud = None
- vmin = vtime = 0 # timeout is done via select
- if self._inter_byte_timeout is not None:
- vmin = 1
- vtime = int(self._inter_byte_timeout * 10)
- try:
- orig_attr = termios.tcgetattr(self.fd)
- iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr
- except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here
- raise SerialException("Could not configure port: {}".format(msg))
- # set up raw mode / no echo / binary
- cflag |= (termios.CLOCAL | termios.CREAD)
- lflag &= ~(termios.ICANON | termios.ECHO | termios.ECHOE |
- termios.ECHOK | termios.ECHONL |
- termios.ISIG | termios.IEXTEN) # |termios.ECHOPRT
- for flag in ('ECHOCTL', 'ECHOKE'): # netbsd workaround for Erk
- if hasattr(termios, flag):
- lflag &= ~getattr(termios, flag)
- oflag &= ~(termios.OPOST | termios.ONLCR | termios.OCRNL)
- iflag &= ~(termios.INLCR | termios.IGNCR | termios.ICRNL | termios.IGNBRK)
- if hasattr(termios, 'IUCLC'):
- iflag &= ~termios.IUCLC
- if hasattr(termios, 'PARMRK'):
- iflag &= ~termios.PARMRK
- # setup baud rate
- try:
- ispeed = ospeed = getattr(termios, 'B{}'.format(self._baudrate))
- except AttributeError:
- try:
- ispeed = ospeed = self.BAUDRATE_CONSTANTS[self._baudrate]
- except KeyError:
- #~ raise ValueError('Invalid baud rate: %r' % self._baudrate)
- # See if BOTHER is defined for this platform; if it is, use
- # this for a speed not defined in the baudrate constants list.
- try:
- ispeed = ospeed = BOTHER
- except NameError:
- # may need custom baud rate, it isn't in our list.
- ispeed = ospeed = getattr(termios, 'B38400')
- try:
- custom_baud = int(self._baudrate) # store for later
- except ValueError:
- raise ValueError('Invalid baud rate: {!r}'.format(self._baudrate))
- else:
- if custom_baud < 0:
- raise ValueError('Invalid baud rate: {!r}'.format(self._baudrate))
- # setup char len
- cflag &= ~termios.CSIZE
- if self._bytesize == 8:
- cflag |= termios.CS8
- elif self._bytesize == 7:
- cflag |= termios.CS7
- elif self._bytesize == 6:
- cflag |= termios.CS6
- elif self._bytesize == 5:
- cflag |= termios.CS5
- else:
- raise ValueError('Invalid char len: {!r}'.format(self._bytesize))
- # setup stop bits
- if self._stopbits == serial.STOPBITS_ONE:
- cflag &= ~(termios.CSTOPB)
- elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
- cflag |= (termios.CSTOPB) # XXX same as TWO.. there is no POSIX support for 1.5
- elif self._stopbits == serial.STOPBITS_TWO:
- cflag |= (termios.CSTOPB)
- else:
- raise ValueError('Invalid stop bit specification: {!r}'.format(self._stopbits))
- # setup parity
- iflag &= ~(termios.INPCK | termios.ISTRIP)
- if self._parity == serial.PARITY_NONE:
- cflag &= ~(termios.PARENB | termios.PARODD | CMSPAR)
- elif self._parity == serial.PARITY_EVEN:
- cflag &= ~(termios.PARODD | CMSPAR)
- cflag |= (termios.PARENB)
- elif self._parity == serial.PARITY_ODD:
- cflag &= ~CMSPAR
- cflag |= (termios.PARENB | termios.PARODD)
- elif self._parity == serial.PARITY_MARK and CMSPAR:
- cflag |= (termios.PARENB | CMSPAR | termios.PARODD)
- elif self._parity == serial.PARITY_SPACE and CMSPAR:
- cflag |= (termios.PARENB | CMSPAR)
- cflag &= ~(termios.PARODD)
- else:
- raise ValueError('Invalid parity: {!r}'.format(self._parity))
- # setup flow control
- # xonxoff
- if hasattr(termios, 'IXANY'):
- if self._xonxoff:
- iflag |= (termios.IXON | termios.IXOFF) # |termios.IXANY)
- else:
- iflag &= ~(termios.IXON | termios.IXOFF | termios.IXANY)
- else:
- if self._xonxoff:
- iflag |= (termios.IXON | termios.IXOFF)
- else:
- iflag &= ~(termios.IXON | termios.IXOFF)
- # rtscts
- if hasattr(termios, 'CRTSCTS'):
- if self._rtscts:
- cflag |= (termios.CRTSCTS)
- else:
- cflag &= ~(termios.CRTSCTS)
- elif hasattr(termios, 'CNEW_RTSCTS'): # try it with alternate constant name
- if self._rtscts:
- cflag |= (termios.CNEW_RTSCTS)
- else:
- cflag &= ~(termios.CNEW_RTSCTS)
- # XXX should there be a warning if setting up rtscts (and xonxoff etc) fails??
- # buffer
- # vmin "minimal number of characters to be read. 0 for non blocking"
- if vmin < 0 or vmin > 255:
- raise ValueError('Invalid vmin: {!r}'.format(vmin))
- cc[termios.VMIN] = vmin
- # vtime
- if vtime < 0 or vtime > 255:
- raise ValueError('Invalid vtime: {!r}'.format(vtime))
- cc[termios.VTIME] = vtime
- # activate settings
- if force_update or [iflag, oflag, cflag, lflag, ispeed, ospeed, cc] != orig_attr:
- termios.tcsetattr(
- self.fd,
- termios.TCSANOW,
- [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
- # apply custom baud rate, if any
- if custom_baud is not None:
- self._set_special_baudrate(custom_baud)
- if self._rs485_mode is not None:
- self._set_rs485_mode(self._rs485_mode)
- def close(self):
- """Close port"""
- if self.is_open:
- if self.fd is not None:
- os.close(self.fd)
- self.fd = None
- os.close(self.pipe_abort_read_w)
- os.close(self.pipe_abort_read_r)
- os.close(self.pipe_abort_write_w)
- os.close(self.pipe_abort_write_r)
- self.pipe_abort_read_r, self.pipe_abort_read_w = None, None
- self.pipe_abort_write_r, self.pipe_abort_write_w = None, None
- self.is_open = False
- # - - - - - - - - - - - - - - - - - - - - - - - -
- @property
- def in_waiting(self):
- """Return the number of bytes currently in the input buffer."""
- #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
- s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
- return struct.unpack('I', s)[0]
- # select based implementation, proved to work on many systems
- def read(self, size=1):
- """\
- Read size bytes from the serial port. If a timeout is set it may
- return less characters as requested. With no timeout it will block
- until the requested number of bytes is read.
- """
- if not self.is_open:
- raise PortNotOpenError()
- read = bytearray()
- timeout = Timeout(self._timeout)
- while len(read) < size:
- try:
- ready, _, _ = select.select([self.fd, self.pipe_abort_read_r], [], [], timeout.time_left())
- if self.pipe_abort_read_r in ready:
- os.read(self.pipe_abort_read_r, 1000)
- break
- # If select was used with a timeout, and the timeout occurs, it
- # returns with empty lists -> thus abort read operation.
- # For timeout == 0 (non-blocking operation) also abort when
- # there is nothing to read.
- if not ready:
- break # timeout
- buf = os.read(self.fd, size - len(read))
- except OSError as e:
- # this is for Python 3.x where select.error is a subclass of
- # OSError ignore BlockingIOErrors and EINTR. other errors are shown
- # https://www.python.org/dev/peps/pep-0475.
- if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
- raise SerialException('read failed: {}'.format(e))
- except select.error as e:
- # this is for Python 2.x
- # ignore BlockingIOErrors and EINTR. all errors are shown
- # see also http://www.python.org/dev/peps/pep-3151/#select
- if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
- raise SerialException('read failed: {}'.format(e))
- else:
- # read should always return some data as select reported it was
- # ready to read when we get to this point.
- if not buf:
- # Disconnected devices, at least on Linux, show the
- # behavior that they are always ready to read immediately
- # but reading returns nothing.
- raise SerialException(
- 'device reports readiness to read but returned no data '
- '(device disconnected or multiple access on port?)')
- read.extend(buf)
- if timeout.expired():
- break
- return bytes(read)
- def cancel_read(self):
- if self.is_open:
- os.write(self.pipe_abort_read_w, b"x")
- def cancel_write(self):
- if self.is_open:
- os.write(self.pipe_abort_write_w, b"x")
- def write(self, data):
- """Output the given byte string over the serial port."""
- if not self.is_open:
- raise PortNotOpenError()
- d = to_bytes(data)
- tx_len = length = len(d)
- timeout = Timeout(self._write_timeout)
- while tx_len > 0:
- try:
- n = os.write(self.fd, d)
- if timeout.is_non_blocking:
- # Zero timeout indicates non-blocking - simply return the
- # number of bytes of data actually written
- return n
- elif not timeout.is_infinite:
- # when timeout is set, use select to wait for being ready
- # with the time left as timeout
- if timeout.expired():
- raise SerialTimeoutException('Write timeout')
- abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], timeout.time_left())
- if abort:
- os.read(self.pipe_abort_write_r, 1000)
- break
- if not ready:
- raise SerialTimeoutException('Write timeout')
- else:
- assert timeout.time_left() is None
- # wait for write operation
- abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], None)
- if abort:
- os.read(self.pipe_abort_write_r, 1)
- break
- if not ready:
- raise SerialException('write failed (select)')
- d = d[n:]
- tx_len -= n
- except SerialException:
- raise
- except OSError as e:
- # this is for Python 3.x where select.error is a subclass of
- # OSError ignore BlockingIOErrors and EINTR. other errors are shown
- # https://www.python.org/dev/peps/pep-0475.
- if e.errno not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
- raise SerialException('write failed: {}'.format(e))
- except select.error as e:
- # this is for Python 2.x
- # ignore BlockingIOErrors and EINTR. all errors are shown
- # see also http://www.python.org/dev/peps/pep-3151/#select
- if e[0] not in (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK, errno.EINPROGRESS, errno.EINTR):
- raise SerialException('write failed: {}'.format(e))
- if not timeout.is_non_blocking and timeout.expired():
- raise SerialTimeoutException('Write timeout')
- return length - len(d)
- def flush(self):
- """\
- Flush of file like objects. In this case, wait until all data
- is written.
- """
- if not self.is_open:
- raise PortNotOpenError()
- termios.tcdrain(self.fd)
- def _reset_input_buffer(self):
- """Clear input buffer, discarding all that is in the buffer."""
- termios.tcflush(self.fd, termios.TCIFLUSH)
- def reset_input_buffer(self):
- """Clear input buffer, discarding all that is in the buffer."""
- if not self.is_open:
- raise PortNotOpenError()
- self._reset_input_buffer()
- def reset_output_buffer(self):
- """\
- Clear output buffer, aborting the current output and discarding all
- that is in the buffer.
- """
- if not self.is_open:
- raise PortNotOpenError()
- termios.tcflush(self.fd, termios.TCOFLUSH)
- def send_break(self, duration=0.25):
- """\
- Send break condition. Timed, returns to idle state after given
- duration.
- """
- if not self.is_open:
- raise PortNotOpenError()
- termios.tcsendbreak(self.fd, int(duration / 0.25))
- def _update_rts_state(self):
- """Set terminal status line: Request To Send"""
- if self._rts_state:
- fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_RTS_str)
- else:
- fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_RTS_str)
- def _update_dtr_state(self):
- """Set terminal status line: Data Terminal Ready"""
- if self._dtr_state:
- fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_DTR_str)
- else:
- fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_DTR_str)
- @property
- def cts(self):
- """Read terminal status line: Clear To Send"""
- if not self.is_open:
- raise PortNotOpenError()
- s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I', s)[0] & TIOCM_CTS != 0
- @property
- def dsr(self):
- """Read terminal status line: Data Set Ready"""
- if not self.is_open:
- raise PortNotOpenError()
- s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I', s)[0] & TIOCM_DSR != 0
- @property
- def ri(self):
- """Read terminal status line: Ring Indicator"""
- if not self.is_open:
- raise PortNotOpenError()
- s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I', s)[0] & TIOCM_RI != 0
- @property
- def cd(self):
- """Read terminal status line: Carrier Detect"""
- if not self.is_open:
- raise PortNotOpenError()
- s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
- return struct.unpack('I', s)[0] & TIOCM_CD != 0
- # - - platform specific - - - -
- @property
- def out_waiting(self):
- """Return the number of bytes currently in the output buffer."""
- #~ s = fcntl.ioctl(self.fd, termios.FIONREAD, TIOCM_zero_str)
- s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
- return struct.unpack('I', s)[0]
- def fileno(self):
- """\
- For easier use of the serial port instance with select.
- WARNING: this function is not portable to different platforms!
- """
- if not self.is_open:
- raise PortNotOpenError()
- return self.fd
- def set_input_flow_control(self, enable=True):
- """\
- Manually control flow - when software flow control is enabled.
- This will send XON (true) or XOFF (false) to the other device.
- WARNING: this function is not portable to different platforms!
- """
- if not self.is_open:
- raise PortNotOpenError()
- if enable:
- termios.tcflow(self.fd, termios.TCION)
- else:
- termios.tcflow(self.fd, termios.TCIOFF)
- def set_output_flow_control(self, enable=True):
- """\
- Manually control flow of outgoing data - when hardware or software flow
- control is enabled.
- WARNING: this function is not portable to different platforms!
- """
- if not self.is_open:
- raise PortNotOpenError()
- if enable:
- termios.tcflow(self.fd, termios.TCOON)
- else:
- termios.tcflow(self.fd, termios.TCOOFF)
- def nonblocking(self):
- """DEPRECATED - has no use"""
- import warnings
- warnings.warn("nonblocking() has no effect, already nonblocking", DeprecationWarning)
- class PosixPollSerial(Serial):
- """\
- Poll based read implementation. Not all systems support poll properly.
- However this one has better handling of errors, such as a device
- disconnecting while it's in use (e.g. USB-serial unplugged).
- """
- def read(self, size=1):
- """\
- Read size bytes from the serial port. If a timeout is set it may
- return less characters as requested. With no timeout it will block
- until the requested number of bytes is read.
- """
- if not self.is_open:
- raise PortNotOpenError()
- read = bytearray()
- timeout = Timeout(self._timeout)
- poll = select.poll()
- poll.register(self.fd, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL)
- poll.register(self.pipe_abort_read_r, select.POLLIN | select.POLLERR | select.POLLHUP | select.POLLNVAL)
- if size > 0:
- while len(read) < size:
- # print "\tread(): size",size, "have", len(read) #debug
- # wait until device becomes ready to read (or something fails)
- for fd, event in poll.poll(None if timeout.is_infinite else (timeout.time_left() * 1000)):
- if fd == self.pipe_abort_read_r:
- break
- if event & (select.POLLERR | select.POLLHUP | select.POLLNVAL):
- raise SerialException('device reports error (poll)')
- # we don't care if it is select.POLLIN or timeout, that's
- # handled below
- if fd == self.pipe_abort_read_r:
- os.read(self.pipe_abort_read_r, 1000)
- break
- buf = os.read(self.fd, size - len(read))
- read.extend(buf)
- if timeout.expired() \
- or (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0) and not buf:
- break # early abort on timeout
- return bytes(read)
- class VTIMESerial(Serial):
- """\
- Implement timeout using vtime of tty device instead of using select.
- This means that no inter character timeout can be specified and that
- the error handling is degraded.
- Overall timeout is disabled when inter-character timeout is used.
- Note that this implementation does NOT support cancel_read(), it will
- just ignore that.
- """
- def _reconfigure_port(self, force_update=True):
- """Set communication parameters on opened port."""
- super(VTIMESerial, self)._reconfigure_port()
- fcntl.fcntl(self.fd, fcntl.F_SETFL, 0) # clear O_NONBLOCK
- if self._inter_byte_timeout is not None:
- vmin = 1
- vtime = int(self._inter_byte_timeout * 10)
- elif self._timeout is None:
- vmin = 1
- vtime = 0
- else:
- vmin = 0
- vtime = int(self._timeout * 10)
- try:
- orig_attr = termios.tcgetattr(self.fd)
- iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr
- except termios.error as msg: # if a port is nonexistent but has a /dev file, it'll fail here
- raise serial.SerialException("Could not configure port: {}".format(msg))
- if vtime < 0 or vtime > 255:
- raise ValueError('Invalid vtime: {!r}'.format(vtime))
- cc[termios.VTIME] = vtime
- cc[termios.VMIN] = vmin
- termios.tcsetattr(
- self.fd,
- termios.TCSANOW,
- [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])
- def read(self, size=1):
- """\
- Read size bytes from the serial port. If a timeout is set it may
- return less characters as requested. With no timeout it will block
- until the requested number of bytes is read.
- """
- if not self.is_open:
- raise PortNotOpenError()
- read = bytearray()
- while len(read) < size:
- buf = os.read(self.fd, size - len(read))
- if not buf:
- break
- read.extend(buf)
- return bytes(read)
- # hack to make hasattr return false
- cancel_read = property()
|