Source code for flexget.components.managed_lists.lists.pending_list.db

from datetime import datetime

from loguru import logger
from sqlalchemy import Boolean, Column, DateTime, Integer, Unicode, func, select
from sqlalchemy.orm import relationship
from sqlalchemy.sql.elements import and_
from sqlalchemy.sql.schema import ForeignKey

from flexget import db_schema
from flexget.db_schema import versioned_base
from flexget.entry import Entry
from flexget.utils import json, serialization
from flexget.utils.database import entry_synonym, with_session
from flexget.utils.sqlalchemy_utils import table_schema

# Identifier shared by the logger binding, the versioned declarative base and
# the schema-upgrade registration in this module.
plugin_name = 'pending_list'
# Rebind the imported loguru logger so records emitted here carry ``name=plugin_name``.
logger = logger.bind(name=plugin_name)
# Declarative base for the models in this module.
# NOTE(review): the second argument is presumably the current schema version
# (``upgrade`` migrates up to 1) -- confirm against ``versioned_base``.
Base = versioned_base(plugin_name, 1)


@db_schema.upgrade(plugin_name)
def upgrade(ver, session):
    """Upgrade the stored schema of the ``wait_list_entries`` table.

    Version ``None``/0 -> 1: every row's ``json`` column is decoded with the
    ``json`` helper, rebuilt into an :class:`Entry` and written back using
    ``serialization.dumps``.

    :param ver: Schema version currently stored, or ``None`` if none is recorded.
    :param session: Database session used to read and update the table.
    :return: The schema version after the upgrade steps have run.
    """
    if ver is None:
        ver = 0
    if ver == 0:
        table = table_schema('wait_list_entries', session)
        # ``.mappings()`` yields dict-like rows; plain ``Row`` objects no longer
        # accept string keys (``row['json']``) in SQLAlchemy 2.0.
        # ``.all()`` reads the whole result before any UPDATE is issued, so we
        # never write to the table while still iterating its open cursor.
        rows = session.execute(select(table.c.id, table.c.json)).mappings().all()
        for row in rows:
            if not row['json']:
                # Seems there could be invalid data somehow. See #2590
                continue
            data = json.loads(row['json'], decode_datetime=True)
            # If title looked like a date, make sure it's a string
            title = str(data.pop('title'))
            e = Entry(title=title, **data)
            session.execute(
                table.update().where(table.c.id == row['id']).values(json=serialization.dumps(e))
            )
        ver = 1
    return ver


class PendingListList(Base):
    """A named pending list; owns the ``PendingListEntry`` rows that reference it."""

    __tablename__ = 'pending_list_lists'

    id = Column(Integer, primary_key=True)
    name = Column(Unicode, unique=True)
    added = Column(DateTime, default=datetime.now)
    # Dynamic relationship: ``entries`` is a query object rather than a loaded
    # collection. Deleting the list cascades to its entries.
    entries = relationship(
        'PendingListEntry',
        backref='list',
        cascade='all, delete, delete-orphan',
        lazy='dynamic',
    )

    def to_dict(self):
        """Return the list's ``id``, ``name`` and creation time (as ``added_on``)."""
        return dict(id=self.id, name=self.name, added_on=self.added)
class PendingListEntry(Base):
    """A single entry stored in a pending list, together with its approval flag."""

    __tablename__ = 'wait_list_entries'

    id = Column(Integer, primary_key=True)
    list_id = Column(Integer, ForeignKey(PendingListList.id), nullable=False)
    added = Column(DateTime, default=datetime.now)
    title = Column(Unicode)
    original_url = Column(Unicode)
    # The entry payload lives in the ``json`` column and is exposed through the
    # ``entry`` synonym.
    _json = Column('json', Unicode)
    entry = entry_synonym('_json')
    approved = Column(Boolean)

    def __init__(self, entry, pending_list_id):
        """Create an unapproved row for ``entry`` in the list ``pending_list_id``.

        ``original_url`` falls back to ``entry['url']`` when the entry has no
        (truthy) ``original_url`` value.
        """
        self.title = entry['title']
        source_url = entry.get('original_url')
        if not source_url:
            source_url = entry['url']
        self.original_url = source_url
        self.entry = entry
        self.list_id = pending_list_id
        self.approved = False

    def __repr__(self):
        return f'<PendingListEntry,title={self.title},original_url={self.original_url},approved={self.approved}>'

    def to_dict(self):
        """Return the row as a dict; the entry payload goes through ``json.coerce``."""
        coerced_entry = json.coerce(self.entry)
        return {
            'id': self.id,
            'list_id': self.list_id,
            'added_on': self.added,
            'title': self.title,
            'original_url': self.original_url,
            'entry': coerced_entry,
            'approved': self.approved,
        }
@with_session
def get_pending_lists(name=None, session=None):
    """Return all pending lists, optionally only those whose name contains ``name``."""
    logger.debug('retrieving pending lists')
    query = session.query(PendingListList)
    if name:
        logger.debug('searching for pending lists with name {}', name)
        query = query.filter(PendingListList.name.contains(name))
    return query.all()


@with_session
def get_list_by_exact_name(name, session=None):
    """Return the pending list whose name equals ``name``, ignoring case.

    Uses ``Query.one()``, so it raises if there is not exactly one match.
    """
    logger.debug('returning pending list with name {}', name)
    return (
        session
        .query(PendingListList)
        .filter(func.lower(PendingListList.name) == name.lower())
        .one()
    )


@with_session
def get_list_by_id(list_id, session=None):
    """Return the pending list with primary key ``list_id``.

    Uses ``Query.one()``, so it raises if there is not exactly one match.
    """
    logger.debug('returning pending list with id {}', list_id)
    return session.query(PendingListList).filter(PendingListList.id == list_id).one()


@with_session
def delete_list_by_id(list_id, session=None):
    """Delete the pending list with primary key ``list_id``.

    The lookup goes through :func:`get_list_by_id`, so a missing list raises
    from there rather than being silently ignored.
    """
    entry_list = get_list_by_id(list_id=list_id, session=session)
    if entry_list:
        logger.debug('deleting pending list with id {}', list_id)
        session.delete(entry_list)


@with_session
def get_entries_by_list_id(
    list_id,
    start=None,
    stop=None,
    order_by='title',
    descending=False,
    approved=False,
    filter=None,
    entry_ids=None,
    session=None,
):
    """Return the entries of a pending list, optionally filtered, ordered and sliced.

    :param list_id: Primary key of the list whose entries are returned.
    :param start: Slice start passed to ``Query.slice``.
    :param stop: Slice stop passed to ``Query.slice``.
    :param order_by: Name of the ``PendingListEntry`` attribute to sort on.
    :param descending: Sort in descending order when true.
    :param approved: When true, return only approved entries. A false value
        applies no approval filter at all.
    :param filter: Case-insensitive substring the entry title must contain.
    :param entry_ids: If given, restrict the result to these entry ids.
    :param session: Database session.
    :return: List of matching ``PendingListEntry`` objects.
    """
    logger.debug('querying entries from pending list with id {}', list_id)
    query = session.query(PendingListEntry).filter(PendingListEntry.list_id == list_id)
    if filter:
        query = query.filter(func.lower(PendingListEntry.title).contains(filter.lower()))
    if approved:
        # Must be a SQL expression. A Python ``is`` comparison against the
        # column is evaluated immediately to ``False``, which made the query
        # match no rows at all.
        query = query.filter(PendingListEntry.approved.is_(True))
    if entry_ids:
        query = query.filter(PendingListEntry.id.in_(entry_ids))
    order_column = getattr(PendingListEntry, order_by)
    query = query.order_by(order_column.desc() if descending else order_column)
    return query.slice(start, stop).all()


@with_session
def get_entry_by_title(list_id, title, session=None):
    """Return the first entry titled ``title`` in list ``list_id``, or ``None``.

    The list itself is looked up first via :func:`get_list_by_id`, which
    raises if the list does not exist.
    """
    entry_list = get_list_by_id(list_id=list_id, session=session)
    if entry_list:
        logger.debug('fetching entry with title `{}` from list id {}', title, list_id)
        return (
            session
            .query(PendingListEntry)
            .filter(and_(PendingListEntry.title == title, PendingListEntry.list_id == list_id))
            .first()
        )
    return None


@with_session
def get_entry_by_id(list_id, entry_id, session=None):
    """Return the entry with id ``entry_id`` that belongs to list ``list_id``.

    Uses ``Query.one()``, so it raises if there is not exactly one match.
    """
    logger.debug('fetching entry with id {} from list id {}', entry_id, list_id)
    return (
        session
        .query(PendingListEntry)
        .filter(and_(PendingListEntry.id == entry_id, PendingListEntry.list_id == list_id))
        .one()
    )