protocol_loop.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. #! python
  2. #
  3. # This module implements a loop back connection receiving itself what it sent.
  4. #
  5. # The purpose of this module is.. well... You can run the unit tests with it.
  6. # and it was so easy to implement ;-)
  7. #
  8. # This file is part of pySerial. https://github.com/pyserial/pyserial
  9. # (C) 2001-2020 Chris Liechti <cliechti@gmx.net>
  10. #
  11. # SPDX-License-Identifier: BSD-3-Clause
  12. #
  13. # URL format: loop://[?logging={debug|info|warning|error}]
  14. # options:
  15. # - "logging" set the log level and print diagnostic messages
  16. from __future__ import absolute_import
  17. import logging
  18. import numbers
  19. import time
  20. try:
  21. import urlparse
  22. except ImportError:
  23. import urllib.parse as urlparse
  24. try:
  25. import queue
  26. except ImportError:
  27. import Queue as queue
  28. from serial.serialutil import SerialBase, SerialException, to_bytes, iterbytes, SerialTimeoutException, PortNotOpenError
  # Map the log level names accepted in the URL query ("?logging=<name>")
  # to the corresponding constants of the logging module. Used in from_url().
  LOGGER_LEVELS = {
      'debug': logging.DEBUG,
      'info': logging.INFO,
      'warning': logging.WARNING,
      'error': logging.ERROR,
  }
  class Serial(SerialBase):
      """\
      Serial port implementation that simulates a loop back connection in
      plain software.

      Every byte passed to write() is stored as one entry of a bounded queue
      (``buffer_size`` entries) and handed back again by read(). A ``None``
      entry is used as a sentinel: close() and cancel_read() enqueue it to
      wake up a read() that is waiting for data.
      """

      BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
                   9600, 19200, 38400, 57600, 115200)

      def __init__(self, *args, **kwargs):
          """Set up the loop back state, then defer to SerialBase."""
          # These attributes are created before the base class initializer
          # runs so that they already exist should it call open().
          self.buffer_size = 4096
          self.queue = None
          self.logger = None
          self._cancel_write = False
          super(Serial, self).__init__(*args, **kwargs)

      def open(self):
          """\
          Open port with current settings.

          Raises SerialException if the port is already open, if no port
          has been configured or if the URL cannot be parsed.
          """
          if self.is_open:
              raise SerialException("Port is already open.")
          # validate before touching any state so that a failed open() does
          # not leave a freshly allocated queue behind
          if self._port is None:
              raise SerialException("Port must be configured before it can be used.")
          self.logger = None
          self.queue = queue.Queue(self.buffer_size)
          # not that there is anything to open, but the function applies the
          # options found in the URL
          self.from_url(self.port)

          # not that there is anything to configure...
          self._reconfigure_port()
          # all things are set up, now get a clean start
          self.is_open = True
          if not self._dsrdtr:
              self._update_dtr_state()
          if not self._rtscts:
              self._update_rts_state()
          self.reset_input_buffer()
          self.reset_output_buffer()

      def close(self):
          """Close the port and wake up a read() that may be waiting."""
          if self.is_open:
              self.is_open = False
              try:
                  # the None sentinel makes a blocked read() return
                  self.queue.put_nowait(None)
              except queue.Full:
                  # a full buffer means read() has data and is not blocked
                  pass
          super(Serial, self).close()

      def _reconfigure_port(self):
          """\
          Set communication parameters on opened port. For the loop://
          protocol all settings are ignored!

          Raises ValueError if the baudrate is not an integer in the range
          1 .. 2**32-1.
          """
          # not that it's of any real use, but it helps in the unit tests
          if not isinstance(self._baudrate, numbers.Integral) or not 0 < self._baudrate < 2 ** 32:
              raise ValueError("invalid baudrate: {!r}".format(self._baudrate))
          if self.logger:
              self.logger.info('_reconfigure_port()')

      def from_url(self, url):
          """\
          Apply the options found in a loop:// URL to this instance.

          The only supported option is ``logging=<level>`` in the query
          string, with <level> being one of the keys of LOGGER_LEVELS.

          Raises SerialException if the scheme is not "loop", if an option
          is unknown or if the logging level is unknown.
          """
          parts = urlparse.urlsplit(url)
          if parts.scheme != "loop":
              raise SerialException(
                  'expected a string in the form '
                  '"loop://[?logging={debug|info|warning|error}]": not starting '
                  'with loop:// ({!r})'.format(parts.scheme))
          try:
              # process options now, directly altering self
              for option, values in urlparse.parse_qs(parts.query, True).items():
                  if option == 'logging':
                      level_name = values[0]
                      # report an unknown level like any other bad option
                      # instead of letting a bare KeyError escape
                      if level_name not in LOGGER_LEVELS:
                          raise ValueError('unknown logging level: {!r}'.format(level_name))
                      logging.basicConfig()   # XXX is that good to call it here?
                      self.logger = logging.getLogger('pySerial.loop')
                      self.logger.setLevel(LOGGER_LEVELS[level_name])
                      self.logger.debug('enabled logging')
                  else:
                      raise ValueError('unknown option: {!r}'.format(option))
          except ValueError as e:
              raise SerialException(
                  'expected a string in the form '
                  '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

      def _discard_queued(self):
          """Drop every entry that is currently held in the loop back queue."""
          try:
              while self.queue.qsize():
                  self.queue.get_nowait()
          except queue.Empty:
              # another thread emptied the queue in the meantime
              pass

      # - - - - - - - - - - - - - - - - - - - - - - - -

      @property
      def in_waiting(self):
          """\
          Return the number of entries currently in the loop back queue.

          Raises PortNotOpenError if the port is not open.
          """
          if not self.is_open:
              raise PortNotOpenError()
          # query once so that the logged and the returned value agree
          waiting = self.queue.qsize()
          if self.logger:
              self.logger.debug('in_waiting -> {:d}'.format(waiting))
          return waiting

      def read(self, size=1):
          """\
          Read up to size bytes from the serial port. If a timeout is set it
          may return fewer characters than requested. With no timeout it will
          block until the requested number of bytes is read, the port is
          closed or the read is cancelled.

          Raises PortNotOpenError if the port is not open.
          """
          if not self.is_open:
              raise PortNotOpenError()
          if self._timeout is not None and self._timeout != 0:
              deadline = time.time() + self._timeout
          else:
              # None (block forever) or 0 (non blocking): no deadline
              deadline = None
          data = bytearray()
          while size > 0 and self.is_open:
              if deadline is None:
                  wait = self._timeout
              else:
                  # only wait for what is left of the overall timeout, not
                  # for the full timeout again on every single byte
                  wait = max(0, deadline - time.time())
              try:
                  b = self.queue.get(timeout=wait)
              except queue.Empty:
                  # nothing arrived in time (or nothing available at all for
                  # a non blocking read)
                  if deadline is not None and self.logger:
                      self.logger.info('read timeout')
                  break
              if b is None:
                  # sentinel placed by close() / cancel_read()
                  break
              data += b
              size -= 1
              # check for timeout now, after data has been read, so that
              # bytes still queued are not handed out past the deadline
              if deadline is not None and time.time() > deadline:
                  if self.logger:
                      self.logger.info('read timeout')
                  break
          return bytes(data)

      def cancel_read(self):
          """Make a read() that is waiting for data return early."""
          if self.queue is None:
              # port was never opened, there is no read() to cancel
              return
          try:
              self.queue.put_nowait(None)
          except queue.Full:
              # a full buffer means read() has data and is not blocked
              pass

      def cancel_write(self):
          """Abort the wait of a write() that is simulating a write timeout."""
          self._cancel_write = True

      def write(self, data):
          """\
          Output the given byte string over the serial port. Can block if the
          loop back buffer is full.

          Returns the number of bytes written (0 if the write was cancelled
          while waiting for the simulated timeout).

          Raises PortNotOpenError if the port is not open and
          SerialTimeoutException if a write timeout is configured and the
          data cannot be sent in that time.
          """
          self._cancel_write = False
          if not self.is_open:
              raise PortNotOpenError()
          data = to_bytes(data)
          # calculate approximate time that would be used to send the data
          # (10 bit times per byte)
          time_used_to_send = 10.0 * len(data) / self._baudrate
          # when a write timeout is configured check if we would be successful
          # (not sending anything, not even the part that would have time)
          if self._write_timeout is not None and time_used_to_send > self._write_timeout:
              # must wait so that unit test succeeds
              time_left = self._write_timeout
              while time_left > 0 and not self._cancel_write:
                  time.sleep(min(time_left, 0.5))
                  time_left -= 0.5
              if self._cancel_write:
                  return 0  # XXX
              raise SerialTimeoutException('Write timeout')
          for byte in iterbytes(data):
              try:
                  self.queue.put(byte, timeout=self._write_timeout)
              except queue.Full:
                  # buffer stayed full for the whole write timeout: report it
                  # the same way as the simulated timeout above
                  raise SerialTimeoutException('Write timeout')
          return len(data)

      def reset_input_buffer(self):
          """\
          Clear input buffer, discarding all that is in the buffer.

          Raises PortNotOpenError if the port is not open.
          """
          if not self.is_open:
              raise PortNotOpenError()
          if self.logger:
              self.logger.info('reset_input_buffer()')
          self._discard_queued()

      def reset_output_buffer(self):
          """\
          Clear output buffer, aborting the current output and
          discarding all that is in the buffer. Input and output share the
          same queue, so this empties the same data as reset_input_buffer().

          Raises PortNotOpenError if the port is not open.
          """
          if not self.is_open:
              raise PortNotOpenError()
          if self.logger:
              self.logger.info('reset_output_buffer()')
          self._discard_queued()

      @property
      def out_waiting(self):
          """\
          Return the number of entries currently in the loop back queue.

          Raises PortNotOpenError if the port is not open.
          """
          if not self.is_open:
              raise PortNotOpenError()
          # query once so that the logged and the returned value agree
          waiting = self.queue.qsize()
          if self.logger:
              self.logger.debug('out_waiting -> {:d}'.format(waiting))
          return waiting

      def _update_break_state(self):
          """\
          Set break: Controls TXD. When active, no transmitting is
          possible. Only logged for the loop back connection.
          """
          if self.logger:
              self.logger.info('_update_break_state({!r})'.format(self._break_state))

      def _update_rts_state(self):
          """Set terminal status line: Request To Send (only logged here)"""
          if self.logger:
              self.logger.info('_update_rts_state({!r}) -> state of CTS'.format(self._rts_state))

      def _update_dtr_state(self):
          """Set terminal status line: Data Terminal Ready (only logged here)"""
          if self.logger:
              self.logger.info('_update_dtr_state({!r}) -> state of DSR'.format(self._dtr_state))

      @property
      def cts(self):
          """Read terminal status line: Clear To Send (mirrors RTS)"""
          if not self.is_open:
              raise PortNotOpenError()
          if self.logger:
              self.logger.info('CTS -> state of RTS ({!r})'.format(self._rts_state))
          return self._rts_state

      @property
      def dsr(self):
          """Read terminal status line: Data Set Ready (mirrors DTR)"""
          # same open check as the other status lines (cts, ri, cd)
          if not self.is_open:
              raise PortNotOpenError()
          if self.logger:
              self.logger.info('DSR -> state of DTR ({!r})'.format(self._dtr_state))
          return self._dtr_state

      @property
      def ri(self):
          """Read terminal status line: Ring Indicator (always False)"""
          if not self.is_open:
              raise PortNotOpenError()
          if self.logger:
              self.logger.info('returning dummy for RI')
          return False

      @property
      def cd(self):
          """Read terminal status line: Carrier Detect (always True)"""
          if not self.is_open:
              raise PortNotOpenError()
          if self.logger:
              self.logger.info('returning dummy for CD')
          return True

      # - - - platform specific - - -
      # None so far
  # simple client test: write a few bytes to the loop back port and read
  # them back again
  if __name__ == '__main__':
      import sys
      s = Serial('loop://')
      try:
          sys.stdout.write('{}\n'.format(s))
          sys.stdout.write("write...\n")
          # write() expects bytes; a text string is rejected on Python 3
          s.write(b"hello\n")
          s.flush()
          sys.stdout.write("read: {!r}\n".format(s.read(5)))
      finally:
          # release the port even if one of the steps above fails
          s.close()