import os
from collections.abc import MutableSet
from datetime import datetime
from pathlib import Path
from babelfish import Language
from loguru import logger
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Unicode, and_, func
from sqlalchemy.orm import relationship
from flexget import plugin
from flexget.db_schema import versioned_base, with_session
from flexget.entry import Entry
from flexget.event import event
from flexget.manager import Session
from flexget.utils.template import RenderError
from flexget.utils.tools import parse_timedelta
# Rebind the module-level name to a loguru logger carrying name='subtitle_list', so every record
# emitted from this module is tagged with the plugin name.
logger = logger.bind(name='subtitle_list')
# Declarative base class for the three table models defined below.
# NOTE(review): the arguments look like (schema name, schema version) -- confirm against
# flexget.db_schema.versioned_base.
Base = versioned_base('subtitle_list', 1)
#: Video extensions stolen from https://github.com/Diaoul/subliminal/blob/master/subliminal/video.py
#: Lower-case, dot-prefixed suffixes, kept in alphabetical order.
VIDEO_EXTENSIONS = (
    '.3g2', '.3gp', '.3gp2', '.3gpp', '.60d', '.ajp', '.asf', '.asx',
    '.avchd', '.avi', '.bik', '.bix', '.box', '.cam', '.dat', '.divx',
    '.dmf', '.dv', '.dvr-ms', '.evo', '.flc', '.fli', '.flic', '.flv',
    '.flx', '.gvi', '.gvp', '.h264', '.m1v', '.m2p', '.m2ts', '.m2v',
    '.m4e', '.m4v', '.mjp', '.mjpeg', '.mjpg', '.mkv', '.moov', '.mov',
    '.movhd', '.movie', '.movx', '.mp4', '.mpe', '.mpeg', '.mpg', '.mpv',
    '.mpv2', '.mxf', '.nsv', '.nut', '.ogg', '.ogm', '.omf', '.ps',
    '.qt', '.ram', '.rm', '.rmvb', '.swf', '.ts', '.vfw', '.vid',
    '.video', '.viv', '.vivo', '.vob', '.vro', '.wm', '.wmv', '.wmx',
    '.wrap', '.wvx', '.wx', '.x264', '.xvid',
)
def normalize_language(language):
    """Return the string form of *language*.

    :param language: Either a babelfish ``Language`` instance, or a value that
        ``Language.fromietf`` can parse.
    :return: ``str()`` of the resulting ``Language`` object.
    """
    parsed = language if isinstance(language, Language) else Language.fromietf(language)
    return str(parsed)
class SubtitleListList(Base):
    """Table row for one named subtitle list."""

    __tablename__ = 'subtitle_list_lists'

    id = Column(Integer, primary_key=True)
    # List names are unique across the table.
    name = Column(Unicode, unique=True)
    # Timestamp filled in from datetime.now when no value is supplied.
    added = Column(DateTime, default=datetime.now)
    # lazy='dynamic': `files` is a query-like object (other code in this module calls
    # .filter() and .count() on it). Each SubtitleListFile gets a `list` backref.
    files = relationship(
        'SubtitleListFile',
        backref='list',
        cascade='all, delete, delete-orphan',
        lazy='dynamic',
    )

    def __repr__(self):
        return f'<SubtitleListList name={self.name},id={self.id}>'

    def to_dict(self):
        """Return the list's id, name and creation time as a plain dict."""
        summary = {'id': self.id, 'name': self.name}
        summary['added_on'] = self.added
        return summary
class SubtitleListFile(Base):
    """Table row for one path queued in a subtitle list, with its wanted languages."""

    __tablename__ = 'subtitle_list_files'

    id = Column(Integer, primary_key=True)
    # Timestamp filled in from datetime.now when no value is supplied.
    added = Column(DateTime, default=datetime.now)
    title = Column(Unicode)
    # Filesystem path stored as text; to_entry() converts it back to a Path.
    location = Column(Unicode)
    list_id = Column(Integer, ForeignKey(SubtitleListList.id), nullable=False)
    # Eagerly joined language rows; each SubtitleListLanguage gets a `file` backref.
    languages = relationship(
        'SubtitleListLanguage',
        backref='file',
        lazy='joined',
        cascade='all, delete-orphan',
    )
    # Interval string copied from the list config's 'remove_after' when the row is created.
    remove_after = Column(Unicode)

    def __repr__(self):
        return f'<SubtitleListFile title={self.title},path={self.location},list_name={self.list.name}>'

    def to_entry(self):
        """Build an ``Entry`` describing this row.

        The entry carries ``title``, a mock ``url`` derived from the row id,
        ``location`` as a ``Path``, ``remove_after``, ``added`` and
        ``subtitle_languages`` (a list of babelfish ``Language`` objects parsed
        from the stored language strings).
        """
        entry = Entry()
        entry['title'] = self.title
        entry['url'] = f'mock://localhost/subtitle_list/{self.id}'
        entry['location'] = Path(self.location)
        entry['remove_after'] = self.remove_after
        entry['added'] = self.added
        entry['subtitle_languages'] = [
            Language.fromietf(language_row.language) for language_row in self.languages
        ]
        return entry

    def to_dict(self):
        """Return a plain-dict summary of this row; languages stay as stored strings."""
        return {
            'id': self.id,
            'added_on': self.added,
            'title': self.title,
            'location': self.location,
            'subtitle_languages': [language_row.language for language_row in self.languages],
        }
class SubtitleListLanguage(Base):
    """Table row for a single subtitle language attached to a ``SubtitleListFile``."""

    __tablename__ = 'subtitle_list_languages'

    id = Column(Integer, primary_key=True)
    # Timestamp filled in from datetime.now when no value is supplied.
    added = Column(DateTime, default=datetime.now)
    # Language kept as text; SubtitleListFile.to_entry() parses it with Language.fromietf.
    language = Column(Unicode)
    # Owning file row in the 'subtitle_list_files' table.
    subtitle_list_file_id = Column(Integer, ForeignKey('subtitle_list_files.id'))
class SubtitleList(MutableSet):
    """Database-backed mutable set of entries that are waiting for subtitles.

    Each instance is bound to the list named by ``config['list']``; the row for
    that list is created on construction if it does not exist yet. Members are
    stored as ``SubtitleListFile`` rows and handed out as ``Entry`` objects.
    """

    schema = {
        'type': 'object',
        'properties': {
            'list': {'type': 'string'},
            'languages': {'type': 'array', 'items': {'type': 'string'}, 'minItems': 1},
            'check_subtitles': {'type': 'boolean', 'default': True},
            'remove_after': {'type': 'string', 'format': 'interval'},
            'path': {'type': 'string'},
            'allow_dir': {'type': 'boolean', 'default': False},
            'recursion_depth': {'type': 'integer', 'default': 1, 'minimum': 1},
            'force_file_existence': {'type': 'boolean', 'default': True},
        },
        'required': ['list'],
        'additionalProperties': False,
    }

    def _db_list(self, session):
        """Return the ``SubtitleListList`` row named by ``config['list']``, or ``None``."""
        return (
            session.query(SubtitleListList)
            .filter(SubtitleListList.name == self.config['list'])
            .first()
        )

    def _from_iterable(self, it):
        # TODO: is this the right answer? the returned object won't have our custom __contains__ logic
        return set(it)

    @with_session
    def __init__(self, config, session=None):
        """Store *config* and make sure the backing list row exists.

        :param config: Plugin configuration matching ``schema``; ``'list'`` is required.
        :param session: Database session; ``with_session`` presumably supplies
            one when omitted.
        """
        self.config = config
        if not self._db_list(session):
            session.add(SubtitleListList(name=self.config['list']))

    def __iter__(self):
        with Session() as session:
            # Materialise the entries while the session is still open.
            return iter([file.to_entry() for file in self._db_list(session).files])

    def __len__(self):
        with Session() as session:
            return self._db_list(session).files.count()

    def add(self, entry):
        """Add *entry* to the list, replacing any row already stored for its path.

        :param entry: Entry whose local path is resolved via ``_extract_path``.
        :return: The stored file as a fresh ``Entry``, or ``None`` when the
            entry is rejected (no local path, missing path while
            ``force_file_existence`` is on, or a directory while ``allow_dir``
            is off).
        """
        with Session() as session:
            path = self._extract_path(entry)
            if not path:
                logger.error('Entry {} does not represent a local file/dir.', entry['title'])
                return None
            path_exists = path.exists()
            # Fall back to the schema defaults so a config that was not run through
            # schema validation does not raise KeyError here.
            if self.config.get('force_file_existence', True) and not path_exists:
                logger.error('Path {} does not exist. Not adding to list.', path)
                return None
            allow_dir = self.config.get('allow_dir', False)
            if path_exists and not allow_dir and path.is_dir():
                logger.error('Path {} is a directory and "allow_dir"={}.', path, allow_dir)
                return None
            # Check if this is already in the list, refresh info if so
            db_list = self._db_list(session=session)
            db_file = self._find_entry(entry, session=session)
            # Just delete and re-create to refresh
            if db_file:
                session.delete(db_file)
            db_file = SubtitleListFile()
            db_file.title = entry['title']
            db_file.location = str(path)
            db_file.remove_after = self.config.get('remove_after')
            # A set collapses languages that normalise to the same string.
            normalized_languages = {
                normalize_language(subtitle_language)
                for subtitle_language in self.config.get('languages', [])
            }
            db_file.languages = [
                SubtitleListLanguage(language=subtitle_language)
                for subtitle_language in normalized_languages
            ]
            logger.debug('adding entry {} with languages {}', entry, normalized_languages)
            db_list.files.append(db_file)
            session.commit()
            return db_file.to_entry()

    def discard(self, entry):
        """Remove the row matching *entry*, if there is one."""
        # NOTE(review): there is no explicit commit here; this appears to rely on the
        # Session context manager persisting the delete on exit -- confirm.
        with Session() as session:
            db_file = self._find_entry(entry, session=session)
            if db_file:
                logger.debug('deleting file {}', db_file)
                session.delete(db_file)

    def __contains__(self, entry):
        # A file also counts as contained when its parent directory is in the list.
        return self._find_entry(entry, match_file_to_dir=True) is not None

    @with_session
    def _find_entry(self, entry, match_file_to_dir=False, session=None):
        """Find `SubtitleListFile` corresponding to this entry, if it exists.

        :param entry: Entry whose local path is resolved via ``_extract_path``.
        :param match_file_to_dir: When no row matches the path itself, also try
            the path's parent directory.
        :param session: Database session.
        :return: The matching ``SubtitleListFile`` row, or ``None``.
        """
        path = self._extract_path(entry)
        if not path:
            # No local path means nothing can match; bail out before touching path.parent.
            return None
        res = self._db_list(session).files.filter(SubtitleListFile.location == str(path)).first()
        if not res and match_file_to_dir:
            res = (
                self._db_list(session)
                .files.filter(SubtitleListFile.location == str(path.parent))
                .first()
            )
        return res

    @with_session
    def _find_language(self, file_id, language, session=None):
        """Return the language row of file *file_id* matching *language* case-insensitively."""
        return (
            session.query(SubtitleListLanguage)
            .filter(
                and_(
                    func.lower(SubtitleListLanguage.language) == str(language).lower(),
                    SubtitleListLanguage.subtitle_list_file_id == file_id,
                )
            )
            .first()
        )

    @property
    def immutable(self):
        return False

    @property
    def online(self):
        """Set the online status of the plugin.

        Online plugin should be treated differently in certain situations, like test mode
        """
        return False

    @with_session
    def get(self, entry, session=None):
        """Return the stored ``Entry`` matching *entry*, or ``None`` if it is not in the list."""
        match = self._find_entry(entry=entry, session=session)
        return match.to_entry() if match else None
class PluginSubtitleList:
    """Subtitle list."""

    schema = SubtitleList.schema

    @staticmethod
    def get_list(config):
        """Return a ``SubtitleList`` built from *config*."""
        return SubtitleList(config)

    def all_subtitles_exist(self, file, wanted_languages):
        """Tell whether external subtitles for every wanted language sit next to *file*.

        :param file: Video path handed to ``subliminal.core.search_external_subtitles``.
        :param wanted_languages: Set of languages to look for (set difference is
            applied to it).
        :return: ``True`` when *wanted_languages* is non-empty and fully covered,
            ``False`` otherwise, and ``None`` when subliminal cannot be imported.
        """
        try:
            import subliminal

            found = subliminal.core.search_external_subtitles(file)
            existing_languages = {subtitle.language for subtitle in found.values()}
            if not wanted_languages or wanted_languages - existing_languages:
                return False
            logger.info('Local subtitle(s) already exists for {}.', file)
            return True
        except ImportError:
            logger.warning('Subliminal not found. Unable to check for local subtitles.')
            return None

    @classmethod
    def _expired(cls, file, config):
        """Return ``True`` when *file* has outlived its own or the list's ``remove_after``.

        :param file: Mapping with ``'added'`` and ``'remove_after'`` keys.
        :param config: Plugin config; its optional ``'remove_after'`` is checked as well.
        """
        # NOTE(review): .diff() presumably yields the time elapsed since 'added' -- confirm
        # the type stored under 'added'.
        added_interval = file['added'].diff()
        if file['remove_after'] and added_interval > parse_timedelta(file['remove_after']):
            return True
        return bool(
            config.get('remove_after')
            and added_interval > parse_timedelta(config['remove_after'])
        )
@event('plugin.register')
def register_plugin():
    """Register ``PluginSubtitleList`` as 'subtitle_list' on the 'plugin.register' event."""
    plugin.register(
        PluginSubtitleList,
        'subtitle_list',
        api_ver=2,
        interfaces=['task', 'list'],
    )