diff --git a/AnkiServer/collection.py b/AnkiServer/collection.py index 35f0d20..11105b0 100644 --- a/AnkiServer/collection.py +++ b/AnkiServer/collection.py @@ -48,10 +48,10 @@ class CollectionWrapper(object): def open(self): if self._col is None: if os.path.exists(self.path): - self._col = anki.DeckStorage.Deck(self.path) + self._col = anki.storage.Collection(self.path) else: - self._deck = self._create_deck() - return self._deck + self._col = self._create_collection() + return self._col def close(self): if self._col is None: @@ -106,19 +106,19 @@ class CollectionThread(object): return ret def _run(self): - logging.info('DeckThread[%s]: Starting...', self.path) + logging.info('CollectionThread[%s]: Starting...', self.path) try: while self._running: func, args, kw, return_queue = self._queue.get(True) - logging.info('DeckThread[%s]: Running %s(*%s, **%s)', self.path, func.func_name, repr(args), repr(kw)) + logging.info('CollectionThread[%s]: Running %s(*%s, **%s)', self.path, func.func_name, repr(args), repr(kw)) self.last_timestamp = time.time() try: ret = func(*args, **kw) except Exception, e: - logging.error('DeckThread[%s]: Unable to %s(*%s, **%s): %s', + logging.error('CollectionThread[%s]: Unable to %s(*%s, **%s): %s', self.path, func.func_name, repr(args), repr(kw), e, exc_info=True) # we return the Exception which will be raise'd on the other end ret = e @@ -126,7 +126,7 @@ class CollectionThread(object): if return_queue is not None: return_queue.put(ret) except Exception, e: - logging.error('DeckThread[%s]: Thread crashed! Exception: %s', e, exc_info=True) + logging.error('CollectionThread[%s]: Thread crashed! Exception: %s', e, exc_info=True) finally: self.wrapper.close() # clean out old thread object @@ -134,7 +134,7 @@ class CollectionThread(object): # in case we got here via an exception self._running = False - logging.info('DeckThread[%s]: Stopped!' % self.path) + logging.info('CollectionThread[%s]: Stopped!' % self.path) def start(self): if not self._running: diff --git a/AnkiServer/sync.py b/AnkiServer/sync.py index 9ffa0a6..0c5b7b3 100644 --- a/AnkiServer/sync.py +++ b/AnkiServer/sync.py @@ -7,25 +7,17 @@ from webob import Response import sys sys.path.insert(0, "/usr/share/anki") -#import anki -#from anki.sync import HttpSyncServer, CHUNK_SIZE -#from anki.db import sqlite -#from anki.utils import checksum import anki from anki.sync import LocalServer, MediaSyncer # TODO: shouldn't use this directly! This should be through the thread pool from anki.storage import Collection -#import AnkiServer.deck - -#import MySQLdb - try: import simplejson as json except ImportError: import json -import os, zlib, tempfile, time +import os, tempfile class SyncCollectionHandler(LocalServer): operations = ['meta', 'applyChanges', 'start', 'chunk', 'applyChunk', 'sanityCheck2', 'finish'] @@ -33,6 +25,22 @@ class SyncCollectionHandler(LocalServer): def __init__(self, col): LocalServer.__init__(self, col) + + def applyChanges(self, changes): + #self.lmod, lscm, self.maxUsn, lts, dummy = self.meta() + # TODO: how should we set this value? + #self.lnewer = 1 + + result = LocalServer.applyChanges(self, changes) + + #self.prepareToChunk() + + return result + + #def chunk(self, ): + # self.prepareToChunk() + # return LocalServer.chunk() + class SyncMediaHandler(MediaSyncer): operations = ['remove', 'files', 'addFiles', 'mediaSanity'] @@ -52,28 +60,45 @@ class SyncMediaHandler(MediaSyncer): return fd.getvalue() -class SyncUser(object): +class SyncUserSession(object): def __init__(self, name, path): - # make sure the user path exists - if not os.path.exists(path): - os.mkdir(path) - import time self.name = name self.path = path self.version = 0 self.created = time.time() + # make sure the user path exists + if not os.path.exists(path): + os.mkdir(path) + + self.collection_handler = None + self.media_handler = None + def get_collection_path(self): return os.path.realpath(os.path.join(self.path, 'collection.anki2')) + + def get_thread(self): + from AnkiServer.collection import thread_pool + return thread_pool.start(self.get_collection_path()) + + def get_handler_for_operation(self, operation, col): + if operation in SyncCollectionHandler.operations: + cache_name, handler_class = 'collection_handler', SyncCollectionHandler + else: + cache_name, handler_class = 'media_handler', SyncMediaHandler + + if getattr(self, cache_name) is None: + setattr(self, cache_name, handler_class(col)) + return getattr(self, cache_name) class SyncApp(object): - valid_urls = SyncCollectionHandler.operations + SyncMediaHandler.operations + ['hostKey', 'upload'] + valid_urls = SyncCollectionHandler.operations + SyncMediaHandler.operations + ['hostKey', 'upload', 'download', 'getDecks'] def __init__(self, **kw): self.data_root = os.path.abspath(kw.get('data_root', '.')) self.base_url = kw.get('base_url', '/') - self.users = {} + self.sessions = {} # make sure the base_url has a trailing slash if len(self.base_url) == 0: @@ -82,20 +107,48 @@ class SyncApp(object): self.base_url = base_url + '/' def authenticate(self, username, password): - """Override this to change how users are authenticated.""" + """ + Returns True if this username is allowed to connect with this password. False otherwise. + + Override this to change how users are authenticated. + """ + # TODO: This should have the exact opposite default ;-) return True def username2dirname(self, username): - """Override this to adjust the mapping between users and their directory.""" + """ + Returns the directory name for the given user. By default, this is just the username. + + Override this to adjust the mapping between users and their directory. + """ + return username def generateHostKey(self, username): + """Generates a new host key to be used by the given username to identify their session. + This values is random.""" + import hashlib, time, random, string chars = string.ascii_letters + string.digits val = ':'.join([username, str(int(time.time())), ''.join(random.choice(chars) for x in range(8))]) return hashlib.md5(val).hexdigest() + def create_session(self, hkey, username, user_path): + """Creates, stores and returns a new session for the given hkey and username.""" + + session = self.sessions[hkey] = SyncUserSession(username, user_path) + return session + + def load_session(self, hkey): + return self.sessions.get(hkey) + + def save_session(self, hkey, session): + pass + + def delete_session(self, hkey): + del self.sessions[hkey] + def _decode_data(self, data, compression=0): import gzip, StringIO @@ -112,6 +165,16 @@ class SyncApp(object): return data + def operation_upload(self, wrapper, data, session): + # TODO: deal with thread pool + + fd = open(session.get_collection_path(), 'wb') + fd.write(data) + fd.close() + + def operation_download(self, wrapper, data, session): + pass + @wsgify def __call__(self, req): print req.path @@ -120,6 +183,15 @@ class SyncApp(object): if url not in self.valid_urls: raise HTTPNotFound() + if url == 'getDecks': + # This is an Anki 1.x client! Tell them to upgrade. + import zlib + return Response( + status='200 OK', + content_type='application/json', + content_encoding='deflate', + body=zlib.compress(json.dumps({'status': 'oldVersion'}))) + try: compression = req.POST['c'] except KeyError: @@ -146,10 +218,9 @@ class SyncApp(object): if dirname is None: raise HTTPForbidden() - # setup user and map to a hkey hkey = self.generateHostKey(u) user_path = os.path.join(self.data_root, dirname) - self.users[hkey] = SyncUser(u, user_path) + session = self.create_session(hkey, u, user_path) result = {'key': hkey} return Response( @@ -160,60 +231,62 @@ class SyncApp(object): # TODO: do I have to pass 'null' for the client to receive None? raise HTTPForbidden('null') - # verify the hostkey + # Get and verify the session try: hkey = req.POST['k'] - user = self.users[hkey] except KeyError: raise HTTPForbidden() + session = self.load_session(hkey) + if session is None: + raise HTTPForbidden() if url in SyncCollectionHandler.operations + SyncMediaHandler.operations: - # TODO: use thread pool! - col = Collection(user.get_collection_path()) - - if url in SyncCollectionHandler.operations: - handler = SyncCollectionHandler(col) - else: - handler = SyncMediaHandler(col) - - func = getattr(handler, url) - # 'meta' passes the SYNC_VER but it isn't used in the handler if url == 'meta' and data.has_key('v'): - user.version = data['v'] + session.version = data['v'] del data['v'] - try: + # Create a closure to run this operation inside of the thread allocated to this collection + def runFunc(wrapper): + handler = session.get_handler_for_operation(url, wrapper.open()) + func = getattr(handler, url) result = func(**data) - #except Exception, e: - # print e - # raise HTTPInternalServerError() - finally: - col.close() + handler.col.save() + return result + runFunc.func_name = url + + # Send to the thread to execute + thread = session.get_thread() + result = thread.execute(runFunc, [thread.wrapper]) # If it's a complex data type, we convert it to JSON if type(result) not in (str, unicode): result = json.dumps(result) + + if url == 'finish': + self.delete_session(hkey) return Response( status='200 OK', content_type='application/json', body=result) - elif url == 'upload': - # TODO: deal with thread pool + elif url in ('upload', 'download'): + if url == 'upload': + func = self.operation_upload + else: + func = self.operation_download - fd = open(user.get_collection_path(), 'wb') - fd.write(data['data']) - fd.close() + thread = session.get_thread() + thread.execute(self.operation_upload, [thread.wrapper, data['data'], session]) return Response( status='200 OK', content_type='text/plain', body='OK') - # TODO: turn this into a 500 error in the future! - return Response(status='503 Temporarily Unavailable ', content_type='text/plain', body='This operation isn\'t implemented yet.') + # This was one of our operations but it didn't get handled... Oops! + raise HTTPInternalServerError() return Response(status='200 OK', content_type='text/plain', body='Anki Sync Server') @@ -222,8 +295,8 @@ def make_app(global_conf, **local_conf): return SyncApp(**local_conf) def main(): - from wsgiref.simple_server import make_server + from AnkiServer.collection import thread_pool ankiserver = SyncApp() httpd = make_server('', 8001, ankiserver) @@ -233,8 +306,7 @@ def main(): except KeyboardInterrupt: print "Exiting ..." finally: - #AnkiServer.deck.thread_pool.shutdown() - pass + thread_pool.shutdown() if __name__ == '__main__': main()