Changed tweets, added eca
This commit is contained in:
120
eca/generators.py
Normal file
120
eca/generators.py
Normal file
@@ -0,0 +1,120 @@
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime
|
||||
import json
|
||||
from . import fire, get_context, context_switch, register_auxiliary, auxiliary
|
||||
from . import arff
|
||||
import logging
|
||||
import sys
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class EventGenerator:
|
||||
"""
|
||||
An event generator uses a generation function to generate events from
|
||||
any external source.
|
||||
"""
|
||||
def __init__(self, context, generator, event_name='tweet', **kwargs):
|
||||
self.context = context
|
||||
self.event_name = event_name
|
||||
self.generator = generator
|
||||
self.generator_args = kwargs
|
||||
self.stop_flag = threading.Event()
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Starts a thread to handle run this generator.
|
||||
"""
|
||||
thread = threading.Thread(target=self.run)
|
||||
thread.start()
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Requests shutdown of generator.
|
||||
"""
|
||||
self.stop_flag.set()
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Invoke the generator to get a sequence of events.
|
||||
|
||||
This method passes an event to the generator which will be set to True
|
||||
if the generator should terminate. Immediate termination is not required.
|
||||
"""
|
||||
logger.debug("Running event generator")
|
||||
with context_switch(self.context):
|
||||
for event in self.generator(self.stop_flag, **self.generator_args):
|
||||
fire(self.event_name, event)
|
||||
|
||||
|
||||
def offline_tweets(stop, data_file, time_factor=1000, arff_file=None):
|
||||
"""
|
||||
Offline tweet replay.
|
||||
|
||||
Takes a datafile formatted with 1 tweet per line, and generates a sequence of
|
||||
scaled realtime items.
|
||||
"""
|
||||
|
||||
# timing functions return false if we need to abort
|
||||
def delayer(duration):
|
||||
logger.debug("Delay for next tweet {}s ({}s real)".format(delay, delay/time_factor))
|
||||
return not stop.wait(delay / time_factor)
|
||||
|
||||
def immediate(duration):
|
||||
return not stop.is_set()
|
||||
|
||||
# select timing function based on time_factor
|
||||
delayed = immediate if time_factor is None else delayer
|
||||
|
||||
arff_data = None
|
||||
if arff_file:
|
||||
arff_file = open(arff_file, 'r', encoding='utf-8')
|
||||
arff_data = arff.load(arff_file)
|
||||
|
||||
with open(data_file, encoding='utf-8') as data:
|
||||
last_time = None
|
||||
lines = 0
|
||||
for line in data:
|
||||
lines += 1
|
||||
|
||||
try:
|
||||
tweet = json.loads(line)
|
||||
if arff_file:
|
||||
try:
|
||||
extra_data = next(arff_data)
|
||||
except StopIteration:
|
||||
extra_data = None
|
||||
except ValueError as e:
|
||||
logger.error("Could not read arff line for tweet (reason: {})".format(e))
|
||||
extra_data = None
|
||||
tweet['extra'] = extra_data
|
||||
except ValueError as e:
|
||||
logger.error("Could not read tweet on {}:{} (reason: {})".format(data_file,lines, e))
|
||||
continue
|
||||
|
||||
# time scale the tweet
|
||||
tweet_time = datetime.strptime(tweet['created_at'], '%a %b %d %H:%M:%S %z %Y')
|
||||
|
||||
if not last_time:
|
||||
last_time = tweet_time
|
||||
|
||||
wait = tweet_time - last_time
|
||||
delay = wait.total_seconds()
|
||||
|
||||
# delay and yield or break depending on success
|
||||
if delayed(delay):
|
||||
yield tweet
|
||||
last_time = tweet_time
|
||||
else:
|
||||
break
|
||||
if arff_file:
|
||||
arff_file.close()
|
||||
|
||||
|
||||
def start_offline_tweets(data_file, event_name='tweet', aux_name='tweeter', **kwargs):
|
||||
context = get_context()
|
||||
if context is None:
|
||||
raise NotImplementedError("Can not start offline tweet replay outside of a context.")
|
||||
register_auxiliary(aux_name, EventGenerator(context, generator=offline_tweets, data_file=data_file, event_name=event_name, **kwargs))
|
||||
auxiliary(aux_name).start()
|
||||
Reference in New Issue
Block a user