from datetime import datetime, timedelta, timezone
from loguru import logger
from sqlalchemy import Boolean, Column, DateTime, Float, Integer, Table, Text, Unicode
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import relationship
from sqlalchemy.schema import ForeignKey
from flexget import db_schema
from flexget.utils import requests
from flexget.utils.database import Session, json_synonym, text_date_synonym, with_session
from flexget.utils.simple_persistence import SimplePersistence
from flexget.utils.tools import chunked, split_title_year
logger = logger.bind(name='api_tvdb')
Base = db_schema.versioned_base('api_tvdb', 7)
persist = SimplePersistence('api_tvdb')
SEARCH_RESULT_EXPIRATION_DAYS = 3
[docs]
class TVDBRequest:
# This is a FlexGet API key
API_KEY = '4D297D8CFDE0E105'
BASE_URL = 'https://api.thetvdb.com/'
BANNER_URL = 'http://thetvdb.com/banners/'
def __init__(self, username=None, account_id=None, api_key=None):
self.username = username
self.account_id = account_id
self.api_key = api_key
self.auth_key = self.username.lower() if self.username else 'default'
[docs]
def get_auth_token(self, refresh=False):
with Session() as session:
auth_token = session.query(TVDBTokens).filter(TVDBTokens.name == self.auth_key).first()
if not auth_token:
auth_token = TVDBTokens()
auth_token.name = self.auth_key
if refresh or auth_token.has_expired():
data = {'apikey': TVDBRequest.API_KEY}
if self.username:
data['username'] = self.username
if self.account_id:
data['userkey'] = self.account_id
if self.api_key:
data['apikey'] = self.api_key
logger.debug(
'Authenticating to TheTVDB with {}',
self.username or 'api_key',
)
auth_token.token = (
requests.post(TVDBRequest.BASE_URL + 'login', json=data).json().get('token')
)
auth_token.refreshed = datetime.now()
auth_token = session.merge(auth_token)
return auth_token.token
[docs]
def _request(self, method, endpoint, **params):
url = TVDBRequest.BASE_URL + endpoint
language = params.pop('language', 'en')
headers = {'Authorization': f'Bearer {self.get_auth_token()}', 'Accept-Language': language}
data = params.pop('data', None)
result = requests.request(
method, url, params=params, headers=headers, raise_status=False, json=data
)
if result.status_code == 401:
logger.debug('Auth token expired, refreshing')
headers['Authorization'] = f'Bearer {self.get_auth_token(refresh=True)}'
result = requests.request(
method, url, params=params, headers=headers, raise_status=False, json=data
)
result.raise_for_status()
result = result.json()
if result.get('errors'):
logger.debug('Result contains errors: {}', result['errors'])
# a hack to make sure it doesn't raise exception on a simple invalidLanguage. This is because tvdb
# has a tendency to contain bad data and randomly return this error for no reason
if len(result['errors']) > 1 or 'invalidLanguage' not in result['errors']:
raise LookupError(f'Error processing request on tvdb: {result["errors"]}')
return result
[docs]
def get(self, endpoint, **params):
result = self._request('get', endpoint, **params)
return result.get('data')
[docs]
def post(self, endpoint, **params):
result = self._request('post', endpoint, **params)
return result.get('data')
[docs]
def put(self, endpoint, **params):
result = self._request('put', endpoint, **params)
return result.get('data')
[docs]
def delete(self, endpoint, **params):
return self._request('delete', endpoint, **params)
@db_schema.upgrade('api_tvdb')
def upgrade(ver, session):
if ver is None or ver <= 6:
raise db_schema.UpgradeImpossible
return ver
# association tables
genres_table = Table(
'tvdb_series_genres',
Base.metadata,
Column('series_id', Integer, ForeignKey('tvdb_series.id')),
Column('genre_id', Integer, ForeignKey('tvdb_genres.id')),
)
Base.register_table(genres_table)
[docs]
class TVDBTokens(Base):
__tablename__ = 'tvdb_tokens'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(Unicode)
token = Column(Text)
refreshed = Column(DateTime)
[docs]
def has_expired(self):
if not self.token or not self.refreshed:
return True
seconds = (datetime.now() - self.refreshed).total_seconds()
return seconds >= 86400
[docs]
class TVDBSeries(Base):
__tablename__ = 'tvdb_series'
id = Column(Integer, primary_key=True, autoincrement=False)
last_updated = Column(Integer)
expired = Column(Boolean)
name = Column(Unicode)
language = Column(Unicode)
rating = Column(Float)
status = Column(Unicode)
runtime = Column(Integer)
airs_time = Column(Unicode)
airs_dayofweek = Column(Unicode)
content_rating = Column(Unicode)
network = Column(Unicode)
overview = Column(Unicode)
imdb_id = Column(Unicode)
zap2it_id = Column(Unicode)
_banner = Column('banner', Unicode)
_first_aired = Column('first_aired', DateTime)
first_aired = text_date_synonym('_first_aired')
_aliases = Column('aliases', Unicode)
aliases = json_synonym('_aliases')
_actors = Column('actors', Unicode)
actors_list = json_synonym('_actors')
_posters = Column('posters', Unicode)
posters_list = json_synonym('_posters')
_genres = relationship('TVDBGenre', secondary=genres_table)
genres = association_proxy('_genres', 'name')
episodes = relationship('TVDBEpisode', backref='series', cascade='all, delete, delete-orphan')
def __init__(self, tvdb_id, language):
"""Look up movie on tvdb and create a new database model for it.
These instances should only be added to a session via `session.merge`.
"""
self.id = tvdb_id
try:
series = TVDBRequest().get(f'series/{self.id}', language=language)
except requests.RequestException as e:
raise LookupError(f'Error updating data from tvdb: {e}')
self.language = language or 'en'
self.last_updated = series['lastUpdated']
self.name = series['seriesName']
self.rating = float(series['siteRating']) if series['siteRating'] else 0.0
self.status = series['status']
self.runtime = int(series['runtime']) if series['runtime'] else 0
self.airs_time = series['airsTime']
self.airs_dayofweek = series['airsDayOfWeek']
self.content_rating = series['rating']
self.network = series['network']
self.overview = series['overview']
self.imdb_id = series['imdbId']
self.zap2it_id = series['zap2itId']
self.first_aired = series['firstAired']
self.expired = False
self.aliases = series['aliases']
self._banner = series['banner']
self._genres = [TVDBGenre(id=name) for name in series['genre']] if series['genre'] else []
if not self.name:
raise LookupError(
f"Not possible to get name to series with id {self.id} in language '{self.language}'"
)
if self.first_aired is None:
logger.debug(
'Falling back to getting first episode aired date for series {}', self.name
)
try:
episode = TVDBRequest().get(
f'series/{self.id}/episodes/query?airedSeason=1&airedEpisode=1',
language=language,
)
self.first_aired = episode[0]['firstAired']
except requests.RequestException:
logger.error('Failed to get first episode for series {}', self.name)
# Actors and Posters are lazy populated
self._actors = None
self._posters = None
def __repr__(self):
return f'<TVDBSeries name={self.name},tvdb_id={self.id}>'
@property
def banner(self):
if self._banner:
return TVDBRequest.BANNER_URL + self._banner
return None
@property
def actors(self):
return self.get_actors()
@property
def posters(self):
return self.get_posters()
[docs]
def get_actors(self):
if not self._actors:
logger.debug('Looking up actors for series {}', self.name)
try:
actors_query = TVDBRequest().get(f'series/{self.id}/actors')
self.actors_list = [a['name'] for a in actors_query] if actors_query else []
except requests.RequestException as e:
if None is not e.response and e.response.status_code == 404:
self.actors_list = []
else:
raise LookupError(f'Error updating actors from tvdb: {e}')
return self.actors_list
[docs]
def get_posters(self):
if not self._posters:
logger.debug('Getting top 5 posters for series {}', self.name)
try:
poster_main = TVDBRequest().get(f'series/{self.id}').get('poster')
poster_query = TVDBRequest().get(
f'series/{self.id}/images/query', keyType='poster'
)
self.posters_list = [poster_main] if poster_main else []
self.posters_list += (
[p['fileName'] for p in poster_query[:5]] if poster_query else []
)
except requests.RequestException as e:
if None is not e.response and e.response.status_code == 404:
self.posters_list = []
else:
raise LookupError(f'Error updating posters from tvdb: {e}')
return [TVDBRequest.BANNER_URL + p for p in self.posters_list[:5]]
[docs]
def to_dict(self):
return {
'aliases': list(self.aliases),
'tvdb_id': self.id,
'last_updated': datetime.fromtimestamp(self.last_updated).strftime(
'%Y-%m-%d %H:%M:%S'
),
'expired': self.expired,
'series_name': self.name,
'language': self.language,
'rating': self.rating,
'status': self.status,
'runtime': self.runtime,
'airs_time': self.airs_time,
'airs_dayofweek': self.airs_dayofweek,
'content_rating': self.content_rating,
'network': self.network,
'overview': self.overview,
'imdb_id': self.imdb_id,
'zap2it_id': self.zap2it_id,
'banner': self.banner,
'posters': self.posters,
'genres': list(self.genres),
'first_aired': self.first_aired,
}
[docs]
class TVDBGenre(Base):
__tablename__ = 'tvdb_genres'
id = Column(Unicode, primary_key=True)
@property
def name(self):
return self.id
[docs]
class TVDBEpisode(Base):
__tablename__ = 'tvdb_episodes'
id = Column(Integer, primary_key=True, autoincrement=False)
expired = Column(Boolean)
last_updated = Column(Integer)
season_number = Column(Integer)
episode_number = Column(Integer)
absolute_number = Column(Integer)
name = Column(Unicode)
overview = Column(Unicode)
rating = Column(Float)
director = Column(Unicode)
_image = Column(Unicode)
_first_aired = Column('firstaired', DateTime)
first_aired = text_date_synonym('_first_aired')
series_id = Column(Integer, ForeignKey('tvdb_series.id'), nullable=False)
def __init__(self, series_id, ep_id, language=None):
"""Look up movie on tvdb and create a new database model for it.
These instances should only be added to a session via `session.merge`.
"""
self.series_id = series_id
self.id = ep_id
self.expired = False
try:
episode = TVDBRequest().get(f'episodes/{self.id}', language=language)
except requests.RequestException as e:
raise LookupError(f'Error updating data from tvdb: {e}')
self.id = episode['id']
self.last_updated = episode['lastUpdated']
self.season_number = episode['airedSeason']
self.episode_number = episode['airedEpisodeNumber']
self.absolute_number = episode['absoluteNumber']
self.name = episode['episodeName']
self.overview = episode['overview']
self.director = ', '.join(episode['directors'])
self._image = episode['filename']
self.rating = episode['siteRating']
self.first_aired = episode['firstAired']
def __repr__(self):
return (
f'<TVDBEpisode series={self.series.name},season={self.season_number},'
f'episode={self.episode_number}>'
)
[docs]
def to_dict(self):
return {
'id': self.id,
'expired': self.expired,
'last_update': self.last_updated,
'season_number': self.season_number,
'episode_number': self.episode_number,
'absolute_number': self.absolute_number,
'episode_name': self.name,
'overview': self.overview,
'director': self.director,
'rating': self.rating,
'image': self.image,
'first_aired': self.first_aired,
'series_id': self.series_id,
}
@property
def image(self):
if self._image:
return TVDBRequest.BANNER_URL + self._image
return None
[docs]
class TVDBSearchResult(Base):
__tablename__ = 'tvdb_search_results'
id = Column(Integer, primary_key=True)
search = Column(Unicode, nullable=False, unique=True)
series_id = Column(Integer, ForeignKey('tvdb_series.id'), nullable=True)
series = relationship(TVDBSeries, backref='search_strings')
def __init__(self, search, series_id=None, series=None):
self.search = search.lower()
if series_id:
self.series_id = series_id
if series:
self.series = series
[docs]
class TVDBSeriesSearchResult(Base):
"""Table that holds a single result that results from the /search/series endpoint, which return a series with a minimal set of parameters."""
__tablename__ = 'tvdb_series_search_results'
id = Column(Integer, primary_key=True, autoincrement=False)
lookup_term = Column(Unicode)
name = Column(Unicode)
status = Column(Unicode)
network = Column(Unicode)
overview = Column(Unicode)
_banner = Column('banner', Unicode)
_first_aired = Column('first_aired', DateTime)
first_aired = text_date_synonym('_first_aired')
_aliases = Column('aliases', Unicode)
aliases = json_synonym('_aliases')
created_at = Column(DateTime)
search_name = Column(Unicode)
def __init__(self, series, lookup_term=None):
self.lookup_term = lookup_term
self.id = series['id']
self.name = series['seriesName']
self.first_aired = series['firstAired']
self.network = series['network']
self.overview = series['overview']
self.status = series['status']
self._banner = series['banner']
self.aliases = series['aliases']
self.created_at = datetime.now()
@property
def banner(self):
if self._banner:
return TVDBRequest.BANNER_URL + self._banner
return None
[docs]
def to_dict(self):
return {
'aliases': list(self.aliases),
'banner': self.banner,
'first_aired': self.first_aired,
'tvdb_id': self.id,
'network': self.network,
'overview': self.overview,
'series_name': self.name,
'status': self.status,
}
@property
def expired(self):
logger.debug('checking series {} for expiration', self.name)
if datetime.now() - self.created_at >= timedelta(days=SEARCH_RESULT_EXPIRATION_DAYS):
logger.debug('series {} is expires, should re-fetch', self.name)
return True
logger.debug('series {} is not expired', self.name)
return False
[docs]
def find_series_id(name, language=None):
"""Look up the tvdb id for a series."""
try:
series = TVDBRequest().get('search/series', name=name, language=language)
except requests.RequestException as e:
raise LookupError(f'Unable to get search results for {name}: {e}')
name = name.lower()
if not series:
raise LookupError(f'No results found for {name}')
# Cleanup results for sorting
for s in series:
if s['firstAired']:
try:
s['firstAired'] = datetime.strptime(s['firstAired'], '%Y-%m-%d')
except ValueError:
logger.warning(
'Invalid firstAired date "{}" when parsing series {} ',
s['firstAired'],
s['seriesName'],
)
s['firstAired'] = datetime(1970, 1, 1)
else:
s['firstAired'] = datetime(1970, 1, 1)
s['names'] = [a.lower() for a in s.get('aliases')] if s.get('aliases') else []
if s.get('seriesName'):
s['names'].append(s.get('seriesName').lower())
s['running'] = s['status'] == 'Continuing'
for n in s['names']:
# Exact matching by stripping our the year
title = split_title_year(n)[0]
if title not in s['names']:
s['names'].append(title)
# Sort by status, aired_date
series = sorted(series, key=lambda x: (x['running'], x['firstAired']), reverse=True)
for s in series:
# Exact match
if name in s['names']:
return s['id']
# If there is no exact match, pick the first result
return series[0]['id']
[docs]
def _update_search_strings(series, session, search=None):
search_strings = series.search_strings
aliases = [a.lower() for a in series.aliases] if series.aliases else []
searches = [search.lower()] if search else []
add = [series.name.lower(), *aliases, *searches]
for name in set(add):
if name not in search_strings:
search_result = (
session.query(TVDBSearchResult).filter(TVDBSearchResult.search == name).first()
)
if not search_result:
search_result = TVDBSearchResult(search=name)
search_result.series_id = series.id
session.add(search_result)
@with_session
def lookup_series(name=None, tvdb_id=None, only_cached=False, session=None, language=None):
"""Look up information on a series. Will be returned from cache if available, and looked up online and cached if not.
Either `name` or `tvdb_id` parameter are needed to specify the series.
:param unicode name: Name of series.
:param int tvdb_id: TVDb ID of series.
:param bool only_cached: If True, will not cause an online lookup. LookupError will be raised if not available
in the cache.
:param session: An sqlalchemy session to be used to lookup and store to cache. Commit(s) may occur when passing in
a session. If one is not supplied it will be created.
:param language: Language abbreviation string to be sent to API
:return: Instance of :class:`TVDBSeries` populated with series information. If session was not supplied, this will
be a detached from the database, so relationships cannot be loaded.
:raises: :class:`LookupError` if series cannot be looked up.
"""
if not (name or tvdb_id):
raise LookupError('No criteria specified for tvdb lookup')
logger.debug("Looking up tvdb information for '{}'. TVDB ID: {}", name, tvdb_id)
series = None
def id_str():
return f'<name={name},tvdb_id={tvdb_id}>'
if tvdb_id:
series = session.query(TVDBSeries).filter(TVDBSeries.id == tvdb_id).first()
if not series and name:
found = (
session.query(TVDBSearchResult).filter(TVDBSearchResult.search == name.lower()).first()
)
if found and found.series:
series = found.series
if series:
# Series found in cache, update if cache has expired.
if not only_cached:
mark_expired(session)
if not only_cached and series.expired:
logger.verbose('Data for {} has expired, refreshing from tvdb', series.name)
try:
updated_series = TVDBSeries(series.id, language)
series = session.merge(updated_series)
if series and series.name:
_update_search_strings(series, session, search=name)
except LookupError as e:
logger.warning(
'Error while updating from tvdb ({}), using cached data.', e.args[0]
)
else:
logger.debug('Series {} information restored from cache.', id_str())
else:
if only_cached:
raise LookupError(f'Series {id_str()} not found from cache')
# There was no series found in the cache, do a lookup from tvdb
logger.debug('Series {} not found in cache, looking up from tvdb.', id_str())
if tvdb_id:
series = session.merge(TVDBSeries(tvdb_id, language))
elif name:
tvdb_id = find_series_id(name, language=language)
if tvdb_id:
series = session.query(TVDBSeries).filter(TVDBSeries.id == tvdb_id).first()
if not series:
series = session.merge(TVDBSeries(tvdb_id, language))
if series and series.name:
_update_search_strings(series, session, search=name)
if not series:
raise LookupError(f'No results found from tvdb for {id_str()}')
if not series.name:
raise LookupError('Tvdb result for series does not have a title.')
return series
@with_session
def lookup_episode(
name=None,
season_number=None,
episode_number=None,
absolute_number=None,
tvdb_id=None,
first_aired=None,
only_cached=False,
session=None,
language=None,
):
"""Look up information on an episode. Will be returned from cache if available, and looked up online and cached if not.
Either `name` or `tvdb_id` parameter are needed to specify the series.
Either `season_number` and `episode_number`, `absolute_number`, or `first_aired` are required to specify episode
number.
:param unicode name: Name of series episode belongs to.
:param int tvdb_id: TVDb ID of series episode belongs to.
:param int season_number: Season number of episode.
:param int episode_number: Episode number of episode.
:param int absolute_number: Absolute number of episode.
:param date first_aired: Air date of episode. DateTime object.
:param bool only_cached: If True, will not cause an online lookup. LookupError will be raised if not available
in the cache.
:param session: An sqlalchemy session to be used to lookup and store to cache. Commit(s) may occur when passing in
a session. If one is not supplied it will be created, however if you need to access relationships you should
pass one in.
:param language: Language abbreviation string to be sent to API
:return: Instance of :class:`TVDBEpisode` populated with series information.
:raises: :class:`LookupError` if episode cannot be looked up.
"""
# First make sure we have the series data
series = lookup_series(name=name, tvdb_id=tvdb_id, only_cached=only_cached, session=session)
if not series:
raise LookupError(f'Series {name} ({tvdb_id}) not found from')
ep_description = series.name
query_params = {}
episode = session.query(TVDBEpisode).filter(TVDBEpisode.series_id == series.id)
if absolute_number is not None:
episode = episode.filter(TVDBEpisode.absolute_number == absolute_number)
query_params['absoluteNumber'] = absolute_number
ep_description = f'{ep_description} absNo: {absolute_number}'
if season_number is not None:
episode = episode.filter(TVDBEpisode.season_number == season_number)
query_params['airedSeason'] = season_number
ep_description = f'{ep_description} s{season_number}'
if episode_number is not None:
episode = episode.filter(TVDBEpisode.episode_number == episode_number)
query_params['airedEpisode'] = episode_number
ep_description = f'{ep_description} e{episode_number}'
if first_aired is not None:
episode = episode.filter(TVDBEpisode.first_aired == first_aired)
query_params['firstAired'] = datetime.strftime(first_aired, '%Y-%m-%d')
ep_description = f'{ep_description} e{first_aired}'
episode = episode.first()
if episode:
if episode.expired and not only_cached:
logger.info('Data for {!r} has expired, refreshing from tvdb', episode)
try:
updated_episode = TVDBEpisode(series.id, episode.id)
episode = session.merge(updated_episode)
except LookupError as e:
logger.warning('Error while updating from tvdb ({}), using cached data.', str(e))
else:
logger.debug('Using episode info for {} from cache.', ep_description)
else:
if only_cached:
raise LookupError(f'Episode {ep_description} not found from cache')
# There was no episode found in the cache, do a lookup from tvdb
logger.debug('Episode {} not found in cache, looking up from tvdb.', ep_description)
try:
results = TVDBRequest().get(
f'series/{series.id}/episodes/query', language=language, **query_params
)
if results:
# Check if this episode id is already in our db
episode = (
session.query(TVDBEpisode).filter(TVDBEpisode.id == results[0]['id']).first()
)
if not episode or (episode and episode.expired is not False):
updated_episode = TVDBEpisode(series.id, results[0]['id'], language=language)
episode = session.merge(updated_episode)
except requests.RequestException as e:
raise LookupError(f'Error looking up episode from TVDb ({e})')
if episode:
return episode
raise LookupError(f'No results found for {ep_description}')
@with_session
def search_for_series(
search_name=None, imdb_id=None, zap2it_id=None, force_search=None, session=None, language=None
):
"""Search IMDB using a an identifier, return a list of cached search results.
One of `search_name`, `imdb_id` or `zap2it_id` is required.
:param search_name: Name of search to use
:param imdb_id: Search via IMDB ID
:param zap2it_id: Search via zap2it ID
:param force_search: Should search use cache
:param session: Current DB session
:return: A List of TVDBSeriesSearchResult objects
"""
if imdb_id:
lookup_term = imdb_id
lookup_url = f'search/series?imdbId={imdb_id}'
elif zap2it_id:
lookup_term = zap2it_id
lookup_url = f'search/series?zap2itId={zap2it_id}'
elif search_name:
lookup_term = search_name
lookup_url = f'search/series?name={search_name}'
else:
raise LookupError('not enough parameters for lookup')
series_search_results = []
if not force_search:
logger.debug('trying to fetch TVDB search results from DB')
series_search_results = (
session
.query(TVDBSeriesSearchResult)
.filter(TVDBSeriesSearchResult.lookup_term == lookup_term)
.all()
)
if not series_search_results or any(series.expired for series in series_search_results):
try:
logger.debug('trying to fetch TVDB search results from TVDB')
fetched_results = TVDBRequest().get(lookup_url, language=language)
except requests.RequestException as e:
raise LookupError(f'Error searching series from TVDb ({e})')
series_search_results = [
session.merge(TVDBSeriesSearchResult(series, lookup_term))
for series in fetched_results
]
if series_search_results:
return series_search_results
raise LookupError('No results found for series lookup')
[docs]
def mark_expired(session):
"""Mark series and episodes that have expired since we cached them."""
# Only get the expired list every hour
last_check = persist.get('last_check')
utcnow = datetime.now(timezone.utc).replace(tzinfo=None)
if not last_check:
persist['last_check'] = utcnow
return
if utcnow - last_check <= timedelta(hours=2):
# It has been less than 2 hour, don't check again
return
new_last_check = utcnow
try:
# Calculate seconds since epoch minus a minute for buffer
last_check_epoch = int((last_check - datetime(1970, 1, 1)).total_seconds()) - 60
logger.debug('Getting updates from thetvdb ({})', last_check_epoch)
updates = TVDBRequest().get('updated/query', fromTime=last_check_epoch)
except requests.RequestException as e:
logger.error('Could not get update information from tvdb: {}', e)
return
expired_series = [series['id'] for series in updates] if updates else []
# Update our cache to mark the items that have expired
for chunk in chunked(expired_series):
series_updated = (
session
.query(TVDBSeries)
.filter(TVDBSeries.id.in_(chunk))
.update({'expired': True}, 'fetch')
)
episodes_updated = (
session
.query(TVDBEpisode)
.filter(TVDBEpisode.series_id.in_(chunk))
.update({'expired': True}, 'fetch')
)
logger.debug(
'{} series and {} episodes marked as expired', series_updated, episodes_updated
)
persist['last_check'] = new_last_check