Source code for flexget.components.bittorrent.torrent

from pathlib import Path

from loguru import logger

from flexget import plugin
from flexget.event import event
from flexget.utils.bittorrent import Torrent, is_torrent_file

logger = logger.bind(name='modif_torrent')


[docs] class TorrentFilename: """Make sure that entries containing torrent-file have .torrent extension. This is enabled always by default (builtins). """ TORRENT_PRIO = 255
[docs] @plugin.priority(TORRENT_PRIO) def on_task_modify(self, task, config): # Only scan through accepted entries, as the file must have been downloaded in order to parse anything for entry in task.accepted: # skip if entry does not have file assigned if 'file' not in entry: logger.trace("{} doesn't have a file associated", entry['title']) continue if not Path(entry['file']).exists(): logger.debug('File {} does not exist', entry['file']) continue if Path(entry['file']).stat().st_size == 0: logger.debug('File {} is 0 bytes in size', entry['file']) continue if not is_torrent_file(Path(entry['file'])): continue logger.debug('{} seems to be a torrent', entry['title']) # create torrent object from torrent try: with Path(entry['file']).open('rb') as f: # NOTE: this reads entire file into memory, but we're pretty sure it's # a small torrent file since it starts with TORRENT_RE data = f.read() if 'content-length' in entry and len(data) != entry['content-length']: entry.fail( "Torrent file length doesn't match to the one reported by the server" ) self.purge(entry) continue # construct torrent object try: torrent = Torrent(data) except SyntaxError as e: entry.fail(f'{e.args[0]} - broken or invalid torrent file received') self.purge(entry) continue logger.trace('working on torrent {}', torrent) entry['torrent'] = torrent entry['torrent_info_hash'] = torrent.info_hash # if we do not have good filename (by download plugin) # for this entry, try to generate one from torrent content if entry.get('filename'): if not entry['filename'].lower().endswith('.torrent'): # filename present but without .torrent extension, add it entry['filename'] += '.torrent' else: # generate filename from torrent or fall back to title plus extension entry['filename'] = self.make_filename(torrent, entry) except Exception: logger.exception('Found an error')
[docs] @plugin.priority(TORRENT_PRIO) def on_task_output(self, task, config): for entry in task.entries: if 'torrent' in entry and entry['torrent'].modified: # re-write data into a file logger.debug('Writing modified torrent file for {}', entry['title']) with Path(entry['file']).open('wb+') as f: f.write(entry['torrent'].encode())
[docs] def make_filename(self, torrent, entry): """Build a filename for this torrent.""" title = entry['title'] files = torrent.get_filelist() if len(files) == 1: # single file, if filename is longer than title use it fn = files[0]['name'] if len(fn) > len(title): title = fn[: fn.rfind('.')] # neatify title title = title.replace('/', '_') title = title.replace(' ', '_') title = title.replace('\u200b', '') # title = title.encode('iso8859-1', 'ignore') # Damn \u200b -character, how I loathe thee # TODO: replace only zero width spaces, leave unicode alone? fn = f'{title}.torrent' logger.debug('make_filename made {}', fn) return fn
[docs] def purge(self, entry): if Path(entry['file']).exists(): logger.debug('removing temp file {} from {}', entry['file'], entry['title']) Path(entry['file']).unlink() del entry['file']
[docs] @event('plugin.register') def register_plugin(): plugin.register(TorrentFilename, 'torrent', builtin=True, api_ver=2)