Source code for flexget.components.trakt.trakt_list

from collections.abc import MutableSet

from loguru import logger

from flexget import plugin
from flexget.entry import Entry
from flexget.event import event
from flexget.utils import json
from flexget.utils.cached_input import cached
from flexget.utils.requests import RequestException, TimedLimiter
from flexget.utils.tools import split_title_year

from . import db

logger = logger.bind(name='trakt_list')
IMMUTABLE_LISTS = ['trending', 'popular']


[docs] def generate_show_title(item): show_info = item['show'] if show_info['year'] and not item['strip_dates']: return '{} ({})'.format(show_info['title'], show_info['year']) return show_info['title']
[docs] def generate_episode_title(item): show_info = item['show'] episode_info = item['episode'] if show_info['year'] and not item['strip_dates']: return ( '{} ({}) S{:02d}E{:02d} {}'.format( show_info['title'], show_info['year'], episode_info['season'], episode_info['number'], episode_info['title'] or '', ) ).strip() return ( '{} S{:02d}E{:02d} {}'.format( show_info['title'], episode_info['season'], episode_info['number'], episode_info['title'] or '', ) ).strip()
field_maps = { 'movie': { 'title': lambda i: ( '{} ({})'.format(i['movie']['title'], i['movie']['year']) if i['movie']['year'] and not i['strip_dates'] else '{}'.format(i['movie']['title']) ), 'movie_name': 'movie.title', 'movie_year': 'movie.year', 'trakt_movie_name': 'movie.title', 'trakt_movie_year': 'movie.year', 'imdb_id': 'movie.ids.imdb', 'tmdb_id': 'movie.ids.tmdb', 'trakt_movie_id': 'movie.ids.trakt', 'trakt_movie_slug': 'movie.ids.slug', }, 'show': { 'title': generate_show_title, 'series_name': generate_show_title, 'trakt_series_name': 'show.title', 'trakt_series_year': 'show.year', 'imdb_id': 'show.ids.imdb', 'tvdb_id': 'show.ids.tvdb', 'tvrage_id': 'show.ids.tvrage', 'tmdb_id': 'show.ids.tmdb', 'trakt_show_id': 'show.ids.trakt', 'trakt_show_slug': 'show.ids.slug', }, 'episode': { 'title': generate_episode_title, 'series_name': generate_show_title, 'trakt_series_name': 'show.title', 'trakt_series_year': 'show.year', 'series_season': 'episode.season', 'series_episode': 'episode.number', 'series_id': lambda i: 'S{:02d}E{:02d}'.format( i['episode']['season'], i['episode']['number'] ), 'imdb_id': 'show.ids.imdb', 'tvdb_id': 'show.ids.tvdb', 'tvrage_id': 'show.ids.tvrage', 'trakt_episode_id': 'episode.ids.trakt', 'trakt_show_id': 'show.ids.trakt', 'trakt_show_slug': 'show.ids.slug', 'trakt_ep_name': 'episode.title', }, }
[docs] class TraktSet(MutableSet): @property def immutable(self): if self.config['list'] in IMMUTABLE_LISTS: return '{} list is not modifiable'.format(self.config['list']) return None schema = { 'type': 'object', 'properties': { 'username': {'type': 'string'}, 'account': {'type': 'string'}, 'list': {'type': 'string'}, 'type': { 'type': 'string', 'enum': ['shows', 'seasons', 'episodes', 'movies', 'auto'], 'default': 'auto', }, 'strip_dates': {'type': 'boolean', 'default': False}, 'language': {'type': 'string', 'minLength': 2, 'maxLength': 2}, }, 'required': ['list'], 'anyOf': [ {'required': ['username']}, {'required': ['account']}, # The 'trending' and 'popular' lists don't require an username {'properties': {'list': {'enum': ['trending', 'popular']}}}, ], 'error_anyOf': 'At least one of `username` or `account` options are needed.', 'additionalProperties': False, } def __init__(self, config): self.config = config if self.config.get('account') and not self.config.get('username'): self.config['username'] = 'me' self.session = db.get_session(self.config.get('account')) # Lists may not have modified results if modified then accessed in quick succession. self.session.add_domain_limiter(TimedLimiter('trakt.tv', '2 seconds')) self._cached_items = None def __iter__(self): return iter(self.items) def __len__(self): return len(self.items)
[docs] def add(self, entry): self.submit([entry])
def __ior__(self, entries): # Optimization to submit multiple entries at same time self.submit(entries)
[docs] def discard(self, entry): self.submit([entry], remove=True)
def __isub__(self, entries): # Optimization to submit multiple entries at same time self.submit(entries, remove=True)
[docs] def _find_entry(self, entry): for item in self.items: if self.config['type'] in ['episodes', 'auto'] and self.episode_match(entry, item): return item if self.config['type'] in ['seasons', 'auto'] and self.season_match(entry, item): return item if self.config['type'] in ['shows', 'auto'] and self.show_match(entry, item): return item if self.config['type'] in ['movies', 'auto'] and self.movie_match(entry, item): return item return None
def __contains__(self, entry): return self._find_entry(entry) is not None
[docs] def clear(self): if self.items: for item in self.items: self.discard(item) self._cached_items = None
[docs] def get(self, entry): return self._find_entry(entry)
# -- Public interface ends here -- #
[docs] def get_items(self): """Iterate over etrieved itesms from the trakt api.""" if ( self.config['list'] in ['collection', 'watched', 'trending', 'popular'] and self.config['type'] == 'auto' ): raise plugin.PluginError( '`type` cannot be `auto` for {} list.'.format(self.config['list']) ) limit_per_page = 1000 endpoint = self.get_list_endpoint() list_type = (self.config['type']).rstrip('s') logger.verbose('Retrieving `{}` list `{}`', self.config['type'], self.config['list']) try: page = 1 collecting_finished = False while not collecting_finished: result = self.session.get( db.get_api_url(endpoint), params={'limit': limit_per_page, 'page': page} ) page = int(result.headers.get('X-Pagination-Page', 1)) number_of_pages = int(result.headers.get('X-Pagination-Page-Count', 1)) if page == 2: # If there is more than one page (more than 1000 items) warn user they may want to limit logger.verbose( 'There are a large number of items in trakt `{}` list. You may want to enable `limit` ' 'plugin to reduce the amount of entries in this task.', self.config['list'], ) collecting_finished = page >= number_of_pages page += 1 try: trakt_items = result.json() except ValueError: logger.debug('Could not decode json from response: {}', result.text) raise plugin.PluginError('Error getting list from trakt.') if not trakt_items: logger.warning( 'No data returned from trakt for {} list {}.', self.config['type'], self.config['list'], ) return for item in trakt_items: if self.config['type'] == 'auto': list_type = item['type'] if self.config['list'] == 'popular': item = {list_type: item} # Collection and watched lists don't return 'type' along with the items (right now) if 'type' in item and item['type'] != list_type: logger.debug( 'Skipping {} because it is not a {}', item[item['type']].get('title', 'unknown'), list_type, ) continue if list_type not in item: # Issue 2445 logger.warning('Item type can not be determined, skipping item {}', item) continue if list_type != 'episode' and not item[list_type]['title']: # Skip shows/movies with no title logger.warning( 'Item in trakt list does not appear to have a title, skipping.' ) continue entry = Entry() if list_type == 'episode': entry['url'] = 'https://trakt.tv/shows/{}/seasons/{}/episodes/{}'.format( item['show']['ids']['slug'], item['episode']['season'], item['episode']['number'], ) else: entry['url'] = 'https://trakt.tv/{}s/{}'.format( list_type, item[list_type]['ids'].get('slug'), ) # Pass the strip dates option in so it can be used in the update maps item['strip_dates'] = self.config.get('strip_dates') entry.update_using_map(field_maps[list_type], item) # get movie name translation language = self.config.get('language') if list_type == 'movie' and language: endpoint = ['movies', entry['trakt_movie_id'], 'translations', language] try: result = self.session.get(db.get_api_url(endpoint)) try: translation = result.json() except ValueError: raise plugin.PluginError( f'Error decoding movie translation from trakt: {result.text}.' ) except RequestException as e: raise plugin.PluginError( f'Could not retrieve movie translation from trakt: {e!s}' ) if not translation or not translation[0]['title']: logger.warning( 'No translation data returned from trakt for movie {}.', entry['title'], ) else: logger.verbose( 'Found `{}` translation for movie `{}`: {}', language, entry['movie_name'], translation[0]['title'], ) entry['title'] = translation[0]['title'] if entry.get('movie_year') and not self.config.get('strip_dates'): entry['title'] += ' ({})'.format(entry['movie_year']) entry['movie_name'] = translation[0]['title'] if entry.isvalid(): yield entry else: logger.debug('Invalid entry created? {}', entry) except RequestException as e: raise plugin.PluginError(f'Could not retrieve list from trakt ({e})')
@property def items(self): if self._cached_items is None: self._cached_items = list(self.get_items()) return self._cached_items
[docs] def invalidate_cache(self): self._cached_items = None
[docs] def get_list_endpoint(self, remove=False, submit=False): if ( not submit and self.config['list'] == 'collection' and self.config['type'] == 'episodes' ): # API restriction as they don't have an endpoint for collected episodes yet if self.config['list'] == 'collection': raise plugin.PluginError('`type` cannot be `episodes` for collection list.') if self.config.get('account'): return ('sync', 'history', 'episodes') raise plugin.PluginError( 'A trakt `account` needs to be configured to get the episode history.' ) if self.config['list'] in ['collection', 'watchlist', 'watched', 'ratings']: if self.config.get('account'): endpoint = ( 'sync', 'history' if self.config['list'] == 'watched' else self.config['list'], ) if not submit: endpoint += (self.config['type'],) # Watched shows has a different endpoint for submitting and getting if self.config['list'] == 'watched' and self.config['type'] == 'shows': endpoint = ('sync', 'watched', 'shows') else: endpoint = ( 'users', self.config['username'], self.config['list'], self.config['type'], ) elif self.config['list'] in ['trending', 'popular']: endpoint = (self.config['type'], self.config['list']) else: endpoint = ( 'users', self.config['username'], 'lists', db.make_list_slug(self.config['list']), 'items', ) if remove: endpoint += ('remove',) return endpoint
[docs] def show_match(self, entry1, entry2): return any( entry1.get(ident) is not None and entry1[ident] == entry2.get(ident) for ident in [ 'series_name', 'trakt_show_id', 'tmdb_id', 'tvdb_id', 'imdb_id', 'tvrage_id', ] )
[docs] def season_match(self, entry1, entry2): return ( self.show_match(entry1, entry2) and entry1.get('series_season') is not None and entry1['series_season'] == entry2.get('series_season') )
[docs] def episode_match(self, entry1, entry2): return ( self.season_match(entry1, entry2) and entry1.get('series_episode') is not None and entry1['series_episode'] == entry2.get('series_episode') )
[docs] def movie_match(self, entry1, entry2): if any( entry1.get(id) is not None and entry1[id] == entry2[id] for id in ['trakt_movie_id', 'imdb_id', 'tmdb_id'] ): return True return bool( entry1.get('movie_name') and (entry1.get('movie_name'), entry1.get('movie_year')) == (entry2.get('movie_name'), entry2.get('movie_year')) )
[docs] def submit(self, entries, remove=False): """Submit movies or episodes to trakt api.""" found = {} for entry in entries: if self.config['type'] in ['auto', 'shows', 'seasons', 'episodes'] and entry.get( 'series_name' ): show_name, show_year = split_title_year(entry['series_name']) show = {'title': show_name, 'ids': db.get_entry_ids(entry)} if show_year: show['year'] = show_year if ( self.config['type'] in ['auto', 'seasons', 'episodes'] and entry.get('series_season') is not None ): season = {'number': entry['series_season']} if ( self.config['type'] in ['auto', 'episodes'] and entry.get('series_episode') is not None ): season['episodes'] = [{'number': entry['series_episode']}] show['seasons'] = [season] if self.config['type'] in ['seasons', 'episodes'] and 'seasons' not in show: logger.debug('Not submitting `{}`, no season found.', entry['title']) continue if self.config['type'] == 'episodes' and 'episodes' not in show['seasons'][0]: logger.debug('Not submitting `{}`, no episode number found.', entry['title']) continue found.setdefault('shows', []).append(show) elif self.config['type'] in ['auto', 'movies']: movie = {'ids': db.get_entry_ids(entry)} if not movie['ids']: if entry.get('movie_name') is not None: movie['title'] = entry.get('movie_name') or entry.get('imdb_name') movie['year'] = entry.get('movie_year') or entry.get('imdb_year') else: logger.debug( 'Not submitting `{}`, no movie name or id found.', entry['title'] ) continue found.setdefault('movies', []).append(movie) if not (found.get('shows') or found.get('movies')): logger.debug('Nothing to submit to trakt.') return url = db.get_api_url(self.get_list_endpoint(remove, submit=True)) logger.debug('Submitting data to trakt.tv ({}): {}', url, found) try: result = self.session.post(url, data=json.dumps(found), raise_status=False) except RequestException as e: logger.error('Error submitting data to trakt.tv: {}', e) return if 200 <= result.status_code < 300: action = 'deleted' if remove else 'added' res = result.json() # Default to 0 for all categories, even if trakt response didn't include them for cat in ('movies', 'shows', 'episodes', 'seasons'): res[action].setdefault(cat, 0) logger.info( 'Successfully {0} to/from list {1}: {movies} movie(s), {shows} show(s), {episodes} episode(s), ' '{seasons} season(s).', action, self.config['list'], **res[action], ) for media_type, request in res['not_found'].items(): if request: logger.debug('not found {}: {}', media_type, request) # TODO: Improve messages about existing and unknown results # Mark the results expired if we added or removed anything if sum(res[action].values()): self.invalidate_cache() elif result.status_code == 404: logger.error('List does not appear to exist on trakt: {}', self.config['list']) elif result.status_code == 401: logger.error('Authentication error: have you authorized Flexget on Trakt.tv?') logger.debug('trakt response: {}', result.text) else: logger.error('Unknown error submitting data to trakt.tv: {}', result.text)
@property def online(self): """Set the online status of the plugin. Online plugin should be treated differently in certain situations, like test mode """ return True
[docs] class TraktList: schema = TraktSet.schema
[docs] def get_list(self, config): return TraktSet(config)
# TODO: we should somehow invalidate this cache when the list is modified @cached('trakt_list', persist='2 hours') def on_task_input(self, task, config): # We use the generator here rather than the cached list in case limit plugin is used. return TraktSet(config).get_items()
[docs] @event('plugin.register') def register_plugin(): plugin.register(TraktList, 'trakt_list', api_ver=2, interfaces=['task', 'list'])