Source code for flexget.components.managed_lists.lists.movie_list.cli

from argparse import ArgumentParser, ArgumentTypeError

from sqlalchemy.orm.exc import NoResultFound

from flexget import options, plugin
from flexget.entry import Entry
from flexget.event import event
from flexget.manager import Session
from flexget.plugin import DependencyError, PluginError
from flexget.terminal import TerminalTable, console, table_parser
from flexget.utils.tools import split_title_year

from . import db
from .movie_list import MovieListBase


[docs] def lookup_movie(title, session, identifiers=None): try: imdb_lookup = plugin.get('imdb_lookup', 'movie_list').lookup except DependencyError: imdb_lookup = None try: tmdb_lookup = plugin.get('tmdb_lookup', 'movie_list').lookup except DependencyError: tmdb_lookup = None if not (imdb_lookup or tmdb_lookup): return None entry = Entry(title=title) if identifiers: for identifier in identifiers: for key, value in identifier.items(): entry[key] = value imdb_failed = imdb_lookup is None if imdb_lookup: try: imdb_lookup(entry, session=session) # IMDB lookup raises PluginError instead of the normal ValueError except PluginError: imdb_failed = True if imdb_failed and tmdb_lookup: tmdb_lookup(entry) # Return only if lookup was successful if entry.get('movie_name'): return entry return None
[docs] def movie_list_keyword_type(identifier): if identifier.count('=') != 1: raise ArgumentTypeError( f'Received identifier in wrong format: {identifier}, ' ' should be in keyword format like `imdb_id=tt1234567`' ) name, value = identifier.split('=', 2) if name not in MovieListBase().supported_ids: raise ArgumentTypeError( 'Received unsupported identifier ID {}. Should be one of {}'.format( identifier, ' ,'.join(MovieListBase().supported_ids) ) ) return {name: value}
[docs] def do_cli(manager, options): """Handle movie-list subcommand.""" if options.list_action == 'all': movie_list_lists(options) return if options.list_action == 'list': movie_list_list(options) return if options.list_action == 'add': movie_list_add(options) return if options.list_action == 'del': movie_list_del(options) return if options.list_action == 'purge': movie_list_purge(options) return
[docs] def movie_list_lists(options): """Show all movie lists.""" lists = db.get_movie_lists() header = ['#', 'List Name'] table = TerminalTable(*header, table_type=options.table_type) for movie_list in lists: table.add_row(str(movie_list.id), movie_list.name) console(table)
[docs] def movie_list_list(options): """List movie list.""" with Session() as session: try: movie_list = db.get_list_by_exact_name(options.list_name) except NoResultFound: console(f'Could not find movie list with name {options.list_name}') return movies = db.get_movies_by_list_id( movie_list.id, order_by='added', descending=True, session=session ) # Find which identifiers are actually used by any movie used_identifiers = set() for movie in movies: used_identifiers.update(movie.identifiers.keys()) # Only include identifier columns that are actually used relevant_identifiers = [ identifier for identifier in db.MovieListBase().supported_ids if identifier in used_identifiers ] header = ['#', 'Movie Name', 'Movie year'] header += relevant_identifiers title = f'{len(movies)} Movies in movie list: `{options.list_name}`' table = TerminalTable(*header, table_type=options.table_type, title=title) for movie in movies: movie_row = [str(movie.id), movie.title, str(movie.year) if movie.year else ''] movie_row.extend([ str(movie.identifiers.get(identifier, '')) for identifier in relevant_identifiers ]) table.add_row(*movie_row) console(table)
[docs] def movie_list_add(options): with Session() as session: try: movie_list = db.get_list_by_exact_name(options.list_name, session=session) except NoResultFound: console(f'Could not find movie list with name {options.list_name}, creating') movie_list = db.MovieListList(name=options.list_name) session.add(movie_list) session.commit() title, year = split_title_year(options.movie_title) console(f'Trying to lookup movie title: `{title}`') movie_lookup = lookup_movie(title=title, session=session, identifiers=options.identifiers) if not movie_lookup: console(f'ERROR: movie lookup failed for movie {options.movie_title}, aborting') return title = movie_lookup['movie_name'] movie = db.get_movie_by_title_and_year( list_id=movie_list.id, title=title, year=year, session=session ) if not movie: console(f'Adding movie with title {title} to list {movie_list.name}') movie = db.MovieListMovie(title=title, year=year, list_id=movie_list.id) else: console(f'Movie with title {title} already exist in list {movie_list.name}') id_list = [] if options.identifiers: id_list = options.identifiers else: for _id in db.MovieListBase().supported_ids: if movie_lookup.get(_id): id_list.append({_id: movie_lookup.get(_id)}) if id_list: console('Setting movie identifiers:') for ident in id_list: for key in ident: console(f'{key}: {ident[key]}') movie.ids = db.get_db_movie_identifiers(identifier_list=id_list, session=session) session.merge(movie) console(f'Successfully added movie {title} to movie list {movie_list.name} ')
[docs] def movie_list_del(options): with Session() as session: try: movie_list = db.get_list_by_exact_name(options.list_name) except NoResultFound: console(f'Could not find movie list with name {options.list_name}') return try: movie_exist = db.get_movie_by_id( list_id=movie_list.id, movie_id=int(options.movie), session=session ) except NoResultFound: console( f'Could not find movie with ID {int(options.movie)} in list `{options.list_name}`' ) return except ValueError: title, year = split_title_year(options.movie) movie_exist = db.get_movie_by_title_and_year( list_id=movie_list.id, title=title, year=year, session=session ) if not movie_exist: console(f'Could not find movie with title {options.movie} in list {options.list_name}') return console(f'Removing movie {movie_exist.title} from list {options.list_name}') session.delete(movie_exist)
[docs] def movie_list_purge(options): with Session() as session: try: movie_list = db.get_list_by_exact_name(options.list_name) except NoResultFound: console(f'Could not find movie list with name {options.list_name}') return console(f'Deleting list {options.list_name}') session.delete(movie_list)
[docs] @event('options.register') def register_parser_arguments(): # Common option to be used in multiple subparsers movie_parser = ArgumentParser(add_help=False) movie_parser.add_argument('movie_title', metavar='<MOVIE TITLE>', help='Title of the movie') name_or_id_parser = ArgumentParser(add_help=False) name_or_id_parser.add_argument( 'movie', metavar='<NAME or ID>', help='Title or ID of the movie' ) identifiers_parser = ArgumentParser(add_help=False) identifiers_parser.add_argument( '-i', '--identifiers', metavar='<identifiers>', nargs='+', type=movie_list_keyword_type, help='Can be a string or a list of string with the format imdb_id=XXX, tmdb_id=XXX, etc', ) list_name_parser = ArgumentParser(add_help=False) list_name_parser.add_argument( 'list_name', nargs='?', metavar='<LIST NAME>', default='movies', help="Name of movie list to operate on (Default is 'movies')", ) # Register subcommand parser = options.register_command('movie-list', do_cli, help='View and manage movie lists') # Set up our subparsers subparsers = parser.add_subparsers(title='actions', metavar='<action>', dest='list_action') subparsers.add_parser('all', parents=[table_parser], help='Shows all existing movie lists') subparsers.add_parser( 'list', parents=[list_name_parser, table_parser], help='List movies from a list' ) subparsers.add_parser( 'add', parents=[list_name_parser, movie_parser, identifiers_parser], help='Add a movie to a list', ) subparsers.add_parser( 'del', parents=[list_name_parser, name_or_id_parser], help='Remove a movie from a list using its title or ID', ) subparsers.add_parser( 'purge', parents=[list_name_parser], help='Removes an entire list with all of its movies. Use this with caution', )