Source code for flexget.plugins.operate.debug_db_sessions
import inspect
import time
from threading import Lock
import sqlalchemy
from loguru import logger
from flexget import options
from flexget.event import event
from flexget.manager import Session
# Tag every record emitted by this module with the plugin's short name.
logger = logger.bind(name='debug_db_sess')

# Multiple threads may call the event listeners; hold this lock whenever
# ``open_transactions`` is read or modified.
open_transactions_lock = Lock()

# Maps each open transaction to a tuple of
# (time.time() when it was opened, connection.connection, *caller info).
open_transactions = {}
[docs]
def find_caller(stack):
    """Find info about first non-sqlalchemy call in stack.

    :param stack: Iterable of frame records as returned by ``inspect.stack()``.
        Each record is indexed positionally as
        ``(frame, filename, lineno, function, code_context, index)``.
    :return: ``(module name, line number, function name, source line)`` for the
        first record whose module can be resolved and whose module name does
        not start with ``sqlalchemy``. The source line is ``None`` when the
        record carries no code context. ``(None, None, None, None)`` is
        returned (and a warning logged) when no record qualifies.
    """
    for frame in stack:
        module = inspect.getmodule(frame[0])
        # getmodule() returns None when it cannot resolve a module; skip those
        if not hasattr(module, '__name__'):
            continue
        # We don't care about sqlalchemy internals
        if module.__name__.startswith('sqlalchemy'):
            continue
        # code_context is None when the source lines are unavailable, so
        # indexing it unconditionally would raise TypeError
        code_context = frame[4]
        source_line = code_context[0].strip() if code_context else None
        return (module.__name__, *tuple(frame[2:4]), source_line)
    logger.warning('Transaction from unknown origin')
    return None, None, None, None
[docs]
def after_begin(session, transaction, connection):
    """Record a newly begun transaction and log where it was opened from.

    A warning is logged when any already-recorded transaction was stored with
    a different ``connection.connection`` than this one; otherwise the opening
    is logged at debug level, listing the other open transactions if any.

    :param session: Unused; part of the ``after_begin`` listener signature.
    :param transaction: The transaction that was begun; used as the registry key.
    :param connection: The connection the transaction runs on.
    """
    origin = find_caller(inspect.stack()[1:])
    txn_id = id(transaction)
    with open_transactions_lock:
        raw_connection = connection.connection
        if not open_transactions:
            logger.debug('Transaction 0x{:08X} opened {}', txn_id, origin)
        elif any(record[1] is not raw_connection for record in open_transactions.values()):
            logger.warning(
                'Sessions from 2 threads! Transaction 0x{:08X} opened {} Already open one(s): {}',
                txn_id,
                origin,
                open_transactions,
            )
        else:
            logger.debug(
                'Transaction 0x{:08X} opened {} Already open one(s): {}',
                txn_id,
                origin,
                open_transactions,
            )
        # Remember when, on which connection and from where this one was opened
        open_transactions[transaction] = (time.time(), raw_connection, *origin)
[docs]
def after_flush(session, flush_context):
    """Log which tracked transaction a flush belongs to and what it wrote.

    Nothing is logged when the session has no new, deleted or dirty objects.
    If none of the session's current transaction or its parents is present in
    ``open_transactions``, the write is logged as untracked instead of raising.

    :param session: The session that was flushed.
    :param flush_context: Unused; part of the ``after_flush`` listener signature.
    """
    if not (session.new or session.deleted or session.dirty):
        return
    caller_info = find_caller(inspect.stack()[1:])
    with open_transactions_lock:
        # ``Session.transaction`` is deprecated since SQLAlchemy 1.4 in favour
        # of ``Session.get_transaction()``; use the accessor when the installed
        # version provides it and fall back to the attribute otherwise.
        get_transaction = getattr(session, 'get_transaction', None)
        current = get_transaction() if get_transaction is not None else session.transaction
        # _iterate_parents was renamed to _iterate_self_and_parents in SQLAlchemy 1.1
        iterate_parents = getattr(current, '_iterate_parents', None) or getattr(
            current, '_iterate_self_and_parents', None
        )
        chain = iterate_parents() if iterate_parents is not None else ()
        # A default keeps StopIteration from escaping the listener when no
        # transaction in the chain was recorded by after_begin
        tid = next((id(t) for t in chain if t in open_transactions), None)
        written = (
            caller_info,
            tuple(session.new),
            tuple(session.deleted),
            tuple(session.dirty),
        )
        if tid is None:
            logger.debug(
                'Untracked transaction writing {} new: {} deleted: {} dirty: {}',
                *written,
            )
        else:
            logger.debug(
                'Transaction 0x{:08X} writing {} new: {} deleted: {} dirty: {}',
                tid,
                *written,
            )
[docs]
def after_end(session, transaction):
    """Log the closing of a tracked transaction and drop it from the registry.

    Transactions that were never recorded in ``open_transactions`` are ignored.
    The message is logged as a warning when the transaction was open for more
    than 2 seconds, otherwise at debug level.

    :param session: Unused; part of the ``after_transaction_end`` listener signature.
    :param transaction: The transaction that ended.
    """
    origin = find_caller(inspect.stack()[1:])
    with open_transactions_lock:
        record = open_transactions.get(transaction)
        if record is None:
            # Transaction was created but a connection was never opened for it
            return
        open_time = time.time() - record[0]
        msg = f'Transaction 0x{id(transaction):08X} closed {origin} (open time {open_time})'
        emit = logger.warning if open_time > 2 else logger.debug
        emit(msg)
        del open_transactions[transaction]
[docs]
@event('manager.startup')
def debug_warnings(manager):
    """Attach the session debugging listeners when ``--debug-db-sessions`` is set.

    :param manager: Object whose ``options.debug_db_sessions`` flag enables the listeners.
    """
    if not manager.options.debug_db_sessions:
        return
    listeners = (
        ('after_begin', after_begin),
        ('after_flush', after_flush),
        ('after_transaction_end', after_end),
    )
    for event_name, handler in listeners:
        sqlalchemy.event.listen(Session, event_name, handler)
[docs]
@event('options.register')
def register_parser_arguments():
    """Register the ``--debug-db-sessions`` command line flag on the core parser."""
    parser = options.get_parser()
    parser.add_argument(
        '--debug-db-sessions',
        action='store_true',
        help='debug session starts and ends, for finding problems with db locks',
    )