Source code for flexget.task_queue

from __future__ import annotations

import queue
import threading
import time
from typing import TYPE_CHECKING

from loguru import logger
from sqlalchemy.exc import OperationalError, ProgrammingError

from flexget.task import TaskAbort

if TYPE_CHECKING:
    from flexget.task import Task

logger = logger.bind(name='task_queue')


[docs] class TaskQueue: """Task processing thread. Only executes one task at a time, if more are requested they are queued up and run in turn. """ def __init__(self) -> None: self.run_queue: queue.PriorityQueue[Task] = queue.PriorityQueue() self._shutdown_now = False self._shutdown_when_finished = False self.current_task: Task | None = None self._thread = None
[docs] def start(self) -> None: # We don't override `threading.Thread` because debugging this seems unsafe with pydevd. # Overriding __len__(self) seems to cause a debugger deadlock. # Don't instantiate the Thread() until `start()`, to make sure we have daemonized (forked) first. if not self._thread: self._thread = threading.Thread(target=self.run, name='task_queue', daemon=True) self._thread.start()
[docs] def run(self) -> None: while not self._shutdown_now: # Grab the first job from the run queue and do it try: self.current_task = self.run_queue.get(timeout=0.5) except queue.Empty: if self._shutdown_when_finished: self._shutdown_now = True continue try: self.current_task.execute() except TaskAbort as e: logger.debug('task {} aborted: {!r}', self.current_task.name, e) except (ProgrammingError, OperationalError): logger.critical('Database error while running a task. Attempting to recover.') self.current_task.manager.crash_report() except Exception: logger.critical('BUG: Unhandled exception during task queue run loop.') self.current_task.manager.crash_report() finally: self.run_queue.task_done() self.current_task = None remaining_jobs = self.run_queue.qsize() if remaining_jobs: logger.warning( 'task queue shut down with {} tasks remaining in the queue to run.', remaining_jobs ) else: logger.debug('task queue shut down')
[docs] def is_alive(self) -> bool: return self._thread and self._thread.is_alive()
[docs] def put(self, task: Task): """Add a task to be executed to the queue.""" self.run_queue.put(task)
def __len__(self) -> int: return self.run_queue.qsize()
[docs] def shutdown(self, finish_queue: bool = True) -> None: """Request shutdown. :param bool finish_queue: Should all tasks be finished before ending thread. """ logger.debug('task queue shutdown requested') if finish_queue: self._shutdown_when_finished = True if self.run_queue.qsize(): logger.verbose( 'There are {} tasks to execute. Shutdown will commence when they have completed.', self.run_queue.qsize(), ) else: self._shutdown_now = True
[docs] def wait(self) -> None: """Wait for the thread to exit. Allow abortion of task queue with ctrl-c """ try: while self._thread.is_alive(): time.sleep(0.5) except KeyboardInterrupt: logger.error('Got ctrl-c, shutting down after running task (if any) completes') self.shutdown(finish_queue=False) # We still wait to finish cleanly, pressing ctrl-c again will abort while self._thread.is_alive(): time.sleep(0.5)