Remove unnecessary class from helpers.file_utils

A class which keeps track of temporary files and removes them
automatically when they're not needed anymore might be a good idea,
but this implementation didn't remove files in some cases. Adding
unrelated methods that could as well be just standalone functions
is bad design, too.

In this case, it's better to just get rid of it altogether instead
of fixing it, since Python 3 has a TemporaryDirectory class, which
can be used for the same purpose and is definitely more
battle-tested.
This commit is contained in:
flan 2017-11-04 00:22:09 +01:00
parent e959d8745a
commit a48ad44a65
5 changed files with 82 additions and 207 deletions

View File

@ -4,7 +4,6 @@ import shutil
import tempfile
from anki import Collection
from helpers.file_utils import FileUtils
class CollectionUtils(object):
@ -15,7 +14,7 @@ class CollectionUtils(object):
def __init__(self):
self.collections_to_close = []
self.fileutils = FileUtils()
self.tempdir = tempfile.mkdtemp(prefix="CollectionUtils")
self.master_db_path = None
def __create_master_col(self):
@ -25,16 +24,11 @@ class CollectionUtils(object):
time.
"""
file_descriptor, file_path = tempfile.mkstemp(suffix=".anki2")
os.close(file_descriptor)
os.unlink(file_path) # We only need the file path.
file_path = os.path.join(self.tempdir, "collection.anki2")
master_col = Collection(file_path)
self.__mark_col_paths_for_deletion(master_col)
master_col.db.close()
self.master_db_path = file_path
self.fileutils.mark_for_deletion(self.master_db_path)
def __enter__(self):
return self
@ -44,15 +38,6 @@ class CollectionUtils(object):
def __mark_collection_for_closing(self, collection):
self.collections_to_close.append(collection)
def __mark_col_paths_for_deletion(self, collection):
"""
Marks the paths of all the database files and directories managed by
the collection for later deletion.
"""
self.fileutils.mark_for_deletion(collection.path)
self.fileutils.mark_for_deletion(collection.media.dir())
self.fileutils.mark_for_deletion(collection.media.col.path)
def clean_up(self):
"""
Removes all files created by the Collection objects we issued and the
@ -63,10 +48,7 @@ class CollectionUtils(object):
for col in self.collections_to_close:
col.close() # This also closes the media col.
self.collections_to_close = []
# Remove the files created by the collections.
self.fileutils.clean_up()
shutil.rmtree(self.tempdir)
self.master_db_path = None
def create_empty_col(self):
@ -77,14 +59,11 @@ class CollectionUtils(object):
if self.master_db_path is None:
self.__create_master_col()
file_descriptor, file_path = tempfile.mkstemp(suffix=".anki2")
file_descriptor, file_path = tempfile.mkstemp(dir=self.tempdir, suffix=".anki2")
# Overwrite temp file with a copy of our master db.
shutil.copy(self.master_db_path, file_path)
collection = Collection(file_path)
self.__mark_collection_for_closing(collection)
self.__mark_col_paths_for_deletion(collection)
return collection
@staticmethod

View File

@ -13,171 +13,72 @@ import zipfile
from anki.consts import SYNC_ZIP_SIZE
class FileUtils(object):
def create_named_file(filename, file_contents=None):
"""
Provides utility methods for creating temporary files and directories. All
created files and dirs are recursively removed when clean_up() is called.
Supports the with statement.
Creates a temporary file with a custom name within a new temporary
directory and marks that parent dir for recursive deletion method.
"""
def __init__(self):
self.paths_to_delete = []
# We need to create a parent directory for the file so we can freely
# choose the file name .
temp_file_parent_dir = tempfile.mkdtemp(prefix="named_file")
def __enter__(self):
return self
file_path = os.path.join(temp_file_parent_dir, filename)
def __exit__(self, exception_type, exception_value, traceback):
self.clean_up()
if file_contents is not None:
open(file_path, 'w').write(file_contents)
def clean_up(self):
"""
Recursively removes all files and directories created by this instance.
"""
return file_path.decode("utf-8")
# Change cwd to a dir we're not about to delete so later calls to
# os.getcwd() and similar functions don't raise Exceptions.
os.chdir("/tmp")
# Error callback for shutil.rmtree().
def on_error(func, path, excinfo):
logging.error("Error removing file: func={}, path={}, excinfo={}"
.format(func, path, excinfo))
def create_zip_with_existing_files(file_paths):
"""
The method zips existing files and returns the zip data. Logic is
adapted from Anki Desktop's MediaManager.mediaChangesZip().
for path in self.paths_to_delete:
if os.path.isfile(path):
logging.debug("Removing temporary file '{}'.".format(path))
os.remove(path)
elif os.path.isdir(path):
logging.debug(("Removing temporary dir tree '{}' with " +
"files {}").format(path, os.listdir(path)))
shutil.rmtree(path, onerror=on_error)
:param file_paths: the paths of the files to include in the zip
:type file_paths: list
:return: the data of the created zip file
"""
self.paths_to_delete = []
file_buffer = StringIO()
zip_file = zipfile.ZipFile(file_buffer,
'w',
compression=zipfile.ZIP_DEFLATED)
def mark_for_deletion(self, path):
self.paths_to_delete.append(path)
meta = []
sz = 0
def create_file(self, suffix='', prefix='tmp'):
file_descriptor, file_path = tempfile.mkstemp(suffix=suffix,
prefix=prefix)
self.mark_for_deletion(file_path)
return file_path
for count, filePath in enumerate(file_paths):
zip_file.write(filePath, str(count))
normname = unicodedata.normalize(
"NFC",
os.path.basename(filePath)
)
meta.append((normname, str(count)))
def create_dir(self, suffix='', prefix='tmp'):
dir_path = tempfile.mkdtemp(suffix=suffix,
prefix=prefix)
self.mark_for_deletion(dir_path)
return dir_path
sz += os.path.getsize(filePath)
if sz >= SYNC_ZIP_SIZE:
break
def create_file_path(self, suffix='', prefix='tmp'):
"""Generates a file path."""
zip_file.writestr("_meta", json.dumps(meta))
zip_file.close()
file_path = self.create_file(suffix, prefix)
os.unlink(file_path)
return file_path
return file_buffer.getvalue()
def create_dir_path(self, suffix='', prefix='tmp'):
dir_path = self.create_dir(suffix, prefix)
os.rmdir(dir_path)
return dir_path
def create_named_file(self, filename, file_contents=None):
"""
Creates a temporary file with a custom name within a new temporary
directory and marks that parent dir for recursive deletion method.
"""
def get_asset_path(relative_file_path):
"""
Retrieves the path of a file for testing from the "assets" directory.
# We need to create a parent directory for the file so we can freely
# choose the file name .
temp_file_parent_dir = tempfile.mkdtemp(prefix="anki")
self.mark_for_deletion(temp_file_parent_dir)
:param relative_file_path: the name of the file to retrieve, relative
to the "assets" directory
:return: the absolute path to the file in the "assets" directory.
"""
file_path = os.path.join(temp_file_parent_dir, filename)
join = os.path.join
if file_contents is not None:
open(file_path, 'w').write(file_contents)
return file_path
def create_named_file_path(self, filename):
file_path = self.create_named_file(filename)
return file_path
def create_file_copy(self, path):
basename = os.path.basename(path)
temp_file_path = self.create_named_file_path(basename)
shutil.copyfile(path, temp_file_path)
return temp_file_path
def create_named_files(self, filenames_and_data):
"""
Creates temporary files within the same new temporary parent directory
and marks that parent for recursive deletion.
:param filenames_and_data: list of tuples (filename, file contents)
:return: list of paths to the created files
"""
temp_files_parent_dir = tempfile.mkdtemp(prefix="anki")
self.mark_for_deletion(temp_files_parent_dir)
file_paths = []
for filename, file_contents in filenames_and_data:
path = os.path.join(temp_files_parent_dir, filename)
file_paths.append(path)
if file_contents is not None:
open(path, 'w').write(file_contents)
return file_paths
@staticmethod
def create_zip_with_existing_files(file_paths):
"""
The method zips existing files and returns the zip data. Logic is
adapted from Anki Desktop's MediaManager.mediaChangesZip().
:param file_paths: the paths of the files to include in the zip
:type file_paths: list
:return: the data of the created zip file
"""
file_buffer = StringIO()
zip_file = zipfile.ZipFile(file_buffer,
'w',
compression=zipfile.ZIP_DEFLATED)
meta = []
sz = 0
for count, filePath in enumerate(file_paths):
zip_file.write(filePath, str(count))
normname = unicodedata.normalize(
"NFC",
os.path.basename(filePath)
)
meta.append((normname, str(count)))
sz += os.path.getsize(filePath)
if sz >= SYNC_ZIP_SIZE:
break
zip_file.writestr("_meta", json.dumps(meta))
zip_file.close()
return file_buffer.getvalue()
def get_asset_path(self, relative_file_path):
"""
Retrieves the path of a file for testing from the "assets" directory.
:param relative_file_path: the name of the file to retrieve, relative
to the "assets" directory
:return: the absolute path to the file in the "assets" directory.
"""
join = os.path.join
script_dir = os.path.dirname(os.path.realpath(__file__))
support_dir = join(script_dir, os.pardir, "assets")
res = join(support_dir, relative_file_path)
return res
script_dir = os.path.dirname(os.path.realpath(__file__))
support_dir = join(script_dir, os.pardir, "assets")
res = join(support_dir, relative_file_path)
return res

View File

@ -1,33 +1,29 @@
# -*- coding: utf-8 -*-
import logging
import ConfigParser
import logging
import os
import shutil
import tempfile
from ankisyncd.sync_app import SyncApp, SyncCollectionHandler, SyncMediaHandler
from helpers.file_utils import FileUtils
class ServerUtils(object):
def __init__(self):
self.fileutils = FileUtils()
def clean_up(self):
self.fileutils.clean_up()
shutil.rmtree(self.dir)
def create_server_paths(self):
"""
Creates temporary files and dirs for our app to use during tests.
"""
dir = tempfile.mkdtemp(prefix="ServerUtils")
self.dir = dir
os.mkdir(os.path.join(dir, "data"))
auth = self.fileutils.create_file_path(suffix='.db',
prefix='ankiserver_auth_db_')
session = self.fileutils.create_file_path(suffix='.db',
prefix='ankiserver_session_db_')
data = self.fileutils.create_dir(suffix='',
prefix='ankiserver_data_root_')
return {
"auth_db": auth,
"session_db": session,
"data_root": data
"auth_db": os.path.join(dir, "auth.db"),
"session_db": os.path.join(dir, "session.db"),
"data_root": os.path.join(dir, "data"),
}
@staticmethod

View File

@ -5,7 +5,6 @@ from webtest import TestApp
from ankisyncd.users import SqliteUserManager
from helpers.collection_utils import CollectionUtils
from helpers.file_utils import FileUtils
from helpers.mock_servers import MockRemoteServer
from helpers.monkey_patches import monkeypatch_db, unpatch_db
from helpers.server_utils import ServerUtils
@ -15,15 +14,11 @@ class SyncAppFunctionalTestBase(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.fileutils = FileUtils()
cls.colutils = CollectionUtils()
cls.serverutils = ServerUtils()
@classmethod
def tearDownClass(cls):
cls.fileutils.clean_up()
cls.fileutils = None
cls.colutils.clean_up()
cls.colutils = None

View File

@ -3,7 +3,9 @@ import tempfile
import filecmp
import sqlite3
import os
import shutil
import helpers.file_utils
import helpers.db_utils
from anki.sync import MediaSyncer
from helpers.mock_servers import MockRemoteMediaServer
@ -16,6 +18,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase):
SyncAppFunctionalTestBase.setUp(self)
monkeypatch_mediamanager()
self.tempdir = tempfile.mkdtemp(prefix=self.__class__.__name__)
self.hkey = self.mock_remote_server.hostKey("testuser", "testpassword")
client_collection = self.colutils.create_empty_col()
self.client_syncer = self.create_client_syncer(client_collection,
@ -56,8 +59,13 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase):
raise IOError("file '" + right_db_path + "' does not exist")
# Create temporary copies of the files to act on.
left_db_path = self.fileutils.create_file_copy(left_db_path)
right_db_path = self.fileutils.create_file_copy(right_db_path)
newleft = os.path.join(self.tempdir, left_db_path) + ".tmp"
shutil.copyfile(left_db_path, newleft)
left_db_path = newleft
newright = os.path.join(self.tempdir, left_db_path) + ".tmp"
shutil.copyfile(right_db_path, newright)
right_db_path = newright
if not compare_timestamps:
# Set all timestamps that are not NULL to 0.
@ -91,7 +99,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase):
'media')
# Create a test file.
temp_file_path = self.fileutils.create_named_file(u"foo.jpg", "hello")
temp_file_path = helpers.file_utils.create_named_file(u"foo.jpg", "hello")
# Add the test file to the server's collection.
self.serverutils.add_files_to_mediasyncer(server,
@ -123,7 +131,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase):
'media')
# Create a test file.
temp_file_path = self.fileutils.create_named_file(u"foo.jpg", "hello")
temp_file_path = helpers.file_utils.create_named_file(u"foo.jpg", "hello")
# Add the test file to the client's media collection.
self.serverutils.add_files_to_mediasyncer(client,
@ -162,10 +170,8 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase):
'media')
# Create two files and add one to the server and one to the client.
file_for_client, file_for_server = self.fileutils.create_named_files([
(u"foo.jpg", "hello"),
(u"bar.jpg", "goodbye")
])
file_for_client = helpers.file_utils.create_named_file(u"foo.jpg", "hello")
file_for_server = helpers.file_utils.create_named_file(u"bar.jpg", "goodbye")
self.serverutils.add_files_to_mediasyncer(client,
[file_for_client],
@ -212,10 +218,8 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase):
# Create two files with identical names but different contents and
# checksums. Add one to the server and one to the client.
file_for_client, file_for_server = self.fileutils.create_named_files([
(u"foo.jpg", "hello"),
(u"foo.jpg", "goodbye")
])
file_for_client = helpers.file_utils.create_named_file(u"foo.jpg", "hello")
file_for_server = helpers.file_utils.create_named_file(u"foo.jpg", "goodbye")
self.serverutils.add_files_to_mediasyncer(client,
[file_for_client],
@ -259,7 +263,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase):
'media')
# Create a test file.
temp_file_path = self.fileutils.create_named_file(u"foo.jpg", "hello")
temp_file_path = helpers.file_utils.create_named_file(u"foo.jpg", "hello")
# Add the test file to client's media collection.
self.serverutils.add_files_to_mediasyncer(client,
@ -307,7 +311,7 @@ class SyncAppFunctionalMediaTest(SyncAppFunctionalTestBase):
# Add a test image file to the client's media collection but don't
# update its media db since the desktop client updates that, using
# findChanges(), only during syncs.
support_file = self.fileutils.get_asset_path(u'blue.jpg')
support_file = helpers.file_utils.get_asset_path(u'blue.jpg')
self.assertTrue(os.path.isfile(support_file))
self.serverutils.add_files_to_mediasyncer(client,
[support_file],