Source code for flexget.components.archive.db

import re
from datetime import datetime, timedelta

from loguru import logger
from sqlalchemy import Column, DateTime, ForeignKey, Index, Integer, Table, Unicode
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import relationship

from flexget import db_schema
from flexget.event import event
from flexget.utils.sqlalchemy_utils import get_index_by_name, table_schema

# Module logger, bound with the name 'archive.db'.
logger = logger.bind(name='archive.db')

# Schema version passed to `versioned_base` below; `upgrade()` in this module
# sets the version to 0 when it starts from `None`.
SCHEMA_VER = 0

# Declarative base for every table in this module.
# NOTE(review): presumably this also ties the 'archive' tables to FlexGet's
# schema-version tracking -- confirm against `db_schema.versioned_base`.
Base = db_schema.versioned_base('archive', SCHEMA_VER)

# Association table pairing `archive_entry.id` with `archive_tag.id`
# (used as the `secondary` of `ArchiveEntry.tags`).
archive_tags_table = Table(
    'archive_entry_tags',
    Base.metadata,
    Column('entry_id', Integer, ForeignKey('archive_entry.id')),
    Column('tag_id', Integer, ForeignKey('archive_tag.id')),
    Index('ix_archive_tags', 'entry_id', 'tag_id'),
)

# Association table pairing `archive_entry.id` with `archive_source.id`
# (used as the `secondary` of `ArchiveEntry.sources`).
archive_sources_table = Table(
    'archive_entry_sources',
    Base.metadata,
    Column('entry_id', Integer, ForeignKey('archive_entry.id')),
    Column('source_id', Integer, ForeignKey('archive_source.id')),
    Index('ix_archive_sources', 'entry_id', 'source_id'),
)

# These are plain `Table` objects rather than declarative classes, so they are
# registered with the base explicitly.
# NOTE(review): exact effect of `register_table` is not visible here -- confirm.
Base.register_table(archive_tags_table)
Base.register_table(archive_sources_table)


class ArchiveEntry(Base):
    """ORM model for rows of the ``archive_entry`` table.

    Linked many-to-many to :class:`ArchiveTag` (``tags``) and
    :class:`ArchiveSource` (``sources``) through the two association tables
    defined at module level. ``added`` is set to the current local time when
    an instance is constructed.
    """

    __tablename__ = 'archive_entry'
    __table_args__ = (Index('ix_archive_title_url', 'title', 'url'),)

    id = Column(Integer, primary_key=True)
    title = Column(Unicode, index=True)
    url = Column(Unicode, index=True)
    description = Column(Unicode)
    # The attribute is called `task`, but the database column keeps the name `feed`.
    task = Column('feed', Unicode)  # DEPRECATED, but SQLite does not support drop column
    added = Column(DateTime, index=True)

    tags = relationship('ArchiveTag', secondary=archive_tags_table)
    sources = relationship(
        'ArchiveSource', secondary=archive_sources_table, backref='archive_entries'
    )

    def __init__(self):
        # Stamp the entry with its creation time; all other fields are set by the caller.
        self.added = datetime.now()

    def __str__(self):
        # `added` is rendered to minute precision.
        added = self.added.strftime('%Y-%m-%d %H:%M')
        return f'<ArchiveEntry(title={self.title},url={self.url},task={self.task},added={added})>'
class ArchiveTag(Base):
    """ORM model for rows of the ``archive_tag`` table: a named tag."""

    __tablename__ = 'archive_tag'

    id = Column(Integer, primary_key=True)
    name = Column(Unicode, index=True)

    def __init__(self, name):
        """Create a tag called *name*."""
        self.name = name

    def __str__(self):
        tag_id, tag_name = self.id, self.name
        return f'<ArchiveTag(id={tag_id},name={tag_name})>'
class ArchiveSource(Base):
    """ORM model for rows of the ``archive_source`` table: a named source."""

    __tablename__ = 'archive_source'

    id = Column(Integer, primary_key=True)
    name = Column(Unicode, index=True)

    def __init__(self, name):
        """Create a source called *name*."""
        self.name = name

    def __str__(self):
        source_id, source_name = self.id, self.name
        return f'<ArchiveSource(id={source_id},name={source_name})>'
@db_schema.upgrade('archive')
def upgrade(ver, session):
    """Migrate the archive tables to the current schema version.

    When *ver* is ``None`` the legacy ``archive_feed_title`` index is dropped
    (if present), the ``ix_archive_title_url`` index is created, and the
    version becomes ``0``. Any other *ver* is returned unchanged.

    :param ver: Current schema version, or ``None``
    :param session: SQLAlchemy session
    :return: Schema version after the upgrade
    """
    if ver is None:
        # One connection for both DDL statements, so they run in the session's transaction.
        connection = session.connection()

        # get rid of old index
        aet = table_schema('archive_entry', session)
        old_index = get_index_by_name(aet, 'archive_feed_title')
        if old_index is not None:
            logger.info('Dropping legacy index (may take a while) ...')
            # Pass the bind explicitly, as the create() call below does: the
            # reflected table is not guaranteed to sit on bound metadata, and
            # newer SQLAlchemy versions require the argument.
            old_index.drop(bind=connection)

        # create new index by title, url
        new_index = get_index_by_name(
            Base.metadata.tables['archive_entry'], 'ix_archive_title_url'
        )
        if new_index:
            logger.info('Creating new index (may take a while) ...')
            new_index.create(bind=connection)
        else:
            # maybe removed from the model by later migrations?
            logger.error('Unable to create index `ix_archive_title_url`, removed from the model?')
            # TODO: nag about this ?

        # This is safe as long as we don't delete the model completely :)
        # But generally never use Declarative Models in migrate!
        if session.query(ArchiveEntry).first():
            logger.critical('----------------------------------------------')
            logger.critical('You should run `--archive consolidate` ')
            logger.critical('one time when you have time, it may take hours')
            logger.critical('----------------------------------------------')
        ver = 0
    return ver
def get_source(name, session):
    """Fetch the ArchiveSource called *name*, or build a fresh one.

    A newly built instance is only returned; this function does not add it
    to the session.

    :param string name: Source name
    :param session: SQLAlchemy session
    """
    lookup = session.query(ArchiveSource).filter(ArchiveSource.name == name)
    try:
        existing = lookup.one()
    except NoResultFound:
        # Nothing stored under this name yet.
        return ArchiveSource(name)
    return existing
def get_tag(name, session):
    """Fetch the ArchiveTag called *name*, or build a fresh one.

    A newly built instance is only returned; this function does not add it
    to the session.

    :param string name: Tag name
    :param session: SQLAlchemy session
    """
    lookup = session.query(ArchiveTag).filter(ArchiveTag.name == name)
    try:
        existing = lookup.one()
    except NoResultFound:
        # Nothing stored under this name yet.
        return ArchiveTag(name)
    return existing
@event('manager.db_cleanup')
def db_cleanup(manager, session):
    """Delete ArchiveEntry rows whose ``added`` timestamp is more than 730 days old.

    :param manager: Passed by the ``manager.db_cleanup`` event; unused here
    :param session: SQLAlchemy session
    """
    expired = session.query(ArchiveEntry).filter(
        ArchiveEntry.added < datetime.now() - timedelta(days=730)
    )
    # NOTE(review): this is a query-level delete; the archive_entry_tags and
    # archive_entry_sources association tables are not touched here -- confirm
    # whether they need separate cleanup.
    removed = expired.delete()
    if not removed:
        return
    logger.verbose('Removed {} archive entries older than 2 years', removed)