# -*- coding:utf-8 -*- # # Copyright (C) 2018 sthoo # # Support: Report an issue at https://github.com/sth2018/FastWordQuery/issues # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # any later version; http://www.gnu.org/copyleft/gpl.html. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import inspect import os import random # use ntpath module to ensure the windows-style (e.g. '\\LDOCE.css') # path can be processed on Unix platform. # However, anki version on mac platforms doesn't including this package? # import ntpath import re import shutil import sqlite3 import urllib import zlib from collections import defaultdict from functools import wraps from hashlib import md5, sha1 import requests from bs4 import BeautifulSoup from aqt import mw from aqt.qt import QMutex, QThread from ..context import config from ..lang import _cl from ..libs import MdxBuilder, StardictBuilder from ..utils import MapDict, wrap_css try: import urllib2 except Exception: import urllib.request as urllib2 try: from cookielib import CookieJar except Exception: from http.cookiejar import CookieJar try: import threading as _threading except ImportError: import dummy_threading as _threading __all__ = [ 'register', 'export', 'copy_static_file', 'with_styles', 'parse_html', 'service_wrap', 'get_hex_name', 'Service', 'WebService', 'LocalService', 'MdxService', 'StardictService', 'QueryResult' ] _default_ua = 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 ' \ '(KHTML, like Gecko) Chrome/70.0.3538.67 Safari/537.36' def get_hex_name(prefix, val, suffix): ''' get sha1 hax name ''' hex_digest = sha1(val.encode('utf-8')).hexdigest().lower() name = '.'.join(['-'.join([prefix, hex_digest[:8], hex_digest[8:16], hex_digest[16:24], hex_digest[24:32], hex_digest[32:], ]), suffix, ]) return name def _is_method_or_func(object): return inspect.isfunction(object) or inspect.ismethod(object) def register(labels): """ register the dict service with a labels, which will be shown in the dicts list. """ def _deco(cls): cls.__register_label__ = _cl(labels) methods = inspect.getmembers(cls, predicate=_is_method_or_func) exports = [] for method in methods: attrs = getattr(method[1], '__export_attrs__', None) if attrs and attrs[1] == -1: exports.append(( getattr(method[1], '__def_index__', 0), method[1] )) exports = sorted(exports) for index, method in enumerate(exports): attrs = getattr(method[1], '__export_attrs__', None) attrs[1] = index return cls return _deco def export(labels): """ export dict field function with a labels, which will be shown in the fields list. """ def _with(fld_func): @wraps(fld_func) def _deco(self, *args, **kwargs): res = fld_func(self, *args, **kwargs) return QueryResult(result=res) if not isinstance(res, QueryResult) else res _deco.__export_attrs__ = [_cl(labels), -1] _deco.__def_index__ = export.EXPORT_INDEX export.EXPORT_INDEX += 1 return _deco return _with export.EXPORT_INDEX = 0 def copy_static_file(filename, new_filename=None, static_dir='static'): """ copy file in static directory to media folder """ abspath = os.path.join(os.path.dirname(os.path.realpath(__file__)), static_dir, filename) shutil.copy(abspath, new_filename if new_filename else filename) def with_styles(**styles): """ cssfile: specify the css file in static folder css: css strings js: js strings jsfile: specify the js file in static folder """ def _with(fld_func): @wraps(fld_func) def _deco(cls, *args, **kwargs): res = fld_func(cls, *args, **kwargs) cssfile, css, jsfile, js, need_wrap_css, class_wrapper =\ styles.get('cssfile', None),\ styles.get('css', None),\ styles.get('jsfile', None),\ styles.get('js', None),\ styles.get('need_wrap_css', False),\ styles.get('wrap_class', '') def wrap(html, css_obj, is_file=True): # wrap css and html if need_wrap_css and class_wrapper: html = u'
{}
'.format( class_wrapper, html) return html, wrap_css(css_obj, is_file=is_file, class_wrapper=class_wrapper)[0] return html, css_obj if cssfile: new_cssfile = cssfile if cssfile.startswith('_') \ else u'_' + cssfile # copy the css file to media folder copy_static_file(cssfile, new_cssfile) # wrap the css file res, new_cssfile = wrap(res, new_cssfile) res = u'{1}'.format( new_cssfile, res) if css: res, css = wrap(res, css, is_file=False) res = u'{1}'.format(css, res) if not isinstance(res, QueryResult): return QueryResult(result=res, jsfile=jsfile, js=js) else: res.set_styles(jsfile=jsfile, js=js) return res return _deco return _with _BS_LOCKS = [_threading.Lock(), _threading.Lock()] # bs4 threading lock, overload protection def parse_html(html): ''' use bs4 lib parse HTML, run only 2 BS at the same time ''' lock = _BS_LOCKS[random.randrange(0, len(_BS_LOCKS) - 1, 1)] lock.acquire() soup = BeautifulSoup(html, 'html.parser') lock.release() return soup def service_wrap(service, *args, **kwargs): """ wrap the service class constructor """ def _service(): return service(*args, **kwargs) return _service class Service(object): ''' Dictionary Service Abstract Class ''' def __init__(self): self.cache = defaultdict(defaultdict) self._unique = self.__class__.__name__ self._exporters = self._get_exporters() self._fields, self._actions = zip(*self._exporters) \ if self._exporters else (None, None) self._word = '' # query interval: default 500ms self.query_interval = 0.5 def cache_this(self, result): self.cache[self.word].update(result) return result def cached(self, key): return (self.word in self.cache) and (key in self.cache[self.word]) def cache_result(self, key): return self.cache[self.word].get(key, u'') def _get_from_api(self): return {} def _get_field(self, key, default=u''): return self.cache_result(key) if self.cached(key) else self._get_from_api().get(key, default) @property def unique(self): return self._unique @unique.setter def unique(self, value): self._unique = value @property def word(self): return self._word @word.setter def word(self, value): value = re.sub(r']*>', '', value) self._word = value @property def quote_word(self): return urllib2.quote(self.word) @property def support(self): return True @property def fields(self): return self._fields @property def actions(self): return self._actions @property def exporters(self): return self._exporters def _get_exporters(self): flds = dict() methods = inspect.getmembers(self, predicate=inspect.ismethod) for method in methods: export_attrs = getattr(method[1], '__export_attrs__', None) if export_attrs: label, index = export_attrs[0], export_attrs[1] flds.update({int(index): (label, method[1])}) sorted_flds = sorted(flds) return [flds[key] for key in sorted_flds] def active(self, fld_ord, word): self.word = word if fld_ord >= 0 and fld_ord < len(self.actions): return self.actions[fld_ord]() return QueryResult.default() @staticmethod def get_anki_label(filename, type_): formats = {'audio': u'[sound:{0}]', 'img': u'', 'video': u''} return formats[type_].format(filename) class WebService(Service): """ Web Dictionary Service """ def __init__(self): super(WebService, self).__init__() self._cookie = CookieJar() self._opener = urllib2.build_opener( urllib2.HTTPCookieProcessor(self._cookie)) self.query_interval = 1.0 @property def title(self): return getattr(self, '__register_label__', self.unique) def get_response(self, url, data=None, headers=None, timeout=10): default_headers = { 'User-Agent': _default_ua } if headers: default_headers.update(headers) request = urllib2.Request(url, headers=default_headers) try: response = self._opener.open(request, data=data, timeout=timeout) data = response.read() if response.info().get('Content-Encoding') == 'gzip': data = zlib.decompress(data, 16 + zlib.MAX_WBITS) return data except Exception: return u'' @classmethod def download(cls, url, filename, timeout=15): import socket socket.setdefaulttimeout(timeout) try: with open(filename, "wb") as f: f.write(requests.get(url, headers={ 'User-Agent': _default_ua }).content) return True except Exception: pass class TinyDownloadError(ValueError): """Raises when a download is too small.""" def net_stream(self, targets, require=None, method='GET', awesome_ua=False, add_padding=False, custom_quoter=None, custom_headers=None): """ Returns the raw payload string from the specified target(s). If multiple targets are specified, their resulting payloads are glued together. Each "target" is a bare URL string or a tuple containing an address and a dict for what to tack onto the query string. Finally, a require dict may be passed to enforce a Content-Type using key 'mime' and/or a minimum payload size using key 'size'. If using multiple targets, these requirements apply to each response. The underlying library here already understands how to search the environment for proxy settings (e.g. HTTP_PROXY), so we do not need to do anything extra for that. If add_padding is True, then some additional null padding will be added onto the stream returned. This is helpful for some web services that sometimes return MP3s that `mplayer` clips early. """ DEFAULT_TIMEOUT = 3 PADDING = '\0' * 2**11 assert method in ['GET', 'POST'], "method must be GET or POST" targets = targets if isinstance(targets, list) else [targets] targets = [ (target, None) if isinstance(target, str) else ( target[0], '&'.join( '='.join([ key, ( custom_quoter[key] if (custom_quoter and key in custom_quoter) else urllib2.quote )( val.encode('utf-8') if isinstance(val, str) else str(val), safe='', ), ]) for key, val in target[1].items() ), ) for target in targets ] require = require or {} payloads = [] for number, (url, params) in enumerate(targets, 1): desc = "web request" if len(targets) == 1 \ else "web request (%d of %d)" % (number, len(targets)) headers = {'User-Agent': _default_ua} if custom_headers: headers.update(custom_headers) response = urllib2.urlopen( urllib2.Request( url=('?'.join([url, params]) if params and method == 'GET' else url), headers=headers, ), data=params if params and method == 'POST' else None, timeout=DEFAULT_TIMEOUT, ) if not response: raise IOError("No response for %s" % desc) if response.getcode() != 200: value_error = ValueError( "Got %d status for %s" % (response.getcode(), desc) ) try: value_error.payload = response.read() response.close() except Exception: pass raise value_error if 'mime' in require and \ require['mime'] != format(response.info(). gettype()).replace('/x-', '/'): value_error = ValueError( "Request got %s Content-Type for %s; wanted %s" % (response.info().gettype(), desc, require['mime']) ) value_error.got_mime = response.info().gettype() value_error.wanted_mime = require['mime'] raise value_error payload = response.read() response.close() if 'size' in require and len(payload) < require['size']: raise self.TinyDownloadError( "Request got %d-byte stream for %s; wanted %d+ bytes" % (len(payload), desc, require['size']) ) payloads.append(payload) if add_padding: payloads.append(PADDING) return b''.join(payloads) def net_download(self, path, *args, **kwargs): """ Downloads a file to the given path from the specified target(s). See net_stream() for information about available options. """ try: payload = self.net_stream(*args, **kwargs) with open(path, 'wb') as f: f.write(payload) f.close() return True except Exception: return False class _DictBuildWorker(QThread): """Local Dictionary Builder""" def __init__(self, func): super(_DictBuildWorker, self).__init__() self._builder = None self._func = func def run(self): try: self._builder = self._func() except Exception: self._builder = None @property def builder(self): return self._builder class LocalService(Service): """ Local Dictionary Service """ def __init__(self, dict_path): super(LocalService, self).__init__() self.dict_path = dict_path self.builder = None self.missed_css = set() # MdxBuilder instances map _mdx_builders = defaultdict(dict) _mutex_builder = QMutex() @staticmethod def _get_builer(key, func=None): LocalService._mutex_builder.lock() key = md5(str(key).encode('utf-8')).hexdigest() if not(func is None): if not LocalService._mdx_builders[key]: worker = _DictBuildWorker(func) worker.start() while not worker.isFinished(): mw.app.processEvents() worker.wait(100) LocalService._mdx_builders[key] = worker.builder LocalService._mutex_builder.unlock() return LocalService._mdx_builders[key] @property def support(self): return os.path.isfile(self.dict_path) @property def title(self): return getattr(self, '__register_label__', u'Unkown') @property def _filename(self): return os.path.splitext(os.path.basename(self.dict_path))[0] def active(self, fld_ord, word): self.missed_css.clear() return super(LocalService, self).active(fld_ord, word) class MdxService(LocalService): """ MDX Local Dictionary Service """ def __init__(self, dict_path): super(MdxService, self).__init__(dict_path) self.media_cache = defaultdict(set) self.cache = defaultdict(str) self.html_cache = defaultdict(str) self.query_interval = 0.01 self.word_links = [] self.styles = [] if MdxService.check(self.dict_path): self.builder = self._get_builer(dict_path, service_wrap(MdxBuilder, dict_path)) @staticmethod def check(dict_path): return os.path.isfile(dict_path) and dict_path.lower().endswith('.mdx') @property def support(self): return self.builder and MdxService.check(self.dict_path) @property def title(self): if config.use_filename or not self.builder._title or self.builder._title.startswith('Title'): return self._filename else: return self.builder._title @export([u'默认', u'Default']) def fld_whole(self): html = self.get_default_html() js = re.findall(r'', html, re.DOTALL) jsfile = re.findall(r'