"""Base class for SharedKeyDB and VerifierDB.""" | |
import anydbm | |
import thread | |
class BaseDB: | |
def __init__(self, filename, type): | |
self.type = type | |
self.filename = filename | |
if self.filename: | |
self.db = None | |
else: | |
self.db = {} | |
self.lock = thread.allocate_lock() | |
def create(self): | |
"""Create a new on-disk database. | |
@raise anydbm.error: If there's a problem creating the database. | |
""" | |
if self.filename: | |
self.db = anydbm.open(self.filename, "n") #raises anydbm.error | |
self.db["--Reserved--type"] = self.type | |
self.db.sync() | |
else: | |
self.db = {} | |
def open(self): | |
"""Open a pre-existing on-disk database. | |
@raise anydbm.error: If there's a problem opening the database. | |
@raise ValueError: If the database is not of the right type. | |
""" | |
if not self.filename: | |
raise ValueError("Can only open on-disk databases") | |
self.db = anydbm.open(self.filename, "w") #raises anydbm.error | |
try: | |
if self.db["--Reserved--type"] != self.type: | |
raise ValueError("Not a %s database" % self.type) | |
except KeyError: | |
raise ValueError("Not a recognized database") | |
def __getitem__(self, username): | |
if self.db == None: | |
raise AssertionError("DB not open") | |
self.lock.acquire() | |
try: | |
valueStr = self.db[username] | |
finally: | |
self.lock.release() | |
return self._getItem(username, valueStr) | |
def __setitem__(self, username, value): | |
if self.db == None: | |
raise AssertionError("DB not open") | |
valueStr = self._setItem(username, value) | |
self.lock.acquire() | |
try: | |
self.db[username] = valueStr | |
if self.filename: | |
self.db.sync() | |
finally: | |
self.lock.release() | |
def __delitem__(self, username): | |
if self.db == None: | |
raise AssertionError("DB not open") | |
self.lock.acquire() | |
try: | |
del(self.db[username]) | |
if self.filename: | |
self.db.sync() | |
finally: | |
self.lock.release() | |
def __contains__(self, username): | |
"""Check if the database contains the specified username. | |
@type username: str | |
@param username: The username to check for. | |
@rtype: bool | |
@return: True if the database contains the username, False | |
otherwise. | |
""" | |
if self.db == None: | |
raise AssertionError("DB not open") | |
self.lock.acquire() | |
try: | |
return self.db.has_key(username) | |
finally: | |
self.lock.release() | |
def check(self, username, param): | |
value = self.__getitem__(username) | |
return self._checkItem(value, username, param) | |
def keys(self): | |
"""Return a list of usernames in the database. | |
@rtype: list | |
@return: The usernames in the database. | |
""" | |
if self.db == None: | |
raise AssertionError("DB not open") | |
self.lock.acquire() | |
try: | |
usernames = self.db.keys() | |
finally: | |
self.lock.release() | |
usernames = [u for u in usernames if not u.startswith("--Reserved--")] | |
return usernames |