# Source code for flexget.components.imdb.imdb_watchlist

from urllib.parse import urlencode

from loguru import logger

from flexget import plugin
from flexget.config_schema import one_or_more
from flexget.entry import Entry
from flexget.event import event
from flexget.utils import json
from flexget.utils.cached_input import cached
from flexget.utils.requests import RequestException
from flexget.utils.soup import get_soup

try:
    from playwright.sync_api import sync_playwright
except ImportError:
    sync_playwright = None

# Rebind the module logger so records carry the plugin name.
logger = logger.bind(name='imdb_watchlist')
# Accepted shape of the `user_id` option: 'ur' followed by 7-9 digits.
USER_ID_RE = r'^ur\d{7,9}$'
# Accepted shape of a custom list id for the `list` option: 'ls' followed by 7-10 digits.
CUSTOM_LIST_RE = r'^ls\d{7,10}$'
# Predefined per-user lists; the plugin schema requires `user_id` when one of these is used.
USER_LISTS = ['watchlist', 'ratings', 'checkins']
# Maps the values allowed in the `type` option to the identifiers sent to IMDB
# in the `title_type` query parameter. The keys (in this order) form the schema enum.
TITLE_TYPE_MAP = {
    'movies': 'movie',
    'short films': 'short',
    'games': 'videoGame',
    'mini series': 'tvMiniSeries',
    'shows': 'tvSeries',
    'episodes': 'tvEpisode',
    'tv movies': 'tvMovie',
    'tv specials': 'tvSpecial',
    'videos': 'video',
}


class ImdbWatchlist:
    """Creates an entry for each movie in your imdb list.

    The list page is fetched over HTTP and the entries are read from the
    ``__NEXT_DATA__`` JSON blob embedded in the page. When IMDB answers with an
    AWS WAF JavaScript challenge, the plugin optionally solves it with a
    headless Playwright browser and reuses the resulting cookies.
    """

    def __init__(self):
        # Cookies obtained from a solved WAF challenge, reused for later requests.
        self._waf_cookies = None

    schema = {
        'type': 'object',
        'properties': {
            'user_id': {
                'type': 'string',
                'pattern': USER_ID_RE,
                'error_pattern': 'user_id must be in the form urXXXXXXX',
            },
            'list': {
                'type': 'string',
                'oneOf': [{'enum': USER_LISTS}, {'pattern': CUSTOM_LIST_RE}],
                'error_oneOf': 'list must be either {}, or a custom list name (lsXXXXXXXXX)'.format(
                    ', '.join(USER_LISTS)
                ),
            },
            'force_language': {'type': 'string', 'default': 'en-us'},
            'type': {
                'oneOf': [
                    one_or_more(
                        {'type': 'string', 'enum': list(TITLE_TYPE_MAP.keys())}, unique_items=True
                    ),
                    {'type': 'string', 'enum': ['all']},
                ]
            },
            'strip_dates': {'type': 'boolean', 'default': False},
        },
        'additionalProperties': False,
        'required': ['list'],
        'anyOf': [
            {'required': ['user_id']},
            {'properties': {'list': {'pattern': CUSTOM_LIST_RE}}},
        ],
        'error_anyOf': 'user_id is required if not using a custom list (lsXXXXXXXXX format)',
    }

    default_user_agent = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebkit/537.36 (KHTML, like Gecko) '
        'Chrome/69.0.3497.100 Safari/537.36'
    )

    def prepare_config(self, config):
        """Normalise ``config['type']`` to a list, defaulting to ``['all']``.

        The passed ``config`` dict is modified in place and also returned.
        """
        if 'type' not in config:
            config['type'] = ['all']
        if not isinstance(config['type'], list):
            config['type'] = [config['type']]
        return config

    @cached('imdb_watchlist', persist='2 hours')
    def on_task_input(self, task, config) -> list[Entry]:
        """Build the list URL and query parameters from ``config`` and return the parsed entries.

        :param task: the running task; its ``requests`` session performs the HTTP calls.
        :param config: plugin configuration (see ``schema``).
        :raises plugin.PluginError: when the list cannot be fetched or parsed.
        """
        config = self.prepare_config(config)

        # Create movie entries by parsing imdb list page(s) html using beautifulsoup
        logger.verbose('Retrieving imdb list: {}', config['list'])

        headers = {
            'Accept-Language': config.get('force_language'),
            'User-Agent': self.default_user_agent,
        }
        params = {'view': 'detail', 'page': 1}
        if config['list'] in USER_LISTS:
            url = 'https://www.imdb.com/user/{}/{}'.format(config['user_id'], config['list'])
            if config['list'] == 'watchlist':
                # The first watchlist request is sent without a page number.
                params = {'view': 'detail'}
        else:
            url = 'https://www.imdb.com/list/{}'.format(config['list'])

        if 'all' not in config['type']:
            title_types = [TITLE_TYPE_MAP[title_type] for title_type in config['type']]
            params['title_type'] = ','.join(title_types)
            # Keep the value unencoded: query parameters are percent-encoded when the
            # request is built, so a pre-encoded '%2C' would go out as '%252C'.
            params['sort'] = 'list_order,asc'

        # The watchlist page stores its items under a different key than other lists.
        kind = 'predefinedList' if config['list'] == 'watchlist' else 'list'
        return self.parse_html_list(task, config, url, params, headers, kind=kind)

    def _solve_waf(self, url, params, headers):
        """Bypass AWS WAF JS challenge using Playwright (optional dep).

        Opens ``url`` (with ``params`` as query string) in headless Chromium, waits
        for the network to go idle and collects the browser cookies.

        Returns cookies dict with aws-waf-token, or None on failure.
        """
        if sync_playwright is None:
            logger.warning(
                'AWS WAF challenge detected. Install playwright to bypass: '
                'pip install playwright && playwright install chromium'
            )
            return None

        logger.info('Solving AWS WAF challenge via Playwright headless browser...')
        # Encode the query string the same way the regular HTTP request does.
        query = urlencode(params)
        target = f'{url}?{query}' if query else url
        try:
            with sync_playwright() as pw:
                browser = pw.chromium.launch(headless=True)
                try:
                    # NOTE(review): the locale is derived by swapping '-' for '_'
                    # (e.g. 'en_us'); confirm Playwright accepts this form.
                    context = browser.new_context(
                        user_agent=headers.get('User-Agent', self.default_user_agent),
                        locale=headers.get('Accept-Language', 'en-us').replace('-', '_'),
                    )
                    page = context.new_page()
                    page.set_extra_http_headers(
                        {'Accept-Language': headers.get('Accept-Language', 'en-us')}
                    )
                    page.goto(target, wait_until='networkidle', timeout=30000)
                    cookies = page.context.cookies()
                finally:
                    # Always release the browser, even when navigation fails or times out.
                    browser.close()
            cookie_dict = {c['name']: c['value'] for c in cookies}
        except Exception as e:
            # Best effort: any Playwright failure just means the challenge stays unsolved.
            logger.warning('Failed to solve AWS WAF challenge with Playwright: {}', e)
            return None

        if 'aws-waf-token' in cookie_dict:
            logger.debug('Successfully obtained aws-waf-token from Playwright')
            return cookie_dict
        logger.warning('Playwright did not receive aws-waf-token cookie')
        return None

    @staticmethod
    def _is_waf_challenge(page):
        """Return True when ``page`` is an AWS WAF challenge response (202 + challenge header)."""
        return page.status_code == 202 and page.headers.get('x-amzn-waf-action') == 'challenge'

    @staticmethod
    def _request(task, url, params, headers):
        """GET ``url`` through the task session, converting request errors to PluginError."""
        try:
            return task.requests.get(url, params=params, headers=headers)
        except RequestException as e:
            raise plugin.PluginError(str(e)) from e

    def fetch_page(self, task, url, params, headers):
        """Fetch one list page, solving an AWS WAF challenge once if IMDB presents one.

        :returns: the response object of a successful (HTTP 200) request.
        :raises plugin.PluginError: on request errors, on a WAF challenge that could
            not be solved, or on any non-200 status.
        """
        logger.debug('Requesting: {} {}', url, headers)

        # If we have cached WAF cookies from a previous solve, attach them
        if self._waf_cookies:
            task.requests.cookies.update(self._waf_cookies)

        page = self._request(task, url, params, headers)

        # AWS WAF JS challenge detected — solve it via Playwright
        if self._is_waf_challenge(page):
            logger.info('IMDB WAF challenge triggered, attempting to bypass...')
            waf_cookies = self._solve_waf(url, params, headers)
            if waf_cookies:
                self._waf_cookies = waf_cookies
                task.requests.cookies.update(waf_cookies)
                page = self._request(task, url, params, headers)
            if self._is_waf_challenge(page):
                # Report the real cause instead of the generic "private list" message.
                raise plugin.PluginError(
                    'Unable to get imdb list. IMDB answered with an AWS WAF challenge that '
                    f'could not be solved. Html status code was: {page.status_code}.'
                )

        if page.status_code != 200:
            raise plugin.PluginError(
                f'Unable to get imdb list. Either list is private or does not exist. '
                f'Html status code was: {page.status_code}.'
            )
        return page

    @staticmethod
    def _load_page_props(page_text):
        """Return the ``props.pageProps`` object from the page's ``__NEXT_DATA__`` JSON."""
        soup = get_soup(page_text)
        script = soup.find('script', id='__NEXT_DATA__', type='application/json')
        return json.loads(script.string)['props']['pageProps']

    @staticmethod
    def _search_results(page_props, config, kind):
        """Return the object holding the ``edges`` list for the configured list type."""
        main_column = page_props['mainColumnData']
        if config['list'] == 'ratings':
            # Ratings use advancedTitleSearch structure
            return main_column['advancedTitleSearch']
        # Watchlists and other lists keep their items under the `kind` key
        return main_column[kind]['titleListItemSearch']

    def parse_html_list(self, task, config, url, params, headers, kind='list') -> list[Entry]:
        """Fetch every page of the list and convert its items to entries.

        :param params: query parameters; ``params['page']`` is updated in place while paging.
        :param kind: key under ``mainColumnData`` that holds the items of non-ratings lists.
        :returns: one entry per list item, or an empty list when nothing could be read
            from the first page.
        :raises plugin.PluginError: when a page cannot be fetched or a follow-up page
            contains invalid data.
        """
        logger.debug('Parsing imdb list: {}', url)

        page = self.fetch_page(task, url, params, headers)
        try:
            page_props = self._load_page_props(page.text)
            results = self._search_results(page_props, config, kind)
            if config['list'] == 'ratings':
                total_item_count = results['total']
            else:
                total_item_count = page_props['totalItems']
            items = list(results['edges'])
            logger.verbose('imdb list contains {} items', total_item_count)
        except Exception as e:
            # Best effort: an unreadable first page is treated as an empty list,
            # but the reason is logged so it is not lost.
            logger.debug('Could not read list data from {}: {}', url, e)
            total_item_count = 0
            items = []

        if not total_item_count:
            logger.verbose('Nothing found in imdb list: {}', config['list'])
            return []

        page_no = 1
        while len(items) < total_item_count:
            page_no += 1
            params['page'] = page_no
            page = self.fetch_page(task, url, params, headers)
            try:
                page_props = self._load_page_props(page.text)
                new_items = self._search_results(page_props, config, kind)['edges']
            except Exception as e:
                raise plugin.PluginError('Received invalid list data') from e
            if not new_items:
                # An empty page means there is nothing more to fetch; without this
                # check a too-large reported total would loop forever.
                logger.warning(
                    'imdb list {} returned no items on page {}; got {} of {} expected items',
                    config['list'],
                    page_no,
                    len(items),
                    total_item_count,
                )
                break
            items.extend(new_items)

        # Extract the actual list items from the different structures
        if config['list'] == 'ratings':
            # For ratings, items are directly in edges with 'node' containing the title
            return [self.parse_entry(item['node']['title'], config) for item in items]
        # For other lists, items are in edges with 'listItem' structure
        return [self.parse_entry(item['listItem'], config) for item in items]

    def parse_entry(self, item, config) -> Entry:
        """Convert one title object from the list JSON into an entry.

        :param item: title data with ``id``, ``titleText``, ``originalTitleText``,
            ``titleType``, ``releaseYear`` and ``ratingsSummary`` keys.
        :param config: plugin configuration; ``strip_dates`` controls whether the
            release year is appended to the entry title.
        """
        entry = Entry()
        title = item['titleText']['text']
        title_type = item['titleType']['id']
        is_movie = title_type in ['movie', 'tvMovie']
        is_series = title_type in ['tvSeries', 'tvMiniSeries']

        entry['title'] = title
        entry['url'] = entry['imdb_url'] = f'https://www.imdb.com/title/{item["id"]}/'
        entry['imdb_id'] = item['id']
        entry['imdb_name'] = title
        entry['imdb_original_name'] = item['originalTitleText']['text']
        if is_movie:
            entry['movie_name'] = title
        elif is_series:
            entry['series_name'] = title

        if item['releaseYear']:
            year = item['releaseYear']['year']
            entry['imdb_year'] = year
            if is_movie:
                entry['movie_year'] = year
            elif is_series:
                entry['series_year'] = year
            if not config.get('strip_dates'):
                entry['title'] += f' ({year})'

        rating = item['ratingsSummary']['aggregateRating']
        # Whole-number ratings can arrive as JSON integers (8 instead of 8.0);
        # accept both, but not booleans, which are also ints in Python.
        if isinstance(rating, (int, float)) and not isinstance(rating, bool):
            entry['imdb_user_score'] = entry['imdb_score'] = float(rating)
            entry['imdb_votes'] = item['ratingsSummary']['voteCount']
        return entry
@event('plugin.register')
def register_plugin():
    """Register ``ImdbWatchlist`` as the ``imdb_watchlist`` plugin (plugin API version 2).

    Runs when the ``plugin.register`` event fires.
    """
    plugin.register(ImdbWatchlist, 'imdb_watchlist', api_ver=2)