anki-sync-server/AnkiServer/threading.py
David Snopek e25cf25684 Squashed commit of the following:
commit cb509e8f75e3dcdbc66327be4bfbf6661aa084b5
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 22:06:28 2013 +0100

    Cut down 'import' statements to only modules actually used.

commit 0ea255115e095e31af5a991e9cce2b5b15cb496d
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 22:00:06 2013 +0100

     * Add getCollectionManager() so that the whole process can share the same ThreadingCollectionManager object.

     * Got the RestApp actually working!

commit 00997bab600b13d4b430ed2c2839b1d2232f55ed
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 21:04:58 2013 +0100

    Got the sync_app working again (more or less)

commit 459c69566bb92d2c0195a384e067d98c059bdea7
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 19:47:40 2013 +0100

    Started implementing test for the RESTful callbacks that PrepECN is going to need.

commit 7ffbac793f9bf45ab9056c1de475422b8742e107
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 17:19:06 2013 +0100

    Started work on a WSGI app for RESTful access to Anki based on Bibliobird code here:

      https://raw.github.com/dsnopek/bbcom/master/AnkiServer/AnkiServer/deck.py

commit 8820411388ce0c2b7b14769c614c22c675d2dbdd
Author: David Snopek <dsnopek@gmail.com>
Date:   Fri Jul 12 15:03:56 2013 +0100

     * Seperated the collection and threading code.

     * Implemented a new interface to interact with the collections, which will hopefully be more transparent and testable.
2013-07-12 22:08:16 +01:00

186 lines
6.1 KiB
Python

from __future__ import absolute_import
import anki
import anki.storage
from AnkiServer.collection import CollectionWrapper, CollectionManager
from threading import Thread
from Queue import Queue
import time, logging
__all__ = ['ThreadingCollectionWrapper', 'ThreadingCollectionManager']
class ThreadingCollectionWrapper(object):
"""Provides the same interface as CollectionWrapper, but it creates a new Thread to
interact with the collection."""
def __init__(self, path, setup_new_collection=None):
self.path = path
self.wrapper = CollectionWrapper(path, setup_new_collection)
self._queue = Queue()
self._thread = None
self._running = False
self.last_timestamp = time.time()
self.start()
@property
def running(self):
return self._running
def qempty(self):
return self._queue.empty()
def current(self):
from threading import current_thread
return current_thread() == self._thread
def execute(self, func, args=[], kw={}, waitForReturn=True):
""" Executes a given function on this thread with the *args and **kw.
If 'waitForReturn' is True, then it will block until the function has
executed and return its return value. If False, it will return None
immediately and the function will be executed sometime later.
"""
if waitForReturn:
return_queue = Queue()
else:
return_queue = None
self._queue.put((func, args, kw, return_queue))
if return_queue is not None:
ret = return_queue.get(True)
if isinstance(ret, Exception):
raise ret
return ret
def _run(self):
logging.info('CollectionThread[%s]: Starting...', self.path)
try:
while self._running:
func, args, kw, return_queue = self._queue.get(True)
if hasattr(func, 'func_name'):
func_name = func.func_name
else:
func_name = func.__class__.__name__
logging.info('CollectionThread[%s]: Running %s(*%s, **%s)', self.path, func_name, repr(args), repr(kw))
self.last_timestamp = time.time()
try:
ret = self.wrapper.execute(func, args, kw, return_queue)
except Exception, e:
logging.error('CollectionThread[%s]: Unable to %s(*%s, **%s): %s',
self.path, func.func_name, repr(args), repr(kw), e, exc_info=True)
# we return the Exception which will be raise'd on the other end
ret = e
if return_queue is not None:
return_queue.put(ret)
except Exception, e:
logging.error('CollectionThread[%s]: Thread crashed! Exception: %s', self.path, e, exc_info=True)
finally:
self.wrapper.close()
# clean out old thread object
self._thread = None
# in case we got here via an exception
self._running = False
logging.info('CollectionThread[%s]: Stopped!', self.path)
def start(self):
if not self._running:
self._running = True
assert self._thread is None
self._thread = Thread(target=self._run)
self._thread.start()
def stop(self):
def _stop(col):
self._running = False
self.execute(_stop, waitForReturn=False)
def stop_and_wait(self):
""" Tell the thread to stop and wait for it to happen. """
self.stop()
if self._thread is not None:
self._thread.join()
# Mimic the CollectionWrapper interface
def open(self):
pass
def close(self):
self.stop()
def opened(self):
return self.wrapper.opened()
class ThreadingCollectionManager(CollectionManager):
"""Manages a set of ThreadingCollectionWrapper objects."""
collection_wrapper = ThreadingCollectionWrapper
def __init__(self):
super(ThreadingCollectionManager, self).__init__()
self.monitor_frequency = 15
self.monitor_inactivity = 90
monitor = Thread(target=self._monitor_run)
monitor.daemon = True
monitor.start()
self._monitor_thread = monitor
# TODO: we should raise some error if a collection is started on a manager that has already been shutdown!
# or maybe we could support being restarted?
# TODO: it would be awesome to have a safe way to stop inactive threads completely!
# TODO: we need a way to inform other code that the collection has been closed
def _monitor_run(self):
""" Monitors threads for inactivity and closes the collection on them
(leaves the thread itself running -- hopefully waiting peacefully with only a
small memory footprint!) """
while True:
cur = time.time()
for path, thread in self.collections.items():
if thread.running and thread.wrapper.opened() and thread.qempty() and cur - thread.last_timestamp >= self.monitor_inactivity:
logging.info('Monitor is closing collection on inactive CollectionThread[%s]', thread.path)
def closeCollection(wrapper):
wrapper.close()
thread.execute(closeCollection, waitForReturn=False)
time.sleep(self.monitor_frequency)
def shutdown(self):
# TODO: stop the monitor thread!
# This will stop all the collection threads
super(ThreadingCollectionManager, self).shutdown()
#
# For working with the global ThreadingCollectionManager:
#
collection_manager = None
def getCollectionManager():
"""Return the global ThreadingCollectionManager for this process."""
global collection_manager
if collection_manager is None:
collection_manager = ThreadingCollectionManager()
return collection_manager
def shutdown():
"""If the global ThreadingCollectionManager exists, shut it down."""
global collection_manager
if collection_manager is not None:
collection_manager.shutdown()
collection_manager = None