Actually integrated the thread_pool into the SyncApp.

This commit is contained in:
David Snopek 2013-04-04 00:42:27 +01:00
parent 775036e3db
commit 661662400f
2 changed files with 129 additions and 57 deletions

View File

@ -48,10 +48,10 @@ class CollectionWrapper(object):
def open(self):
if self._col is None:
if os.path.exists(self.path):
self._col = anki.DeckStorage.Deck(self.path)
self._col = anki.storage.Collection(self.path)
else:
self._deck = self._create_deck()
return self._deck
self._col = self._create_collection()
return self._col
def close(self):
if self._col is None:
@ -106,19 +106,19 @@ class CollectionThread(object):
return ret
def _run(self):
logging.info('DeckThread[%s]: Starting...', self.path)
logging.info('CollectionThread[%s]: Starting...', self.path)
try:
while self._running:
func, args, kw, return_queue = self._queue.get(True)
logging.info('DeckThread[%s]: Running %s(*%s, **%s)', self.path, func.func_name, repr(args), repr(kw))
logging.info('CollectionThread[%s]: Running %s(*%s, **%s)', self.path, func.func_name, repr(args), repr(kw))
self.last_timestamp = time.time()
try:
ret = func(*args, **kw)
except Exception, e:
logging.error('DeckThread[%s]: Unable to %s(*%s, **%s): %s',
logging.error('CollectionThread[%s]: Unable to %s(*%s, **%s): %s',
self.path, func.func_name, repr(args), repr(kw), e, exc_info=True)
# we return the Exception which will be raise'd on the other end
ret = e
@ -126,7 +126,7 @@ class CollectionThread(object):
if return_queue is not None:
return_queue.put(ret)
except Exception, e:
logging.error('DeckThread[%s]: Thread crashed! Exception: %s', e, exc_info=True)
logging.error('CollectionThread[%s]: Thread crashed! Exception: %s', e, exc_info=True)
finally:
self.wrapper.close()
# clean out old thread object
@ -134,7 +134,7 @@ class CollectionThread(object):
# in case we got here via an exception
self._running = False
logging.info('DeckThread[%s]: Stopped!' % self.path)
logging.info('CollectionThread[%s]: Stopped!' % self.path)
def start(self):
if not self._running:

View File

@ -7,25 +7,17 @@ from webob import Response
import sys
sys.path.insert(0, "/usr/share/anki")
#import anki
#from anki.sync import HttpSyncServer, CHUNK_SIZE
#from anki.db import sqlite
#from anki.utils import checksum
import anki
from anki.sync import LocalServer, MediaSyncer
# TODO: shouldn't use this directly! This should be through the thread pool
from anki.storage import Collection
#import AnkiServer.deck
#import MySQLdb
try:
import simplejson as json
except ImportError:
import json
import os, zlib, tempfile, time
import os, tempfile
class SyncCollectionHandler(LocalServer):
operations = ['meta', 'applyChanges', 'start', 'chunk', 'applyChunk', 'sanityCheck2', 'finish']
@ -33,6 +25,22 @@ class SyncCollectionHandler(LocalServer):
def __init__(self, col):
LocalServer.__init__(self, col)
def applyChanges(self, changes):
#self.lmod, lscm, self.maxUsn, lts, dummy = self.meta()
# TODO: how should we set this value?
#self.lnewer = 1
result = LocalServer.applyChanges(self, changes)
#self.prepareToChunk()
return result
#def chunk(self, ):
# self.prepareToChunk()
# return LocalServer.chunk()
class SyncMediaHandler(MediaSyncer):
operations = ['remove', 'files', 'addFiles', 'mediaSanity']
@ -52,28 +60,45 @@ class SyncMediaHandler(MediaSyncer):
return fd.getvalue()
class SyncUser(object):
class SyncUserSession(object):
def __init__(self, name, path):
# make sure the user path exists
if not os.path.exists(path):
os.mkdir(path)
import time
self.name = name
self.path = path
self.version = 0
self.created = time.time()
# make sure the user path exists
if not os.path.exists(path):
os.mkdir(path)
self.collection_handler = None
self.media_handler = None
def get_collection_path(self):
return os.path.realpath(os.path.join(self.path, 'collection.anki2'))
def get_thread(self):
from AnkiServer.collection import thread_pool
return thread_pool.start(self.get_collection_path())
def get_handler_for_operation(self, operation, col):
if operation in SyncCollectionHandler.operations:
cache_name, handler_class = 'collection_handler', SyncCollectionHandler
else:
cache_name, handler_class = 'media_handler', SyncMediaHandler
if getattr(self, cache_name) is None:
setattr(self, cache_name, handler_class(col))
return getattr(self, cache_name)
class SyncApp(object):
valid_urls = SyncCollectionHandler.operations + SyncMediaHandler.operations + ['hostKey', 'upload']
valid_urls = SyncCollectionHandler.operations + SyncMediaHandler.operations + ['hostKey', 'upload', 'download', 'getDecks']
def __init__(self, **kw):
self.data_root = os.path.abspath(kw.get('data_root', '.'))
self.base_url = kw.get('base_url', '/')
self.users = {}
self.sessions = {}
# make sure the base_url has a trailing slash
if len(self.base_url) == 0:
@ -82,20 +107,48 @@ class SyncApp(object):
self.base_url = base_url + '/'
def authenticate(self, username, password):
"""Override this to change how users are authenticated."""
"""
Returns True if this username is allowed to connect with this password. False otherwise.
Override this to change how users are authenticated.
"""
# TODO: This should have the exact opposite default ;-)
return True
def username2dirname(self, username):
"""Override this to adjust the mapping between users and their directory."""
"""
Returns the directory name for the given user. By default, this is just the username.
Override this to adjust the mapping between users and their directory.
"""
return username
def generateHostKey(self, username):
"""Generates a new host key to be used by the given username to identify their session.
This values is random."""
import hashlib, time, random, string
chars = string.ascii_letters + string.digits
val = ':'.join([username, str(int(time.time())), ''.join(random.choice(chars) for x in range(8))])
return hashlib.md5(val).hexdigest()
def create_session(self, hkey, username, user_path):
"""Creates, stores and returns a new session for the given hkey and username."""
session = self.sessions[hkey] = SyncUserSession(username, user_path)
return session
def load_session(self, hkey):
return self.sessions.get(hkey)
def save_session(self, hkey, session):
pass
def delete_session(self, hkey):
del self.sessions[hkey]
def _decode_data(self, data, compression=0):
import gzip, StringIO
@ -112,6 +165,16 @@ class SyncApp(object):
return data
def operation_upload(self, wrapper, data, session):
# TODO: deal with thread pool
fd = open(session.get_collection_path(), 'wb')
fd.write(data)
fd.close()
def operation_download(self, wrapper, data, session):
pass
@wsgify
def __call__(self, req):
print req.path
@ -120,6 +183,15 @@ class SyncApp(object):
if url not in self.valid_urls:
raise HTTPNotFound()
if url == 'getDecks':
# This is an Anki 1.x client! Tell them to upgrade.
import zlib
return Response(
status='200 OK',
content_type='application/json',
content_encoding='deflate',
body=zlib.compress(json.dumps({'status': 'oldVersion'})))
try:
compression = req.POST['c']
except KeyError:
@ -146,10 +218,9 @@ class SyncApp(object):
if dirname is None:
raise HTTPForbidden()
# setup user and map to a hkey
hkey = self.generateHostKey(u)
user_path = os.path.join(self.data_root, dirname)
self.users[hkey] = SyncUser(u, user_path)
session = self.create_session(hkey, u, user_path)
result = {'key': hkey}
return Response(
@ -160,60 +231,62 @@ class SyncApp(object):
# TODO: do I have to pass 'null' for the client to receive None?
raise HTTPForbidden('null')
# verify the hostkey
# Get and verify the session
try:
hkey = req.POST['k']
user = self.users[hkey]
except KeyError:
raise HTTPForbidden()
session = self.load_session(hkey)
if session is None:
raise HTTPForbidden()
if url in SyncCollectionHandler.operations + SyncMediaHandler.operations:
# TODO: use thread pool!
col = Collection(user.get_collection_path())
if url in SyncCollectionHandler.operations:
handler = SyncCollectionHandler(col)
else:
handler = SyncMediaHandler(col)
func = getattr(handler, url)
# 'meta' passes the SYNC_VER but it isn't used in the handler
if url == 'meta' and data.has_key('v'):
user.version = data['v']
session.version = data['v']
del data['v']
try:
# Create a closure to run this operation inside of the thread allocated to this collection
def runFunc(wrapper):
handler = session.get_handler_for_operation(url, wrapper.open())
func = getattr(handler, url)
result = func(**data)
#except Exception, e:
# print e
# raise HTTPInternalServerError()
finally:
col.close()
handler.col.save()
return result
runFunc.func_name = url
# Send to the thread to execute
thread = session.get_thread()
result = thread.execute(runFunc, [thread.wrapper])
# If it's a complex data type, we convert it to JSON
if type(result) not in (str, unicode):
result = json.dumps(result)
if url == 'finish':
self.delete_session(hkey)
return Response(
status='200 OK',
content_type='application/json',
body=result)
elif url == 'upload':
# TODO: deal with thread pool
elif url in ('upload', 'download'):
if url == 'upload':
func = self.operation_upload
else:
func = self.operation_download
fd = open(user.get_collection_path(), 'wb')
fd.write(data['data'])
fd.close()
thread = session.get_thread()
thread.execute(self.operation_upload, [thread.wrapper, data['data'], session])
return Response(
status='200 OK',
content_type='text/plain',
body='OK')
# TODO: turn this into a 500 error in the future!
return Response(status='503 Temporarily Unavailable ', content_type='text/plain', body='This operation isn\'t implemented yet.')
# This was one of our operations but it didn't get handled... Oops!
raise HTTPInternalServerError()
return Response(status='200 OK', content_type='text/plain', body='Anki Sync Server')
@ -222,8 +295,8 @@ def make_app(global_conf, **local_conf):
return SyncApp(**local_conf)
def main():
from wsgiref.simple_server import make_server
from AnkiServer.collection import thread_pool
ankiserver = SyncApp()
httpd = make_server('', 8001, ankiserver)
@ -233,8 +306,7 @@ def main():
except KeyboardInterrupt:
print "Exiting ..."
finally:
#AnkiServer.deck.thread_pool.shutdown()
pass
thread_pool.shutdown()
if __name__ == '__main__': main()