Use more with statements
This commit is contained in:
parent
1677ed38b9
commit
0e5bbf4f9e
@ -179,7 +179,8 @@ class SyncMediaHandler(anki.sync.MediaSyncer):
|
|||||||
file_path = os.path.join(self.col.media.dir(), filename)
|
file_path = os.path.join(self.col.media.dir(), filename)
|
||||||
|
|
||||||
# Save file to media directory.
|
# Save file to media directory.
|
||||||
open(file_path, 'wb').write(file_data)
|
with open(file_path, 'wb') as f:
|
||||||
|
f.write(file_data)
|
||||||
mtime = self.col.media._mtime(file_path)
|
mtime = self.col.media._mtime(file_path)
|
||||||
|
|
||||||
media_to_add.append((filename, csum, mtime, 0))
|
media_to_add.append((filename, csum, mtime, 0))
|
||||||
@ -242,18 +243,17 @@ class SyncMediaHandler(anki.sync.MediaSyncer):
|
|||||||
cnt = 0
|
cnt = 0
|
||||||
sz = 0
|
sz = 0
|
||||||
f = BytesIO()
|
f = BytesIO()
|
||||||
z = zipfile.ZipFile(f, "w", compression=zipfile.ZIP_DEFLATED)
|
|
||||||
|
|
||||||
for fname in files:
|
with zipfile.ZipFile(f, "w", compression=zipfile.ZIP_DEFLATED) as z:
|
||||||
z.write(os.path.join(self.col.media.dir(), fname), str(cnt))
|
for fname in files:
|
||||||
flist[str(cnt)] = fname
|
z.write(os.path.join(self.col.media.dir(), fname), str(cnt))
|
||||||
sz += os.path.getsize(os.path.join(self.col.media.dir(), fname))
|
flist[str(cnt)] = fname
|
||||||
if sz > SYNC_ZIP_SIZE or cnt > SYNC_ZIP_COUNT:
|
sz += os.path.getsize(os.path.join(self.col.media.dir(), fname))
|
||||||
break
|
if sz > SYNC_ZIP_SIZE or cnt > SYNC_ZIP_COUNT:
|
||||||
cnt += 1
|
break
|
||||||
|
cnt += 1
|
||||||
|
|
||||||
z.writestr("_meta", json.dumps(flist))
|
z.writestr("_meta", json.dumps(flist))
|
||||||
z.close()
|
|
||||||
|
|
||||||
return f.getvalue()
|
return f.getvalue()
|
||||||
|
|
||||||
@ -389,9 +389,8 @@ class SyncApp:
|
|||||||
import gzip
|
import gzip
|
||||||
|
|
||||||
if compression:
|
if compression:
|
||||||
buf = gzip.GzipFile(mode="rb", fileobj=BytesIO(data))
|
with gzip.GzipFile(mode="rb", fileobj=BytesIO(data)) as gz:
|
||||||
data = buf.read()
|
data = gz.read()
|
||||||
buf.close()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = json.loads(data.decode())
|
data = json.loads(data.decode())
|
||||||
@ -423,11 +422,10 @@ class SyncApp:
|
|||||||
f.write(data)
|
f.write(data)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
test_db = anki.db.DB(temp_db_path)
|
with anki.db.DB(temp_db_path) as test_db:
|
||||||
if test_db.scalar("pragma integrity_check") != "ok":
|
if test_db.scalar("pragma integrity_check") != "ok":
|
||||||
raise HTTPBadRequest("Integrity check failed for uploaded "
|
raise HTTPBadRequest("Integrity check failed for uploaded "
|
||||||
"collection database file.")
|
"collection database file.")
|
||||||
test_db.close()
|
|
||||||
except sqlite.Error as e:
|
except sqlite.Error as e:
|
||||||
raise HTTPBadRequest("Uploaded collection database file is "
|
raise HTTPBadRequest("Uploaded collection database file is "
|
||||||
"corrupt.")
|
"corrupt.")
|
||||||
|
|||||||
@ -26,7 +26,8 @@ def create_named_file(filename, file_contents=None):
|
|||||||
file_path = os.path.join(temp_file_parent_dir, filename)
|
file_path = os.path.join(temp_file_parent_dir, filename)
|
||||||
|
|
||||||
if file_contents is not None:
|
if file_contents is not None:
|
||||||
open(file_path, 'w').write(file_contents)
|
with open(file_path, "w") as f:
|
||||||
|
f.write(file_contents)
|
||||||
|
|
||||||
return file_path
|
return file_path
|
||||||
|
|
||||||
@ -41,30 +42,23 @@ def create_zip_with_existing_files(file_paths):
|
|||||||
:return: the data of the created zip file
|
:return: the data of the created zip file
|
||||||
"""
|
"""
|
||||||
|
|
||||||
file_buffer = BytesIO()
|
buf = BytesIO()
|
||||||
zip_file = zipfile.ZipFile(file_buffer,
|
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_DEFLATED) as z:
|
||||||
'w',
|
meta = []
|
||||||
compression=zipfile.ZIP_DEFLATED)
|
size = 0
|
||||||
|
|
||||||
meta = []
|
for index, path in enumerate(file_paths):
|
||||||
sz = 0
|
z.write(path, str(index))
|
||||||
|
normname = unicodedata.normalize("NFC", os.path.basename(path))
|
||||||
|
meta.append((normname, str(index)))
|
||||||
|
|
||||||
for count, filePath in enumerate(file_paths):
|
size += os.path.getsize(path)
|
||||||
zip_file.write(filePath, str(count))
|
if size >= SYNC_ZIP_SIZE:
|
||||||
normname = unicodedata.normalize(
|
break
|
||||||
"NFC",
|
|
||||||
os.path.basename(filePath)
|
|
||||||
)
|
|
||||||
meta.append((normname, str(count)))
|
|
||||||
|
|
||||||
sz += os.path.getsize(filePath)
|
z.writestr("_meta", json.dumps(meta))
|
||||||
if sz >= SYNC_ZIP_SIZE:
|
|
||||||
break
|
|
||||||
|
|
||||||
zip_file.writestr("_meta", json.dumps(meta))
|
return buf.getvalue()
|
||||||
zip_file.close()
|
|
||||||
|
|
||||||
return file_buffer.getvalue()
|
|
||||||
|
|
||||||
|
|
||||||
def get_asset_path(relative_file_path):
|
def get_asset_path(relative_file_path):
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user