# Source code for flexget.plugins.input.gazelle

from datetime import datetime

from loguru import logger
from sqlalchemy import Column, DateTime, String, Unicode

from flexget import db_schema, plugin
from flexget.components.sites.utils import normalize_unicode
from flexget.config_schema import one_or_more
from flexget.entry import Entry
from flexget.event import event
from flexget.manager import Session
from flexget.plugin import PluginError
from flexget.utils.database import json_synonym
from flexget.utils.requests import TokenBucketLimiter
from flexget.utils.tools import parse_filesize

# Text fragments searched for in a failed login response to guess that the
# account has two-factor authentication enabled.
DETECT_2FA = ('Authenticator Code', 'TOTP code')

# Rebind the imported loguru logger with name='gazelle' for this module's records.
logger = logger.bind(name='gazelle')

# Declarative base for this plugin's tables, created via db_schema.versioned_base
# with the plugin name 'gazelle_session' and 0 as its second argument
# (presumably the schema version - confirm against db_schema).
Base = db_schema.versioned_base('gazelle_session', 0)


class GazelleSession(Base):
    """Database row holding a resumable login session for one user on one site."""

    __tablename__ = 'gazelle_session'

    # Composite primary key: a row is identified by the (username, base_url) pair.
    username = Column(Unicode, primary_key=True)
    base_url = Column(String, primary_key=True)

    # Keys stored alongside the session; the plugin embeds them in download URLs.
    authkey = Column(String)
    passkey = Column(String)

    # The cookies live in the 'cookie' column; `cookies` is the json_synonym
    # accessor wrapped around that raw column.
    _cookies = Column('cookie', Unicode)
    cookies = json_synonym('_cookies')

    # Point in time after which the stored session is no longer resumed.
    expires = Column(DateTime)
class InputGazelle:
    """A generic plugin that searches a Gazelle-based website.

    Limited functionality but should work for almost all of them.
    """

    def __init__(self):
        """Set up a plugin that only has the ability to do a basic search."""
        self.base_url = None

        # Aliases for config -> api params
        # Extended in subclasses
        self.aliases = {'search': 'searchstr'}

        # API parameters
        # None means a raw value entry (no validation other than schema)
        # A dict means an enum with a config -> api mapping
        # A list is an enum with no mapping
        # Extended in subclasses
        self.params = {'searchstr': None}

    @property
    def schema(self):
        """The schema of the plugin.

        Subclasses should extend this to implement more params.
        ``base_url`` is added to the required keys when the instance has no
        ``base_url`` of its own.
        """
        schema = {
            'type': 'object',
            'properties': {
                'base_url': {'type': 'string'},
                'username': {'type': 'string'},
                'password': {'type': 'string'},
                'max_pages': {'type': 'integer'},
                'search': {'type': 'string'},
            },
            'required': ['username', 'password'],
            'additionalProperties': False,
        }
        # base_url is only required if the subclass doesn't set it
        if not self.base_url:
            schema['required'].append('base_url')
        return schema

    def _key(self, key):
        """Get the API key name from the entered key.

        Returns the alias target when `key` is a known alias, otherwise `key` itself.
        """
        return self.aliases.get(key, key)

    def _opts(self, key):
        """Get the options for the specified key.

        Raises KeyError when the (de-aliased) key is not a known API parameter.
        """
        return self.params[self._key(key)]

    def _getval(self, key, val):
        """Get the API value for the specified key based on a config option."""
        opts = self._opts(key)
        if isinstance(opts, dict):
            # Translate the input value to the API value
            # The str cast converts bools to 'True'/'False' for use as keys
            # This allows for options that have True/False/Other values
            return opts[str(val)]
        if isinstance(val, list):
            # Fix yaml parser making a list out of a string
            return ','.join(val)
        return val

    def params_from_config(self, config):
        """Filter params and map config values -> api values.

        Config keys that do not correspond to a known API parameter are dropped.
        """
        ret = {}
        for k, v in config.items():
            key = self._key(k)
            if key in self.params:
                ret[key] = self._getval(k, v)
        return ret

    def setup(self, task, config):
        """Set up a session and log in.

        Raises PluginError when no base URL is available from either the config
        or the plugin itself.
        """
        self._session = task.requests
        base_url = config.get('base_url', '').rstrip('/')
        if base_url:
            if self.base_url and self.base_url != base_url:
                logger.warning(
                    'Using plugin designed for {} on {} - things may break',
                    self.base_url,
                    base_url,
                )
            self.base_url = base_url

        if not self.base_url:
            raise PluginError("No 'base_url' configured")

        # Any more than 5 pages is probably way too loose of a search
        self.max_pages = config.get('max_pages', 5)

        # The consistent request limiting rule seems to be:
        # "Refrain from making more than five (5) requests every ten (10) seconds"
        self._session.add_domain_limiter(TokenBucketLimiter(self.base_url, 2, '2 seconds'))

        self.username = config['username']
        self.password = config['password']

        # Login
        self.authenticate()

        # Logged in successfully, it's ok if nothing matches now
        task.no_entries_ok = True

    def resume_session(self):
        """Resume an existing session from the database.

        Return True on successful recovery, False otherwise
        """
        logger.debug('Attempting to find an existing session in the DB')
        with Session() as session:
            db_session = (
                session.query(GazelleSession)
                .filter(
                    GazelleSession.base_url == self.base_url,
                    GazelleSession.username == self.username,
                )
                .one_or_none()
            )
            # A row without an expiry is never resumed
            if db_session and db_session.expires and db_session.expires >= datetime.utcnow():
                # Found a valid session in the DB - use it
                self._session.cookies.update(db_session.cookies)
                self.authkey = db_session.authkey
                self.passkey = db_session.passkey
                return True
        return False

    def save_current_session(self):
        """Store the current session in the database so it can be resumed later."""
        logger.debug('Storing session info in the DB')
        with Session() as session:
            expires = None
            for c in self._session.cookies:
                # Session-only cookies carry no expiry (`expires` is None); converting
                # None to a timestamp would raise TypeError, so leave `expires` unset.
                if c.name == 'session' and c.expires is not None:
                    expires = datetime.utcfromtimestamp(c.expires)
            db_session = GazelleSession(
                username=self.username,
                base_url=self.base_url,
                cookies=dict(self._session.cookies),
                expires=expires,
                authkey=self.authkey,
                passkey=self.passkey,
            )
            session.merge(db_session)

    def authenticate(self, force=False):
        """Log in and store auth data from the server.

        When `force` is False a session cached in the database is tried first.
        Raises PluginError when the login request is not redirected to the
        site's index page.

        Adapted from https://github.com/isaaczafuta/whatapi
        """
        # clean slate before creating/restoring cookies
        self._session.cookies.clear()

        if not force and self.resume_session():
            logger.info('Logged into {} using cached session', self.base_url)
            return

        # Forcing a re-login or no session in DB - log in using provided creds
        url = f'{self.base_url}/login.php'
        data = {'username': self.username, 'password': self.password, 'keeplogged': 1}
        r = self._session.post(url, data=data, allow_redirects=False, raise_status=True)
        if not r.is_redirect or r.next.url != f'{self.base_url}/index.php':
            msg = f'Failed to log into {self.base_url}'
            # TODO: Find a better signal that 2FA is enabled
            if any(otp_text in r.text for otp_text in DETECT_2FA):
                msg += ' - Accounts using 2FA are currently not supported'
            raise PluginError(msg)

        account_info = self.request(no_login=True, action='index')
        self.authkey = account_info['authkey']
        self.passkey = account_info['passkey']
        logger.info('Logged in to {}', self.base_url)

        # Store the session so we can resume it later
        self.save_current_session()

    def request(self, no_login=False, **params):
        """Make an AJAX request to the API.

        If `no_login` is True, logging in will not be attempted if the request
        is redirected to the login page.

        Returns the 'response' member of the decoded JSON reply.
        Raises ValueError when no 'action' is given, and PluginError for a
        non-200 status, a failure reply or an undecodable reply.

        Adapted from https://github.com/isaaczafuta/whatapi
        """
        if 'action' not in params:
            raise ValueError("An 'action' is required when making a request")

        ajaxpage = f'{self.base_url}/ajax.php'
        r = self._session.get(ajaxpage, params=params, allow_redirects=False, raise_status=True)
        if not no_login and r.is_redirect and r.next.url == f'{self.base_url}/login.php':
            logger.warning('Redirected to login page, reauthenticating and trying again')
            self.authenticate(force=True)
            # no_login=True prevents an endless re-login loop on the retry
            return self.request(no_login=True, **params)

        if r.status_code != 200:
            raise PluginError(f'{self.base_url} returned a non-200 status code')

        try:
            json_response = r.json()
            if json_response['status'] != 'success':
                # Try to deal with errors returned by the API
                error = json_response.get('error', json_response.get('status'))
                if not error or error == 'failure':
                    error = json_response.get('response', str(json_response))

                raise PluginError(f"{self.base_url} gave a failure response of '{error}'")
            return json_response['response']
        except (ValueError, TypeError, KeyError) as err:
            # Keep the underlying parse/lookup error attached for debugging
            raise PluginError(f'{self.base_url} returned an invalid response') from err

    def search_results(self, params):
        """Yield search results, fetching at most `self.max_pages` pages."""
        page = 1
        pages = None
        while page <= self.max_pages:
            # `pages` is the total page count reported by the API, so the final
            # page is `page == pages`; only stop once we are past it.
            if pages and page > pages:
                break

            logger.debug('Attempting to get page {} of search results', page)
            result = self.request(action='browse', page=page, **params)
            if not result['results']:
                break
            yield from result['results']

            pages = result.get('pages', pages)
            page += 1

        # Only warn when results were actually (or possibly) cut short, not when
        # the last available page happened to be page number `max_pages`.
        if page > self.max_pages and (not pages or pages > self.max_pages):
            logger.warning('Stopped after {} pages (out of {} total pages)', self.max_pages, pages)

    def get_entries(self, search_results):
        """Yield Entry objects from search results, one per torrent of each result."""
        for result in search_results:
            # Get basic information on the release
            info = {k: result[k] for k in ('groupId', 'groupName')}

            # Releases can have multiple download options
            for tor in result['torrents']:
                temp = info.copy()
                temp['torrentId'] = tor['torrentId']

                yield Entry(
                    title='{groupName} ({groupId} - {torrentId}).torrent'.format(**temp),
                    url='{}/torrents.php?action=download&id={}&authkey={}&torrent_pass={}'.format(
                        self.base_url, temp['torrentId'], self.authkey, self.passkey
                    ),
                    torrent_seeds=tor['seeders'],
                    torrent_leeches=tor['leechers'],
                    # Size is returned in bytes
                    content_size=parse_filesize(str(tor['size']) + 'b'),
                )

    @plugin.internet(logger)
    def search(self, task, entry, config):
        """Search interface.

        Runs one search per search string of `entry` and returns the set of
        resulting entries.
        """
        self.setup(task, config)

        entries = set()
        params = self.params_from_config(config)
        for search_string in entry.get('search_strings', [entry['title']]):
            query = normalize_unicode(search_string)
            params[self._key('search')] = query
            entries.update(self.get_entries(self.search_results(params)))
        return entries

    @plugin.internet(logger)
    def on_task_input(self, task, config):
        """Task input interface."""
        self.setup(task, config)
        params = self.params_from_config(config)
        return self.get_entries(self.search_results(params))
class InputGazelleMusic(InputGazelle):
    """Search plugin for Gazelle-based music trackers.

    Modelled on https://github.com/WhatCD/Gazelle, the code base that all
    Gazelle-based music sites started from.
    """

    def __init__(self):
        """Register the aliases and API parameters most of these sites support."""
        super().__init__()

        music_aliases = {
            'artist': 'artistname',
            'album': 'groupname',
            'leech_type': 'freetorrent',
            'release_type': 'releasetype',
            'tags': 'taglist',
            'tag_type': 'tags_type',
            'log': 'haslog',
        }
        self.aliases.update(music_aliases)

        # Each enum gets its own list/dict object because subclasses mutate them.
        music_params = {
            'taglist': None,
            'artistname': None,
            'groupname': None,
            'year': None,
            'tags_type': {'any': 0, 'all': 1},
            'encoding': [
                '192',
                'APS (VBR)',
                'V2 (VBR)',
                'V1 (VBR)',
                '256',
                'APX (VBR)',
                'V0 (VBR)',
                'q8.x (VBR)',
                '320',
                'Lossless',
                '24bit Lossless',
                'Other',
            ],
            'format': ['MP3', 'FLAC', 'Ogg Vorbis', 'AAC', 'AC3', 'DTS'],
            'media': ['CD', 'DVD', 'Vinyl', 'Soundboard', 'SACD', 'DAT', 'Cassette', 'WEB'],
            'releasetype': {
                'album': 1,
                'soundtrack': 3,
                'EP': 5,
                'anthology': 6,
                'compilation': 7,
                'single': 9,
                'live album': 11,
                'remix': 13,
                'bootleg': 14,
                'interview': 15,
                'mixtape': 16,
                'unknown': 21,
            },
            'haslog': {'False': 0, 'True': 1, '100%': 100, '<100%': -1},
            'freetorrent': {'freeleech': 1, 'neutral': 2, 'either': 3, 'normal': 0},
            'hascue': {'False': 0, 'True': 1},
            'scene': {'False': 0, 'True': 1},
            'vanityhouse': {'False': 0, 'True': 1},
        }
        self.params.update(music_params)

    @property
    def schema(self):
        """The schema of the plugin.

        Takes the parent schema and adds the music-specific properties to it.
        """

        def mapped_enum(option):
            # Enum whose allowed values are the config-side keys of a mapping
            return {'type': 'string', 'enum': list(self._opts(option))}

        def plain_enum(option):
            # Enum whose allowed values are the option list itself
            return {'type': 'string', 'enum': self._opts(option)}

        schema = super().schema
        music_properties = {
            'artist': {'type': 'string'},
            'album': {'type': 'string'},
            'year': {'type': ['string', 'integer']},
            'tags': one_or_more({'type': 'string'}),
            'tag_type': mapped_enum('tag_type'),
            'encoding': plain_enum('encoding'),
            'format': plain_enum('format'),
            'media': plain_enum('media'),
            'release_type': mapped_enum('release_type'),
            'log': {'oneOf': [mapped_enum('log'), {'type': 'boolean'}]},
            'leech_type': mapped_enum('leech_type'),
            'hascue': {'type': 'boolean'},
            'scene': {'type': 'boolean'},
            'vanityhouse': {'type': 'boolean'},
        }
        schema['properties'].update(music_properties)
        return schema

    def get_entries(self, search_results):
        """Yield one Entry per torrent of every release in the search results."""
        for release in search_results:
            # Release-level fields shared by all of its torrents
            artist = release['artist']
            group_name = release['groupName']
            group_year = release['groupYear']

            # Releases can have multiple download options
            for tor in release['torrents']:
                media = tor['media']
                encoding = tor['encoding']
                fmt = tor['format']
                torrent_id = tor['torrentId']

                title = (
                    f'{artist} - {group_name} - {group_year} '
                    f'({media} - {fmt} - {encoding})-{torrent_id}.torrent'
                )
                url = (
                    f'{self.base_url}/torrents.php?action=download&id={torrent_id}'
                    f'&authkey={self.authkey}&torrent_pass={self.passkey}'
                )
                yield Entry(
                    title=title,
                    url=url,
                    torrent_seeds=tor['seeders'],
                    torrent_leeches=tor['leechers'],
                    # Size is returned in bytes
                    content_size=parse_filesize(str(tor['size']) + 'b'),
                )
class InputRedacted(InputGazelleMusic):
    """A plugin that searches RED."""

    def __init__(self):
        """Preset the RED base_url and adjust the inherited option lists."""
        super().__init__()
        self.base_url = 'https://redacted.ch'

        options = self.params
        # Options removed from the inherited lists
        options['encoding'].remove('q8.x (VBR)')
        options['format'].remove('Ogg Vorbis')
        # Options added on top of the inherited ones
        options['media'].append('Blu-ray')
        release_types = options['releasetype']
        release_types['demo'] = 17
        release_types['concert recording'] = 18
        release_types['dj mix'] = 19
class InputNotWhat(InputGazelleMusic):
    """A plugin that searches NWCD."""

    def __init__(self):
        """Preset the NWCD base_url and extend the inherited option lists."""
        super().__init__()
        self.base_url = 'https://notwhat.cd'

        options = self.params
        for extra_media in ('Blu-ray', 'Unknown'):
            options['media'].append(extra_media)

        release_types = options['releasetype']
        release_types['demo'] = 22
        release_types['dj mix'] = 23
        release_types['concert recording'] = 24

        # Additional config -> api values for the log option
        extra_log_values = (
            ('gold', 102),
            ('silver', 101),
            ('gold/silver', 100),
            ('lineage', -5),
            ('unscored', -1),
            ('missing lineage', -6),
            ('missing dr score', -7),
            ('missing sample rate', -8),
            ('missing description', -9),
        )
        log_options = options['haslog']
        for label, api_value in extra_log_values:
            log_options[label] = api_value
@event('plugin.register')
def register_plugin():
    """Register each Gazelle plugin class for the task and search interfaces."""
    registrations = (
        (InputGazelle, 'gazelle'),
        (InputGazelleMusic, 'gazellemusic'),
        (InputRedacted, 'redacted'),
        (InputNotWhat, 'notwhatcd'),
    )
    for plugin_class, plugin_name in registrations:
        # A fresh interfaces list is passed on every call, as in separate register calls
        plugin.register(plugin_class, plugin_name, interfaces=['task', 'search'], api_ver=2)