Source code for flexget.components.bittorrent.convert_magnet
from __future__ import annotations
import time
from typing import TYPE_CHECKING
from urllib.parse import quote
from loguru import logger
from flexget import plugin
from flexget.event import event
from flexget.utils.tools import parse_timedelta
if TYPE_CHECKING:
from pathlib import Path
logger = logger.bind(name='convert_magnet')
[docs]
class ConvertMagnet:
"""Convert magnet only entries to a torrent file."""
schema = {
'oneOf': [
# Allow convert_magnet: no form to turn off plugin altogether
{'type': 'boolean'},
{
'type': 'object',
'properties': {
'timeout': {'type': 'string', 'format': 'interval'},
'force': {'type': 'boolean'},
},
'additionalProperties': False,
},
]
}
[docs]
def magnet_to_torrent(self, magnet_uri, destination_folder: Path, timeout) -> str:
import libtorrent
params = libtorrent.parse_magnet_uri(magnet_uri)
session = libtorrent.session()
lt_version = [int(v) for v in libtorrent.version.split('.')]
if lt_version > [0, 16, 13, 0] and lt_version < [1, 1, 3, 0]:
# for some reason the info_hash needs to be bytes but it's a struct called sha1_hash
params['info_hash'] = params['info_hash'].to_bytes()
if lt_version < [1, 2]:
# for versions < 1.2
params['url'] = magnet_uri
else:
params.url = magnet_uri
params.save_path = str(destination_folder)
handle = session.add_torrent(params)
logger.debug('Acquiring torrent metadata for magnet {}', magnet_uri)
timeout_value = timeout
while not handle.has_metadata():
time.sleep(0.1)
timeout_value -= 0.1
if timeout_value <= 0:
raise plugin.PluginError(f'Timed out after {timeout} seconds trying to magnetize')
logger.debug('Metadata acquired')
torrent_info = handle.get_torrent_info()
torrent_file = libtorrent.create_torrent(torrent_info)
torrent_path = destination_folder / (torrent_info.name() + '.torrent')
with torrent_path.open('wb') as f:
f.write(libtorrent.bencode(torrent_file.generate()))
logger.debug('Torrent file wrote to {}', torrent_path)
return str(torrent_path)
[docs]
def prepare_config(self, config):
if not isinstance(config, dict):
config = {}
config.setdefault('timeout', '30 seconds')
config.setdefault('force', False)
return config
[docs]
@plugin.priority(plugin.PRIORITY_FIRST)
def on_task_start(self, task, config):
if config is False:
return
try:
import libtorrent # noqa: F401
except ImportError:
raise plugin.DependencyError(
'convert_magnet', 'libtorrent', 'libtorrent package required', logger
)
[docs]
@plugin.priority(130)
def on_task_download(self, task, config):
if config is False:
return
config = self.prepare_config(config)
# Create the conversion target directory
converted_path = task.manager.config_base / 'converted'
timeout = parse_timedelta(config['timeout']).total_seconds()
if not converted_path.is_dir():
converted_path.mkdir()
for entry in task.accepted:
if entry['url'].startswith('magnet:'):
entry.setdefault('urls', [entry['url']])
try:
logger.info('Converting entry {} magnet URI to a torrent file', entry['title'])
torrent_file = self.magnet_to_torrent(entry['url'], converted_path, timeout)
except (plugin.PluginError, TypeError) as e:
logger.error(
'Unable to convert Magnet URI for entry {}: {}', entry['title'], e
)
if config['force']:
entry.fail('Magnet URI conversion failed')
continue
# Windows paths need an extra / prepended to them for url
if not torrent_file.startswith('/'):
torrent_file = '/' + torrent_file
entry['url'] = torrent_file
entry['file'] = torrent_file
# make sure it's first in the list because of how download plugin works
entry['urls'].insert(0, f'file://{quote(torrent_file)}')
[docs]
@event('plugin.register')
def register_plugin():
plugin.register(ConvertMagnet, 'convert_magnet', api_ver=2)