Source code for flexget.components.managed_lists.lists.regexp_list.db

from datetime import datetime

from loguru import logger
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Unicode, and_, func
from sqlalchemy.orm import relationship

from flexget.db_schema import versioned_base, with_session
from flexget.entry import Entry

# Module logger, rebound so that its records carry name='regexp_list'.
logger = logger.bind(name='regexp_list')
# Declarative base for the models defined below, obtained from flexget's
# versioned_base helper. NOTE(review): the second argument is presumably the
# schema version of this plugin's tables — confirm against flexget.db_schema.
Base = versioned_base('regexp_list', 1)


class RegexpListList(Base):
    """A named regexp list, stored in the ``regexp_list_lists`` table."""

    __tablename__ = 'regexp_list_lists'

    id = Column(Integer, primary_key=True)
    name = Column(Unicode, unique=True)
    # Creation timestamp; datetime.now is called when the row is inserted.
    added = Column(DateTime, default=datetime.now)
    # Regexps belonging to this list. The backref exposes the owning list on
    # each regexp as ``.list``; lazy='dynamic' makes this attribute a query
    # rather than a loaded collection.
    regexps = relationship(
        'RegexListRegexp',
        backref='list',
        cascade='all, delete, delete-orphan',
        lazy='dynamic',
    )

    def __repr__(self):
        return f'<RegexpListList name={self.name},id={self.id}>'

    def to_dict(self):
        """Return the list's id, name and creation time as a plain dict.

        :return: dict with the keys ``id``, ``name`` and ``added_on``
        """
        return dict(id=self.id, name=self.name, added_on=self.added)
class RegexListRegexp(Base):
    """A single regexp belonging to a list, stored in ``regexp_list_regexps``."""

    __tablename__ = 'regexp_list_regexps'

    id = Column(Integer, primary_key=True)
    # Creation timestamp; datetime.now is called when the row is inserted.
    added = Column(DateTime, default=datetime.now)
    regexp = Column(Unicode)
    # Owning list; every regexp must belong to one.
    list_id = Column(Integer, ForeignKey(RegexpListList.id), nullable=False)

    def __repr__(self):
        # ``self.list`` is the backref declared on RegexpListList.regexps.
        return f'<RegexListRegexp regexp={self.regexp},list_name={self.list.name}>'

    def to_entry(self):
        """Build an ``Entry`` representing this regexp.

        The regexp text is stored under both ``title`` and ``regexp``; ``url``
        is a mock URL derived from the row id.

        :return: the populated ``Entry``
        """
        result = Entry()
        pattern = self.regexp
        result['title'] = pattern
        result['regexp'] = pattern
        result['url'] = f'mock://localhost/regexp_list/{self.id}'
        return result

    def to_dict(self):
        """Return the regexp's id, creation time and pattern as a plain dict.

        :return: dict with the keys ``id``, ``added_on`` and ``regexp``
        """
        return dict(id=self.id, added_on=self.added, regexp=self.regexp)
@with_session
def get_regexp_lists(name=None, session=None):
    """Return regexp lists, optionally filtered by a name substring.

    :param str name: When truthy, only lists whose name contains this value are returned
    :param Session session: Session used for the query
    :return: list of matching ``RegexpListList`` objects
    """
    logger.debug('retrieving regexp lists')
    query = session.query(RegexpListList)
    if name:
        logger.debug('filtering by name {}', name)
        query = query.filter(RegexpListList.name.contains(name))
    return query.all()


@with_session
def get_list_by_exact_name(name, session=None):
    """Return the list whose name equals ``name``, compared case-insensitively.

    :param str name: Name of the list to look up
    :param Session session: Session used for the query
    :return: the matching ``RegexpListList`` or ``None`` when there is none
    """
    logger.debug('returning list with name {}', name)
    return (
        session
        .query(RegexpListList)
        .filter(func.lower(RegexpListList.name) == name.lower())
        .one_or_none()
    )


@with_session
def get_regexps_by_list_id(
    list_id, count=False, start=None, stop=None, order_by='added', descending=False, session=None
):
    """Return the regexps of a list, or only their number.

    :param int list_id: Id of the list whose regexps are requested
    :param bool count: When true, return the number of regexps instead of the rows
    :param int start: Index of the first row to return, applied to the ordered result
    :param int stop: Index one past the last row to return, applied to the ordered result
    :param str order_by: Name of the ``RegexListRegexp`` attribute to sort by
    :param bool descending: Sort in descending instead of ascending order
    :param Session session: Session used for the query
    :return: an int when ``count`` is true, otherwise a list of ``RegexListRegexp`` objects
    """
    query = session.query(RegexListRegexp).filter(RegexListRegexp.list_id == list_id)
    if count:
        return query.count()
    sort_column = getattr(RegexListRegexp, order_by)
    if descending:
        sort_column = sort_column.desc()
    # Order first and slice afterwards, so that start/stop select a page of the
    # sorted result. Slicing first would page over unordered rows and merely
    # sort within that page; it also required Query.from_self(), which
    # SQLAlchemy has deprecated.
    return query.order_by(sort_column).slice(start, stop).all()


@with_session
def get_list_by_id(list_id, session=None):
    """Return the list with the given id.

    :param int list_id: Id of the list to fetch
    :param Session session: Session used for the query
    :return: the matching ``RegexpListList`` or ``None`` when there is none
    """
    logger.debug('fetching list with id {}', list_id)
    return session.query(RegexpListList).filter(RegexpListList.id == list_id).one_or_none()


@with_session
def get_regexp(list_id, regexp, session=None):
    """Find a regexp inside a list, comparing the pattern case-insensitively.

    :param int list_id: Id of the list to search
    :param str regexp: Pattern text to look for
    :param Session session: Session used for the query
    :return: the first matching ``RegexListRegexp``, or ``None`` when the list
        does not exist or holds no such regexp
    """
    if not get_list_by_id(list_id=list_id, session=session):
        return None
    logger.debug('searching for regexp {} in list {}', regexp, list_id)
    return (
        session
        .query(RegexListRegexp)
        .filter(
            and_(
                func.lower(RegexListRegexp.regexp) == regexp.lower(),
                RegexListRegexp.list_id == list_id,
            )
        )
        .first()
    )


@with_session
def create_list(list_name, session=None):
    """Only creates the list if it doesn't exist.

    :param str list_name: Name of the list
    :param Session session: Session used for the lookup and the commit
    :return: regex list with name list_name
    """
    regexp_list = get_list_by_exact_name(list_name, session=session)
    if not regexp_list:
        regexp_list = RegexpListList(name=list_name)
    # merge() returns the instance that is attached to the session. Keep that
    # one: a freshly constructed object passed to merge() stays detached and
    # would never receive its primary key, leaving callers with ``id is None``.
    regexp_list = session.merge(regexp_list)
    session.commit()
    return regexp_list


@with_session
def add_to_list_by_name(list_name, regexp, session=None):
    """Add a regexp to the named list, creating the list when necessary.

    Nothing is added when the list already holds the regexp (as determined by
    ``get_regexp``).

    :param str list_name: Name of the list to add to
    :param str regexp: Pattern text to add
    :param Session session: Session used for the lookups and the commit
    """
    regexp_list = create_list(list_name, session=session)
    if get_regexp(regexp_list.id, regexp, session=session):
        return
    session.merge(RegexListRegexp(regexp=regexp, list_id=regexp_list.id))
    session.commit()