Source code for flexget.components.managed_lists.lists.entry_list.db

import pickle
from collections.abc import MutableSet
from datetime import datetime

from loguru import logger
from sqlalchemy import Column, DateTime, ForeignKey, Integer, Unicode, func, or_, select
from sqlalchemy.orm import relationship
from sqlalchemy.sql.elements import and_

from flexget import db_schema
from flexget.db_schema import versioned_base
from flexget.entry import Entry
from flexget.manager import Session
from flexget.utils import json, serialization
from flexget.utils.database import entry_synonym, with_session
from flexget.utils.sqlalchemy_utils import table_add_column, table_schema

logger = logger.bind(name='entry_list.db')
Base = versioned_base('entry_list', 2)


@db_schema.upgrade('entry_list')
def upgrade(ver, session):
    if None is ver:
        ver = 0
    if ver == 0:
        table = table_schema('entry_list_entries', session)
        table_add_column(table, 'json', Unicode, session)
        # Make sure we get the new schema with the added column
        table = table_schema('entry_list_entries', session)
        for row in session.execute(select(table.c.id, table.c.entry)):
            try:
                p = pickle.loads(row['entry'])
                session.execute(
                    table
                    .update()
                    .where(table.c.id == row['id'])
                    .values(json=json.dumps(p, encode_datetime=True))
                )
            except KeyError as e:
                logger.error('Unable error upgrading entry_list pickle object due to {}', str(e))

        ver = 1
    if ver == 1:
        table = table_schema('entry_list_entries', session)
        for row in session.execute(select(table.c.id, table.c.json)):
            if not row['json']:
                # Seems there could be invalid data somehow. See #2590
                continue
            data = json.loads(row['json'], decode_datetime=True)
            # If title looked like a date, make sure it's a string
            title = str(data.pop('title'))
            e = Entry(title=title, **data)
            session.execute(
                table.update().where(table.c.id == row['id']).values(json=serialization.dumps(e))
            )
        ver = 2
    return ver


[docs] class EntryListList(Base): __tablename__ = 'entry_list_lists' id = Column(Integer, primary_key=True) name = Column(Unicode, unique=True) added = Column(DateTime, default=datetime.now) entries = relationship( 'EntryListEntry', backref='list', cascade='all, delete, delete-orphan', lazy='dynamic' )
[docs] def to_dict(self): return {'id': self.id, 'name': self.name, 'added_on': self.added}
[docs] class EntryListEntry(Base): __tablename__ = 'entry_list_entries' id = Column(Integer, primary_key=True) list_id = Column(Integer, ForeignKey(EntryListList.id), nullable=False) added = Column(DateTime, default=datetime.now) title = Column(Unicode) original_url = Column(Unicode) _json = Column('json', Unicode) entry = entry_synonym('_json') def __init__(self, entry, entry_list_id): self.title = entry['title'] self.original_url = entry.get('original_url') or entry['url'] self.entry = entry self.list_id = entry_list_id def __repr__(self): return f'<EntryListEntry,title={self.title},original_url={self.original_url}>'
[docs] def to_dict(self): return { 'id': self.id, 'list_id': self.list_id, 'added_on': self.added, 'title': self.title, 'original_url': self.original_url, 'entry': json.coerce(self.entry), }
[docs] class DBEntrySet(MutableSet):
[docs] def _db_list(self, session): return session.query(EntryListList).filter(EntryListList.name == self.config).first()
def __init__(self, config): self.config = config with Session() as session: if not self._db_list(session): session.add(EntryListList(name=self.config))
[docs] def _entry_query(self, session, entry): return ( session .query(EntryListEntry) .filter( and_( EntryListEntry.list_id == self._db_list(session).id, or_( EntryListEntry.title == entry['title'], and_( EntryListEntry.original_url, EntryListEntry.original_url == entry['original_url'], ), ), ) ) .first() )
def __iter__(self): with Session() as session: for e in self._db_list(session).entries.order_by(EntryListEntry.added.desc()).all(): logger.debug('returning {}', e.entry) yield e.entry def __contains__(self, entry): with Session() as session: return self._entry_query(session, entry) is not None def __len__(self): with Session() as session: return self._db_list(session).entries.count()
[docs] def discard(self, entry): with Session() as session: db_entry = self._entry_query(session=session, entry=entry) if db_entry: logger.debug('deleting entry {}', db_entry) session.delete(db_entry)
[docs] def add(self, entry): with Session() as session: stored_entry = self._entry_query(session, entry) if stored_entry: # Refresh all the fields if we already have this entry logger.debug('refreshing entry {}', entry) stored_entry.entry = entry else: logger.debug('adding entry {} to list {}', entry, self._db_list(session).name) stored_entry = EntryListEntry(entry=entry, entry_list_id=self._db_list(session).id) session.add(stored_entry)
@property def immutable(self): return False
[docs] def _from_iterable(self, it): # TODO: is this the right answer? the returned object won't have our custom __contains__ logic return set(it)
@property def online(self): """Set the online status of the plugin. Online plugin should be treated differently in certain situations, like test mode """ return False
[docs] def get(self, entry): with Session() as session: match = self._entry_query(session=session, entry=entry) return Entry(match.entry) if match else None
@with_session def get_entry_lists(name=None, session=None): logger.debug('retrieving entry lists') query = session.query(EntryListList) if name: logger.debug('searching for entry lists with name {}', name) query = query.filter(EntryListList.name.contains(name)) return query.all() @with_session def get_list_by_exact_name(name, session=None): logger.debug('returning entry list with name {}', name) return ( session.query(EntryListList).filter(func.lower(EntryListList.name) == name.lower()).one() ) @with_session def get_list_by_id(list_id, session=None): logger.debug('fetching entry list with id {}', list_id) return session.query(EntryListList).filter(EntryListList.id == list_id).one() @with_session def delete_list_by_id(list_id, session=None): entry_list = get_list_by_id(list_id=list_id, session=session) if entry_list: logger.debug('deleting entry list with id {}', list_id) session.delete(entry_list) @with_session def get_entries_by_list_id( list_id, start=None, stop=None, order_by='title', descending=False, entry_ids=None, session=None, ): logger.debug('querying entries from entry list with id {}', list_id) query = session.query(EntryListEntry).filter(EntryListEntry.list_id == list_id) if entry_ids: query = query.filter(EntryListEntry.id.in_(entry_ids)) if descending: query = query.order_by(getattr(EntryListEntry, order_by).desc()) else: query = query.order_by(getattr(EntryListEntry, order_by)) return query.slice(start, stop).all() @with_session def get_entry_by_title(list_id, title, session=None): entry_list = get_list_by_id(list_id=list_id, session=session) if entry_list: return ( session .query(EntryListEntry) .filter(and_(EntryListEntry.title == title, EntryListEntry.list_id == list_id)) .first() ) return None @with_session def get_entry_by_id(list_id, entry_id, session=None): logger.debug('fetching entry with id {} from list id {}', entry_id, list_id) return ( session .query(EntryListEntry) .filter(and_(EntryListEntry.id == entry_id, EntryListEntry.list_id == list_id)) .one() )