Source code for flexget.components.archives.utils
"""Utilities for handling RAR and ZIP archives.
Provides wrapper archive and exception classes to simplify
archive extraction
"""
import os
import shutil
import zipfile
from pathlib import Path
from loguru import logger
try:
import rarfile
except ImportError:
rarfile = None
logger = logger.bind(name='archive')
[docs]
class ArchiveError(Exception):
"""Base exception for archive."""
[docs]
class NeedRarFile(ArchiveError):
"""Exception to be raised when rarfile module is missing."""
[docs]
class BadArchive(ArchiveError):
"""Wrapper exception for BadZipFile and BadRarFile."""
[docs]
class NeedFirstVolume(ArchiveError):
"""Wrapper exception for rarfile.NeedFirstVolume."""
[docs]
class PathError(ArchiveError):
"""Exception to be raised when an archive file doesn't exist."""
[docs]
class FSError(ArchiveError):
"""Exception to be raised on OS/IO exceptions."""
[docs]
class FileAlreadyExists(ArchiveError):
"""Exception to be raised when destination file already exists."""
[docs]
def rarfile_set_path_sep(separator):
"""Set the path separator on rarfile module."""
if rarfile:
rarfile.PATH_SEP = separator
[docs]
def makepath(path: Path) -> None:
"""Make directories as needed."""
if not Path(path).exists():
logger.debug('Creating path: {}', path)
Path(path).mkdir(parents=True)
[docs]
class Archive:
"""Base archive class. Assumes an interface similar to zipfile.ZipFile or rarfile.RarFile."""
def __init__(self, archive_object, path):
self.path = path
self.archive = archive_object(self.path)
[docs]
def close(self):
"""Release open resources."""
self.archive.close()
[docs]
def delete(self):
"""Delete the volumes that make up this archive."""
volumes = self.volumes()
self.close()
try:
for volume in volumes:
Path(volume).unlink()
logger.verbose('Deleted archive: {}', volume)
except OSError as error:
raise FSError(error)
[docs]
def volumes(self):
"""Return the list of volumes that comprise this archive."""
return [self.path]
[docs]
def infolist(self):
"""Return a list of info objects describing the contents of this archive."""
infolist = []
for info in self.archive.infolist():
try:
archive_info = ArchiveInfo(info)
infolist.append(archive_info)
except ValueError as e:
logger.debug(e)
return infolist
[docs]
def open(self, member):
"""Return file-like object from where the data of a member file can be read."""
return self.archive.open(member)
[docs]
class RarArchive(Archive):
"""Wrapper class for rarfile.RarFile."""
def __init__(self, path):
RarArchive.check_import()
try:
super().__init__(rarfile.RarFile, path)
except rarfile.BadRarFile as error:
raise BadArchive(error)
except rarfile.NeedFirstVolume as error:
raise NeedFirstVolume(error)
except rarfile.Error as error:
raise ArchiveError(error)
[docs]
def volumes(self):
"""Return the list of volumes that comprise this archive."""
return self.archive.volumelist()
[docs]
def open(self, member):
"""Return file-like object from where the data of a member file can be read."""
try:
return super().open(member)
except rarfile.Error as error:
raise ArchiveError(error)
[docs]
@staticmethod
def check_import():
if not rarfile:
raise NeedRarFile('Python module rarfile needed to handle RAR archives')
[docs]
class ZipArchive(Archive):
"""Wrapper class for zipfile.ZipFile."""
def __init__(self, path):
try:
super().__init__(zipfile.ZipFile, path)
except zipfile.BadZipfile as error:
raise BadArchive(error)
[docs]
def open(self, member):
"""Return file-like object from where the data of a member file can be read."""
try:
return super().open(member)
except zipfile.BadZipfile as error:
raise ArchiveError(error)
[docs]
class ArchiveInfo:
"""Wrapper class for archive info objects."""
def __init__(self, info):
self.info = info
self.path = info.filename
self.filename = os.path.basename(self.path)
if self._is_dir():
raise ValueError(f'Appears to be a directory: {self.path}')
[docs]
def _is_dir(self):
"""Indicate if info object looks to be a directory."""
if hasattr(self.info, 'isdir'):
return self.info.isdir()
return not self.filename
[docs]
def open_archive(archive_path):
"""Return the appropriate archive object."""
archive = None
if not archive_path.exists():
raise PathError("Path doesn't exist")
if zipfile.is_zipfile(archive_path):
archive = ZipArchive(archive_path)
logger.debug('Successfully opened ZIP: {}', archive_path)
elif rarfile and rarfile.is_rarfile(archive_path):
archive = RarArchive(archive_path)
logger.debug('Successfully opened RAR: {}', archive_path)
elif not rarfile:
logger.warning('Rarfile module not installed; unable to handle RAR archives.')
return archive
[docs]
def is_archive(path):
"""Attempt to open an entry as an archive; return True on success, False on failure."""
archive = None
try:
archive = open_archive(path)
if archive:
archive.close()
return True
except (OSError, ArchiveError) as error:
logger.debug('Failed to open file as archive: {} ({})', path, error)
return False