serialcli.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
#! python
#
# Backend for .NET/Mono (IronPython), .NET >= 2
#
# This file is part of pySerial. https://github.com/pyserial/pyserial
# (C) 2008-2015 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
  9. from __future__ import absolute_import
  10. import System
  11. import System.IO.Ports
  12. from serial.serialutil import *
  13. # must invoke function with byte array, make a helper to convert strings
  14. # to byte arrays
  15. sab = System.Array[System.Byte]
  16. def as_byte_array(string):
  17. return sab([ord(x) for x in string]) # XXX will require adaption when run with a 3.x compatible IronPython
  18. class Serial(SerialBase):
  19. """Serial port implementation for .NET/Mono."""
  20. BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
  21. 9600, 19200, 38400, 57600, 115200)
  22. def open(self):
  23. """\
  24. Open port with current settings. This may throw a SerialException
  25. if the port cannot be opened.
  26. """
  27. if self._port is None:
  28. raise SerialException("Port must be configured before it can be used.")
  29. if self.is_open:
  30. raise SerialException("Port is already open.")
  31. try:
  32. self._port_handle = System.IO.Ports.SerialPort(self.portstr)
  33. except Exception as msg:
  34. self._port_handle = None
  35. raise SerialException("could not open port %s: %s" % (self.portstr, msg))
  36. # if RTS and/or DTR are not set before open, they default to True
  37. if self._rts_state is None:
  38. self._rts_state = True
  39. if self._dtr_state is None:
  40. self._dtr_state = True
  41. self._reconfigure_port()
  42. self._port_handle.Open()
  43. self.is_open = True
  44. if not self._dsrdtr:
  45. self._update_dtr_state()
  46. if not self._rtscts:
  47. self._update_rts_state()
  48. self.reset_input_buffer()
  49. def _reconfigure_port(self):
  50. """Set communication parameters on opened port."""
  51. if not self._port_handle:
  52. raise SerialException("Can only operate on a valid port handle")
  53. #~ self._port_handle.ReceivedBytesThreshold = 1
  54. if self._timeout is None:
  55. self._port_handle.ReadTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
  56. else:
  57. self._port_handle.ReadTimeout = int(self._timeout * 1000)
  58. # if self._timeout != 0 and self._interCharTimeout is not None:
  59. # timeouts = (int(self._interCharTimeout * 1000),) + timeouts[1:]
  60. if self._write_timeout is None:
  61. self._port_handle.WriteTimeout = System.IO.Ports.SerialPort.InfiniteTimeout
  62. else:
  63. self._port_handle.WriteTimeout = int(self._write_timeout * 1000)
  64. # Setup the connection info.
  65. try:
  66. self._port_handle.BaudRate = self._baudrate
  67. except IOError as e:
  68. # catch errors from illegal baudrate settings
  69. raise ValueError(str(e))
  70. if self._bytesize == FIVEBITS:
  71. self._port_handle.DataBits = 5
  72. elif self._bytesize == SIXBITS:
  73. self._port_handle.DataBits = 6
  74. elif self._bytesize == SEVENBITS:
  75. self._port_handle.DataBits = 7
  76. elif self._bytesize == EIGHTBITS:
  77. self._port_handle.DataBits = 8
  78. else:
  79. raise ValueError("Unsupported number of data bits: %r" % self._bytesize)
  80. if self._parity == PARITY_NONE:
  81. self._port_handle.Parity = getattr(System.IO.Ports.Parity, 'None') # reserved keyword in Py3k
  82. elif self._parity == PARITY_EVEN:
  83. self._port_handle.Parity = System.IO.Ports.Parity.Even
  84. elif self._parity == PARITY_ODD:
  85. self._port_handle.Parity = System.IO.Ports.Parity.Odd
  86. elif self._parity == PARITY_MARK:
  87. self._port_handle.Parity = System.IO.Ports.Parity.Mark
  88. elif self._parity == PARITY_SPACE:
  89. self._port_handle.Parity = System.IO.Ports.Parity.Space
  90. else:
  91. raise ValueError("Unsupported parity mode: %r" % self._parity)
  92. if self._stopbits == STOPBITS_ONE:
  93. self._port_handle.StopBits = System.IO.Ports.StopBits.One
  94. elif self._stopbits == STOPBITS_ONE_POINT_FIVE:
  95. self._port_handle.StopBits = System.IO.Ports.StopBits.OnePointFive
  96. elif self._stopbits == STOPBITS_TWO:
  97. self._port_handle.StopBits = System.IO.Ports.StopBits.Two
  98. else:
  99. raise ValueError("Unsupported number of stop bits: %r" % self._stopbits)
  100. if self._rtscts and self._xonxoff:
  101. self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSendXOnXOff
  102. elif self._rtscts:
  103. self._port_handle.Handshake = System.IO.Ports.Handshake.RequestToSend
  104. elif self._xonxoff:
  105. self._port_handle.Handshake = System.IO.Ports.Handshake.XOnXOff
  106. else:
  107. self._port_handle.Handshake = getattr(System.IO.Ports.Handshake, 'None') # reserved keyword in Py3k
  108. #~ def __del__(self):
  109. #~ self.close()
  110. def close(self):
  111. """Close port"""
  112. if self.is_open:
  113. if self._port_handle:
  114. try:
  115. self._port_handle.Close()
  116. except System.IO.Ports.InvalidOperationException:
  117. # ignore errors. can happen for unplugged USB serial devices
  118. pass
  119. self._port_handle = None
  120. self.is_open = False
  121. # - - - - - - - - - - - - - - - - - - - - - - - -
  122. @property
  123. def in_waiting(self):
  124. """Return the number of characters currently in the input buffer."""
  125. if not self.is_open:
  126. raise PortNotOpenError()
  127. return self._port_handle.BytesToRead
  128. def read(self, size=1):
  129. """\
  130. Read size bytes from the serial port. If a timeout is set it may
  131. return less characters as requested. With no timeout it will block
  132. until the requested number of bytes is read.
  133. """
  134. if not self.is_open:
  135. raise PortNotOpenError()
  136. # must use single byte reads as this is the only way to read
  137. # without applying encodings
  138. data = bytearray()
  139. while size:
  140. try:
  141. data.append(self._port_handle.ReadByte())
  142. except System.TimeoutException:
  143. break
  144. else:
  145. size -= 1
  146. return bytes(data)
  147. def write(self, data):
  148. """Output the given string over the serial port."""
  149. if not self.is_open:
  150. raise PortNotOpenError()
  151. #~ if not isinstance(data, (bytes, bytearray)):
  152. #~ raise TypeError('expected %s or bytearray, got %s' % (bytes, type(data)))
  153. try:
  154. # must call overloaded method with byte array argument
  155. # as this is the only one not applying encodings
  156. self._port_handle.Write(as_byte_array(data), 0, len(data))
  157. except System.TimeoutException:
  158. raise SerialTimeoutException('Write timeout')
  159. return len(data)
  160. def reset_input_buffer(self):
  161. """Clear input buffer, discarding all that is in the buffer."""
  162. if not self.is_open:
  163. raise PortNotOpenError()
  164. self._port_handle.DiscardInBuffer()
  165. def reset_output_buffer(self):
  166. """\
  167. Clear output buffer, aborting the current output and
  168. discarding all that is in the buffer.
  169. """
  170. if not self.is_open:
  171. raise PortNotOpenError()
  172. self._port_handle.DiscardOutBuffer()
  173. def _update_break_state(self):
  174. """
  175. Set break: Controls TXD. When active, to transmitting is possible.
  176. """
  177. if not self.is_open:
  178. raise PortNotOpenError()
  179. self._port_handle.BreakState = bool(self._break_state)
  180. def _update_rts_state(self):
  181. """Set terminal status line: Request To Send"""
  182. if not self.is_open:
  183. raise PortNotOpenError()
  184. self._port_handle.RtsEnable = bool(self._rts_state)
  185. def _update_dtr_state(self):
  186. """Set terminal status line: Data Terminal Ready"""
  187. if not self.is_open:
  188. raise PortNotOpenError()
  189. self._port_handle.DtrEnable = bool(self._dtr_state)
  190. @property
  191. def cts(self):
  192. """Read terminal status line: Clear To Send"""
  193. if not self.is_open:
  194. raise PortNotOpenError()
  195. return self._port_handle.CtsHolding
  196. @property
  197. def dsr(self):
  198. """Read terminal status line: Data Set Ready"""
  199. if not self.is_open:
  200. raise PortNotOpenError()
  201. return self._port_handle.DsrHolding
  202. @property
  203. def ri(self):
  204. """Read terminal status line: Ring Indicator"""
  205. if not self.is_open:
  206. raise PortNotOpenError()
  207. #~ return self._port_handle.XXX
  208. return False # XXX an error would be better
  209. @property
  210. def cd(self):
  211. """Read terminal status line: Carrier Detect"""
  212. if not self.is_open:
  213. raise PortNotOpenError()
  214. return self._port_handle.CDHolding
  215. # - - platform specific - - - -
  216. # none