# test_win32trace.py (11 KB)
  1. import unittest
  2. import win32trace
  3. import threading
  4. import time
  5. import os
  6. import sys
  7. if __name__=='__main__':
  8. this_file = sys.argv[0]
  9. else:
  10. this_file = __file__
  11. def CheckNoOtherReaders():
  12. win32trace.write("Hi")
  13. time.sleep(0.05)
  14. if win32trace.read() != "Hi":
  15. # Reset everything so following tests still fail with this error!
  16. win32trace.TermRead()
  17. win32trace.TermWrite()
  18. raise RuntimeError("An existing win32trace reader appears to be " \
  19. "running - please stop this process and try again")
  20. class TestInitOps(unittest.TestCase):
  21. def setUp(self):
  22. # clear old data
  23. win32trace.InitRead()
  24. win32trace.read()
  25. win32trace.TermRead()
  26. def tearDown(self):
  27. try:
  28. win32trace.TermRead()
  29. except win32trace.error:
  30. pass
  31. try:
  32. win32trace.TermWrite()
  33. except win32trace.error:
  34. pass
  35. def testInitTermRead(self):
  36. self.assertRaises(win32trace.error, win32trace.read)
  37. win32trace.InitRead()
  38. result = win32trace.read()
  39. self.assertEquals(result, '')
  40. win32trace.TermRead()
  41. self.assertRaises(win32trace.error, win32trace.read)
  42. win32trace.InitRead()
  43. self.assertRaises(win32trace.error, win32trace.InitRead)
  44. win32trace.InitWrite()
  45. self.assertRaises(win32trace.error, win32trace.InitWrite)
  46. win32trace.TermWrite()
  47. win32trace.TermRead()
  48. def testInitTermWrite(self):
  49. self.assertRaises(win32trace.error, win32trace.write, 'Hei')
  50. win32trace.InitWrite()
  51. win32trace.write('Johan Galtung')
  52. win32trace.TermWrite()
  53. self.assertRaises(win32trace.error, win32trace.write, 'Hei')
  54. def testTermSematics(self):
  55. win32trace.InitWrite()
  56. win32trace.write('Ta da')
  57. # if we both Write and Read are terminated at the same time,
  58. # we lose the data as the win32 object is closed. Note that
  59. # if another writer is running, we do *not* lose the data - so
  60. # test for either the correct data or an empty string
  61. win32trace.TermWrite()
  62. win32trace.InitRead()
  63. self.failUnless(win32trace.read() in ['Ta da', ''])
  64. win32trace.TermRead()
  65. # we keep the data because we init read before terminating write
  66. win32trace.InitWrite()
  67. win32trace.write('Ta da')
  68. win32trace.InitRead()
  69. win32trace.TermWrite()
  70. self.assertEquals('Ta da', win32trace.read())
  71. win32trace.TermRead()
  72. class BasicSetupTearDown(unittest.TestCase):
  73. def setUp(self):
  74. win32trace.InitRead()
  75. # If any other writers are running (even if not actively writing),
  76. # terminating the module will *not* close the handle, meaning old data
  77. # will remain. This can cause other tests to fail.
  78. win32trace.read()
  79. win32trace.InitWrite()
  80. def tearDown(self):
  81. win32trace.TermWrite()
  82. win32trace.TermRead()
  83. class TestModuleOps(BasicSetupTearDown):
  84. def testRoundTrip(self):
  85. win32trace.write('Syver Enstad')
  86. syverEnstad = win32trace.read()
  87. self.assertEquals('Syver Enstad', syverEnstad)
  88. def testRoundTripUnicode(self):
  89. win32trace.write('\xa9opyright Syver Enstad')
  90. syverEnstad = win32trace.read()
  91. # str objects are always returned in py2k (latin-1 encoding was used
  92. # on unicode objects)
  93. self.assertEquals('\xa9opyright Syver Enstad', syverEnstad)
  94. def testBlockingRead(self):
  95. win32trace.write('Syver Enstad')
  96. self.assertEquals('Syver Enstad', win32trace.blockingread())
  97. def testBlockingReadUnicode(self):
  98. win32trace.write('\xa9opyright Syver Enstad')
  99. # str objects are always returned in py2k (latin-1 encoding was used
  100. # on unicode objects)
  101. self.assertEquals('\xa9opyright Syver Enstad', win32trace.blockingread())
  102. def testFlush(self):
  103. win32trace.flush()
  104. class TestTraceObjectOps(BasicSetupTearDown):
  105. def testInit(self):
  106. win32trace.TermRead()
  107. win32trace.TermWrite()
  108. traceObject = win32trace.GetTracer()
  109. self.assertRaises(win32trace.error, traceObject.read)
  110. self.assertRaises(win32trace.error, traceObject.write, '')
  111. win32trace.InitRead()
  112. win32trace.InitWrite()
  113. self.assertEquals('', traceObject.read())
  114. traceObject.write('Syver')
  115. def testFlush(self):
  116. traceObject = win32trace.GetTracer()
  117. traceObject.flush()
  118. def testIsatty(self):
  119. tracer = win32trace.GetTracer()
  120. assert tracer.isatty() == False
  121. def testRoundTrip(self):
  122. traceObject = win32trace.GetTracer()
  123. traceObject.write('Syver Enstad')
  124. self.assertEquals('Syver Enstad', traceObject.read())
  125. class WriterThread(threading.Thread):
  126. def run(self):
  127. self.writeCount = 0
  128. for each in range(self.BucketCount):
  129. win32trace.write(str(each))
  130. self.writeCount = self.BucketCount
  131. def verifyWritten(self):
  132. return self.writeCount == self.BucketCount
  133. class TestMultipleThreadsWriting(unittest.TestCase):
  134. # FullBucket is the thread count
  135. FullBucket = 50
  136. BucketCount = 9 # buckets must be a single digit number (ie. less than 10)
  137. def setUp(self):
  138. WriterThread.BucketCount = self.BucketCount
  139. win32trace.InitRead()
  140. win32trace.read() # clear any old data.
  141. win32trace.InitWrite()
  142. CheckNoOtherReaders()
  143. self.threads = [WriterThread() for each in range(self.FullBucket)]
  144. self.buckets = list(range(self.BucketCount))
  145. for each in self.buckets:
  146. self.buckets[each] = 0
  147. def tearDown(self):
  148. win32trace.TermRead()
  149. win32trace.TermWrite()
  150. def areBucketsFull(self):
  151. bucketsAreFull = True
  152. for each in self.buckets:
  153. assert each <= self.FullBucket, each
  154. if each != self.FullBucket:
  155. bucketsAreFull = False
  156. break
  157. return bucketsAreFull
  158. def read(self):
  159. while 1:
  160. readString = win32trace.blockingread()
  161. for ch in readString:
  162. integer = int(ch)
  163. count = self.buckets[integer]
  164. assert count != -1
  165. self.buckets[integer] = count + 1
  166. if self.buckets[integer] == self.FullBucket:
  167. if self.areBucketsFull():
  168. return
  169. def testThreads(self):
  170. for each in self.threads:
  171. each.start()
  172. self.read()
  173. for each in self.threads:
  174. each.join()
  175. for each in self.threads:
  176. assert each.verifyWritten()
  177. assert self.areBucketsFull()
  178. class TestHugeChunks(unittest.TestCase):
  179. # BiggestChunk is the size where we stop stressing the writer
  180. BiggestChunk = 2**16 # 256k should do it.
  181. def setUp(self):
  182. win32trace.InitRead()
  183. win32trace.read() # clear any old data
  184. win32trace.InitWrite()
  185. def testHugeChunks(self):
  186. data = "*" * 1023 + "\n"
  187. while len(data) <= self.BiggestChunk:
  188. win32trace.write(data)
  189. data = data + data
  190. # If we made it here, we passed.
  191. def tearDown(self):
  192. win32trace.TermRead()
  193. win32trace.TermWrite()
  194. import win32event
  195. import win32process
  196. class TraceWriteProcess:
  197. def __init__(self, threadCount):
  198. self.exitCode = -1
  199. self.threadCount = threadCount
  200. def start(self):
  201. procHandle, threadHandle, procId, threadId = win32process.CreateProcess(
  202. None, # appName
  203. 'python.exe "%s" /run_test_process %s %s' % (this_file,
  204. self.BucketCount,
  205. self.threadCount),
  206. None, # process security
  207. None, # thread security
  208. 0, # inherit handles
  209. win32process.NORMAL_PRIORITY_CLASS,
  210. None, # new environment
  211. None, # Current directory
  212. win32process.STARTUPINFO(), # startup info
  213. )
  214. self.processHandle = procHandle
  215. def join(self):
  216. win32event.WaitForSingleObject(self.processHandle,
  217. win32event.INFINITE)
  218. self.exitCode = win32process.GetExitCodeProcess(self.processHandle)
  219. def verifyWritten(self):
  220. return self.exitCode == 0
  221. class TestOutofProcess(unittest.TestCase):
  222. BucketCount = 9
  223. FullBucket = 50
  224. def setUp(self):
  225. win32trace.InitRead()
  226. TraceWriteProcess.BucketCount = self.BucketCount
  227. self.setUpWriters()
  228. self.buckets = list(range(self.BucketCount))
  229. for each in self.buckets:
  230. self.buckets[each] = 0
  231. def tearDown(self):
  232. win32trace.TermRead()
  233. def setUpWriters(self):
  234. self.processes = []
  235. # 5 processes, quot threads in each process
  236. quot, remainder = divmod(self.FullBucket, 5)
  237. for each in range(5):
  238. self.processes.append(TraceWriteProcess(quot))
  239. if remainder:
  240. self.processes.append(TraceWriteProcess(remainder))
  241. def areBucketsFull(self):
  242. bucketsAreFull = True
  243. for each in self.buckets:
  244. assert each <= self.FullBucket, each
  245. if each != self.FullBucket:
  246. bucketsAreFull = False
  247. break
  248. return bucketsAreFull
  249. def read(self):
  250. while 1:
  251. readString = win32trace.blockingread()
  252. for ch in readString:
  253. integer = int(ch)
  254. count = self.buckets[integer]
  255. assert count != -1
  256. self.buckets[integer] = count + 1
  257. if self.buckets[integer] == self.FullBucket:
  258. if self.areBucketsFull():
  259. return
  260. def testProcesses(self):
  261. for each in self.processes:
  262. each.start()
  263. self.read()
  264. for each in self.processes:
  265. each.join()
  266. for each in self.processes:
  267. assert each.verifyWritten()
  268. assert self.areBucketsFull()
  269. def _RunAsTestProcess():
  270. # Run as an external process by the main tests.
  271. WriterThread.BucketCount = int(sys.argv[2])
  272. threadCount = int(sys.argv[3])
  273. threads = [WriterThread() for each in range(threadCount)]
  274. win32trace.InitWrite()
  275. for t in threads:
  276. t.start()
  277. for t in threads:
  278. t.join()
  279. for t in threads:
  280. if not t.verifyWritten():
  281. sys.exit(-1)
  282. if __name__ == '__main__':
  283. if sys.argv[1:2]==["/run_test_process"]:
  284. _RunAsTestProcess()
  285. sys.exit(0)
  286. # If some other win32traceutil reader is running, these tests fail
  287. # badly (as the other reader sometimes sees the output!)
  288. win32trace.InitRead()
  289. win32trace.InitWrite()
  290. CheckNoOtherReaders()
  291. # reset state so test env is back to normal
  292. win32trace.TermRead()
  293. win32trace.TermWrite()
  294. unittest.main()