Source code for flexget.components.ftp.ftp_download

import ftplib
import os
import time
from pathlib import Path, PurePosixPath
from urllib.parse import unquote, urlparse

from loguru import logger

from flexget import plugin
from flexget.event import event
from flexget.utils.template import RenderError

logger = logger.bind(name='ftp')


[docs] class OutputFtp: """Ftp Download plugin. input-url: ftp://<user>:<password>@<host>:<port>/<path to file> Example: ftp://anonymous:anon@my-ftp-server.com:21/torrent-files-dir config: ftp_download: use-ssl: <True/False> ftp_tmp_path: <path> delete_origin: <True/False> download_empty_dirs: <True/False> Todo: - Resume downloads - create banlists files - validate connection parameters """ schema = { 'type': 'object', 'properties': { 'use-ssl': {'type': 'boolean', 'default': False}, 'ftp_tmp_path': {'type': 'string', 'format': 'path'}, 'delete_origin': {'type': 'boolean', 'default': False}, 'download_empty_dirs': {'type': 'boolean', 'default': False}, }, 'additionalProperties': False, }
[docs] def prepare_config(self, config, task): config.setdefault('use-ssl', False) config.setdefault('delete_origin', False) config.setdefault('ftp_tmp_path', str(task.manager.config_base / 'temp')) config.setdefault('download_empty_dirs', False) return config
[docs] def ftp_connect(self, config, ftp_url, current_path: PurePosixPath): ftp = ftplib.FTP_TLS() if config['use-ssl'] else ftplib.FTP() # ftp.set_debuglevel(2) logger.debug('Connecting to {}', ftp_url.hostname) ftp.connect(ftp_url.hostname, ftp_url.port) ftp.login(ftp_url.username, ftp_url.password) if config['use-ssl']: ftp.prot_p() ftp.sendcmd('TYPE I') ftp.set_pasv(True) logger.debug('Changing directory to: {}', current_path) ftp.cwd(str(current_path)) return ftp
[docs] def check_connection(self, ftp, config, ftp_url, current_path: PurePosixPath): try: ftp.voidcmd('NOOP') except (OSError, ftplib.Error): ftp = self.ftp_connect(config, ftp_url, current_path) return ftp
[docs] def on_task_download(self, task, config): config = self.prepare_config(config, task) for entry in task.accepted: ftp_url = urlparse(entry.get('url')) ftp_url = ftp_url._replace(path=unquote(ftp_url.path)) current_path = PurePosixPath(ftp_url.path).parent try: ftp = self.ftp_connect(config, ftp_url, current_path) except ftplib.all_errors as e: entry.fail(f'Unable to connect to server : {e}') break to_path = config['ftp_tmp_path'] try: to_path = Path(entry.render(to_path)) except RenderError as err: raise plugin.PluginError( f'Path value replacement `{to_path}` failed: {err.args[0]}' ) if not to_path.exists(): logger.debug('Creating base path: {}', to_path) to_path.mkdir(parents=True) if not to_path.is_dir(): raise plugin.PluginWarning(f'Destination `{to_path}` is not a directory.') file_name = Path(ftp_url.path).name try: # Directory ftp = self.check_connection(ftp, config, ftp_url, current_path) ftp.cwd(file_name) self.ftp_walk( ftp, to_path / file_name, config, ftp_url, PurePosixPath(ftp_url.path) ) ftp = self.check_connection(ftp, config, ftp_url, current_path) ftp.cwd('..') if config['delete_origin']: ftp.rmd(file_name) except ftplib.error_perm: # File self.ftp_down(ftp, file_name, to_path, config, ftp_url, current_path) ftp.close()
[docs] def on_task_output(self, task, config): """Count this as an output plugin."""
[docs] def ftp_walk(self, ftp, tmp_path: Path, config, ftp_url, current_path: PurePosixPath): logger.debug('DIR->{}', ftp.pwd()) logger.debug('FTP tmp_path : {}', tmp_path) try: ftp = self.check_connection(ftp, config, ftp_url, current_path) dirs = ftp.nlst(ftp.pwd()) except ftplib.error_perm as ex: logger.info('Error {}', ex) return ftp if not dirs: if config['download_empty_dirs']: tmp_path.mkdir() else: logger.debug('Empty directory, skipping.') return ftp for file_name in (path for path in dirs if path not in ('.', '..')): file_name = os.path.basename(file_name) try: ftp = self.check_connection(ftp, config, ftp_url, current_path) ftp.cwd(file_name) if not tmp_path.is_dir(): tmp_path.mkdir() logger.debug('Directory {} created', tmp_path) ftp = self.ftp_walk( ftp, tmp_path / os.path.basename(file_name), config, ftp_url, current_path / os.path.basename(file_name), ) ftp = self.check_connection(ftp, config, ftp_url, current_path) ftp.cwd('..') if config['delete_origin']: ftp.rmd(os.path.basename(file_name)) except ftplib.error_perm: ftp = self.ftp_down( ftp, os.path.basename(file_name), tmp_path, config, ftp_url, current_path ) return self.check_connection(ftp, config, ftp_url, current_path)
[docs] def ftp_down( self, ftp, file_name, tmp_path: Path, config, ftp_url, current_path: PurePosixPath ): logger.debug('Downloading {} into {}', file_name, tmp_path) if not tmp_path.exists(): tmp_path.mkdir(parents=True) local_file = (tmp_path / file_name).open('a+b') ftp = self.check_connection(ftp, config, ftp_url, current_path) try: ftp.sendcmd('TYPE I') file_size = ftp.size(file_name) except Exception: file_size = 1 max_attempts = 5 size_at_last_err = 0 logger.info('Starting download of {} into {}', file_name, tmp_path) while file_size > local_file.tell(): try: if local_file.tell() != 0: ftp = self.check_connection(ftp, config, ftp_url, current_path) ftp.retrbinary(f'RETR {file_name}', local_file.write, local_file.tell()) else: ftp = self.check_connection(ftp, config, ftp_url, current_path) ftp.retrbinary(f'RETR {file_name}', local_file.write) except Exception as error: if max_attempts != 0: if size_at_last_err == local_file.tell(): # Nothing new was downloaded so the error is most likely connected to the resume functionality. # Delete the downloaded file and try again from the beginning. local_file.close() (tmp_path / file_name).unlink() local_file = (tmp_path / file_name).open('a+b') max_attempts -= 1 size_at_last_err = local_file.tell() logger.debug('Retrying download after error {}', error.args[0]) # Short timeout before retry. time.sleep(1) else: logger.error('Too many errors downloading {}. Aborting.', file_name) break local_file.close() if config['delete_origin']: ftp = self.check_connection(ftp, config, ftp_url, current_path) ftp.delete(file_name) return ftp
[docs] @event('plugin.register') def register_plugin(): plugin.register(OutputFtp, 'ftp_download', api_ver=2)