From 775036e3db5f1d279b699ce9d635ed10943ee08e Mon Sep 17 00:00:00 2001 From: David Snopek Date: Wed, 3 Apr 2013 23:42:01 +0100 Subject: [PATCH] Reduced the collection.py module to purely managing threads. --- AnkiServer/collection.py | 208 +++++++++++++++ AnkiServer/deck.py | 549 --------------------------------------- setup.py | 5 +- 3 files changed, 210 insertions(+), 552 deletions(-) create mode 100644 AnkiServer/collection.py delete mode 100644 AnkiServer/deck.py diff --git a/AnkiServer/collection.py b/AnkiServer/collection.py new file mode 100644 index 0000000..35f0d20 --- /dev/null +++ b/AnkiServer/collection.py @@ -0,0 +1,208 @@ + +import anki +import anki.storage + +from threading import Thread +from Queue import Queue + +try: + import simplejson as json +except ImportError: + import json + +import os, errno, time, logging + +__all__ = ['CollectionThread'] + +# TODO: I feel like we shouldn't need this wrapper... +class CollectionWrapper(object): + """A simple wrapper around a collection for the purpose of opening and closing on demand + as well as doing special initialization.""" + + def __init__(self, path): + self.path = os.path.realpath(path) + self._col = None + + def _create_colection(self): + # mkdir -p the path, because it might not exist + dirname = os.path.dirname(self.path) + try: + os.makedirs(dirname) + except OSError, exc: + if exc.errno == errno.EEXIST: + pass + else: + raise + + col = anki.Storage.Collection(self.path) + + # Do any special setup + self.setup_new_collection(col) + + return col + + def setup_new_collection(self, col): + """Override this function to initial collections in some special way.""" + pass + + def open(self): + if self._col is None: + if os.path.exists(self.path): + self._col = anki.DeckStorage.Deck(self.path) + else: + self._deck = self._create_deck() + return self._deck + + def close(self): + if self._col is None: + return + + self._col.close() + self._col = None + + def opened(self): + return self._col is not None + +class CollectionThread(object): + def __init__(self, path, wrapper_class=CollectionWrapper): + self.path = os.path.realpath(path) + self.wrapper = wrapper_class(path) + + self._queue = Queue() + self._thread = None + self._running = False + self.last_timestamp = time.time() + + @property + def running(self): + return self._running + + def qempty(self): + return self._queue.empty() + + def current(self): + from threading import current_thread + return current_thread() == self._thread + + def execute(self, func, args=[], kw={}, waitForReturn=True): + """ Executes a given function on this thread with the *args and **kw. + + If 'waitForReturn' is True, then it will block until the function has + executed and return its return value. If False, it will return None + immediately and the function will be executed sometime later. + """ + + if waitForReturn: + return_queue = Queue() + else: + return_queue = None + + self._queue.put((func, args, kw, return_queue)) + + if return_queue is not None: + ret = return_queue.get(True) + if isinstance(ret, Exception): + raise ret + return ret + + def _run(self): + logging.info('DeckThread[%s]: Starting...', self.path) + + try: + while self._running: + func, args, kw, return_queue = self._queue.get(True) + + logging.info('DeckThread[%s]: Running %s(*%s, **%s)', self.path, func.func_name, repr(args), repr(kw)) + self.last_timestamp = time.time() + + try: + ret = func(*args, **kw) + except Exception, e: + logging.error('DeckThread[%s]: Unable to %s(*%s, **%s): %s', + self.path, func.func_name, repr(args), repr(kw), e, exc_info=True) + # we return the Exception which will be raise'd on the other end + ret = e + + if return_queue is not None: + return_queue.put(ret) + except Exception, e: + logging.error('DeckThread[%s]: Thread crashed! Exception: %s', e, exc_info=True) + finally: + self.wrapper.close() + # clean out old thread object + self._thread = None + # in case we got here via an exception + self._running = False + + logging.info('DeckThread[%s]: Stopped!' % self.path) + + def start(self): + if not self._running: + self._running = True + assert self._thread is None + self._thread = Thread(target=self._run) + self._thread.start() + + def stop(self): + def _stop(): + self._running = False + self.execute(_stop, waitForReturn=False) + + def stop_and_wait(self): + """ Tell the thread to stop and wait for it to happen. """ + self.stop() + if self._thread is not None: + self._thread.join() + +class CollectionThreadPool(object): + def __init__(self, wrapper_class=CollectionWrapper): + self.wrapper_class = wrapper_class + self.threads = {} + + self.monitor_frequency = 15 + self.monitor_inactivity = 90 + + monitor = Thread(target=self._monitor_run) + monitor.daemon = True + monitor.start() + self._monitor_thread = monitor + + # TODO: it would be awesome to have a safe way to stop inactive threads completely! + # TODO: we need a way to inform other code that the collection has been closed + def _monitor_run(self): + """ Monitors threads for inactivity and closes the collection on them + (leaves the thread itself running -- hopefully waiting peacefully with only a + small memory footprint!) """ + while True: + cur = time.time() + for path, thread in self.threads.items(): + if thread.running and thread.wrapper.opened() and thread.qempty() and cur - thread.last_timestamp >= self.monitor_inactivity: + logging.info('Monitor is closing collection on inactive CollectionThread[%s]' % thread.path) + def closeCollection(wrapper): + wrapper.close() + thread.execute(closeCollection, [thread.wrapper], waitForReturn=False) + time.sleep(self.monitor_frequency) + + def create_thread(self, path): + return CollectionThread(path, wrapper_class=self.wrapper_class) + + def start(self, path): + path = os.path.realpath(path) + + try: + thread = self.threads[path] + except KeyError: + thread = self.threads[path] = self.create_thread(path) + + thread.start() + + return thread + + def shutdown(self): + for thread in self.threads.values(): + thread.stop() + self.threads = {} + +# TODO: There's got to be a way to do this without initializing it ALWAYS! +thread_pool = CollectionThreadPool() + diff --git a/AnkiServer/deck.py b/AnkiServer/deck.py deleted file mode 100644 index 749c36a..0000000 --- a/AnkiServer/deck.py +++ /dev/null @@ -1,549 +0,0 @@ - -from webob.dec import wsgify -from webob.exc import * -from webob import Response - -import anki -from anki.facts import Fact -from anki.models import Model, CardModel, FieldModel - -from threading import Thread -from Queue import Queue - -try: - import simplejson as json -except ImportError: - import json - -import os, errno, time, logging - -__all__ = ['DeckThread'] - -def ExternalModel(): - m = Model(u'External') - # we can only guarantee that the Front will be unique because it will - # be based on the headword, language, pos. The Back could be anything! - m.addFieldModel(FieldModel(u'Front', True, True)) - # while I think that Back should be required, I don't really want this to - # fail just because of that!! - m.addFieldModel(FieldModel(u'Back', False, False)) - m.addFieldModel(FieldModel(u'External ID', True, True)) - - front = u'{{{Front}}}' - back = u'{{{Back}}}' - m.addCardModel(CardModel(u'Forward', front, back)) - m.addCardModel(CardModel(u'Reverse', back, front)) - - m.tags = u"External" - return m - -class DeckWrapper(object): - def __init__(self, path): - self.path = os.path.abspath(path) - self._deck = None - - def _create_deck(self): - # mkdir -p the path, because it might not exist - dir = os.path.dirname(self.path) - try: - os.makedirs(dir) - except OSError, exc: - if exc.errno == errno.EEXIST: - pass - else: - raise - - deck = anki.DeckStorage.Deck(self.path) - try: - deck.initUndo() - deck.addModel(ExternalModel()) - deck.save() - except Exception, e: - deck.close() - deck = None - raise e - - return deck - - def open(self): - if self._deck is None: - if os.path.exists(self.path): - self._deck = anki.DeckStorage.Deck(self.path) - else: - self._deck = self._create_deck() - return self._deck - - def close(self): - if self._deck is None: - return - - self._deck.close() - self._deck = None - - # delete the cache for 'External ID' on this deck - if hasattr(self, '_external_field_id'): - delattr(self, '_external_field_id') - - def opened(self): - return self._deck is not None - - @property - def external_field_id(self): - if not hasattr(self, '_external_field_id'): - # find a field model id for a field named "External ID" - deck = self.open() - self._external_field_id = deck.s.scalar("SELECT id FROM fieldModels WHERE name = :name", name=u'External ID') - if self._external_field_id is None: - raise HTTPBadRequest("No field model named 'External ID'") - return self._external_field_id - - def find_fact(self, external_id): - deck = self.open() - return deck.s.scalar(""" - SELECT factId FROM fields WHERE fieldModelId = :fieldModelId AND - value = :externalId""", fieldModelId=self.external_field_id, externalId=external_id) - -class DeckThread(object): - def __init__(self, path): - self.path = os.path.abspath(path) - self.wrapper = DeckWrapper(path) - - self._queue = Queue() - self._thread = None - self._running = False - self.last_timestamp = time.time() - - @property - def running(self): - return self._running - - def qempty(self): - return self._queue.empty() - - def current(self): - from threading import current_thread - return current_thread() == self._thread - - def execute(self, func, args=[], kw={}, waitForReturn=True): - """ Executes a given function on this thread with the *args and **kw. - - If 'waitForReturn' is True, then it will block until the function has - executed and return its return value. If False, it will return None - immediately and the function will be executed sometime later. - """ - - if waitForReturn: - return_queue = Queue() - else: - return_queue = None - - self._queue.put((func, args, kw, return_queue)) - - if return_queue is not None: - ret = return_queue.get(True) - if isinstance(ret, Exception): - raise ret - return ret - - def _run(self): - logging.info('DeckThread[%s]: Starting...', self.path) - - try: - while self._running: - func, args, kw, return_queue = self._queue.get(True) - - logging.info('DeckThread[%s]: Running %s(*%s, **%s)', self.path, func.func_name, repr(args), repr(kw)) - self.last_timestamp = time.time() - - try: - ret = func(*args, **kw) - except Exception, e: - logging.error('DeckThread[%s]: Unable to %s(*%s, **%s): %s', - self.path, func.func_name, repr(args), repr(kw), e, exc_info=True) - # we return the Exception which will be raise'd on the other end - ret = e - - if return_queue is not None: - return_queue.put(ret) - except Exception, e: - logging.error('DeckThread[%s]: Thread crashed! Exception: %s', e, exc_info=True) - finally: - self.wrapper.close() - # clean out old thread object - self._thread = None - # in case we got here via an exception - self._running = False - - logging.info('DeckThread[%s]: Stopped!' % self.path) - - def start(self): - if not self._running: - self._running = True - assert self._thread is None - self._thread = Thread(target=self._run) - self._thread.start() - - def stop(self): - def _stop(): - self._running = False - self.execute(_stop, waitForReturn=False) - - def stop_and_wait(self): - """ Tell the thread to stop and wait for it to happen. """ - self.stop() - if self._thread is not None: - self._thread.join() - -class DeckThreadPool(object): - def __init__(self): - self.threads = {} - - self.monitor_frequency = 15 - self.monitor_inactivity = 90 - - monitor = Thread(target=self._monitor_run) - monitor.daemon = True - monitor.start() - self._monitor_thread = monitor - - # TODO: it would be awesome to have a safe way to stop inactive threads completely! - def _monitor_run(self): - """ Monitors threads for inactivity and closes the deck on them - (leaves the thread itself running -- hopefully waiting peacefully with only a - small memory footprint!) """ - while True: - cur = time.time() - for path, thread in self.threads.items(): - if thread.running and thread.wrapper.opened() and thread.qempty() and cur - thread.last_timestamp >= self.monitor_inactivity: - logging.info('Monitor is closing deck on inactive DeckThread[%s]' % thread.path) - def closeDeck(wrapper): - wrapper.close() - thread.execute(closeDeck, [thread.wrapper], waitForReturn=False) - time.sleep(self.monitor_frequency) - - def start(self, path): - path = os.path.abspath(path) - - try: - thread = self.threads[path] - except KeyError: - thread = self.threads[path] = DeckThread(path) - - thread.start() - - return thread - - def shutdown(self): - for thread in self.threads.values(): - thread.stop() - self.threads = {} - -thread_pool = DeckThreadPool() - -#def defer(*func, **opts): -# def decorator(func): -# def newFunc(*args, **kw): -# (self, thread) = args[0:2] -# if thread.current(): -# ret = func(*args, **kw) -# # don't return 'ret' if this isn't a wait function, to keep the API -# # consistent even when inside the thread itself (hopefully, help -# # avoid weird problems in the future) -# if opts.get('waitForReturn', True): -# return ret -# else: -# return thread.execute(func, args, kw, **opts) -# newFunc.func_name = func.func_name -# return newFunc -# -# if len(func) == 1: -# return decorator(func[0]) -# elif len(func) > 1: -# raise TypeError -# -# return decorator - -def opts(**opts): - def dec(func): - func.opts = opts - return func - return dec - -class DeckAppHandler(object): - def __init__(self, wrapper): - self.wrapper = wrapper - - def _output_fact(self, fact): - res = dict(zip(fact.keys(), fact.values())) - res['id'] = str(fact.id) - return res - - def _output_card(self, card): - return { - 'id': card.id, - 'question': card.question, - 'answer': card.answer, - } - - @opts(waitForReturn=False) - def setup(self): - # will create the deck if it doesn't exist - self.wrapper.open() - - @opts(waitForReturn=False) - def add_fact(self, fields): - fact_id = self.wrapper.find_fact(fields['External ID']) - if fact_id is not None: - fields['id'] = fact_id - self.save_fact(fields) - else: - deck = self.wrapper.open() - fact = deck.newFact() - for key in fact.keys(): - fact[key] = unicode(fields[key]) - - deck.addFact(fact) - deck.save() - - @opts(waitForReturn=False) - def save_fact(self, fact): - deck = self.wrapper.open() - newFact = deck.s.query(Fact).get(int(fact['id'])) - for key in newFact.keys(): - newFact[key] = fact[key] - - newFact.setModified(textChanged=True, deck=deck) - deck.setModified() - deck.save() - - def find_fact(self, external_id): - factId = self.wrapper.find_fact(external_id) - if not factId: - # we need to signal somehow to the calling application that no such - # deck exists, but without it being considered a "bad error". 404 is - # inappropriate that refers to the resource (ie. /find_fact) which is - # here obviously. - return None - - deck = self.wrapper.open() - fact = deck.s.query(Fact).get(factId) - return self._output_fact(fact) - - @opts(waitForReturn=False) - def delete_fact(self, fact_id=None, external_id=None): - if fact_id is None and external_id is not None: - fact_id = self.wrapper.find_fact(external_id) - if fact_id is not None: - deck = self.wrapper.open() - deck.deleteFact(int(fact_id)) - deck.save() - - def resync_facts(self, external_ids): - from anki.facts import fieldsTable - from sqlalchemy.sql import select, and_, not_ - - deck = self.wrapper.open() - - # remove extra cards - selectExtra = select([fieldsTable.c.factId], - and_( - fieldsTable.c.fieldModelId == self.wrapper.external_field_id, - not_(fieldsTable.c.value.in_(external_ids)) - ) - ) - for factId, in deck.s.execute(selectExtra).fetchall(): - deck.deleteFact(factId) - deck.save() - - # find ids that should be on this deck but which aren't - missing_ids = [] - for external_id in external_ids: - if self.wrapper.find_fact(external_id) is None: - missing_ids.append(external_id) - - return {'missing':missing_ids} - - def get_card(self): - deck = self.wrapper.open() - card = deck.getCard() - if card: - # grab the interval strings - intervals = [] - for i in range(1, 5): - intervals.append(deck.nextIntervalStr(card, i)) - - card = self._output_card(card) - card['intervals'] = intervals - card['finished'] = False - else: - # copied from Deck.nextDueMsg() in libanki/anki/deck.py - newCount = deck.newCardsDueBy(deck.dueCutoff + 86400) - newCardsTomorrow = min(newCount, deck.newCardsPerDay) - cards = deck.cardsDueBy(deck.dueCutoff + 86400) - - card = { - 'finished': True, - 'new_count': newCardsTomorrow, - 'reviews_count': cards - } - - # TODO: clean up a bit, now that we've finished this review - - return card - - @opts(waitForReturn=False) - def setup_scheduler(self, name): - deck = self.wrapper.open() - if name == 'standard': - deck.setupStandardScheduler() - elif name == 'reviewEarly': - deck.setupReviewEarlyScheduler() - elif name == 'learnMore': - deck.setupLearnMoreScheduler() - deck.refreshSession() - deck.reset() - - def get_options(self): - deck = self.wrapper.open() - - return { - 'new_cards': { - 'cards_per_day': deck.newCardsPerDay, - 'order': deck.newCardOrder, - 'spacing': deck.newCardSpacing, - }, - 'reviews': { - 'failed_card_max': deck.failedCardMax, - 'order': deck.revCardOrder, - 'failed_policy': deck.getFailedCardPolicy(), - } - } - - @opts(waitForReturn=False) - def set_options(self, study_options): - deck = self.wrapper.open() - - # new card options - deck.newCardsPerDay = int(study_options['new_cards']['cards_per_day']) - deck.newCardOrder = int(study_options['new_cards']['order']) - if deck.newCardOrder == anki.deck.NEW_CARDS_RANDOM: - deck.randomizeNewCards() - deck.newCardSpacing = int(study_options['new_cards']['spacing']) - - # reviews options - deck.setFailedCardPolicy(int(study_options['reviews']['failed_policy'])) - deck.failedCardMax = int(study_options['reviews']['failed_card_max']) - deck.revCardOrder = int(study_options['reviews']['order']) - - deck.flushMod() - deck.reset() - deck.save() - - def answer_card(self, card_id, ease): - ease = int(ease) - deck = self.wrapper.open() - card = deck.cardFromId(card_id) - if card: - try: - deck.answerCard(card, ease) - except: - import sys, traceback - exc_info = sys.exc_info() - print exc_info[1] - print traceback.print_tb(exc_info[2]) - return False - deck.save() - return True - -class DeckApp(object): - """ Our WSGI app. """ - - direct_operations = ['add_fact', 'save_fact', 'find_fact', 'delete_fact', 'resync_facts', - 'get_card', 'answer_card'] - - def __init__(self, data_root, allowed_hosts): - self.data_root = os.path.abspath(data_root) - self.allowed_hosts = allowed_hosts - - def _get_path(self, path): - npath = os.path.normpath(os.path.join(self.data_root, path)) - if npath[0:len(self.data_root)] != self.data_root: - # attempting to escape our data jail! - raise HTTPBadRequest('"%s" is not a valid path/id' % path) - return npath - - @wsgify - def __call__(self, req): - global thread_pool - - if self.allowed_hosts != '*': - try: - remote_addr = req.headers['X-Forwarded-For'] - except KeyError: - remote_addr = req.remote_addr - if remote_addr != self.allowed_hosts: - raise HTTPForbidden() - - if req.method != 'POST': - raise HTTPMethodNotAllowed(allow=['POST']) - - # get the deck and function to call from the path - func = req.path - if func[0] == '/': - func = func[1:] - parts = func.split('/') - path = '/'.join(parts[:-1]) - func = parts[-1] - if func[0] == '_' or not hasattr(DeckAppHandler, func) or not callable(getattr(DeckAppHandler, func)): - raise HTTPNotFound() - thread = thread_pool.start(self._get_path(path)) - handler = DeckAppHandler(thread.wrapper) - func = getattr(handler, func) - try: - opts = func.opts - except AttributeError: - opts = {} - - try: - input = json.loads(req.body) - except ValueError, e: - logging.error(req.path+': Unable to parse JSON: '+str(e), exc_info=True) - raise HTTPBadRequest() - # make the keys into non-unicode strings - input = dict([(str(k), v) for k, v in input.items()]) - - # debug - from pprint import pprint - pprint(input) - - # run it! - try: - output = thread.execute(func, [], input, **opts) - except Exception, e: - logging.error(e) - return HTTPInternalServerError() - - if output is None: - return Response('', content_type='text/plain') - else: - return Response(json.dumps(output), content_type='application/json') - -# Our entry point -def make_app(global_conf, **local_conf): - # setup the logger - logging_config_file = local_conf.get('logging.config_file') - if logging_config_file: - # monkey patch the logging.config.SMTPHandler if necessary - import sys - if sys.version_info[0] == 2 and sys.version_info[1] == 5: - import AnkiServer.logpatch - - # load the config file - import logging.config - logging.config.fileConfig(logging_config_file) - - return DeckApp( - data_root=local_conf.get('data_root', '.'), - allowed_hosts=local_conf.get('allowed_hosts', '*') - ) - diff --git a/setup.py b/setup.py index 8688fc4..29ceb90 100644 --- a/setup.py +++ b/setup.py @@ -3,8 +3,8 @@ from setuptools import setup, find_packages setup( name="AnkiServer", - version="0.0.1", - description="Provides the a RESTful API to manipulating Anki decks", + version="2.0.0a1", + description="A personal Anki sync server (so you can sync against your own server rather than AnkiWeb)", author="David Snopek", author_email="dsnopek@gmail.com", install_requires=["PasteDeploy>=1.3.2"], @@ -13,7 +13,6 @@ setup( test_suite='nose.collector', entry_points=""" [paste.app_factory] - deckapp = AnkiServer.deck:make_app syncapp = AnkiServer.sync:make_app [paste.server_runner]