summaryrefslogtreecommitdiff
path: root/lib/rbot/journal.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rbot/journal.rb')
-rw-r--r--lib/rbot/journal.rb132
1 files changed, 81 insertions, 51 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
index 0b4324fe..c5bfcfea 100644
--- a/lib/rbot/journal.rb
+++ b/lib/rbot/journal.rb
@@ -3,6 +3,8 @@
#++
#
# :title: rbot's persistent message queue
+#
+# Author:: Matthias Hecker (apoc@geekosphere.org)
require 'thread'
require 'securerandom'
@@ -22,19 +24,8 @@ module Journal
=end
- Config.register Config::StringValue.new('journal.storage',
- :default => nil,
- :requires_restart => true,
- :desc => 'storage engine used by the journal')
- Config.register Config::StringValue.new('journal.storage.uri',
- :default => nil,
- :requires_restart => true,
- :desc => 'storage database uri')
-
class InvalidJournalMessage < StandardError
end
- class ConsumeInterrupt < StandardError
- end
class StorageError < StandardError
end
@@ -61,6 +52,7 @@ module Journal
end
end
+ # Access payload value by key.
def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..?
value = pkey.split('.').reduce(@payload) do |hash, key|
if hash.has_key?(key) or hash.has_key?(key.to_sym)
@@ -75,6 +67,11 @@ module Journal
end
end
+ # Access payload value by key alias for get(key, nil).
+ def [](key)
+ get(key, nil)
+ end
+
def ==(other)
(@id == other.id) rescue false
end
@@ -104,7 +101,7 @@ module Journal
end
# returns a array of message instances that match the query
- def find(query=nil, limit=100, offset=0)
+ def find(query=nil, limit=100, offset=0, &block)
end
# returns the number of messages that match the query
@@ -176,10 +173,20 @@ module Journal
attr_reader :payload
def initialize(query)
- @id = query[:id]
- @topic = query[:topic]
- @timestamp = query[:timestamp]
- @payload = query[:payload]
+ @id = query[:id] || []
+ @id = [@id] if @id.is_a? String
+ @topic = query[:topic] || []
+ @topic = [@topic] if @topic.is_a? String
+ @timestamp = {
+ from: nil, to: nil
+ }
+ if query[:timestamp] and query[:timestamp][:from]
+ @timestamp[:from] = query[:timestamp][:from]
+ end
+ if query[:timestamp] and query[:timestamp][:to]
+ @timestamp[:to] = query[:timestamp][:to]
+ end
+ @payload = query[:payload] || {}
end
# returns true if the given message matches the query
@@ -270,11 +277,11 @@ module Journal
class JournalBroker
class Subscription
- attr_reader :query
+ attr_reader :topic
attr_reader :block
- def initialize(broker, query, block)
+ def initialize(broker, topic, block)
@broker = broker
- @query = query
+ @topic = topic
@block = block
end
def cancel
@@ -285,36 +292,27 @@ module Journal
def initialize(opts={})
# overrides the internal consumer with a block
@consumer = opts[:consumer]
- @bot = opts[:bot]
# storage backend
- if @bot
- @storage = opts[:storage] || Storage.create(
- @bot.config['journal.storage'], @bot.config['journal.storage.uri'])
- else
- @storage = opts[:storage]
- end
+ @storage = opts[:storage]
unless @storage
warning 'journal broker: no storage set up, won\'t persist messages'
end
@queue = Queue.new
# consumer thread:
@thread = Thread.new do
- loop do
+ while message = @queue.pop
begin
- consume @queue.pop
+ consume message
# pop(true) ... rescue ThreadError => e
- rescue ConsumeInterrupt => e
- error 'journal broker: stop thread, consume interrupt raised'
- break
rescue Exception => e
error 'journal broker: exception in consumer thread'
error $!
end
end
end
- # TODO: this is a first naive implementation, later we do the
- # message/query matching for incoming messages more efficiently
@subscriptions = []
+ # lookup-table for subscriptions by their topic
+ @topic_subs = {}
end
def consume(message)
@@ -322,8 +320,8 @@ module Journal
@consumer.call(message) if @consumer
# notify subscribers
- @subscriptions.each do |s|
- if s.query.matches? message
+ if @topic_subs.has_key? message.topic
+ @topic_subs[message.topic].each do |s|
s.block.call(message)
end
end
@@ -335,36 +333,68 @@ module Journal
true if @storage
end
- def join
- @thread.join
- end
-
def shutdown
- @thread.raise ConsumeInterrupt.new
+ log 'journal shutdown'
+ @subscriptions.clear
+ @topic_subs.clear
+ @queue << nil
+ @thread.join
+ @thread = nil
end
def publish(topic, payload)
- @queue.push JournalMessage::create(topic, payload)
+ @queue << JournalMessage::create(topic, payload)
end
- # subscribe to messages that match the given query
- def subscribe(query, &block)
+ # Subscribe to receive messages from a topic.
+ #
+ # You can use this method to subscribe to messages that
+ # are published within a specified topic. You must provide
+ # a receiving block to receive messages one-by-one.
+ # The method returns an instance of Subscription that can
+ # be used to cancel the subscription by invoking cancel
+ # on it.
+ #
+ # journal.subscribe('irclog') do |message|
+ # # received irclog messages...
+ # end
+ #
+ def subscribe(topic=nil, &block)
raise ArgumentError.new unless block_given?
- s = Subscription.new(self, query, block)
+ s = Subscription.new(self, topic, block)
@subscriptions << s
+ unless @topic_subs.has_key? topic
+ @topic_subs[topic] = []
+ end
+ @topic_subs[topic] << s
s
end
- def unsubscribe(subscription)
- @subscriptions.delete subscription
+ def unsubscribe(s)
+ if @topic_subs.has_key? s.topic
+ @topic_subs[s.topic].delete(s)
+ end
+ @subscriptions.delete s
end
- def find(query=nil, limit=100, offset=0, &block)
+ # Find and return persisted messages by a query.
+ #
+ # This method will either return all messages or call the provided
+ # block for each message. It will filter the messages by the
+ # provided Query instance. Limit and offset might be used to
+ # constrain the result.
+ # The query might also be a hash or proc that is passed to
+ # Query.define first.
+ #
+ # @param query [Query]
+ # @param limit [Integer] how many items to return
+ # @param offset [Integer] relative offset in results
+ def find(query, limit=100, offset=0, &block)
+ unless query.is_a? Query
+ query = Query.define(query)
+ end
if block_given?
- begin
- res = @storage.find(query, limit, offset)
- block.call(res)
- end until res.length > 0
+ @storage.find(query, limit, offset, &block)
else
@storage.find(query, limit, offset)
end