import re
from datetime import datetime, timedelta
from string import capwords
from dateutil.parser import parse as parsedate
from loguru import logger
from flexget.utils import qualities
from flexget.utils.parsers.generic import ParseWarning, default_ignore_prefixes, name_to_re
from flexget.utils.parsers.parser import TitleParser
from flexget.utils.tools import ReList
# Re-bind the imported loguru logger with a fixed `name` field so records
# emitted from this module carry name='seriesparser'.
logger = logger.bind(name='seriesparser')
# Identifier styles for which SeriesParser keeps a `<style>_regexps` list.
# SeriesParser.__init__ iterates this list to merge caller-supplied regexps
# with the built-in ones.
ID_TYPES = ['ep', 'date', 'sequence', 'id']  # may also be 'special'
[docs]
class SeriesParser(TitleParser):
    """Parse release titles for a series name and an episode identifier.

    The class attributes below are the built-in pattern tables. ``__init__``
    can prepend caller-supplied patterns to ``ep_regexps``, ``date_regexps``,
    ``sequence_regexps`` and ``id_regexps`` on a per-instance basis.
    """

    # Character class accepted between the components of a date
    separators = '[/ -]'
    # Loose pattern for roman numerals written with X, V and I
    roman_numeral_re = 'X{0,3}(?:IX|XI{0,4}|VI{0,4}|IV|V|I{1,4})'
    # Spelled-out numbers accepted after "part"; parse_episode maps a word to
    # its list index + 1
    english_numbers = [
        'one',
        'two',
        'three',
        'four',
        'five',
        'six',
        'seven',
        'eight',
        'nine',
        'ten',
    ]
    # Season/episode patterns, tried in order by parse_episode.
    # Make sure none of these are found embedded within a word or other numbers
    ep_regexps = ReList([
        TitleParser.re_not_in_word(regexp)
        for regexp in [
            # season + episode (number or roman numeral), optional third group
            rf'(?:series|season|s)\s?(\d{{1,4}})(?:\s(?:.*\s)?)?(?:episode|ep|e|part|pt)\s?(\d{{1,3}}|{roman_numeral_re})(?:\s?e?(\d{{1,2}}))?',
            # "season 1 2 of 10"
            r'(?:series|season)\s?(\d{1,4})\s(\d{1,3})\s?of\s?(?:\d{1,3})',
            # "1x02", optional third group
            r'(\d{1,2})\s?x\s?(\d+)(?:\s(\d{1,2}))?',
            # "2 of 10" (single group, no season)
            r'(\d{1,3})\s?of\s?(?:\d{1,3})',
            # "episode 2" / "part II" (single group, no season)
            rf'(?:episode|e|ep|part|pt)\s?(\d{{1,3}}|{roman_numeral_re})',
            # "part two" using the words in english_numbers
            r'part\s({})'.format('|'.join(map(str, english_numbers))),
        ]
    ])
    # Patterns with a single group holding the season number of a season pack
    season_pack_regexps = ReList([
        # S01 or Season 1 but not Season 1 Episode|Part 2
        rf'(?:season\s?|s)(\d{{1,}}\b)(?!(?:(?:.*?\s)?(?:episode|e|ep|part|pt)\s?(?:\d{{1,3}}|{roman_numeral_re})|(?:\d{{1,3}})\s?of\s?(?:\d{{1,3}})))',
        r'(\d{1,3})\s?x\s?all',  # 1xAll
    ])
    # Titles matching any of these are reported as unwanted by parse_unwanted
    unwanted_regexps = ReList([
        r'(\d{1,3})\s?x\s?(0+)[^1-9]',  # 5x0
        r'S(\d{1,3})D(\d{1,3})',  # S3D1
        r'(?:s|series|\b)\s?\d\s?(?:&\s?\d)?[\s-]*(?:complete|full)',
        r'disc\s\d',
    ])
    # Date patterns; each has three groups which parse_date joins with spaces
    # and hands to the date parser.
    # Make sure none of these are found embedded within a word or other numbers
    date_regexps = ReList([
        TitleParser.re_not_in_word(regexp)
        for regexp in [
            rf'(\d{{2,4}}){separators}(\d{{1,2}}){separators}(\d{{1,2}})',
            rf'(\d{{1,2}}){separators}(\d{{1,2}}){separators}(\d{{2,4}})',
            rf'(\d{{4}})x(\d{{1,2}}){separators}(\d{{1,2}})',
            rf'(\d{{1,2}})(?:st|nd|rd|th)?{separators}([a-z]{{3,10}}){separators}(\d{{4}})',
        ]
    ])
    # Sequence patterns; group 1 is the sequence number, the optional named
    # group `version` is used by parse() to set the proper count.
    sequence_regexps = ReList([
        TitleParser.re_not_in_word(regexp)
        for regexp in [
            r'(\d{1,3})(?:v(?P<version>\d))?',
            rf'(?:pt|part)\s?(\d+|{roman_numeral_re})',
        ]
    ])
    # Checked by parse_unwanted_sequence before sequence parsing is attempted
    unwanted_sequence_regexps = ReList([r'seasons?\s?\d{1,2}'])
    # No built-in id patterns; only caller-supplied ones are used
    id_regexps = ReList([])
    # Patterns matching bracketed `[...]` and parenthesised `(...)` spans
    clean_regexps = ReList([r'\[.*?\]', r'\(.*?\)'])
    # ignore prefix regexps must be passive groups with 0 or 1 occurrences eg. (?:prefix)?
    ignore_prefixes = default_ignore_prefixes
def __init__(
    self,
    name=None,
    alternate_names=None,
    identified_by='auto',
    name_regexps=None,
    ep_regexps=None,
    date_regexps=None,
    sequence_regexps=None,
    id_regexps=None,
    strict_name=False,
    allow_groups=None,
    allow_seasonless=True,
    date_dayfirst=None,
    date_yearfirst=None,
    special_ids=None,
    prefer_specials=False,
    assume_special=False,
):
    """Init SeriesParser.

    :param string name: Name of the series parser is going to try to parse. If not supplied series name will be
        guessed from data.
    :param list alternate_names: Other names for this series that should be allowed.
    :param string identified_by: What kind of episode numbering scheme is expected,
        valid values are ep, date, sequence, id and auto (default).
    :param list name_regexps: Regexps for name matching or None (default),
        by default regexp is generated from name.
    :param list ep_regexps: Regexps detecting episode,season format.
        Given list is prioritized over built-in regexps.
    :param list date_regexps: Regexps detecting date format.
        Given list is prioritized over built-in regexps.
    :param list sequence_regexps: Regexps detecting sequence format.
        Given list is prioritized over built-in regexps.
    :param list id_regexps: Custom regexps detecting id format.
        Given list is prioritized over built in regexps.
    :param boolean strict_name: If True name must be immediately followed by episode identifier.
    :param list allow_groups: Optionally specify list of release group names that are allowed.
        This will also populate attribute `group`.
    :param boolean allow_seasonless: If True (default), an episode match without a season group is treated as
        season 1; if False such a match is rejected.
    :param date_dayfirst: Prefer day first notation of dates when there are multiple possible interpretations.
    :param date_yearfirst: Prefer year first notation of dates when there are multiple possible interpretations.
    :param special_ids: Identifiers which will cause entry to be flagged as a special.
    :param boolean prefer_specials: If True, label entry which matches both a series identifier and a special
        identifier as a special.
    :param boolean assume_special: If True, a title for which parsing did not return an identifier is flagged
        as a special.
    """
    self.episodes = 1
    self.name = name
    self.alternate_names = alternate_names or []
    self.data = ''
    self.identified_by = identified_by
    # Stores the type of identifier found, 'ep', 'date', 'sequence' or 'special'
    self.id_type = None
    self.name_regexps = ReList(name_regexps or [])
    self.re_from_name = False
    # Explicit table of the caller-supplied identifier regexps, keyed by the
    # attribute they extend (avoids reading the arguments through locals()).
    custom_regexps = {
        'ep_regexps': ep_regexps,
        'date_regexps': date_regexps,
        'sequence_regexps': sequence_regexps,
        'id_regexps': id_regexps,
    }
    # If custom identifier regexps were provided, prepend them to the appropriate type of built in regexps
    for mode in ID_TYPES:
        listname = mode + '_regexps'
        supplied = custom_regexps[listname]
        if supplied:
            setattr(self, listname, ReList(supplied + getattr(SeriesParser, listname)))
    # `specials` is not defined in this class (presumably inherited from
    # TitleParser). A new list is built so the shared one is never mutated.
    self.specials = self.specials + [i.lower() for i in (special_ids or [])]
    self.prefer_specials = prefer_specials
    self.assume_special = assume_special
    self.strict_name = strict_name
    self.allow_groups = allow_groups or []
    self.allow_seasonless = allow_seasonless
    self.date_dayfirst = date_dayfirst
    self.date_yearfirst = date_yearfirst
    self.field = None
    # Initialise every attribute that parse() produces
    self._reset()
[docs]
def _reset(self):
    """Put every attribute produced by ``parse`` back to its initial value."""
    # A title is considered not to match the series until parse() says otherwise
    self.valid = False
    # Identifier that was found and how it was found
    self.id = None
    self.id_type = None
    self.id_groups = None
    # Season / episode numbering
    self.season = None
    self.episode = None
    self.episodes = 1
    self.season_pack = None
    # Properties of the release itself
    self.quality = None
    self.proper_count = 0
    self.special = False
    # TODO: group is only produced with allow_groups
    self.group = None
[docs]
def remove_dirt(self, data):
    """Return ``data`` lowercased with separator characters turned into spaces.

    Every run of underscores, dots, commas, square brackets, parentheses,
    colons and spaces is collapsed into a single space, and leading/trailing
    whitespace is stripped.
    """
    spaced = re.sub(r'[_.,\[\]\(\): ]+', ' ', data)
    return spaced.strip().lower()
[docs]
def guess_name(self):
    """Guess the series name from ``self.data``.

    Everything in front of the first date, season pack or episode
    identifier is taken to be the name. As a side effect
    ``self.identified_by`` is set to ``'date'`` when a date was found and to
    ``'ep'`` otherwise (even when nothing was found), and ``self.name`` is set
    when a name could be extracted.

    :return: The guessed name, or None when the title is unwanted, holds no
        identifier, or the identifier starts within the first two characters.
    """
    # Swap separator characters for spaces one-for-one (nothing is removed) so
    # that match positions still line up with the original title.
    clean_title = re.sub(r'[_.,\[\]\(\):]', ' ', self.data)
    if self.parse_unwanted(clean_title):
        return None
    found = self.parse_date(clean_title)
    if found:
        self.identified_by = 'date'
    else:
        found = self.parse_season_packs(clean_title) or self.parse_episode(clean_title)
        self.identified_by = 'ep'
    if not found:
        return None
    id_start = found['match'].start()
    if id_start <= 1:
        # No room for a series name in front of the identifier
        return None
    # From here on the original title is used, so unwanted prefixes can be
    # recognised and skipped.
    prefix = re.match('|'.join(self.ignore_prefixes), self.data)
    name_start = prefix.end() if prefix else 0
    # Assume everything between the prefix and the identifier is the name
    guessed = self.data[name_start:id_start]
    # Drop a possible episode title (anything after a ' - ')
    guessed = guessed.split(' - ')[0]
    # Replace some special characters with spaces
    guessed = re.sub(r'[\._\(\) ]+', ' ', guessed).strip(' -')
    # Normalize capitalization to title case
    guessed = capwords(guessed)
    self.name = guessed
    return guessed
[docs]
def parse(self, data=None, field=None, quality=None):
    """Parse a title for this series and an episode identifier.

    Results are stored on the instance (``valid``, ``id``, ``id_type``,
    ``season``, ``episode``, ``episodes``, ``quality``, ``proper_count``,
    ``special``, ``group``, ``season_pack``); the method itself always
    returns None. ``valid`` stays False when the title does not match.

    Identifier styles are tried in the order date, ep (including season
    packs), id, sequence, restricted by ``identified_by``; the special check
    runs last.

    :param data: Title to parse. When falsy, the value already stored in
        ``self.data`` is used.
    :param field: Stored on ``self.field``.
    :param quality: When truthy, kept as ``self.quality`` instead of the
        quality detected from the title.
    :raises ParseWarning: If there is no data to parse, the title matches an
        unwanted pattern, or the title matches the series name but no
        identifier is found and it is not a special.
    """
    # Clear the output variables before parsing
    self._reset()
    self.field = field
    if quality:
        self.quality = quality
    if data:
        self.data = data
    if not self.data:
        raise ParseWarning(self, 'No data supplied to parse.')
    if not self.name:
        logger.trace('No name for series `{}` supplied, guessing name.', self.data)
        if not self.guess_name():
            logger.trace('Could not determine a series name')
            return
        logger.trace('Series name for {} guessed to be {}', self.data, self.name)
    # check if data appears to be unwanted (abort)
    if self.parse_unwanted(self.remove_dirt(self.data)):
        raise ParseWarning(self, f'`{self.data}` appears to be an episode pack')
    # The cleaned name is only used for the trace message below
    name = self.remove_dirt(self.name)
    logger.trace('name: {} data: {}', name, self.data)
    # Span of the series name inside self.data; name_end == 0 means "not found"
    name_start = 0
    name_end = 0
    # regexp name matching
    if not self.name_regexps:
        # if we don't have name_regexps, generate one from the name
        # and from every alternate name
        self.name_regexps = ReList(
            name_to_re(name, self.ignore_prefixes, self)
            for name in [self.name, *self.alternate_names]
        )
        # With auto regex generation, the first regex group captures the name
        self.re_from_name = True
    # try all specified regexps on this data
    for name_re in self.name_regexps:
        match = re.search(name_re, self.data)
        if match:
            match_start, match_end = match.span(1 if self.re_from_name else 0)
            # Keep the match that ends furthest into the title
            if match_end > name_end:
                name_start, name_end = match_start, match_end
            logger.trace('NAME SUCCESS: {} matched to {}', name_re.pattern, self.data)
    if not name_end:
        # leave this invalid
        logger.trace(
            'FAIL: name regexps {} do not match {}',
            [regexp.pattern for regexp in self.name_regexps],
            self.data,
        )
        return
    # remove series name from raw data, move any prefix to end of string
    data_stripped = self.data[name_end:] + ' ' + self.data[:name_start]
    data_stripped = data_stripped.lower()
    logger.trace('data stripped: {}', data_stripped)
    # allow group(s)
    if self.allow_groups:
        for group in self.allow_groups:
            group = group.lower()
            # A group counts only when written as [group], -group or (group)
            for fmt in ['[%s]', '-%s', '(%s)']:
                if fmt % group in data_stripped:
                    logger.trace('{} is from group {}', self.data, group)
                    self.group = group
                    data_stripped = data_stripped.replace(fmt % group, '')
                    break
            if self.group:
                break
        else:
            # Loop ended without finding any allowed group
            logger.trace('{} is not from groups {}', self.data, self.allow_groups)
            return  # leave invalid
    # Find quality and clean from data
    logger.trace('parsing quality ->')
    quality = qualities.Quality(data_stripped)
    if quality:
        # Remove quality string from data
        logger.trace('quality detected, using remaining data `{}`', quality.clean_text)
        data_stripped = quality.clean_text
    # Don't override passed in quality
    if not self.quality:
        self.quality = quality
    # Remove unwanted words from data for ep / id parsing
    data_stripped = self.remove_words(data_stripped, self.remove, not_in_word=True)
    data_parts = re.split(r'[\W_]+', data_stripped)
    # Iterate over a copy because matching parts are removed from data_parts
    for part in data_parts[:]:
        if part in self.propers:
            self.proper_count += 1
            data_parts.remove(part)
        elif part == 'fastsub':
            # Subtract 5 to leave room for fastsub propers before the normal release
            self.proper_count -= 5
            data_parts.remove(part)
        elif part in self.specials:
            self.special = True
            data_parts.remove(part)
    data_stripped = ' '.join(data_parts).strip()
    logger.trace("data for date/ep/id parsing '{}'", data_stripped)
    # Try date mode before ep mode
    if self.identified_by in ['date', 'auto']:
        date_match = self.parse_date(data_stripped)
        if date_match:
            # strict_name: the identifier must start right after the name
            if self.strict_name and date_match['match'].start() > 1:
                return
            self.id = date_match['date']
            self.id_groups = date_match['match'].groups()
            self.id_type = 'date'
            self.valid = True
            # Keep going to the special check only when specials are preferred
            if not (self.special and self.prefer_specials):
                return
        else:
            logger.trace('-> no luck with date_regexps')
    if self.identified_by in ['ep', 'auto'] and not self.valid:
        ep_match = self.parse_episode(data_stripped)
        if ep_match:
            # strict_name
            if self.strict_name and ep_match['match'].start() > 1:
                return
            if ep_match['end_episode'] and ep_match['end_episode'] > ep_match['episode'] + 2:
                # This is a pack of too many episodes, ignore it.
                logger.trace(
                    'Series pack contains too many episodes ({}). Rejecting',
                    ep_match['end_episode'] - ep_match['episode'],
                )
                return
            self.season = ep_match['season']
            self.episode = ep_match['episode']
            if ep_match['end_episode']:
                self.episodes = (ep_match['end_episode'] - ep_match['episode']) + 1
            self.id = (self.season, self.episode)
            self.id_type = 'ep'
            self.valid = True
            if not (self.special and self.prefer_specials):
                return
        else:
            season_pack_match = self.parse_season_packs(data_stripped)
            # If a title looks like a special, give it precedence over season pack
            if season_pack_match and not self.special:
                if self.strict_name and season_pack_match['match'].start() > 1:
                    return
                self.season = season_pack_match['season']
                self.season_pack = True
                # Season packs use episode 0 in their id
                self.id = (season_pack_match['season'], 0)
                self.id_type = 'ep'
                self.valid = True
                # NOTE: no return here; execution continues to the checks below
            else:
                logger.trace('-> no luck with ep_regexps')
            if self.identified_by == 'ep' and not self.season_pack:
                # An ep identifier is required, so as a last resort try the
                # compact numbering scheme 101, 102, 103, 201, 202
                # (also 0101, 0102...). It will fail on season 11 though.
                logger.trace('ep identifier expected. Attempting SEE format parsing.')
                # remove obvious date format from this desperate try
                desperate = re.sub(
                    r'\d{4}\s\d{1,2}\s\d{1,2}|\b(?:19|20)\d{2}\b', '', data_stripped
                )
                match = re.search(
                    self.re_not_in_word(r'(\d?\d)(\d\d)'),
                    desperate,
                    re.IGNORECASE,
                )
                if match:
                    logger.trace('-> had luck with SEE')
                    # strict_name
                    if self.strict_name and match.start() > 1:
                        return
                    self.season = int(match.group(1))
                    self.episode = int(match.group(2))
                    self.id = (self.season, self.episode)
                    logger.trace(self)
                    self.id_type = 'ep'
                    self.valid = True
                    return
                logger.trace('-> no luck with SEE')
    # Check id regexps
    if self.identified_by in ['id', 'auto'] and not self.valid:
        for id_re in self.id_regexps:
            match = re.search(id_re, data_stripped)
            if match:
                # strict_name
                if self.strict_name and match.start() > 1:
                    return
                # The id is made of all non-empty groups joined with '-'
                found_id = '-'.join(g for g in match.groups() if g)
                if not found_id:
                    # If match groups were all blank, don't accept this match
                    continue
                self.id = found_id
                self.id_type = 'id'
                self.valid = True
                logger.trace("found id '{}' with regexp '{}'", self.id, id_re.pattern)
                if not (self.special and self.prefer_specials):
                    return
                break
        else:
            # Loop ended without accepting any id regexp
            logger.trace('-> no luck with id_regexps')
    # Other modes are done, check for unwanted sequence ids
    if self.parse_unwanted_sequence(data_stripped):
        return
    # Check sequences last as they contain the broadest matches
    if self.identified_by in ['sequence', 'auto'] and not self.valid:
        for sequence_re in self.sequence_regexps:
            match = re.search(sequence_re, data_stripped)
            if match:
                # strict_name
                if self.strict_name and match.start() > 1:
                    return
                # First matching group is the sequence number
                try:
                    self.id = int(match.group(1))
                except ValueError:
                    # Not a decimal number, so treat it as a roman numeral
                    self.id = self.roman_to_int(match.group(1))
                self.season = 0
                self.episode = self.id
                # If anime style version was found, overwrite the proper count with it
                if 'version' in match.groupdict() and match.group('version'):
                    self.proper_count = int(match.group('version')) - 1
                self.id_type = 'sequence'
                self.valid = True
                logger.trace("found id '{}' with regexp '{}'", self.id, sequence_re.pattern)
                if not (self.special and self.prefer_specials):
                    return
                break
        else:
            logger.trace('-> no luck with sequence_regexps')
    # Reached when no branch above returned: check if this is a special
    if self.special or self.assume_special:
        # Attempt to set id as the title of the special
        self.id = data_stripped or 'special'
        self.id_type = 'special'
        self.valid = True
        logger.trace("found special, setting id to '{}'", self.id)
        return
    if self.valid:
        return
    # The name matched but nothing identified the episode
    msg = f'Title `{self.data}` looks like series `{self.name}` but cannot find '
    if self.identified_by == 'auto':
        msg += 'any series numbering.'
    else:
        msg += f'a(n) `{self.identified_by}` style identifier.'
    raise ParseWarning(self, msg)
[docs]
def parse_unwanted(self, data):
    """Check ``data`` against ``self.unwanted_regexps``.

    :return: True as soon as one of the regexps is found in ``data``,
        otherwise None.
    """
    for pattern in self.unwanted_regexps:
        hit = re.search(pattern, data)
        if hit is None:
            continue
        logger.trace('unwanted regexp {} matched {}', pattern.pattern, hit.groups())
        return True
    return None
[docs]
def parse_unwanted_sequence(self, data):
    """Check ``data`` against ``self.unwanted_sequence_regexps``.

    :return: True as soon as one of the regexps is found in ``data``,
        otherwise None.
    """
    for seq_unwanted_re in self.unwanted_sequence_regexps:
        match = re.search(seq_unwanted_re, data)
        if match:
            # Log the pattern text, as parse_unwanted does, rather than the
            # repr of the compiled object. Fall back to the item itself in
            # case it is a plain string.
            pattern_text = getattr(seq_unwanted_re, 'pattern', seq_unwanted_re)
            logger.trace('unwanted id regexp {} matched {}', pattern_text, match.groups())
            return True
    return None
[docs]
def parse_date(self, data):
    """Parse ``data`` for a date identifier.

    Each regexp in ``self.date_regexps`` is searched in order. The groups of
    a match are joined with spaces and parsed once per dayfirst/yearfirst
    combination. ``self.date_dayfirst`` / ``self.date_yearfirst`` restrict
    the respective option when they are not None; otherwise both True and
    False are tried. Interpretations more than one day in the future or
    before 1970-01-01 are discarded, and the most recent remaining one wins.

    :return: A dict with keys ``date`` (datetime) and ``match`` (the regexp
        match object), or False if no acceptable date is found.
    """
    # These depend only on instance configuration, so build them once
    dayfirst_opts = [True, False] if self.date_dayfirst is None else [self.date_dayfirst]
    yearfirst_opts = [True, False] if self.date_yearfirst is None else [self.date_yearfirst]
    oldest_allowed = datetime(1970, 1, 1)
    for date_re in self.date_regexps:
        match = re.search(date_re, data)
        if not match:
            continue
        date_text = ' '.join(match.groups())
        # Any failing interpretation rejects this match as a whole
        try:
            candidates = [
                parsedate(date_text, dayfirst=dayfirst, yearfirst=yearfirst)
                for dayfirst in dayfirst_opts
                for yearfirst in yearfirst_opts
            ]
        except (ValueError, OverflowError):
            # The date parser is documented to raise OverflowError for
            # out-of-range numbers; treat that like any other invalid date.
            logger.trace('{} is not a valid date, skipping', match.group(0))
            continue
        # Don't accept dates farther than a day in the future, or too old
        newest_allowed = datetime.now() + timedelta(days=1)
        acceptable = [
            candidate for candidate in candidates if oldest_allowed <= candidate <= newest_allowed
        ]
        if not acceptable:
            logger.trace(
                'All possible dates for {} were too old or in the future', match.group(0)
            )
            continue
        # Pick the most recent date if there are ambiguities
        return {'date': max(acceptable), 'match': match}
    return False
[docs]
def parse_episode(self, data):
    """Parse ``data`` for a season/episode identifier.

    The first regexp in ``self.ep_regexps`` that matches decides the result.
    With two or more groups the first two are season and episode; with one
    group the season is assumed to be 1 if ``self.allow_seasonless`` is set.
    The episode may be digits, one of ``self.english_numbers`` or a roman
    numeral. A non-empty third group (exactly three groups) is the end
    episode, kept only if it lies 1 to 12 episodes after the start.

    :return: A dict with keys ``season``, ``episode``, ``end_episode`` and
        ``match``, or False if nothing matched or a seasonless match was
        found while seasonless matches are not allowed.
    :raises ValueError: If a matched season or episode cannot be converted
        to a number.
    """
    for ep_re in self.ep_regexps:
        match = re.search(ep_re, data)
        if not match:
            continue
        groups = match.groups()
        logger.trace('found episode number with regexp {} ({})', ep_re.pattern, groups)
        if len(groups) >= 2:
            season, episode = groups[0], groups[1]
        elif self.allow_seasonless:
            # assume season 1 if the season was not specified
            season, episode = 1, groups[0]
        else:
            # Seasonless match found while those are not allowed
            return False
        # Convert season and episode to integers
        try:
            season = int(season)
            if episode.isdigit():
                episode = int(episode)
            else:
                try:
                    # Spelled-out number: position in the list, plus one
                    episode = 1 + self.english_numbers.index(str(episode).lower())
                except ValueError:
                    episode = self.roman_to_int(episode)
        except ValueError:
            logger.critical(
                'Invalid episode number match {} returned with regexp `{}` for {}',
                match.groups(),
                ep_re.pattern,
                self.data,
            )
            raise
        end_episode = None
        if len(groups) == 3 and groups[2]:
            last = int(groups[2])
            # end episode cannot be before (or equal to) the start episode
            # Assume large ranges are not episode packs TODO: is this the best way?
            if episode < last <= episode + 12:
                end_episode = last
        # Successfully found an identifier, return the results
        return {
            'season': season,
            'episode': episode,
            'end_episode': end_episode,
            'match': match,
        }
    return False
[docs]
def parse_season_packs(self, data):
    """Parse ``data`` for a season pack identifier.

    Only regexps in ``self.season_pack_regexps`` that match with exactly one
    group produce a result; matches with any other number of groups are
    skipped and the next regexp is tried.

    :return: A dict with keys ``season`` (int) and ``match`` (the regexp
        match object), or None if no season pack is found.
    """
    for pack_re in self.season_pack_regexps:
        found = re.search(pack_re, data)
        if found is None:
            continue
        groups = found.groups()
        logger.trace('season pack regexp {} match {}', pack_re.pattern, groups)
        if len(groups) == 1:
            # Single season full pack, no parts etc
            return {'season': int(groups[0]), 'match': found}
        # TODO: support other formats of season packs: 1xall, s01-PART1, etc.
    return None
[docs]
def roman_to_int(self, roman):
    """Convert a roman numeral written with X, V and I to an integer.

    The numeral is read left to right, consuming the tokens X, IX, V, IV and
    I in that order. Matching is case-insensitive.

    :param str roman: The numeral to convert.
    :return: The integer value.
    :raises ValueError: If ``roman`` contains characters other than X, V and
        I, or if part of it is left over after the tokens are consumed
        (e.g. ``'VX'`` or ``'IIX'``).
    """
    roman_map = [('X', 10), ('IX', 9), ('V', 5), ('IV', 4), ('I', 1)]
    roman = roman.upper()
    # Reject anything that is not a roman numeral we can translate
    if any(char not in 'XVI' for char in roman):
        raise ValueError(f'`{roman}` is not a valid roman numeral')
    # Add up the parts of the numeral
    i = result = 0
    for numeral, integer in roman_map:
        while roman.startswith(numeral, i):
            result += integer
            i += len(numeral)
    if i != len(roman):
        # Tokens out of order leave a tail unread; returning the partial sum
        # would silently give a wrong number.
        raise ValueError(f'`{roman}` is not a valid roman numeral')
    return result
def __str__(self):
    """Return a one-line summary of the parser state for log output."""
    # self.field is intentionally left out: an earlier note here reports that
    # printing it fails for a reason nobody has tracked down.
    status = 'OK' if self.valid else 'INVALID'
    return (
        f'<SeriesParser(data={self.data},name={self.name},id={self.id!s},'
        f'id_type={self.id_type},identified_by={self.identified_by},'
        f'season={self.season},season_pack={self.season_pack},'
        f'episode={self.episode},quality={self.quality},'
        f'proper={self.proper_count},'
        f'status={status})>'
    )