Source code for flexget.api.core.server

from __future__ import annotations

import base64
import binascii
import copy
import json
import os
import sys
import threading
import traceback
from pathlib import Path
from time import sleep
from typing import IO, TYPE_CHECKING

import cherrypy
import yaml
from flask import Response, jsonify, request
from loguru import logger
from pyparsing import (
    Combine,
    Forward,
    Group,
    Keyword,
    OneOrMore,
    Optional,
    ParseException,
    Suppress,
    White,
    Word,
    alphanums,
    alphas,
    nums,
    one_of,
    printables,
    rest_of_line,
)
from yaml.error import MarkedYAMLError, YAMLError

from flexget._version import __version__
from flexget.api import APIResource, api
from flexget.api.app import (
    APIError,
    BadRequest,
    base_message,
    base_message_schema,
    empty_response,
    etag,
    success_response,
)
from flexget.api.app import __version__ as __api_version__
from flexget.config_schema import ConfigError
from flexget.utils.tools import get_latest_flexget_version_number

if TYPE_CHECKING:
    from collections.abc import Generator

    from sqlalchemy.orm import Session

logger = logger.bind(name='api.server')

server_api = api.namespace('server', description='Manage Daemon')


[docs] class ObjectsContainer: yaml_error_response = copy.deepcopy(base_message) yaml_error_response['properties']['column'] = {'type': 'integer'} yaml_error_response['properties']['line'] = {'type': 'integer'} yaml_error_response['properties']['reason'] = {'type': 'string'} config_validation_error = copy.deepcopy(base_message) config_validation_error['properties']['error'] = {'type': 'string'} config_validation_error['properties']['config_path'] = {'type': 'string'} pid_object = {'type': 'object', 'properties': {'pid': {'type': 'integer'}}} raw_config_object = {'type': 'object', 'properties': {'raw_config': {'type': 'string'}}} version_object = { 'type': 'object', 'properties': { 'flexget_version': {'type': 'string'}, 'api_version': {'type': 'string'}, 'latest_version': {'type': ['string', 'null']}, }, } dump_threads_object = { 'type': 'object', 'properties': { 'threads': { 'type': 'array', 'items': { 'type': 'object', 'properties': { 'name': {'type': 'string'}, 'id': {'type': 'string'}, 'dump': {'type': 'array', 'items': {'type': 'string'}}, }, }, } }, } server_manage = { 'type': 'object', 'properties': { 'operation': {'type': 'string', 'enum': ['reload', 'shutdown']}, 'force': {'type': 'boolean'}, }, 'required': ['operation'], 'additionalProperties': False, } crash_logs = { 'type': 'array', 'items': { 'type': 'object', 'properties': { 'name': {'type': 'string'}, 'content': {'type': 'array', 'items': {'type': 'string'}}, }, }, }
yaml_error_schema = api.schema_model('yaml_error_schema', ObjectsContainer.yaml_error_response) config_validation_schema = api.schema_model( 'config_validation_schema', ObjectsContainer.config_validation_error ) pid_schema = api.schema_model('server.pid', ObjectsContainer.pid_object) raw_config_schema = api.schema_model('raw_config', ObjectsContainer.raw_config_object) version_schema = api.schema_model('server.version', ObjectsContainer.version_object) dump_threads_schema = api.schema_model('server.dump_threads', ObjectsContainer.dump_threads_object) server_manage_schema = api.schema_model('server.manage', ObjectsContainer.server_manage) crash_logs_schema = api.schema_model('server.crash_logs', ObjectsContainer.crash_logs)
[docs] @server_api.route('/manage/') class ServerReloadAPI(APIResource):
[docs] @api.validate(server_manage_schema) @api.response(501, model=yaml_error_schema, description='YAML syntax error') @api.response(502, model=config_validation_schema, description='Config validation error') @api.response(200, model=base_message_schema) def post(self, session: Session = None) -> Response: """Manage server operations.""" data = request.json if data['operation'] == 'reload': try: self.manager.load_config(output_to_console=False) except YAMLError as e: if isinstance(e, MarkedYAMLError): error: dict[str, int] = {} if e.problem is not None: error.update({'reason': e.problem}) if e.context_mark is not None: error.update({ 'line': e.context_mark.line, 'column': e.context_mark.column, }) if e.problem_mark is not None: error.update({ 'line': e.problem_mark.line, 'column': e.problem_mark.column, }) raise APIError(message='Invalid YAML syntax', payload=error) except ConfigError as e: errors = [{'error': er.message, 'config_path': er.json_pointer} for er in e.errors] raise APIError(f'Error loading config: {e.args[0]}', payload={'errors': errors}) response = 'Config successfully reloaded from disk' else: self.manager.shutdown(data.get('force')) response = 'Shutdown requested' return success_response(response)
[docs] @server_api.route('/pid/') class ServerPIDAPI(APIResource):
[docs] @api.response(200, description='Reloaded config', model=pid_schema) def get(self, session: Session = None) -> Response: """Get server PID.""" return jsonify({'pid': os.getpid()})
[docs] @server_api.route('/config/') class ServerConfigAPI(APIResource):
[docs] @etag @api.response(200, description='Flexget config', model=empty_response) def get(self, session: Session = None) -> Response: """Get Flexget Config in JSON form.""" return jsonify(self.manager.config)
[docs] @server_api.route('/raw_config/') class ServerRawConfigAPI(APIResource):
[docs] @etag @api.doc(description='Return config file encoded in Base64') @api.response( 200, model=raw_config_schema, description='Flexget raw YAML config file encoded in Base64' ) def get(self, session: Session = None) -> Response: """Get raw YAML config file.""" with self.manager.config_path.open(encoding='utf-8') as f: raw_config = base64.b64encode(f.read().encode('utf-8')) return jsonify(raw_config=raw_config.decode('utf-8'))
[docs] @api.validate(raw_config_schema) @api.response(200, model=base_message_schema, description='Successfully updated config') @api.response(BadRequest) @api.response(APIError) @api.doc( description='Config file must be base64 encoded. A backup will be created, and if successful config will' ' be loaded and saved to original file.' ) def post(self, session: Session = None) -> Response: """Update config.""" config = {} data = request.json try: raw_config = base64.b64decode(data['raw_config']) except (TypeError, binascii.Error): raise BadRequest(message='payload was not a valid base64 encoded string') try: config = yaml.safe_load(raw_config) except YAMLError as e: if isinstance(e, MarkedYAMLError): error: dict[str, int] = {} if e.problem is not None: error.update({'reason': e.problem}) if e.context_mark is not None: error.update({'line': e.context_mark.line, 'column': e.context_mark.column}) if e.problem_mark is not None: error.update({'line': e.problem_mark.line, 'column': e.problem_mark.column}) raise BadRequest(message='Invalid YAML syntax', payload=error) try: backup_path = self.manager.update_config(config) except ConfigError as e: errors = [{'error': er.message, 'config_path': er.json_pointer} for er in e.errors] raise BadRequest( message=f'Error loading config: {e.args[0]}', payload={'errors': errors} ) try: self.manager.backup_config() except Exception as e: raise APIError( message='Failed to create config backup, config updated but NOT written to file', payload={'reason': str(e)}, ) try: with self.manager.config_path.open('w', encoding='utf-8') as f: f.write(raw_config.decode('utf-8').replace('\r\n', '\n')) except Exception as e: raise APIError( message='Failed to write new config to file, please load from backup', payload={'reason': str(e), 'backup_path': backup_path}, ) return success_response('Config was loaded and successfully updated to file')
[docs] @server_api.route('/version/') @api.doc( description='In case of a request error when fetching latest flexget version, ' 'that value will return as null' ) class ServerVersionAPI(APIResource):
[docs] @api.response(200, description='Flexget version', model=version_schema) def get(self, session: Session = None) -> Response: """Flexget Version.""" latest = get_latest_flexget_version_number() return jsonify({ 'flexget_version': __version__, 'api_version': __api_version__, 'latest_version': latest, })
[docs] @server_api.route('/dump_threads/', doc=False) class ServerDumpThreads(APIResource):
[docs] @api.response(200, description='Flexget threads dump', model=dump_threads_schema) def get(self, session: Session = None) -> Response: """Dump Server threads for debugging.""" id2name = {th.ident: th.name for th in threading.enumerate()} threads = [] for thread_id, stack in sys._current_frames().items(): dump = [] for filename, lineno, name, line in traceback.extract_stack(stack): dump.append(f'File: "{filename}", line {lineno}, in {name}') if line: dump.append(line.strip()) threads.append({'name': id2name.get(thread_id), 'id': thread_id, 'dump': dump}) return jsonify(threads=threads)
server_log_parser = api.parser() server_log_parser.add_argument( 'lines', type=int, default=200, help='How many lines to find before streaming' ) server_log_parser.add_argument('search', help='Search filter support google like syntax')
[docs] def reverse_readline( fh: IO, start_byte: int = 0, buf_size: int = 8192 ) -> Generator[str, None, None]: """Return the lines of a file in reverse order.""" segment: str | None = None offset = 0 if start_byte: fh.seek(start_byte) else: fh.seek(0, os.SEEK_END) total_size = remaining_size = fh.tell() while remaining_size > 0: offset = min(total_size, offset + buf_size) fh.seek(-offset, os.SEEK_END) buf = fh.read(min(remaining_size, buf_size)) remaining_size -= buf_size lines = buf.decode(sys.getfilesystemencoding()).split('\n') # the first line of the buffer is probably not a complete line so # we'll save it and append it to the last line of the next buffer # we read if segment is not None: # if the previous chunk starts right from the beginning of line # do not concact the segment to the last line of new chunk # instead, yield the segment first if buf[-1] != '\n': lines[-1] += segment else: yield segment segment = lines[0] for index in range(len(lines) - 1, 0, -1): if len(lines[index]): yield lines[index] yield segment
[docs] def file_inode(file: Path) -> int: fd = None try: fd = os.open(file, os.O_RDONLY) return os.fstat(fd).st_ino except OSError: return 0 finally: if fd: os.close(fd)
[docs] @server_api.route('/log/') class ServerLogAPI(APIResource):
[docs] @api.doc(expect=[server_log_parser]) @api.response(200, description='Streams as line delimited JSON') def get(self, session: Session = None) -> Response: """Stream Flexget log Streams as line delimited JSON.""" args = server_log_parser.parse_args() def follow(lines, search): log_parser = LogParser(search) stream_from_byte = 0 lines_found = [] if Path(self.manager.options.logfile).is_absolute(): base_log_file = Path(self.manager.options.logfile) else: base_log_file = self.manager.config_base / self.manager.options.logfile yield '{"stream": [' # Start of the json stream # Read back in the logs until we find enough lines for i in range(9): log_file = f'{base_log_file}.{i}'.rstrip('.0') # 1st log file has no number if not Path(log_file).is_file(): break with Path(log_file).open('rb') as fh: fh.seek(0, 2) # Seek to bottom of file end_byte = fh.tell() if i == 0: stream_from_byte = end_byte # Stream from this point later on if len(lines_found) >= lines: break # Read in reverse for efficiency for line in reverse_readline(fh, start_byte=end_byte): if len(lines_found) >= lines: break if log_parser.matches(line): lines_found.append(log_parser.json_string(line)) for line in reversed(lines_found): yield line + ',\n' # We need to track the inode in case the log file is rotated current_inode = file_inode(base_log_file) while True: # If the server is shutting down then end the stream nicely if cherrypy.engine.state != cherrypy.engine.states.STARTED: break new_inode = file_inode(base_log_file) if current_inode != new_inode: # File updated/rotated. Read from beginning stream_from_byte = 0 current_inode = new_inode try: with base_log_file.open('rb') as fh: fh.seek(stream_from_byte) line = fh.readline().decode(sys.getfilesystemencoding()) stream_from_byte = fh.tell() except OSError: yield '{}' continue # If a valid line is found and does not pass the filter then set it to none line = log_parser.json_string(line) if log_parser.matches(line) else '{}' if line == '{}': # If no match then delay to prevent many read hits on the file sleep(2) yield line + ',\n' yield '{}]}' # End of stream return Response(follow(args['lines'], args['search']), mimetype='text/event-stream')
[docs] class LogParser: """Filter log file. Supports * 'and', 'or' and implicit 'and' operators; * parentheses; * quoted strings; """ def __init__(self, query: str | None) -> None: self._methods = { 'and': self.evaluate_and, 'or': self.evaluate_or, 'not': self.evaluate_not, 'parenthesis': self.evaluate_parenthesis, 'quotes': self.evaluate_quotes, 'word': self.evaluate_word, } self.line = '' self.query = query.lower() if query else '' if self.query: # TODO: Cleanup operator_or = Forward() operator_word = Group(Word(alphanums)).setResultsName('word') operator_quotes_content = Forward() operator_quotes_content << ((operator_word + operator_quotes_content) | operator_word) operator_quotes = ( Group(Suppress('"') + operator_quotes_content + Suppress('"')).setResultsName( 'quotes' ) | operator_word ) operator_parenthesis = ( Group(Suppress('(') + operator_or + Suppress(')')).setResultsName('parenthesis') | operator_quotes ) operator_not = Forward() operator_not << ( Group(Suppress(Keyword('no', caseless=True)) + operator_not).setResultsName('not') | operator_parenthesis ) operator_and = Forward() operator_and << ( Group( operator_not + Suppress(Keyword('and', caseless=True)) + operator_and ).setResultsName('and') | Group(operator_not + OneOrMore(~one_of('and or') + operator_and)).setResultsName( 'and' ) | operator_not ) operator_or << ( Group( operator_and + Suppress(Keyword('or', caseless=True)) + operator_or ).setResultsName('or') | operator_and ) self._query_parser = operator_or.parseString(self.query)[0] else: self._query_parser = False integer = Word(nums) date = Combine( Combine(integer + '-' + integer + '-' + integer) + White(' ') + Combine(integer + ':' + integer + Optional(':' + integer)) ) self._log_parser = ( date.set_results_name('timestamp') + Word(alphas).set_results_name('log_level') + Word(printables).set_results_name('plugin') + ( White(' ', min=16).set_parse_action(lambda t: ['']).set_results_name('task') | White(' ', max=15) + Word(alphas).set_results_name('task') ).leave_whitespace() + rest_of_line.set_parse_action(lambda t: [t[0].strip()]).set_results_name('message') )
[docs] def evaluate_and(self, argument): return self.evaluate(argument[0]) and self.evaluate(argument[1])
[docs] def evaluate_or(self, argument): return self.evaluate(argument[0]) or self.evaluate(argument[1])
[docs] def evaluate_not(self, argument): return not self.evaluate(argument[0])
[docs] def evaluate_parenthesis(self, argument): return self.evaluate(argument[0])
[docs] def evaluate_quotes(self, argument): search_terms = [term[0] for term in argument] return ' '.join(search_terms) in ' '.join(self.line.split())
[docs] def evaluate_word(self, argument): return argument[0] in self.line
[docs] def evaluate(self, argument): return self._methods[argument.getName()](argument)
[docs] def matches(self, line): if not line: return False self.line = line.lower() if not self._query_parser: return True return self.evaluate(self._query_parser)
[docs] def json_string(self, line): try: return json.dumps(self._log_parser().parseString(line).asDict()) except ParseException: return '{}'
[docs] @server_api.route('/crash_logs/') class ServerCrashLogAPI(APIResource):
[docs] @api.response(200, 'Successfully retrieved crash logs', model=crash_logs_schema) def get(self, session: Session): """Get Crash logs.""" path = self.manager.config_base crashes = [] for file in path.glob('crash_report*.log'): with file.open() as f: crashes.append({'name': file.name, 'content': f.readlines()}) return jsonify(crashes)