import datetime
from datetime import timedelta
from loguru import logger
from sqlalchemy import Boolean, Column, DateTime, Integer, String, func, select
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship
from sqlalchemy.schema import ForeignKey
from flexget import db_schema
from flexget.event import event
from flexget.utils.database import with_session
from flexget.utils.sqlalchemy_utils import create_index, drop_index, index_exists
# Rebind the imported loguru logger with name='status.db' for all log calls in this module.
logger = logger.bind(name='status.db')
# Versioned declarative base for the 'status' schema, declared at version 4
# (the highest version the `upgrade` function below migrates to).
Base = db_schema.versioned_base('status', 4)
@db_schema.upgrade('status')
def upgrade(ver, session):
    """Migrate the 'status' schema step by step and return the resulting version.

    :param ver: Schema version currently stored for the 'status' plugin.
    :param session: Database session handed to the index helpers.
    :return: The schema version after all applicable steps have run.
    """
    executions_table = 'status_execution'

    if ver < 3:
        # Replace the legacy composite index with a plain index on task_id.
        legacy_index = 'ix_status_execution_task_id_start_end_succeeded'
        if index_exists(executions_table, legacy_index, session):
            drop_index(executions_table, legacy_index, session)
        create_index(executions_table, session, 'task_id')
        ver = 3

    if ver < 4:
        # Composite indexes added for query performance, created in this order:
        #  1. task_id/succeeded/produced/start - lookups of successful executions
        #     with produced > 0, ordered by start
        #  2. task_id/start - latest execution (max(start)) per task
        composite_indexes = (
            ('task_id', 'succeeded', 'produced', 'start'),
            ('task_id', 'start'),
        )
        for index_columns in composite_indexes:
            create_index(executions_table, session, *index_columns)
        ver = 4

    return ver
[docs]
class StatusTask(Base):
    """A task tracked by the status plugin, together with its recorded executions."""

    __tablename__ = 'status_task'

    id = Column(Integer, primary_key=True)
    # Stored in the ``task`` column but exposed on the model as ``name``.
    name = Column('task', String)
    # ``lazy='dynamic'`` makes this attribute a query object rather than a loaded list;
    # the cascade removes a task's executions together with the task.
    executions = relationship(
        'TaskExecution', backref='task', cascade='all, delete, delete-orphan', lazy='dynamic'
    )

    def __repr__(self):
        return f'<StatusTask(id={self.id},name={self.name})>'

    @hybrid_property
    def last_execution_time(self):
        """Return the most recent execution ``start`` of this task, or ``None``.

        The executions are iterated exactly once (no separate ``count()`` query).
        Executions without a ``start`` value are skipped, which mirrors how the
        SQL ``max()`` used by the class-level expression ignores NULLs and avoids
        comparing ``None`` with ``datetime`` objects.

        :return: The latest ``start`` datetime, or ``None`` if there is none.
        """
        return max(
            (execution.start for execution in self.executions if execution.start is not None),
            default=None,
        )

    @last_execution_time.expression
    @classmethod
    def last_execution_time(cls):
        """SQL form of ``last_execution_time``: a correlated ``max(start)`` subquery."""
        return (
            select(func.max(TaskExecution.start))
            .where(TaskExecution.task_id == cls.id)
            .correlate(StatusTask.__table__)
            .label('last_execution_time')
        )

    def to_dict(self):
        """Return a plain ``dict`` describing this task.

        :return: Mapping with the keys ``id``, ``name`` and ``last_execution_time``
            (converted with ``astimezone()``, or ``None`` when there is no value).
        """
        # Evaluate the hybrid property once; every access queries the executions.
        last_execution_time = self.last_execution_time
        return {
            'id': self.id,
            'name': self.name,
            'last_execution_time': last_execution_time and last_execution_time.astimezone(),
        }
[docs]
class TaskExecution(Base):
    """One recorded run of a task: timing, outcome and entry counters."""

    __tablename__ = 'status_execution'

    id = Column(Integer, primary_key=True)
    task_id = Column(Integer, ForeignKey('status_task.id'), index=True)
    start = Column(DateTime)
    end = Column(DateTime)
    succeeded = Column(Boolean, default=True)
    # Per-run entry counters
    produced = Column(Integer)
    accepted = Column(Integer)
    rejected = Column(Integer)
    failed = Column(Integer)
    abort_reason = Column(String, nullable=True)

    def __repr__(self):
        return (
            f'<TaskExecution(task_id={self.task_id},start={self.start},end={self.end},'
            f'succeeded={self.succeeded},p={self.produced},a={self.accepted},'
            f'r={self.rejected},f={self.failed},reason={self.abort_reason})>'
        )

    def to_dict(self):
        """Return a plain ``dict`` with every column of this execution.

        ``start`` and ``end`` are converted with ``astimezone()`` when set and
        passed through unchanged otherwise.
        """
        started = self.start
        finished = self.end
        payload = {
            'id': self.id,
            'task_id': self.task_id,
            'start': started.astimezone() if started else started,
            'end': finished.astimezone() if finished else finished,
        }
        # The remaining columns are copied verbatim, in declaration order.
        for column_name in (
            'succeeded',
            'produced',
            'accepted',
            'rejected',
            'failed',
            'abort_reason',
        ):
            payload[column_name] = getattr(self, column_name)
        return payload
[docs]
@event('manager.db_cleanup')
def db_cleanup(manager, session):
    """Remove obsolete status rows when the ``manager.db_cleanup`` event fires.

    :param manager: Object whose ``config['tasks']`` holds the configured tasks.
    :param session: Database session used for the queries and deletions.
    """
    # Step 1: drop status data of tasks that are no longer configured.
    for known_task in session.query(StatusTask).all():
        if known_task.name in manager.config['tasks']:
            continue
        logger.verbose('Purging obsolete status data for task {}', known_task.name)
        session.delete(known_task)

    # Step 2: drop executions that started more than 365 days ago.
    cutoff = datetime.datetime.now() - timedelta(days=365)
    stale_executions = session.query(TaskExecution).filter(TaskExecution.start < cutoff)
    removed_count = stale_executions.delete()
    if removed_count:
        logger.verbose(
            'Removed {} task executions from history older than 1 year', removed_count
        )
@with_session
def get_status_tasks(
    start=None, stop=None, order_by='last_execution_time', descending=True, session=None
):
    """Return ``StatusTask`` rows, ordered and sliced.

    :param start: Slice start, passed to ``Query.slice``.
    :param stop: Slice stop, passed to ``Query.slice``.
    :param order_by: Name of the ``StatusTask`` attribute to order by.
    :param descending: Order descending when truthy, ascending otherwise.
    :param session: Database session used for the query.
    :return: List of matching ``StatusTask`` objects.
    """
    logger.debug(
        'querying status tasks: start={}, stop={}, order_by={}, descending={}',
        start,
        stop,
        order_by,
        descending,
    )
    tasks = session.query(StatusTask)
    ordering = getattr(StatusTask, order_by)
    if descending:
        ordering = ordering.desc()
    return tasks.order_by(ordering).slice(start, stop).all()
@with_session
def get_executions_by_task_id(
    task_id,
    start=None,
    stop=None,
    order_by='start',
    descending=True,
    succeeded=None,
    produced=True,
    start_date=None,
    end_date=None,
    session=None,
):
    """Return the executions of one task, filtered, ordered and sliced.

    Each optional filter is applied only when its argument is truthy.

    :param task_id: Value matched against ``TaskExecution.task_id``.
    :param start: Slice start, passed to ``Query.slice``.
    :param stop: Slice stop, passed to ``Query.slice``.
    :param order_by: Name of the ``TaskExecution`` attribute to order by.
    :param descending: Order descending when truthy, ascending otherwise.
    :param succeeded: When truthy, keep only executions whose ``succeeded``
        equals this value; a falsy value (``None`` or ``False``) applies no filter.
    :param produced: When truthy, keep only executions with ``produced > 0``.
    :param start_date: When given, keep executions with ``start >= start_date``.
    :param end_date: When given, keep executions with ``start <= end_date``.
    :param session: Database session used for the query.
    :return: List of matching ``TaskExecution`` objects.
    """
    logger.debug(
        'querying task executions: task_id={}, start={}, stop={}, order_by={}, '
        'descending={}, succeeded={}, produced={}, start_date={}, end_date={}',
        task_id,
        start,
        stop,
        order_by,
        descending,
        succeeded,
        produced,
        start_date,
        end_date,
    )

    # Gather all conditions first; they are AND-ed together by a single filter() call.
    criteria = [TaskExecution.task_id == task_id]
    if succeeded:
        criteria.append(TaskExecution.succeeded == succeeded)
    if produced:
        criteria.append(TaskExecution.produced > 0)
    if start_date:
        criteria.append(TaskExecution.start >= start_date)
    if end_date:
        criteria.append(TaskExecution.start <= end_date)

    ordering = getattr(TaskExecution, order_by)
    if descending:
        ordering = ordering.desc()

    executions = session.query(TaskExecution).filter(*criteria).order_by(ordering)
    return executions.slice(start, stop).all()