import logging
import os
import re
import sys
import time
from loguru import logger
from flexget import plugin
from flexget.event import event
from flexget.utils import qualities
from flexget.utils.parsers.generic import ParseWarning, default_ignore_prefixes, name_to_re
from flexget.utils.tools import ReList
from .parser_common import MovieParseResult, SeriesParseResult
# rebulk (that underlies guessit) will use the 'regex' module rather than 're' if installed.
# For consistency, prevent that unless env variable is explicitly already enabling it.
os.environ.setdefault('REGEX_DISABLED', 'true')
from guessit.api import GuessItApi, GuessitException
from guessit.rules import rebulk_builder
from rebulk import Rebulk
from rebulk.pattern import RePattern
# Rebind the imported loguru logger to one that carries name='parser_guessit' on its records.
logger = logger.bind(name='parser_guessit')
# Raise the stdlib logging threshold of the 'rebulk' and 'guessit' loggers to WARNING.
logging.getLogger('rebulk').setLevel(logging.WARNING)
logging.getLogger('guessit').setLevel(logging.WARNING)
[docs]
def _id_regexps_function(input_string, context):
return [
match.span
for regexp in context.get('id_regexps')
for match in RePattern(regexp, children=True).matches(input_string, context)
]
# Rebulk rule set wrapping _id_regexps_function as a functional pattern named 'regexpId'.
# The pattern is disabled whenever the parsing context carries no (or empty) 'id_regexps'.
_id_regexps = Rebulk().functional(
_id_regexps_function, name='regexpId', disabled=lambda context: not context.get('id_regexps')
)
[docs]
def rules_builder(config):
    """Build guessit's rules for ``config`` and add the ``regexpId`` rules to them.

    :param config: Configuration forwarded unchanged to ``rebulk_builder``.
    :return: The object returned by ``rebulk_builder`` after ``_id_regexps`` has
        been registered on it.
    """
    rules = rebulk_builder(config)
    rules.rebulk(_id_regexps)
    return rules
# Module-level guessit API instance shared by all parse calls in this module.
# It is configured once at import time to build its rules through rules_builder (defined above),
# so the custom 'regexpId' pattern is available.
# NOTE(review): force=True presumably forces the configuration to be (re)built - confirm against guessit docs.
guessit_api = GuessItApi()
guessit_api.configure(options={}, rules_builder=rules_builder, force=True)
[docs]
def normalize_component(data):
    """Normalize a guessit value (or collection of values) into a list of plain strings.

    Every string is lower-cased and has its ``-`` characters removed, e.g.
    ``'WEB-DL'`` becomes ``'webdl'``.

    :param data: ``None``, a single string, or a list/tuple of strings.
    :return: A new list of normalized strings; empty when ``data`` is ``None``.
    """

    def _normalize(value):
        return value.lower().replace('-', '')

    if data is None:
        return []
    # Tuples are accepted alongside lists; any other value is treated as a single string.
    if isinstance(data, (list, tuple)):
        return [_normalize(item) for item in data]
    return [_normalize(data)]
# Clock used to time parse calls for the debug log output.
# time.process_time is available on every Python version able to run this module (it uses
# f-strings), and the former fallback, time.clock, no longer exists since Python 3.8, so no
# fallback is attempted.
preferred_clock = time.process_time
[docs]
class ParserGuessit:
    """Movie and series release-name parser built on top of guessit.

    ``parse_movie`` and ``parse_series`` are the public entry points; the
    remaining methods are helpers that translate guessit results into FlexGet
    concepts (quality, proper count, identifiers, validity).
    """

    # Translates guessit 'source' values into the source names used to build a FlexGet Quality.
    # Values that are not listed are passed through unchanged by `_source`.
    SOURCE_MAP = {
        'Camera': 'cam',
        'HD Camera': 'cam',
        'HD Telesync': 'telesync',
        'Pay-per-view': 'ppv',
        'Digital TV': 'dvb',
        'Video on Demand': 'vod',
        'Analog HDTV': 'ahdtv',
        'Ultra HDTV': 'uhdtv',
        'HD Telecine': 'hdtc',
        'Web': 'web-dl',
    }

    @staticmethod
    def _guessit_options(options):
        """Build the options dict handed to guessit from FlexGet parser options.

        :param options: Mapping of FlexGet parser options. It is left unmodified.
        :return: A new dict holding the default guessit settings, overridden by
            ``options`` and by the guessit-specific keys derived from them
            (``episode_prefer_number``, ``expected_group``, ``date_year_first``,
            ``date_day_first``).
        """
        # Work on a copy: callers pass their own kwargs dict and keep reading it afterwards.
        options = dict(options)
        settings = {
            'name_only': True,
            'allowed_languages': ['en', 'fr'],
            'allowed_countries': ['us', 'uk', 'gb'],
            'single_value': True,
        }
        # Enabled unless the series is explicitly identified by season/episode ('ep').
        options['episode_prefer_number'] = options.get('identified_by') != 'ep'
        if options.get('allow_groups'):
            options['expected_group'] = options['allow_groups']
        # Translate the FlexGet spelling of the date options into the guessit spelling.
        if 'date_yearfirst' in options:
            options['date_year_first'] = options['date_yearfirst']
        if 'date_dayfirst' in options:
            options['date_day_first'] = options['date_dayfirst']
        # See https://github.com/guessit-io/guessit/issues/329
        # https://github.com/guessit-io/guessit/pull/333
        # They made changes that break backward compatibility, so we have to do this hackery
        elif options.get('date_year_first'):
            options['date_day_first'] = True
        settings.update(options)
        return settings

    @staticmethod
    def _proper_count(guessit_result):
        """Calculate a FlexGet style proper_count from a guessit result.

        The guessit ``version`` contributes ``version - 1`` (``0`` when absent,
        ``-1`` when it is zero or negative), guessit's own ``proper_count`` is
        added, and 5 is subtracted for releases tagged as fast subtitled.
        """
        version = guessit_result.get('version')
        if version is None:
            version = 0
        elif version <= 0:
            version = -1
        else:
            version -= 1
        proper_count = guessit_result.get('proper_count', 0)
        fastsub = 'fast subtitled' in normalize_component(guessit_result.values_list.get('other'))
        return version + proper_count - (5 if fastsub else 0)

    def _source(self, guessit_result):
        """Return the list of normalized source names for a guessit result.

        The guessit ``source`` is mapped through ``SOURCE_MAP`` and normalized;
        ``preair``, ``bdscr``/``dvdscr`` and ``r5`` are appended based on the
        values found under ``other``.
        """
        other = normalize_component(guessit_result.values_list.get('other'))
        source = self.SOURCE_MAP.get(guessit_result.get('source'), guessit_result.get('source'))
        # special case
        if source == 'web-dl' and 'rip' in other:
            source = 'webrip'
        source = normalize_component(source)
        if 'preair' in other:
            source.append('preair')
        if 'screener' in other:
            if 'bluray' in source:
                source.append('bdscr')
            else:
                source.append('dvdscr')
        if 'region 5' in other or 'region c' in other:
            source.append('r5')
        return source

    def _quality(self, guessit_result):
        """Generate a FlexGet Quality from a guessit result.

        Resolution, source, video codec and audio components are collected from
        the result, joined into a single space separated string and handed to
        ``qualities.Quality``.

        :raises ParseWarning: If a component is neither a list nor a string.
        """
        resolution = normalize_component(guessit_result.values_list.get('screen_size'))
        other = normalize_component(guessit_result.values_list.get('other'))
        if not resolution and 'high resolution' in other:
            resolution.append('hr')
        source = self._source(guessit_result)
        codec = normalize_component(guessit_result.values_list.get('video_codec'))
        if '10bit' in normalize_component(guessit_result.values_list.get('color_depth')):
            codec.append('10bit')
        audio = normalize_component(guessit_result.values_list.get('audio_codec'))
        audio_profile = normalize_component(guessit_result.values_list.get('audio_profile'))
        audio_channels = normalize_component(guessit_result.values_list.get('audio_channels'))
        # unlike the other components, audio can be a bit iffy with multiple codecs, so we limit it to one
        if 'dts' in audio and any(hd in audio_profile for hd in ['hd', 'master audio']):
            audio = ['dtshd']
        elif '5.1' in audio_channels and 'dolby digital plus' in audio:
            audio = ['dd+5.1']
        elif '5.1' in audio_channels and 'dolby digital' in audio:
            audio = ['dd5.1']
        # Make sure everything are strings (guessit will return lists when there are multiples)
        flattened_qualities = []
        for component in (resolution, source, codec, audio):
            if isinstance(component, list):
                flattened_qualities.append(' '.join(component))
            elif isinstance(component, str):
                flattened_qualities.append(component)
            else:
                raise ParseWarning(
                    self,
                    f'Guessit quality returned type {type(component)}: {component}. Expected str or list.',
                )
        return qualities.Quality(' '.join(flattened_qualities))

    # movie_parser API
    def parse_movie(self, data, **kwargs):
        """Parse ``data`` as a movie release name.

        :param data: The release name to parse.
        :param kwargs: Parser options, translated by ``_guessit_options``.
        :return: A ``MovieParseResult``; it is marked invalid when guessit found no title.
        """
        logger.debug('Parsing movie: `{}` [options: {}]', data, kwargs)
        start = preferred_clock()
        guessit_options = self._guessit_options(kwargs)
        guessit_options['type'] = 'movie'
        guess_result = guessit_api.guessit(data, options=guessit_options)
        parsed = MovieParseResult(
            data=data,
            name=guess_result.get('title'),
            year=guess_result.get('year'),
            proper_count=self._proper_count(guess_result),
            quality=self._quality(guess_result),
            release_group=guess_result.get('release_group'),
            valid=bool(
                guess_result.get('title')
            ),  # It's not valid if it didn't find a name, which sometimes happens
        )
        logger.debug('Parsing result: {} (in {} ms)', parsed, (preferred_clock() - start) * 1000)
        return parsed

    # series_parser API
    def parse_series(self, data, **kwargs):
        """Parse ``data`` as a series release name.

        Options read directly from ``kwargs``: ``name``, ``alternate_names``,
        ``id_regexps``, ``strict_name``, ``identified_by`` (default ``'auto'``)
        and ``allow_seasonless`` (default ``True``). All options are also
        forwarded to guessit through ``_guessit_options``, from which
        ``allow_groups``, ``allowed_countries``, ``prefer_specials`` and
        ``assume_special`` are read back.

        :param data: The release name to parse.
        :return: A ``SeriesParseResult``. Problems are reported through its
            ``valid`` flag rather than by raising.
        """
        logger.debug('Parsing series: `{}` [options: {}]', data, kwargs)
        guessit_options = self._guessit_options(kwargs)
        valid = True
        if kwargs.get('name'):
            expected_titles = [kwargs['name']]
            if kwargs.get('alternate_names'):
                expected_titles.extend(kwargs['alternate_names'])
            # apostrophe support
            expected_titles = [
                title.replace("'", "(?:'|\\'|\\\\'|-|)?") for title in expected_titles
            ]
            guessit_options['expected_title'] = ['re:' + title for title in expected_titles]
        if kwargs.get('id_regexps'):
            guessit_options['id_regexps'] = kwargs.get('id_regexps')
        start = preferred_clock()
        # If no series name is provided, we don't tell guessit what kind of match we are looking for
        # This prevents guessit from determining that too general of matches are series
        parse_type = 'episode' if kwargs.get('name') else None
        if parse_type:
            guessit_options['type'] = parse_type
        try:
            guess_result = guessit_api.guessit(data, options=guessit_options)
        except GuessitException:
            logger.warning('Parsing {} with guessit failed. Most likely a unicode error.', data)
            return SeriesParseResult(data=data, valid=False)
        if guess_result.get('type') != 'episode':
            valid = False
        name = kwargs.get('name')
        country = str(guess_result.get('country', ''))
        if not name:
            # No expected name was given: fall back to whatever title guessit found.
            name = guess_result.get('title', '')
            if not name:
                valid = False
            elif country:
                name += f' ({country})'
        elif guess_result.matches['title']:
            # Make sure the name match is up to FlexGet standards
            # Check there is no unmatched cruft before the matched name
            title_start = guess_result.matches['title'][0].start
            title_end = guess_result.matches['title'][0].end
            if title_start != 0:
                try:
                    pre_title = max(
                        match[0].end
                        for match in guess_result.matches.values()
                        if match[0].end <= title_start
                    )
                except ValueError:
                    # max() of an empty sequence: nothing was matched before the title
                    pre_title = 0
                for char in reversed(data[pre_title:title_start]):
                    # str.isalnum() is already true for digits
                    if char.isalnum():
                        return SeriesParseResult(data=data, valid=False)
                    if char.isspace() or char in '._':
                        continue
                    break
            # Check the name doesn't end mid-word (guessit might put the border before or after the space after title)
            if (
                data[title_end - 1].isalnum() and len(data) <= title_end
            ) or not self._is_valid_name(data, guessit_options=guessit_options):
                valid = False
            # If we are in exact mode, make sure there is nothing after the title
            if kwargs.get('strict_name'):
                post_title = sys.maxsize
                for match_type, matches in guess_result.matches.items():
                    if match_type in ['season', 'episode', 'date', 'regexpId']:
                        if matches[0].start < title_end:
                            continue
                        post_title = min(post_title, matches[0].start)
                        if matches[0].parent:
                            post_title = min(post_title, matches[0].parent.start)
                for char in data[title_end:post_title]:
                    if char.isalnum():
                        valid = False
        else:
            valid = False
        season = guess_result.get('season')
        episode = guess_result.get('episode')
        disc = guess_result.get('disc')
        if episode is None and 'part' in guess_result:
            episode = guess_result['part']
        if isinstance(episode, list):
            # guessit >=2.1.4 returns a list for multi-packs, but we just want the first one and the number of eps
            episode = episode[0]
        date = guess_result.get('date')
        quality = self._quality(guess_result)
        proper_count = self._proper_count(guess_result)
        group = guess_result.get('release_group')
        # Validate group with from_group
        if not self._is_valid_groups(group, guessit_options.get('allow_groups', [])):
            valid = False
        if country:
            # Parse the series name on its own and compare the country found there with the
            # country found in the full release name.
            try:
                series_result = guessit_api.guessit(name)
                series_country = str(series_result.get('country', ''))
                allowed_countries = guessit_options.get('allowed_countries', [])
                if country != series_country or (
                    not series_country
                    and allowed_countries
                    and country.lower() not in allowed_countries
                ):
                    valid = False
            except GuessitException:
                logger.warning('Parsing {} series with guessit failed.', name)
        # Check the full list of 'episode_details' for special,
        # since things like 'pilot' and 'unaired' can also show up there
        special = any(
            v.lower() == 'special' for v in guess_result.values_list.get('episode_details', [])
        )
        if 'episode' not in guess_result.values_list:
            episodes = len(guess_result.values_list.get('part', []))
        else:
            episodes = len(guess_result.values_list['episode'])
        if episodes > 3:
            valid = False
        if 'season' in guess_result.values_list and len(guess_result.values_list['season']) > 1:
            valid = False
        if disc is not None:
            # Disc's are not supported
            valid = False
        identified_by = kwargs.get('identified_by', 'auto')
        identifier_type, identifier, season_pack = None, None, False
        # Identifier types are tried in order: date, ep, id, sequence; the first one found wins.
        if identified_by in ['date', 'auto'] and date:
            identifier_type = 'date'
            identifier = date
        if not identifier_type and identified_by in ['ep', 'auto']:
            if episode is not None:
                if season is None and kwargs.get('allow_seasonless', True):
                    if 'part' in guess_result:
                        season = 1
                    else:
                        episode_raw = guess_result.matches['episode'][0].initiator.raw
                        if episode_raw and any(
                            c.isalpha() and c.lower() != 'v' for c in episode_raw
                        ):
                            season = 1
                if season is not None:
                    identifier_type = 'ep'
                    identifier = (season, episode)
            elif season is not None and not special:
                # A season without any episode is treated as a season pack
                season_pack = True
                episodes = 1
                identifier_type = 'ep'
                identifier = (season, 0)
        if (
            not identifier_type
            and identified_by in ['id', 'auto']
            and guess_result.matches['regexpId']
        ):
            identifier_type = 'id'
            identifier = '-'.join(match.value for match in guess_result.matches['regexpId'])
        if not identifier_type and identified_by in ['sequence', 'auto'] and episode is not None:
            identifier_type = 'sequence'
            identifier = episode
        if (not identifier_type or guessit_options.get('prefer_specials')) and (
            special or guessit_options.get('assume_special')
        ):
            identifier_type = 'special'
            identifier = guess_result.get('episode_title', 'special')
        if not identifier_type:
            valid = False
        # TODO: Legacy - Complete == invalid
        # NOTE(review): this reads guess_result.get('other') while the other helpers read
        # guess_result.values_list.get('other') - confirm whether every 'other' value should be checked.
        if not season_pack and 'complete' in normalize_component(guess_result.get('other')):
            valid = False
        parsed = SeriesParseResult(
            data=data,
            name=name,
            episodes=episodes,
            identified_by=identified_by,
            id=identifier,
            id_type=identifier_type,
            quality=quality,
            proper_count=proper_count,
            special=special,
            group=group,
            season_pack=season_pack,
            valid=valid,
        )
        logger.debug('Parsing result: {} (in {} ms)', parsed, (preferred_clock() - start) * 1000)
        return parsed

    # TODO: The following functions are sort of legacy. No idea if they should be changed.
    def _is_valid_name(self, data, guessit_options):
        """Check that ``data`` matches the expected series name.

        :param data: The release name being parsed.
        :param guessit_options: Options dict; ``name``, ``alternate_names`` and
            ``name_regexps`` are read from it.
        :return: ``True`` when no ``name`` is configured or when at least one name
            regexp matches ``data`` with a non-zero end position, ``False`` otherwise.
        """
        if not guessit_options.get('name'):
            return True
        # name end position
        name_end = 0
        # regexp name matching
        re_from_name = False
        name_regexps = ReList(guessit_options.get('name_regexps', []))
        if not name_regexps:
            # if we don't have name_regexps, generate one from the name
            name_regexps = ReList(
                name_to_re(name, default_ignore_prefixes, None)
                for name in [guessit_options['name'], *guessit_options.get('alternate_names', [])]
            )
            # With auto regex generation, the first regex group captures the name
            re_from_name = True
        # try all specified regexps on this data
        for name_re in name_regexps:
            match = re.search(name_re, data)
            if match:
                match_end = match.end(1 if re_from_name else 0)
                # Always pick the longest matching regex
                name_end = max(name_end, match_end)
                logger.debug('NAME SUCCESS: {} matched to {}', name_re.pattern, data)
        if not name_end:
            # leave this invalid
            logger.debug(
                'FAIL: name regexps {} do not match {}',
                [regexp.pattern for regexp in name_regexps],
                data,
            )
            return False
        return True

    def _is_valid_groups(self, group, allow_groups):
        """Check the release group against the allowed groups, ignoring case.

        :param group: Release group name, a list of names, or a falsy value when
            no group was found.
        :param allow_groups: Allowed group names; an empty value allows everything.
        :return: ``True`` when there is no restriction or when (one of) the
            group(s) is allowed, ``False`` otherwise.
        """
        if not allow_groups:
            return True
        if not group:
            return False
        normalized_allow_groups = [x.lower() for x in allow_groups]
        # TODO: special case for guessit with expected_group parameter
        if isinstance(group, list):
            return any(g.lower() in normalized_allow_groups for g in group)
        return group.lower() in normalized_allow_groups
[docs]
@event('plugin.register')
def register_plugin():
    """Register ``ParserGuessit`` as the ``parser_guessit`` plugin.

    Runs on the ``plugin.register`` event and exposes the class through the
    ``movie_parser`` and ``series_parser`` interfaces.
    """
    plugin.register(
        ParserGuessit,
        'parser_guessit',
        interfaces=['movie_parser', 'series_parser'],
        api_ver=2,
    )