process_connect_string.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. """ a clumsy attempt at a macro language to let the programmer execute code on the server (ex: determine 64bit)"""
  2. from . import is64bit as is64bit
  3. def macro_call(macro_name, args, kwargs):
  4. """ allow the programmer to perform limited processing on the server by passing macro names and args
  5. :new_key - the key name the macro will create
  6. :args[0] - macro name
  7. :args[1:] - any arguments
  8. :code - the value of the keyword item
  9. :kwargs - the connection keyword dictionary. ??key has been removed
  10. --> the value to put in for kwargs['name'] = value
  11. """
  12. if isinstance(args, (str, str)):
  13. args = [args] # the user forgot to pass a sequence, so make a string into args[0]
  14. new_key = args[0]
  15. try:
  16. if macro_name == "is64bit":
  17. if is64bit.Python(): # if on 64 bit Python
  18. return new_key, args[1] # return first argument
  19. else:
  20. try:
  21. return new_key, args[2] # else return second argument (if defined)
  22. except IndexError:
  23. return new_key, '' # else return blank
  24. elif macro_name == "getuser": # get the name of the user the server is logged in under
  25. if not new_key in kwargs:
  26. import getpass
  27. return new_key, getpass.getuser()
  28. elif macro_name == "getnode": # get the name of the computer running the server
  29. import platform
  30. try:
  31. return new_key, args[1] % platform.node()
  32. except IndexError:
  33. return new_key, platform.node()
  34. elif macro_name == "getenv": # expand the server's environment variable args[1]
  35. try:
  36. dflt = args[2] # if not found, default from args[2]
  37. except IndexError: # or blank
  38. dflt = ''
  39. return new_key, os.environ.get(args[1], dflt)
  40. elif macro_name == "auto_security":
  41. if not 'user' in kwargs or not kwargs['user']: # missing, blank, or Null username
  42. return new_key, 'Integrated Security=SSPI'
  43. return new_key, 'User ID=%(user)s; Password=%(password)s' % kwargs
  44. elif macro_name == "find_temp_test_path": # helper function for testing ado operation -- undocumented
  45. import tempfile, os
  46. return new_key, os.path.join(tempfile.gettempdir(), 'adodbapi_test', args[1])
  47. raise ValueError ('Unknown connect string macro=%s' % macro_name)
  48. except:
  49. raise ValueError ('Error in macro processing %s %s' % (macro_name, repr(args)))
  50. def process(args, kwargs, expand_macros=False): # --> connection string with keyword arguments processed.
  51. """ attempts to inject arguments into a connection string using Python "%" operator for strings
  52. co: adodbapi connection object
  53. args: positional parameters from the .connect() call
  54. kvargs: keyword arguments from the .connect() call
  55. """
  56. try:
  57. dsn = args[0]
  58. except IndexError:
  59. dsn = None
  60. if isinstance(dsn, dict): # as a convenience the first argument may be django settings
  61. kwargs.update(dsn)
  62. elif dsn: # the connection string is passed to the connection as part of the keyword dictionary
  63. kwargs['connection_string'] = dsn
  64. try:
  65. a1 = args[1]
  66. except IndexError:
  67. a1 = None
  68. # historically, the second positional argument might be a timeout value
  69. if isinstance(a1, int):
  70. kwargs['timeout'] = a1
  71. # if the second positional argument is a string, then it is user
  72. elif isinstance(a1, str):
  73. kwargs['user'] = a1
  74. # if the second positional argument is a dictionary, use it as keyword arguments, too
  75. elif isinstance(a1, dict):
  76. kwargs.update(a1)
  77. try:
  78. kwargs['password'] = args[2] # the third positional argument is password
  79. kwargs['host'] = args[3] # the fourth positional argument is host name
  80. kwargs['database'] = args[4] # the fifth positional argument is database name
  81. except IndexError:
  82. pass
  83. # make sure connection string is defined somehow
  84. if not 'connection_string' in kwargs:
  85. try: # perhaps 'dsn' was defined
  86. kwargs['connection_string'] = kwargs['dsn']
  87. except KeyError:
  88. try: # as a last effort, use the "host" keyword
  89. kwargs['connection_string'] = kwargs['host']
  90. except KeyError:
  91. raise TypeError ("Must define 'connection_string' for ado connections")
  92. if expand_macros:
  93. for kwarg in list(kwargs.keys()):
  94. if kwarg.startswith('macro_'): # If a key defines a macro
  95. macro_name = kwarg[6:] # name without the "macro_"
  96. macro_code = kwargs.pop(kwarg) # we remove the macro_key and get the code to execute
  97. new_key, rslt = macro_call(macro_name, macro_code, kwargs) # run the code in the local context
  98. kwargs[new_key] = rslt # put the result back in the keywords dict
  99. # special processing for PyRO IPv6 host address
  100. try:
  101. s = kwargs['proxy_host']
  102. if ':' in s: # it is an IPv6 address
  103. if s[0] != '[': # is not surrounded by brackets
  104. kwargs['proxy_host'] = s.join(('[',']')) # put it in brackets
  105. except KeyError:
  106. pass
  107. return kwargs