Source code for flexget.utils.qualities

from __future__ import annotations

import copy
import functools
import re
from typing import TYPE_CHECKING

from loguru import logger

from flexget.utils.serialization import Serializer

if TYPE_CHECKING:
    from collections.abc import Iterator

logger = logger.bind(name='utils.qualities')


@functools.total_ordering
class QualityComponent:
    """A single recognisable quality token together with its rank inside its own type.

    Components of the same type are ordered and compared by ``value``. Comparing
    components of different types raises ``TypeError``.
    """

    def __init__(
        self,
        type: str,
        value: int,
        name: str,
        regexp: str | None = None,
        modifier: int | None = None,
        defaults: list[QualityComponent] | None = None,
    ) -> None:
        """Create a component and compile the pattern used to detect it.

        :param type: Type of quality component.
            (resolution, source, codec, color_range or audio)
        :param value: Value used to sort this component with others of like type.
        :param name: Canonical name for this quality component.
        :param regexp: Regexp used to match this component. When omitted, the escaped
            ``name`` is used.
        :param modifier: An integer that affects sorting above all other components.
        :param defaults: An iterable defining defaults for other quality components if
            this component matches.
        :raises ValueError: If ``type`` is not one of the known component types.
        """
        valid_types = ('resolution', 'source', 'codec', 'color_range', 'audio')
        if type not in valid_types:
            raise ValueError(f'{type} is not a valid quality component type.')

        self.type = type
        self.value = value
        self.name = name
        self.modifier = modifier
        self.defaults = defaults if defaults else []

        pattern = re.escape(name) if regexp is None else regexp
        # The look-arounds reject a match that touches a letter or digit on either
        # side; underscores and other non-word characters count as separators.
        self.regexp = re.compile(rf'(?<![^\W_])({pattern})(?![^\W_])', re.IGNORECASE)

    def matches(self, text: str) -> tuple[bool, str]:
        """Test whether this component occurs in ``text``.

        :param text: Data to be tested against.
        :returns: Tuple ``(matches, remaining text without quality data)``.
        """
        stripped = self.regexp.sub('', text)
        return stripped != text, stripped

    def _comparable(self, other) -> QualityComponent:
        """Resolve ``other`` to a component of the same type as ``self``.

        Strings are looked up in the module registry first.

        :raises TypeError: If ``other`` does not resolve to a ``QualityComponent``, or
            resolves to one of a different type.
        """
        if isinstance(other, str):
            other = _registry.get(other)
        if not isinstance(other, QualityComponent):
            raise TypeError(f'Cannot compare {self!r} and {other!r}')
        if other.type != self.type:
            raise TypeError(f'Cannot compare {self.type} and {other.type}')
        return other

    def _ladder(self):
        """Return the module-level list holding every component of this type."""
        return globals().get('_' + self.type + 's')

    def __hash__(self) -> int:
        return hash(self.type + str(self.value))

    def __bool__(self) -> bool:
        # A component with value 0 is falsy.
        return bool(self.value)

    def __eq__(self, other) -> bool:
        return self.value == self._comparable(other).value

    def __lt__(self, other) -> bool:
        return self.value < self._comparable(other).value

    def __add__(self, other):
        """Return the component ``other`` places further along this type's list.

        Running past the end of the list yields its last element.
        """
        if not isinstance(other, int):
            raise TypeError
        ladder = self._ladder()
        position = ladder.index(self) + other
        return ladder[position if position < len(ladder) else -1]

    def __sub__(self, other):
        """Return the component ``other`` places earlier in this type's list.

        A position below zero yields the first element.
        """
        if not isinstance(other, int):
            raise TypeError
        ladder = self._ladder()
        return ladder[max(ladder.index(self) - other, 0)]

    def __repr__(self) -> str:
        return f'<{self.type.title()}(name={self.name},value={self.value})>'

    def __str__(self) -> str:
        return self.name

    def __deepcopy__(self, memo=None):
        # A shallow copy is returned on purpose; attribute values are shared.
        return copy.copy(self)
# Component tables, one list per component type.
#
# Quality._find_best walks each list front to back and keeps the last entry that
# matches, so list order (not ``value``) decides which match wins. The same lists are
# also looked up by name ('_' + type + 's') from QualityComponent.__add__/__sub__, so
# the variable names below are part of that contract.
_resolutions = [
    QualityComponent('resolution', 10, '360p'),
    QualityComponent('resolution', 20, '368p', '368p?'),
    QualityComponent('resolution', 30, '480p', '480p?'),
    QualityComponent('resolution', 35, '540p', '540p?'),
    QualityComponent('resolution', 40, '576p', '576p?'),
    QualityComponent('resolution', 45, 'hr'),
    QualityComponent('resolution', 50, '720i'),
    QualityComponent('resolution', 60, '720p', '(1280x)?720(p|hd)?x?([56]0)?'),
    QualityComponent('resolution', 70, '1080i'),
    QualityComponent('resolution', 80, '1080p', '(1920x)?1080p?x?([56]0)?'),
    QualityComponent('resolution', 90, '2160p', '((3840x)?2160p?x?([56]0)?)|4k'),
]
# Entries carrying a ``modifier`` (including ``modifier=0``) stop the scan in
# Quality._find_best as soon as they match. Non-zero modifiers are additionally summed
# into the leading element of Quality._comparator; a modifier of 0 adds nothing there.
_sources = [
    QualityComponent('source', 10, 'workprint', modifier=-8),
    QualityComponent('source', 20, 'cam', '(?:hd)?cam', modifier=-7),
    QualityComponent('source', 30, 'ts', '(?:hd)?ts|telesync', modifier=-6),
    QualityComponent('source', 40, 'tc', 'tc|telecine', modifier=-5),
    QualityComponent('source', 50, 'r5', 'r[2-8c]', modifier=-4),
    QualityComponent('source', 60, 'hdrip', r'hd[\W_]?rip', modifier=-3),
    QualityComponent('source', 70, 'ppvrip', r'ppv[\W_]?rip', modifier=-2),
    QualityComponent('source', 80, 'preair', modifier=-1),
    QualityComponent('source', 90, 'tvrip', r'tv[\W_]?rip'),
    QualityComponent('source', 100, 'dsr', r'dsr|ds[\W_]?rip'),
    QualityComponent('source', 110, 'sdtv', r'(?:[sp]dtv|dvb)(?:[\W_]?rip)?'),
    QualityComponent('source', 120, 'dvdscr', r'(?:(?:dvd|web)[\W_]?)?scr(?:eener)?', modifier=0),
    QualityComponent('source', 130, 'bdscr', 'bdscr(?:eener)?'),
    QualityComponent('source', 140, 'webrip', r'web[\W_]?rip'),
    QualityComponent('source', 150, 'hdtv', r'a?hdtv(?:[\W_]?rip)?'),
    QualityComponent('source', 160, 'webdl', r'web(?:[\W_]?(dl|hd))?'),
    QualityComponent('source', 170, 'dvdrip', r'dvd(?:[\W_]?rip)?'),
    QualityComponent('source', 175, 'remux'),
    QualityComponent('source', 180, 'bluray', r'(?:b[dr][\W_]?rip|blu[\W_]?ray(?:[\W_]?rip)?)'),
]
# 'h264' and 'h265' each appear twice with the same value but different patterns.
# Because the registry below is keyed by name, the later entry of each pair is the one
# stored in ``_registry``.
_codecs = [
    QualityComponent('codec', 10, 'divx'),
    QualityComponent('codec', 20, 'xvid'),
    QualityComponent('codec', 25, 'nvenc'),
    QualityComponent('codec', 30, 'h264', '[hx].?264'),
    QualityComponent('codec', 30, 'h264', 'avc'),
    QualityComponent('codec', 35, 'vp9'),
    QualityComponent('codec', 40, 'h265', '[hx].?265'),
    QualityComponent('codec', 40, 'h265', 'hevc'),
    QualityComponent('codec', 50, 'av1', 'av-?1'),
]
# Pattern fragments shared by the individual HDR entries and the combined 'hybrid_hdr'
# entry, which matches two or three of them in a row.
hdr = r'hdr([^\w]?10)?'
hdr_plus = r'hdr(10)?[^\w]?(\+|p|plus)'
dovi = r'(dolby[^\w]?vision|dv|dovi)'
_color_ranges = [
    QualityComponent('color_range', 10, '8bit', r'8[^\w]?bits?|hi8p?|sdr'),
    QualityComponent('color_range', 20, '10bit', r'10[^\w]?bits?|hi10p?'),
    QualityComponent('color_range', 30, 'hdr', hdr),
    QualityComponent('color_range', 40, 'hdrplus', hdr_plus),
    QualityComponent('color_range', 50, 'dolbyvision', dovi),
    QualityComponent('color_range', 60, 'hybrid_hdr', f'(({dovi}|{hdr_plus}|{hdr})\\W?){{2,3}}'),
]
# Channel-layout suffix appended to the audio patterns below (optional wherever the
# pattern adds a trailing '?').
channels = r'(?:(?:[^\w+]?[1-7][\W_]?(?:0|1|ch)))'
_audios = [
    QualityComponent('audio', 10, 'mp3'),
    # TODO: No idea what order these should go in or if we need different regexps
    QualityComponent('audio', 20, 'aac', f'aac{channels}?'),
    QualityComponent('audio', 30, 'dd5.1', f'dd{channels}'),
    QualityComponent('audio', 40, 'ac3', f'ac3{channels}?'),
    QualityComponent('audio', 45, 'dd+5.1', f'dd[p+]{channels}'),
    QualityComponent('audio', 50, 'flac', f'flac{channels}?'),
    # The DTSs are a bit backwards, but the more specific one needs to be parsed first
    QualityComponent('audio', 70, 'dtshd', rf'dts[\W_]?hd(?:[\W_]?ma)?{channels}?'),
    QualityComponent('audio', 60, 'dts'),
    QualityComponent('audio', 80, 'truehd', f'truehd{channels}?'),
]
# Placeholder components (value 0, therefore falsy) used when nothing of a given type
# was recognised. They are deliberately not added to ``_registry``.
_UNKNOWNS = {
    'resolution': QualityComponent('resolution', 0, 'unknown'),
    'source': QualityComponent('source', 0, 'unknown'),
    'codec': QualityComponent('codec', 0, 'unknown'),
    'color_range': QualityComponent('color_range', 0, 'unknown'),
    'audio': QualityComponent('audio', 0, 'unknown'),
}
# Canonical-name lookup for every component defined above; on a duplicate name the
# entry added last wins.
_registry: dict[str, QualityComponent] = {}
for items in (_resolutions, _sources, _codecs, _color_ranges, _audios):
    for item in items:
        _registry[item.name] = item
def all_components() -> Iterator[QualityComponent]:
    """Return an iterator over every component stored in the module registry."""
    registered = _registry.values()
    return iter(registered)
@functools.total_ordering
class Quality(Serializer):
    """Parse and store the quality of an entry across the five component categories.

    The categories are resolution, source, codec, color_range and audio.
    """

    def __init__(self, text: str = '') -> None:
        """Build a quality, parsing ``text`` when it is non-empty.

        :param text: A string to parse quality from. When empty, every category is set
            to its "unknown" placeholder.
        """
        self.text = text
        self.clean_text = text
        if not text:
            for kind in ('resolution', 'source', 'codec', 'color_range', 'audio'):
                setattr(self, kind, _UNKNOWNS[kind])
            return
        self.parse(text)

    def parse(self, text: str) -> None:
        """Parse a string to determine the quality in each component category.

        :param text: The string to parse.
        """
        self.text = text
        self.clean_text = text

        # Order matters: every lookup may strip its match from ``clean_text`` before
        # the next category is searched.
        search_plan = (
            ('resolution', _resolutions, False),
            ('source', _sources, True),
            ('codec', _codecs, True),
            ('color_range', _color_ranges, False),
            ('audio', _audios, True),
        )
        for kind, candidates, strip_all in search_plan:
            setattr(self, kind, self._find_best(candidates, _UNKNOWNS[kind], strip_all))

        # Matched components may supply defaults for categories still left unknown.
        for matched in self.components:
            for default_name in matched.defaults:
                fallback = _registry[default_name]
                if getattr(self, fallback.type):
                    continue
                setattr(self, fallback.type, fallback)

    def _find_best(
        self,
        qlist: list[QualityComponent],
        default: QualityComponent,
        strip_all: bool = True,
    ) -> QualityComponent:
        """Find the highest matching quality component from `qlist`.

        :param qlist: Candidates, tried in list order; the last one that matches wins.
        :param default: Returned when no candidate matches.
        :param strip_all: When true, each match is removed from the text that later
            candidates are tested against. When false, every candidate is tested
            against the text as it was on entry.
        """
        best = None
        haystack = self.clean_text
        for candidate in qlist:
            found, remainder = candidate.matches(haystack)
            if not found:
                continue
            best = candidate
            self.clean_text = remainder
            if strip_all:
                # Strip every component found, even though only the last is returned.
                haystack = remainder
            if candidate.modifier is not None:
                # A matching candidate with a modifier ends the search for higher ones.
                break
        return best if best else default

    @property
    def name(self) -> str:
        """Space-joined names of the known components, or ``'unknown'`` if there are none."""
        known = (
            part
            for part in (self.resolution, self.source, self.codec, self.color_range, self.audio)
            if part.value != 0
        )
        return ' '.join(map(str, known)) or 'unknown'

    @property
    def components(self) -> list[QualityComponent]:
        """The five components in resolution, source, codec, color_range, audio order."""
        return [self.resolution, self.source, self.codec, self.color_range, self.audio]

    @classmethod
    def serialize(cls, quality: Quality) -> str:
        """Serialize ``quality`` as its string form."""
        return str(quality)

    @classmethod
    def deserialize(cls, data: str, version: int) -> Quality:
        """Rebuild a quality by parsing ``data``; ``version`` is not used."""
        return cls(data)

    @property
    def _comparator(self) -> list:
        """Sort key: the sum of the non-zero modifiers, followed by the components."""
        parts = self.components
        total_modifier = sum(filter(None, (part.modifier for part in parts)))
        return [total_modifier] + parts

    def __contains__(self, other):
        """Return True when every known component of ``other`` equals the one here."""
        if isinstance(other, str):
            other = Quality(other)
        if not (other and self):
            return False
        return not any(
            wanted and wanted != getattr(self, category)
            for category, wanted in (
                (category, getattr(other, category))
                for category in ('resolution', 'source', 'audio', 'color_range', 'codec')
            )
        )

    def __bool__(self) -> bool:
        return any(self._comparator)

    def __eq__(self, other) -> bool:
        if isinstance(other, str):
            other = Quality(other)
        if isinstance(other, Quality):
            return self._comparator == other._comparator
        if other is None:
            return False
        raise TypeError(f'Cannot compare {self!r} and {other!r}')

    def __lt__(self, other) -> bool:
        if isinstance(other, str):
            other = Quality(other)
        if isinstance(other, Quality):
            return self._comparator < other._comparator
        raise TypeError(f'Cannot compare {self!r} and {other!r}')

    def __repr__(self) -> str:
        return f'<Quality(resolution={self.resolution},source={self.source},codec={self.codec},color_range={self.color_range},audio={self.audio})>'

    def __str__(self) -> str:
        return self.name

    def __hash__(self) -> int:
        # Hashing by name makes qualities usable as dict keys.
        return hash(self.name)
def get(quality_name: str) -> Quality:
    """Return a quality object based on canonical quality name.

    :param quality_name: Whitespace-separated canonical component names; matching is
        case-insensitive.
    :raises ValueError: If a part is not a registered component name, if two parts
        share a component type, or if no parts are given.
    """
    by_type = {}
    for token in quality_name.lower().split():
        match = _registry.get(token)
        if not match:
            raise ValueError(f'`{token}` is not a valid quality string')
        if match.type in by_type:
            raise ValueError(f'`{match.type}` cannot be defined twice in a quality')
        by_type[match.type] = match

    if not by_type:
        raise ValueError('No quality specified')

    # Start from an all-unknown quality and fill in the named categories.
    quality = Quality()
    for kind in by_type:
        setattr(quality, kind, by_type[kind])
    return quality
class RequirementComponent:
    """Represent requirements for a given component type.

    Can evaluate whether a given QualityComponent meets those requirements.
    """

    def __init__(self, type: str) -> None:
        """Create an unrestricted requirement for components of ``type``."""
        self.type = type
        self.min: QualityComponent | None = None
        self.max: QualityComponent | None = None
        self.acceptable: set[QualityComponent] = set()
        self.none_of: set[QualityComponent] = set()

    def reset(self) -> None:
        """Drop every restriction, returning to the freshly constructed state."""
        self.min = None
        self.max = None
        self.acceptable = set()
        self.none_of = set()

    def allows(self, comp: QualityComponent, loose: bool = False) -> bool:
        """Return whether ``comp`` satisfies this requirement.

        :param comp: The component to test; must be of this requirement's type.
        :param loose: When true, only the ``none_of`` exclusions are enforced.
        :raises TypeError: If ``comp`` is of a different component type.
        """
        if comp.type != self.type:
            raise TypeError(f'Cannot compare {comp!r} against {self.type}')
        if comp in self.none_of:
            return False
        if loose:
            return True
        if comp in self.acceptable:
            return True
        if self.min or self.max:
            if self.min and comp < self.min:
                return False
            return not (self.max and comp > self.max)
        # No range given: anything passes unless an explicit accept-list exists.
        return not self.acceptable

    def add_requirement(self, text: str) -> None:
        """Add one requirement expression to this component.

        Supported forms: ``min-max`` (inclusive range), ``a|b|c`` (accept any of),
        a bare name (accept it), ``<name`` / ``<=name`` (maximum), ``>name`` /
        ``>=name`` / ``name+`` (minimum) and ``!name`` (exclude).

        :param text: The requirement expression.
        :raises ValueError: If a named component is not of this requirement's type.
        :raises KeyError: If a named component is not registered.
        """
        if '-' in text:
            min_str, max_str = text.split('-')
            min_quality, max_quality = _registry[min_str], _registry[max_str]
            # Both ends of the range must belong to this component type. The previous
            # chained comparison let a mismatched pair through whenever the two ends
            # agreed with each other or the maximum alone matched.
            if min_quality.type != self.type or max_quality.type != self.type:
                raise ValueError(f'Component type mismatch: {text}')
            self.min, self.max = min_quality, max_quality
        elif '|' in text:
            req_quals = text.split('|')
            quals = {_registry[qual] for qual in req_quals}
            if any(qual.type != self.type for qual in quals):
                raise ValueError(f'Component type mismatch: {text}')
            self.acceptable |= quals
        else:
            qual = _registry[text.strip('!<>=+')]
            if qual.type != self.type:
                raise ValueError('Component type mismatch!')
            if text in _registry:
                self.acceptable.add(qual)
            elif text[0] == '<':
                if text[1] != '=':
                    # Strict bound: step down to the next lower component.
                    qual -= 1
                self.max = qual
            elif text[0] == '>' or text.endswith('+'):
                if text[1] != '=' and not text.endswith('+'):
                    # Strict bound: step up to the next higher component.
                    qual += 1
                self.min = qual
            elif text[0] == '!':
                self.none_of.add(qual)

    def __eq__(self, other) -> bool:
        if not isinstance(other, RequirementComponent):
            return False
        # Compare the same four fields that __hash__ uses, minimum included.
        return (self.min, self.max, self.acceptable, self.none_of) == (
            other.min,
            other.max,
            other.acceptable,
            other.none_of,
        )

    def __hash__(self) -> int:
        return hash((
            self.min,
            self.max,
            tuple(sorted(self.acceptable)),
            tuple(sorted(self.none_of)),
        ))
class Requirements:
    """Represents requirements for allowable qualities.

    Can determine whether a given Quality passes requirements.
    """

    def __init__(self, req: str = '') -> None:
        """Create a requirement set, optionally parsing ``req`` straight away.

        :param req: A requirements string; when empty, nothing is restricted.
        """
        self.text = ''
        self.resolution = RequirementComponent('resolution')
        self.source = RequirementComponent('source')
        self.codec = RequirementComponent('codec')
        self.color_range = RequirementComponent('color_range')
        self.audio = RequirementComponent('audio')
        if req:
            self.parse_requirements(req)

    @property
    def components(self) -> list[RequirementComponent]:
        """The per-type requirements in resolution, source, codec, color_range, audio order."""
        return [self.resolution, self.source, self.codec, self.color_range, self.audio]

    def parse_requirements(self, text: str) -> None:
        """Parse a requirements string.

        Parts are separated by whitespace or commas and matched case-insensitively.
        Each part is handed to the component whose type matches the first quality
        named in it. If the accumulated text is exactly ``any``, every component is
        reset instead.

        :param text: The string containing quality requirements.
        :raises ValueError: If a part names an unknown quality component.
        """
        text = text.lower()
        if self.text:
            self.text += ' '
        self.text += text
        if self.text == 'any':
            for component in self.components:
                component.reset()
            return

        text = text.replace(',', ' ')
        parts = text.split()
        try:
            for part in parts:
                # The first name in the part decides which component type it targets.
                if '-' in part:
                    found = _registry[part.split('-')[0]]
                elif '|' in part:
                    found = _registry[part.split('|')[0]]
                else:
                    found = _registry[part.strip('!<>=+')]
                for component in self.components:
                    if found.type == component.type:
                        component.add_requirement(part)
        except KeyError as e:
            # Chain the lookup failure so the offending key stays in the traceback.
            raise ValueError(f'{e.args[0]} is not a valid quality component.') from e

    def allows(self, qual: Quality | str, loose: bool = False) -> bool:
        """Determine whether this set of requirements allows a given quality.

        :param qual: The quality to evaluate; a string is parsed into a Quality first.
        :param loose: If True, only ! (not) requirements will be enforced.
        :returns: True if given quality passes all component requirements.
        """
        if isinstance(qual, str):
            qual = Quality(qual)
        for r_component, q_component in zip(self.components, qual.components, strict=False):
            if not r_component.allows(q_component, loose=loose):
                return False
        return True

    def __eq__(self, other) -> bool:
        if isinstance(other, str):
            other = Requirements(other)
        if not isinstance(other, Requirements):
            # Unrelated operands (e.g. None) compare unequal instead of raising
            # AttributeError on the missing ``components`` attribute.
            return NotImplemented
        return self.components == other.components

    def __hash__(self) -> int:
        return hash(tuple(self.components))

    def __str__(self) -> str:
        return self.text or 'any'

    def __repr__(self) -> str:
        return f'<Requirements({self})>'