import http.cookiejar
from loguru import logger
from flexget import plugin
from flexget.event import event
from flexget.utils.tools import TimedDict
logger = logger.bind(name='cookies')
# [docs] (Sphinx viewcode link marker left over from HTML extraction; not code)
class PluginCookies:
    """Add cookies from a file to the task's requests session.

    Three cookie file formats are supported, selected by the ``type`` option:

    * ``firefox3`` - a Firefox ``cookies.sqlite`` database (the default)
    * ``mozilla`` - a Netscape/Mozilla ``cookies.txt`` file (default for ``*.txt``)
    * ``lwp`` - a libwww-perl cookie file (default for ``*.lwp``)

    Example::

      cookies: /path/firefox/profile/something/cookies.sqlite

    or, with an explicit type::

      cookies:
        file: /path/to/cookies.txt
        type: mozilla
    """

    # TODO: 1.2 Is this a good way to handle this? How long should the time be?
    # Keeps loaded cookiejars cached for some time (shared by all instances,
    # keyed by the expanded cookie file path).
    cookiejars = TimedDict(cache_time='5 minutes')

    schema = {
        'oneOf': [
            {'type': 'string', 'format': 'file'},
            {
                'type': 'object',
                'properties': {
                    'file': {'type': 'string', 'format': 'file'},
                    'type': {'type': 'string', 'enum': ['firefox3', 'mozilla', 'lwp']},
                },
                'additionalProperties': False,
            },
        ]
    }

    def prepare_config(self, config):
        """Normalize the plugin config into a dict with ``file`` and ``type`` keys.

        :param config: Either the cookie file path as a string, or a dict with a
            ``file`` key and an optional ``type`` key.
        :return: A new dict; when ``type`` was not given it is guessed from the
            file extension (``.txt`` -> ``mozilla``, ``.lwp`` -> ``lwp``,
            anything else -> ``firefox3``).
        :raises plugin.PluginError: If no ``file`` is configured.
        """
        if isinstance(config, str):
            config = {'file': config}
        else:
            # Work on a copy so the caller's config object is not modified.
            config = dict(config)
        if 'file' not in config:
            # The schema does not mark `file` as required, so guard against it here
            # instead of failing with a bare KeyError.
            raise plugin.PluginError('No cookie file specified in the cookies config', logger)
        cookie_file = config['file']
        if cookie_file.endswith('.txt'):
            guessed_type = 'mozilla'
        elif cookie_file.endswith('.lwp'):
            guessed_type = 'lwp'
        else:
            guessed_type = 'firefox3'
        config.setdefault('type', guessed_type)
        return config

    def sqlite2cookie(self, filename):
        """Load a Firefox ``cookies.sqlite`` database into a cookie jar.

        The rows of the ``moz_cookies`` table are written to an in-memory
        Netscape-format cookie file, which is then parsed by
        :class:`http.cookiejar.MozillaCookieJar`.

        :param filename: Path of the sqlite database to read.
        :return: A ``MozillaCookieJar`` holding the cookies that could be read.
        :raises plugin.PluginWarning: If neither ``pysqlite2`` nor ``sqlite3`` can be imported.
        :raises plugin.PluginError: If the database cannot be opened or the
            ``moz_cookies`` query fails.
        """
        from io import StringIO

        try:
            from pysqlite2 import dbapi2 as sqlite
        except ImportError:
            try:
                from sqlite3 import dbapi2 as sqlite  # try the 2.5+ stdlib
            except ImportError as exc:
                raise plugin.PluginWarning('Unable to use sqlite3 or pysqlite2', logger) from exc

        def notabs(val):
            # Tabs are the field separator of the Netscape format; remove them (#582)
            if isinstance(val, str):
                return val.replace('\t', '')
            return val

        def to_hex(text):
            return ''.join(f'{ord(c):02x}' for c in text)

        logger.debug('connecting: {}', filename)
        try:
            con = sqlite.connect(filename)
        except Exception as exc:
            raise plugin.PluginError('Unable to open cookies sqlite database', logger) from exc

        s = StringIO()
        s.write(
            '# Netscape HTTP Cookie File\n'
            '# http://www.netscape.com/newsref/std/cookie_spec.html\n'
            '# This is a generated file! Do not edit.\n'
        )
        count = 0
        failed = 0
        # The connection is closed on every exit path, including errors.
        try:
            cur = con.cursor()
            try:
                cur.execute('select host, path, isSecure, expiry, name, value from moz_cookies')
            except sqlite.Error as exc:
                raise plugin.PluginError(
                    f'{filename} does not appear to be a valid Firefox 3 cookies file', logger
                ) from exc

            logger.debug('fetching all cookies')
            while True:
                try:
                    row = next(cur)
                except StopIteration:
                    break
                except UnicodeDecodeError:
                    # for some god awful reason the sqlite module can throw UnicodeDecodeError ...
                    logger.debug('got UnicodeDecodeError from sqlite, ignored')
                    failed += 1
                    continue

                host, path, is_secure, expiry, name, value = (notabs(field) for field in row)
                try:
                    s.write(
                        '{}\t{}\t{}\t{}\t{}\t{}\t{}\n'.format(
                            host,
                            # second column: whether the domain applies to subdomains
                            'TRUE' if host.startswith('.') else 'FALSE',
                            path,
                            'TRUE' if is_secure else 'FALSE',
                            expiry,
                            name,
                            value,
                        )
                    )
                except OSError:
                    # Best effort: dump the offending row for debugging and carry on.
                    for i, val in enumerate((host, path, is_secure, expiry, name, value)):
                        if isinstance(val, str):
                            logger.debug('item[{}]: {}', i, to_hex(val))
                        else:
                            logger.debug('item[{}]: {}', i, val)
                    failed += 1
                else:
                    logger.trace('Adding cookie for {}. key: {} value: {}', host, name, value)
                    count += 1
        finally:
            con.close()

        logger.debug('Added {} cookies to jar. {} failed (non-ascii)', count, failed)
        s.seek(0)
        cookie_jar = http.cookiejar.MozillaCookieJar()
        # NOTE: _really_load is a private http.cookiejar API; it is the only way to
        # parse from an open file object instead of a path.
        # Arguments: (file, filename, ignore_discard, ignore_expires)
        cookie_jar._really_load(s, '', True, True)
        return cookie_jar

    def on_task_start(self, task, config):
        """Task starting, install cookiejar.

        Loads the configured cookie file (or reuses a cached jar for the same
        path) and adds it to ``task.requests``.

        :param task: The task being started; its ``requests`` session receives the jar.
        :param config: Plugin config as accepted by :meth:`prepare_config`.
        :raises plugin.PluginError: If the cookie type is unknown or the cookies
            cannot be loaded.
        """
        import os

        config = self.prepare_config(config)
        cookie_type = config.get('type')
        cookie_file = os.path.expanduser(config.get('file'))
        jar_classes = {
            'mozilla': http.cookiejar.MozillaCookieJar,
            'lwp': http.cookiejar.LWPCookieJar,
        }

        cj = self.cookiejars.get(cookie_file, None)
        if cj is not None:
            logger.debug('Loading cookiejar from cache.')
        else:
            if cookie_type == 'firefox3':
                logger.debug('Loading {} cookies', cookie_type)
                cj = self.sqlite2cookie(cookie_file)
            elif cookie_type in jar_classes:
                logger.debug('Loading {} cookies', cookie_type)
                cj = jar_classes[cookie_type]()
                try:
                    cj.load(filename=cookie_file, ignore_expires=True)
                except (http.cookiejar.LoadError, OSError) as exc:
                    raise plugin.PluginError(
                        f'Cookies could not be loaded: {exc}', logger
                    ) from exc
                logger.debug('{} cookies loaded', cookie_type)
            else:
                raise plugin.PluginError(f'Unknown cookie type {cookie_type}', logger)
            # Cache only after a successful load, so a failed load never leaves an
            # empty jar behind for later tasks to pick up.
            self.cookiejars[cookie_file] = cj

        # Add cookiejar to our requests session
        task.requests.add_cookiejar(cj)
# [docs] (Sphinx viewcode link marker left over from HTML extraction; not code)
@event('plugin.register')
def register_plugin():
    """Register ``PluginCookies`` under the name ``cookies`` with plugin API version 2."""
    plugin.register(
        PluginCookies,
        'cookies',
        api_ver=2,
    )