diff --git a/tests/helpers/collection_utils.py b/tests/helpers/collection_utils.py index 54789b9..3c94e62 100644 --- a/tests/helpers/collection_utils.py +++ b/tests/helpers/collection_utils.py @@ -4,7 +4,6 @@ import shutil import tempfile from anki import Collection -from helpers.file_utils import FileUtils class CollectionUtils(object): @@ -15,7 +14,7 @@ class CollectionUtils(object): def __init__(self): self.collections_to_close = [] - self.fileutils = FileUtils() + self.tempdir = tempfile.mkdtemp(prefix="CollectionUtils") self.master_db_path = None def __create_master_col(self): @@ -25,16 +24,11 @@ class CollectionUtils(object): time. """ - file_descriptor, file_path = tempfile.mkstemp(suffix=".anki2") - os.close(file_descriptor) - os.unlink(file_path) # We only need the file path. + file_path = os.path.join(self.tempdir, "collection.anki2") master_col = Collection(file_path) - self.__mark_col_paths_for_deletion(master_col) master_col.db.close() self.master_db_path = file_path - self.fileutils.mark_for_deletion(self.master_db_path) - def __enter__(self): return self @@ -44,15 +38,6 @@ class CollectionUtils(object): def __mark_collection_for_closing(self, collection): self.collections_to_close.append(collection) - def __mark_col_paths_for_deletion(self, collection): - """ - Marks the paths of all the database files and directories managed by - the collection for later deletion. - """ - self.fileutils.mark_for_deletion(collection.path) - self.fileutils.mark_for_deletion(collection.media.dir()) - self.fileutils.mark_for_deletion(collection.media.col.path) - def clean_up(self): """ Removes all files created by the Collection objects we issued and the @@ -63,10 +48,7 @@ class CollectionUtils(object): for col in self.collections_to_close: col.close() # This also closes the media col. self.collections_to_close = [] - - # Remove the files created by the collections. - self.fileutils.clean_up() - + shutil.rmtree(self.tempdir) self.master_db_path = None def create_empty_col(self): @@ -77,14 +59,11 @@ class CollectionUtils(object): if self.master_db_path is None: self.__create_master_col() - file_descriptor, file_path = tempfile.mkstemp(suffix=".anki2") - + file_descriptor, file_path = tempfile.mkstemp(dir=self.tempdir, suffix=".anki2") # Overwrite temp file with a copy of our master db. shutil.copy(self.master_db_path, file_path) collection = Collection(file_path) - self.__mark_collection_for_closing(collection) - self.__mark_col_paths_for_deletion(collection) return collection @staticmethod diff --git a/tests/helpers/file_utils.py b/tests/helpers/file_utils.py index 07e877e..f831114 100644 --- a/tests/helpers/file_utils.py +++ b/tests/helpers/file_utils.py @@ -13,171 +13,72 @@ import zipfile from anki.consts import SYNC_ZIP_SIZE -class FileUtils(object): +def create_named_file(filename, file_contents=None): """ - Provides utility methods for creating temporary files and directories. All - created files and dirs are recursively removed when clean_up() is called. - Supports the with statement. + Creates a temporary file with a custom name within a new temporary + directory and marks that parent dir for recursive deletion method. """ - def __init__(self): - self.paths_to_delete = [] + # We need to create a parent directory for the file so we can freely + # choose the file name . + temp_file_parent_dir = tempfile.mkdtemp(prefix="named_file") - def __enter__(self): - return self + file_path = os.path.join(temp_file_parent_dir, filename) - def __exit__(self, exception_type, exception_value, traceback): - self.clean_up() + if file_contents is not None: + open(file_path, 'w').write(file_contents) - def clean_up(self): - """ - Recursively removes all files and directories created by this instance. - """ + return file_path.decode("utf-8") - # Change cwd to a dir we're not about to delete so later calls to - # os.getcwd() and similar functions don't raise Exceptions. - os.chdir("/tmp") - # Error callback for shutil.rmtree(). - def on_error(func, path, excinfo): - logging.error("Error removing file: func={}, path={}, excinfo={}" - .format(func, path, excinfo)) +def create_zip_with_existing_files(file_paths): + """ + The method zips existing files and returns the zip data. Logic is + adapted from Anki Desktop's MediaManager.mediaChangesZip(). - for path in self.paths_to_delete: - if os.path.isfile(path): - logging.debug("Removing temporary file '{}'.".format(path)) - os.remove(path) - elif os.path.isdir(path): - logging.debug(("Removing temporary dir tree '{}' with " + - "files {}").format(path, os.listdir(path))) - shutil.rmtree(path, onerror=on_error) + :param file_paths: the paths of the files to include in the zip + :type file_paths: list + :return: the data of the created zip file + """ - self.paths_to_delete = [] + file_buffer = StringIO() + zip_file = zipfile.ZipFile(file_buffer, + 'w', + compression=zipfile.ZIP_DEFLATED) - def mark_for_deletion(self, path): - self.paths_to_delete.append(path) + meta = [] + sz = 0 - def create_file(self, suffix='', prefix='tmp'): - file_descriptor, file_path = tempfile.mkstemp(suffix=suffix, - prefix=prefix) - self.mark_for_deletion(file_path) - return file_path + for count, filePath in enumerate(file_paths): + zip_file.write(filePath, str(count)) + normname = unicodedata.normalize( + "NFC", + os.path.basename(filePath) + ) + meta.append((normname, str(count))) - def create_dir(self, suffix='', prefix='tmp'): - dir_path = tempfile.mkdtemp(suffix=suffix, - prefix=prefix) - self.mark_for_deletion(dir_path) - return dir_path + sz += os.path.getsize(filePath) + if sz >= SYNC_ZIP_SIZE: + break - def create_file_path(self, suffix='', prefix='tmp'): - """Generates a file path.""" + zip_file.writestr("_meta", json.dumps(meta)) + zip_file.close() - file_path = self.create_file(suffix, prefix) - os.unlink(file_path) - return file_path + return file_buffer.getvalue() - def create_dir_path(self, suffix='', prefix='tmp'): - dir_path = self.create_dir(suffix, prefix) - os.rmdir(dir_path) - return dir_path - def create_named_file(self, filename, file_contents=None): - """ - Creates a temporary file with a custom name within a new temporary - directory and marks that parent dir for recursive deletion method. - """ +def get_asset_path(relative_file_path): + """ + Retrieves the path of a file for testing from the "assets" directory. - # We need to create a parent directory for the file so we can freely - # choose the file name . - temp_file_parent_dir = tempfile.mkdtemp(prefix="anki") - self.mark_for_deletion(temp_file_parent_dir) + :param relative_file_path: the name of the file to retrieve, relative + to the "assets" directory + :return: the absolute path to the file in the "assets" directory. + """ - file_path = os.path.join(temp_file_parent_dir, filename) + join = os.path.join - if file_contents is not None: - open(file_path, 'w').write(file_contents) - - return file_path - - def create_named_file_path(self, filename): - file_path = self.create_named_file(filename) - return file_path - - def create_file_copy(self, path): - basename = os.path.basename(path) - temp_file_path = self.create_named_file_path(basename) - shutil.copyfile(path, temp_file_path) - return temp_file_path - - def create_named_files(self, filenames_and_data): - """ - Creates temporary files within the same new temporary parent directory - and marks that parent for recursive deletion. - - :param filenames_and_data: list of tuples (filename, file contents) - :return: list of paths to the created files - """ - - temp_files_parent_dir = tempfile.mkdtemp(prefix="anki") - self.mark_for_deletion(temp_files_parent_dir) - - file_paths = [] - for filename, file_contents in filenames_and_data: - path = os.path.join(temp_files_parent_dir, filename) - file_paths.append(path) - if file_contents is not None: - open(path, 'w').write(file_contents) - - return file_paths - - @staticmethod - def create_zip_with_existing_files(file_paths): - """ - The method zips existing files and returns the zip data. Logic is - adapted from Anki Desktop's MediaManager.mediaChangesZip(). - - :param file_paths: the paths of the files to include in the zip - :type file_paths: list - :return: the data of the created zip file - """ - - file_buffer = StringIO() - zip_file = zipfile.ZipFile(file_buffer, - 'w', - compression=zipfile.ZIP_DEFLATED) - - meta = [] - sz = 0 - - for count, filePath in enumerate(file_paths): - zip_file.write(filePath, str(count)) - normname = unicodedata.normalize( - "NFC", - os.path.basename(filePath) - ) - meta.append((normname, str(count))) - - sz += os.path.getsize(filePath) - if sz >= SYNC_ZIP_SIZE: - break - - zip_file.writestr("_meta", json.dumps(meta)) - zip_file.close() - - return file_buffer.getvalue() - - def get_asset_path(self, relative_file_path): - """ - Retrieves the path of a file for testing from the "assets" directory. - - :param relative_file_path: the name of the file to retrieve, relative - to the "assets" directory - :return: the absolute path to the file in the "assets" directory. - """ - - join = os.path.join - - script_dir = os.path.dirname(os.path.realpath(__file__)) - support_dir = join(script_dir, os.pardir, "assets") - res = join(support_dir, relative_file_path) - return res + script_dir = os.path.dirname(os.path.realpath(__file__)) + support_dir = join(script_dir, os.pardir, "assets") + res = join(support_dir, relative_file_path) + return res diff --git a/tests/helpers/server_utils.py b/tests/helpers/server_utils.py index 5850916..0e4463e 100644 --- a/tests/helpers/server_utils.py +++ b/tests/helpers/server_utils.py @@ -1,33 +1,29 @@ # -*- coding: utf-8 -*- -import logging import ConfigParser +import logging +import os +import shutil +import tempfile from ankisyncd.sync_app import SyncApp, SyncCollectionHandler, SyncMediaHandler -from helpers.file_utils import FileUtils class ServerUtils(object): - def __init__(self): - self.fileutils = FileUtils() - def clean_up(self): - self.fileutils.clean_up() + shutil.rmtree(self.dir) def create_server_paths(self): """ Creates temporary files and dirs for our app to use during tests. """ + dir = tempfile.mkdtemp(prefix="ServerUtils") + self.dir = dir + os.mkdir(os.path.join(dir, "data")) - auth = self.fileutils.create_file_path(suffix='.db', - prefix='ankiserver_auth_db_') - session = self.fileutils.create_file_path(suffix='.db', - prefix='ankiserver_session_db_') - data = self.fileutils.create_dir(suffix='', - prefix='ankiserver_data_root_') return { - "auth_db": auth, - "session_db": session, - "data_root": data + "auth_db": os.path.join(dir, "auth.db"), + "session_db": os.path.join(dir, "session.db"), + "data_root": os.path.join(dir, "data"), } @staticmethod diff --git a/tests/sync_app_functional_test_base.py b/tests/sync_app_functional_test_base.py index 22d94ee..ca32a70 100644 --- a/tests/sync_app_functional_test_base.py +++ b/tests/sync_app_functional_test_base.py @@ -5,7 +5,6 @@ from webtest import TestApp from ankisyncd.users import SqliteUserManager from helpers.collection_utils import CollectionUtils -from helpers.file_utils import FileUtils from helpers.mock_servers import MockRemoteServer from helpers.monkey_patches import monkeypatch_db, unpatch_db from helpers.server_utils import ServerUtils @@ -15,15 +14,11 @@ class SyncAppFunctionalTestBase(unittest.TestCase): @classmethod def setUpClass(cls): - cls.fileutils = FileUtils() cls.colutils = CollectionUtils() cls.serverutils = ServerUtils() @classmethod def tearDownClass(cls): - cls.fileutils.clean_up() - cls.fileutils = None - cls.colutils.clean_up() cls.colutils = None diff --git a/tests/test_web_media.py b/tests/test_web_media.py index 72446d1..a39afaa 100644 --- a/tests/test_web_media.py +++ b/tests/test_web_media.py @@ -3,7 +3,9 @@ import tempfile import filecmp import sqlite3 import os +import shutil +import helpers.file_utils import helpers.db_utils from anki.sync import MediaSyncer from helpers.mock_servers import MockRemoteMediaServer @@ -16,6 +18,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase): SyncAppFunctionalTestBase.setUp(self) monkeypatch_mediamanager() + self.tempdir = tempfile.mkdtemp(prefix=self.__class__.__name__) self.hkey = self.mock_remote_server.hostKey("testuser", "testpassword") client_collection = self.colutils.create_empty_col() self.client_syncer = self.create_client_syncer(client_collection, @@ -56,8 +59,13 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase): raise IOError("file '" + right_db_path + "' does not exist") # Create temporary copies of the files to act on. - left_db_path = self.fileutils.create_file_copy(left_db_path) - right_db_path = self.fileutils.create_file_copy(right_db_path) + newleft = os.path.join(self.tempdir, left_db_path) + ".tmp" + shutil.copyfile(left_db_path, newleft) + left_db_path = newleft + + newright = os.path.join(self.tempdir, left_db_path) + ".tmp" + shutil.copyfile(right_db_path, newright) + right_db_path = newright if not compare_timestamps: # Set all timestamps that are not NULL to 0. @@ -91,7 +99,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase): 'media') # Create a test file. - temp_file_path = self.fileutils.create_named_file(u"foo.jpg", "hello") + temp_file_path = helpers.file_utils.create_named_file(u"foo.jpg", "hello") # Add the test file to the server's collection. self.serverutils.add_files_to_mediasyncer(server, @@ -123,7 +131,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase): 'media') # Create a test file. - temp_file_path = self.fileutils.create_named_file(u"foo.jpg", "hello") + temp_file_path = helpers.file_utils.create_named_file(u"foo.jpg", "hello") # Add the test file to the client's media collection. self.serverutils.add_files_to_mediasyncer(client, @@ -162,10 +170,8 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase): 'media') # Create two files and add one to the server and one to the client. - file_for_client, file_for_server = self.fileutils.create_named_files([ - (u"foo.jpg", "hello"), - (u"bar.jpg", "goodbye") - ]) + file_for_client = helpers.file_utils.create_named_file(u"foo.jpg", "hello") + file_for_server = helpers.file_utils.create_named_file(u"bar.jpg", "goodbye") self.serverutils.add_files_to_mediasyncer(client, [file_for_client], @@ -212,10 +218,8 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase): # Create two files with identical names but different contents and # checksums. Add one to the server and one to the client. - file_for_client, file_for_server = self.fileutils.create_named_files([ - (u"foo.jpg", "hello"), - (u"foo.jpg", "goodbye") - ]) + file_for_client = helpers.file_utils.create_named_file(u"foo.jpg", "hello") + file_for_server = helpers.file_utils.create_named_file(u"foo.jpg", "goodbye") self.serverutils.add_files_to_mediasyncer(client, [file_for_client], @@ -259,7 +263,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase): 'media') # Create a test file. - temp_file_path = self.fileutils.create_named_file(u"foo.jpg", "hello") + temp_file_path = helpers.file_utils.create_named_file(u"foo.jpg", "hello") # Add the test file to client's media collection. self.serverutils.add_files_to_mediasyncer(client, @@ -307,7 +311,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase): # Add a test image file to the client's media collection but don't # update its media db since the desktop client updates that, using # findChanges(), only during syncs. - support_file = self.fileutils.get_asset_path(u'blue.jpg') + support_file = helpers.file_utils.get_asset_path(u'blue.jpg') self.assertTrue(os.path.isfile(support_file)) self.serverutils.add_files_to_mediasyncer(client, [support_file],