from __future__ import annotations
import typing
from collections.abc import MutableSet
from loguru import logger
from flexget import plugin
from flexget.entry import Entry
from flexget.event import event
PLUGIN_NAME = 'plex_watchlist'
SUPPORTED_IDS = ['imdb_id', 'tmdb_id', 'tvdb_id', 'plex_guid']
logger = logger.bind(name=PLUGIN_NAME)
if typing.TYPE_CHECKING:
from plexapi.myplex import MyPlexAccount
from plexapi.video import Movie, Show
[docs]
def import_plexaccount() -> type[MyPlexAccount]:
try:
from plexapi.myplex import MyPlexAccount
except ImportError:
raise plugin.DependencyError('plex_watchlist', 'plexapi', 'plexapi package required')
return MyPlexAccount
[docs]
def to_entry(plex_item: Movie | Show) -> Entry:
entry = Entry(
title=f'{plex_item.title} ({plex_item.year})' if plex_item.year else plex_item.title,
url=plex_item.guid,
)
if plex_item.TYPE == 'movie':
entry['movie_name'] = plex_item.title
entry['movie_year'] = plex_item.year
elif plex_item.TYPE == 'show':
entry['series_name'] = plex_item.title
entry['series_year'] = plex_item.year
entry.update(get_supported_ids_from_plex_object(plex_item))
return entry
[docs]
def get_supported_ids_from_plex_object(plex_item):
ids = {'plex_guid': plex_item.guid}
for guid in plex_item.guids:
x = guid.id.split('://')
try:
value = int(x[1])
except ValueError:
value = x[1]
media_id = f'{x[0]}_id'
if media_id in SUPPORTED_IDS:
ids[media_id] = value
return ids
[docs]
class VideoStub:
guid: str
title: str
# plexapi objects are build from XML. So we create a simple stub that works for watchlist calls
[docs]
def to_plex_item(entry):
item = VideoStub()
item.guid = entry['plex_guid']
item.title = entry['title']
return item
[docs]
class PlexManagedWatchlist(MutableSet):
def __init__(
self,
username: str | None = None,
password: str | None = None,
token: str | None = None,
filter: str | None = None,
type: str | None = None,
):
self.username = username
self.password = password
self.token = token
self.type = type
self.filter = filter
self._items: list[Entry] | None = None
self._account: MyPlexAccount | None = None
@property
def account(self) -> MyPlexAccount:
MyPlexAccount = import_plexaccount() # noqa: N806 It's a class
if self._account is None:
self._account = MyPlexAccount(self.username, self.password, self.token)
return self._account
@property
def items(self) -> list[Entry]:
if self._items is None:
watchlist = self.account.watchlist(filter=self.filter, libtype=self.type)
self._items = []
for item in watchlist:
self._items.append(to_entry(item))
return self._items
def __iter__(self):
return iter(self.items)
def __len__(self) -> int:
return len(self.items)
def __contains__(self, entry) -> bool:
return self._find_entry(entry) is not None
[docs]
def get(self, entry) -> Entry | None:
return self._find_entry(entry)
[docs]
def add(self, entry: Entry) -> None:
item = None
if 'plex_guid' in entry:
item = to_plex_item(entry)
else:
logger.debug('Searching for {} with discover', entry['title'])
results = self.account.searchDiscover(entry['title'], libtype=self.type)
matched_entry = self._match_entry(entry, [to_entry(result) for result in results])
if matched_entry:
item = to_plex_item(matched_entry)
if item:
if self.account.onWatchlist(item):
logger.debug('"{}" is already on the watchlist', item.title)
return
logger.debug('Adding "{}" to the watchlist', item.title)
self.account.addToWatchlist(item)
[docs]
def discard(self, entry) -> None:
entry = self._find_entry(entry)
if entry:
item = to_plex_item(entry)
logger.debug('Removing {} from watchlist', entry['title'])
self.account.removeFromWatchlist(item)
@property
def online(self) -> bool:
return True
@property
def immutable(self):
return False
[docs]
def _find_entry(self, entry):
return self._match_entry(entry, self.items)
[docs]
def _match_entry(self, entry: Entry, entries: list[Entry]):
for item in entries:
# match on supported ids
if any(entry.get(id) is not None and entry[id] == item[id] for id in SUPPORTED_IDS):
return item
name = entry.get('movie_name', None) or entry.get('series_name', None)
year = entry.get('movie_year', None) or entry.get('series_year', None)
_name = item.get('movie_name', None) or item.get('series_name', None)
_year = item.get('movie_year', None) or item.get('series_year', None)
if (name and year) and (_name == name and _year == year):
return item
# title matching sucks but lets try as last resort
if entry.get('title').lower() == item['title'].lower():
return item
return None
[docs]
class PlexWatchlist:
schema = {
'properties': {
'username': {'type': 'string'},
'password': {'type': 'string'},
'token': {'type': 'string'},
'type': {'type': 'string', 'enum': ['movie', 'show']},
'filter': {'type': 'string', 'enum': ['available', 'released']},
},
'anyOf': [{'required': ['token']}, {'required': ['username', 'password']}],
}
[docs]
@plugin.priority(plugin.PRIORITY_FIRST)
def on_task_start(self, task, config):
import_plexaccount()
[docs]
def get_list(self, config):
return PlexManagedWatchlist(**config)
@plugin.internet(logger)
def on_task_input(self, task, config):
yaml_list = PlexManagedWatchlist(**config)
yield from yaml_list
[docs]
@event('plugin.register')
def register_plugin():
plugin.register(PlexWatchlist, PLUGIN_NAME, api_ver=2, interfaces=['task', 'list'])