protocol_spy.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. #! python
  2. #
  3. # This module implements a special URL handler that wraps an other port,
  4. # print the traffic for debugging purposes. With this, it is possible
  5. # to debug the serial port traffic on every application that uses
  6. # serial_for_url.
  7. #
  8. # This file is part of pySerial. https://github.com/pyserial/pyserial
  9. # (C) 2015 Chris Liechti <cliechti@gmx.net>
  10. #
  11. # SPDX-License-Identifier: BSD-3-Clause
  12. #
  13. # URL format: spy://port[?option[=value][&option[=value]]]
  14. # options:
  15. # - dev=X a file or device to write to
  16. # - color use escape code to colorize output
  17. # - raw forward raw bytes instead of hexdump
  18. #
  19. # example:
  20. # redirect output to an other terminal window on Posix (Linux):
  21. # python -m serial.tools.miniterm spy:///dev/ttyUSB0?dev=/dev/pts/14\&color
  22. from __future__ import absolute_import
  23. import sys
  24. import time
  25. import serial
  26. from serial.serialutil import to_bytes
  27. try:
  28. import urlparse
  29. except ImportError:
  30. import urllib.parse as urlparse
  31. def sixteen(data):
  32. """\
  33. yield tuples of hex and ASCII display in multiples of 16. Includes a
  34. space after 8 bytes and (None, None) after 16 bytes and at the end.
  35. """
  36. n = 0
  37. for b in serial.iterbytes(data):
  38. yield ('{:02X} '.format(ord(b)), b.decode('ascii') if b' ' <= b < b'\x7f' else '.')
  39. n += 1
  40. if n == 8:
  41. yield (' ', '')
  42. elif n >= 16:
  43. yield (None, None)
  44. n = 0
  45. if n > 0:
  46. while n < 16:
  47. n += 1
  48. if n == 8:
  49. yield (' ', '')
  50. yield (' ', ' ')
  51. yield (None, None)
  52. def hexdump(data):
  53. """yield lines with hexdump of data"""
  54. values = []
  55. ascii = []
  56. offset = 0
  57. for h, a in sixteen(data):
  58. if h is None:
  59. yield (offset, ' '.join([''.join(values), ''.join(ascii)]))
  60. del values[:]
  61. del ascii[:]
  62. offset += 0x10
  63. else:
  64. values.append(h)
  65. ascii.append(a)
  66. class FormatRaw(object):
  67. """Forward only RX and TX data to output."""
  68. def __init__(self, output, color):
  69. self.output = output
  70. self.color = color
  71. self.rx_color = '\x1b[32m'
  72. self.tx_color = '\x1b[31m'
  73. def rx(self, data):
  74. """show received data"""
  75. if self.color:
  76. self.output.write(self.rx_color)
  77. self.output.write(data)
  78. self.output.flush()
  79. def tx(self, data):
  80. """show transmitted data"""
  81. if self.color:
  82. self.output.write(self.tx_color)
  83. self.output.write(data)
  84. self.output.flush()
  85. def control(self, name, value):
  86. """(do not) show control calls"""
  87. pass
  88. class FormatHexdump(object):
  89. """\
  90. Create a hex dump of RX ad TX data, show when control lines are read or
  91. written.
  92. output example::
  93. 000000.000 Q-RX flushInput
  94. 000002.469 RTS inactive
  95. 000002.773 RTS active
  96. 000003.001 TX 48 45 4C 4C 4F HELLO
  97. 000003.102 RX 48 45 4C 4C 4F HELLO
  98. """
  99. def __init__(self, output, color):
  100. self.start_time = time.time()
  101. self.output = output
  102. self.color = color
  103. self.rx_color = '\x1b[32m'
  104. self.tx_color = '\x1b[31m'
  105. self.control_color = '\x1b[37m'
  106. def write_line(self, timestamp, label, value, value2=''):
  107. self.output.write('{:010.3f} {:4} {}{}\n'.format(timestamp, label, value, value2))
  108. self.output.flush()
  109. def rx(self, data):
  110. """show received data as hex dump"""
  111. if self.color:
  112. self.output.write(self.rx_color)
  113. if data:
  114. for offset, row in hexdump(data):
  115. self.write_line(time.time() - self.start_time, 'RX', '{:04X} '.format(offset), row)
  116. else:
  117. self.write_line(time.time() - self.start_time, 'RX', '<empty>')
  118. def tx(self, data):
  119. """show transmitted data as hex dump"""
  120. if self.color:
  121. self.output.write(self.tx_color)
  122. for offset, row in hexdump(data):
  123. self.write_line(time.time() - self.start_time, 'TX', '{:04X} '.format(offset), row)
  124. def control(self, name, value):
  125. """show control calls"""
  126. if self.color:
  127. self.output.write(self.control_color)
  128. self.write_line(time.time() - self.start_time, name, value)
  129. class Serial(serial.Serial):
  130. """\
  131. Inherit the native Serial port implementation and wrap all the methods and
  132. attributes.
  133. """
  134. # pylint: disable=no-member
  135. def __init__(self, *args, **kwargs):
  136. super(Serial, self).__init__(*args, **kwargs)
  137. self.formatter = None
  138. self.show_all = False
  139. @serial.Serial.port.setter
  140. def port(self, value):
  141. if value is not None:
  142. serial.Serial.port.__set__(self, self.from_url(value))
  143. def from_url(self, url):
  144. """extract host and port from an URL string"""
  145. parts = urlparse.urlsplit(url)
  146. if parts.scheme != 'spy':
  147. raise serial.SerialException(
  148. 'expected a string in the form '
  149. '"spy://port[?option[=value][&option[=value]]]": '
  150. 'not starting with spy:// ({!r})'.format(parts.scheme))
  151. # process options now, directly altering self
  152. formatter = FormatHexdump
  153. color = False
  154. output = sys.stderr
  155. try:
  156. for option, values in urlparse.parse_qs(parts.query, True).items():
  157. if option == 'file':
  158. output = open(values[0], 'w')
  159. elif option == 'color':
  160. color = True
  161. elif option == 'raw':
  162. formatter = FormatRaw
  163. elif option == 'all':
  164. self.show_all = True
  165. else:
  166. raise ValueError('unknown option: {!r}'.format(option))
  167. except ValueError as e:
  168. raise serial.SerialException(
  169. 'expected a string in the form '
  170. '"spy://port[?option[=value][&option[=value]]]": {}'.format(e))
  171. self.formatter = formatter(output, color)
  172. return ''.join([parts.netloc, parts.path])
  173. def write(self, tx):
  174. tx = to_bytes(tx)
  175. self.formatter.tx(tx)
  176. return super(Serial, self).write(tx)
  177. def read(self, size=1):
  178. rx = super(Serial, self).read(size)
  179. if rx or self.show_all:
  180. self.formatter.rx(rx)
  181. return rx
  182. if hasattr(serial.Serial, 'cancel_read'):
  183. def cancel_read(self):
  184. self.formatter.control('Q-RX', 'cancel_read')
  185. super(Serial, self).cancel_read()
  186. if hasattr(serial.Serial, 'cancel_write'):
  187. def cancel_write(self):
  188. self.formatter.control('Q-TX', 'cancel_write')
  189. super(Serial, self).cancel_write()
  190. @property
  191. def in_waiting(self):
  192. n = super(Serial, self).in_waiting
  193. if self.show_all:
  194. self.formatter.control('Q-RX', 'in_waiting -> {}'.format(n))
  195. return n
  196. def flush(self):
  197. self.formatter.control('Q-TX', 'flush')
  198. super(Serial, self).flush()
  199. def reset_input_buffer(self):
  200. self.formatter.control('Q-RX', 'reset_input_buffer')
  201. super(Serial, self).reset_input_buffer()
  202. def reset_output_buffer(self):
  203. self.formatter.control('Q-TX', 'reset_output_buffer')
  204. super(Serial, self).reset_output_buffer()
  205. def send_break(self, duration=0.25):
  206. self.formatter.control('BRK', 'send_break {}s'.format(duration))
  207. super(Serial, self).send_break(duration)
  208. @serial.Serial.break_condition.setter
  209. def break_condition(self, level):
  210. self.formatter.control('BRK', 'active' if level else 'inactive')
  211. serial.Serial.break_condition.__set__(self, level)
  212. @serial.Serial.rts.setter
  213. def rts(self, level):
  214. self.formatter.control('RTS', 'active' if level else 'inactive')
  215. serial.Serial.rts.__set__(self, level)
  216. @serial.Serial.dtr.setter
  217. def dtr(self, level):
  218. self.formatter.control('DTR', 'active' if level else 'inactive')
  219. serial.Serial.dtr.__set__(self, level)
  220. @serial.Serial.cts.getter
  221. def cts(self):
  222. level = super(Serial, self).cts
  223. self.formatter.control('CTS', 'active' if level else 'inactive')
  224. return level
  225. @serial.Serial.dsr.getter
  226. def dsr(self):
  227. level = super(Serial, self).dsr
  228. self.formatter.control('DSR', 'active' if level else 'inactive')
  229. return level
  230. @serial.Serial.ri.getter
  231. def ri(self):
  232. level = super(Serial, self).ri
  233. self.formatter.control('RI', 'active' if level else 'inactive')
  234. return level
  235. @serial.Serial.cd.getter
  236. def cd(self):
  237. level = super(Serial, self).cd
  238. self.formatter.control('CD', 'active' if level else 'inactive')
  239. return level
  240. # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  241. if __name__ == '__main__':
  242. ser = Serial(None)
  243. ser.port = 'spy:///dev/ttyS0'
  244. print(ser)