# Source code for flexget.components.pending_approval.db

from datetime import datetime, timedelta

from loguru import logger
from sqlalchemy import Boolean, Column, DateTime, Integer, String, Unicode, select

from flexget import db_schema
from flexget.entry import Entry
from flexget.event import event
from flexget.utils import json, serialization
from flexget.utils.database import entry_synonym
from flexget.utils.sqlalchemy_utils import table_schema

# Module-level logger, bound with name='pending_approval'.
logger = logger.bind(name='pending_approval')
# Base class for this module's ORM models, obtained from
# db_schema.versioned_base with the 'pending_approval' key and version 1.
# NOTE(review): presumably 1 is the current schema version that upgrade()
# migrates towards -- confirm against db_schema.
Base = db_schema.versioned_base('pending_approval', 1)


@db_schema.upgrade('pending_approval')
def upgrade(ver, session):
    """Upgrade the ``pending_entries`` table from schema version 0 to 1.

    For version 0, every row's ``json`` column is decoded with
    ``json.loads(..., decode_datetime=True)``, rebuilt as an :class:`Entry`
    and written back using ``serialization.dumps``. Rows whose ``json``
    column is empty are left untouched.

    :param ver: Schema version passed in by the upgrade machinery.
    :param session: Database session used to read and rewrite the rows.
    :return: ``1`` after a version-0 upgrade, otherwise ``ver`` unchanged.
    """
    if ver == 0:
        table = table_schema('pending_entries', session)
        # Rows are unpacked positionally instead of being indexed with string
        # keys: SQLAlchemy 2.0 rows are named tuples and reject ``row['json']``,
        # while tuple unpacking works on every SQLAlchemy version.
        for row_id, raw_json in session.execute(select(table.c.id, table.c.json)):
            if not raw_json:
                # Seems there could be invalid data somehow. See #2590
                continue
            data = json.loads(raw_json, decode_datetime=True)
            # If title looked like a date, make sure it's a string
            title = str(data.pop('title'))
            e = Entry(title=title, **data)
            session.execute(
                table.update().where(table.c.id == row_id).values(json=serialization.dumps(e))
            )

        ver = 1
    return ver


class PendingEntry(Base):
    """ORM model for a row of the ``pending_entries`` table.

    Each row records the task name, title and url of an entry together with
    an ``approved`` flag (``False`` on creation) and the time it was added.
    The ``entry`` attribute is an ``entry_synonym`` over ``_json``, which maps
    to the ``json`` column.
    """

    __tablename__ = 'pending_entries'

    id = Column(Integer, primary_key=True, autoincrement=True, nullable=False)
    task_name = Column(Unicode)
    title = Column(Unicode)
    url = Column(String)
    approved = Column(Boolean)
    _json = Column('json', Unicode)
    entry = entry_synonym('_json')
    # Defaults to the current local time when the row is inserted.
    added = Column(DateTime, default=datetime.now)

    def __init__(self, task_name, entry):
        """Create a not-yet-approved record for ``entry`` under ``task_name``.

        :param task_name: Name of the task the entry belongs to.
        :param entry: Mapping-like entry; ``entry['title']`` and
            ``entry['url']`` are copied into their own columns.
        """
        self.task_name = task_name
        self.title, self.url = entry['title'], entry['url']
        self.approved = False
        self.entry = entry

    def __repr__(self):
        return (
            f'<PendingEntry(task_name={self.task_name},title={self.title},'
            f'url={self.url},approved={self.approved})>'
        )

    def to_dict(self):
        """Return the record's scalar columns as a plain ``dict``.

        The stored entry itself (``entry`` / ``json``) is not included.
        """
        exported = ('id', 'task_name', 'title', 'url', 'approved', 'added')
        return {name: getattr(self, name) for name in exported}


@event('manager.db_cleanup')
def db_cleanup(manager, session):
    """Delete pending entries that were added more than 365 days ago.

    Registered for the ``manager.db_cleanup`` event. The query filters on the
    ``added`` timestamp only; the ``approved`` flag is not consulted, so both
    approved and unapproved rows are removed once they are old enough.

    :param manager: Passed in by the event; not used here.
    :param session: Database session the delete is issued on.
    """
    cutoff = datetime.now() - timedelta(days=365)
    expired = session.query(PendingEntry).filter(PendingEntry.added < cutoff)
    purged = expired.delete()
    if not purged:
        return
    logger.info('Purged {} pending entries older than 1 year', purged)


def list_pending_entries(
    session, task_name=None, approved=None, start=None, stop=None, sort_by='added', descending=True
):
    """Return ``PendingEntry`` rows, optionally filtered, sorted and sliced.

    :param session: Database session to query.
    :param task_name: When truthy, only rows with this task name are returned.
    :param approved: When not ``None``, only rows whose ``approved`` flag
        equals this value are returned.
    :param start: Start offset handed to ``Query.slice``.
    :param stop: Stop offset handed to ``Query.slice``.
    :param sort_by: Name of the ``PendingEntry`` attribute to order by.
    :param descending: Sort in descending order when true, ascending otherwise.
    :return: List of matching ``PendingEntry`` objects.
    """
    logger.debug('querying pending entries')

    # Collect the optional filters first; an empty list adds no WHERE clause.
    criteria = []
    if task_name:
        criteria.append(PendingEntry.task_name == task_name)
    if approved is not None:
        criteria.append(PendingEntry.approved == approved)

    ordering = getattr(PendingEntry, sort_by)
    if descending:
        ordering = ordering.desc()

    selection = session.query(PendingEntry).filter(*criteria).order_by(ordering)
    return selection.slice(start, stop).all()


def get_entry_by_id(session, entry_id):
    """Return the ``PendingEntry`` whose ``id`` equals ``entry_id``.

    The lookup ends in ``Query.one()``, so exactly one matching row is
    required; SQLAlchemy raises if there is none.

    :param session: Database session to query.
    :param entry_id: Primary key of the pending entry.
    """
    matching = session.query(PendingEntry).filter(PendingEntry.id == entry_id)
    return matching.one()