Source code for flexget.components.bittorrent.torrent_match

import os
from pathlib import Path

from loguru import logger

from flexget import plugin
from flexget.event import event
from flexget.utils.tools import aggregate_inputs

logger = logger.bind(name='torrent_match')


[docs] class TorrentMatchFile: def __init__(self, path, size): self.path = path self.size = size def __repr__(self): return f'{self.__class__.__name__}(path={self.path!s}, size={self.size})'
[docs] class TorrentMatch: """Plugin that attempts to match .torrents to local files.""" schema = { 'type': 'object', 'properties': { 'what': { 'type': 'array', 'items': { 'allOf': [ {'$ref': '/schema/plugins?phase=input'}, {'maxProperties': 1, 'minProperties': 1}, ] }, }, 'max_size_difference': {'type': 'string', 'format': 'percent', 'default': '0%'}, }, 'required': ['what'], 'additionalProperties': False, }
[docs] def get_local_files(self, config, task): cwd = Path.cwd() # save the current working directory entries = aggregate_inputs(task, config['what']) result = [] for entry in entries: location = entry.get('location') if not location or not location.exists(): logger.warning('{} is not a local file. Skipping.', entry['title']) entry.reject('not a local file') continue result.append(entry) entry['files'] = [] if location.is_file(): entry['files'].append(TorrentMatchFile(location, location.stat().st_size)) else: # change working dir to make things simpler os.chdir(location) # traverse the file tree for root, _, files in os.walk('.'): # we only need to iterate over files for f in files: file_path = Path(root) / f # We need normpath to strip out the dot abs_file_path = os.path.normpath(location / file_path) entry['files'].append( TorrentMatchFile(abs_file_path, file_path.stat().st_size) ) # restore the working directory os.chdir(cwd) return result
# Run last in download phase to make sure we have downloaded .torrent to temp before modify phase
[docs] @plugin.priority(0) def on_task_download(self, task, config): for entry in task.accepted: if 'file' not in entry and 'download' not in task.config: # If the download plugin is not enabled, we need to call it to get # our temp .torrent files plugin.get('download', self).get_temp_files( task, handle_magnets=True, fail_html=True )
[docs] def prepare_config(self, config): if not isinstance(config['max_size_difference'], float): config['max_size_difference'] = float(config['max_size_difference'].rstrip('%')) return config
# Run after 'torrent' plugin, this is not really a modify plugin though, but we need 'torrent' field
[docs] def on_task_modify(self, task, config): config = self.prepare_config(config) max_size_difference = config['max_size_difference'] local_entries = self.get_local_files(config, task) matched_entries = set() for entry in task.accepted: if 'torrent' not in entry: logger.debug('Skipping entry {} as it is not a torrent file', entry['title']) continue # Find all files and file sizes in the .torrent. torrent_files = [] for item in entry['torrent'].get_filelist(): # if torrent is a multi_file, prepend the name path = Path(item['path']) / item['name'] if entry['torrent'].is_multi_file: path = Path(entry['torrent'].name) / path torrent_files.append(TorrentMatchFile(path, item['size'])) # Iterate over the files/dirs from the what plugins for local_entry in local_entries: logger.debug( 'Checking local entry {} against {}', local_entry['title'], entry['title'] ) local_files = local_entry['files'] # skip root dir of the local entry if torrent is single file has_root_dir = entry['torrent'].is_multi_file and entry['torrent'].name if not has_root_dir: # single-file torrent_file = torrent_files[0] for local_file in local_files: if ( str(torrent_file.path) in str(local_file.path) and torrent_file.size == local_file.size ): # if the filename with ext is contained in 'location', we must grab its parent as path if torrent_file.path.name in str(local_entry['location']): entry['path'] = local_entry['location'].parent else: entry['path'] = local_entry['location'] logger.debug('Path for {} set to {}', entry['title'], entry['path']) matched_entries.add(entry) break else: matches = 0 missing_size = 0 total_size = 0 path = '' candidate_files = [] # Find candidate files ie. files whose path contains the torrent name for local_file in local_files: if entry['torrent'].name in str(local_file.path): # we need to find the path that contains the torrent name since it's multi-file if not path: # attempt to extract path from the absolute file path path = local_file.path while entry['torrent'].name in path: path = os.path.dirname(path) candidate_files.append(local_file) logger.debug('Path for {} will be set to {}', entry['title'], path) for torrent_file in torrent_files: for candidate in candidate_files: if ( str(torrent_file.path) in candidate.path and torrent_file.size == candidate.size ): logger.debug( 'Path {} matched local file path {}', torrent_file.path, candidate.path, ) matches += 1 break else: logger.debug('No local paths matched {}', torrent_file.path) missing_size += torrent_file.size total_size += torrent_file.size size_difference = missing_size / total_size * 100 # we allow torrents that either match entirely or if the total size difference is below a threshold if matches == len(torrent_files) or max_size_difference >= size_difference: matched_entries.add(entry) # set the path of the torrent entry entry['path'] = path logger.debug('Torrent {} matched path {}', entry['title'], entry['path']) # TODO: keep searching for even better matches? break for entry in set(task.accepted).difference(matched_entries): entry.reject( f'No local files matched {100 - max_size_difference}% of the torrent size' )
[docs] @event('plugin.register') def register_plugin(): plugin.register(TorrentMatch, 'torrent_match', api_ver=2)