from __future__ import annotations
import time
from datetime import datetime, timedelta
from typing import TYPE_CHECKING
from dateutil.parser import parse as dateutil_parse
from loguru import logger
from sqlalchemy import Column, Date, DateTime, Integer, String, Table, Time, Unicode, and_, or_
from sqlalchemy.orm import relationship
from sqlalchemy.schema import ForeignKey
from flexget import db_schema, plugin
from flexget.manager import Session
from flexget.terminal import console
from flexget.utils import requests
from flexget.utils.database import json_synonym
from flexget.utils.tools import split_title_year
if TYPE_CHECKING:
from flexget.entry import Entry
# Declarative bases obtained from flexget's db_schema: `Base` backs the cached
# trakt data tables below, `AuthBase` backs the stored account authorization.
# NOTE(review): the second argument is presumably the schema version (the
# upgrade hook in this file rejects anything <= 6) -- confirm against db_schema.
Base = db_schema.versioned_base('api_trakt', 7)
AuthBase = db_schema.versioned_base('trakt_auth', 0)
# Rebind the module-level logger with the name 'api_trakt'.
logger = logger.bind(name='api_trakt')
# Production Site
# CLIENT_ID is sent as the `trakt-api-key` header and, together with
# CLIENT_SECRET, in the OAuth requests made by this module.
CLIENT_ID = '57e188bcb9750c79ed452e1674925bc6848bd126e02bb15350211be74c6547af'
CLIENT_SECRET = 'db4af7531e8df678b134dbc22445a2c04ebdbdd7213be7f5b6d17dfdfabfcdc2'
# Prefix of every address built by get_api_url(); endpoint parts are appended
# directly, so the trailing slash matters.
API_URL = 'https://api.trakt.tv/'
PIN_URL = 'https://trakt.tv/pin/346'
# Oauth account authentication
[docs]
class TraktUserAuth(AuthBase):
    """Stored OAuth tokens for one named trakt account.

    :param account: Account name; primary key of the table.
    :param access_token: Token sent as bearer authorization.
    :param refresh_token: Token used to obtain a new access token.
    :param created: POSIX timestamp of token creation, converted with `token_created_date`.
    :param expires: Token lifetime in seconds, converted with `token_expire_date`.
    """

    __tablename__ = 'trakt_user_auth'

    account = Column(Unicode, primary_key=True)
    access_token = Column(Unicode)
    refresh_token = Column(Unicode)
    created = Column(DateTime)
    expires = Column(DateTime)

    def __init__(self, account, access_token, refresh_token, created, expires):
        self.account = account
        self.access_token = access_token
        self.refresh_token = refresh_token
        # Both values arrive as plain numbers and are stored as datetimes
        self.created = token_created_date(created)
        self.expires = token_expire_date(expires)
[docs]
def token_expire_date(expires):
    """Return the naive local datetime lying ``expires`` seconds after now.

    :param expires: Token lifetime in seconds.
    """
    lifetime = timedelta(seconds=expires)
    return datetime.now() + lifetime
[docs]
def token_created_date(created):
    """Convert the POSIX timestamp ``created`` into a naive local datetime.

    :param created: Seconds since the epoch.
    """
    created_at = datetime.fromtimestamp(created)
    return created_at
[docs]
def device_auth():
    """Run the trakt device authorization flow and return the token response.

    Requests a device code, prints the verification url and user code to the console, then
    polls the token endpoint until the user approves, the code expires or an error status
    is returned.

    :returns: Decoded JSON body of the successful (status 200) token response.
    :raises plugin.PluginError: On an invalid, already used, denied or expired code, or on
        a request failure.
    """
    payload = {'client_id': CLIENT_ID}
    # Statuses that abort polling right away, with the message reported to the user
    fatal_messages = {
        404: 'Invalid device code. Open an issue on Github.',  # not found -- invalid device_code
        409: 'User code has already been approved.',  # already used
        418: 'User code has been denied.',  # denied by the user
    }
    try:
        code_info = requests.post(get_api_url('oauth/device/code'), data=payload).json()
        device_code = code_info['device_code']
        user_code = code_info['user_code']
        expires_in = code_info['expires_in']
        poll_delay = code_info['interval']

        console(
            'Please visit {} and authorize Flexget. Your user code is {}. Your code expires in '
            '{} minutes.'.format(code_info['verification_url'], user_code, expires_in / 60.0)
        )

        logger.debug('Polling for user authorization.')
        payload['code'] = device_code
        payload['client_secret'] = CLIENT_SECRET
        deadline = time.time() + expires_in
        console('Waiting...', end='')
        # Give up once the code's lifetime has passed
        while time.time() < deadline:
            time.sleep(poll_delay)
            response = requests.post(
                get_api_url('oauth/device/token'), data=payload, raise_status=False
            )
            status = response.status_code
            if status == 200:  # success
                return response.json()
            if status in fatal_messages:
                raise plugin.PluginError(fatal_messages[status])
            if status == 410:  # expired -- restart process
                break
            if status == 400:  # pending -- waiting for user
                console('...', end='')
            elif status == 429:  # polling too fast
                logger.warning('Polling too quickly. Upping the interval. No action required.')
                poll_delay += 1
        raise plugin.PluginError('User code has expired. Please try again.')
    except requests.RequestException as e:
        raise plugin.PluginError(f'Device authorization with Trakt.tv failed: {e}')
[docs]
def token_oauth(data):
    """POST ``data`` to the ``oauth/token`` endpoint and return the decoded JSON reply.

    :param data: Form fields for the token request.
    :raises plugin.PluginError: If the request raises a RequestException.
    """
    try:
        response = requests.post(get_api_url('oauth/token'), data=data)
        return response.json()
    except requests.RequestException as err:
        raise plugin.PluginError(f'Token exchange with trakt failed: {err}')
[docs]
def delete_account(account):
    """Delete the stored authorization row for ``account``.

    :param account: Account name to look up in `TraktUserAuth`.
    :raises plugin.PluginError: If no row exists for that account.
    """
    with Session() as session:
        stored = session.query(TraktUserAuth).filter(TraktUserAuth.account == account).first()
        if stored:
            session.delete(stored)
        else:
            raise plugin.PluginError(f'Account {account} not found.')
[docs]
def get_access_token(account, token=None, refresh=False, re_auth=False, called_from_cli=False):
    """Return an access token for ``account``, authorizing or refreshing it when needed.

    A stored, unexpired token is returned as is unless ``refresh`` or ``re_auth`` is set.
    Otherwise a new token is obtained from the stored refresh token, from the given pin,
    or through device authorization, and the result is merged into the database.

    :param account: Arbitrary account name to attach authorization to.
    :param unicode token: The pin or refresh token, as supplied by the trakt website.
    :param bool refresh: If True, refresh the access token using refresh_token from db.
    :param bool re_auth: If True, account is re-authorized even if it already exists in db.
    :param bool called_from_cli: If True and no pin is given, fall back to device authorization.
    :raises plugin.PluginError: If the account is not authorized and no way to authorize it
        was supplied, or if the token exchange fails.
    """
    request_data = {
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
        'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
    }
    with Session() as session:
        stored = session.query(TraktUserAuth).filter(TraktUserAuth.account == account).first()
        # Stored token is still valid and nothing forces a new one
        if stored and datetime.now() < stored.expires and not refresh and not re_auth:
            return stored.access_token

        if (
            stored
            and (refresh or datetime.now() >= stored.expires - timedelta(days=5))
            and not re_auth
        ):
            logger.debug('Using refresh token to re-authorize account {}.', account)
            request_data['refresh_token'] = stored.refresh_token
            request_data['grant_type'] = 'refresh_token'
            token_dict = token_oauth(request_data)
        elif token:
            # We are only in here if a pin was specified, so it's safe to use console instead of logging
            console(
                'Warning: PIN authorization has been deprecated. Use Device Authorization instead.'
            )
            request_data['code'] = token
            request_data['grant_type'] = 'authorization_code'
            token_dict = token_oauth(request_data)
        elif called_from_cli:
            logger.debug(
                'No pin specified for an unknown account {}. Attempting to authorize device.',
                account,
            )
            token_dict = device_auth()
        else:
            raise plugin.PluginError(
                f'Account {account} has not been authorized. See `flexget trakt auth -h` on how to.'
            )

        try:
            # created_at falls back to the current time when the response lacks it
            created_at = token_dict.get('created_at', time.time())
            new_acc = TraktUserAuth(
                account,
                token_dict['access_token'],
                token_dict['refresh_token'],
                created_at,
                token_dict['expires_in'],
            )
            session.merge(new_acc)
        except requests.RequestException as e:
            raise plugin.PluginError(f'Token exchange with trakt failed: {e}')
        return new_acc.access_token
[docs]
def make_list_slug(name):
    """Return the slug for use in url for given list name.

    The name is lower-cased, a fixed set of punctuation is removed, ``&`` becomes ``and``
    and spaces become dashes.
    """
    # These characters are just stripped in the url
    stripped = str.maketrans('', '', '!@#$%^*()[]{}/=?+\\|')
    slug = name.lower().translate(stripped)
    # These characters get replaced
    return slug.replace('&', 'and').replace(' ', '-')
[docs]
def get_session(account=None, token=None):
    """Create a requests session ready to talk to trakt API with FlexGet's api key.

    Can also add user level authentication if `account` parameter is given.

    :param account: An account authorized via `flexget trakt auth` CLI command. If given, returned session will be
        authenticated for that account.
    :param token: Passed on to `get_access_token` together with ``account``.
    """
    api_session = requests.Session()
    api_session.headers = {
        'Content-Type': 'application/json',
        'trakt-api-version': '2',
        'trakt-api-key': CLIENT_ID,
    }
    if not account:
        return api_session
    access_token = get_access_token(account, token)
    if access_token:
        api_session.headers.update({'Authorization': f'Bearer {access_token}'})
    return api_session
[docs]
def get_api_url(*endpoint):
    """Get the address of a trakt API endpoint.

    :param endpoint: Can be a string endpoint (e.g. 'sync/watchlist') or an iterable (e.g. ('sync', 'watchlist')).
        Multiple parameters can also be specified instead of a single iterable.
    :returns: The absolute url to the specified API endpoint.
    """
    parts = endpoint
    # A single non-string argument is taken to be the iterable of parts itself
    if len(parts) == 1 and not isinstance(parts[0], str):
        parts = parts[0]
    # Make sure integer portions are turned into strings first too
    return API_URL + '/'.join(str(part) for part in parts)
@db_schema.upgrade('api_trakt')
def upgrade(ver, session):
    """Schema upgrade hook: versions above 6 pass through, anything else cannot be upgraded.

    :raises db_schema.UpgradeImpossible: If ``ver`` is None or not greater than 6.
    """
    if ver is not None and ver > 6:
        return ver
    raise db_schema.UpgradeImpossible
[docs]
def get_entry_ids(entry: Entry):
    """Create a trakt ids dict from id fields on an entry. Prefer already populated info over lazy lookups.

    The fields are first read with ``eval_lazy=False``; only when that yields no id at all
    are they read again with ``eval_lazy=True``.
    """
    # Only the first populated trakt field is used, in this order
    trakt_fields = ('trakt_movie_id', 'trakt_show_id', 'trakt_episode_id')
    other_fields = (
        ('tmdb', 'tmdb_id'),
        ('tvdb', 'tvdb_id'),
        ('imdb', 'imdb_id'),
        ('tvrage', 'tvrage_id'),
    )
    ids = {}
    for lazy in (False, True):
        for field in trakt_fields:
            if entry.get(field, eval_lazy=lazy):
                ids['trakt'] = entry[field]
                break
        for key, field in other_fields:
            if entry.get(field, eval_lazy=lazy):
                ids[key] = entry[field]
        if ids:
            break
    return ids
[docs]
class TraktMovieTranslation(Base):
    """One translated set of texts (title, overview, tagline) for a movie in one language."""

    __tablename__ = 'trakt_movie_translations'

    id = Column(Integer, primary_key=True, autoincrement=True)
    language = Column(Unicode)
    overview = Column(Unicode)
    tagline = Column(Unicode)
    title = Column(Unicode)
    movie_id = Column(Integer, ForeignKey('trakt_movies.id'))

    def __init__(self, translation, session):
        super().__init__()
        self.update(translation, session)

    def update(self, translation, session):
        """Copy every key of the ``translation`` mapping onto this record as an attribute.

        ``session`` is accepted but not used here.
        """
        for field, value in translation.items():
            setattr(self, field, value)
[docs]
class TraktShowTranslation(Base):
    """One translated set of texts (title, overview) for a show in one language."""

    __tablename__ = 'trakt_show_translations'

    id = Column(Integer, primary_key=True, autoincrement=True)
    language = Column(Unicode)
    overview = Column(Unicode)
    title = Column(Unicode)
    show_id = Column(Integer, ForeignKey('trakt_shows.id'))

    def __init__(self, translation, session):
        super().__init__()
        self.update(translation, session)

    def update(self, translation, session):
        """Copy every key of the ``translation`` mapping onto this record as an attribute.

        ``session`` is accepted but not used here.
        """
        for field, value in translation.items():
            setattr(self, field, value)
[docs]
def get_translations(ident, style):
    """Fetch the translations of a show or movie from trakt as translation records.

    For each translation returned by the API, an existing record with the same language and
    parent id is reused; otherwise a new record is created from the API data.

    :param ident: Trakt id of the show/movie.
    :param style: ``'show'`` selects `TraktShowTranslation`; any other value selects
        `TraktMovieTranslation` (and is used to build the url and the ``<style>_id`` column name).
    :returns: List of translation records. An empty list is returned when the request fails,
        so callers can always assign the result to a relationship collection.
    """
    url = get_api_url(style + 's', ident, 'translations')
    trakt_translation = TraktShowTranslation if style == 'show' else TraktMovieTranslation
    trakt_translation_id = getattr(trakt_translation, style + '_id')
    translations = []
    req_session = get_session()
    try:
        results = req_session.get(url, params={'extended': 'full'}).json()
        with Session() as session:
            for result in results:
                translation = (
                    session
                    .query(trakt_translation)
                    .filter(
                        and_(
                            trakt_translation.language == result.get('language'),
                            trakt_translation_id == ident,
                        )
                    )
                    .first()
                )
                if not translation:
                    translation = trakt_translation(result, session)
                translations.append(translation)
    except requests.RequestException as e:
        # Best effort: a failed lookup is only logged and yields no translations
        logger.debug('Error adding translations to trakt id {}: {}', ident, e)
    return translations
[docs]
class TraktGenre(Base):
    """Genre row; the genre name itself serves as the primary key."""

    __tablename__ = 'trakt_genres'

    name = Column(Unicode, primary_key=True)
# Association table between shows and genres; used as `secondary` by TraktShow.genres.
# Genres are referenced by name, since `name` is the primary key of trakt_genres.
show_genres_table = Table(
'trakt_show_genres',
Base.metadata,
Column('show_id', Integer, ForeignKey('trakt_shows.id')),
Column('genre_id', Unicode, ForeignKey('trakt_genres.name')),
)
Base.register_table(show_genres_table)
# Association table between movies and genres; used as `secondary` by TraktMovie.genres.
movie_genres_table = Table(
'trakt_movie_genres',
Base.metadata,
Column('movie_id', Integer, ForeignKey('trakt_movies.id')),
Column('genre_id', Unicode, ForeignKey('trakt_genres.name')),
)
Base.register_table(movie_genres_table)
[docs]
class TraktActor(Base):
    """Cached person record built from the ``person`` object of a trakt people response."""

    __tablename__ = 'trakt_actors'

    id = Column(Integer, primary_key=True, nullable=False)
    name = Column(Unicode)
    slug = Column(Unicode)
    tmdb = Column(Integer)
    imdb = Column(Unicode)
    biography = Column(Unicode)
    birthday = Column(Date)
    death = Column(Date)
    homepage = Column(Unicode)

    def __init__(self, actor, session):
        super().__init__()
        self.update(actor, session)

    def update(self, actor, session):
        """Update this record from the ``actor`` mapping.

        :raises ValueError: If this record already has an id that differs from the one in ``actor``.
        """
        if self.id and self.id != actor.get('ids').get('trakt'):
            raise ValueError('Tried to update db actors with different actor data')
        ids = actor.get('ids')
        if not self.id:
            self.id = ids.get('trakt')
        self.name = actor.get('name')
        self.imdb = ids.get('imdb')
        self.slug = ids.get('slug')
        self.tmdb = ids.get('tmdb')
        self.biography = actor.get('biography')
        # Dates are only overwritten when the response carries a value
        birthday = actor.get('birthday')
        if birthday:
            self.birthday = dateutil_parse(birthday)
        death = actor.get('death')
        if death:
            self.death = dateutil_parse(death)
        self.homepage = actor.get('homepage')

    def to_dict(self):
        """Return the actor's name together with its trakt, imdb and tmdb ids."""
        return {
            'name': self.name,
            'trakt_id': self.id,
            'imdb_id': self.imdb,
            'tmdb_id': self.tmdb,
        }
# Association table between shows and actors; used as `secondary` by TraktShow._actors.
show_actors_table = Table(
'trakt_show_actors',
Base.metadata,
Column('show_id', Integer, ForeignKey('trakt_shows.id')),
Column('actors_id', Integer, ForeignKey('trakt_actors.id')),
)
Base.register_table(show_actors_table)
# Association table between movies and actors; used as `secondary` by TraktMovie._actors.
movie_actors_table = Table(
'trakt_movie_actors',
Base.metadata,
Column('movie_id', Integer, ForeignKey('trakt_movies.id')),
Column('actors_id', Integer, ForeignKey('trakt_actors.id')),
)
Base.register_table(movie_actors_table)
[docs]
def get_db_actors(ident, style) -> list[TraktActor]:
    """Fetch the cast of a show or movie from trakt as `TraktActor` records.

    Existing records are reused by trakt id; unknown people get a new record.

    :param ident: Trakt id of the show/movie.
    :param style: Media kind used to build the url, e.g. ``'show'`` or ``'movie'``.
    :returns: List of actors, without duplicates. Empty when the request fails or the
        response has no cast.
    """
    actors = {}
    url = get_api_url(f'{style}s', ident, 'people')
    req_session = get_session()
    try:
        results = req_session.get(url, params={'extended': 'full'}).json()
        with Session() as session:
            # 'cast' may be absent or None; treat that as an empty cast
            for result in results.get('cast') or []:
                trakt_id = result.get('person').get('ids').get('trakt')
                # sometimes an actor can occur twice in the list by mistake. This check is to avoid this unlikely event
                if trakt_id in actors:
                    continue
                actor = session.query(TraktActor).filter(TraktActor.id == trakt_id).first()
                if not actor:
                    actor = TraktActor(result.get('person'), session)
                actors[trakt_id] = actor
    except requests.RequestException as e:
        # Log both the id that was looked up and the error itself
        logger.debug('Error searching for actors for trakt id {}: {}', ident, e)
    return list(actors.values())
[docs]
def get_translations_dict(translate, style):
    """Map each translation's language to a dict of its texts.

    :param translate: Iterable of translation records.
    :param style: When ``'movie'``, the ``tagline`` is included as well.
    :returns: ``{language: {'overview': ..., 'title': ...[, 'tagline': ...]}}``
    """
    with_tagline = style == 'movie'
    by_language = {}
    for item in translate:
        texts = {'overview': item.overview, 'title': item.title}
        if with_tagline:
            texts['tagline'] = item.tagline
        by_language[item.language] = texts
    return by_language
[docs]
def list_actors(actors):
    """Return a dict describing each actor, keyed by the actor's id as a string.

    The imdb and tmdb ids are passed through ``str``; birthday and death are formatted
    as ``YYYY/MM/DD`` or left as None when unset.
    """

    def _format_day(day):
        return day.strftime('%Y/%m/%d') if day else None

    return {
        str(actor.id): {
            'trakt_id': actor.id,
            'name': actor.name,
            'imdb_id': str(actor.imdb),
            'trakt_slug': actor.slug,
            'tmdb_id': str(actor.tmdb),
            'birthday': _format_day(actor.birthday),
            'biography': actor.biography,
            'homepage': actor.homepage,
            'death': _format_day(actor.death),
        }
        for actor in actors
    }
[docs]
class TraktEpisode(Base):
    """Cached trakt episode record belonging to a show (``series_id``)."""

    __tablename__ = 'trakt_episodes'

    id = Column(Integer, primary_key=True, autoincrement=False)
    tvdb_id = Column(Integer)
    imdb_id = Column(Unicode)
    tmdb_id = Column(Integer)
    tvrage_id = Column(Unicode)
    title = Column(Unicode)
    season = Column(Integer)
    number = Column(Integer)
    number_abs = Column(Integer)
    overview = Column(Unicode)
    first_aired = Column(DateTime)
    updated_at = Column(DateTime)
    cached_at = Column(DateTime)

    series_id = Column(Integer, ForeignKey('trakt_shows.id'), nullable=False)

    def __init__(self, trakt_episode, session):
        super().__init__()
        self.update(trakt_episode, session)

    def update(self, trakt_episode, session):
        """Update this record from the trakt media object `trakt_episode` returned by the trakt api.

        :raises ValueError: If this record already has an id that differs from the one in the data.
        """
        if self.id and self.id != trakt_episode['ids']['trakt']:
            raise ValueError('Tried to update db ep with different ep data')
        ids = trakt_episode['ids']
        if not self.id:
            self.id = ids['trakt']
        self.imdb_id = ids.get('imdb')
        self.tmdb_id = ids.get('tmdb')
        self.tvrage_id = ids.get('tvrage')
        self.tvdb_id = ids.get('tvdb')
        # Timestamps are stored naive: timezone info in the input is ignored
        self.first_aired = None
        if trakt_episode.get('first_aired'):
            self.first_aired = dateutil_parse(trakt_episode['first_aired'], ignoretz=True)
        self.updated_at = dateutil_parse(trakt_episode.get('updated_at'), ignoretz=True)
        self.cached_at = datetime.now()

        for field in ('title', 'season', 'number', 'number_abs', 'overview'):
            setattr(self, field, trakt_episode.get(field))

    @property
    def expired(self):
        """Always False: episodes currently never expire on their own."""
        # TODO: should episode have its own expiration function?
        return False
[docs]
class TraktSeason(Base):
    """Cached trakt season record belonging to a show (``series_id``)."""

    __tablename__ = 'trakt_seasons'

    id = Column(Integer, primary_key=True, autoincrement=False)
    tvdb_id = Column(Integer)
    tmdb_id = Column(Integer)
    tvrage_id = Column(Unicode)
    title = Column(Unicode)
    number = Column(Integer)
    episode_count = Column(Integer)
    aired_episodes = Column(Integer)
    overview = Column(Unicode)
    first_aired = Column(DateTime)
    ratings = Column(Integer)
    votes = Column(Integer)

    cached_at = Column(DateTime)

    series_id = Column(Integer, ForeignKey('trakt_shows.id'), nullable=False)

    def __init__(self, trakt_season, session):
        super().__init__()
        self.update(trakt_season, session)

    def update(self, trakt_season, session):
        """Update this record from the trakt media object `trakt_season` returned by the trakt api.

        :raises ValueError: If this record already has an id that differs from the one in the data.
        """
        if self.id and self.id != trakt_season['ids']['trakt']:
            raise ValueError('Tried to update db season with different season data')
        ids = trakt_season['ids']
        if not self.id:
            self.id = ids['trakt']
        self.tmdb_id = ids.get('tmdb')
        self.tvrage_id = ids.get('tvrage')
        self.tvdb_id = ids.get('tvdb')
        # Stored naive: timezone info in the input is ignored
        self.first_aired = None
        if trakt_season.get('first_aired'):
            self.first_aired = dateutil_parse(trakt_season['first_aired'], ignoretz=True)
        self.cached_at = datetime.now()

        plain_fields = (
            'title',
            'number',
            'episode_count',
            'aired_episodes',
            'ratings',
            'votes',
            'overview',
        )
        for field in plain_fields:
            setattr(self, field, trakt_season.get(field))

    @property
    def expired(self):
        """Always False: seasons currently never expire on their own."""
        # TODO: should season have its own expiration function?
        return False
[docs]
class TraktShow(Base):
    """Cached trakt show record with its seasons, episodes, genres, actors and translations."""

    __tablename__ = 'trakt_shows'

    id = Column(Integer, primary_key=True, autoincrement=False)
    title = Column(Unicode)
    year = Column(Integer)
    slug = Column(Unicode)
    tvdb_id = Column(Integer)
    imdb_id = Column(Unicode)
    tmdb_id = Column(Integer)
    tvrage_id = Column(Unicode)
    overview = Column(Unicode)
    first_aired = Column(DateTime)
    air_day = Column(Unicode)
    air_time = Column(Time)
    timezone = Column(Unicode)
    runtime = Column(Integer)
    certification = Column(Unicode)
    network = Column(Unicode)
    country = Column(Unicode)
    status = Column(String)
    rating = Column(Integer)
    votes = Column(Integer)
    language = Column(Unicode)
    homepage = Column(Unicode)
    trailer = Column(Unicode)
    aired_episodes = Column(Integer)
    _translations = relationship(TraktShowTranslation)
    _translation_languages = Column('translation_languages', Unicode)
    translation_languages = json_synonym('_translation_languages')
    episodes = relationship(
        TraktEpisode, backref='show', cascade='all, delete, delete-orphan', lazy='dynamic'
    )
    seasons = relationship(
        TraktSeason, backref='show', cascade='all, delete, delete-orphan', lazy='dynamic'
    )
    genres = relationship(TraktGenre, secondary=show_genres_table)
    _actors = relationship(TraktActor, secondary=show_actors_table)
    updated_at = Column(DateTime)
    cached_at = Column(DateTime)

    def to_dict(self):
        """Return the show's plain fields as a dict; ``air_time`` is formatted as ``HH:MM``."""
        return {
            'id': self.id,
            'title': self.title,
            'year': self.year,
            'slug': self.slug,
            'tvdb_id': self.tvdb_id,
            'imdb_id': self.imdb_id,
            'tmdb_id': self.tmdb_id,
            'tvrage_id': self.tvrage_id,
            'overview': self.overview,
            'first_aired': self.first_aired,
            'air_day': self.air_day,
            'air_time': self.air_time.strftime('%H:%M') if self.air_time else None,
            'timezone': self.timezone,
            'runtime': self.runtime,
            'certification': self.certification,
            'network': self.network,
            'country': self.country,
            'status': self.status,
            'rating': self.rating,
            'votes': self.votes,
            'language': self.language,
            'homepage': self.homepage,
            'number_of_aired_episodes': self.aired_episodes,
            'genres': [g.name for g in self.genres],
            'updated_at': self.updated_at,
            'cached_at': self.cached_at,
        }

    def __init__(self, trakt_show, session):
        super().__init__()
        self.update(trakt_show, session)

    def update(self, trakt_show, session):
        """Update this record from the trakt media object `trakt_show` returned by the trakt api.

        :raises ValueError: If this record already has an id that differs from the one in the data.
        """
        if self.id and self.id != trakt_show['ids']['trakt']:
            raise ValueError('Tried to update db show with different show data')
        if not self.id:
            self.id = trakt_show['ids']['trakt']
        self.slug = trakt_show['ids']['slug']
        self.imdb_id = trakt_show['ids'].get('imdb')
        self.tmdb_id = trakt_show['ids'].get('tmdb')
        self.tvrage_id = trakt_show['ids'].get('tvrage')
        self.tvdb_id = trakt_show['ids'].get('tvdb')
        if trakt_show.get('airs'):
            airs = trakt_show.get('airs')
            self.air_day = airs.get('day')
            self.timezone = airs.get('timezone')
            if airs.get('time'):
                # Time might be HH:MM, or HH:MM:SS #2783
                self.air_time = dateutil_parse(airs['time'], ignoretz=True).time()
            else:
                self.air_time = None
        if trakt_show.get('first_aired'):
            self.first_aired = dateutil_parse(trakt_show.get('first_aired'), ignoretz=True)
        else:
            self.first_aired = None
        self.updated_at = dateutil_parse(trakt_show.get('updated_at'), ignoretz=True)

        # Fields copied verbatim from the api object (missing keys become None)
        for col in [
            'overview',
            'runtime',
            'rating',
            'votes',
            'language',
            'title',
            'year',
            'certification',
            'network',
            'country',
            'status',
            'aired_episodes',
            'trailer',
            'homepage',
        ]:
            setattr(self, col, trakt_show.get(col))

        # Sometimes genres and translations are None but we really do want a list, hence the "or []"
        self.genres = [
            TraktGenre(name=g.replace(' ', '-')) for g in trakt_show.get('genres') or []
        ]
        self.cached_at = datetime.now()
        self.translation_languages = trakt_show.get('available_translations') or []

    def get_episode(self, season, number, session, only_cached=False):
        """Return the episode ``number`` of ``season``, fetching it from trakt when needed.

        A lookup is made when the episode is not cached or the show is expired.

        :param session: Database session, committed after a successful lookup.
        :param only_cached: If True, raise instead of contacting trakt.
        :raises LookupError: If a lookup is needed but not allowed, or if it fails or returns no data.
        """
        # TODO: Does series data being expired mean all episode data should be refreshed?
        episode = (
            self.episodes
            .filter(TraktEpisode.season == season)
            .filter(TraktEpisode.number == number)
            .first()
        )
        if not episode or self.expired:
            url = get_api_url(
                'shows', self.id, 'seasons', season, 'episodes', number, '?extended=full'
            )
            if only_cached:
                raise LookupError(f'Episode {season} {number} not found in cache')
            logger.debug(
                'Episode {} {} not found in cache, looking up from trakt.', season, number
            )
            try:
                data = get_session().get(url).json()
            except requests.RequestException:
                raise LookupError(f'Error Retrieving Trakt url: {url}')
            if not data:
                raise LookupError(f'No data in response from trakt {url}')
            # Match on the trakt id so an existing row is updated rather than duplicated
            episode = self.episodes.filter(TraktEpisode.id == data['ids']['trakt']).first()
            if episode:
                episode.update(data, session)
            else:
                episode = TraktEpisode(data, session)
                self.episodes.append(episode)
            session.commit()
        return episode

    def get_season(self, number, session, only_cached=False):
        """Return season ``number``, fetching all seasons of the show from trakt when needed.

        A lookup is made when the season is not cached or the show is expired.

        :param session: Database session, committed after a successful lookup.
        :param only_cached: If True, raise instead of contacting trakt.
        :raises LookupError: If a lookup is needed but not allowed, if it fails or returns no
            data, or if the season is still unknown afterwards.
        """
        # TODO: Does series data being expired mean all season data should be refreshed?
        season = self.seasons.filter(TraktSeason.number == number).first()
        if not season or self.expired:
            url = get_api_url('shows', self.id, 'seasons', '?extended=full')
            if only_cached:
                raise LookupError(f'Season {number} not found in cache')
            logger.debug('Season {} not found in cache, looking up from trakt.', number)
            try:
                ses = get_session()
                data = ses.get(url).json()
            except requests.RequestException:
                raise LookupError(f'Error Retrieving Trakt url: {url}')
            if not data:
                raise LookupError(f'No data in response from trakt {url}')
            # We fetch all seasons for the given show because we barely get any data otherwise
            for season_result in data:
                db_season = self.seasons.filter(
                    TraktSeason.id == season_result['ids']['trakt']
                ).first()
                if db_season:
                    db_season.update(season_result, session)
                else:
                    db_season = TraktSeason(season_result, session)
                    self.seasons.append(db_season)
                if number == season_result['number']:
                    season = db_season
            if not season:
                raise LookupError(f'Season {number} not found for show {self.title}')
            session.commit()
        return season

    @property
    def expired(self):
        """:return: True if show details are considered to be expired, ie. need of update"""
        # TODO: stolen from imdb plugin, maybe there's a better way?
        if self.cached_at is None:
            logger.debug('cached_at is None: {}', self)
            return True
        refresh_interval = 2
        # if show has been cancelled or ended, then it is unlikely to be updated often
        if self.year and (self.status in ('ended', 'canceled')):
            # Make sure age is not negative
            age = max((datetime.now().year - self.year), 0)
            refresh_interval += age * 5
            logger.debug('show `{}` age {} expires in {} days', self.title, age, refresh_interval)
        return self.cached_at < datetime.now() - timedelta(days=refresh_interval)

    @property
    def translations(self):
        """Translation records of the show, looked up from trakt on first access."""
        if not self._translations:
            # Never assign None to the relationship collection, even if the lookup yields nothing
            self._translations = get_translations(self.id, 'show') or []
        return self._translations

    @property
    def actors(self):
        """Actor records of the show, looked up from trakt on first access."""
        if not self._actors:
            self._actors[:] = get_db_actors(self.id, 'show')
        return self._actors

    def __repr__(self):
        return f'<name={self.title}, id={self.id}>'
[docs]
class TraktMovie(Base):
    """Cached trakt movie record with its genres, actors and translations."""

    __tablename__ = 'trakt_movies'

    id = Column(Integer, primary_key=True, autoincrement=False)
    title = Column(Unicode)
    year = Column(Integer)
    slug = Column(Unicode)
    imdb_id = Column(Unicode)
    tmdb_id = Column(Integer)
    tagline = Column(Unicode)
    overview = Column(Unicode)
    released = Column(Date)
    runtime = Column(Integer)
    rating = Column(Integer)
    votes = Column(Integer)
    trailer = Column(Unicode)
    homepage = Column(Unicode)
    language = Column(Unicode)
    updated_at = Column(DateTime)
    cached_at = Column(DateTime)
    _translations = relationship(TraktMovieTranslation, backref='movie')
    _translation_languages = Column('translation_languages', Unicode)
    translation_languages = json_synonym('_translation_languages')
    genres = relationship(TraktGenre, secondary=movie_genres_table)
    _actors = relationship(TraktActor, secondary=movie_actors_table)

    def __init__(self, trakt_movie, session):
        super().__init__()
        self.update(trakt_movie, session)

    def to_dict(self):
        """Return the movie's plain fields as a dict, with genres as a list of names."""
        return {
            'id': self.id,
            'title': self.title,
            'year': self.year,
            'slug': self.slug,
            'imdb_id': self.imdb_id,
            'tmdb_id': self.tmdb_id,
            'tagline': self.tagline,
            'overview': self.overview,
            'released': self.released,
            'runtime': self.runtime,
            'rating': self.rating,
            'votes': self.votes,
            'language': self.language,
            'homepage': self.homepage,
            'trailer': self.trailer,
            'genres': [g.name for g in self.genres],
            'updated_at': self.updated_at,
            'cached_at': self.cached_at,
        }

    def update(self, trakt_movie, session):
        """Update this record from the trakt media object `trakt_movie` returned by the trakt api.

        :raises ValueError: If this record already has an id that differs from the one in the data.
        """
        if self.id and self.id != trakt_movie['ids']['trakt']:
            raise ValueError('Tried to update db movie with different movie data')
        if not self.id:
            self.id = trakt_movie['ids']['trakt']
        self.slug = trakt_movie['ids']['slug']
        self.imdb_id = trakt_movie['ids'].get('imdb')
        self.tmdb_id = trakt_movie['ids'].get('tmdb')
        # Fields copied verbatim from the api object (missing keys become None)
        for col in [
            'title',
            'overview',
            'runtime',
            'rating',
            'votes',
            'language',
            'tagline',
            'year',
            'trailer',
            'homepage',
        ]:
            setattr(self, col, trakt_movie.get(col))
        if trakt_movie.get('released'):
            self.released = dateutil_parse(trakt_movie.get('released'), ignoretz=True).date()
        self.updated_at = dateutil_parse(trakt_movie.get('updated_at'), ignoretz=True)
        # Sometimes genres and translations are None but we really do want a list, hence the "or []"
        self.genres = [
            TraktGenre(name=g.replace(' ', '-')) for g in trakt_movie.get('genres') or []
        ]
        self.cached_at = datetime.now()
        self.translation_languages = trakt_movie.get('available_translations') or []

    @property
    def expired(self):
        """:return: True if movie details are considered to be expired, ie. need of update"""
        # TODO: stolen from imdb plugin, maybe there's a better way?
        if self.updated_at is None:
            logger.debug('updated_at is None: {}', self)
            return True
        # The age comparison below is made on cached_at, so it must be set as well
        if self.cached_at is None:
            logger.debug('cached_at is None: {}', self)
            return True
        refresh_interval = 2
        if self.year:
            # Make sure age is not negative
            age = max((datetime.now().year - self.year), 0)
            refresh_interval += age * 5
            logger.debug('movie `{}` age {} expires in {} days', self.title, age, refresh_interval)
        return self.cached_at < datetime.now() - timedelta(days=refresh_interval)

    @property
    def translations(self):
        """Translation records of the movie, looked up from trakt on first access."""
        if not self._translations:
            # Never assign None to the relationship collection, even if the lookup yields nothing
            self._translations = get_translations(self.id, 'movie') or []
        return self._translations

    @property
    def actors(self):
        """Actor records of the movie, looked up from trakt on first access."""
        if not self._actors:
            self._actors[:] = get_db_actors(self.id, 'movie')
        return self._actors
[docs]
class TraktShowSearchResult(Base):
    """Maps a lower-cased search string to the show it resolved to."""

    __tablename__ = 'trakt_show_search_results'

    id = Column(Integer, primary_key=True)
    search = Column(Unicode, unique=True, nullable=False)
    series_id = Column(Integer, ForeignKey('trakt_shows.id'), nullable=True)
    series = relationship(TraktShow, backref='search_strings')

    def __init__(self, search, series_id=None, series=None):
        # Search strings are stored lower-cased
        self.search = search.lower()
        # Only truthy values are assigned; otherwise the attributes are left untouched
        if series_id:
            self.series_id = series_id
        if series:
            self.series = series
[docs]
class TraktMovieSearchResult(Base):
    """Maps a lower-cased search string to the movie it resolved to."""

    __tablename__ = 'trakt_movie_search_results'

    id = Column(Integer, primary_key=True)
    search = Column(Unicode, unique=True, nullable=False)
    movie_id = Column(Integer, ForeignKey('trakt_movies.id'), nullable=True)
    movie = relationship(TraktMovie, backref='search_strings')

    def __init__(self, search, movie_id=None, movie=None):
        # Search strings are stored lower-cased
        self.search = search.lower()
        # Only truthy values are assigned; otherwise the attributes are left untouched
        if movie_id:
            self.movie_id = movie_id
        if movie:
            self.movie = movie
[docs]
class TraktMovieIds:
    """Simple class that holds a variety of possible IDs that Trakt utilize in their API, eg. imdb id, trakt id.

    Any additional keyword arguments are accepted and ignored.
    """

    def __init__(self, trakt_id=None, trakt_slug=None, tmdb_id=None, imdb_id=None, **kwargs):
        self.trakt_id, self.trakt_slug = trakt_id, trakt_slug
        self.tmdb_id, self.imdb_id = tmdb_id, imdb_id

    def get_trakt_id(self):
        """Return the trakt id, falling back to the trakt slug."""
        return self.trakt_id or self.trakt_slug

    def to_dict(self):
        """Return a dict containing id fields that are relevant for a movie."""
        return dict(
            id=self.trakt_id,
            slug=self.trakt_slug,
            tmdb_id=self.tmdb_id,
            imdb_id=self.imdb_id,
        )

    def __bool__(self):
        # Truthy as soon as any single id is set
        known_ids = (self.trakt_id, self.trakt_slug, self.tmdb_id, self.imdb_id)
        return any(known_ids)
[docs]
class TraktShowIds:
    """Simple class that holds a variety of possible IDs that Trakt utilize in their API, eg. imdb id, trakt id.

    Any additional keyword arguments are accepted and ignored.
    """

    def __init__(
        self,
        trakt_id=None,
        trakt_slug=None,
        tmdb_id=None,
        imdb_id=None,
        tvdb_id=None,
        tvrage_id=None,
        **kwargs,
    ):
        self.trakt_id, self.trakt_slug = trakt_id, trakt_slug
        self.tmdb_id, self.imdb_id = tmdb_id, imdb_id
        self.tvdb_id, self.tvrage_id = tvdb_id, tvrage_id

    def get_trakt_id(self):
        """Return the trakt id, falling back to the trakt slug."""
        return self.trakt_id or self.trakt_slug

    def to_dict(self):
        """Return a dict containing id fields that are relevant for a show/season/episode."""
        return dict(
            id=self.trakt_id,
            slug=self.trakt_slug,
            tmdb_id=self.tmdb_id,
            imdb_id=self.imdb_id,
            tvdb_id=self.tvdb_id,
            tvrage_id=self.tvrage_id,
        )

    def __bool__(self):
        # Truthy as soon as any single id is set
        known_ids = (
            self.trakt_id,
            self.trakt_slug,
            self.tmdb_id,
            self.imdb_id,
            self.tvdb_id,
            self.tvrage_id,
        )
        return any(known_ids)
[docs]
def get_item_from_cache(table, session, title=None, year=None, trakt_ids=None):
    """Get the cached info for a given show/movie from the database.

    When ``trakt_ids`` is truthy the lookup is made by ids only (any one matching id is
    enough); the title is used only when no ids are given.

    :param table: Either TraktMovie or TraktShow
    :param title: Title of the show/movie
    :param year: First release year
    :param trakt_ids: instance of TraktShowIds or TraktMovieIds
    :param session: database session object
    :return: query result
    """
    if trakt_ids:
        # One equality clause per id that is actually set
        id_clauses = (
            getattr(table, col) == val for col, val in trakt_ids.to_dict().items() if val
        )
        return session.query(table).filter(or_(id_clauses)).first()
    if not title:
        return None
    if not year:
        # No explicit year: take one from the title, if it carries any
        title, year = split_title_year(title)
    query = session.query(table).filter(table.title == title)
    if year:
        query = query.filter(table.year == year)
    return query.first()
[docs]
def get_trakt_id_from_id(trakt_ids, media_type):
    """Resolve a trakt id by searching trakt with each of the given ids in turn.

    :param trakt_ids: Ids holder exposing ``to_dict()``, e.g. TraktShowIds or TraktMovieIds.
    :param media_type: Only search results of this type are considered.
    :returns: The trakt id of the first matching result, or None if nothing matched.
    :raises LookupError: If ``trakt_ids`` is empty or a search request fails.
    """
    if not trakt_ids:
        raise LookupError('No lookup arguments provided.')
    requests_session = get_session()
    for id_type, identifier in trakt_ids.to_dict().items():
        if not identifier:
            continue
        # Remove only a literal '_id' suffix for the api call. str.rstrip('_id') strips any
        # trailing '_', 'i' and 'd' characters instead, which turns the bare 'id' key into ''.
        stripped_id_type = id_type[:-3] if id_type.endswith('_id') else id_type
        try:
            logger.debug('Searching with params: {}={}', stripped_id_type, identifier)
            results = requests_session.get(
                get_api_url('search'), params={'id_type': stripped_id_type, 'id': identifier}
            ).json()
        except requests.RequestException as e:
            raise LookupError(
                f'Searching trakt for {stripped_id_type}={identifier} failed with error: {e}'
            )
        for result in results:
            if result['type'] != media_type:
                continue
            return result[media_type]['ids']['trakt']
    return None
[docs]
def get_trakt_id_from_title(title, media_type, year=None):
    """Resolve a trakt id by searching trakt for ``title``.

    An exact (case-insensitive) title match is preferred; failing that, the first search
    result is used.

    :param title: Title to search for; a year contained in it is split off.
    :param media_type: Media type passed to the search and used to read the results.
    :param year: Release year; takes precedence over a year parsed from the title.
    :returns: A trakt id, or None when the search returned nothing.
    :raises LookupError: If ``title`` is empty or the search request fails.
    """
    if not title:
        raise LookupError('No lookup arguments provided.')
    requests_session = get_session()
    # Try finding trakt id based on title and year
    parsed_title, parsed_year = split_title_year(title)
    search_year = year or parsed_year
    params = {'query': parsed_title, 'type': media_type, 'year': search_year}
    try:
        logger.debug('Type of title: {}', type(parsed_title))
        logger.debug('Searching with params: {}', ', '.join(f'{k}={v}' for k, v in params.items()))
        results = requests_session.get(get_api_url('search'), params=params).json()
    except requests.RequestException as e:
        raise LookupError(f'Searching trakt for {title} failed with error: {e}')
    for result in results:
        item = result[media_type]
        # Only the explicitly passed year is used to reject candidates
        if year and item['year'] != year:
            continue
        if parsed_title.lower() == item['title'].lower():
            return item['ids']['trakt']
    # grab the first result if there is no exact match
    if results:
        return results[0][media_type]['ids']['trakt']
    return None
[docs]
def get_trakt_data(media_type, title=None, year=None, trakt_ids=None):
    """Fetch the full trakt record for a show/movie identified by ids or by title.

    The trakt id is taken from ``trakt_ids`` directly when available, then looked up via
    the other ids, and finally via the title.

    :param media_type: Singular media type; the plural (``media_type + 's'``) forms the endpoint.
    :param title: Title used when no id leads to a trakt id.
    :param year: Release year passed to the title search.
    :param trakt_ids: Ids holder exposing ``get_trakt_id()`` and ``to_dict()``.
    :returns: Decoded JSON of the ``extended=full`` response.
    :raises LookupError: If no trakt id can be determined or the request fails.
    """
    trakt_id = None
    if trakt_ids:
        trakt_id = trakt_ids.get_trakt_id()
    if not trakt_id and trakt_ids:
        trakt_id = get_trakt_id_from_id(trakt_ids, media_type)
    if not trakt_id and title:
        trakt_id = get_trakt_id_from_title(title, media_type, year=year)
    if not trakt_id:
        # Call to_dict() so the message shows the ids rather than a bound method repr
        raise LookupError(
            f'No results on Trakt.tv, title={title}, ids={trakt_ids.to_dict() if trakt_ids else None}.'
        )

    # Get actual data from trakt
    try:
        return (
            get_session()
            .get(get_api_url(media_type + 's', trakt_id), params={'extended': 'full'})
            .json()
        )
    except requests.RequestException as e:
        raise LookupError(f'Error getting trakt data for id {trakt_id}: {e}')
[docs]
def get_user_data(data_type, media_type, session, username):
    """Fetch user data from Trakt.tv on the /users/<username>/<data_type>/<media_type> end point.

    Example: a user's movie collection is fetched from /users/<username>/collection/movies.
    Nested ``episode``, ``season``, ``show`` and ``movie`` objects are flattened into each item.

    :param data_type: Name of the data type eg. collection, watched etc.
    :param media_type: Type of media we want <data_type> for eg. shows, episodes, movies.
    :param session: A trakt requests session with a valid token
    :param username: Username of the user to fetch data
    :return: The list of flattened items, or an empty list when trakt returned no data.
    :raises plugin.PluginError: If the request fails.
    """
    endpoint = f'{data_type}/{media_type}'
    try:
        data = session.get(get_api_url('users', username, data_type, media_type)).json()
    except requests.RequestException as e:
        raise plugin.PluginError(
            f'Error fetching data from trakt.tv endpoint {endpoint} for user {username}: {e}'
        )
    if not data:
        logger.warning('No {} data returned from trakt endpoint {}.', data_type, endpoint)
        return []
    logger.verbose(
        'Received {} records from trakt.tv for user {} from endpoint {}',
        len(data),
        username,
        endpoint,
    )
    # extract show, episode and movie information
    for item in data:
        # All nested objects are removed first, then merged back in selectively
        nested = {key: item.pop(key, {}) for key in ('episode', 'season', 'show', 'movie')}
        item.update(nested['episode'])
        item.update(nested['season'])
        item.update(nested['movie'])
        # show is irrelevant if either episode or season is present
        if not nested['episode'] and not nested['season']:
            item.update(nested['show'])
    return data
[docs]
def get_username(username=None, account=None):
    """Return 'me' if account is provided and username is not.

    In every other case ``username`` is returned unchanged.
    """
    return 'me' if (account and not username) else username