protocol_cp2110.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. #! python
  2. #
  3. # Backend for Silicon Labs CP2110/4 HID-to-UART devices.
  4. #
  5. # This file is part of pySerial. https://github.com/pyserial/pyserial
  6. # (C) 2001-2015 Chris Liechti <cliechti@gmx.net>
  7. # (C) 2019 Google LLC
  8. #
  9. # SPDX-License-Identifier: BSD-3-Clause
  10. # This backend implements support for HID-to-UART devices manufactured
  11. # by Silicon Labs and marketed as CP2110 and CP2114. The
  12. # implementation is (mostly) OS-independent and in userland. It relies
  13. # on cython-hidapi (https://github.com/trezor/cython-hidapi).
  14. # The HID-to-UART protocol implemented by CP2110/4 is described in the
  15. # AN434 document from Silicon Labs:
  16. # https://www.silabs.com/documents/public/application-notes/AN434-CP2110-4-Interface-Specification.pdf
  17. # TODO items:
  18. # - rtscts support is configured for hardware flow control, but the
  19. # signaling is missing (AN434 suggests this is done through GPIO).
  20. # - Cancelling reads and writes is not supported.
  21. # - Baudrate validation is not implemented, as it depends on model and configuration.
  22. import struct
  23. import threading
  24. try:
  25. import urlparse
  26. except ImportError:
  27. import urllib.parse as urlparse
  28. try:
  29. import Queue
  30. except ImportError:
  31. import queue as Queue
  32. import hid # hidapi
  33. import serial
  34. from serial.serialutil import SerialBase, SerialException, PortNotOpenError, to_bytes, Timeout
  35. # Report IDs and related constant
  36. _REPORT_GETSET_UART_ENABLE = 0x41
  37. _DISABLE_UART = 0x00
  38. _ENABLE_UART = 0x01
  39. _REPORT_SET_PURGE_FIFOS = 0x43
  40. _PURGE_TX_FIFO = 0x01
  41. _PURGE_RX_FIFO = 0x02
  42. _REPORT_GETSET_UART_CONFIG = 0x50
  43. _REPORT_SET_TRANSMIT_LINE_BREAK = 0x51
  44. _REPORT_SET_STOP_LINE_BREAK = 0x52
  45. class Serial(SerialBase):
  46. # This is not quite correct. AN343 specifies that the minimum
  47. # baudrate is different between CP2110 and CP2114, and it's halved
  48. # when using non-8-bit symbols.
  49. BAUDRATES = (300, 375, 600, 1200, 1800, 2400, 4800, 9600, 19200,
  50. 38400, 57600, 115200, 230400, 460800, 500000, 576000,
  51. 921600, 1000000)
  52. def __init__(self, *args, **kwargs):
  53. self._hid_handle = None
  54. self._read_buffer = None
  55. self._thread = None
  56. super(Serial, self).__init__(*args, **kwargs)
  57. def open(self):
  58. if self._port is None:
  59. raise SerialException("Port must be configured before it can be used.")
  60. if self.is_open:
  61. raise SerialException("Port is already open.")
  62. self._read_buffer = Queue.Queue()
  63. self._hid_handle = hid.device()
  64. try:
  65. portpath = self.from_url(self.portstr)
  66. self._hid_handle.open_path(portpath)
  67. except OSError as msg:
  68. raise SerialException(msg.errno, "could not open port {}: {}".format(self._port, msg))
  69. try:
  70. self._reconfigure_port()
  71. except:
  72. try:
  73. self._hid_handle.close()
  74. except:
  75. pass
  76. self._hid_handle = None
  77. raise
  78. else:
  79. self.is_open = True
  80. self._thread = threading.Thread(target=self._hid_read_loop)
  81. self._thread.setDaemon(True)
  82. self._thread.setName('pySerial CP2110 reader thread for {}'.format(self._port))
  83. self._thread.start()
  84. def from_url(self, url):
  85. parts = urlparse.urlsplit(url)
  86. if parts.scheme != "cp2110":
  87. raise SerialException(
  88. 'expected a string in the forms '
  89. '"cp2110:///dev/hidraw9" or "cp2110://0001:0023:00": '
  90. 'not starting with cp2110:// {{!r}}'.format(parts.scheme))
  91. if parts.netloc: # cp2100://BUS:DEVICE:ENDPOINT, for libusb
  92. return parts.netloc.encode('utf-8')
  93. return parts.path.encode('utf-8')
  94. def close(self):
  95. self.is_open = False
  96. if self._thread:
  97. self._thread.join(1) # read timeout is 0.1
  98. self._thread = None
  99. self._hid_handle.close()
  100. self._hid_handle = None
  101. def _reconfigure_port(self):
  102. parity_value = None
  103. if self._parity == serial.PARITY_NONE:
  104. parity_value = 0x00
  105. elif self._parity == serial.PARITY_ODD:
  106. parity_value = 0x01
  107. elif self._parity == serial.PARITY_EVEN:
  108. parity_value = 0x02
  109. elif self._parity == serial.PARITY_MARK:
  110. parity_value = 0x03
  111. elif self._parity == serial.PARITY_SPACE:
  112. parity_value = 0x04
  113. else:
  114. raise ValueError('Invalid parity: {!r}'.format(self._parity))
  115. if self.rtscts:
  116. flow_control_value = 0x01
  117. else:
  118. flow_control_value = 0x00
  119. data_bits_value = None
  120. if self._bytesize == 5:
  121. data_bits_value = 0x00
  122. elif self._bytesize == 6:
  123. data_bits_value = 0x01
  124. elif self._bytesize == 7:
  125. data_bits_value = 0x02
  126. elif self._bytesize == 8:
  127. data_bits_value = 0x03
  128. else:
  129. raise ValueError('Invalid char len: {!r}'.format(self._bytesize))
  130. stop_bits_value = None
  131. if self._stopbits == serial.STOPBITS_ONE:
  132. stop_bits_value = 0x00
  133. elif self._stopbits == serial.STOPBITS_ONE_POINT_FIVE:
  134. stop_bits_value = 0x01
  135. elif self._stopbits == serial.STOPBITS_TWO:
  136. stop_bits_value = 0x01
  137. else:
  138. raise ValueError('Invalid stop bit specification: {!r}'.format(self._stopbits))
  139. configuration_report = struct.pack(
  140. '>BLBBBB',
  141. _REPORT_GETSET_UART_CONFIG,
  142. self._baudrate,
  143. parity_value,
  144. flow_control_value,
  145. data_bits_value,
  146. stop_bits_value)
  147. self._hid_handle.send_feature_report(configuration_report)
  148. self._hid_handle.send_feature_report(
  149. bytes((_REPORT_GETSET_UART_ENABLE, _ENABLE_UART)))
  150. self._update_break_state()
  151. @property
  152. def in_waiting(self):
  153. return self._read_buffer.qsize()
  154. def reset_input_buffer(self):
  155. if not self.is_open:
  156. raise PortNotOpenError()
  157. self._hid_handle.send_feature_report(
  158. bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_RX_FIFO)))
  159. # empty read buffer
  160. while self._read_buffer.qsize():
  161. self._read_buffer.get(False)
  162. def reset_output_buffer(self):
  163. if not self.is_open:
  164. raise PortNotOpenError()
  165. self._hid_handle.send_feature_report(
  166. bytes((_REPORT_SET_PURGE_FIFOS, _PURGE_TX_FIFO)))
  167. def _update_break_state(self):
  168. if not self._hid_handle:
  169. raise PortNotOpenError()
  170. if self._break_state:
  171. self._hid_handle.send_feature_report(
  172. bytes((_REPORT_SET_TRANSMIT_LINE_BREAK, 0)))
  173. else:
  174. # Note that while AN434 states "There are no data bytes in
  175. # the payload other than the Report ID", either hidapi or
  176. # Linux does not seem to send the report otherwise.
  177. self._hid_handle.send_feature_report(
  178. bytes((_REPORT_SET_STOP_LINE_BREAK, 0)))
  179. def read(self, size=1):
  180. if not self.is_open:
  181. raise PortNotOpenError()
  182. data = bytearray()
  183. try:
  184. timeout = Timeout(self._timeout)
  185. while len(data) < size:
  186. if self._thread is None:
  187. raise SerialException('connection failed (reader thread died)')
  188. buf = self._read_buffer.get(True, timeout.time_left())
  189. if buf is None:
  190. return bytes(data)
  191. data += buf
  192. if timeout.expired():
  193. break
  194. except Queue.Empty: # -> timeout
  195. pass
  196. return bytes(data)
  197. def write(self, data):
  198. if not self.is_open:
  199. raise PortNotOpenError()
  200. data = to_bytes(data)
  201. tx_len = len(data)
  202. while tx_len > 0:
  203. to_be_sent = min(tx_len, 0x3F)
  204. report = to_bytes([to_be_sent]) + data[:to_be_sent]
  205. self._hid_handle.write(report)
  206. data = data[to_be_sent:]
  207. tx_len = len(data)
  208. def _hid_read_loop(self):
  209. try:
  210. while self.is_open:
  211. data = self._hid_handle.read(64, timeout_ms=100)
  212. if not data:
  213. continue
  214. data_len = data.pop(0)
  215. assert data_len == len(data)
  216. self._read_buffer.put(bytearray(data))
  217. finally:
  218. self._thread = None