51 lines
1.5 KiB
Python
51 lines
1.5 KiB
Python
import collections
|
|
import threading
|
|
|
|
|
|
class PubSubChannel:
    """
    Publish/Subscribe channel used for distribution of events.

    Subscribers are callables invoked as ``target(event, data)``. They are
    registered per event name and called in subscription order.

    The operations on this channel are thread-safe (guarded by a re-entrant
    lock), but subscribers are executed by the publishing thread -- or by
    the timer thread when a delay is used. Use a queue to decouple the
    publishing thread from the consuming thread.

    Because the lock is re-entrant, a subscriber may call ``subscribe``,
    ``unsubscribe`` or ``publish`` on the same channel from inside its
    callback. Such changes take effect from the next publish onwards.
    """

    def __init__(self):
        # RLock (not Lock) so subscribers can call back into the channel
        # while a publish is in progress on the same thread.
        self.lock = threading.RLock()
        # Maps event name -> list of subscriber callables, in the order
        # they subscribed.
        self.subscriptions = collections.defaultdict(list)

    def subscribe(self, target, event='message'):
        """
        Subscribe to an event.

        ``target`` is called as ``target(event, data)`` for every publish
        of ``event``. The optional event name can be used to subscribe
        selectively. Subscribing the same target twice makes it receive
        each event twice.
        """
        with self.lock:
            self.subscriptions[event].append(target)

    def unsubscribe(self, target, event='message'):
        """
        Unsubscribe from an event.

        The optional event name can be used to unsubscribe from another event.
        Only the first matching registration of ``target`` is removed.

        Raises ``ValueError`` if ``target`` is not subscribed to ``event``.
        """
        with self.lock:
            # Use .get() so that a failed unsubscribe does not leave an
            # empty list behind for an event nobody ever subscribed to.
            targets = self.subscriptions.get(event, [])
            targets.remove(target)  # ValueError when target is unknown
            if not targets:
                # Drop emptied entries so event names do not accumulate.
                self.subscriptions.pop(event, None)

    def publish(self, event='message', data=None, delay=None):
        """
        Publishes an event.

        The event can be accompanied by optional data. A delay can be set to
        delay the publish action by the given amount of seconds; in that
        case the subscribers are called from a ``threading.Timer`` thread
        and this method returns immediately.
        """
        if delay is not None:
            def task():
                self.publish(event, data)

            threading.Timer(delay, task).start()
            return

        with self.lock:
            # Iterate over a snapshot: a subscriber may (un)subscribe from
            # inside its callback, and mutating the live list while
            # iterating it would silently skip the next subscriber.
            # .get() avoids creating an entry for events without
            # subscribers.
            targets = tuple(self.subscriptions.get(event, ()))
            # The lock stays held during dispatch so that callbacks from
            # concurrent publishers remain serialised, as before.
            for target in targets:
                target(event, data)
|