Source code for flexget.plugins.filter.exists_movie

import re
from pathlib import Path

from loguru import logger

from flexget import plugin
from flexget.config_schema import one_or_more
from flexget.event import event
from flexget.utils.tools import TimedDict

logger = logger.bind(name='exists_movie')


def merge_found_qualities(existing_qualities: dict[str, set], new_qualities: dict[str, set]):
    """Fold the quality sets of ``new_qualities`` into ``existing_qualities`` in place.

    Both arguments map a movie identifier to a set of qualities. Identifiers
    missing from ``existing_qualities`` are created with a fresh set, so the
    sets held by ``new_qualities`` are never shared with the target mapping.
    Nothing is returned; ``existing_qualities`` is modified directly.
    """
    for movie_id in new_qualities:
        if movie_id not in existing_qualities:
            existing_qualities[movie_id] = set()
        existing_qualities[movie_id].update(new_qualities[movie_id])
class FilterExistsMovie:
    """Reject existing movies.

    Syntax:

      exists_movie:
        path: /path/to/movies
        [type: {dirs|files}]
        [allow_different_qualities: {better|yes|no}]
        [lookup: {imdb|no}]
        [recursive: {yes|no}]
    """

    schema = {
        'anyOf': [
            one_or_more({'type': 'string', 'format': 'path'}),
            {
                'type': 'object',
                'properties': {
                    'path': one_or_more({'type': 'string', 'format': 'path'}),
                    'allow_different_qualities': {
                        'enum': ['better', True, False],
                        'default': False,
                    },
                    'type': {'enum': ['files', 'dirs'], 'default': 'dirs'},
                    'lookup': {'enum': ['imdb', False], 'default': False},
                    'recursive': {'type': 'boolean', 'default': False},
                },
                'required': ['path'],
                'additionalProperties': False,
            },
        ]
    }

    # Directory names matching this are skipped when scanning for movie dirs.
    dir_pattern = re.compile(r'\b(cd.\d|subs?|samples?)\b', re.IGNORECASE)
    # Only file names matching this are considered when scanning for movie files.
    file_pattern = re.compile(r'\.(avi|mkv|mp4|mpg|webm)$', re.IGNORECASE)

    def __init__(self):
        # Scan results, keyed by (folder, item type, recursive, imdb lookup).
        # The scan options are part of the key because they change what a scan
        # returns: which items are listed and whether identifiers are IMDb ids
        # or "name year" strings. Keying on the folder alone would hand one
        # task's results to another task configured differently.
        self.cache: dict[tuple, dict[str, set]] = TimedDict(cache_time='1 hour')

    def prepare_config(self, config):
        """Return the plugin config normalized to a dict with a list under ``'path'``.

        A non-dict config (a single path or a list of paths) is wrapped as
        ``{'path': config}``. A missing or empty ``'type'`` becomes ``'dirs'``
        and a single string path becomes a one-element list. A dict passed in
        is shallow-copied first so the caller's config is not modified.
        """
        if isinstance(config, dict):
            config = dict(config)
        else:
            # if config is not a dict, assign value to 'path' key
            config = {'path': config}

        if not config.get('type'):
            config['type'] = 'dirs'

        # if only a single path is passed turn it into a 1 element list
        if isinstance(config['path'], str):
            config['path'] = [config['path']]
        return config

    def _list_candidates(self, folder, item_type, recursive):
        """Return the names of entries under ``folder`` that may be movies.

        With ``item_type == 'dirs'`` every directory whose name does not match
        ``dir_pattern`` is returned; with ``'files'`` every file whose name
        matches ``file_pattern``. ``recursive`` selects ``rglob('*')`` over
        ``iterdir()``.
        """
        names = []
        for p in folder.rglob('*') if recursive else folder.iterdir():
            if item_type == 'dirs' and p.is_dir():
                if self.dir_pattern.search(p.name):
                    continue
                logger.debug('detected dir with name {}, adding to check list', p.name)
                names.append(p.name)
            elif item_type == 'files' and p.is_file():
                if not self.file_pattern.search(p.name):
                    continue
                logger.debug('detected file with name {}, adding to check list', p.name)
                names.append(p.name)
        return names

    @plugin.priority(-1)
    def on_task_filter(self, task, config):
        """Reject accepted entries whose movie already exists in the configured paths.

        Each configured path is scanned (or read from the timed cache) to build
        a mapping of movie identifier to the set of qualities found on disk.
        Each accepted entry is then identified the same way and rejected when
        its identifier is in that mapping, unless ``allow_different_qualities``
        permits it. Items and entries whose IMDb lookup fails are skipped and
        counted, and the counts are logged at the end.
        """
        if not task.accepted:
            logger.debug('nothing accepted, aborting')
            return

        config = self.prepare_config(config)
        imdb_lookup = plugin.get('imdb_lookup', self)
        parser = plugin.get('parsing', self)

        # Loop-invariant options, read once.
        item_type = config.get('type')
        recursive = bool(config.get('recursive'))
        use_imdb = config.get('lookup') == 'imdb'
        allow_different = config.get('allow_different_qualities')

        incompatible_files = 0
        incompatible_entries = 0
        count_entries = 0
        count_files = 0

        # Maps movie identifier: set of found qualities
        existing_qualities: dict[str, set] = {}

        for folder in config['path']:
            folder = Path(folder).expanduser()
            cache_key = (folder, item_type, recursive, use_imdb)

            # see if this path has already been scanned with the same options;
            # an empty cached result is deliberately treated as a miss and rescanned
            cached_qualities = self.cache.get(cache_key, None)
            if cached_qualities:
                logger.verbose('Using cached scan for {} ...', folder)
                merge_found_qualities(existing_qualities, cached_qualities)
                continue

            if not folder.is_dir():
                logger.critical('Path {} does not exist', folder)
                continue

            logger.verbose('Scanning path {} ...', folder)

            items = self._list_candidates(folder, item_type, recursive)
            if not items:
                logger.verbose('No items with type {} were found in {}', item_type, folder)
                continue

            path_qualities: dict[str, set] = {}
            for item in items:
                count_files += 1

                movie = parser.parse_movie(item)

                if use_imdb:
                    try:
                        movie_id = imdb_lookup.imdb_id_lookup(
                            movie_title=movie.name,
                            movie_year=movie.year,
                            raw_title=item,
                            session=task.session,
                        )
                    except plugin.PluginError as e:
                        logger.trace('{} lookup failed ({})', item, e.value)
                        incompatible_files += 1
                        continue
                else:
                    # "name year" when both are known, otherwise the bare name
                    movie_id = movie.name
                    if movie.name is not None and movie.year is not None:
                        movie_id = f'{movie.name} {movie.year}'

                if movie_id is not None:
                    path_qualities.setdefault(movie_id, set()).add(movie.quality)
                    logger.trace('adding: {}', movie_id)

            # store to cache and extend to found list
            self.cache[cache_key] = path_qualities
            merge_found_qualities(existing_qualities, path_qualities)

        logger.debug('-- Start filtering entries ----------------------------------')

        # do actual filtering
        for entry in task.accepted:
            count_entries += 1
            logger.debug('trying to parse entry {}', entry['title'])
            if use_imdb:
                if not entry.get('imdb_id', eval_lazy=False):
                    try:
                        imdb_lookup.lookup(entry)
                    except plugin.PluginError as e:
                        logger.trace('entry {} imdb failed ({})', entry['title'], e.value)
                        incompatible_entries += 1
                        continue
                movie_id = entry['imdb_id']
            else:
                if not entry.get('movie_name', eval_lazy=False):
                    movie = parser.parse_movie(entry['title'])
                    entry['movie_name'] = movie.name
                    entry['movie_year'] = movie.year

                # must mirror the identifier format used for scanned items above
                if entry.get('movie_year', eval_lazy=False):
                    movie_id = f'{entry["movie_name"]} {entry["movie_year"]}'
                else:
                    movie_id = entry['movie_name']

            if movie_id not in existing_qualities:
                continue

            found_qualities = existing_qualities[movie_id]
            if allow_different == 'better':
                # keep the entry only if it beats every quality already on disk
                if all(entry['quality'] > qual for qual in found_qualities):
                    logger.trace('better quality')
                    continue
            elif allow_different and entry['quality'] not in found_qualities:
                # any quality not yet on disk is allowed through
                logger.trace('wrong quality')
                continue

            entry.reject('movie exists')

        if incompatible_files or incompatible_entries:
            logger.verbose(
                'There were some incompatible items. {} of {} entries and {} of {} directories could not be verified.',
                incompatible_entries,
                count_entries,
                incompatible_files,
                count_files,
            )

        logger.debug('-- Finished filtering entries -------------------------------')
@event('plugin.register')
def register_plugin():
    """Register ``FilterExistsMovie`` as the ``exists_movie`` task plugin (API version 2)."""
    plugin.register(
        FilterExistsMovie,
        'exists_movie',
        api_ver=2,
        interfaces=['task'],
    )