from __future__ import annotations
import difflib
import time
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
from urllib.error import URLError
from urllib.parse import quote_plus
from loguru import logger
from sqlalchemy import Column, DateTime, Integer, String, Table, func, sql
from sqlalchemy.orm import relationship
from sqlalchemy.schema import ForeignKey, Index
from flexget import db_schema, plugin
from flexget.plugin import PluginError, internet
from flexget.utils import requests
from flexget.utils.database import text_date_synonym, with_session
from flexget.utils.sqlalchemy_utils import table_add_column, table_schema
if TYPE_CHECKING:
from sqlalchemy.orm import Session
# Module-level logger bound to this plugin's name.
logger = logger.bind(name='api_rottentomatoes')
# Declarative base for the cache tables; the version (2) is the last version produced by `upgrade` below.
Base: type[db_schema.VersionedBaseMeta] = db_schema.versioned_base('api_rottentomatoes', 2)
# Shared HTTP session used by `get_json` for every API request.
session = requests.Session()
# The API allows 5 calls per second per API key. Because many users share the same key this can be
# a problem, so requests to the API host are spaced by a domain limiter.
session.add_domain_limiter(requests.TimedLimiter('api.rottentomatoes.com', '0.4 seconds'))
# This is developer Atlanta800's API key; it is the fallback used when a caller passes no `api_key`.
API_KEY = 'rh8chjzp8vu6gnpwj88736uv'
API_VER = 'v1.0'
SERVER = 'http://api.rottentomatoes.com/api/public'
# Minimum difflib similarity ratio a search hit needs to stay a candidate in `lookup_movie`.
MIN_MATCH = 0.5
# Minimum ratio gap required between the two best hits before the top one is accepted.
MIN_DIFF = 0.01
@db_schema.upgrade('api_rottentomatoes')
def upgrade(ver: int, session: Session) -> int:
    """Migrate the ``api_rottentomatoes`` cache tables step by step to the latest version.

    :param ver: schema version currently recorded for this plugin
    :param session: database session the migration statements are executed on
    :returns: the schema version reached after all applicable steps have run
    """
    if ver == 0:
        # 0 -> 1: empty every cache table, then add the ``rt_id`` column to the actors table.
        table_names = [
            'rottentomatoes_actors',
            'rottentomatoes_alternate_ids',
            'rottentomatoes_directors',
            'rottentomatoes_genres',
            'rottentomatoes_links',
            'rottentomatoes_movie_actors',
            'rottentomatoes_movie_directors',
            'rottentomatoes_movie_genres',
            'rottentomatoes_movies',
            'rottentomatoes_posters',
            'rottentomatoes_releasedates',
            'rottentomatoes_search_results',
        ]
        # Reflect all tables first so a missing table fails before anything is deleted.
        tables = [table_schema(name, session) for name in table_names]
        for table in tables:
            session.execute(table.delete())
        table_add_column('rottentomatoes_actors', 'rt_id', String, session)
        ver = 1
    if ver == 1:
        # 1 -> 2: remove search results that do not reference a movie.
        table = table_schema('rottentomatoes_search_results', session)
        # Use the generative ``.where()`` form: passing the where-clause positionally to
        # ``delete()`` is a legacy calling style that SQLAlchemy 2.0 no longer accepts.
        session.execute(table.delete().where(table.c.movie_id.is_(None)))
        ver = 2
    return ver
# Association tables for the many-to-many relationships declared on RottenTomatoesMovie
# (`genres`, `cast` and `directors`). Each one pairs a movie id with the id of the linked row
# and is registered on the versioned base.
genres_table = Table(
    'rottentomatoes_movie_genres',
    Base.metadata,
    Column('movie_id', Integer, ForeignKey('rottentomatoes_movies.id')),
    Column('genre_id', Integer, ForeignKey('rottentomatoes_genres.id')),
    Index('ix_rottentomatoes_movie_genres', 'movie_id', 'genre_id'),
)
Base.register_table(genres_table)
actors_table = Table(
    'rottentomatoes_movie_actors',
    Base.metadata,
    Column('movie_id', Integer, ForeignKey('rottentomatoes_movies.id')),
    Column('actor_id', Integer, ForeignKey('rottentomatoes_actors.id')),
    Index('ix_rottentomatoes_movie_actors', 'movie_id', 'actor_id'),
)
Base.register_table(actors_table)
directors_table = Table(
    'rottentomatoes_movie_directors',
    Base.metadata,
    Column('movie_id', Integer, ForeignKey('rottentomatoes_movies.id')),
    Column('director_id', Integer, ForeignKey('rottentomatoes_directors.id')),
    Index('ix_rottentomatoes_movie_directors', 'movie_id', 'director_id'),
)
Base.register_table(directors_table)
# TODO: get rid of
class RottenTomatoesContainer:
    """Base class for RottenTomatoes objects."""

    def __init__(self, init_dict: dict[str, Any] | None = None) -> None:
        """Optionally populate the new object from ``init_dict``.

        :param init_dict: initial values; anything that is not a dict is ignored
        """
        if isinstance(init_dict, dict):
            self.update_from_dict(init_dict)

    def update_from_dict(self, update_dict: dict[str, Any] | None) -> None:
        """Populate any simple (string or number) attributes from a dict.

        Only keys that match a column name of ``self.__table__`` are considered, and only
        values that are ``str``, ``int`` or ``float`` are copied; nested structures are skipped.

        :param update_dict: source data; ``None`` or an empty dict is a no-op
        """
        # Callers pass optional sub-dicts (e.g. a missing ``ratings`` entry), so tolerate None.
        if not update_dict:
            return
        for col in self.__table__.columns:
            value = update_dict.get(col.name)
            if isinstance(value, (str, int, float)):
                setattr(self, col.name, value)
class RottenTomatoesMovie(RottenTomatoesContainer, Base):
    """Cached Rotten Tomatoes movie together with its related rows."""

    __tablename__ = 'rottentomatoes_movies'

    # The primary key is not auto-incremented; `lookup_movie` queries it with the Rotten Tomatoes id.
    id = Column(Integer, primary_key=True, autoincrement=False, nullable=False)
    title = Column(String)
    year = Column(Integer)
    genres = relationship('RottenTomatoesGenre', secondary=genres_table, backref='movies')
    mpaa_rating = Column(String)
    runtime = Column(Integer)
    critics_consensus = Column(String)
    release_dates = relationship(
        'ReleaseDate', backref='movie', cascade='all, delete, delete-orphan'
    )
    critics_rating = Column(String)
    critics_score = Column(Integer)
    audience_rating = Column(String)
    audience_score = Column(Integer)
    synopsis = Column(String)
    posters = relationship(
        'RottenTomatoesPoster', backref='movie', cascade='all, delete, delete-orphan'
    )
    cast = relationship('RottenTomatoesActor', secondary=actors_table, backref='movies')
    directors = relationship('RottenTomatoesDirector', secondary=directors_table, backref='movies')
    studio = Column(String)
    # NOTE: alternate_ids is no longer used; it used to store imdb_id
    alternate_ids = relationship(
        'RottenTomatoesAlternateId', backref='movie', cascade='all, delete, delete-orphan'
    )
    links = relationship(
        'RottenTomatoesLink', backref='movie', cascade='all, delete, delete-orphan'
    )
    # Time of the last refresh, so new rating counts can be fetched again later.
    # No default is set: rows without a value count as expired and get refreshed.
    updated = Column(DateTime)

    @property
    def expired(self) -> bool:
        """Tell whether the cached details are stale and should be refreshed.

        The refresh interval is 2 days plus 5 days for every year of the movie's age.

        :return: True if movie details are considered to be expired, ie. need of update
        """
        if self.updated is None:
            logger.debug('updated is None: {}', self)
            return True
        refresh_interval = 2
        if self.year:
            # Clamp at zero: a release year in the future would otherwise make the interval
            # negative and the movie would be reported as expired on every single lookup.
            age = max(datetime.now().year - self.year, 0)
            refresh_interval += age * 5
            logger.debug('movie `{}` age {} expires in {} days', self.title, age, refresh_interval)
        return self.updated < datetime.now() - timedelta(days=refresh_interval)

    def __repr__(self) -> str:
        return f'<RottenTomatoesMovie(title={self.title},id={self.id},year={self.year})>'
class RottenTomatoesGenre(Base):
    """Genre row; movies reference it through the ``rottentomatoes_movie_genres`` table."""

    __tablename__ = 'rottentomatoes_genres'

    id = Column(Integer, primary_key=True)
    name = Column(String)

    def __init__(self, name: str) -> None:
        """Create a genre called ``name``."""
        self.name = name
class ReleaseDate(Base):
    """Named release date belonging to one movie."""

    __tablename__ = 'rottentomatoes_releasedates'

    db_id = Column(Integer, primary_key=True)
    movie_id = Column(Integer, ForeignKey('rottentomatoes_movies.id'))
    name = Column(String)
    # `date` is a synonym over the `_date` attribute, which is stored in the `date` column.
    date = text_date_synonym('_date')
    _date = Column('date', DateTime)

    def __init__(self, name: str, date: datetime) -> None:
        """Create a release date entry.

        :param name: label of the release
        :param date: value assigned through the ``date`` synonym
        """
        self.date = date
        self.name = name
class RottenTomatoesPoster(Base):
    """Named poster URL belonging to one movie."""

    __tablename__ = 'rottentomatoes_posters'

    db_id = Column(Integer, primary_key=True)
    movie_id = Column(Integer, ForeignKey('rottentomatoes_movies.id'))
    name = Column(String)
    url = Column(String)

    def __init__(self, name: str, url: str) -> None:
        """Create a poster entry with the given ``name`` and ``url``."""
        self.url = url
        self.name = name
class RottenTomatoesActor(Base):
    """Actor row; movies reference it through the ``rottentomatoes_movie_actors`` table."""

    __tablename__ = 'rottentomatoes_actors'

    id = Column(Integer, primary_key=True)
    # Stored as a string, separate from the integer primary key above.
    rt_id = Column(String)
    name = Column(String)

    def __init__(self, name: str, rt_id: str) -> None:
        """Create an actor.

        :param name: actor name
        :param rt_id: value stored in the ``rt_id`` column
        """
        self.rt_id = rt_id
        self.name = name
class RottenTomatoesDirector(Base):
    """Director row; movies reference it through the ``rottentomatoes_movie_directors`` table."""

    __tablename__ = 'rottentomatoes_directors'

    id = Column(Integer, primary_key=True)
    name = Column(String)

    def __init__(self, name: str) -> None:
        """Create a director called ``name``."""
        self.name = name
class RottenTomatoesAlternateId(Base):
    """Named alternate identifier belonging to one movie."""

    __tablename__ = 'rottentomatoes_alternate_ids'

    db_id = Column(Integer, primary_key=True)
    movie_id = Column(Integer, ForeignKey('rottentomatoes_movies.id'))
    name = Column(String)
    id = Column(String)

    def __init__(self, name: str, id: str) -> None:
        """Create an alternate id entry.

        :param name: label of the identifier
        :param id: the identifier value, stored as a string
        """
        self.id = id
        self.name = name
class RottenTomatoesLink(Base):
    """Named link URL belonging to one movie."""

    __tablename__ = 'rottentomatoes_links'

    db_id = Column(Integer, primary_key=True)
    movie_id = Column(Integer, ForeignKey('rottentomatoes_movies.id'))
    name = Column(String)
    url = Column(String)

    def __init__(self, name: str, url: str) -> None:
        """Create a link entry with the given ``name`` and ``url``."""
        self.url = url
        self.name = name
class RottenTomatoesSearchResult(Base):
    """Cached mapping from a search string to the movie it resolved to."""

    __tablename__ = 'rottentomatoes_search_results'

    id = Column(Integer, primary_key=True)
    search = Column(String, nullable=False)
    movie_id = Column(Integer, ForeignKey('rottentomatoes_movies.id'), nullable=False)
    # The backref exposes these rows on the movie as `search_strings`.
    movie = relationship(RottenTomatoesMovie, backref='search_strings')

    def __repr__(self) -> str:
        description = f'<RottenTomatoesSearchResult(search={self.search},movie_id={self.movie_id},movie={self.movie})>'
        return description
@internet(logger)
@with_session
def lookup_movie(
    title: str | None = None,
    year: int | None = None,
    rottentomatoes_id: int | None = None,
    smart_match: str | None = None,
    only_cached: bool = False,
    session: Session | None = None,
    api_key: str | None = None,
) -> RottenTomatoesMovie:
    """Do a lookup from Rotten Tomatoes for the movie matching the passed arguments.

    Any combination of criteria can be passed, the most specific criteria specified will be used.
    The local cache (movie table, then search-result table) is consulted first; the API is only
    queried when nothing is cached or the cached entry has expired.

    :param rottentomatoes_id: rottentomatoes_id of desired movie
    :param string title: title of desired movie
    :param year: release year of desired movie
    :param smart_match: attempt to clean and parse title and year from a string
    :param only_cached: if this is specified, an online lookup will not occur if the movie is not in the cache
    :param session: optionally specify a session to use, if specified, returned Movie will be live in that session
    :param api_key: optionally specify an API key to use
    :returns: The Movie object populated with data from Rotten Tomatoes
    :raises: PluginError if a match cannot be found or there are other problems with the lookup
    """
    if smart_match:
        # If smart_match was specified, parse it into a title and year (replacing any passed in)
        title_parser = plugin.get('parsing', 'api_rottentomatoes').parse_movie(smart_match)
        title = title_parser.name
        year = title_parser.year
        if title == '' and not rottentomatoes_id:
            raise PluginError(f'Failed to parse name from {smart_match}')

    search_string = ''
    if title:
        search_string = title.lower()
        if year:
            search_string = f'{search_string} {year}'
    elif not rottentomatoes_id:
        raise PluginError('No criteria specified for rotten tomatoes lookup')

    def id_str() -> str:
        return f'<title={title},year={year},rottentomatoes_id={rottentomatoes_id}>'

    logger.debug('Looking up rotten tomatoes information for {}', id_str())
    movie = None

    # Try to lookup from cache
    if rottentomatoes_id:
        movie = (
            session
            .query(RottenTomatoesMovie)
            .filter(RottenTomatoesMovie.id == rottentomatoes_id)
            .first()
        )
    if not movie and title:
        movie_filter = session.query(RottenTomatoesMovie).filter(
            func.lower(RottenTomatoesMovie.title) == title.lower()
        )
        if year:
            movie_filter = movie_filter.filter(RottenTomatoesMovie.year == year)
        movie = movie_filter.first()
        if not movie:
            logger.debug('No matches in movie cache found, checking search cache.')
            found = (
                session
                .query(RottenTomatoesSearchResult)
                .filter(func.lower(RottenTomatoesSearchResult.search) == search_string)
                .first()
            )
            if found and found.movie:
                logger.debug('Movie found in search cache.')
                movie = found.movie

    if movie:
        # Movie found in cache, check if cache has expired.
        if movie.expired and not only_cached:
            logger.debug(
                'Cache has expired for {}, attempting to refresh from Rotten Tomatoes.', id_str()
            )
            try:
                result = movies_info(movie.id, api_key)
                movie = _set_movie_details(movie, session, result, api_key)
                session.merge(movie)
            except (URLError, PluginError):
                # A failed refresh surfaces as PluginError from _set_movie_details (get_json
                # swallows request errors), so it must be caught here for the cached copy to
                # actually be used as the log message promises.
                logger.error(
                    'Error refreshing movie details from Rotten Tomatoes, cached info being used.'
                )
        else:
            logger.debug('Movie {} information restored from cache.', id_str())
    else:
        if only_cached:
            raise PluginError(f'Movie {id_str()} not found from cache')
        # There was no movie found in the cache, do a lookup from Rotten Tomatoes
        logger.debug('Movie {} not found in cache, looking up from rotten tomatoes.', id_str())
        try:
            if rottentomatoes_id:
                result = movies_info(rottentomatoes_id, api_key)
                if result:
                    movie = RottenTomatoesMovie()
                    movie = _set_movie_details(movie, session, result, api_key)
                    session.add(movie)
            if not movie and title:
                logger.verbose('Searching from rt `{}`', search_string)
                results = movies_search(search_string, api_key=api_key)
                hits = results.get('movies') if results else None
                if hits:
                    best = _select_best_match(hits, title, year)
                    # Prefer the full detail record; fall back to the search hit itself.
                    result = movies_info(best.get('id'), api_key) or best
                    movie = (
                        session
                        .query(RottenTomatoesMovie)
                        .filter(RottenTomatoesMovie.id == result['id'])
                        .first()
                    )
                    if not movie:
                        movie = RottenTomatoesMovie()
                        movie = _set_movie_details(movie, session, result, api_key)
                        session.add(movie)
                        session.commit()
                    # Remember the search string when it differs from the real title
                    # (a missing title is treated as different instead of crashing).
                    if title.lower() != (movie.title or '').lower():
                        logger.debug("Saving search result for '{}'", search_string)
                        session.add(
                            RottenTomatoesSearchResult(search=search_string, movie=movie)
                        )
        except URLError as err:
            raise PluginError('Error looking up movie from RottenTomatoes') from err

    if not movie:
        raise PluginError(f'No results found from rotten tomatoes for {id_str()}')
    # Access attributes to force the relationships to eager load before we detach from session
    for attr in [
        'alternate_ids',
        'cast',
        'directors',
        'genres',
        'links',
        'posters',
        'release_dates',
    ]:
        getattr(movie, attr)
    session.commit()
    return movie


def _release_year(movie_res: dict[str, Any]) -> int | None:
    """Return the year of a search hit's theater (preferred) or dvd release date, if usable."""
    release_dates = movie_res.get('release_dates') or {}
    if release_dates.get('theater'):
        logger.debug('Checking year against theater release date')
        date_text = release_dates['theater']
    elif release_dates.get('dvd'):
        logger.debug('Checking year against dvd release date')
        date_text = release_dates['dvd']
    else:
        return None
    try:
        return time.strptime(date_text, '%Y-%m-%d').tm_year
    except (TypeError, ValueError):
        # A malformed date must not abort the whole lookup; treat it as "unknown".
        return None


def _select_best_match(
    results: list[dict[str, Any]], title: str, year: int | None
) -> dict[str, Any]:
    """Rank search hits against ``title``/``year`` and return the single best one.

    Every hit gets a ``match`` ratio (and, when compared, an integer ``year``) written into it.

    :raises: PluginError if no hit survives filtering or the two best hits are too close to call
    """
    wanted = title.lower()
    for movie_res in results:
        seq = difflib.SequenceMatcher(lambda x: x == ' ', movie_res['title'].lower(), wanted)
        movie_res['match'] = seq.ratio()
    results.sort(key=lambda x: x['match'], reverse=True)

    # Keep only movies that reach MIN_MATCH and do not have a different year
    candidates = []
    for movie_res in results:
        if year and movie_res.get('year'):
            movie_res['year'] = int(movie_res['year'])
            if movie_res['year'] != year:
                # The listed year may differ from the release year, so give it a second chance.
                release_year = _release_year(movie_res)
                if release_year != year:
                    logger.debug(
                        'removing {} - {} (wrong year: {})',
                        movie_res['title'],
                        movie_res['id'],
                        str(release_year or movie_res['year']),
                    )
                    continue
        if movie_res['match'] < MIN_MATCH:
            logger.debug('removing {} (min_match)', movie_res['title'])
            continue
        candidates.append(movie_res)

    if not candidates:
        raise PluginError('no appropriate results')
    if len(candidates) == 1:
        logger.debug('SUCCESS: only one movie remains')
        return candidates[0]

    # Check min difference between best two hits
    best, runner_up = candidates[0], candidates[1]
    if best['match'] - runner_up['match'] < MIN_DIFF:
        logger.debug(
            'unable to determine correct movie, min_diff too small(`{} ({}) - {}` <-?-> `{} ({}) - {}`)',
            best['title'],
            best.get('year'),
            best['id'],
            runner_up['title'],
            runner_up.get('year'),
            runner_up['id'],
        )
        for r in candidates:
            logger.debug(
                'remain: {} (match: {}) {}',
                r['title'],
                r['match'],
                r['id'],
            )
        raise PluginError('min_diff')
    return best
# TODO: get rid of or heavily refactor
def _set_movie_details(
    movie: RottenTomatoesMovie,
    session: Session,
    movie_data: dict[str, Any] | None = None,
    api_key: str | None = None,
) -> RottenTomatoesMovie:
    """Populate ``movie`` object from given data.

    When ``movie_data`` is empty it is fetched with ``movies_info`` using ``movie.id``.

    :param movie: movie object to update
    :param session: session to use, returned Movie will be live in that session
    :param movie_data: data to copy into the :movie:
    :param api_key: optionally specify an API key to use
    :returns: the same ``movie`` object, updated in place
    :raises: PluginError if no data is given and ``movie`` has no id, or if no data can be fetched
    """
    if not movie_data:
        if not movie.id:
            raise PluginError('Cannot get rotten tomatoes details without rotten tomatoes id')
        movie_data = movies_info(movie.id, api_key)
    if not movie_data:
        raise PluginError(f'No movie_data for rottentomatoes_id {movie.id}')

    if movie.id:
        logger.debug(
            'Updating movie info (actually just deleting the old info and adding the new)'
        )
        del movie.release_dates[:]
        del movie.posters[:]
        del movie.alternate_ids[:]
        del movie.links[:]

    movie.update_from_dict(movie_data)
    # 'ratings' may be absent from the payload; pass an empty dict rather than None.
    movie.update_from_dict(movie_data.get('ratings') or {})

    # Genres, cast and directors are shared rows that are NOT cleared above, so only link
    # the ones that are not linked yet; otherwise every refresh would append them again.
    for genre_name in movie_data.get('genres') or ():
        genre = (
            session
            .query(RottenTomatoesGenre)
            .filter(func.lower(RottenTomatoesGenre.name) == genre_name.lower())
            .first()
        )
        if not genre:
            genre = RottenTomatoesGenre(genre_name)
        if genre not in movie.genres:
            movie.genres.append(genre)

    for date_name, date in (movie_data.get('release_dates') or {}).items():
        movie.release_dates.append(ReleaseDate(date_name, date))

    for poster_name, poster_url in (movie_data.get('posters') or {}).items():
        movie.posters.append(RottenTomatoesPoster(poster_name, poster_url))

    for res_actor in movie_data.get('abridged_cast') or ():
        actor = (
            session
            .query(RottenTomatoesActor)
            .filter(func.lower(RottenTomatoesActor.rt_id) == res_actor['id'])
            .first()
        )
        if not actor:
            actor = RottenTomatoesActor(res_actor['name'], res_actor['id'])
        if actor not in movie.cast:
            movie.cast.append(actor)

    for res_director in movie_data.get('abridged_directors') or ():
        director = (
            session
            .query(RottenTomatoesDirector)
            .filter(func.lower(RottenTomatoesDirector.name) == res_director['name'].lower())
            .first()
        )
        if not director:
            director = RottenTomatoesDirector(res_director['name'])
        if director not in movie.directors:
            movie.directors.append(director)

    for id_name, alt_id in (movie_data.get('alternate_ids') or {}).items():
        movie.alternate_ids.append(RottenTomatoesAlternateId(id_name, alt_id))

    for link_name, link_url in (movie_data.get('links') or {}).items():
        movie.links.append(RottenTomatoesLink(link_name, link_url))

    movie.updated = datetime.now()
    return movie
def movies_info(id, api_key: str | None = None):
    """Fetch the detail record of a single movie.

    :param id: movie id interpolated into the request URL
    :param api_key: API key to use; falls back to the module default when empty
    :returns: the decoded response if it is a dict with a truthy ``id``, otherwise None
    """
    key = api_key if api_key else API_KEY
    payload = get_json(f'{SERVER}/{API_VER}/movies/{id}.json?apikey={key}')
    if not isinstance(payload, dict) or not payload.get('id'):
        return None
    return payload
def lists(
    list_type,
    list_name,
    limit: int = 20,
    page_limit: int = 20,
    page: int | None = None,
    api_key=None,
):
    """Fetch one of the API's movie lists.

    :param list_type: first path segment of the list; spaces in a string are replaced by ``_``
    :param list_name: second path segment of the list; spaces in a string are replaced by ``_``
    :param limit: value of the ``limit`` query parameter (omitted when falsy)
    :param page_limit: value of the ``page_limit`` query parameter (omitted when falsy)
    :param page: value of the ``page`` query parameter (omitted when falsy)
    :param api_key: API key to use; falls back to the module default when empty
    :returns: the decoded response if it is a dict with a non-empty ``movies`` entry, otherwise None
    """
    if isinstance(list_type, str):
        list_type = list_type.replace(' ', '_')
    if isinstance(list_name, str):
        list_name = list_name.replace(' ', '_')
    if not api_key:
        api_key = API_KEY
    url = f'{SERVER}/{API_VER}/lists/{list_type}/{list_name}.json?apikey={api_key}'
    if limit:
        url += f'&limit={limit}'
    if page_limit:
        url += f'&page_limit={page_limit}'
    if page:
        url += f'&page={page}'
    results = get_json(url)
    # Truthiness instead of len(): a response without a 'movies' key (e.g. an error payload)
    # must yield None rather than raise TypeError on len(None).
    if isinstance(results, dict) and results.get('movies'):
        return results
    return None
def movies_search(
    q, page_limit: int | None = None, page: int | None = None, api_key: str | None = None
):
    """Search the API for movies matching ``q``.

    :param q: search text; a string is encoded as latin-1 (characters that cannot be encoded
        are dropped) and URL-quoted, any other value is interpolated as-is
    :param page_limit: value of the ``page_limit`` query parameter (omitted when falsy)
    :param page: value of the ``page`` query parameter (omitted when falsy)
    :param api_key: API key to use; falls back to the module default when empty
    :returns: the decoded response if it is a dict with truthy ``total`` and ``movies``
        entries, otherwise None
    """
    query = quote_plus(q.encode('latin-1', errors='ignore')) if isinstance(q, str) else q
    key = api_key or API_KEY
    url_parts = [f'{SERVER}/{API_VER}/movies.json?q={query}&apikey={key}']
    if page_limit:
        url_parts.append(f'&page_limit={page_limit}')
    if page:
        url_parts.append(f'&page={page}')
    response = get_json(''.join(url_parts))
    if not isinstance(response, dict):
        return None
    if response.get('total') and response.get('movies'):
        return response
    return None
def get_json(url: str) -> dict[str, Any] | None:
    """GET ``url`` through the shared module session and decode the body as JSON.

    Failures are logged as warnings and reported as ``None`` instead of being raised.

    :param url: full request URL
    :returns: the decoded JSON, or None if the request failed or the body was not valid JSON
    """
    logger.debug('fetching json at {}', url)
    # One try block with this handler order on purpose: an error raised while decoding that
    # is also a RequestException is reported by the first handler.
    try:
        response = session.get(url)
        return response.json()
    except requests.RequestException as exc:
        logger.warning('Request failed {}: {}', url, exc)
    except ValueError:
        logger.warning('Rotten Tomatoes returned invalid json at: {}', url)
    return None