Source code for flexget.webserver

import hashlib
import random
import socket
import threading

import cherrypy
import zxcvbn
from flask import Flask, abort, redirect
from flask_login import UserMixin
from loguru import logger
from sqlalchemy import Column, Integer, Unicode
from werkzeug.security import generate_password_hash

from flexget.manager import Base
from flexget.utils.database import with_session

logger = logger.bind(name='web_server')

_home: str | None = None
_app_register: dict[str, tuple[Flask, str]] = {}
_default_app = Flask(__name__)

rand = random.SystemRandom()


[docs] def generate_key(): """Generate key for use to authentication.""" return str(hashlib.sha224(str(rand.getrandbits(128)).encode('utf-8')).hexdigest())
[docs] def get_random_string( length=12, allowed_chars='abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' ): """Return a securely generated random string. The default length of 12 with the a-z, A-Z, 0-9 character set returns a 71-bit value. log_2((26+26+10)^12) =~ 71 bits. Taken from the django.utils.crypto module. """ return ''.join(rand.choice(allowed_chars) for __ in range(length))
@with_session def get_secret(session=None): """Generate a secret key for flask applications and store it in the database.""" web_secret = session.query(WebSecret).first() if not web_secret: web_secret = WebSecret( id='1', value=get_random_string(50, 'abcdefghijklmnopqrstuvwxyz0123456789!@#$%^&*(-_=+)'), ) session.add(web_secret) session.commit() return web_secret.value
[docs] class WeakPassword(Exception): def __init__(self, value, logger=logger, **kwargs): super().__init__() # Value is expected to be a string if not isinstance(value, str): value = str(value) self.value = value self.logger = logger self.kwargs = kwargs def __str__(self): return str(self).encode('utf-8') def __unicode__(self): return str(self.value)
[docs] class User(Base, UserMixin): """User class available for flask apps to handle authentication using flask_login.""" __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(Unicode(50), unique=True) token = Column(Unicode, default=generate_key) password = Column(Unicode) def __repr__(self): return f'<User {self.name!r}>'
[docs] def get_id(self): return self.name
[docs] class WebSecret(Base): """Store flask secret in the database.""" __tablename__ = 'secret' id = Column(Unicode, primary_key=True) value = Column(Unicode)
[docs] def register_app(path, application, name): if path in _app_register: raise ValueError(f'path {path} already registered') _app_register[path] = (application, name)
[docs] def register_home(route): """Register UI home page.""" global _home _home = route
[docs] @_default_app.route('/') def start_page(): """Redirect user to registered UI home.""" if not _home: abort(404) return redirect(_home)
[docs] def setup_server(config): """Set up and start/restart the web service.""" web_server = WebServer( bind=config['bind'], port=config['port'], ssl_certificate=config['ssl_certificate'], ssl_private_key=config['ssl_private_key'], base_url=config['base_url'], ) _default_app.secret_key = get_secret() user = get_user() if not user or not user.password: logger.warning( 'No password set for web server, create one by using `flexget web passwd <password>`' ) if _app_register: web_server.start() return web_server
[docs] class WebServer(threading.Thread): # We use a regular list for periodic jobs, so you must hold this lock while using it triggers_lock = threading.Lock() def __init__( self, bind='0.0.0.0', port=5050, ssl_certificate=None, ssl_private_key=None, base_url='' ): threading.Thread.__init__(self, name='web_server') self.bind = str(bind) # String to remove unicode warning from cherrypy startup self.port = port self.ssl_certificate = ssl_certificate self.ssl_private_key = ssl_private_key self.base_url = base_url
[docs] def start(self): # If we have already started and stopped a thread, we need to reinitialize it to create a new one if not self.is_alive(): self.__init__( bind=self.bind, port=self.port, ssl_certificate=self.ssl_certificate, ssl_private_key=self.ssl_private_key, base_url=self.base_url, ) threading.Thread.start(self)
[docs] def _start_server(self): # Mount the WSGI callable object (app) on the root directory cherrypy.tree.graft(_default_app, '/') for path, (registered_app, _name) in _app_register.items(): cherrypy.tree.graft(registered_app, self.base_url + path) cherrypy.log.error_log.propagate = False cherrypy.log.access_log.propagate = False # Set the configuration of the web server cherrypy.config.update({ 'engine.autoreload.on': False, 'server.socket_port': self.port, 'server.socket_host': self.bind, 'log.screen': False, }) if self.ssl_certificate and self.ssl_private_key: cherrypy.config.update({ 'server.ssl_module': 'builtin', 'server.ssl_certificate': self.ssl_certificate, 'server.ssl_private_key': self.ssl_private_key, }) try: host = ( self.bind if self.bind != '0.0.0.0' else socket.gethostbyname(socket.gethostname()) ) except socket.gaierror: host = '127.0.0.1' protocol = 'https' if self.ssl_certificate and self.ssl_private_key else 'http' server_url = f'{protocol}://{host}:{self.port}{self.base_url}' logger.info('Web server started at {}', server_url) for path, (_registered_app, name) in _app_register.items(): logger.info('{} available at {}{}', name, server_url, path) # Start the CherryPy WSGI web server cherrypy.engine.start() cherrypy.engine.block()
[docs] def run(self): self._start_server()
[docs] def stop(self): global _app_register logger.info('Shutting down web server') cherrypy.engine.exit() # Unregister apps _app_register = {}
@with_session def get_user(username='flexget', session=None): user = session.query(User).filter(User.name == username).first() if not user: user = User() user.name = username session.add(user) return user @with_session def change_password(username='flexget', password='', session=None): check = zxcvbn.zxcvbn(password, user_inputs=[username]) if check['score'] < 3: warning = check['feedback']['warning'] suggestions = ' '.join(check['feedback']['suggestions']) message = f"Password '{password}' is not strong enough. " if warning: message += warning + ' ' if suggestions: message += f'Suggestions: {suggestions}' raise WeakPassword(message) user = get_user(username=username, session=session) user.password = str(generate_password_hash(password)) session.commit() @with_session def generate_token(username='flexget', session=None): user = get_user(username=username, session=session) user.token = generate_key() session.commit() return user.token