import threading
import time
from datetime import datetime
import json
from . import fire, get_context, context_switch, register_auxiliary, auxiliary
from . import arff
import logging
import sys


logger = logging.getLogger(__name__)


class EventGenerator:
    """
    An event generator uses a generation function to generate events from
    any external source.
    """

    def __init__(self, context, generator, event_name='tweet', **kwargs):
        self.context = context
        self.event_name = event_name
        self.generator = generator
        self.generator_args = kwargs
        self.stop_flag = threading.Event()

    def start(self):
        """
        Starts a thread to run this generator.
        """
        thread = threading.Thread(target=self.run)
        thread.start()

    def stop(self):
        """
        Requests shutdown of the generator.
        """
        self.stop_flag.set()

    def run(self):
        """
        Invoke the generator to get a sequence of events.

        This method passes a stop event to the generator; the event is set
        when the generator should terminate. Immediate termination is not
        required.
        """
        logger.debug("Running event generator")
        with context_switch(self.context):
            for event in self.generator(self.stop_flag, **self.generator_args):
                fire(self.event_name, event)
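

# A minimal usage sketch (illustrative only, not part of this module's API):
# any generator function that takes the stop event as its first argument and
# yields event objects can drive an EventGenerator. The names `replay_lines`,
# 'line' and 'events.txt' below are hypothetical.
#
#     def replay_lines(stop, path):
#         with open(path, encoding='utf-8') as f:
#             for line in f:
#                 if stop.is_set():
#                     break
#                 yield {'text': line.strip()}
#
#     gen = EventGenerator(get_context(), replay_lines, event_name='line', path='events.txt')
#     gen.start()   # fires a 'line' event for each line of the file
#     ...
#     gen.stop()    # request shutdown; the generator stops at the next line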


def offline_tweets(stop, data_file, time_factor=1000, arff_file=None):
    """
    Offline tweet replay.

    Takes a data file formatted with one tweet per line and generates a
    sequence of tweets in scaled real time: gaps between tweets are divided
    by time_factor, and a time_factor of None replays with no delay at all.
    """

    # timing functions return False if we need to abort
    def delayer(duration):
        logger.debug("Delay for next tweet {}s ({}s real)".format(duration, duration / time_factor))
        return not stop.wait(duration / time_factor)

    def immediate(duration):
        return not stop.is_set()

    # select timing function based on time_factor
    delayed = immediate if time_factor is None else delayer

    arff_data = None
    if arff_file:
        arff_file = open(arff_file, 'r', encoding='utf-8')
        arff_data = arff.load(arff_file)
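    # Note (assumption based on how arff_data is consumed below): arff.load()
    # is expected to return an iterator of rows consumed one per tweet line in
    # data_file; if it runs out early, the remaining tweets get extra=None.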

    with open(data_file, encoding='utf-8') as data:
        last_time = None
        lines = 0
        for line in data:
            lines += 1

            try:
                tweet = json.loads(line)
                if arff_file:
                    try:
                        extra_data = next(arff_data)
                    except StopIteration:
                        extra_data = None
                    except ValueError as e:
                        logger.error("Could not read arff line for tweet (reason: {})".format(e))
                        extra_data = None
                    tweet['extra'] = extra_data
            except ValueError as e:
                logger.error("Could not read tweet on {}:{} (reason: {})".format(data_file, lines, e))
                continue

            # time scale the tweet
            tweet_time = datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S %z %Y')

            if not last_time:
                last_time = tweet_time

            wait = tweet_time - last_time
            delay = wait.total_seconds()
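            # e.g. with the default time_factor of 1000, a 10-minute gap
            # (600s) between consecutive tweets replays in 0.6s of real time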
            # delay and yield or break depending on success
            if delayed(delay):
                yield tweet
                last_time = tweet_time
            else:
                break

    if arff_file:
        arff_file.close()


def start_offline_tweets(data_file, event_name='tweet', aux_name='tweeter', **kwargs):
    context = get_context()
    if context is None:
        raise NotImplementedError("Cannot start offline tweet replay outside of a context.")
    register_auxiliary(aux_name, EventGenerator(context, generator=offline_tweets, data_file=data_file, event_name=event_name, **kwargs))
    auxiliary(aux_name).start()
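

# Usage sketch (illustrative; assumes it is called while a context from
# get_context() is active, e.g. inside the surrounding framework's setup):
#
#     start_offline_tweets('tweets.json', event_name='tweet', time_factor=500)
#
# 'tweets.json' and the time_factor value are hypothetical; extra keyword
# arguments are forwarded to offline_tweets() via EventGenerator.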