win32comport_demo.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # This is a simple serial port terminal demo.
  2. #
  3. # Its primary purpose is to demonstrate the native serial port access offered via
  4. # win32file.
  5. # It uses 3 threads:
  6. # - The main thread, which cranks up the other 2 threads, then simply waits for them to exit.
  7. # - The user-input thread - blocks waiting for a keyboard character, and when found sends it
  8. # out the COM port. If the character is Ctrl+C, it stops, signalling the COM port thread to stop.
  9. # - The COM port thread is simply listening for input on the COM port, and prints it to the screen.
  10. # This demo uses userlapped IO, so that none of the read or write operations actually block (however,
  11. # in this sample, the very next thing we do _is_ block - so it shows off the concepts even though it
  12. # doesnt exploit them.
  13. from win32file import * # The base COM port and file IO functions.
  14. from win32event import * # We use events and the WaitFor[Multiple]Objects functions.
  15. import win32con # constants.
  16. import msvcrt # For the getch() function.
  17. import threading
  18. import sys
  19. def FindModem():
  20. # Snoop over the comports, seeing if it is likely we have a modem.
  21. for i in range(1,5):
  22. port = "COM%d" % (i,)
  23. try:
  24. handle = CreateFile(port,
  25. win32con.GENERIC_READ | win32con.GENERIC_WRITE,
  26. 0, # exclusive access
  27. None, # no security
  28. win32con.OPEN_EXISTING,
  29. win32con.FILE_ATTRIBUTE_NORMAL,
  30. None)
  31. # It appears that an available COM port will always success here,
  32. # just return 0 for the status flags. We only care that it has _any_ status
  33. # flags (and therefore probably a real modem)
  34. if GetCommModemStatus(handle) != 0:
  35. return port
  36. except error:
  37. pass # No port, or modem status failed.
  38. return None
  39. # A basic synchronous COM port file-like object
  40. class SerialTTY:
  41. def __init__(self, port):
  42. if type(port)==type(0):
  43. port = "COM%d" % (port,)
  44. self.handle = CreateFile(port,
  45. win32con.GENERIC_READ | win32con.GENERIC_WRITE,
  46. 0, # exclusive access
  47. None, # no security
  48. win32con.OPEN_EXISTING,
  49. win32con.FILE_ATTRIBUTE_NORMAL | win32con.FILE_FLAG_OVERLAPPED,
  50. None)
  51. # Tell the port we want a notification on each char.
  52. SetCommMask(self.handle, EV_RXCHAR)
  53. # Setup a 4k buffer
  54. SetupComm(self.handle, 4096, 4096)
  55. # Remove anything that was there
  56. PurgeComm(self.handle, PURGE_TXABORT | PURGE_RXABORT | PURGE_TXCLEAR | PURGE_RXCLEAR )
  57. # Setup for overlapped IO.
  58. timeouts = 0xFFFFFFFF, 0, 1000, 0, 1000
  59. SetCommTimeouts(self.handle, timeouts)
  60. # Setup the connection info.
  61. dcb = GetCommState( self.handle )
  62. dcb.BaudRate = CBR_115200
  63. dcb.ByteSize = 8
  64. dcb.Parity = NOPARITY
  65. dcb.StopBits = ONESTOPBIT
  66. SetCommState(self.handle, dcb)
  67. print("Connected to %s at %s baud" % (port, dcb.BaudRate))
  68. def _UserInputReaderThread(self):
  69. overlapped = OVERLAPPED()
  70. overlapped.hEvent = CreateEvent(None, 1, 0, None)
  71. try:
  72. while 1:
  73. ch = msvcrt.getch()
  74. if ord(ch)==3:
  75. break
  76. WriteFile(self.handle, ch, overlapped)
  77. # Wait for the write to complete.
  78. WaitForSingleObject(overlapped.hEvent, INFINITE)
  79. finally:
  80. SetEvent(self.eventStop)
  81. def _ComPortThread(self):
  82. overlapped = OVERLAPPED()
  83. overlapped.hEvent = CreateEvent(None, 1, 0, None)
  84. while 1:
  85. # XXX - note we could _probably_ just use overlapped IO on the win32file.ReadFile() statement
  86. # XXX but this tests the COM stuff!
  87. rc, mask = WaitCommEvent(self.handle, overlapped)
  88. if rc == 0: # Character already ready!
  89. SetEvent(overlapped.hEvent)
  90. rc = WaitForMultipleObjects([overlapped.hEvent, self.eventStop], 0, INFINITE)
  91. if rc == WAIT_OBJECT_0:
  92. # Some input - read and print it
  93. flags, comstat = ClearCommError( self.handle )
  94. rc, data = ReadFile(self.handle, comstat.cbInQue, overlapped)
  95. WaitForSingleObject(overlapped.hEvent, INFINITE)
  96. sys.stdout.write(data)
  97. else:
  98. # Stop the thread!
  99. # Just incase the user input thread uis still going, close it
  100. sys.stdout.close()
  101. break
  102. def Run(self):
  103. self.eventStop = CreateEvent(None, 0, 0, None)
  104. # Start the reader and writer threads.
  105. user_thread = threading.Thread(target = self._UserInputReaderThread)
  106. user_thread.start()
  107. com_thread = threading.Thread(target = self._ComPortThread)
  108. com_thread.start()
  109. user_thread.join()
  110. com_thread.join()
  111. if __name__=='__main__':
  112. print("Serial port terminal demo - press Ctrl+C to exit")
  113. if len(sys.argv)<=1:
  114. port = FindModem()
  115. if port is None:
  116. print("No COM port specified, and no modem could be found")
  117. print("Please re-run this script with the name of a COM port (eg COM3)")
  118. sys.exit(1)
  119. else:
  120. port = sys.argv[1]
  121. tty = SerialTTY(port)
  122. tty.Run()