from __future__ import annotations
import copy
import functools
import re
from abc import ABC, abstractmethod, abstractstaticmethod
from collections.abc import MutableSet
from datetime import datetime
from urllib.parse import urlencode
import pendulum
from loguru import logger
from requests.exceptions import HTTPError, RequestException
from flexget.components.emby.emby_util import get_field_map
from flexget.entry import Entry
from flexget.plugin import PluginError
from flexget.utils import requests
from flexget.utils.simple_persistence import SimplePersistence
from flexget.utils.tools import get_current_flexget_version, split_title_year, str_to_int
persist = SimplePersistence('api_emby')
LOGIN_API = 'api'
LOGIN_USER = 'user'
LOGIN_CONNECT = 'connect'
EMBY_CONNECT = 'https://connect.emby.media'
EMBY_ENDPOINT_CONNECT_LOGIN = '/service/user/authenticate'
EMBY_ENDPOINT_CONNECT_SERVERS = '/service/servers'
EMBY_ENDPOINT_CONNECT_EXCHANGE = '/Connect/Exchange'
EMBY_ENDPOINT_LOGIN = '/emby/Users/AuthenticateByName'
EMBY_ENDPOINT_SEARCH = '/emby/Users/{userid}/Items'
EMBY_ENDPOINT_PHOTOS = '/emby/Items/{itemid}/Images/Primary'
EMBY_ENDPOINT_DOWNLOAD = '/emby/Items/{itemid}/Download'
EMBY_ENDPOINT_GETUSERS = '/emby/Users'
EMBY_ENDPOINT_USERINFO = '/emby/Users/{userid}'
EMBY_ENDPOINT_SERVERINFO = '/emby/System/Info'
EMBY_ENDPOINT_PARENTS = '/emby/Items/{itemid}/Ancestors'
EMBY_ENDPOINT_LIBRARY = '/emby/Library/MediaFolders'
EMBY_ENDPOINT_FAVORITE = '/emby/Users/{userid}/FavoriteItems/{itemid}'
EMBY_ENDPOINT_ITEMUPD = '/emby/Items/{itemid}'
EMBY_ENDPOINT_WATCHED = '/emby/Users/{userid}/PlayedItems/{itemid}'
EMBY_ENDPOINT_NEW_PLAYLIST = '/emby/Playlists'
EMBY_ENDPOINT_PLAYLIST = '/emby/Playlists/{listid}/Items'
EMBY_ENDPOINT_DELETE_ITEM = '/emby/Items/{itemid}'
EMBY_ENDPOINT_LIBRARY_REFRESH = '/emby/Library/Refresh'
logger = logger.bind(name='api_emby')
[docs]
class EmbyApiBase(ABC): # noqa: B024 The correct fix for it requires a thorough understanding of the code.
"""Base Class to all API integratios."""
EMBY_PREF = 'emby_'
[docs]
@staticmethod
def merge_field_map(dst: dict, *arg: dict, **kwargs):
"""Merge field maps from clild and parent class."""
allow_new = kwargs.get('allow_new', False)
destination = copy.deepcopy(dst)
for src in arg:
source = copy.deepcopy(src)
for key in source:
if key in destination and isinstance(destination[key], str):
destination[key] = [destination[key]]
elif key not in destination and not allow_new:
continue
elif key not in destination and allow_new:
destination[key] = []
if isinstance(source[key], str):
source[key] = [source[key]]
if source[key][0] and source[key][0].find(EmbyApiBase.EMBY_PREF) < 0:
source[key].insert(0, f'{EmbyApiBase.EMBY_PREF}{source[key][0]}')
for value_source in source[key]:
if value_source not in destination[key]:
destination[key].append(value_source)
return destination
[docs]
@staticmethod
def update_using_map(target, field_map: dict, source_item, **kwargs):
"""Update based on field map with source."""
allow_new = kwargs.get('allow_new', False)
my_field_map = field_map.copy()
func_get = dict.get if isinstance(source_item, dict) else getattr
for field, val in my_field_map.items():
values = val
if not isinstance(values, list):
values = [val]
if values[0] and values[0].find(EmbyApiBase.EMBY_PREF) < 0:
values.insert(0, f'{EmbyApiBase.EMBY_PREF}{values[0]}')
for value in values:
if isinstance(value, str):
try:
val = functools.reduce(func_get, value.split('.'), source_item)
except TypeError:
continue
else:
val = value(source_item)
if val is None:
continue
if not hasattr(target, field) and not allow_new:
continue
if isinstance(target, dict):
target[field] = val
else:
setattr(target, field, val)
break
[docs]
class EmbyAuth(EmbyApiBase):
"""Manage API Authorizations."""
_last_auth = None
field_map = {
'host': 'host',
'return_host': 'return_host',
'_apikey': 'apikey',
'_username': 'username',
'_password': 'password',
}
EMBY_DEF_HOST = 'http://localhost:8096'
EMBY_CLIENT = 'Flexget'
EMBY_DEVICE = 'Flexget Plugin'
EMBY_DEVICE_ID = 'flexget_plugin'
EMBY_VERSION = get_current_flexget_version()
host = EMBY_DEF_HOST
return_host = None
_userid = ''
_token = ''
_host_name = ''
_connect_token = ''
_connect_token_link = ''
_connect_username = ''
_serverid = ''
_logged = False
_username = None
_password = None
_apikey = None
_wanurl = None
_lanurl = None
_can_download = None
_login_type = None
def __init__(self, **kwargs):
server = kwargs.get('server') if 'server' in kwargs else kwargs
EmbyApiBase.update_using_map(self, EmbyAuth.field_map, server)
[docs]
def is_connect_server(self) -> bool:
"""Check if it's a connect server, if it's a url assumed not a emby connect.
Returns:
bool: Is emby connect server
"""
regexp = (
'('
'http|https'
r'):\/\/(\w+:{0,1}\w*@)?(\S+)(:[0-9]+)?(\/|\/([\w#!:.?+=&%@!\-\/]))?'
)
return not re.match(regexp, self.host)
[docs]
def login(self, optional=False):
"""Login user to API."""
userdata = None
if not self._apikey:
if self.is_connect_server():
# Make Emby connect login
self._login_type = LOGIN_CONNECT
userdata = self.check_token_data(persist.get('token_data'), LOGIN_CONNECT)
if not userdata:
logger.debug(
'Login to Emby Connect with username `{}` host `{}`',
self.username,
self.host,
)
self._connect_username = self._username
# Login to emby connect
args = {'nameOrEmail': self._username, 'rawpw': self._password}
connect_data = EmbyApi.request_emby(
EMBY_ENDPOINT_CONNECT_LOGIN, self, 'POST', emby_connect=True, **args
)
if (
not connect_data
or 'AccessToken' not in connect_data
or 'User' not in connect_data
or 'Id' not in connect_data['User']
):
raise PluginError(
f'Could not login to Emby Connect account `{self._connect_username}`'
)
self._connect_token = connect_data['AccessToken']
# Retrieve emby connect servers
args = {'userId': connect_data['User']['Id']}
connect_servers = EmbyApi.request_emby(
EMBY_ENDPOINT_CONNECT_SERVERS, self, 'GET', emby_connect=True, **args
)
if not isinstance(connect_servers, list):
raise PluginError(
f'Could not login to Emby Connect account `{self._connect_username}`, no server list'
)
for server in connect_servers:
if 'Name' not in server:
raise PluginError(
f'Could not login to Emby Connect account `{self._connect_username}`, no server list'
)
if (
server['Name'].lower() == self.host.lower()
or server['Name'].lower() == self.host_name.lower()
):
connect_server = server
break
else:
raise PluginError(
f'No server with name `{self.host}`` on `{self._connect_username}` account'
)
if 'AccessKey' not in connect_server or 'Url' not in connect_server:
raise PluginError(
f'Could not login to Emby Connect account `{self._connect_username}`, no server list'
)
self._connect_token_link = connect_server['AccessKey']
self.host = connect_server['Url']
args = {'format': 'json', 'ConnectUserId': connect_data['User']['Id']}
connect_exchange = EmbyApi.request_emby(
EMBY_ENDPOINT_CONNECT_EXCHANGE, self, 'GET', **args
)
if (
'LocalUserId' not in connect_exchange
or 'AccessToken' not in connect_exchange
):
raise PluginError(
f'Could not login with Emby Connect to server `{self.host}`'
)
self._userid = connect_exchange['LocalUserId']
self._token = connect_exchange['AccessToken']
self._logged = True
userdata = EmbyApi.request_emby(EMBY_ENDPOINT_USERINFO, self, 'GET', {})
else:
# Make Local user login
self._login_type = LOGIN_USER
userdata = self.check_token_data(persist.get('token_data'), LOGIN_USER)
if not userdata:
logger.debug(
'Login to {} with username {} and password', self.host, self.username
)
args = {'Username': self._username, 'Pw': self._password}
login_data = EmbyApi.request_emby(EMBY_ENDPOINT_LOGIN, self, 'POST', **args)
if not login_data and optional:
return
if not login_data:
self.logout()
raise PluginError('Could not login to Emby')
userdata = login_data.get('User')
self._token = login_data.get('AccessToken')
else:
allow_retry = True
else:
logger.debug('Login to {} with username {} and apikey', self.host, self.username)
userdata = self.get_user_by_name(self._username)
self._login_type = LOGIN_API
if not userdata and optional:
return
if not userdata:
self.logout()
raise PluginError('Could not login to Emby')
self._username = userdata.get('Name')
self._userid = userdata.get('Id')
self._serverid = userdata.get('ServerId')
if 'Policy' in userdata and not self._apikey:
self._can_download = userdata['Policy'].get('EnableContentDownloading', False)
elif self._apikey:
self._can_download = True
self._logged = True
serverinfo = EmbyApi.request_emby(EMBY_ENDPOINT_SERVERINFO, self, 'GET')
if serverinfo:
self._wanurl = serverinfo.get('WanAddress')
self._lanurl = serverinfo.get('LocalAddress')
elif allow_retry:
logger.debug('Try to clean token data to login again')
self.logout()
self.login()
return
else:
self.logout()
raise PluginError('Could not login to Emby')
self.save_token_data()
EmbyAuth._last_auth = self
[docs]
def logout(self):
"""Logout user from API."""
self._token = None
self._logged = False
self._connect_username = ''
if self.host_name:
self.host = self.host_name
self._host_name = ''
if 'token_data' in persist:
persist['token_data']['token'] = None
[docs]
def check_token_data(self, token_data, login_type):
"""Check saved tokens."""
if not token_data:
return False
if login_type == LOGIN_CONNECT:
self._host_name = self.host
connect_username = self._connect_username or self._username
if (
'token' not in token_data
or 'userid' not in token_data
or token_data.get('connect_username').lower() != connect_username.lower()
or login_type != token_data.get('login_type')
):
self.logout()
return False
elif (
'token' not in token_data
or 'userid' not in token_data
or token_data.get('username') != self._username
or token_data.get('host') != self.host
or login_type != token_data.get('login_type')
):
self.logout()
return False
self._userid = token_data.get('userid')
self._token = token_data.get('token')
self._login_type = token_data.get('login_type')
self._connect_username = token_data.get('connect_username', '')
self.host = token_data.get('host', '')
self._logged = True
endpoint = EMBY_ENDPOINT_USERINFO.format(userid=token_data['userid'])
try:
response = EmbyApi.request_emby(endpoint, self, 'GET')
except PluginError:
response = None
if not response:
self.logout()
return False
if self._userid != response.get('Id'):
self.logout()
return False
response['AccessToken'] = token_data['token']
logger.debug('Restored emby token from database')
return response
[docs]
def save_token_data(self):
"""Save token data to Data Base."""
if self._apikey or not self.host or not self.username:
return
if self._login_type == LOGIN_API:
return
logger.debug('Saving emby token to database')
persist['token_data'] = {
'host': self.host.lower(),
'username': self._username.lower(),
'userid': self._userid,
'serverid': self._serverid,
'token': self._token,
'connect_username': self._connect_username,
'login_type': self._login_type,
}
@property
def uid(self) -> str:
return self._userid
@property
def username(self) -> str:
return self._username
@property
def host_name(self) -> str:
return self._host_name
@property
def logged(self) -> bool:
return self._logged
@property
def can_download(self) -> bool:
return self._can_download
@property
def token(self) -> str:
return self._token
@property
def server_id(self) -> str:
return self._serverid
@property
def wanurl(self) -> str:
return self._wanurl
@property
def lanurl(self) -> str:
return self._lanurl
[docs]
def get_user_by_name(self, name: str) -> dict:
"""Get user by username."""
args = {'IsDisabled': False}
useres = EmbyApi.request_emby(EMBY_ENDPOINT_GETUSERS, self, 'GET', **args)
if not useres:
return None
for user in useres:
if user.get('Name').lower() == name.lower():
return user
return None
[docs]
@staticmethod
def get_last_auth():
return EmbyAuth._last_auth
[docs]
class EmbyApiListBase(EmbyApiBase):
"""Base class to all API Lists."""
auth = None
_index = 0
_iterator = 10
_len = 0
id = None
_name = None
types = None
watched = None
favorite = None
sort = None
allow_create = False
_internal_items = None
field_map = {
'id': ['library_id', 'id', 'Id'],
'_name': ['library_name', 'list', 'name', 'Name'],
'types': ['types'],
'watched': ['watched'],
'favorite': ['favorite'],
'sort': ['sort'],
}
def __init__(self, **kwargs):
self.auth = EmbyApi.get_auth(**kwargs)
EmbyApiBase.update_using_map(self, EmbyApiListBase.field_map, kwargs)
if self.types and not isinstance(self.types, list):
self.types = [self.types]
if isinstance(self.sort, str):
self.sort = {'field': self.sort, 'order': 'ascending'}
if isinstance(self.sort, dict):
self.sort['field'] = self.sort['field'].replace('_', '')
[docs]
def set_list_search_args(self, args: dict):
if self.watched is not None:
args['IsPlayed'] = self.watched
if self.favorite is not None:
args['IsFavorite'] = self.favorite
if self.sort is not None:
args['SortBy'] = self.sort['field']
args['SortOrder'] = self.sort['order']
if not self.types or len(self.types) == 0:
args['IncludeItemTypes'] = 'Movie,Episode'
else:
args['IncludeItemTypes'] = ','.join(type.title() for type in self.types)
[docs]
def add(self, entry: Entry):
"""Add a item to list."""
item = EmbyApiMedia.cast(auth=self.auth, **entry)
if not item:
logger.warning("Not possible to match '{}' in emby", item.fullname)
return
if self.contains(item):
logger.warning("'{}' already in {}", item.fullname, self.fullname)
return
logger.debug("Adding '{}' to {}", item.fullname, self.fullname)
self._add(item)
[docs]
@abstractmethod
def _add(self, item: EmbyApiMedia):
pass
[docs]
def remove(self, entry: Entry):
"""Remove a item from list."""
item = EmbyApiMedia.cast(auth=self.auth, **entry)
if not item:
logger.warning("Not possible to match '{}' in emby", item.fullname)
return
if not self.contains(item):
logger.warning("'{}' not in {}", item.fullname, self.fullname)
return
logger.debug("Removing '{}' from {}", item.fullname, self.fullname)
self._remove(item)
[docs]
@abstractmethod
def _remove(self, item: EmbyApiMedia):
pass
[docs]
@abstractmethod
def get_items(self):
pass
[docs]
def contains(self, item):
"""Check if list contains item."""
return bool(self.get(item))
[docs]
def get(self, item) -> EmbyApiMedia:
"""Get Item from list."""
if isinstance(item, EmbyApiMedia):
s_item = item
else:
s_item = EmbyApiMedia.cast(auth=self.auth, **item)
if not s_item:
return None
return s_item
@property
def iterator(self):
return self._iterator
@property
def index(self):
return self._index * self._iterator
@property
def next_index(self):
index = self._index + 1
return index * self._iterator
@property
def len(self):
return self._len
@property
def name(self):
return self._name
@property
def fullname(self):
return self._name
@property
def created(self):
return bool(self.id)
@property
def immutable(self):
return
[docs]
@abstractstaticmethod
def is_type(**kwargs):
pass
[docs]
class EmbyApiList(EmbyApiBase, MutableSet):
"""Class to interface lists."""
auth = None
id = None
name = None
_list = None
_items = None
field_map = {
'id': ['library_id', 'id', 'Id'],
'name': ['library_name', 'list', 'name', 'Name'],
}
def __init__(self, **kwargs):
self.auth = EmbyApi.get_auth(**kwargs)
EmbyApiBase.update_using_map(self, self.field_map, kwargs)
self._list = self.get_api_list(**kwargs)
if not self._list:
raise PluginError(f"List '{self.name}' does not exist")
self._items = self._list.get_items()
def __contains__(self, item):
return self._list.contains
def __iter__(self):
return iter(self._items)
def __len__(self):
return self._list.len
[docs]
def add(self, item) -> None:
self._list.add(item)
[docs]
def discard(self, item) -> None:
self._list.remove(item)
[docs]
def get(self, item) -> None:
item_s = self._list.get(item)
if not item_s:
return None
return item_s.to_entry()
@property
def immutable(self):
return self._list.immutable
[docs]
@staticmethod
def get_api_list(**kwargs) -> EmbyApiListBase:
if EmbyApiRootList.is_type(**kwargs):
logger.debug('List is a root list')
return EmbyApiRootList(**kwargs)
if EmbyApiWatchedList.is_type(**kwargs):
logger.debug('List is a watched list')
return EmbyApiWatchedList(**kwargs)
if EmbyApiFavoriteList.is_type(**kwargs):
logger.debug('List is a favorite list')
return EmbyApiFavoriteList(**kwargs)
if EmbyApiLibrary.is_type(**kwargs):
logger.debug('List is a library')
return EmbyApiLibrary(**kwargs)
if EmbyApiPlayList.is_type(**kwargs):
logger.debug('List is a playlist')
return EmbyApiPlayList(**kwargs)
if EmbyApiRootList.is_type(**kwargs) or EmbyApiRootList.allow_create:
logger.debug('Creating a root list')
return EmbyApiRootList(**kwargs)
if EmbyApiWatchedList.is_type(**kwargs) or EmbyApiWatchedList.allow_create:
logger.debug('Creating a watched list')
return EmbyApiWatchedList(**kwargs)
if EmbyApiFavoriteList.is_type(**kwargs) or EmbyApiFavoriteList.allow_create:
logger.debug('Creating a favorite list')
return EmbyApiFavoriteList(**kwargs)
if EmbyApiLibrary.is_type(**kwargs) or EmbyApiLibrary.allow_create:
logger.debug('Creating a library')
return EmbyApiLibrary(**kwargs)
if EmbyApiPlayList.is_type(**kwargs) or EmbyApiPlayList.allow_create:
logger.debug('Creating a playlist')
return EmbyApiPlayList(**kwargs)
return None
[docs]
class EmbyApiLibrary(EmbyApiListBase):
"""Library List."""
def __init__(self, **kwargs):
EmbyApiListBase.__init__(self, **kwargs)
list_data = EmbyApiLibrary._get_list_data(self.auth, list=self.name)
if not list_data:
return
self.id = list_data['Id']
[docs]
def _add(self, item):
pass
[docs]
def _remove(self, item):
pass
[docs]
def get(self, item):
item_g = EmbyApiListBase.get(self, item)
if not item_g:
return None
if isinstance(item_g.library, EmbyApiLibrary) and item_g.library.id == self.id:
return item_g
return None
[docs]
def get_items(self):
if not self.created:
return []
args = {}
EmbyApi.set_common_search_arg(args)
self.set_list_search_args(args)
index = 0
args['ParentId'] = self.id
endpoint = EMBY_ENDPOINT_SEARCH.format(userid=self.auth.uid)
logger.debug('Search library with: {}', args)
while True:
args['Limit'] = self.iterator
args['StartIndex'] = index * self.iterator
items = EmbyApi.request_emby(endpoint, self.auth, 'GET', **args)
self._len = items.get('TotalRecordCount', 0)
if not items.get('Items'):
if index > 0:
items = []
return
items = items.get('Items', [])
for media in items:
media_obj = EmbyApiMedia.cast(auth=self.auth, **media)
if media_obj:
yield media_obj
index += 1
[docs]
@staticmethod
def library_refresh(auth):
if not auth:
auth = EmbyApi.get_auth()
additem = EmbyApi.request_emby(EMBY_ENDPOINT_LIBRARY_REFRESH, auth, 'POST')
if not additem:
logger.error('Not possible to refresh emby library')
[docs]
@staticmethod
def _get_list_data(auth, **kwargs):
args = {}
list_name = kwargs.get('list')
EmbyApi.set_common_search_arg(args)
args['IsHidden'] = False
logger.debug('Search Library name list with: {}', args)
search_list_data = EmbyApi.request_emby(EMBY_ENDPOINT_LIBRARY, auth, 'GET', **args)
if not search_list_data or not search_list_data['Items']:
return None
for search_list in search_list_data['Items']:
if (
'Type' not in search_list
or 'CollectionType' not in search_list
or 'IsFolder' not in search_list
):
continue
if search_list['Type'] != 'CollectionFolder' or (
search_list['CollectionType'] != 'tvshows'
and search_list['CollectionType'] != 'movies'
):
continue
if search_list['Name'].lower() == list_name.lower():
return search_list
return None
[docs]
@staticmethod
def is_type(**kwargs):
auth = EmbyApi.get_auth(**kwargs)
list_name = kwargs.get('list')
list_data = EmbyApiLibrary._get_list_data(auth, list=list_name)
return bool(list_data)
@property
def fullname(self):
return f"Library '{self.name}'"
@property
def created(self):
return bool(self.id)
@property
def immutable(self):
return 'Library is not modifiable'
[docs]
class EmbyApiRootList(EmbyApiListBase):
"""Root Media List."""
def __init__(self, **kwargs):
EmbyApiListBase.__init__(self, **kwargs)
self._name = 'Root List'
[docs]
def _add(self, item: EmbyApiMedia):
logger.warning('Not possible to add items to root list')
[docs]
def _remove(self, item: EmbyApiMedia):
logger.warning('Not possible to remove items from root list')
[docs]
def get_items(self):
args = {}
EmbyApi.set_common_search_arg(args)
self.set_list_search_args(args)
index = 0
endpoint = EMBY_ENDPOINT_SEARCH.format(userid=self.auth.uid)
logger.debug('Search root list with: {}', args)
while True:
args['Limit'] = self.iterator
args['StartIndex'] = index * self.iterator
items = EmbyApi.request_emby(endpoint, self.auth, 'GET', **args)
self._len = items.get('TotalRecordCount', 0)
if not items.get('Items'):
if index > 0:
items = []
return
items = items.get('Items', [])
for media in items:
media_obj = EmbyApiMedia.cast(auth=self.auth, **media)
if media_obj:
yield media_obj
index += 1
[docs]
@staticmethod
def is_type(**kwargs):
return kwargs.get('list') == '' or kwargs.get('list') is None
@property
def created(self):
return True
[docs]
class EmbyApiWatchedList(EmbyApiListBase):
"""Watched Media List."""
def __init__(self, **kwargs):
EmbyApiListBase.__init__(self, **kwargs)
self._name = 'Watched List'
[docs]
def _add(self, item: EmbyApiMedia):
args = {}
endpoint = EMBY_ENDPOINT_WATCHED.format(userid=self.auth.uid, itemid=item.id)
additem = EmbyApi.request_emby(endpoint, self.auth, 'POST', **args)
if not additem:
logger.warning("Not possible to add item '{}' to watched list", item.fullname)
[docs]
def _remove(self, item: EmbyApiMedia):
args = {}
endpoint = EMBY_ENDPOINT_WATCHED.format(userid=self.auth.uid, itemid=item.id)
additem = EmbyApi.request_emby(endpoint, self.auth, 'DELETE', **args)
if not additem:
logger.warning("Not possible to remove item '{}' from watched list", item.fullname)
[docs]
def get(self, item):
item_g = EmbyApiListBase.get(self, item)
if not item_g:
return None
if item_g.watched:
return item_g
return None
[docs]
def get_items(self):
args = {}
EmbyApi.set_common_search_arg(args)
self.set_list_search_args(args)
index = 0
args['IsPlayed'] = True
endpoint = EMBY_ENDPOINT_SEARCH.format(userid=self.auth.uid)
logger.debug('Search watched list with: {}', args)
while True:
args['Limit'] = self.iterator
args['StartIndex'] = index * self.iterator
items = EmbyApi.request_emby(endpoint, self.auth, 'GET', **args)
self._len = items.get('TotalRecordCount', 0)
if not items.get('Items'):
if index > 0:
items = []
return
items = items.get('Items', [])
for media in items:
media_obj = EmbyApiMedia.cast(auth=self.auth, **media)
if media_obj:
yield media_obj
index += 1
[docs]
@staticmethod
def is_type(**kwargs):
return kwargs.get('list') == 'watched'
@property
def created(self):
return True
[docs]
class EmbyApiFavoriteList(EmbyApiListBase):
"""Favorite media list."""
def __init__(self, **kwargs):
EmbyApiListBase.__init__(self, **kwargs)
self._name = 'Favorite List'
[docs]
def _add(self, item: EmbyApiMedia):
args = {}
endpoint = EMBY_ENDPOINT_FAVORITE.format(userid=self.auth.uid, itemid=item.id)
additem = EmbyApi.request_emby(endpoint, self.auth, 'POST', **args)
if not additem:
logger.warning("Not possible to add item '{}' to favorite list", item.fullname)
[docs]
def _remove(self, item: EmbyApiMedia):
args = {}
endpoint = EMBY_ENDPOINT_FAVORITE.format(userid=self.auth.uid, itemid=item.id)
additem = EmbyApi.request_emby(endpoint, self.auth, 'DELETE', **args)
if not additem:
logger.warning("Not possible to remove item '{}' from favorite list", item.fullname)
[docs]
def get(self, item):
item_g = EmbyApiListBase.get(self, item)
if not item_g:
return None
if item_g.favorite:
return item_g
return None
[docs]
def get_items(self):
args = {}
EmbyApi.set_common_search_arg(args)
self.set_list_search_args(args)
index = 0
args['IsFavorite'] = True
endpoint = EMBY_ENDPOINT_SEARCH.format(userid=self.auth.uid)
logger.debug('Search favorite list with: {}', args)
while True:
args['Limit'] = self.iterator
args['StartIndex'] = index * self.iterator
items = EmbyApi.request_emby(endpoint, self.auth, 'GET', **args)
self._len = items.get('TotalRecordCount', 0)
if not items.get('Items'):
if index > 0:
items = []
return
items = items.get('Items', [])
for media in items:
media_obj = EmbyApiMedia.cast(auth=self.auth, **media)
if media_obj:
yield media_obj
index += 1
[docs]
@staticmethod
def is_type(**kwargs):
return kwargs.get('list') == 'favorite'
@property
def created(self):
return True
[docs]
class EmbyApiPlayList(EmbyApiListBase):
"""Playlist lists."""
allow_create = True
playlist_bind = {}
def __init__(self, **kwargs):
EmbyApiListBase.__init__(self, **kwargs)
list_data = EmbyApiPlayList._get_list_data(self.auth, list=self.name)
if not list_data:
return
self.id = list_data['Id']
[docs]
def _add(self, item: EmbyApiMedia):
if not self.created:
new_list = EmbyApiPlayList.create(item, auth=self.auth, list=self.name)
self.id = new_list.id
return
args = {}
args['UserId'] = self.auth.uid
args['Ids'] = item.id
endpoint = EMBY_ENDPOINT_PLAYLIST.format(listid=self.id)
additem = EmbyApi.request_emby(endpoint, self.auth, 'POST', **args)
if not additem:
logger.warning(
"Not possible to add item '{}' to Playlist '{}'", item.fullname, self.name
)
return
[docs]
def _remove(self, item):
if not self.playlist_bind:
self.fill_items()
if len(self.playlist_bind) == 1:
logger.debug('{} is empty', self.fullname)
self.destroy()
return
if item.id not in self.playlist_bind:
logger.warning("Can't find entry of '{}' in {}", item.fullname, self.fullname)
return
args = {}
args['EntryIds'] = self.playlist_bind[item.id]
endpoint = EMBY_ENDPOINT_PLAYLIST.format(listid=self.id)
additem = EmbyApi.request_emby(endpoint, self.auth, 'DELETE', **args)
if not additem:
logger.warning(
"Not possible to remove item '{}' from Playlist '{}'", item.fullname, self.name
)
return
self.playlist_bind.pop(item.id, None)
if not self.playlist_bind:
self.destroy()
[docs]
def contains(self, item):
"""Check if list contains item."""
self.fill_items()
return EmbyApiListBase.contains(self, item)
[docs]
def destroy(self):
logger.debug('Deleting {}', self.fullname)
endpoint = EMBY_ENDPOINT_DELETE_ITEM.format(itemid=self.id)
additem = EmbyApi.request_emby(endpoint, self.auth, 'DELETE')
if not additem:
logger.warning('Not possible to delete {}', self.fullname)
return
self.id = None
self._internal_items = []
self.playlist_bind = {}
[docs]
def fill_items(self):
args = {}
EmbyApi.set_common_search_arg(args)
self.set_list_search_args(args)
if not self.id:
return
args['ParentId'] = self.id
logger.debug('Search PlayList with: {}', args)
endpoint = EMBY_ENDPOINT_SEARCH.format(userid=self.auth.uid)
items = EmbyApi.request_emby(endpoint, self.auth, 'GET', **args)
self._internal_items = items['Items'] if items else []
self.playlist_bind = {}
for media in self._internal_items:
self.playlist_bind[media['Id']] = media['PlaylistItemId']
[docs]
def get(self, item):
item_g = EmbyApiListBase.get(self, item)
if not item_g or not self.playlist_bind:
return None
if self.playlist_bind.get(item.id):
return item_g
return None
[docs]
def get_items(self):
if not self.created:
return []
self.fill_items()
for media in self._internal_items:
media_obj = EmbyApiMedia.cast(auth=self.auth, **media)
if media_obj:
yield media_obj
[docs]
@staticmethod
def _get_list_data(auth, **kwargs):
args = {}
list_name = kwargs.get('list')
EmbyApi.set_common_search_arg(args)
args['SearchTerm'] = list_name
args['Type'] = 'Playlist'
logger.debug('Search Playlist Name list with: {}', args)
search_list_data = EmbyApi.request_emby(EMBY_ENDPOINT_SEARCH, auth, 'GET', **args)
if not search_list_data or not search_list_data['Items']:
return None
for search_list in search_list_data['Items']:
if search_list['Name'].lower() == list_name.lower():
return search_list
return None
[docs]
@staticmethod
def is_type(**kwargs):
auth = EmbyApi.get_auth(**kwargs)
list_name = kwargs.get('list')
list_data = EmbyApiPlayList._get_list_data(auth, list=list_name)
return bool(list_data)
[docs]
@staticmethod
def create(item, **kwargs):
auth = EmbyApi.get_auth(**kwargs)
list_name = kwargs.get('list')
args = {}
args['Name'] = list_name
args['Ids'] = item.id
logger.debug("Creating playlist '{}'", list_name)
items = EmbyApi.request_emby(EMBY_ENDPOINT_NEW_PLAYLIST, auth, 'POST', **args)
if not items:
logger.warning("Not possible to create playlist '{}'", list_name)
return None
logger.debug("Returning information of new playlist '{}'", items)
new_playlist = EmbyApiPlayList(auth=auth, **items)
if not new_playlist.created:
logger.warning("Not possible to create playlist '{}'", list_name)
return None
logger.debug("Created playlist '{}' with id {}", new_playlist.name, new_playlist.id)
return new_playlist
@property
def fullname(self):
return f"Playlist '{self.name}'"
[docs]
class EmbyApiSeries(EmbyApiMedia):
"""Series."""
TYPE = 'series'
field_map_up = {
'id': 'series_id',
'base_name': ['series_name', 'series_name', 'SeriesName'],
'imdb_id': 'series_imdb_id',
'tmdb_id': 'series_tmdb_id',
'tvdb_id': 'series_tvdb_id',
'page': 'series_page',
'aired_date': 'series_aired_date',
'overview': 'series_overview',
'photo': 'series_photo',
'series_id': ['series_id', 'SeriesId'],
'series_name': ['series_name', 'series_name', 'SeriesName'],
'series_imdb_id': 'series_imdb_id',
'series_tmdb_id': 'series_tmdb_id',
'series_tvdb_id': 'series_tvdb_id',
}
def __init__(self, **kwargs):
EmbyApiMedia.__init__(self, field_map=self.field_map_up, **kwargs)
[docs]
def to_dict(self) -> dict:
if not self:
return {}
return {
**EmbyApiMedia.to_dict(self),
'series_id': self.series_id,
'series_name': self.series_name,
'series_imdb_id': self.series_imdb_id,
'series_tmdb_id': self.series_tmdb_id,
'series_tvdb_id': self.series_tvdb_id,
'series_aired_date': self.series_aired_date,
'series_year': self.series_year,
'series_overview': self.series_overview,
'series_photo': self.series_photo,
'series_page': self.series_page,
}
@property
def series_id(self) -> str:
return self.id
@property
def series_name(self) -> str:
return self.name
@property
def series_overview(self) -> str:
return self.overview
@property
def series_photo(self) -> str:
return self.photo
@property
def series_page(self) -> str:
return self.page
@property
def series_imdb_id(self) -> str:
return self.imdb_id
@property
def series_tmdb_id(self) -> str:
return self.tmdb_id
@property
def series_tvdb_id(self) -> str:
return self.tvdb_id
@property
def series_aired_date(self) -> datetime:
return self.aired_date
@property
def series_year(self) -> int:
return self.year
@property
def fullname(self) -> str:
if self.year:
return f'{self.name} ({self.year})'
return self.name
[docs]
@staticmethod
def parse_string(string: str, force_parse=False):
"""Return Relevant Information from string."""
if not string:
return None, None
info = re.search(r'(.+) [s]?([0-9]+)[e|x]([0-9]+)', string, re.IGNORECASE)
if not info or not info.groups():
info = re.search(r'(.+) [s]([0-9]+)', string, re.IGNORECASE)
if not info or not info.groups():
if force_parse:
# I assume that it's only a series if contains pathern, but I might need to assume it's a series
return split_title_year(string)
return None, None
try:
info = info.group(1)
except IndexError:
return None, None
return split_title_year(info)
[docs]
@staticmethod
def search(**kwargs) -> EmbyApiSeries:
args = {}
auth = EmbyApi.get_auth(**kwargs)
parameters = {}
field_map = EmbyApiBase.merge_field_map(
EmbyApiSeries.field_map_up,
EmbyApiMedia.field_map,
allow_new=True,
)
EmbyApi.update_using_map(parameters, field_map, kwargs, allow_new=True)
EmbyApi.set_common_search_arg(args)
if 'series_id' in parameters:
args['Ids'] = parameters.get('series_id')
elif EmbyApi.has_providers_search_arg(**parameters):
EmbyApi.set_providers_search_arg(args, **parameters)
else:
args['SearchTerm'], year = EmbyApiSeries.parse_string(
parameters.get('search_string', parameters.get('base_name')), True
)
if not args['SearchTerm']:
logger.warning('Not possible to search series, no search term')
return None
if parameters.get('year'):
args['Years'] = parameters['year']
elif year:
args['Years'] = year
args['IncludeItemTypes'] = 'Series'
logger.debug('Search series with: {}', args)
series = EmbyApi.request_emby(EMBY_ENDPOINT_SEARCH, auth, 'GET', **args)
if not series or 'Items' not in series or not series['Items']:
if EmbyApi.has_providers_search_arg(**parameters):
EmbyApi.remove_providers_search(parameters)
return EmbyApiSeries.search(**parameters)
logger.warning('No series found')
return None
if len(series['Items']) == 1:
series = series['Items'][0]
elif len(series['Items']) > 1:
series_filter = list(
filter(lambda s: s.get('Name') == args.get('SearchTerm'), series['Items'])
)
if args.get('Years'):
series_filter = list(
filter(lambda s: s.get('Year') == args.get('Years'), series_filter)
)
if len(series_filter) == 1:
series = series_filter[0]
else:
logger.warning('More than one series found')
return None
series_api = EmbyApiSeries(auth=auth, **series)
if series_api:
logger.debug("Found series '{}' in emby server", series_api.fullname)
return series_api
return None
[docs]
@staticmethod
def is_type(**kwargs) -> bool:
mtype = EmbyApiMedia.get_mtype(**kwargs)
if mtype.lower() == EmbyApiSeries.TYPE.lower():
return True
if mtype:
return False
if kwargs.get('series_name'):
return True
series, _ = EmbyApiSeries.parse_string(kwargs.get('search_string', kwargs.get('title')))
return bool(series)
[docs]
class EmbyApiSeason(EmbyApiMedia):
"""Season."""
TYPE = 'season'
_series = None
season = None
field_map = {'season': ['season', 'IndexNumber']}
field_map_up = {
'id': ['season_id', 'SeasonId'],
'base_name': 'season_name',
'imdb_id': 'season_imdb_id',
'tmdb_id': 'season_tmdb_id',
'tvdb_id': 'season_tvdb_id',
'photo': 'season_photo',
'page': 'season_page',
'season_id': ['season_id', 'SeasonId'],
'season_name': 'season_name',
'season_imdb_id': 'season_imdb_id',
'season_tmdb_id': 'season_tmdb_id',
'season_tvdb_id': 'season_tvdb_id',
'season': ['season', 'series_season'],
}
def __init__(self, **kwargs):
EmbyApiMedia.__init__(self, field_map=self.field_map_up, **kwargs)
if not kwargs:
return
EmbyApiBase.update_using_map(self, EmbyApiSeason.field_map, kwargs)
if isinstance(kwargs.get('api_series'), EmbyApiSeries):
self._series = kwargs.get('api_series')
elif isinstance(self.series, EmbyApiSeries):
self._series = self.series
else:
self._series = EmbyApiSeries.search(**kwargs) or EmbyApiSeries(auth=self.auth)
[docs]
def to_dict(self) -> dict:
if not self:
return {}
return {
**EmbyApiSeries.to_dict(self.series),
**EmbyApiMedia.to_dict(self),
'season': self.season,
'season_id': self.season_id,
'season_name': self.season_name,
'season_page': self.season_page,
'season_photo': self.season_photo,
'season_imdb_id': self.season_imdb_id,
'season_tmdb_id': self.season_tmdb_id,
'season_tvdb_id': self.season_tvdb_id,
}
@property
def fullname(self) -> str:
if not self.series:
return self.name
if self.season == 0:
return f'{self.series.fullname} S00'
if not self.season:
return f'{self.series.fullname} Sxx'
return f'{self.series.fullname} S{self.season:02d}'
@property
def season_name(self) -> str:
return self.name
@property
def season_id(self) -> str:
return self.id
@property
def series_overview(self) -> str:
return self.overview
@property
def season_imdb_id(self) -> str:
return self.imdb_id
@property
def season_tmdb_id(self) -> str:
return self.tmdb_id
@property
def season_tvdb_id(self) -> str:
return self.tvdb_id
@property
def season_photo(self) -> str:
return self.photo
@property
def season_page(self) -> str:
return self.page
@property
def series(self) -> EmbyApiSeries:
if isinstance(self._series, EmbyApiSeries):
return self._series
self._series = EmbyApiSeries.get_from_child(self)
return self._series
[docs]
@staticmethod
def is_type(**kwargs) -> bool:
mtype = EmbyApiMedia.get_mtype(**kwargs)
if mtype.lower() == EmbyApiSeason.TYPE.lower():
return True
if mtype:
return False
if (
EmbyApiSeries.is_type(**kwargs)
and 'series_season' in kwargs
and not EmbyApiEpisode.is_type(**kwargs)
):
return True
if EmbyApiSeason.parse_string(kwargs.get('title')):
return True
return bool(EmbyApiSeason.parse_string(kwargs.get('search_string')))
[docs]
@staticmethod
def parse_string(string: str):
"""Return Relevant Information from string."""
if not string:
return None
info = re.search(r'(.+) [s]?([0-9]+)[e|x]([0-9]+)', string, re.IGNORECASE)
if not info or not info.groups():
info = re.search(r'(.+) [s]([0-9]+)', string, re.IGNORECASE)
if not info or not info.groups():
return None
try:
return str_to_int(info.group(2))
except IndexError:
return None
[docs]
@staticmethod
def search(**kwargs) -> EmbyApiSeason:
args = {}
auth = EmbyApi.get_auth(**kwargs)
kwargs['auth'] = auth
parameters = {}
field_map = EmbyApiBase.merge_field_map(
EmbyApiSeason.field_map_up,
EmbyApiSeries.field_map_up,
EmbyApiMedia.field_map,
allow_new=True,
)
EmbyApi.update_using_map(parameters, field_map, kwargs, allow_new=True)
# We need to have information regarding the series
season_series = None
if 'api_series' in kwargs:
season_series = kwargs.get('api_series')
if not season_series:
season_series = EmbyApiSeries.search(**kwargs)
if not season_series:
logger.warning('Not possible to determine season, series not found')
return None
if 'season_id' in parameters:
args['Ids'] = parameters.get('season_id')
else:
args['ParentId'] = season_series.series_id
args['IncludeItemTypes'] = 'Season'
EmbyApi.set_common_search_arg(args)
logger.debug('Search season with: {}', args)
seasons = EmbyApi.request_emby(EMBY_ENDPOINT_SEARCH, auth, 'GET', **args)
if not seasons or 'Items' not in seasons or not seasons['Items']:
logger.warning('No season found')
return None
seasons = seasons['Items']
target_season = parameters.get('season')
if not target_season:
target_season = EmbyApiSeason.parse_string(
parameters.get('search_string', parameters.get('base_name'))
)
if target_season:
seasons_filter = []
seasons_filter = list(filter(lambda s: s.get('IndexNumber') == target_season, seasons))
seasons = seasons_filter
if len(seasons) == 1:
season = seasons[0]
elif len(seasons) > 1:
logger.warning(
'More than one season found for {} {}', season_series.fullname, target_season
)
return None
else:
logger.warning('No season found')
return None
season_api = EmbyApiSeason(auth=auth, api_series=season_series, **season)
if season_api:
logger.debug("Found season '{}' in emby server", season_api.fullname)
return season_api
return None
[docs]
class EmbyApiEpisode(EmbyApiMedia):
"""Episode."""
TYPE = 'episode'
_series = None
_season = None
episode = None
field_map = {'episode': ['episode', 'IndexNumber']}
field_map_up = {
'id': 'ep_id',
'base_name': 'ep_name',
'imdb_id': 'ep_imdb_id',
'tmdb_id': 'ep_tmdb_id',
'tvdb_id': 'ep_tvdb_id',
'aired_date': 'ep_aired_date',
'photo': 'ep_photo',
'page': 'ep_page',
'ep_id': 'ep_id',
'ep_name': 'ep_name',
'ep_imdb_id': 'ep_imdb_id',
'ep_tmdb_id': 'ep_tmdb_id',
'ep_tvdb_id': 'ep_tvdb_id',
'episode': ['episode', 'series_episode'],
}
def __init__(self, **kwargs):
EmbyApiMedia.__init__(self, field_map=self.field_map_up, **kwargs)
if not kwargs:
return
EmbyApiBase.update_using_map(self, EmbyApiEpisode.field_map, kwargs)
if isinstance(kwargs.get('api_series'), EmbyApiSeries):
self._series = kwargs.get('api_series')
elif isinstance(self.series, EmbyApiSeries):
self._series = self.series
else:
self._series = EmbyApiSeries.search(**kwargs) or EmbyApiSeries(auth=self.auth)
if isinstance(kwargs.get('api_season'), EmbyApiSeason):
self._season = kwargs.get('api_season')
elif isinstance(self.season, EmbyApiSeason):
self._season = self.season
else:
self._season = EmbyApiSeason.search(**kwargs) or EmbyApiSeason(auth=self.auth)
[docs]
def to_dict(self) -> dict:
if not self:
return {}
return {
**EmbyApiSeason.to_dict(self.season),
**EmbyApiSeries.to_dict(self.series),
**EmbyApiMedia.to_dict(self),
'episode': self.episode,
'ep_id': self.ep_id,
'ep_name': self.ep_name,
'ep_page': self.ep_page,
'ep_photo': self.ep_photo,
'ep_imdb_id': self.ep_imdb_id,
'ep_tmdb_id': self.ep_tmdb_id,
'ep_tvdb_id': self.ep_tvdb_id,
'ep_aired_date': self.ep_aired_date,
'ep_overview': self.ep_overview,
}
@property
def fullname(self) -> str:
if not self.series:
return self.name
if not self.season:
return f'{self.series.fullname} SxxExx'
if self.episode == 0:
return f'{self.series.fullname} S{self.season.season:02d}E00'
if not self.episode:
return f'{self.series.fullname} S{self.season.season:02d}Exx'
return f'{self.series.fullname} S{self.season.season:02d}E{self.episode:02d} {self.name}'
@property
def ep_id(self) -> str:
return self.id
@property
def ep_name(self) -> str:
return self.name
@property
def ep_imdb_id(self) -> str:
return self.imdb_id
@property
def ep_tmdb_id(self) -> str:
return self.tmdb_id
@property
def ep_tvdb_id(self) -> str:
return self.tvdb_id
@property
def ep_photo(self) -> str:
return self.photo
@property
def ep_overview(self) -> str:
return self.overview
@property
def ep_page(self) -> str:
return self.page
@property
def ep_aired_date(self) -> datetime:
return self.aired_date
@property
def season(self) -> EmbyApiSeason:
if isinstance(self._season, EmbyApiSeason):
return self._season
self._season = EmbyApiSeason.get_from_child(self)
return self._season
@property
def series(self) -> EmbyApiSeries:
if isinstance(self._series, EmbyApiSeries):
return self._series
self._series = EmbyApiSeries.get_from_child(self)
return self._series
[docs]
@staticmethod
def search(**kwargs) -> EmbyApiEpisode:
episode_series = None
episode_season = None
auth = EmbyApi.get_auth(**kwargs)
kwargs['auth'] = auth
args = {}
parameters = {}
field_map = EmbyApiBase.merge_field_map(
EmbyApiEpisode.field_map,
EmbyApiEpisode.field_map_up,
EmbyApiSeason.field_map_up,
EmbyApiSeries.field_map_up,
EmbyApiMedia.field_map,
allow_new=True,
)
EmbyApi.update_using_map(parameters, field_map, kwargs, allow_new=True)
# We need to have information regarding the series
if 'api_series' in kwargs:
episode_series = kwargs.get('api_series')
if not episode_series:
episode_series = EmbyApiSeries.search(**kwargs)
if not episode_series:
logger.warning('Not possible to determine episode, series not found')
return None
# We need to have information regarding the season
if 'api_season' in kwargs:
episode_season = kwargs.get('api_season')
if not episode_season:
episode_season = EmbyApiSeason.search(api_series=episode_series, **kwargs)
if not episode_season:
logger.warning('Not possible to determine episode, season not found')
return None
# We need to return all the episodes for that show/season
if 'ep_id' in parameters:
args['Ids'] = parameters.get('ep_id')
elif episode_season and episode_season.season_id:
args['ParentId'] = episode_season.season_id
elif episode_series and episode_series.series_id:
args['ParentId'] = episode_series.series_id
EmbyApi.set_common_search_arg(args)
args['IncludeItemTypes'] = 'Episode'
logger.debug('Search episode with: {}', args)
response = EmbyApi.request_emby(EMBY_ENDPOINT_SEARCH, auth, 'GET', **args)
if not response or 'Items' not in response or not response['Items']:
logger.warning('No episode found')
return None
episodes = response['Items']
if len(episodes) == 0:
logger.warning('No episode found')
return None
target_episode = None
if 'Ids' in args:
target_episode = episodes[0].get('IndexNumber', None)
if not target_episode:
target_episode = parameters.get('episode')
if not target_episode:
target_episode = EmbyApiEpisode.parse_string(
parameters.get('search_string', parameters.get('base_name'))
)
episode = []
episode = list(filter(lambda e: e.get('IndexNumber') == target_episode, episodes))
if len(episode) > 1:
logger.warning('More than one episode found')
return None
if len(episode) == 0:
logger.warning(
'Episodes found for {} but none matches {}',
episode_season.fullname,
target_episode,
)
return None
episode_api = EmbyApiEpisode(
auth=auth,
api_series=episode_series,
api_season=episode_season,
**episode[0],
)
if episode_api:
logger.debug("Found episode '{}' in emby server", episode_api.fullname)
return episode_api
return None
[docs]
@staticmethod
def parse_string(string: str):
"""Return Relevant Information from string."""
if not string:
return None
info = re.search(r'(.+) [s]?([0-9]+)[e|x]([0-9]+)', string, re.IGNORECASE)
if not info or not info.groups():
return None
try:
return str_to_int(info.group(3))
except IndexError:
return None
[docs]
@staticmethod
def is_type(**kwargs) -> bool:
mtype = EmbyApiMedia.get_mtype(**kwargs)
if mtype.lower() == EmbyApiEpisode.TYPE.lower():
return True
if mtype:
return False
if (
EmbyApiSeries.is_type(**kwargs)
and 'series_season' in kwargs
and 'series_episode' in kwargs
):
return True
return bool(EmbyApiEpisode.parse_string(kwargs.get('search_string', kwargs.get('title'))))
[docs]
class EmbyApiMovie(EmbyApiMedia):
"""Movie."""
TYPE = 'movie'
field_map_up = {
'id': 'movie_id',
'base_name': 'movie_name',
'imdb_id': 'movie_imdb_id',
'tmdb_id': 'movie_tmdb_id',
'tvdb_id': 'movie_tvdb_id',
'aired_date': 'movie_aired_date',
'year': 'movie_year',
'photo': 'movie_photo',
'page': 'movie_page',
'overview': 'movie_overview',
'movie_id': 'movie_id',
'movie_name': 'movie_name',
'movie_imdb_id': 'movie_imdb_id',
'movie_tmdb_id': 'movie_tmdb_id',
'movie_tvdb_id': 'movie_tvdb_id',
'movie_year': 'movie_year',
}
def __init__(self, **kwargs):
EmbyApiMedia.__init__(self, field_map=EmbyApiMovie.field_map_up, **kwargs)
self.mtype = EmbyApiMovie.TYPE
[docs]
def to_dict(self) -> dict:
if not self:
return {}
return {
**EmbyApiMedia.to_dict(self),
'movie_id': self.movie_id,
'movie_name': self.movie_name,
'movie_imdb_id': self.movie_imdb_id,
'movie_tmdb_id': self.movie_tmdb_id,
'movie_tvdb_id': self.movie_tvdb_id,
'movie_aired_date': self.movie_aired_date,
'movie_year': self.movie_year,
'movie_photo': self.movie_photo,
'movie_page': self.movie_page,
'movie_overview': self.movie_overview,
}
@property
def fullname(self):
return f'{self.name} ({self.year})'
@property
def movie_id(self) -> str:
return self.id
@property
def movie_name(self) -> str:
return self.name
@property
def movie_page(self) -> str:
return self.page
@property
def movie_imdb_id(self) -> str:
return self.imdb_id
@property
def movie_tmdb_id(self) -> str:
return self.tmdb_id
@property
def movie_tvdb_id(self) -> str:
return self.tvdb_id
@property
def movie_overview(self) -> str:
return self.overview
@property
def movie_photo(self) -> str:
return self.photo
@property
def movie_aired_date(self) -> datetime:
return self.aired_date
@property
def movie_year(self) -> int:
return self.year
[docs]
@staticmethod
def parse_string(string: str):
"""Return Relevant Information from string."""
if not string:
return None, None
return split_title_year(string)
[docs]
@staticmethod
def search(**kwargs) -> EmbyApiMovie:
args = {}
auth = EmbyApi.get_auth(**kwargs)
kwargs['auth'] = auth
parameters = {}
field_map = EmbyApiBase.merge_field_map(
EmbyApiMovie.field_map_up,
EmbyApiMedia.field_map,
allow_new=True,
)
EmbyApi.update_using_map(parameters, field_map, kwargs, allow_new=True)
EmbyApi.set_common_search_arg(args)
if 'movie_id' in parameters:
args['Ids'] = parameters.get('movie_id')
elif 'id' in parameters:
args['Ids'] = parameters.get('id')
elif EmbyApi.has_providers_search_arg(**parameters):
EmbyApi.set_providers_search_arg(args, **parameters)
else:
args['SearchTerm'], year = EmbyApiMovie.parse_string(
parameters.get('search_string', parameters.get('base_name'))
)
if not args['SearchTerm']:
logger.warning('Not possible to search movie, no search term')
return None
if parameters.get('year'):
args['Years'] = parameters['year']
elif year:
args['Years'] = year
args['IncludeItemTypes'] = 'Movie'
logger.debug('Search movie with: {}', args)
movies = EmbyApi.request_emby(EMBY_ENDPOINT_SEARCH, auth, 'GET', **args)
if not movies or 'Items' not in movies or not movies['Items']:
if EmbyApi.has_providers_search_arg(**parameters):
EmbyApi.remove_providers_search(parameters)
return EmbyApiMovie.search(**parameters)
logger.warning('No movie found')
return None
if len(movies['Items']) == 1:
movie = movies['Items'][0]
if len(movies['Items']) > 1:
movie_filter = list(
filter(lambda s: s.get('Name') == args.get('SearchTerm'), movies['Items'])
)
if args.get('Years'):
movie_filter = list(
filter(lambda s: s.get('Year') == args.get('Years'), movie_filter)
)
if len(movie_filter) == 1:
movie = movie_filter[0]
else:
logger.warning('More than one movie found')
return None
movie_api = EmbyApiMovie(auth=auth, **movie)
if movie_api:
logger.debug('Found movie {} in emby server', movie_api.fullname)
return movie_api
return None
[docs]
@staticmethod
def is_type(**kwargs) -> bool:
mtype = EmbyApiMedia.get_mtype(**kwargs)
if mtype.lower() == EmbyApiMovie.TYPE.lower():
return True
if mtype:
return False
if kwargs.get('movie_name'):
return True
movie, _ = EmbyApiMovie.parse_string(kwargs.get('title', kwargs.get('search_string')))
return bool(movie)
[docs]
class EmbyApi(EmbyApiBase):
"""Class to interact with Emby API."""
_last_auth = None
auth = None
EMBY_EXTRA_FIELDS = [
'DateCreated',
'Path',
'ProviderIds',
'PremiereDate',
'MediaSources',
'Video3DFormat',
'Overview',
]
def __init__(self, auth: EmbyAuth):
self.auth = auth
EmbyApi._last_auth = auth
[docs]
@staticmethod
def set_common_search_arg(args: dict):
args['Recursive'] = True
args['Fields'] = ','.join(EmbyApi.EMBY_EXTRA_FIELDS)
args['IsMissing'] = False
[docs]
@staticmethod
def set_providers_search_arg(args: dict, **kwargs):
providers = []
if kwargs.get('imdb_id'):
providers.append(f'imdb.{kwargs.get("imdb_id")}')
if kwargs.get('tmdb_id'):
providers.append(f'tmdb.{kwargs.get("tmdb_id")}')
if kwargs.get('tvdb_id'):
providers.append(f'tvdb.{kwargs.get("tvdb_id")}')
if kwargs.get('tvrage_id'):
providers.append(f'tvrage.{kwargs.get("tvrage_id")}')
providers = list(dict.fromkeys(providers))
providers_str = ';'.join(providers)
if not providers_str:
return
args['AnyProviderIdEquals'] = providers_str
[docs]
@staticmethod
def remove_providers_search(args: dict):
args.pop('imdb_id', None)
args.pop('tmdb_id', None)
args.pop('tvdb_id', None)
args.pop('tvrage_id', None)
[docs]
@staticmethod
def has_providers_search_arg(**kwargs) -> bool:
providers = {}
EmbyApi.set_providers_search_arg(providers, **kwargs)
return bool(providers and providers['AnyProviderIdEquals'])
[docs]
@staticmethod
def get_auth(**kwargs) -> EmbyAuth:
if 'auth' in kwargs:
return kwargs['auth']
if EmbyApi._last_auth:
return EmbyApi._last_auth
if EmbyAuth.get_last_auth():
return EmbyAuth.get_last_auth()
return EmbyAuth(**kwargs)
[docs]
@staticmethod
def search(**kwargs):
search_result = None
if 'auth' not in kwargs:
kwargs['auth'] = EmbyApi.get_auth(**kwargs)
if EmbyApiEpisode.is_type(**kwargs):
logger.debug('API search episode')
search_result = EmbyApiEpisode.search(**kwargs)
elif EmbyApiSeason.is_type(**kwargs):
logger.debug('API search season')
search_result = EmbyApiSeason.search(**kwargs)
elif EmbyApiSeries.is_type(**kwargs):
logger.debug('API search series')
search_result = EmbyApiSeries.search(**kwargs)
elif EmbyApiMovie.is_type(**kwargs):
logger.debug('API search movie')
search_result = EmbyApiMovie.search(**kwargs)
elif 'executed' not in kwargs:
logger.debug('API search unknown media')
search_result = EmbyApiMedia.search(**kwargs)
if not search_result:
return None
if isinstance(search_result, dict):
return search_result
if isinstance(search_result, EmbyApiMedia):
return search_result.to_dict()
return None
[docs]
@staticmethod
def search_list(**kwargs):
if 'auth' not in kwargs:
kwargs['auth'] = EmbyApi.get_auth(**kwargs)
mlist = kwargs.get('list')
list_object = EmbyApiList.get_api_list(**kwargs)
if not list_object:
raise PluginError(f"List '{mlist}' does not exist")
mlist = list_object.get_items()
yield from mlist
[docs]
@staticmethod
def strtotime(date) -> datetime:
# YYYY-MM-DDTHH:MM:SS.0000000+00:00
if not date:
return None
if isinstance(date, datetime):
return pendulum.instance(date)
if not isinstance(date, str):
return None
try:
date = pendulum.parse(date)
except ValueError:
date = None
return date
[docs]
@staticmethod
def get_type(**kwargs) -> bool:
if EmbyApiEpisode.is_type(**kwargs):
return EmbyApiEpisode.TYPE
if EmbyApiSeason.is_type(**kwargs):
return EmbyApiSeason.TYPE
if EmbyApiSeries.is_type(**kwargs):
return EmbyApiSeries.TYPE
if EmbyApiMovie.is_type(**kwargs):
return EmbyApiMovie.TYPE
return EmbyApiMedia.TYPE
[docs]
@staticmethod
def request_emby(
endpoint: str,
auth: EmbyAuth,
method: str,
emby_connect=False,
**kwargs,
):
verify_certificates = bool(emby_connect)
if not auth:
auth = EmbyApi.get_auth(**kwargs)
return None
if not auth.host:
raise PluginError('No Emby server information')
if auth:
endpoint = endpoint.format(userid=auth.uid)
if not auth:
auth = EmbyApi.get_auth(**kwargs)
url = f'{auth.host}{endpoint}' if not emby_connect else f'{EMBY_CONNECT}{endpoint}'
if EMBY_ENDPOINT_CONNECT_EXCHANGE in endpoint:
request_headers = {}
request_headers['X-Emby-Token'] = auth._connect_token_link
else:
request_headers = auth.add_token_header({}, emby_connect=emby_connect)
try:
if method == 'POST':
response = requests.post(
url,
json=kwargs,
headers=request_headers,
allow_redirects=True,
verify=verify_certificates,
)
elif method == 'GET':
response = requests.get(
url,
params=kwargs,
headers=request_headers,
allow_redirects=True,
verify=verify_certificates,
)
elif method == 'DELETE':
response = requests.request(
'DELETE',
url,
params=kwargs,
headers=request_headers,
allow_redirects=True,
verify=verify_certificates,
)
except HTTPError as e: # Authentication Problem
if e.response.status_code == 401:
logger.error('Authentication Error: {}', str(e))
return False
raise PluginError(f'Could not connect to Emby Server: {e!s}') from e
except RequestException as e:
raise PluginError(f'Could not connect to Emby Server: {e!s}') from e
if response.status_code in (200, 204):
try:
return response.json()
except ValueError:
return {'code': response.status_code}
return False