Source code for flexget.plugins.input.twitterfeed
import re
from loguru import logger
from flexget import plugin
from flexget.entry import Entry
from flexget.event import event
logger = logger.bind(name='twitterfeed')
# Size of the chunks when fetching a timeline
CHUNK_SIZE = 200
# Maximum number of tweets to fetch no matter how (if there is no
# since_id for example or a too old since_id)
MAX_TWEETS = 1000
[docs]
class TwitterFeed:
"""Parse a twitter feed.
Example::
twitterfeed:
account: <account>
consumer_key: <consumer_key>
consumer_secret: <consumer_secret>
access_token_key: <access_token_key>
access_token_secret: <access_token_secret>
By default, the 50 last tweets are fetched corresponding to the option:
all_entries: yes
To change that default number:
tweets: 75
Beware that Twitter only allows 300 requests during a 15 minutes
window.
If you want to process only new tweets:
all_entries: no
That option's behaviour is changed if the corresponding task's
configuration has been changed. In that case, new tweets are
fetched and if there are no more than `tweets`, older ones are
fetched to have `tweets` of them in total.
"""
schema = {
'type': 'object',
'properties': {
'account': {'type': 'string'},
'consumer_key': {'type': 'string'},
'consumer_secret': {'type': 'string'},
'access_token_key': {'type': 'string'},
'access_token_secret': {'type': 'string'},
'all_entries': {'type': 'boolean', 'default': True},
'tweets': {'type': 'number', 'default': 50},
},
'required': [
'account',
'consumer_key',
'consumer_secret',
'access_token_secret',
'access_token_key',
],
'additionalProperties': False,
}
[docs]
def on_task_start(self, task, config):
try:
import twitter # noqa: F401
except ImportError:
raise plugin.PluginError('twitter module required', logger=logger)
[docs]
def on_task_input(self, task, config):
import twitter
account = config['account']
logger.debug('Looking at twitter account `{}`', account)
try:
self.api = twitter.Api(
consumer_key=config['consumer_key'],
consumer_secret=config['consumer_secret'],
access_token_key=config['access_token_key'],
access_token_secret=config['access_token_secret'],
)
except twitter.TwitterError as ex:
raise plugin.PluginError(
f'Unable to authenticate to twitter for task {task.name}: {ex}'
)
if config['all_entries']:
logger.debug(
'Fetching {} last tweets from {} timeline', config['tweets'], config['account']
)
tweets = self.get_tweets(account, number=config['tweets'])
else:
# Fetching from where we left off last time
since_id = task.simple_persistence.get('since_id', None)
if since_id:
logger.debug(
'Fetching from tweet id {} from {} timeline', since_id, config['account']
)
kwargs = {'since_id': since_id}
else:
logger.debug('No since_id, fetching last {} tweets', config['tweets'])
kwargs = {'number': config['tweets']}
tweets = self.get_tweets(account, **kwargs)
if task.config_modified and len(tweets) < config['tweets']:
logger.debug(
'Configuration modified; fetching at least {} tweets', config['tweets']
)
max_id = tweets[-1].id if tweets else None
remaining_tweets = config['tweets'] - len(tweets)
tweets = tweets + self.get_tweets(account, max_id=max_id, number=remaining_tweets)
if tweets:
last_tweet = tweets[0]
logger.debug('New last tweet id: {}', last_tweet.id)
task.simple_persistence['since_id'] = last_tweet.id
logger.debug('{} tweets fetched', len(tweets))
for t in tweets:
logger.debug('id:{}', t.id)
return [self.entry_from_tweet(e) for e in tweets]
[docs]
def get_tweets(self, account, number=MAX_TWEETS, since_id=None, max_id=None):
"""Fetch tweets from twitter account `account`."""
import twitter
all_tweets = []
while number > 0:
try:
tweets = self.api.GetUserTimeline(
screen_name=account,
include_rts=False,
exclude_replies=True,
count=min(number, CHUNK_SIZE),
since_id=since_id,
max_id=max_id,
)
except twitter.TwitterError as e:
raise plugin.PluginError(f'Unable to fetch timeline {account} for {e}')
if not tweets:
break
all_tweets += tweets
number -= len(tweets)
max_id = tweets[-1].id - 1
return all_tweets
[docs]
def entry_from_tweet(self, tweet):
new_entry = Entry()
new_entry['title'] = tweet.text
urls = re.findall(r'(https?://\S+)', tweet.text)
new_entry['urls'] = urls
if urls:
new_entry['url'] = urls[0]
return new_entry
[docs]
@event('plugin.register')
def register_plugin():
plugin.register(TwitterFeed, 'twitterfeed', api_ver=2)