From c7d7ff3e858415ec12785579882a4c7586cbfce4 Mon Sep 17 00:00:00 2001 From: Christoph Mack Date: Fri, 27 May 2016 21:33:22 +0200 Subject: [PATCH] Merge SyncApp's SqliteUserManager and the newly introduced UserManager and group all user managers in user_managers.py. --- ankisyncd/sync_app.py | 49 +--------- ankisyncd/users.py | 123 ++++++++++++++++++++----- tests/sync_app_functional_test_base.py | 6 +- tests/test_sync_app.py | 78 ---------------- tests/test_users.py | 61 ++++++++++-- 5 files changed, 158 insertions(+), 159 deletions(-) diff --git a/ankisyncd/sync_app.py b/ankisyncd/sync_app.py index da978ef..a703be6 100644 --- a/ankisyncd/sync_app.py +++ b/ankisyncd/sync_app.py @@ -38,6 +38,8 @@ from anki.sync import Syncer, MediaSyncer from anki.utils import intTime, checksum, isMac from anki.consts import SYNC_ZIP_SIZE, SYNC_ZIP_COUNT +from ankisyncd.users import SimpleUserManager, SqliteUserManager + try: from cStringIO import StringIO except ImportError: @@ -316,25 +318,6 @@ class SimpleSessionManager(object): def delete(self, hkey): del self.sessions[hkey] -class SimpleUserManager(object): - """A simple user manager that always allows any user.""" - - def authenticate(self, username, password): - """ - Returns True if this username is allowed to connect with this password. False otherwise. - Override this to change how users are authenticated. - """ - - return True - - def username2dirname(self, username): - """ - Returns the directory name for the given user. By default, this is just the username. - Override this to adjust the mapping between users and their directory. - """ - - return username - class SyncApp(object): valid_urls = SyncCollectionHandler.operations + SyncMediaHandler.operations + ['hostKey', 'upload', 'download'] @@ -694,34 +677,6 @@ class SqliteSessionManager(SimpleSessionManager): cursor.execute("DELETE FROM session WHERE hkey=?", (hkey,)) conn.commit() -class SqliteUserManager(SimpleUserManager): - """Authenticates users against a SQLite database.""" - - def __init__(self, auth_db_path): - self.auth_db_path = os.path.abspath(auth_db_path) - - def authenticate(self, username, password): - """Returns True if this username is allowed to connect with this password. False otherwise.""" - - conn = sqlite.connect(self.auth_db_path) - cursor = conn.cursor() - param = (username,) - - cursor.execute("SELECT hash FROM auth WHERE user=?", param) - - db_ret = cursor.fetchone() - - if db_ret != None: - db_hash = str(db_ret[0]) - salt = db_hash[-16:] - hashobj = hashlib.sha256() - - hashobj.update(username+password+salt) - - conn.close() - - return (db_ret != None and hashobj.hexdigest()+salt == db_hash) - def make_app(global_conf, **local_conf): return SyncApp(**local_conf) diff --git a/ankisyncd/users.py b/ankisyncd/users.py index ce83686..49c4d80 100644 --- a/ankisyncd/users.py +++ b/ankisyncd/users.py @@ -2,25 +2,57 @@ import binascii -from contextlib import closing import hashlib import logging import os import sqlite3 as sqlite -class UserManager: - def __init__(self, auth_db_path, collection_path): - self.auth_db_path = auth_db_path +class SimpleUserManager(object): + """A simple user manager that always allows any user.""" + + def __init__(self, collection_path=''): self.collection_path = collection_path + def authenticate(self, username, password): + """ + Returns True if this username is allowed to connect with this password. + False otherwise. Override this to change how users are authenticated. + """ + + return True + + def username2dirname(self, username): + """ + Returns the directory name for the given user. By default, this is just + the username. Override this to adjust the mapping between users and + their directory. + """ + + return username + + def _create_user_dir(self, username): + user_dir_path = os.path.join(self.collection_path, username) + if not os.path.isdir(user_dir_path): + logging.info("Creating collection directory for user '{}' at {}" + .format(username, user_dir_path)) + os.makedirs(user_dir_path) + + +class SqliteUserManager(SimpleUserManager): + """Authenticates users against a SQLite database.""" + + def __init__(self, auth_db_path, collection_path=None): + SimpleUserManager.__init__(self, collection_path) + self.auth_db_path = auth_db_path + def auth_db_exists(self): return os.path.isfile(self.auth_db_path) def user_list(self): if not self.auth_db_exists(): - self.create_auth_db() - return [] + raise ValueError("Cannot list users for nonexistent auth db {}." + .format(self.auth_db_path)) else: conn = sqlite.connect(self.auth_db_path) cursor = conn.cursor() @@ -37,15 +69,16 @@ class UserManager: def del_user(self, username): if not self.auth_db_exists(): - self.create_auth_db() - - conn = sqlite.connect(self.auth_db_path) - cursor = conn.cursor() - logging.info("Removing user '{}' from auth db." - .format(username)) - cursor.execute("DELETE FROM auth WHERE user=?", (username,)) - conn.commit() - conn.close() + raise ValueError("Cannot remove user from nonexistent auth db {}." + .format(self.auth_db_path)) + else: + conn = sqlite.connect(self.auth_db_path) + cursor = conn.cursor() + logging.info("Removing user '{}' from auth db." + .format(username)) + cursor.execute("DELETE FROM auth WHERE user=?", (username,)) + conn.commit() + conn.close() def add_user(self, username, password): self._add_user_to_auth_db(username, password) @@ -69,6 +102,59 @@ class UserManager: conn.commit() conn.close() + def set_password_for_user(self, username, new_password): + if not self.auth_db_exists(): + raise ValueError("Cannot remove user from nonexistent auth db {}." + .format(self.auth_db_path)) + elif not self.user_exists(username): + raise ValueError("Cannot remove nonexistent user {}." + .format(username)) + else: + hash = self._create_pass_hash(username, new_password) + + conn = sqlite.connect(self.auth_db_path) + cursor = conn.cursor() + cursor.execute("UPDATE auth SET hash=? WHERE user=?", (hash, username)) + conn.commit() + conn.close() + + logging.info("Changed password for user {}.".format(username)) + + def authenticate_user(self, username, password): + """Returns True if this username is allowed to connect with this password. False otherwise.""" + + conn = sqlite.connect(self.auth_db_path) + cursor = conn.cursor() + param = (username,) + cursor.execute("SELECT hash FROM auth WHERE user=?", param) + db_hash = cursor.fetchone() + conn.close() + + if db_hash is None: + logging.info("Authentication failed for nonexistent user {}." + .format(username)) + return False + else: + expected_value = str(db_hash[0]) + salt = self._extract_salt(expected_value) + + hashobj = hashlib.sha256() + hashobj.update(username + password + salt) + actual_value = hashobj.hexdigest() + salt + + if actual_value == expected_value: + logging.info("Authentication succeeded for user {}." + .format(username)) + return True + else: + logging.info("Authentication failed for user {}." + .format(username)) + return False + + @staticmethod + def _extract_salt(hash): + return hash[-16:] + @staticmethod def _create_pass_hash(username, password): salt = binascii.b2a_hex(os.urandom(8)) @@ -85,10 +171,3 @@ class UserManager: (user VARCHAR PRIMARY KEY, hash VARCHAR)""") conn.commit() conn.close() - - def _create_user_dir(self, username): - user_dir_path = os.path.join(self.collection_path, username) - if not os.path.isdir(user_dir_path): - logging.info("Creating collection directory for user '{}' at {}" - .format(username, user_dir_path)) - os.makedirs(user_dir_path) diff --git a/tests/sync_app_functional_test_base.py b/tests/sync_app_functional_test_base.py index 3702284..ae71afc 100644 --- a/tests/sync_app_functional_test_base.py +++ b/tests/sync_app_functional_test_base.py @@ -6,7 +6,7 @@ import unittest from webtest import TestApp -from ankisyncd.users import UserManager +from ankisyncd.users import SqliteUserManager from helpers.collection_utils import CollectionUtils from helpers.db_utils import DBUtils from helpers.file_utils import FileUtils @@ -45,8 +45,8 @@ class SyncAppFunctionalTestBase(unittest.TestCase): self.server_paths = self.serverutils.create_server_paths() # Add a test user to the temp auth db the server will use. - self.user_manager = UserManager(self.server_paths['auth_db'], - self.server_paths['data_root']) + self.user_manager = SqliteUserManager(self.server_paths['auth_db'], + self.server_paths['data_root']) self.user_manager.add_user('testuser', 'testpassword') # Get absolute path to development ini file. diff --git a/tests/test_sync_app.py b/tests/test_sync_app.py index 4fbfdd0..dbba660 100644 --- a/tests/test_sync_app.py +++ b/tests/test_sync_app.py @@ -57,84 +57,6 @@ class SyncCollectionHandlerTest(CollectionTestBase): self.assertEqual(meta['cont'], True) -class SimpleUserManagerTest(unittest.TestCase): - _good_test_un = 'username' - _good_test_pw = 'password' - - _bad_test_un = 'notAUsername' - _bad_test_pw = 'notAPassword' - - def setUp(self): - self._user_manager = SimpleUserManager() - - def tearDown(self): - self._user_manager = None - - def test_authenticate(self): - self.assertTrue(self._user_manager.authenticate(self._good_test_un, - self._good_test_pw)) - - self.assertTrue(self._user_manager.authenticate(self._bad_test_un, - self._bad_test_pw)) - - self.assertTrue(self._user_manager.authenticate(self._good_test_un, - self._bad_test_pw)) - - self.assertTrue(self._user_manager.authenticate(self._bad_test_un, - self._good_test_pw)) - - def test_username2dirname(self): - dirname = self._user_manager.username2dirname(self._good_test_un) - self.assertEqual(dirname, self._good_test_un) - - -class SqliteUserManagerTest(SimpleUserManagerTest): - file_descriptor, _test_auth_db_path = tempfile.mkstemp(suffix=".db") - os.close(file_descriptor) - os.unlink(_test_auth_db_path) - - def _create_test_auth_db(self, db_path, username, password): - if os.path.exists(db_path): - os.remove(db_path) - - salt = binascii.b2a_hex(os.urandom(8)) - crypto_hash = hashlib.sha256(username+password+salt).hexdigest()+salt - - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - cursor.execute("""CREATE TABLE IF NOT EXISTS auth - (user VARCHAR PRIMARY KEY, hash VARCHAR)""") - - cursor.execute("INSERT INTO auth VALUES (?, ?)", (username, crypto_hash)) - - conn.commit() - conn.close() - - def setUp(self): - self._create_test_auth_db(self._test_auth_db_path, - self._good_test_un, - self._good_test_pw) - self._user_manager = SqliteUserManager(self._test_auth_db_path) - - def tearDown(self): - if os.path.exists(self._test_auth_db_path): - os.remove(self._test_auth_db_path) - - def test_authenticate(self): - self.assertTrue(self._user_manager.authenticate(self._good_test_un, - self._good_test_pw)) - - self.assertFalse(self._user_manager.authenticate(self._bad_test_un, - self._bad_test_pw)) - - self.assertFalse(self._user_manager.authenticate(self._good_test_un, - self._bad_test_pw)) - - self.assertFalse(self._user_manager.authenticate(self._bad_test_un, - self._good_test_pw)) - - class SimpleSessionManagerTest(unittest.TestCase): test_hkey = '1234567890' test_session = SyncUserSession('testName', 'testPath', None, None) diff --git a/tests/test_users.py b/tests/test_users.py index 76614ab..6994692 100644 --- a/tests/test_users.py +++ b/tests/test_users.py @@ -5,17 +5,39 @@ import os import unittest -from ankisyncd.users import UserManager +from ankisyncd.users import SimpleUserManager, SqliteUserManager from helpers.file_utils import FileUtils class SimpleUserManagerTest(unittest.TestCase): - _good_test_un = 'username' - _good_test_pw = 'password' + def setUp(self): + self.user_manager = SimpleUserManager() - _bad_test_un = 'notAUsername' - _bad_test_pw = 'notAPassword' + def tearDown(self): + self._user_manager = None + def test_authenticate(self): + good_test_un = 'username' + good_test_pw = 'password' + bad_test_un = 'notAUsername' + bad_test_pw = 'notAPassword' + + self.assertTrue(self.user_manager.authenticate(good_test_un, + good_test_pw)) + self.assertTrue(self.user_manager.authenticate(bad_test_un, + bad_test_pw)) + self.assertTrue(self.user_manager.authenticate(good_test_un, + bad_test_pw)) + self.assertTrue(self.user_manager.authenticate(bad_test_un, + good_test_pw)) + + def test_username2dirname(self): + username = 'my_username' + dirname = self.user_manager.username2dirname(username) + self.assertEqual(dirname, username) + + +class SqliteUserManagerTest(unittest.TestCase): @classmethod def setUpClass(cls): cls.fileutils = FileUtils() @@ -28,8 +50,8 @@ class SimpleUserManagerTest(unittest.TestCase): def setUp(self): self.auth_db_path = self.fileutils.create_file_path(suffix='auth.db') self.collection_path = self.fileutils.create_dir_path() - self.user_manager = UserManager(self.auth_db_path, - self.collection_path) + self.user_manager = SqliteUserManager(self.auth_db_path, + self.collection_path) def tearDown(self): self.user_manager = None @@ -124,6 +146,27 @@ class SimpleUserManagerTest(unittest.TestCase): self.user_manager._create_user_dir(username) self.assertTrue(os.path.isdir(expected_dir_path)) + def test_authenticate_user(self): + username = "my_username" + password = "my_password" + + self.user_manager.create_auth_db() + self.user_manager.add_user(username, password) + + self.assertTrue(self.user_manager.authenticate_user(username, + password)) + + def test_set_password_for_user(self): + username = "my_username" + password = "my_password" + new_password = "my_new_password" + + self.user_manager.create_auth_db() + self.user_manager.add_user(username, password) + + self.user_manager.set_password_for_user(username, new_password) + self.assertFalse(self.user_manager.authenticate_user(username, + password)) + self.assertTrue(self.user_manager.authenticate_user(username, + new_password)) -if __name__ == '__main__': - unittest.main()