naming_storage.py 18 KB


  1. """
  2. Name Server persistent storage implementations.
  3. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  4. """
  5. import re
  6. import logging
  7. import sys
  8. import threading
  9. if sys.version_info <= (3, 4):
  10. from collections import MutableMapping
  11. else:
  12. from collections.abc import MutableMapping
  13. from contextlib import closing
  14. from Pyro4.errors import NamingError
  15. try:
  16. import anydbm as dbm # python 2
  17. except ImportError:
  18. try:
  19. import dbm # python 3
  20. except ImportError:
  21. dbm = None
  22. except Exception as x:
  23. # pypy can generate a distutils error somehow if dbm is not available
  24. dbm = None
  25. try:
  26. import sqlite3
  27. except ImportError:
  28. sqlite3 = None
  29. log = logging.getLogger("Pyro4.naming_storage")
  30. class SqlStorage(MutableMapping):
  31. """
  32. Sqlite-based storage.
  33. It is just a single (name,uri) table for the names and another table for the metadata.
  34. Sqlite db connection objects aren't thread-safe, so a new connection is created in every method.
  35. """
  36. def __init__(self, dbfile):
  37. if dbfile == ":memory:":
  38. raise ValueError("We don't support the sqlite :memory: database type. Just use the default volatile in-memory store.")
  39. self.dbfile = dbfile
  40. with closing(sqlite3.connect(dbfile)) as db:
  41. db.execute("PRAGMA foreign_keys=ON")
  42. try:
  43. db.execute("SELECT COUNT(*) FROM pyro_names").fetchone()
  44. except sqlite3.OperationalError:
  45. # the table does not yet exist
  46. self._create_schema(db)
  47. else:
  48. # check if we need to update the existing schema
  49. try:
  50. db.execute("SELECT COUNT(*) FROM pyro_metadata").fetchone()
  51. except sqlite3.OperationalError:
  52. # metadata schema needs to be created and existing data migrated
  53. db.execute("ALTER TABLE pyro_names RENAME TO pyro_names_old")
  54. self._create_schema(db)
  55. db.execute("INSERT INTO pyro_names(name, uri) SELECT name, uri FROM pyro_names_old")
  56. db.execute("DROP TABLE pyro_names_old")
  57. db.commit()
  58. def _create_schema(self, db):
  59. db.execute("""CREATE TABLE pyro_names
  60. (
  61. id integer PRIMARY KEY,
  62. name nvarchar NOT NULL UNIQUE,
  63. uri nvarchar NOT NULL
  64. );""")
  65. db.execute("""CREATE TABLE pyro_metadata
  66. (
  67. object integer NOT NULL,
  68. metadata nvarchar NOT NULL,
  69. FOREIGN KEY(object) REFERENCES pyro_names(id)
  70. );""")
  71. def __getattr__(self, item):
  72. raise NotImplementedError("SqlStorage doesn't implement method/attribute '" + item + "'")
  73. def __getitem__(self, item):
  74. try:
  75. with closing(sqlite3.connect(self.dbfile)) as db:
  76. result = db.execute("SELECT id, uri FROM pyro_names WHERE name=?", (item,)).fetchone()
  77. if result:
  78. dbid, uri = result
  79. metadata = {m[0] for m in db.execute("SELECT metadata FROM pyro_metadata WHERE object=?", (dbid,)).fetchall()}
  80. return uri, metadata
  81. else:
  82. raise KeyError(item)
  83. except sqlite3.DatabaseError as e:
  84. raise NamingError("sqlite error in getitem: " + str(e))
  85. def __setitem__(self, key, value):
  86. uri, metadata = value
  87. try:
  88. with closing(sqlite3.connect(self.dbfile)) as db:
  89. cursor = db.cursor()
  90. cursor.execute("PRAGMA foreign_keys=ON")
  91. dbid = cursor.execute("SELECT id FROM pyro_names WHERE name=?", (key,)).fetchone()
  92. if dbid:
  93. dbid = dbid[0]
  94. cursor.execute("DELETE FROM pyro_metadata WHERE object=?", (dbid,))
  95. cursor.execute("DELETE FROM pyro_names WHERE id=?", (dbid,))
  96. cursor.execute("INSERT INTO pyro_names(name, uri) VALUES(?,?)", (key, uri))
  97. if metadata:
  98. object_id = cursor.lastrowid
  99. for m in metadata:
  100. cursor.execute("INSERT INTO pyro_metadata(object, metadata) VALUES (?,?)", (object_id, m))
  101. cursor.close()
  102. db.commit()
  103. except sqlite3.DatabaseError as e:
  104. raise NamingError("sqlite error in setitem: " + str(e))
  105. def __len__(self):
  106. try:
  107. with closing(sqlite3.connect(self.dbfile)) as db:
  108. return db.execute("SELECT count(*) FROM pyro_names").fetchone()[0]
  109. except sqlite3.DatabaseError as e:
  110. raise NamingError("sqlite error in len: " + str(e))
  111. def __contains__(self, item):
  112. try:
  113. with closing(sqlite3.connect(self.dbfile)) as db:
  114. return db.execute("SELECT EXISTS(SELECT 1 FROM pyro_names WHERE name=? LIMIT 1)", (item,)).fetchone()[0]
  115. except sqlite3.DatabaseError as e:
  116. raise NamingError("sqlite error in contains: " + str(e))
  117. def __delitem__(self, key):
  118. try:
  119. with closing(sqlite3.connect(self.dbfile)) as db:
  120. db.execute("PRAGMA foreign_keys=ON")
  121. dbid = db.execute("SELECT id FROM pyro_names WHERE name=?", (key,)).fetchone()
  122. if dbid:
  123. dbid = dbid[0]
  124. db.execute("DELETE FROM pyro_metadata WHERE object=?", (dbid,))
  125. db.execute("DELETE FROM pyro_names WHERE id=?", (dbid,))
  126. db.commit()
  127. except sqlite3.DatabaseError as e:
  128. raise NamingError("sqlite error in delitem: " + str(e))
  129. def __iter__(self):
  130. try:
  131. with closing(sqlite3.connect(self.dbfile)) as db:
  132. result = db.execute("SELECT name FROM pyro_names")
  133. return iter([n[0] for n in result.fetchall()])
  134. except sqlite3.DatabaseError as e:
  135. raise NamingError("sqlite error in iter: " + str(e))
  136. def clear(self):
  137. try:
  138. with closing(sqlite3.connect(self.dbfile)) as db:
  139. db.execute("PRAGMA foreign_keys=ON")
  140. db.execute("DELETE FROM pyro_metadata")
  141. db.execute("DELETE FROM pyro_names")
  142. db.commit()
  143. with closing(sqlite3.connect(self.dbfile, isolation_level=None)) as db:
  144. db.execute("VACUUM") # this cannot run inside a transaction.
  145. except sqlite3.DatabaseError as e:
  146. raise NamingError("sqlite error in clear: " + str(e))
  147. def optimized_prefix_list(self, prefix, return_metadata=False):
  148. try:
  149. with closing(sqlite3.connect(self.dbfile)) as db:
  150. names = {}
  151. if return_metadata:
  152. for dbid, name, uri in db.execute("SELECT id, name, uri FROM pyro_names WHERE name LIKE ?", (prefix + '%',)).fetchall():
  153. metadata = {m[0] for m in db.execute("SELECT metadata FROM pyro_metadata WHERE object=?", (dbid,)).fetchall()}
  154. names[name] = uri, metadata
  155. else:
  156. for name, uri in db.execute("SELECT name, uri FROM pyro_names WHERE name LIKE ?", (prefix + '%',)).fetchall():
  157. names[name] = uri
  158. return names
  159. except sqlite3.DatabaseError as e:
  160. raise NamingError("sqlite error in optimized_prefix_list: " + str(e))
  161. def optimized_regex_list(self, regex, return_metadata=False):
  162. # defining a regex function isn't much better than simply regexing ourselves over the full table.
  163. return None
  164. def optimized_metadata_search(self, metadata_all=None, metadata_any=None, return_metadata=False):
  165. try:
  166. with closing(sqlite3.connect(self.dbfile)) as db:
  167. if metadata_any:
  168. # any of the given metadata
  169. params = list(metadata_any)
  170. sql = "SELECT id, name, uri FROM pyro_names WHERE id IN (SELECT object FROM pyro_metadata WHERE metadata IN ({seq}))" \
  171. .format(seq=",".join(['?'] * len(metadata_any)))
  172. else:
  173. # all of the given metadata
  174. params = list(metadata_all)
  175. params.append(len(metadata_all))
  176. sql = "SELECT id, name, uri FROM pyro_names WHERE id IN (SELECT object FROM pyro_metadata WHERE metadata IN ({seq}) " \
  177. "GROUP BY object HAVING COUNT(metadata)=?)".format(seq=",".join(['?'] * len(metadata_all)))
  178. result = db.execute(sql, params).fetchall()
  179. if return_metadata:
  180. names = {}
  181. for dbid, name, uri in result:
  182. metadata = {m[0] for m in db.execute("SELECT metadata FROM pyro_metadata WHERE object=?", (dbid,)).fetchall()}
  183. names[name] = uri, metadata
  184. else:
  185. names = {name: uri for (dbid, name, uri) in result}
  186. return names
  187. except sqlite3.DatabaseError as e:
  188. raise NamingError("sqlite error in optimized_metadata_search: " + str(e))
  189. def remove_items(self, items):
  190. try:
  191. with closing(sqlite3.connect(self.dbfile)) as db:
  192. db.execute("PRAGMA foreign_keys=ON")
  193. for item in items:
  194. dbid = db.execute("SELECT id FROM pyro_names WHERE name=?", (item,)).fetchone()
  195. if dbid:
  196. dbid = dbid[0]
  197. db.execute("DELETE FROM pyro_metadata WHERE object=?", (dbid,))
  198. db.execute("DELETE FROM pyro_names WHERE id=?", (dbid,))
  199. db.commit()
  200. except sqlite3.DatabaseError as e:
  201. raise NamingError("sqlite error in remove_items: " + str(e))
  202. def everything(self, return_metadata=False):
  203. try:
  204. with closing(sqlite3.connect(self.dbfile)) as db:
  205. names = {}
  206. if return_metadata:
  207. for dbid, name, uri in db.execute("SELECT id, name, uri FROM pyro_names").fetchall():
  208. metadata = {m[0] for m in db.execute("SELECT metadata FROM pyro_metadata WHERE object=?", (dbid,)).fetchall()}
  209. names[name] = uri, metadata
  210. else:
  211. for name, uri in db.execute("SELECT name, uri FROM pyro_names").fetchall():
  212. names[name] = uri
  213. return names
  214. except sqlite3.DatabaseError as e:
  215. raise NamingError("sqlite error in everything: " + str(e))
  216. def close(self):
  217. pass
  218. class DbmStorage(MutableMapping):
  219. """
  220. Storage implementation that uses a persistent dbm file.
  221. Because dbm only supports strings as key/value, we encode/decode them in utf-8.
  222. Dbm files cannot be accessed concurrently, so a strict concurrency model
  223. is used where only one operation is processed at the same time
  224. (this is very slow when compared to the in-memory storage)
  225. DbmStorage does NOT support storing metadata! It only accepts empty metadata,
  226. and always returns empty metadata.
  227. """
  228. def __init__(self, dbmfile):
  229. self.dbmfile = dbmfile
  230. db = dbm.open(self.dbmfile, "c", mode=0o600)
  231. db.close()
  232. self.lock = threading.Lock()
  233. def __getattr__(self, item):
  234. raise NotImplementedError("DbmStorage doesn't implement method/attribute '" + item + "'")
  235. def __getitem__(self, item):
  236. item = item.encode("utf-8")
  237. with self.lock:
  238. try:
  239. with closing(dbm.open(self.dbmfile)) as db:
  240. return db[item].decode("utf-8"), frozenset() # always return empty metadata
  241. except dbm.error as e:
  242. raise NamingError("dbm error in getitem: " + str(e))
  243. def __setitem__(self, key, value):
  244. uri, metadata = value
  245. if metadata:
  246. log.warning("DbmStorage doesn't support metadata, silently discarded")
  247. key = key.encode("utf-8")
  248. uri = uri.encode("utf-8")
  249. with self.lock:
  250. try:
  251. with closing(dbm.open(self.dbmfile, "w")) as db:
  252. db[key] = uri
  253. except dbm.error as e:
  254. raise NamingError("dbm error in setitem: " + str(e))
  255. def __len__(self):
  256. with self.lock:
  257. try:
  258. with closing(dbm.open(self.dbmfile)) as db:
  259. return len(db)
  260. except dbm.error as e:
  261. raise NamingError("dbm error in len: " + str(e))
  262. def __contains__(self, item):
  263. item = item.encode("utf-8")
  264. with self.lock:
  265. try:
  266. with closing(dbm.open(self.dbmfile)) as db:
  267. return item in db
  268. except dbm.error as e:
  269. raise NamingError("dbm error in contains: " + str(e))
  270. def __delitem__(self, key):
  271. key = key.encode("utf-8")
  272. with self.lock:
  273. try:
  274. with closing(dbm.open(self.dbmfile, "w")) as db:
  275. del db[key]
  276. except dbm.error as e:
  277. raise NamingError("dbm error in delitem: " + str(e))
  278. def __iter__(self):
  279. with self.lock:
  280. try:
  281. with closing(dbm.open(self.dbmfile)) as db:
  282. return iter([key.decode("utf-8") for key in db.keys()])
  283. except dbm.error as e:
  284. raise NamingError("dbm error in iter: " + str(e))
  285. def clear(self):
  286. with self.lock:
  287. try:
  288. with closing(dbm.open(self.dbmfile, "w")) as db:
  289. if hasattr(db, "clear"):
  290. db.clear()
  291. else:
  292. for key in db.keys():
  293. del db[key]
  294. except dbm.error as e:
  295. raise NamingError("dbm error in clear: " + str(e))
  296. def optimized_prefix_list(self, prefix, return_metadata=False):
  297. with self.lock:
  298. try:
  299. with closing(dbm.open(self.dbmfile)) as db:
  300. result = {}
  301. if hasattr(db, "items"):
  302. for key, value in db.items():
  303. key = key.decode("utf-8")
  304. if key.startswith(prefix):
  305. uri = value.decode("utf-8")
  306. result[key] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  307. else:
  308. for key in db.keys():
  309. keystr = key.decode("utf-8")
  310. if keystr.startswith(prefix):
  311. uri = db[key].decode("utf-8")
  312. result[keystr] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  313. return result
  314. except dbm.error as e:
  315. raise NamingError("dbm error in optimized_prefix_list: " + str(e))
  316. def optimized_regex_list(self, regex, return_metadata=False):
  317. try:
  318. regex = re.compile(regex + "$") # add end of string marker
  319. except re.error as x:
  320. raise NamingError("invalid regex: " + str(x))
  321. with self.lock:
  322. try:
  323. with closing(dbm.open(self.dbmfile)) as db:
  324. result = {}
  325. if hasattr(db, "items"):
  326. for key, value in db.items():
  327. key = key.decode("utf-8")
  328. if regex.match(key):
  329. uri = value.decode("utf-8")
  330. result[key] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  331. else:
  332. for key in db.keys():
  333. keystr = key.decode("utf-8")
  334. if regex.match(keystr):
  335. uri = db[key].decode("utf-8")
  336. result[keystr] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  337. return result
  338. except dbm.error as e:
  339. raise NamingError("dbm error in optimized_regex_list: " + str(e))
  340. def optimized_metadata_search(self, metadata_all=None, metadata_any=None, return_metadata=False):
  341. if metadata_all or metadata_any:
  342. raise NamingError("DbmStorage doesn't support metadata")
  343. return self.everything(return_metadata)
  344. def remove_items(self, items):
  345. with self.lock:
  346. try:
  347. with closing(dbm.open(self.dbmfile, "w")) as db:
  348. for item in items:
  349. try:
  350. del db[item.encode("utf-8")]
  351. except KeyError:
  352. pass
  353. except dbm.error as e:
  354. raise NamingError("dbm error in remove_items: " + str(e))
  355. def everything(self, return_metadata=False):
  356. with self.lock:
  357. try:
  358. with closing(dbm.open(self.dbmfile)) as db:
  359. result = {}
  360. if hasattr(db, "items"):
  361. for key, value in db.items():
  362. uri = value.decode("utf-8")
  363. result[key.decode("utf-8")] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  364. else:
  365. for key in db.keys():
  366. uri = db[key].decode("utf-8")
  367. result[key.decode("utf-8")] = (uri, frozenset()) if return_metadata else uri # always return empty metadata
  368. return result
  369. except dbm.error as e:
  370. raise NamingError("dbm error in everything: " + str(e))
  371. def close(self):
  372. pass