Source code for flexget.components.backlog.db

import pickle
from datetime import datetime

from loguru import logger
from sqlalchemy import Column, DateTime, Index, Integer, String, Unicode, column, select

from flexget import db_schema
from flexget.entry import Entry
from flexget.utils import json, serialization
from flexget.utils.database import entry_synonym, with_session
from flexget.utils.sqlalchemy_utils import table_add_column, table_schema

logger = logger.bind(name='backlog.db')
Base = db_schema.versioned_base('backlog', 3)


[docs] class BacklogEntry(Base): __tablename__ = 'backlog' id = Column(Integer, primary_key=True) task = Column('feed', String) title = Column(String) expire = Column(DateTime) _json = Column('json', Unicode) entry = entry_synonym('_json') def __repr__(self): return f'<BacklogEntry(title={self.title})>'
Index('ix_backlog_feed_expire', BacklogEntry.task, BacklogEntry.expire) @db_schema.upgrade('backlog') def upgrade(ver, session): if ver is None: # Make sure there is no data we can't load in the backlog table backlog_table = table_schema('backlog', session) try: for item in session.query(column('entry')).select_from(backlog_table).all(): pickle.loads(item.entry) except (ImportError, TypeError): # If there were problems, we can drop the data. logger.info('Backlog table contains unloadable data, clearing old data.') session.execute(backlog_table.delete()) ver = 0 if ver == 0: backlog_table = table_schema('backlog', session) logger.info('Creating index on backlog table.') Index('ix_backlog_feed_expire', backlog_table.c.feed, backlog_table.c.expire).create( bind=session.bind ) ver = 1 if ver == 1: table = table_schema('backlog', session) table_add_column(table, 'json', Unicode, session) # Make sure we get the new schema with the added column table = table_schema('backlog', session) for row in session.execute(select(table.c.id, table.c.entry)): try: p = pickle.loads(row['entry']) session.execute( table .update() .where(table.c.id == row['id']) .values(json=json.dumps(p, encode_datetime=True)) ) except KeyError as e: logger.error('Unable error upgrading backlog pickle object due to {}', str(e)) ver = 2 if ver == 2: table = table_schema('backlog', session) for row in session.execute(select(table.c.id, table.c.json)): if not row['json']: # Seems there could be invalid data somehow. See #2590 continue data = json.loads(row['json'], decode_datetime=True) # If title looked like a date, make sure it's a string title = str(data.pop('title')) e = Entry(title=title, **data) session.execute( table.update().where(table.c.id == row['id']).values(json=serialization.dumps(e)) ) ver = 3 return ver @with_session def get_entries(task=None, session=None): query = session.query(BacklogEntry) if task: query = query.filter(BacklogEntry.task == task) return query.all() @with_session def clear_entries(task=None, all=False, session=None): """Clear entries from backlog. :param task: If given, only entries from specified task will be cleared. :param all: If True, all entries will be cleared, otherwise only expired entries will be cleared. :returns: Number of entries cleared from backlog. """ query = session.query(BacklogEntry) if task: query = query.filter(BacklogEntry.task == task) if not all: query = query.filter(BacklogEntry.expire < datetime.now()) return query.delete()