12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900 |
- from __future__ import absolute_import
- import errno
- import fcntl
- import os
- import select
- import struct
- import sys
- import termios
- import serial
- from serial.serialutil import SerialBase, SerialException, to_bytes, \
- PortNotOpenError, SerialTimeoutException, Timeout
class PlatformSpecificBase(object):
    """\
    Fallback implementation of the platform dependent hooks.

    Every optional feature raises NotImplementedError here; only the break
    handling has a generic implementation based on the module level
    TIOCSBRK / TIOCCBRK ioctl request codes.
    """

    # Mapping of baud rate -> platform specific speed constant (none here).
    BAUDRATE_CONSTANTS = dict()

    def _set_special_baudrate(self, baudrate):
        """Apply a non-standard baud rate (unsupported in this base class)."""
        raise NotImplementedError('non-standard baudrates are not supported on this platform')

    def _set_rs485_mode(self, rs485_settings):
        """Apply RS485 settings (unsupported in this base class)."""
        raise NotImplementedError('RS485 not supported on this platform')

    def set_low_latency_mode(self, low_latency_settings):
        """Switch low latency mode (unsupported in this base class)."""
        raise NotImplementedError('Low latency not supported on this platform')

    def _update_break_state(self):
        """\
        Set break: Controls TXD. When active, no transmitting is possible.
        """
        # Pick the "set break" or "clear break" request from the current state.
        request = TIOCSBRK if self._break_state else TIOCCBRK
        fcntl.ioctl(self.fd, request)
-
# Flag used for mark/space parity. It stays 0 (treated as "not supported" by
# the parity setup code) unless the platform branch below overrides it.
CMSPAR = 0

# Select the platform specific mix-in based on the interpreter's platform name.
plat = sys.platform.lower()

if plat.startswith('linux'):
    import array

    # extra termios flags
    CMSPAR = 0o10000000000

    # ioctl requests / flag used to program arbitrary baud rates
    TCGETS2 = 0x802C542A
    TCSETS2 = 0x402C542B
    BOTHER = 0o010000

    # ioctl requests and flag bits for RS485 mode
    TIOCGRS485 = 0x542E
    TIOCSRS485 = 0x542F
    SER_RS485_ENABLED = 0b00000001
    SER_RS485_RTS_ON_SEND = 0b00000010
    SER_RS485_RTS_AFTER_SEND = 0b00000100
    SER_RS485_RX_DURING_TX = 0b00010000

    class PlatformSpecific(PlatformSpecificBase):
        """Linux specific hooks: custom baud rates, RS485 and low latency."""

        # baud rate -> speed constant accepted by tcsetattr
        BAUDRATE_CONSTANTS = {
            0:       0o000000,
            50:      0o000001,
            75:      0o000002,
            110:     0o000003,
            134:     0o000004,
            150:     0o000005,
            200:     0o000006,
            300:     0o000007,
            600:     0o000010,
            1200:    0o000011,
            1800:    0o000012,
            2400:    0o000013,
            4800:    0o000014,
            9600:    0o000015,
            19200:   0o000016,
            38400:   0o000017,
            57600:   0o010001,
            115200:  0o010002,
            230400:  0o010003,
            460800:  0o010004,
            500000:  0o010005,
            576000:  0o010006,
            921600:  0o010007,
            1000000: 0o010010,
            1152000: 0o010011,
            1500000: 0o010012,
            2000000: 0o010013,
            2500000: 0o010014,
            3000000: 0o010015,
            3500000: 0o010016,
            4000000: 0o010017,
        }

        def set_low_latency_mode(self, low_latency_settings):
            """\
            Set or clear the ASYNC_LOW_LATENCY flag of the port.

            Raises ValueError when one of the ioctl calls fails.
            """
            serial_info = array.array('i', [0]) * 32
            try:
                # read the current serial settings
                fcntl.ioctl(self.fd, termios.TIOCGSERIAL, serial_info)

                # word 4 carries the flag word; 0x2000 is the low latency bit
                if low_latency_settings:
                    serial_info[4] = serial_info[4] | 0x2000
                else:
                    serial_info[4] = serial_info[4] & ~0x2000

                # write the modified settings back
                fcntl.ioctl(self.fd, termios.TIOCSSERIAL, serial_info)
            except IOError as e:
                raise ValueError('Failed to update ASYNC_LOW_LATENCY flag to {}: {}'.format(low_latency_settings, e))

        def _set_special_baudrate(self, baudrate):
            """\
            Program an arbitrary baud rate through TCGETS2 / TCSETS2.

            Raises ValueError when one of the ioctl calls fails.
            """
            # right size is 44 on x86_64, allow for some growth
            settings = array.array('i', [0]) * 64
            try:
                # fetch the current settings
                fcntl.ioctl(self.fd, TCGETS2, settings)
                # replace the standard speed selection by BOTHER
                settings[2] = (settings[2] & ~termios.CBAUD) | BOTHER
                # words 9 and 10 both receive the requested rate
                # NOTE(review): presumably input and output speed - confirm
                settings[9] = baudrate
                settings[10] = baudrate

                # apply the modified settings
                fcntl.ioctl(self.fd, TCSETS2, settings)
            except IOError as e:
                raise ValueError('Failed to set custom baud rate ({}): {}'.format(baudrate, e))

        def _set_rs485_mode(self, rs485_settings):
            """\
            Enable RS485 mode with the given settings, or disable it when
            rs485_settings is None.

            Raises ValueError when one of the ioctl calls fails.
            """
            rs485 = array.array('i', [0]) * 8  # flags, delaytx, delayrx, padding
            try:
                fcntl.ioctl(self.fd, TIOCGRS485, rs485)
                if rs485_settings is None:
                    # no settings: clear all flags, which also disables RS485
                    rs485[0] = 0
                else:
                    flags = rs485[0] | SER_RS485_ENABLED
                    options = (
                        (rs485_settings.loopback, SER_RS485_RX_DURING_TX),
                        (rs485_settings.rts_level_for_tx, SER_RS485_RTS_ON_SEND),
                        (rs485_settings.rts_level_for_rx, SER_RS485_RTS_AFTER_SEND),
                    )
                    for wanted, bit in options:
                        if wanted:
                            flags |= bit
                        else:
                            flags &= ~bit
                    rs485[0] = flags
                    # delays are scaled by 1000 before being stored
                    if rs485_settings.delay_before_tx is not None:
                        rs485[1] = int(rs485_settings.delay_before_tx * 1000)
                    if rs485_settings.delay_before_rx is not None:
                        rs485[2] = int(rs485_settings.delay_before_rx * 1000)
                fcntl.ioctl(self.fd, TIOCSRS485, rs485)
            except IOError as e:
                raise ValueError('Failed to set RS485 mode: {}'.format(e))

elif plat == 'cygwin':

    class PlatformSpecific(PlatformSpecificBase):
        """Cygwin specific hooks: additional baud rate constants only."""

        BAUDRATE_CONSTANTS = {
            128000:  0x01003,
            256000:  0x01005,
            500000:  0x01007,
            576000:  0x01008,
            921600:  0x01009,
            1000000: 0x0100a,
            1152000: 0x0100b,
            1500000: 0x0100c,
            2000000: 0x0100d,
            2500000: 0x0100e,
            3000000: 0x0100f,
        }

elif plat.startswith('darwin'):
    import array

    # ioctl request used to program an arbitrary speed
    IOSSIOSPEED = 0x80045402

    class PlatformSpecific(PlatformSpecificBase):
        """Darwin / OS X specific hooks: custom baud rates and break."""

        osx_version = os.uname()[2].split('.')
        TIOCSBRK = 0x2000747B
        TIOCCBRK = 0x2000747A

        # custom speeds are only offered when the major release number is >= 8
        if int(osx_version[0]) >= 8:
            def _set_special_baudrate(self, baudrate):
                """Program an arbitrary baud rate through IOSSIOSPEED."""
                speed = array.array('i', [baudrate])
                fcntl.ioctl(self.fd, IOSSIOSPEED, speed, 1)

        def _update_break_state(self):
            """\
            Set break: Controls TXD. When active, no transmitting is possible.
            """
            if self._break_state:
                request = PlatformSpecific.TIOCSBRK
            else:
                request = PlatformSpecific.TIOCCBRK
            fcntl.ioctl(self.fd, request)

elif plat.startswith(('bsd', 'freebsd', 'netbsd', 'openbsd')):

    class ReturnBaudrate(object):
        """Mapping-like object that answers every lookup with the key itself."""

        def __getitem__(self, key):
            return key

    class PlatformSpecific(PlatformSpecificBase):
        """BSD specific hooks: baud rates are passed through unchanged."""

        # Any requested rate maps onto itself, so lookups never raise KeyError.
        BAUDRATE_CONSTANTS = ReturnBaudrate()
        TIOCSBRK = 0x2000747B
        TIOCCBRK = 0x2000747A

        def _update_break_state(self):
            """\
            Set break: Controls TXD. When active, no transmitting is possible.
            """
            if self._break_state:
                request = PlatformSpecific.TIOCSBRK
            else:
                request = PlatformSpecific.TIOCCBRK
            fcntl.ioctl(self.fd, request)

else:

    class PlatformSpecific(PlatformSpecificBase):
        """Unknown platform: only the generic base behaviour is available."""
        pass
# Modem control ioctl requests; the literal is the fallback used when the
# termios module does not export the name.
TIOCMGET = getattr(termios, 'TIOCMGET', 0x5415)
TIOCMBIS = getattr(termios, 'TIOCMBIS', 0x5416)
TIOCMBIC = getattr(termios, 'TIOCMBIC', 0x5417)
TIOCMSET = getattr(termios, 'TIOCMSET', 0x5418)

# Modem line bit masks.
TIOCM_DTR = getattr(termios, 'TIOCM_DTR', 0x002)
TIOCM_RTS = getattr(termios, 'TIOCM_RTS', 0x004)
TIOCM_CTS = getattr(termios, 'TIOCM_CTS', 0x020)
TIOCM_CAR = getattr(termios, 'TIOCM_CAR', 0x040)
TIOCM_RNG = getattr(termios, 'TIOCM_RNG', 0x080)
TIOCM_DSR = getattr(termios, 'TIOCM_DSR', 0x100)
# Alternative names; they fall back to the CAR / RNG values chosen above.
TIOCM_CD = getattr(termios, 'TIOCM_CD', TIOCM_CAR)
TIOCM_RI = getattr(termios, 'TIOCM_RI', TIOCM_RNG)

# Queue size requests. TIOCINQ prefers termios.TIOCINQ, then termios.FIONREAD,
# then the literal.
TIOCINQ = getattr(termios, 'TIOCINQ', getattr(termios, 'FIONREAD', 0x541B))
TIOCOUTQ = getattr(termios, 'TIOCOUTQ', 0x5411)

# Pre-packed unsigned int buffers handed to fcntl.ioctl().
TIOCM_zero_str = struct.pack('I', 0)
TIOCM_RTS_str = struct.pack('I', TIOCM_RTS)
TIOCM_DTR_str = struct.pack('I', TIOCM_DTR)

# Break control requests used by the generic break implementation.
TIOCSBRK = getattr(termios, 'TIOCSBRK', 0x5427)
TIOCCBRK = getattr(termios, 'TIOCCBRK', 0x5428)
class Serial(SerialBase, PlatformSpecific):
    """\
    Serial port class POSIX implementation. Serial port configuration is
    done with termios and fcntl. Runs on Linux and many other Un*x like
    systems.

    Two internal pipes per port are used to wake up a blocking read() or
    write() from another thread (see cancel_read() / cancel_write()).
    """

    # errno values that mean "nothing happened yet, try again" rather than
    # a real failure of the device.
    _TRANSIENT_ERRNOS = (errno.EAGAIN, errno.EALREADY, errno.EWOULDBLOCK,
                         errno.EINPROGRESS, errno.EINTR)

    def open(self):
        """\
        Open port with current settings. This may throw a SerialException
        if the port cannot be opened.

        On any failure after the device has been opened, the file
        descriptor and the abort pipes are released again before the
        exception is re-raised.
        """
        if self._port is None:
            raise SerialException("Port must be configured before it can be used.")
        if self.is_open:
            raise SerialException("Port is already open.")
        self.fd = None
        # open
        try:
            self.fd = os.open(self.portstr, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK)
        except OSError as msg:
            self.fd = None
            raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))

        self.pipe_abort_read_r, self.pipe_abort_read_w = None, None
        self.pipe_abort_write_r, self.pipe_abort_write_w = None, None
        try:
            self._reconfigure_port(force_update=True)
            try:
                if not self._dsrdtr:
                    self._update_dtr_state()
                if not self._rtscts:
                    self._update_rts_state()
            except IOError as e:
                # devices without modem lines answer with "invalid argument"
                # or "inappropriate ioctl"; that is not an error here
                if e.errno not in (errno.EINVAL, errno.ENOTTY):
                    raise
            self._reset_input_buffer()
            self.pipe_abort_read_r, self.pipe_abort_read_w = os.pipe()
            self.pipe_abort_write_r, self.pipe_abort_write_w = os.pipe()
            fcntl.fcntl(self.pipe_abort_read_r, fcntl.F_SETFL, os.O_NONBLOCK)
            fcntl.fcntl(self.pipe_abort_write_r, fcntl.F_SETFL, os.O_NONBLOCK)
        except BaseException:
            try:
                os.close(self.fd)
            except Exception:
                # deliberately best effort: the original error is the one
                # that gets reported to the caller
                pass
            self.fd = None
            self._close_abort_pipes()
            raise
        self.is_open = True

    def _close_abort_pipes(self):
        """Close every abort pipe end that is currently open and forget it."""
        for name in ('pipe_abort_read_w', 'pipe_abort_read_r',
                     'pipe_abort_write_w', 'pipe_abort_write_r'):
            pipe_fd = getattr(self, name, None)
            if pipe_fd is not None:
                os.close(pipe_fd)
                setattr(self, name, None)

    def _reconfigure_port(self, force_update=False):
        """\
        Set communication parameters on opened port.

        The termios attributes are only written when they differ from the
        current ones or when force_update is true. Raises SerialException
        when the descriptor is missing, cannot be locked or cannot be
        queried, and ValueError for invalid settings.
        """
        if self.fd is None:
            raise SerialException("Can only operate on a valid file descriptor")

        # if exclusive lock is requested, create it before we modify anything else
        if self._exclusive is not None:
            if self._exclusive:
                try:
                    fcntl.flock(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                except IOError as msg:
                    raise SerialException(msg.errno, "Could not exclusively lock port {}: {}".format(self._port, msg))
            else:
                fcntl.flock(self.fd, fcntl.LOCK_UN)

        custom_baud = None

        # the overall timeout is implemented with select() in read()
        vmin = vtime = 0
        if self._inter_byte_timeout is not None:
            vmin = 1
            vtime = int(self._inter_byte_timeout * 10)
        try:
            orig_attr = termios.tcgetattr(self.fd)
            iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr
        except termios.error as msg:
            raise SerialException("Could not configure port: {}".format(msg))
        # Work on a copy of the control characters. Otherwise the edits
        # below would also change orig_attr and a VMIN/VTIME-only change
        # would never be detected by the comparison further down.
        cc = list(cc)

        # set up raw mode / no echo / binary
        cflag |= (termios.CLOCAL | termios.CREAD)
        lflag &= ~(termios.ICANON | termios.ECHO | termios.ECHOE |
                   termios.ECHOK | termios.ECHONL |
                   termios.ISIG | termios.IEXTEN)
        for flag in ('ECHOCTL', 'ECHOKE'):  # not available on every platform
            if hasattr(termios, flag):
                lflag &= ~getattr(termios, flag)

        oflag &= ~(termios.OPOST | termios.ONLCR | termios.OCRNL)
        iflag &= ~(termios.INLCR | termios.IGNCR | termios.ICRNL | termios.IGNBRK)
        if hasattr(termios, 'IUCLC'):
            iflag &= ~termios.IUCLC
        if hasattr(termios, 'PARMRK'):
            iflag &= ~termios.PARMRK

        # set up baud rate: termios constant, then platform table, then custom
        try:
            ispeed = ospeed = getattr(termios, 'B{}'.format(self._baudrate))
        except AttributeError:
            try:
                ispeed = ospeed = self.BAUDRATE_CONSTANTS[self._baudrate]
            except KeyError:
                # Use BOTHER when the platform branch defined it; otherwise
                # fall back to B38400 as a placeholder speed.
                try:
                    ispeed = ospeed = BOTHER
                except NameError:
                    ispeed = ospeed = getattr(termios, 'B38400')

                try:
                    custom_baud = int(self._baudrate)  # applied after tcsetattr
                except ValueError:
                    raise ValueError('Invalid baud rate: {!r}'.format(self._baudrate))
                else:
                    if custom_baud < 0:
                        raise ValueError('Invalid baud rate: {!r}'.format(self._baudrate))

        # set up char len
        cflag &= ~termios.CSIZE
        if self._bytesize == 8:
            cflag |= termios.CS8
        elif self._bytesize == 7:
            cflag |= termios.CS7
        elif self._bytesize == 6:
            cflag |= termios.CS6
        elif self._bytesize == 5:
            cflag |= termios.CS5
        else:
            raise ValueError('Invalid char len: {!r}'.format(self._bytesize))

        # set up stop bits
        if self._stopbits == serial.STOPBITS_ONE:
            cflag &= ~(termios.CSTOPB)
        elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
            cflag |= (termios.CSTOPB)  # same flag as two stop bits
        elif self._stopbits == serial.STOPBITS_TWO:
            cflag |= (termios.CSTOPB)
        else:
            raise ValueError('Invalid stop bit specification: {!r}'.format(self._stopbits))

        # set up parity; mark/space are only accepted when CMSPAR is non-zero
        iflag &= ~(termios.INPCK | termios.ISTRIP)
        if self._parity == serial.PARITY_NONE:
            cflag &= ~(termios.PARENB | termios.PARODD | CMSPAR)
        elif self._parity == serial.PARITY_EVEN:
            cflag &= ~(termios.PARODD | CMSPAR)
            cflag |= (termios.PARENB)
        elif self._parity == serial.PARITY_ODD:
            cflag &= ~CMSPAR
            cflag |= (termios.PARENB | termios.PARODD)
        elif self._parity == serial.PARITY_MARK and CMSPAR:
            cflag |= (termios.PARENB | CMSPAR | termios.PARODD)
        elif self._parity == serial.PARITY_SPACE and CMSPAR:
            cflag |= (termios.PARENB | CMSPAR)
            cflag &= ~(termios.PARODD)
        else:
            raise ValueError('Invalid parity: {!r}'.format(self._parity))

        # set up software flow control (IXANY is cleared too where it exists)
        flow_bits = termios.IXON | termios.IXOFF
        if self._xonxoff:
            iflag |= flow_bits
        else:
            iflag &= ~(flow_bits | getattr(termios, 'IXANY', 0))

        # set up hardware flow control; the flag name differs per platform
        if hasattr(termios, 'CRTSCTS'):
            if self._rtscts:
                cflag |= (termios.CRTSCTS)
            else:
                cflag &= ~(termios.CRTSCTS)
        elif hasattr(termios, 'CNEW_RTSCTS'):
            if self._rtscts:
                cflag |= (termios.CNEW_RTSCTS)
            else:
                cflag &= ~(termios.CNEW_RTSCTS)

        # buffer: vmin is the minimum number of characters for a read
        if vmin < 0 or vmin > 255:
            raise ValueError('Invalid vmin: {!r}'.format(vmin))
        cc[termios.VMIN] = vmin
        # vtime
        if vtime < 0 or vtime > 255:
            raise ValueError('Invalid vtime: {!r}'.format(vtime))
        cc[termios.VTIME] = vtime

        # activate settings
        new_attr = [iflag, oflag, cflag, lflag, ispeed, ospeed, cc]
        if force_update or new_attr != orig_attr:
            termios.tcsetattr(self.fd, termios.TCSANOW, new_attr)

        # apply custom baud rate, if any
        if custom_baud is not None:
            self._set_special_baudrate(custom_baud)

        if self._rs485_mode is not None:
            self._set_rs485_mode(self._rs485_mode)

    def close(self):
        """\
        Close port.

        The abort pipes are released and the port is marked closed even
        when closing the device descriptor itself raises.
        """
        if self.is_open:
            try:
                if self.fd is not None:
                    try:
                        os.close(self.fd)
                    finally:
                        self.fd = None
                        self._close_abort_pipes()
            finally:
                self.is_open = False

    #  - - - - - - - - - - - - - - - - - - - - - - - -

    @property
    def in_waiting(self):
        """\
        Return the number of bytes currently in the input buffer.

        Raises PortNotOpenError when the port is closed.
        """
        if not self.is_open:
            raise PortNotOpenError()
        s = fcntl.ioctl(self.fd, TIOCINQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    def read(self, size=1):
        """\
        Read size bytes from the serial port. If a timeout is set it may
        return less characters as requested. With no timeout it will block
        until the requested number of bytes is read.

        Returns early with the data read so far when cancel_read() is
        called. Raises PortNotOpenError when closed and SerialException on
        device errors.
        """
        if not self.is_open:
            raise PortNotOpenError()
        read = bytearray()
        timeout = Timeout(self._timeout)
        while len(read) < size:
            try:
                ready, _, _ = select.select([self.fd, self.pipe_abort_read_r], [], [], timeout.time_left())
                if self.pipe_abort_read_r in ready:
                    # cancel_read() was called: drain the pipe and stop
                    os.read(self.pipe_abort_read_r, 1000)
                    break
                # An empty ready list means select() timed out; return
                # whatever has been collected so far.
                if not ready:
                    break
                buf = os.read(self.fd, size - len(read))
            except OSError as e:
                # transient conditions are ignored, anything else is fatal
                if e.errno not in self._TRANSIENT_ERRNOS:
                    raise SerialException('read failed: {}'.format(e))
            except select.error as e:
                # Only reachable where select.error is not an OSError;
                # args[0] carries the error number there.
                if e.args[0] not in self._TRANSIENT_ERRNOS:
                    raise SerialException('read failed: {}'.format(e))
            else:
                # select() reported the device readable, so an empty read
                # is treated as an error.
                if not buf:
                    raise SerialException(
                        'device reports readiness to read but returned no data '
                        '(device disconnected or multiple access on port?)')
                read.extend(buf)

            if timeout.expired():
                break
        return bytes(read)

    def cancel_read(self):
        """Wake up a read() that is currently waiting (no-op when closed)."""
        if self.is_open:
            os.write(self.pipe_abort_read_w, b"x")

    def cancel_write(self):
        """Wake up a write() that is currently waiting (no-op when closed)."""
        if self.is_open:
            os.write(self.pipe_abort_write_w, b"x")

    def write(self, data):
        """\
        Output the given byte string over the serial port.

        Returns the number of bytes accounted as written. Raises
        PortNotOpenError when closed, SerialTimeoutException when the write
        timeout expires and SerialException on device errors.
        """
        if not self.is_open:
            raise PortNotOpenError()
        d = to_bytes(data)
        tx_len = length = len(d)
        timeout = Timeout(self._write_timeout)
        while tx_len > 0:
            try:
                n = os.write(self.fd, d)
                if timeout.is_non_blocking:
                    # Zero timeout: report what this single write accepted
                    # and leave the rest to the caller.
                    return n
                elif not timeout.is_infinite:
                    # With a timeout, wait until the device accepts more
                    # data, the time is up or the write is cancelled.
                    if timeout.expired():
                        raise SerialTimeoutException('Write timeout')
                    abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], timeout.time_left())
                    if abort:
                        os.read(self.pipe_abort_write_r, 1000)
                        break
                    if not ready:
                        raise SerialTimeoutException('Write timeout')
                else:
                    assert timeout.time_left() is None
                    # Blocking mode: wait without limit for the device.
                    abort, ready, _ = select.select([self.pipe_abort_write_r], [self.fd], [], None)
                    if abort:
                        # Drain every pending cancel byte so that a stale
                        # one cannot abort the next write.
                        os.read(self.pipe_abort_write_r, 1000)
                        break
                    if not ready:
                        raise SerialException('write failed (select)')
                d = d[n:]
                tx_len -= n
            except SerialException:
                # must come before the generic handlers below so that our
                # own exceptions pass through unchanged
                raise
            except OSError as e:
                # transient conditions are ignored, anything else is fatal
                if e.errno not in self._TRANSIENT_ERRNOS:
                    raise SerialException('write failed: {}'.format(e))
            except select.error as e:
                # Only reachable where select.error is not an OSError;
                # args[0] carries the error number there.
                if e.args[0] not in self._TRANSIENT_ERRNOS:
                    raise SerialException('write failed: {}'.format(e))
            if not timeout.is_non_blocking and timeout.expired():
                raise SerialTimeoutException('Write timeout')
        return length - len(d)

    def flush(self):
        """\
        Flush of file like objects. In this case, wait until all data
        is written.
        """
        if not self.is_open:
            raise PortNotOpenError()
        termios.tcdrain(self.fd)

    def _reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        termios.tcflush(self.fd, termios.TCIFLUSH)

    def reset_input_buffer(self):
        """Clear input buffer, discarding all that is in the buffer."""
        if not self.is_open:
            raise PortNotOpenError()
        self._reset_input_buffer()

    def reset_output_buffer(self):
        """\
        Clear output buffer, aborting the current output and discarding all
        that is in the buffer.
        """
        if not self.is_open:
            raise PortNotOpenError()
        termios.tcflush(self.fd, termios.TCOFLUSH)

    def send_break(self, duration=0.25):
        """\
        Send break condition. Timed, returns to idle state after given
        duration.
        """
        if not self.is_open:
            raise PortNotOpenError()
        # the duration is handed over in whole multiples of 0.25
        termios.tcsendbreak(self.fd, int(duration / 0.25))

    def _update_rts_state(self):
        """Set terminal status line: Request To Send"""
        if self._rts_state:
            fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_RTS_str)
        else:
            fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_RTS_str)

    def _update_dtr_state(self):
        """Set terminal status line: Data Terminal Ready"""
        if self._dtr_state:
            fcntl.ioctl(self.fd, TIOCMBIS, TIOCM_DTR_str)
        else:
            fcntl.ioctl(self.fd, TIOCMBIC, TIOCM_DTR_str)

    @property
    def cts(self):
        """Read terminal status line: Clear To Send"""
        if not self.is_open:
            raise PortNotOpenError()
        s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        return struct.unpack('I', s)[0] & TIOCM_CTS != 0

    @property
    def dsr(self):
        """Read terminal status line: Data Set Ready"""
        if not self.is_open:
            raise PortNotOpenError()
        s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        return struct.unpack('I', s)[0] & TIOCM_DSR != 0

    @property
    def ri(self):
        """Read terminal status line: Ring Indicator"""
        if not self.is_open:
            raise PortNotOpenError()
        s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        return struct.unpack('I', s)[0] & TIOCM_RI != 0

    @property
    def cd(self):
        """Read terminal status line: Carrier Detect"""
        if not self.is_open:
            raise PortNotOpenError()
        s = fcntl.ioctl(self.fd, TIOCMGET, TIOCM_zero_str)
        return struct.unpack('I', s)[0] & TIOCM_CD != 0

    # - - platform specific - - - -

    @property
    def out_waiting(self):
        """\
        Return the number of bytes currently in the output buffer.

        Raises PortNotOpenError when the port is closed.
        """
        if not self.is_open:
            raise PortNotOpenError()
        s = fcntl.ioctl(self.fd, TIOCOUTQ, TIOCM_zero_str)
        return struct.unpack('I', s)[0]

    def fileno(self):
        """\
        For easier use of the serial port instance with select.
        WARNING: this function is not portable to different platforms!
        """
        if not self.is_open:
            raise PortNotOpenError()
        return self.fd

    def set_input_flow_control(self, enable=True):
        """\
        Manually control flow - when software flow control is enabled.
        This will send XON (true) or XOFF (false) to the other device.
        WARNING: this function is not portable to different platforms!
        """
        if not self.is_open:
            raise PortNotOpenError()
        if enable:
            termios.tcflow(self.fd, termios.TCION)
        else:
            termios.tcflow(self.fd, termios.TCIOFF)

    def set_output_flow_control(self, enable=True):
        """\
        Manually control flow of outgoing data - when hardware or software flow
        control is enabled.
        WARNING: this function is not portable to different platforms!
        """
        if not self.is_open:
            raise PortNotOpenError()
        if enable:
            termios.tcflow(self.fd, termios.TCOON)
        else:
            termios.tcflow(self.fd, termios.TCOOFF)

    def nonblocking(self):
        """DEPRECATED - has no use"""
        import warnings
        warnings.warn("nonblocking() has no effect, already nonblocking", DeprecationWarning)
class PosixPollSerial(Serial):
    """\
    Poll based read implementation. Not all systems support poll properly.
    However this one has better handling of errors, such as a device
    disconnecting while it's in use (e.g. USB-serial unplugged).
    """

    def read(self, size=1):
        """\
        Read size bytes from the serial port. If a timeout is set it may
        return less characters as requested. With no timeout it will block
        until the requested number of bytes is read.

        Returns early with the data read so far when the timeout expires
        or cancel_read() is called. Raises PortNotOpenError when closed and
        SerialException when poll reports an error condition on the device.
        """
        if not self.is_open:
            raise PortNotOpenError()
        read = bytearray()
        timeout = Timeout(self._timeout)
        error_events = select.POLLERR | select.POLLHUP | select.POLLNVAL
        poll = select.poll()
        poll.register(self.fd, select.POLLIN | error_events)
        poll.register(self.pipe_abort_read_r, select.POLLIN | error_events)
        if size > 0:
            while len(read) < size:
                # wait until device becomes ready to read (or something fails)
                events = poll.poll(None if timeout.is_infinite else (timeout.time_left() * 1000))
                if not events:
                    # poll() timed out: nothing is readable. Stop here rather
                    # than testing a loop variable that was never (or only
                    # previously) bound, or reading a descriptor with no data.
                    break
                aborted = False
                for fd, event in events:
                    if fd == self.pipe_abort_read_r:
                        aborted = True
                        break
                    if event & error_events:
                        raise SerialException('device reports error (poll)')
                if aborted:
                    # cancel_read() was called: drain the pipe and stop
                    os.read(self.pipe_abort_read_r, 1000)
                    break
                buf = os.read(self.fd, size - len(read))
                read.extend(buf)
                if timeout.expired() \
                        or (self._inter_byte_timeout is not None and self._inter_byte_timeout > 0) and not buf:
                    break   # early abort on timeout
        return bytes(read)
class VTIMESerial(Serial):
    """\
    Implement timeout using vtime of tty device instead of using select.
    This means that no inter character timeout can be specified and that
    the error handling is degraded.

    Overall timeout is disabled when inter-character timeout is used.

    Note that this implementation does NOT support cancel_read(), it will
    just ignore that.
    """

    def _reconfigure_port(self, force_update=True):
        """\
        Set communication parameters on opened port.

        Runs the regular configuration first (force_update is handed
        through to it), then switches the descriptor to blocking mode and
        programs VMIN / VTIME so that the tty driver implements the timeout.
        Raises SerialException when the attributes cannot be read and
        ValueError when the resulting vtime is out of range.
        """
        super(VTIMESerial, self)._reconfigure_port(force_update)
        # reset the file status flags, which clears the O_NONBLOCK set by open()
        fcntl.fcntl(self.fd, fcntl.F_SETFL, 0)

        if self._inter_byte_timeout is not None:
            vmin = 1
            vtime = int(self._inter_byte_timeout * 10)
        elif self._timeout is None:
            # no timeout at all: wait for at least one byte
            vmin = 1
            vtime = 0
        else:
            vmin = 0
            vtime = int(self._timeout * 10)
        try:
            orig_attr = termios.tcgetattr(self.fd)
            iflag, oflag, cflag, lflag, ispeed, ospeed, cc = orig_attr
        except termios.error as msg:
            raise SerialException("Could not configure port: {}".format(msg))

        if vtime < 0 or vtime > 255:
            raise ValueError('Invalid vtime: {!r}'.format(vtime))
        cc[termios.VTIME] = vtime
        cc[termios.VMIN] = vmin

        termios.tcsetattr(
            self.fd,
            termios.TCSANOW,
            [iflag, oflag, cflag, lflag, ispeed, ospeed, cc])

    def read(self, size=1):
        """\
        Read size bytes from the serial port. If a timeout is set it may
        return less characters as requested. With no timeout it will block
        until the requested number of bytes is read.
        """
        if not self.is_open:
            raise PortNotOpenError()
        read = bytearray()
        while len(read) < size:
            buf = os.read(self.fd, size - len(read))
            # an empty result ends the read with what has been collected
            if not buf:
                break
            read.extend(buf)
        return bytes(read)

    # hack to make hasattr return false
    cancel_read = property()
|