test_odbc.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. # odbc test suite kindly contributed by Frank Millman.
  2. import sys
  3. import os
  4. import unittest
  5. import odbc
  6. import tempfile
  7. from pywin32_testutil import str2bytes, str2memory, TestSkipped
  8. # We use the DAO ODBC driver
  9. from win32com.client.gencache import EnsureDispatch
  10. from win32com.client import constants
  11. import pythoncom
  12. class TestStuff(unittest.TestCase):
  13. def setUp(self):
  14. self.tablename = "pywin32test_users"
  15. self.db_filename = None
  16. self.conn = self.cur = None
  17. try:
  18. # Test any database if a connection string is supplied...
  19. conn_str = os.environ['TEST_ODBC_CONNECTION_STRING']
  20. except KeyError:
  21. # Create a local MSAccess DB for testing.
  22. self.db_filename = tempfile.NamedTemporaryFile().name + '.mdb'
  23. # Create a brand-new database - what is the story with these?
  24. for suffix in (".36", ".35", ".30"):
  25. try:
  26. dbe = EnsureDispatch("DAO.DBEngine" + suffix)
  27. break
  28. except pythoncom.com_error:
  29. pass
  30. else:
  31. raise TestSkipped("Can't find a DB engine")
  32. workspace = dbe.Workspaces(0)
  33. newdb = workspace.CreateDatabase(self.db_filename,
  34. constants.dbLangGeneral,
  35. constants.dbEncrypt)
  36. newdb.Close()
  37. conn_str = "Driver={Microsoft Access Driver (*.mdb)};dbq=%s;Uid=;Pwd=;" \
  38. % (self.db_filename,)
  39. ## print 'Connection string:', conn_str
  40. self.conn = odbc.odbc(conn_str)
  41. # And we expect a 'users' table for these tests.
  42. self.cur = self.conn.cursor()
  43. ## self.cur.setoutputsize(1000)
  44. try:
  45. self.cur.execute("""drop table %s""" %self.tablename)
  46. except (odbc.error, odbc.progError):
  47. pass
  48. ## This needs to be adjusted for sql server syntax for unicode fields
  49. ## - memo -> TEXT
  50. ## - varchar -> nvarchar
  51. self.assertEqual(self.cur.execute(
  52. """create table %s (
  53. userid varchar(25),
  54. username varchar(25),
  55. bitfield bit,
  56. intfield integer,
  57. floatfield float,
  58. datefield datetime,
  59. rawfield varbinary(100),
  60. longtextfield memo,
  61. longbinaryfield image
  62. )""" %self.tablename),-1)
  63. def tearDown(self):
  64. if self.cur is not None:
  65. try:
  66. self.cur.execute("""drop table %s""" %self.tablename)
  67. except (odbc.error, odbc.progError) as why:
  68. print("Failed to delete test table %s" %self.tablename, why)
  69. self.cur.close()
  70. self.cur = None
  71. if self.conn is not None:
  72. self.conn.close()
  73. self.conn = None
  74. if self.db_filename is not None:
  75. try:
  76. os.unlink(self.db_filename)
  77. except OSError:
  78. pass
  79. def test_insert_select(self, userid='Frank', username='Frank Millman'):
  80. self.assertEqual(self.cur.execute("insert into %s (userid, username) \
  81. values (?,?)" %self.tablename, [userid, username]),1)
  82. self.assertEqual(self.cur.execute("select * from %s \
  83. where userid = ?" %self.tablename, [userid.lower()]),0)
  84. self.assertEqual(self.cur.execute("select * from %s \
  85. where username = ?" %self.tablename, [username.lower()]),0)
  86. def test_insert_select_unicode(self, userid='Frank', username="Frank Millman"):
  87. self.assertEqual(self.cur.execute("insert into %s (userid, username)\
  88. values (?,?)" %self.tablename, [userid, username]),1)
  89. self.assertEqual(self.cur.execute("select * from %s \
  90. where userid = ?" %self.tablename, [userid.lower()]),0)
  91. self.assertEqual(self.cur.execute("select * from %s \
  92. where username = ?" %self.tablename, [username.lower()]),0)
  93. def test_insert_select_unicode_ext(self):
  94. userid = "t-\xe0\xf2"
  95. username = "test-\xe0\xf2 name"
  96. self.test_insert_select_unicode(userid, username)
  97. def _test_val(self, fieldName, value):
  98. for x in range(100):
  99. self.cur.execute("delete from %s where userid='Frank'" %self.tablename)
  100. self.assertEqual(self.cur.execute(
  101. "insert into %s (userid, %s) values (?,?)" % (self.tablename, fieldName),
  102. ["Frank", value]), 1)
  103. self.cur.execute("select %s from %s where userid = ?" % (fieldName, self.tablename),
  104. ["Frank"])
  105. rows = self.cur.fetchmany()
  106. self.failUnlessEqual(1, len(rows))
  107. row = rows[0]
  108. self.failUnlessEqual(row[0], value)
  109. def testBit(self):
  110. self._test_val('bitfield', 1)
  111. self._test_val('bitfield', 0)
  112. def testInt(self):
  113. self._test_val('intfield', 1)
  114. self._test_val('intfield', 0)
  115. try:
  116. big = sys.maxsize
  117. except AttributeError:
  118. big = sys.maxint
  119. self._test_val('intfield', big)
  120. def testFloat(self):
  121. self._test_val('floatfield', 1.01)
  122. self._test_val('floatfield', 0)
  123. def testVarchar(self, ):
  124. self._test_val('username', 'foo')
  125. def testLongVarchar(self):
  126. """ Test a long text field in excess of internal cursor data size (65536)"""
  127. self._test_val('longtextfield', 'abc' * 70000)
  128. def testLongBinary(self):
  129. """ Test a long raw field in excess of internal cursor data size (65536)"""
  130. self._test_val('longbinaryfield', str2memory('\0\1\2' * 70000))
  131. def testRaw(self):
  132. ## Test binary data
  133. self._test_val('rawfield', str2memory('\1\2\3\4\0\5\6\7\8'))
  134. def test_widechar(self):
  135. """Test a unicode character that would be mangled if bound as plain character.
  136. For example, previously the below was returned as ascii 'a'
  137. """
  138. self._test_val('username', '\u0101')
  139. def testDates(self):
  140. import datetime
  141. for v in (
  142. (1900, 12, 25, 23, 39, 59),
  143. ):
  144. d = datetime.datetime(*v)
  145. self._test_val('datefield', d)
  146. def test_set_nonzero_length(self):
  147. self.assertEqual(self.cur.execute("insert into %s (userid,username) "
  148. "values (?,?)" %self.tablename, ['Frank', 'Frank Millman']),1)
  149. self.assertEqual(self.cur.execute("update %s set username = ?" %self.tablename,
  150. ['Frank']),1)
  151. self.assertEqual(self.cur.execute("select * from %s" %self.tablename), 0)
  152. self.assertEqual(len(self.cur.fetchone()[1]),5)
  153. def test_set_zero_length(self):
  154. self.assertEqual(self.cur.execute("insert into %s (userid,username) "
  155. "values (?,?)" %self.tablename, [str2bytes('Frank'), '']),1)
  156. self.assertEqual(self.cur.execute("select * from %s" %self.tablename), 0)
  157. self.assertEqual(len(self.cur.fetchone()[1]),0)
  158. def test_set_zero_length_unicode(self):
  159. self.assertEqual(self.cur.execute("insert into %s (userid,username) "
  160. "values (?,?)" %self.tablename, ['Frank', '']),1)
  161. self.assertEqual(self.cur.execute("select * from %s" %self.tablename), 0)
  162. self.assertEqual(len(self.cur.fetchone()[1]),0)
  163. if __name__ == '__main__':
  164. unittest.main()