Merge SyncApp's SqliteUserManager and the newly introduced UserManager and group all user managers in user_managers.py.

This commit is contained in:
Christoph Mack 2016-05-27 21:33:22 +02:00 committed by flan
parent 573aeece81
commit c7d7ff3e85
5 changed files with 158 additions and 159 deletions

View File

@ -38,6 +38,8 @@ from anki.sync import Syncer, MediaSyncer
from anki.utils import intTime, checksum, isMac
from anki.consts import SYNC_ZIP_SIZE, SYNC_ZIP_COUNT
from ankisyncd.users import SimpleUserManager, SqliteUserManager
try:
from cStringIO import StringIO
except ImportError:
@ -316,25 +318,6 @@ class SimpleSessionManager(object):
def delete(self, hkey):
del self.sessions[hkey]
class SimpleUserManager(object):
"""A simple user manager that always allows any user."""
def authenticate(self, username, password):
"""
Returns True if this username is allowed to connect with this password. False otherwise.
Override this to change how users are authenticated.
"""
return True
def username2dirname(self, username):
"""
Returns the directory name for the given user. By default, this is just the username.
Override this to adjust the mapping between users and their directory.
"""
return username
class SyncApp(object):
valid_urls = SyncCollectionHandler.operations + SyncMediaHandler.operations + ['hostKey', 'upload', 'download']
@ -694,34 +677,6 @@ class SqliteSessionManager(SimpleSessionManager):
cursor.execute("DELETE FROM session WHERE hkey=?", (hkey,))
conn.commit()
class SqliteUserManager(SimpleUserManager):
"""Authenticates users against a SQLite database."""
def __init__(self, auth_db_path):
self.auth_db_path = os.path.abspath(auth_db_path)
def authenticate(self, username, password):
"""Returns True if this username is allowed to connect with this password. False otherwise."""
conn = sqlite.connect(self.auth_db_path)
cursor = conn.cursor()
param = (username,)
cursor.execute("SELECT hash FROM auth WHERE user=?", param)
db_ret = cursor.fetchone()
if db_ret != None:
db_hash = str(db_ret[0])
salt = db_hash[-16:]
hashobj = hashlib.sha256()
hashobj.update(username+password+salt)
conn.close()
return (db_ret != None and hashobj.hexdigest()+salt == db_hash)
def make_app(global_conf, **local_conf):
return SyncApp(**local_conf)

View File

@ -2,25 +2,57 @@
import binascii
from contextlib import closing
import hashlib
import logging
import os
import sqlite3 as sqlite
class UserManager:
def __init__(self, auth_db_path, collection_path):
self.auth_db_path = auth_db_path
class SimpleUserManager(object):
"""A simple user manager that always allows any user."""
def __init__(self, collection_path=''):
self.collection_path = collection_path
def authenticate(self, username, password):
"""
Returns True if this username is allowed to connect with this password.
False otherwise. Override this to change how users are authenticated.
"""
return True
def username2dirname(self, username):
"""
Returns the directory name for the given user. By default, this is just
the username. Override this to adjust the mapping between users and
their directory.
"""
return username
def _create_user_dir(self, username):
user_dir_path = os.path.join(self.collection_path, username)
if not os.path.isdir(user_dir_path):
logging.info("Creating collection directory for user '{}' at {}"
.format(username, user_dir_path))
os.makedirs(user_dir_path)
class SqliteUserManager(SimpleUserManager):
"""Authenticates users against a SQLite database."""
def __init__(self, auth_db_path, collection_path=None):
SimpleUserManager.__init__(self, collection_path)
self.auth_db_path = auth_db_path
def auth_db_exists(self):
return os.path.isfile(self.auth_db_path)
def user_list(self):
if not self.auth_db_exists():
self.create_auth_db()
return []
raise ValueError("Cannot list users for nonexistent auth db {}."
.format(self.auth_db_path))
else:
conn = sqlite.connect(self.auth_db_path)
cursor = conn.cursor()
@ -37,8 +69,9 @@ class UserManager:
def del_user(self, username):
if not self.auth_db_exists():
self.create_auth_db()
raise ValueError("Cannot remove user from nonexistent auth db {}."
.format(self.auth_db_path))
else:
conn = sqlite.connect(self.auth_db_path)
cursor = conn.cursor()
logging.info("Removing user '{}' from auth db."
@ -69,6 +102,59 @@ class UserManager:
conn.commit()
conn.close()
def set_password_for_user(self, username, new_password):
if not self.auth_db_exists():
raise ValueError("Cannot remove user from nonexistent auth db {}."
.format(self.auth_db_path))
elif not self.user_exists(username):
raise ValueError("Cannot remove nonexistent user {}."
.format(username))
else:
hash = self._create_pass_hash(username, new_password)
conn = sqlite.connect(self.auth_db_path)
cursor = conn.cursor()
cursor.execute("UPDATE auth SET hash=? WHERE user=?", (hash, username))
conn.commit()
conn.close()
logging.info("Changed password for user {}.".format(username))
def authenticate_user(self, username, password):
"""Returns True if this username is allowed to connect with this password. False otherwise."""
conn = sqlite.connect(self.auth_db_path)
cursor = conn.cursor()
param = (username,)
cursor.execute("SELECT hash FROM auth WHERE user=?", param)
db_hash = cursor.fetchone()
conn.close()
if db_hash is None:
logging.info("Authentication failed for nonexistent user {}."
.format(username))
return False
else:
expected_value = str(db_hash[0])
salt = self._extract_salt(expected_value)
hashobj = hashlib.sha256()
hashobj.update(username + password + salt)
actual_value = hashobj.hexdigest() + salt
if actual_value == expected_value:
logging.info("Authentication succeeded for user {}."
.format(username))
return True
else:
logging.info("Authentication failed for user {}."
.format(username))
return False
@staticmethod
def _extract_salt(hash):
return hash[-16:]
@staticmethod
def _create_pass_hash(username, password):
salt = binascii.b2a_hex(os.urandom(8))
@ -85,10 +171,3 @@ class UserManager:
(user VARCHAR PRIMARY KEY, hash VARCHAR)""")
conn.commit()
conn.close()
def _create_user_dir(self, username):
user_dir_path = os.path.join(self.collection_path, username)
if not os.path.isdir(user_dir_path):
logging.info("Creating collection directory for user '{}' at {}"
.format(username, user_dir_path))
os.makedirs(user_dir_path)

View File

@ -6,7 +6,7 @@ import unittest
from webtest import TestApp
from ankisyncd.users import UserManager
from ankisyncd.users import SqliteUserManager
from helpers.collection_utils import CollectionUtils
from helpers.db_utils import DBUtils
from helpers.file_utils import FileUtils
@ -45,7 +45,7 @@ class SyncAppFunctionalTestBase(unittest.TestCase):
self.server_paths = self.serverutils.create_server_paths()
# Add a test user to the temp auth db the server will use.
self.user_manager = UserManager(self.server_paths['auth_db'],
self.user_manager = SqliteUserManager(self.server_paths['auth_db'],
self.server_paths['data_root'])
self.user_manager.add_user('testuser', 'testpassword')

View File

@ -57,84 +57,6 @@ class SyncCollectionHandlerTest(CollectionTestBase):
self.assertEqual(meta['cont'], True)
class SimpleUserManagerTest(unittest.TestCase):
_good_test_un = 'username'
_good_test_pw = 'password'
_bad_test_un = 'notAUsername'
_bad_test_pw = 'notAPassword'
def setUp(self):
self._user_manager = SimpleUserManager()
def tearDown(self):
self._user_manager = None
def test_authenticate(self):
self.assertTrue(self._user_manager.authenticate(self._good_test_un,
self._good_test_pw))
self.assertTrue(self._user_manager.authenticate(self._bad_test_un,
self._bad_test_pw))
self.assertTrue(self._user_manager.authenticate(self._good_test_un,
self._bad_test_pw))
self.assertTrue(self._user_manager.authenticate(self._bad_test_un,
self._good_test_pw))
def test_username2dirname(self):
dirname = self._user_manager.username2dirname(self._good_test_un)
self.assertEqual(dirname, self._good_test_un)
class SqliteUserManagerTest(SimpleUserManagerTest):
file_descriptor, _test_auth_db_path = tempfile.mkstemp(suffix=".db")
os.close(file_descriptor)
os.unlink(_test_auth_db_path)
def _create_test_auth_db(self, db_path, username, password):
if os.path.exists(db_path):
os.remove(db_path)
salt = binascii.b2a_hex(os.urandom(8))
crypto_hash = hashlib.sha256(username+password+salt).hexdigest()+salt
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("""CREATE TABLE IF NOT EXISTS auth
(user VARCHAR PRIMARY KEY, hash VARCHAR)""")
cursor.execute("INSERT INTO auth VALUES (?, ?)", (username, crypto_hash))
conn.commit()
conn.close()
def setUp(self):
self._create_test_auth_db(self._test_auth_db_path,
self._good_test_un,
self._good_test_pw)
self._user_manager = SqliteUserManager(self._test_auth_db_path)
def tearDown(self):
if os.path.exists(self._test_auth_db_path):
os.remove(self._test_auth_db_path)
def test_authenticate(self):
self.assertTrue(self._user_manager.authenticate(self._good_test_un,
self._good_test_pw))
self.assertFalse(self._user_manager.authenticate(self._bad_test_un,
self._bad_test_pw))
self.assertFalse(self._user_manager.authenticate(self._good_test_un,
self._bad_test_pw))
self.assertFalse(self._user_manager.authenticate(self._bad_test_un,
self._good_test_pw))
class SimpleSessionManagerTest(unittest.TestCase):
test_hkey = '1234567890'
test_session = SyncUserSession('testName', 'testPath', None, None)

View File

@ -5,17 +5,39 @@ import os
import unittest
from ankisyncd.users import UserManager
from ankisyncd.users import SimpleUserManager, SqliteUserManager
from helpers.file_utils import FileUtils
class SimpleUserManagerTest(unittest.TestCase):
_good_test_un = 'username'
_good_test_pw = 'password'
def setUp(self):
self.user_manager = SimpleUserManager()
_bad_test_un = 'notAUsername'
_bad_test_pw = 'notAPassword'
def tearDown(self):
self._user_manager = None
def test_authenticate(self):
good_test_un = 'username'
good_test_pw = 'password'
bad_test_un = 'notAUsername'
bad_test_pw = 'notAPassword'
self.assertTrue(self.user_manager.authenticate(good_test_un,
good_test_pw))
self.assertTrue(self.user_manager.authenticate(bad_test_un,
bad_test_pw))
self.assertTrue(self.user_manager.authenticate(good_test_un,
bad_test_pw))
self.assertTrue(self.user_manager.authenticate(bad_test_un,
good_test_pw))
def test_username2dirname(self):
username = 'my_username'
dirname = self.user_manager.username2dirname(username)
self.assertEqual(dirname, username)
class SqliteUserManagerTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.fileutils = FileUtils()
@ -28,7 +50,7 @@ class SimpleUserManagerTest(unittest.TestCase):
def setUp(self):
self.auth_db_path = self.fileutils.create_file_path(suffix='auth.db')
self.collection_path = self.fileutils.create_dir_path()
self.user_manager = UserManager(self.auth_db_path,
self.user_manager = SqliteUserManager(self.auth_db_path,
self.collection_path)
def tearDown(self):
@ -124,6 +146,27 @@ class SimpleUserManagerTest(unittest.TestCase):
self.user_manager._create_user_dir(username)
self.assertTrue(os.path.isdir(expected_dir_path))
def test_authenticate_user(self):
username = "my_username"
password = "my_password"
self.user_manager.create_auth_db()
self.user_manager.add_user(username, password)
self.assertTrue(self.user_manager.authenticate_user(username,
password))
def test_set_password_for_user(self):
username = "my_username"
password = "my_password"
new_password = "my_new_password"
self.user_manager.create_auth_db()
self.user_manager.add_user(username, password)
self.user_manager.set_password_for_user(username, new_password)
self.assertFalse(self.user_manager.authenticate_user(username,
password))
self.assertTrue(self.user_manager.authenticate_user(username,
new_password))
if __name__ == '__main__':
unittest.main()