summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/rbot/journal.rb29
1 files changed, 23 insertions, 6 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
index 4cab11c8..09ee7369 100644
--- a/lib/rbot/journal.rb
+++ b/lib/rbot/journal.rb
@@ -209,8 +209,20 @@ module Journal
end
-
class JournalBroker
+ class Subscription
+ attr_reader :query
+ attr_reader :block
+ def initialize(broker, query, block)
+ @broker = broker
+ @query = query
+ @block = block
+ end
+ def cancel
+ @broker.unsubscribe(self)
+ end
+ end
+
def initialize(opts={})
# overrides the internal consumer with a block
@consumer = opts[:consumer]
@@ -240,9 +252,9 @@ module Journal
@consumer.call(message) if @consumer
# notify subscribers
- @subscriptions.each do |query, block|
- if query.matches? message
- block.call(message)
+ @subscriptions.each do |s|
+ if s.query.matches? message
+ s.block.call(message)
end
end
end
@@ -255,7 +267,6 @@ module Journal
@thread.raise ConsumeInterrupt.new
end
-
def publish(topic, payload)
@queue.push JournalMessage::create(topic, payload)
end
@@ -263,7 +274,13 @@ module Journal
# subscribe to messages that match the given query
def subscribe(query, &block)
raise ArgumentError.new unless block_given?
- @subscriptions << [query, block]
+ s = Subscription.new(self, query, block)
+ @subscriptions << s
+ s
+ end
+
+ def unsubscribe(subscription)
+ @subscriptions.delete subscription
end
end