121 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			121 lines
		
	
	
		
			3.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import threading
 | |
| import time
 | |
| from datetime import datetime
 | |
| import json
 | |
| from . import fire, get_context, context_switch, register_auxiliary, auxiliary
 | |
| from . import arff
 | |
| import logging
 | |
| import sys
 | |
| 
 | |
| 
 | |
# Module-level logger named after this module, shared by every block below.
logger = logging.getLogger(name=__name__)
| 
 | |
class EventGenerator:
    """
    An event generator uses a generation function to generate events from
    any external source.

    The generation function is called as ``generator(stop_flag, **kwargs)``
    and must return an iterable of events. Every event it produces is fired
    as ``event_name`` while ``context`` is the active context.
    ``stop_flag`` is a ``threading.Event`` that is set by :meth:`stop`, so the
    generation function can finish early.
    """

    def __init__(self, context, generator, event_name='tweet', **kwargs):
        self.context = context
        self.event_name = event_name
        self.generator = generator
        # Extra keyword arguments are forwarded verbatim to the generator.
        self.generator_args = kwargs
        self.stop_flag = threading.Event()
        # Thread executing run(); stays None until start() is called.
        self.thread = None

    def start(self):
        """
        Start a thread that runs this generator.

        :returns: the started ``threading.Thread``. It is also kept as
            ``self.thread`` so callers can wait for it with :meth:`join`.
        """
        thread = threading.Thread(
            target=self.run,
            name='EventGenerator-{}'.format(self.event_name),
        )
        self.thread = thread
        thread.start()
        return thread

    def stop(self):
        """
        Request shutdown of the generator.

        This only sets the stop flag. It does not wait for the thread to end;
        use :meth:`join` for that.
        """
        self.stop_flag.set()

    def join(self, timeout=None):
        """
        Wait for the thread started by :meth:`start` to finish.

        Returns immediately if the generator was never started.

        :param timeout: optional number of seconds to wait, as for
            ``threading.Thread.join``.
        """
        if self.thread is not None:
            self.thread.join(timeout)

    def run(self):
        """
        Invoke the generator to get a sequence of events, firing each one.

        The generator receives ``self.stop_flag``, which is set once
        :meth:`stop` is called. The generator should then terminate, but
        immediate termination is not required.
        """
        logger.debug("Running event generator")
        with context_switch(self.context):
            for event in self.generator(self.stop_flag, **self.generator_args):
                fire(self.event_name, event)
| 
 | |
|     
 | |
def _next_arff_row(arff_rows):
    """
    Return the next item from ``arff_rows``, or ``None`` if none is available.

    ``None`` is returned when the iterator is exhausted. It is also returned,
    after logging the error, when reading the row raises ``ValueError``.
    """
    try:
        return next(arff_rows)
    except StopIteration:
        return None
    except ValueError as e:
        logger.error("Could not read arff line for tweet (reason: %s)", e)
        return None


def offline_tweets(stop, data_file, time_factor=1000, arff_file=None):
    """
    Offline tweet replay.

    Reads ``data_file``, which holds one JSON-encoded tweet per line. The
    tweets are yielded in file order, with a wait before each one equal to the
    gap between its ``created_at`` and the previous yielded tweet's, divided
    by ``time_factor``.

    :param stop: ``threading.Event``. Once it is set, the replay ends at the
        next timing check. The wait between tweets is interrupted by it.
    :param data_file: path of the tweet file, read as UTF-8.
    :param time_factor: divisor applied to the gaps between tweets. With the
        default of 1000, a 10s gap becomes 0.01s. ``None`` yields every tweet
        without waiting. Must not be 0.
    :param arff_file: optional path of an ARFF file, read as UTF-8. When it
        is given, the next item from the iterator returned by ``arff.load``
        is attached to each tweet as ``tweet['extra']``. That item is
        presumably one row per tweet. ``None`` is attached once the rows run
        out or when a row cannot be read.

    Lines that are not valid JSON objects, or whose ``created_at`` is missing
    or not in Twitter's ``'%a %b %d %H:%M:%S %z %Y'`` format, are logged and
    skipped.
    """

    # Timing functions return False when the replay must abort.
    def delayer(duration):
        logger.debug("Delay for next tweet %ss (%ss real)",
                     duration, duration / time_factor)
        # Event.wait returns True as soon as stop is set, cutting the delay short.
        return not stop.wait(duration / time_factor)

    def immediate(duration):
        return not stop.is_set()

    delayed = immediate if time_factor is None else delayer

    arff_handle = open(arff_file, 'r', encoding='utf-8') if arff_file else None
    try:
        arff_rows = arff.load(arff_handle) if arff_handle is not None else None

        with open(data_file, encoding='utf-8') as data:
            last_time = None
            for line_no, line in enumerate(data, start=1):
                try:
                    tweet = json.loads(line)
                except ValueError as e:
                    logger.error("Could not read tweet on %s:%s (reason: %s)",
                                 data_file, line_no, e)
                    continue
                if not isinstance(tweet, dict):
                    logger.error("Could not read tweet on %s:%s (reason: not a JSON object)",
                                 data_file, line_no)
                    continue

                # NOTE(review): skipped lines above do not consume an ARFF row.
                # Confirm the ARFF file has no rows for malformed tweet lines.
                if arff_handle is not None:
                    tweet['extra'] = _next_arff_row(arff_rows)

                try:
                    tweet_time = datetime.strptime(tweet['created_at'],
                                                   '%a %b %d %H:%M:%S %z %Y')
                except (KeyError, TypeError, ValueError) as e:
                    logger.error("Could not read timestamp of tweet on %s:%s (reason: %s)",
                                 data_file, line_no, e)
                    continue

                if last_time is None:
                    last_time = tweet_time
                delay = (tweet_time - last_time).total_seconds()

                if not delayed(delay):
                    break
                yield tweet
                last_time = tweet_time
    finally:
        # Runs on normal end, on error, and when the consumer closes the generator.
        if arff_handle is not None:
            arff_handle.close()
| 
 | |
|       
 | |
def start_offline_tweets(data_file, event_name='tweet', aux_name='tweeter', **kwargs):
    """
    Start replaying tweets from ``data_file`` within the current context.

    Builds an ``EventGenerator`` around ``offline_tweets`` that fires every
    replayed tweet as ``event_name``. It registers that generator as the
    auxiliary ``aux_name`` and starts the registered auxiliary. Any extra
    keyword arguments (e.g. ``time_factor``, ``arff_file``) are passed on to
    ``offline_tweets`` together with ``data_file``.

    :raises NotImplementedError: when there is no current context.
    """
    ctx = get_context()
    if ctx is None:
        raise NotImplementedError("Can not start offline tweet replay outside of a context.")

    replay_args = dict(kwargs, data_file=data_file)
    replay = EventGenerator(ctx, generator=offline_tweets, event_name=event_name, **replay_args)
    register_auxiliary(aux_name, replay)
    auxiliary(aux_name).start()