from loguru import logger
from flexget import plugin
from flexget.config_schema import one_or_more
from flexget.entry import Entry
from flexget.event import event
from flexget.utils import json
from flexget.utils.cached_input import cached
from flexget.utils.requests import RequestException
from flexget.utils.soup import get_soup
# Playwright is an optional dependency: when it is not installed,
# ``sync_playwright`` is set to None and callers must check for that
# before using it.
try:
    from playwright.sync_api import sync_playwright
except ImportError:
    sync_playwright = None
# Rebind the module-level logger so it carries name='imdb_watchlist' as bound context.
logger = logger.bind(name='imdb_watchlist')
# Pattern for a user id: "ur" followed by 7 to 9 digits.
USER_ID_RE = r'^ur\d{7,9}$'

# Pattern for a custom list id: "ls" followed by 7 to 10 digits.
CUSTOM_LIST_RE = r'^ls\d{7,10}$'

# List names that are addressed through a user id rather than a list id.
USER_LISTS = ['watchlist', 'ratings', 'checkins']

# Maps the title-type names accepted in the plugin config (keys) to the
# identifiers sent in the ``title_type`` request parameter (values).
# Key order is significant: it is the order of the schema enum.
TITLE_TYPE_MAP = {
    'movies': 'movie',
    'short films': 'short',
    'games': 'videoGame',
    'mini series': 'tvMiniSeries',
    'shows': 'tvSeries',
    'episodes': 'tvEpisode',
    'tv movies': 'tvMovie',
    'tv specials': 'tvSpecial',
    'videos': 'video',
}
# [docs]  (link text captured along with the source listing; not part of the code)
class ImdbWatchlist:
    """Create an entry for each title in an imdb list.

    The list is either one of the per-user lists in ``USER_LISTS`` (which
    requires ``user_id``) or a custom list id matching ``CUSTOM_LIST_RE``.
    Titles are read from the ``__NEXT_DATA__`` JSON embedded in the list
    page(s).
    """

    def __init__(self):
        # Cookies obtained from the last successful WAF challenge solve;
        # re-attached to the task's session on later requests.
        self._waf_cookies = None

    schema = {
        'type': 'object',
        'properties': {
            'user_id': {
                'type': 'string',
                'pattern': USER_ID_RE,
                'error_pattern': 'user_id must be in the form urXXXXXXX',
            },
            'list': {
                'type': 'string',
                'oneOf': [{'enum': USER_LISTS}, {'pattern': CUSTOM_LIST_RE}],
                'error_oneOf': 'list must be either {}, or a custom list name (lsXXXXXXXXX)'.format(
                    ', '.join(USER_LISTS)
                ),
            },
            'force_language': {'type': 'string', 'default': 'en-us'},
            'type': {
                'oneOf': [
                    one_or_more(
                        {'type': 'string', 'enum': list(TITLE_TYPE_MAP.keys())}, unique_items=True
                    ),
                    {'type': 'string', 'enum': ['all']},
                ]
            },
            'strip_dates': {'type': 'boolean', 'default': False},
        },
        'additionalProperties': False,
        'required': ['list'],
        'anyOf': [
            {'required': ['user_id']},
            {'properties': {'list': {'pattern': CUSTOM_LIST_RE}}},
        ],
        'error_anyOf': 'user_id is required if not using a custom list (lsXXXXXXXXX format)',
    }

    default_user_agent = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebkit/537.36 (KHTML, like Gecko) '
        'Chrome/69.0.3497.100 Safari/537.36'
    )

    def prepare_config(self, config):
        """Normalize ``config['type']`` to a list, defaulting to ``['all']``.

        The config dict is modified in place and also returned.
        """
        if 'type' not in config:
            config['type'] = ['all']
        if not isinstance(config['type'], list):
            config['type'] = [config['type']]
        return config

    @cached('imdb_watchlist', persist='2 hours')
    def on_task_input(self, task, config) -> list[Entry]:
        """Build the list URL and request parameters, then return the parsed entries.

        :param task: task whose ``requests`` session is used for fetching.
        :param config: plugin configuration (see ``schema``).
        :return: one entry per title found in the list.
        :raises plugin.PluginError: if a page cannot be fetched or later
            pages contain invalid data.
        """
        config = self.prepare_config(config)

        # Create movie entries by parsing imdb list page(s) html using beautifulsoup
        logger.verbose('Retrieving imdb list: {}', config['list'])

        headers = {
            'Accept-Language': config.get('force_language'),
            'User-Agent': self.default_user_agent,
        }
        params = {'view': 'detail', 'page': 1}
        if config['list'] in USER_LISTS:
            url = 'https://www.imdb.com/user/{}/{}'.format(config['user_id'], config['list'])
            if config['list'] == 'watchlist':
                # The watchlist request starts without an explicit page number.
                params = {'view': 'detail'}
        else:
            url = 'https://www.imdb.com/list/{}'.format(config['list'])

        if 'all' not in config['type']:
            title_types = [TITLE_TYPE_MAP[title_type] for title_type in config['type']]
            params['title_type'] = ','.join(title_types)
            # NOTE(review): this value is already percent-encoded; if the session
            # encodes query parameters again the server would receive a literal
            # "%2C" instead of a comma -- confirm before changing.
            params['sort'] = 'list_order%2Casc'

        # The watchlist page keeps its items under a different key than other lists.
        kind = 'predefinedList' if config['list'] == 'watchlist' else 'list'
        return self.parse_html_list(task, config, url, params, headers, kind=kind)

    def _solve_waf(self, url, params, headers):
        """Bypass AWS WAF JS challenge using Playwright (optional dep).

        :param url: page URL, without query string.
        :param params: query parameters, appended to ``url`` as ``k=v`` pairs.
        :param headers: request headers; ``User-Agent`` and ``Accept-Language``
            are applied to the browser context.
        :return: cookies dict with aws-waf-token, or None on failure.
        """
        if sync_playwright is None:
            logger.warning(
                'AWS WAF challenge detected. Install playwright to bypass: '
                'pip install playwright && playwright install chromium'
            )
            return None

        logger.info('Solving AWS WAF challenge via Playwright headless browser...')
        try:
            with sync_playwright() as pw:
                browser = pw.chromium.launch(headless=True)
                try:
                    # NOTE(review): Playwright's documented locale examples are
                    # hyphenated (e.g. en-GB); the underscore conversion is kept
                    # unchanged here -- confirm it is intended.
                    context = browser.new_context(
                        user_agent=headers.get('User-Agent', self.default_user_agent),
                        locale=headers.get('Accept-Language', 'en-us').replace('-', '_'),
                    )
                    page = context.new_page()
                    page.set_extra_http_headers(
                        {'Accept-Language': headers.get('Accept-Language', 'en-us')}
                    )
                    page.goto(
                        url + '?' + '&'.join(f'{k}={v}' for k, v in params.items()),
                        wait_until='networkidle',
                        timeout=30000,
                    )
                    cookies = page.context.cookies()
                finally:
                    # Close the browser even when navigation fails or times out.
                    browser.close()

            cookie_dict = {c['name']: c['value'] for c in cookies}
            if 'aws-waf-token' in cookie_dict:
                logger.debug('Successfully obtained aws-waf-token from Playwright')
                return cookie_dict
            logger.warning('Playwright did not receive aws-waf-token cookie')
            return None  # noqa: TRY300
        except Exception as e:
            # Best effort: any failure here means "challenge not solved".
            logger.warning('Failed to solve AWS WAF challenge with Playwright: {}', e)
            return None

    @staticmethod
    def _request_page(task, url, params, headers):
        """GET ``url`` through the task session, converting request errors to PluginError."""
        try:
            return task.requests.get(url, params=params, headers=headers)
        except RequestException as e:
            raise plugin.PluginError(str(e)) from e

    @staticmethod
    def _is_waf_challenge(page):
        """Return True if the response is an AWS WAF challenge (202 + challenge header)."""
        return page.status_code == 202 and page.headers.get('x-amzn-waf-action') == 'challenge'

    def fetch_page(self, task, url, params, headers):
        """Fetch one list page, solving an AWS WAF challenge if one is returned.

        :return: the response object for a page answered with status 200.
        :raises plugin.PluginError: if the request fails, the WAF challenge
            cannot be solved, or the final status code is not 200.
        """
        logger.debug('Requesting: {} {}', url, headers)

        # If we have cached WAF cookies from a previous solve, attach them
        if self._waf_cookies:
            task.requests.cookies.update(self._waf_cookies)

        page = self._request_page(task, url, params, headers)

        # AWS WAF JS challenge detected -- solve it via Playwright
        if self._is_waf_challenge(page):
            logger.info('IMDB WAF challenge triggered, attempting to bypass...')
            waf_cookies = self._solve_waf(url, params, headers)
            if waf_cookies:
                self._waf_cookies = waf_cookies
                task.requests.cookies.update(waf_cookies)
                page = self._request_page(task, url, params, headers)
            if self._is_waf_challenge(page):
                # Report the real cause instead of the generic
                # "private or does not exist" message below.
                raise plugin.PluginError(
                    'Unable to get imdb list. IMDB answered with an AWS WAF challenge '
                    f'that could not be solved. Html status code was: {page.status_code}.'
                )

        if page.status_code != 200:
            raise plugin.PluginError(
                f'Unable to get imdb list. Either list is private or does not exist. '
                f'Html status code was: {page.status_code}.'
            )
        return page

    @staticmethod
    def _load_next_data(page):
        """Parse and return the JSON held in the page's ``__NEXT_DATA__`` script tag."""
        soup = get_soup(page.text)
        return json.loads(soup.find('script', id='__NEXT_DATA__', type='application/json').string)

    @staticmethod
    def _extract_edges(query_result, config, kind):
        """Return the ``edges`` list of a parsed page for the configured list type."""
        main_column = query_result['props']['pageProps']['mainColumnData']
        if config['list'] == 'ratings':
            # Ratings use advancedTitleSearch structure
            return main_column['advancedTitleSearch']['edges']
        # Watchlists and other lists use the titleListItemSearch structure
        return main_column[kind]['titleListItemSearch']['edges']

    def parse_html_list(self, task, config, url, params, headers, kind='list') -> list[Entry]:
        """Fetch every page of the list and convert its items into entries.

        :param params: query parameters; ``params['page']`` is updated in place
            while paginating.
        :param kind: key under ``mainColumnData`` holding the items for
            non-ratings lists.
        :return: parsed entries; an empty list if the first page has no
            items or cannot be parsed.
        :raises plugin.PluginError: if a page cannot be fetched or a follow-up
            page contains invalid data.
        """
        logger.debug('Parsing imdb list: {}', url)
        page = self.fetch_page(task, url, params, headers)

        try:
            query_result = self._load_next_data(page)
            if config['list'] == 'ratings':
                total_item_count = query_result['props']['pageProps']['mainColumnData'][
                    'advancedTitleSearch'
                ]['total']
            else:
                total_item_count = query_result['props']['pageProps']['totalItems']
            items = list(self._extract_edges(query_result, config, kind))
            logger.verbose('imdb list contains {} items', total_item_count)
        except Exception as e:
            # Deliberate best effort: an unparsable first page is treated as
            # an empty list, but the reason is no longer discarded.
            logger.debug('Could not parse imdb list data: {}', e)
            total_item_count = 0
            items = []

        if not total_item_count:
            logger.verbose('Nothing found in imdb list: {}', config['list'])
            return []

        page_no = 1
        while len(items) < total_item_count:
            page_no += 1
            params['page'] = page_no
            page = self.fetch_page(task, url, params, headers)
            try:
                new_items = self._extract_edges(self._load_next_data(page), config, kind)
            except Exception as e:
                raise plugin.PluginError('Received invalid list data') from e
            if not new_items:
                # Without this guard a page with no items would make the loop
                # request further pages forever.
                logger.warning(
                    'imdb list page {} returned no items; got {} of {} expected items',
                    page_no,
                    len(items),
                    total_item_count,
                )
                break
            items.extend(new_items)

        # Extract the actual list items from the different structures
        if config['list'] == 'ratings':
            # For ratings, items are directly in edges with 'node' containing the title
            return [self.parse_entry(item['node']['title'], config) for item in items]
        # For other lists, items are in edges with 'listItem' structure
        return [self.parse_entry(item['listItem'], config) for item in items]

    def parse_entry(self, item, config) -> Entry:
        """Convert one title dict from the page JSON into an entry.

        :param item: title data; read keys are ``id``, ``titleText``,
            ``titleType``, ``originalTitleText``, ``releaseYear`` and
            ``ratingsSummary``.
        :param config: plugin configuration; ``strip_dates`` controls whether
            the year is appended to the entry title.
        :return: the populated entry.
        """
        entry = Entry()
        title = item['titleText']['text']
        title_type = item['titleType']['id']
        is_movie = title_type in ['movie', 'tvMovie']
        is_series = title_type in ['tvSeries', 'tvMiniSeries']

        entry['title'] = title
        entry['url'] = entry['imdb_url'] = f'https://www.imdb.com/title/{item["id"]}/'
        entry['imdb_id'] = item['id']
        entry['imdb_name'] = title
        entry['imdb_original_name'] = item['originalTitleText']['text']
        if is_movie:
            entry['movie_name'] = title
        elif is_series:
            entry['series_name'] = title

        if item['releaseYear']:
            year = item['releaseYear']['year']
            entry['imdb_year'] = year
            if is_movie:
                entry['movie_year'] = year
            elif is_series:
                entry['series_year'] = year
            if not config.get('strip_dates'):
                entry['title'] += f' ({year})'

        rating = item['ratingsSummary']['aggregateRating']
        # A whole-number rating is decoded from JSON as int, so accept both
        # numeric types (bool is an int subclass and is excluded).
        if isinstance(rating, (int, float)) and not isinstance(rating, bool):
            entry['imdb_user_score'] = entry['imdb_score'] = float(rating)
        entry['imdb_votes'] = item['ratingsSummary']['voteCount']
        return entry
# [docs]  (link text captured along with the source listing; not part of the code)
@event('plugin.register')
def register_plugin():
    """Register ``ImdbWatchlist`` under the name ``imdb_watchlist`` on 'plugin.register'."""
    plugin.register(ImdbWatchlist, 'imdb_watchlist', api_ver=2)