import hashlib
import mimetypes
import os
import shutil
import socket
import sys
import tempfile
from http.client import BadStatusLine
from pathlib import Path
from urllib.parse import unquote
from loguru import logger
from requests import RequestException
from flexget import options, plugin
from flexget.event import event
from flexget.utils.pathscrub import pathscrub
from flexget.utils.requests import parse_header
from flexget.utils.template import RenderError
from flexget.utils.tools import decode_html
# Bind a plugin-specific name to the imported loguru logger; the rebinding
# shadows the imported `logger` for the rest of this module.
logger = logger.bind(name='download')
# System temporary directory; used as the default `tmp_path` of
# `get_temp_file` / `get_temp_files` below.
tmpdir = tempfile.gettempdir()
[docs]
class PluginDownload:
    """Download content from the entry url and write it into a file.

    Example::

      download: ~/torrents/

    Allow HTML content:

    By default the download plugin reports a failure if the received content
    is html. Usually this is some sort of custom error page without a
    proper http code, and thus the entry is assumed to be downloaded
    incorrectly.

    In the rare case you actually need to retrieve html pages you must
    disable this feature.

    Example::

      download:
        path: ~/something/
        fail_html: no

    You may use the commandline parameter --dl-path to temporarily override
    all paths to another location.
    """

    # Accepted config forms: an options object, a bare path string, or `True`.
    schema = {
        'oneOf': [
            {
                'title': 'specify options',
                'type': 'object',
                'properties': {
                    'path': {'type': 'string', 'format': 'path'},
                    'fail_html': {'type': 'boolean', 'default': True},
                    'overwrite': {'type': 'boolean', 'default': False},
                    'temp': {'type': 'string', 'format': 'path'},
                    'filename': {'type': 'string'},
                },
                'additionalProperties': False,
            },
            {'title': 'specify path', 'type': 'string', 'format': 'path'},
            {'title': 'no options', 'type': 'boolean', 'enum': [True]},
        ]
    }
[docs]
def process_config(self, config):
    """Return plugin configuration in advanced (dict) form.

    A bare path string becomes ``{'path': <string>}``; any other non-dict
    value (e.g. ``True``) becomes an empty dict. The object passed in is
    never modified: dict input is shallow-copied before defaults are added.

    :param config: raw plugin config (dict, str or bool)
    :return: new dict in which ``fail_html`` is always set and
        ``require_path`` is True when no usable ``path`` was configured
    """
    if isinstance(config, str):
        config = {'path': config}
    elif isinstance(config, dict):
        # Work on a copy so the helper keys added below do not leak into the
        # caller's (task) configuration.
        config = dict(config)
    else:
        config = {}
    if not config.get('path'):
        config['require_path'] = True
    config.setdefault('fail_html', True)
    return config
[docs]
def on_task_download(self, task, config):
    """Download all accepted entries of `task` into the temporary folder."""
    settings = self.process_config(config)
    # Temporary download location: the user's `temp` setting when given,
    # otherwise `<config_base>/temp`.
    fallback_tmp = os.path.join(task.manager.config_base, 'temp')
    self.get_temp_files(
        task,
        require_path=settings.get('require_path', False),
        fail_html=settings['fail_html'],
        tmp_path=settings.get('temp', fallback_tmp),
    )
[docs]
def get_temp_file(
    self,
    task,
    entry,
    require_path=False,
    handle_magnets=False,
    fail_html=True,
    tmp_path=tmpdir,
):
    """Download entry content and store it in a temporary folder.

    The urls of the entry (``entry['urls']``, falling back to
    ``entry['url']``) are tried in order until one succeeds;
    ``entry['url']`` is then set to that url. If no url succeeds the entry
    is failed with the collected reasons.

    :param task: task being executed
    :param entry: entry to download
    :param bool require_path:
        whether or not entries without 'path' field are ignored
    :param bool handle_magnets:
        when used any of urls containing magnet link will replace url,
        otherwise warning is printed.
    :param fail_html:
        fail entries which url respond with html content
    :param tmp_path:
        path to use for temporary files while downloading
    """
    # Content types that are treated as an unexpected html page.
    html_mimes = ('html', 'text/html')
    urls = entry.get('urls') or [entry['url']]
    errors = []
    for url in urls:
        if url.startswith('magnet:'):
            if handle_magnets:
                # Set magnet link as main url, so a torrent client plugin can grab it
                logger.debug('Accepting magnet url for {}', entry['title'])
                entry['url'] = url
                break
            logger.warning("Can't download magnet url")
            errors.append('Magnet URL')
            continue
        if require_path and 'path' not in entry:
            # Don't fail here, there might be a magnet later in the list of urls
            logger.debug('Skipping url {} because there is no path for download', url)
            continue
        error = self.process_entry(task, entry, url, tmp_path)
        # disallow html content
        if entry.get('mime-type') in html_mimes and fail_html:
            # Name the url that was actually fetched; with several urls it is
            # not necessarily the entry's main url.
            error = f'Unexpected html content received from `{url}` - maybe a login page?'
            self.cleanup_temp_file(entry)
        if not error:
            # Set the main url, so we know where this file actually came from
            logger.debug('Successfully retrieved {} from {}', entry['title'], url)
            entry['url'] = url
            break
        errors.append(error)
    else:
        # Reached only when no url succeeded (the loop never hit `break`).
        # check if entry must have a path (download: yes)
        if require_path and 'path' not in entry:
            logger.error("{} can't be downloaded, no path specified for entry", entry['title'])
            entry.fail('no path specified for entry')
        else:
            entry.fail(', '.join(errors))
[docs]
def save_error_page(self, entry, task, page):
    """Save an error page received for `entry` for later inspection.

    The page is written to
    ``<config_base>/received/<task name>/<scrubbed title>.error``.

    :param entry: entry whose download produced the error page
    :param task: task being executed
    :param page: raw page content; written in binary mode
    """
    received = os.path.join(task.manager.config_base, 'received', task.name)
    # exist_ok avoids the check-then-create race of a separate isdir() test.
    os.makedirs(received, exist_ok=True)
    filename = os.path.join(
        received, pathscrub('{}.error'.format(entry['title']), filename=True)
    )
    logger.error(
        'Error retrieving {}, the error page has been saved to {}', entry['title'], filename
    )
    with open(filename, 'wb') as outfile:
        outfile.write(page)
[docs]
def get_temp_files(
    self,
    task,
    require_path=False,
    handle_magnets=False,
    fail_html=True,
    tmp_path=tmpdir,
):
    """Download the content of every accepted entry into a temporary folder.

    Delegates to `get_temp_file` once per accepted entry, with the same options.

    :param bool require_path:
        whether or not entries without 'path' field are ignored
    :param bool handle_magnets:
        when used any of urls containing magnet link will replace url,
        otherwise warning is printed.
    :param fail_html:
        fail entries which url respond with html content
    :param tmp_path:
        path to use for temporary files while downloading
    """
    for accepted_entry in task.accepted:
        self.get_temp_file(
            task,
            accepted_entry,
            require_path=require_path,
            handle_magnets=handle_magnets,
            fail_html=fail_html,
            tmp_path=tmp_path,
        )
# TODO: a bit of a silly method; should it be removed now that the exceptions are simpler?
[docs]
def process_entry(self, task, entry, url, tmp_path):
    """Process `entry` by using `url`. Do not use entry['url'].

    Do not fail the `entry` if there is a network issue, instead just log and return a string error.
    In test mode (``task.options.test``) nothing is downloaded.

    :param task: Task
    :param entry: Entry
    :param url: Url to try download
    :param tmp_path: Path to store temporary files
    :return: String error, if failed; None otherwise.
    """
    try:
        if task.options.test:
            logger.info('Would download: {}', entry['title'])
        else:
            if not task.manager.unit_test:
                logger.info('Downloading: {}', entry['title'])
            self.download_entry(task, entry, url, tmp_path)
    except RequestException as e:
        logger.warning('RequestException {}, while downloading {}', e, url)
        return f'Network error during request: {e}'
    except BadStatusLine as e:
        # Python 3 exceptions have no `.message` attribute, so log the
        # exception itself rather than a value that is always 'N/A'.
        logger.warning('Failed to reach server. Reason: {}', e)
        return 'BadStatusLine'
    except OSError as e:
        if hasattr(e, 'reason'):
            logger.warning('Failed to reach server. Reason: {}', e.reason)
        elif hasattr(e, 'code'):
            logger.warning("The server couldn't fulfill the request. Error code: {}", e.code)
        logger.opt(exception=True).debug('OSError')
        return 'OSError'
    except ValueError as e:
        # Probably unknown url type
        msg = f'ValueError {e}'
        logger.warning(msg)
        logger.opt(exception=True).debug(msg)
        return msg
    return None
[docs]
def download_entry(self, task, entry, url, tmp_path):
    """Download `entry` by using `url` into a fresh directory below `tmp_path`.

    On success the entry fields ``file`` (path of the temp file),
    ``mime-type``, ``filename`` and, when usable, ``content-length`` are set.
    The per-download temp directory is removed again on every path that does
    not end with ``entry['file']`` being set.

    :param task: Task
    :param entry: Entry
    :param url: Url to download
    :param tmp_path: Directory in which the temporary download directory is created
    :raises: Several types of exceptions (request errors, OSError, ...)
    :raises: PluginError if `tmp_path` is not writable
    """
    logger.debug("Downloading url '{}'", url)
    # get content
    auth = None
    if 'download_auth' in entry:
        auth = entry['download_auth']
        # NOTE(review): this writes the auth value to the debug log - confirm that is intended.
        logger.debug(
            'Custom auth enabled for {} download: {}', entry['title'], entry['download_auth']
        )
    # Copy the session headers: updating them in place would leak this entry's
    # custom headers into every later request made through `task.requests`.
    headers = task.requests.headers.copy()
    if 'download_headers' in entry:
        headers.update(entry['download_headers'])
        logger.debug(
            'Custom headers enabled for {} download: {}',
            entry['title'],
            entry['download_headers'],
        )
    # expand ~ in temp path
    # TODO: jinja?
    tmp_path = os.path.expanduser(tmp_path)
    # Clean illegal characters from temp path name
    tmp_path = pathscrub(tmp_path)
    # create if missing (including missing parents)
    if not os.path.isdir(tmp_path):
        logger.debug('creating tmp_path {}', tmp_path)
        os.makedirs(tmp_path, exist_ok=True)
    # check for write-access
    if not os.access(tmp_path, os.W_OK):
        raise plugin.PluginError(f'Not allowed to write to temp directory `{tmp_path}`')
    # download and write data into a temp file
    tmp_dir = tempfile.mkdtemp(dir=tmp_path)
    fname = hashlib.md5(url.encode('utf-8', 'replace')).hexdigest()
    datafile = os.path.join(tmp_dir, fname)
    downloaded = False
    try:
        with task.requests.get(
            url, auth=auth, raise_status=False, headers=headers, stream=True
        ) as response:
            if response.status_code != 200:
                logger.debug(
                    'Got {} response from server. Saving error page.', response.status_code
                )
                # Save the error page
                if response.content:
                    self.save_error_page(entry, task, response.content)
                # Raise the error
                response.raise_for_status()
                return
            try:
                with open(datafile, 'wb') as outfile:
                    outfile.writelines(
                        response.iter_content(chunk_size=150 * 1024, decode_unicode=False)
                    )
            except Exception as e:
                # don't leave futile files behind; the partial file and its
                # directory are removed by the `finally` clause below
                logger.debug('Download interrupted, removing datafile')
                if isinstance(e, socket.timeout):
                    # NOTE(review): a timeout is only logged, so the caller
                    # sees no error although no file was stored.
                    logger.error('Timeout while downloading file')
                else:
                    raise
            else:
                # Do a sanity check on downloaded file
                if os.path.getsize(datafile) == 0:
                    entry.fail(f'File {datafile} is 0 bytes in size')
                    return
                # store temp filename into entry so other plugins may read and modify content
                # temp file is moved into final destination at self.output
                entry['file'] = datafile
                downloaded = True
                logger.debug('{} field file set to: {}', entry['title'], entry['file'])
    finally:
        if not downloaded:
            # Remove the whole per-download directory, not just the data file,
            # and never let cleanup mask the original error.
            shutil.rmtree(tmp_dir, ignore_errors=True)
    if 'content-type' in response.headers:
        entry['mime-type'] = str(parse_header(response.headers['content-type'])[0])
    else:
        entry['mime-type'] = 'unknown/unknown'
    content_encoding = response.headers.get('content-encoding', '')
    decompress = 'gzip' in content_encoding or 'deflate' in content_encoding
    # The header length is only recorded when no gzip/deflate content-encoding is in play.
    if 'content-length' in response.headers and not decompress:
        entry['content-length'] = int(response.headers['content-length'])
    # prefer content-disposition naming, note: content-disposition can be disabled completely
    # by setting entry field `content-disposition` to False
    if entry.get('content-disposition', True):
        self.filename_from_headers(entry, response)
    else:
        logger.info('Content-disposition disabled for {}', entry['title'])
    self.filename_ext_from_mime(entry)
    if not entry.get('filename'):
        # [-1] instead of [1]: no IndexError for a url without any '/'
        filename = unquote(url.rsplit('/', 1)[-1])
        logger.debug('No filename - setting from url: {}', filename)
        entry['filename'] = filename
    logger.debug('Finishing download_entry() with filename {}', entry.get('filename'))
[docs]
def filename_ext_from_mime(self, entry):
    """Append an extension matching the entry's mime-type to its filename, if needed.

    Nothing is changed when the mime-type has no known extension, when the
    entry has no filename, or when the filename already ends with one of the
    extensions known for the mime-type.
    """
    mime_type = entry['mime-type']
    candidates = mimetypes.guess_all_extensions(mime_type, strict=False)
    if not candidates:
        logger.debug("Python doesn't know extension for mime-type: {}", mime_type)
        return
    logger.debug('Mimetype guess for {} is {} ', mime_type, candidates)
    current_name = entry.get('filename')
    if not current_name:
        return
    if current_name.endswith(tuple(candidates)):
        logger.debug('Filename {} extension matches to mime-type', current_name)
        return
    # mimetypes library has no concept of a 'preferred' extension when there are multiple possibilities
    # this causes the first to be used which is not always desirable, e.g. 'ksh' for 'text/plain'
    chosen = mimetypes.guess_extension(mime_type, strict=False)
    logger.debug('Adding mime-type extension {} to {}', chosen, current_name)
    entry['filename'] = current_name + chosen
[docs]
def on_task_output(self, task, config):
    """Move the downloaded content of each accepted entry to its final destination.

    An entry whose output step raises is failed and the problem is logged;
    the remaining entries are still processed.
    """
    settings = self.process_config(config)
    for accepted_entry in task.accepted:
        try:
            self.output(task, accepted_entry, settings)
        except plugin.PluginWarning as warning:
            accepted_entry.fail()
            logger.error('Plugin error while writing: {}', warning)
        except Exception:
            accepted_entry.fail()
            logger.exception('Exception while writing')
[docs]
def output(self, task, entry, config):
    """Move the entry's temp file into its final destination.

    The destination directory is ``entry['path']`` if present, otherwise the
    plugin's ``path`` setting; ``--dl-path`` overrides both. On success
    ``entry['location']`` is set to the destination file. The entry's temp
    file is cleaned up afterwards whatever the outcome.

    Raises:
        PluginError: if the entry has no temp file, the path is invalid, or
            the destination cannot be created or written
        PluginWarning: if the recorded temp file no longer exists
    """
    if 'file' not in entry and not task.options.test:
        logger.debug('file missing, entry: {}', entry)
        raise plugin.PluginError(
            'Entry `{}` has no temp file associated with'.format(entry['title']), logger
        )
    try:
        # use path from entry if has one, otherwise use from download definition parameter
        path = entry.get('path', config.get('path'))
        if not isinstance(path, str):
            raise plugin.PluginError('Invalid `path` in entry `{}`'.format(entry['title']))
        # override path from command line parameter
        if task.options.dl_path:
            path = task.options.dl_path
        # expand variables in path
        try:
            path = Path(entry.render(path)).expanduser()
        except RenderError as e:
            entry.fail(f'Could not set path. Error during string replacement: {e}')
            return
        # If we are in test mode, report and return
        if task.options.test:
            logger.info('Would write `{}` to `{}`', entry['title'], path)
            # Set a fake location, so the exec plugin can do string replacement during --test #1015
            entry['location'] = path / 'TEST_MODE_NO_OUTPUT'
            return
        # make path
        if not path.is_dir():
            logger.debug('Creating directory {}', path)
            try:
                # exist_ok: tolerate the directory appearing between the check and the call
                path.mkdir(parents=True, exist_ok=True)
            except Exception as err:
                raise plugin.PluginError(f'Cannot create path {path}', logger) from err
        # check that temp file is present
        if not os.path.exists(entry['file']):
            logger.debug('entry: {}', entry)
            raise plugin.PluginWarning(
                "Downloaded temp file `{}` doesn't exist!?".format(entry['file'])
            )
        if config.get('filename'):
            try:
                entry['filename'] = entry.render(config['filename'])
                logger.debug('set filename from config {}', entry['filename'])
            except RenderError as e:
                entry.fail(f'Could not set filename. Error during string replacement: {e}')
                return
        # if we still don't have a filename, try making one from title (last resort)
        elif not entry.get('filename'):
            entry['filename'] = entry['title']
            logger.debug('set filename from title {}', entry['filename'])
            if 'mime-type' not in entry:
                logger.warning(
                    'Unable to figure proper filename for {}. Using title.', entry['title']
                )
            else:
                guess = mimetypes.guess_extension(entry['mime-type'])
                if not guess:
                    # Log the mime-type itself; `guess` is always empty on this branch.
                    logger.warning(
                        'Unable to guess extension with mime-type {}', entry['mime-type']
                    )
                else:
                    self.filename_ext_from_mime(entry)
        name = entry.get('filename', entry['title'])
        # Remove illegal characters from filename #325, #353
        name = pathscrub(name)
        # Remove directory separators from filename #208
        name = name.replace('/', ' ')
        if sys.platform.startswith('win'):
            name = name.replace('\\', ' ')
        # remove duplicate spaces
        name = ' '.join(name.split())
        # combine to full path + filename
        destfile = path / name
        logger.debug('destfile: {}', destfile)
        if destfile.exists():
            import filecmp

            if filecmp.cmp(entry['file'], destfile):
                logger.debug("Identical destination file '{}' already exists", destfile)
            elif config.get('overwrite'):
                logger.debug('Overwriting already existing file {}', destfile)
            else:
                logger.info(
                    'File `{}` already exists and is not identical, download failed.', destfile
                )
                entry.fail(f'File `{destfile}` already exists and is not identical.')
                return
        else:
            # move temp file
            logger.debug('moving {} to {}', entry['file'], destfile)
        # The move runs for new, identical and to-be-overwritten destinations alike.
        try:
            shutil.move(entry['file'], destfile)
        except OSError as err:
            # ignore permission errors
            import errno

            if not destfile.exists():
                raise plugin.PluginError(f'Unable to write {destfile}: {err}')
            if err.errno not in (errno.EPERM, errno.EACCES):
                raise
        # store final destination as output key
        entry['location'] = destfile
    finally:
        self.cleanup_temp_file(entry)
[docs]
def on_task_learn(self, task, config):
    """Remove any temp files still attached to the task's entries once output is done.

    `config` is not used.
    """
    self.cleanup_temp_files(task)
[docs]
def on_task_abort(self, task, config):
    """Remove any temp files still attached to the task's entries when the task aborts.

    `config` is not used.
    """
    self.cleanup_temp_files(task)
[docs]
def cleanup_temp_file(self, entry):
    """Delete the entry's temp file and its containing directory, then drop the `file` field.

    Does nothing when the entry has no `file` field.
    """
    if 'file' not in entry:
        return
    temp_file = entry['file']
    if os.path.exists(temp_file):
        logger.debug('removing temp file {} from {}', entry['file'], entry['title'])
        os.remove(temp_file)
    # The directory that held the temp file is removed along with anything left in it.
    temp_dir = os.path.dirname(temp_file)
    if os.path.exists(temp_dir):
        shutil.rmtree(temp_dir)
    del entry['file']
[docs]
def cleanup_temp_files(self, task):
    """Delete leftover temp files of every entry in the task's `entries`, `rejected` and `failed` collections."""
    leftovers = task.entries + task.rejected + task.failed
    for candidate in leftovers:
        self.cleanup_temp_file(candidate)
[docs]
@event('plugin.register')
def register_plugin():
    """Register `PluginDownload` under the name ``download`` with ``api_ver=2``."""
    plugin.register(PluginDownload, 'download', api_ver=2)
[docs]
@event('options.register')
def register_parser_arguments():
    """Add the ``--dl-path`` option to the ``execute`` parser."""
    execute_parser = options.get_parser('execute')
    execute_parser.add_argument(
        '--dl-path',
        dest='dl_path',
        default=False,
        metavar='PATH',
        help='override path for download plugin, applies to all executed tasks',
    )