Source code for flexget.api.core.authentication
from __future__ import annotations
import base64
from contextlib import suppress
from typing import TYPE_CHECKING
from flask import request
from flask import session as flask_session
from flask_login import LoginManager
from flask_login.utils import current_app, current_user, login_user
from flask_restx import inputs
from werkzeug.security import check_password_hash
from flexget.api import api_app
from flexget.api.app import APIResource, Unauthorized, api, base_message_schema, success_response
from flexget.utils.database import with_session
from flexget.webserver import User
if TYPE_CHECKING:
from flask import Request, Response
from sqlalchemy.orm import Session
# Flask-Login manager for the API application. The request/user loader
# callbacks defined later in this module are registered on this instance.
login_manager = LoginManager()
# Attach the manager to the API application object imported from flexget.api.
login_manager.init_app(api_app)
@login_manager.request_loader
@with_session
def load_user_from_request(request: Request, session: Session = None) -> User | None:
    """Resolve the user for a request from its ``Authorization`` header.

    Two header forms are handled:

    * ``Token <api key>`` - the key is matched against ``User.token``.
    * ``Basic <base64 of "username:password">`` - the user is looked up by
      name and the password is checked against the stored hash.

    Args:
        request: The incoming request whose headers are inspected.
        session: Database session used for the user lookup.

    Returns:
        The matching ``User``, or ``None`` when the header is missing,
        malformed, uses another scheme, or the credentials do not match.
    """
    auth_value = request.headers.get('Authorization')
    if not auth_value:
        return None

    # Login using api key
    if auth_value.startswith('Token'):
        with suppress(TypeError, ValueError):
            token = auth_value.replace('Token ', '', 1)
            return session.query(User).filter(User.token == token).first()

    # Login using basic auth
    if auth_value.startswith('Basic'):
        # Malformed base64 / undecodable bytes / a payload without a colon
        # are treated as "not authenticated" rather than as server errors.
        with suppress(TypeError, ValueError, UnicodeDecodeError):
            credentials = base64.b64decode(auth_value.replace('Basic ', '', 1)).decode()
            # Split on the FIRST colon only: the password part may itself
            # contain colons, and an unbounded split would then fail to
            # unpack and wrongly reject valid credentials.
            username, password = credentials.split(':', 1)
            user = session.query(User).filter(User.name == username).first()
            if user and user.password and check_password_hash(user.password, password):
                return user

    return None
@login_manager.user_loader
@with_session
def load_user(username: str, session: Session = None) -> User | None:
    """Fetch the first ``User`` row whose name equals *username*.

    Registered as the login manager's user loader. The result of the query's
    ``first()`` call is returned unchanged.
    """
    users_named = session.query(User).filter(User.name == username)
    return users_named.first()
[docs]
@api_app.before_request
def check_valid_login() -> Response | None:
    """Reject unauthenticated requests before they reach a handler.

    Returns ``None`` (let the request continue) for the open endpoints and
    for authenticated users; otherwise returns whatever the application's
    login manager produces from ``unauthorized()``.
    """
    path = request.path

    # Allow access to root, login and swagger documentation without authentication
    open_access = (
        path == '/'
        or path.startswith(('/auth/login', '/auth/logout', '/swagger'))
        or request.method == 'OPTIONS'
    )

    # `current_user` is only consulted when the request is not open-access.
    if open_access or current_user.is_authenticated:
        return None
    return current_app.login_manager.unauthorized()
# API Authentication and Authorization
auth_api = api.namespace('auth', description='Authentication')

# Schema for the login request body: an object carrying exactly the two
# string fields `username` and `password`, both mandatory.
login_api_schema = api.schema_model(
    'auth.login',
    {
        'type': 'object',
        'properties': {
            'username': {'type': 'string'},
            'password': {'type': 'string'},
        },
        'required': ['username', 'password'],
        'additionalProperties': False,
    },
)

# Parser for the optional boolean `remember` argument of the login endpoint.
login_parser = api.parser()
login_parser.add_argument(
    'remember',
    type=inputs.boolean,
    default=False,
    required=False,
    help='Remember login (sets persistent session cookies)',
)
[docs]
@auth_api.route('/login/')
class LoginAPI(APIResource):
    # Resource behind the `/login/` route of the `auth` namespace: checks a
    # username/password pair and, on success, logs the user in via flask_login.

    @api.validate(login_api_schema, description='Username and Password')
    @api.response(Unauthorized)
    @api.response(200, 'Login successful', model=base_message_schema)
    @api.doc(expect=[login_parser])
    def post(self, session: Session = None) -> Response:
        """Login with username and password."""
        # Raises Unauthorized for a missing/malformed body, an unknown user,
        # a wrong password, or the default user with no password set yet.
        invalid_credentials = 'Invalid username or password'

        data = request.json
        # Check the payload BEFORE reading from it, so an absent body is
        # answered with Unauthorized instead of failing on `None.get`.
        if not data:
            raise Unauthorized(invalid_credentials)

        user_name = data.get('username')
        password = data.get('password')
        if not isinstance(user_name, str) or not isinstance(password, str):
            raise Unauthorized(invalid_credentials)

        # Names are matched lower-cased; use the same normalized value for
        # both the lookup and the default-user check so they cannot disagree.
        normalized_name = user_name.lower()
        user = session.query(User).filter(User.name == normalized_name).first()
        if user:
            if normalized_name == 'flexget' and not user.password:
                raise Unauthorized(
                    'If this is your first time running the WebUI you need to set a password via'
                    ' the command line by running `flexget web passwd <new_password>`'
                )
            if user.password and check_password_hash(user.password, password):
                # `remember` comes from the separate argument parser, not
                # from the JSON body.
                args = login_parser.parse_args()
                login_user(user, remember=args['remember'])
                return success_response('user logged in')

        raise Unauthorized(invalid_credentials)
[docs]
@auth_api.route('/logout/')
class LogoutAPI(APIResource):
    # Resource behind the `/logout/` route of the `auth` namespace.

    @api.response(200, 'Logout successful', model=base_message_schema)
    def post(self, session: Session = None) -> Response:
        """Logout and clear session cookies."""
        # Drop everything stored in the Flask session for this client.
        flask_session.clear()

        # Build the success reply and blank out the `flexget.token` cookie
        # (empty value, expires=0) on it before returning.
        response = success_response('User logged out')
        response.set_cookie('flexget.token', '', expires=0)
        return response