summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Hecker <apoc@geekosphere.org>2015-06-14 01:36:33 +0200
committerMatthias Hecker <apoc@geekosphere.org>2015-06-14 01:36:33 +0200
commitd864b0348f25d845fa312cedfd5011b2d25022dc (patch)
tree27a48c76edc4ab3e104eb061fd84fb486da4e7a4
parent613750983b43546d8bd2732c9159ff766a1c42bd (diff)
journal: unsubscribe added
-rw-r--r--lib/rbot/journal.rb29
-rw-r--r--test/test_journal.rb11
2 files changed, 33 insertions, 7 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
index 4cab11c8..09ee7369 100644
--- a/lib/rbot/journal.rb
+++ b/lib/rbot/journal.rb
@@ -209,8 +209,20 @@ module Journal
end
-
class JournalBroker
+ class Subscription
+ attr_reader :query
+ attr_reader :block
+ def initialize(broker, query, block)
+ @broker = broker
+ @query = query
+ @block = block
+ end
+ def cancel
+ @broker.unsubscribe(self)
+ end
+ end
+
def initialize(opts={})
# overrides the internal consumer with a block
@consumer = opts[:consumer]
@@ -240,9 +252,9 @@ module Journal
@consumer.call(message) if @consumer
# notify subscribers
- @subscriptions.each do |query, block|
- if query.matches? message
- block.call(message)
+ @subscriptions.each do |s|
+ if s.query.matches? message
+ s.block.call(message)
end
end
end
@@ -255,7 +267,6 @@ module Journal
@thread.raise ConsumeInterrupt.new
end
-
def publish(topic, payload)
@queue.push JournalMessage::create(topic, payload)
end
@@ -263,7 +274,13 @@ module Journal
# subscribe to messages that match the given query
def subscribe(query, &block)
raise ArgumentError.new unless block_given?
- @subscriptions << [query, block]
+ s = Subscription.new(self, query, block)
+ @subscriptions << s
+ s
+ end
+
+ def unsubscribe(subscription)
+ @subscriptions.delete subscription
end
end
diff --git a/test/test_journal.rb b/test/test_journal.rb
index d7a70a7c..cee8da0f 100644
--- a/test/test_journal.rb
+++ b/test/test_journal.rb
@@ -160,7 +160,7 @@ class JournalBrokerTest < Test::Unit::TestCase
journal = JournalBroker.new
# subscribe to messages:
- journal.subscribe(Query.define { topic 'foo' }) do |message|
+ sub = journal.subscribe(Query.define { topic 'foo' }) do |message|
received << message
end
@@ -172,6 +172,15 @@ class JournalBrokerTest < Test::Unit::TestCase
# wait for messages to be consumed:
sleep 0.1
assert_equal(2, received.length)
+
+ received.clear
+
+ journal.publish 'foo', {}
+ sleep 0.1
+ sub.cancel
+ journal.publish 'foo', {}
+ sleep 0.1
+ assert_equal(1, received.length)
end
end