diff options
Diffstat (limited to 'lib/rbot/journal.rb')
-rw-r--r-- | lib/rbot/journal.rb | 132 |
1 files changed, 81 insertions, 51 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb index 0b4324fe..c5bfcfea 100644 --- a/lib/rbot/journal.rb +++ b/lib/rbot/journal.rb @@ -3,6 +3,8 @@ #++ # # :title: rbot's persistent message queue +# +# Author:: Matthias Hecker (apoc@geekosphere.org) require 'thread' require 'securerandom' @@ -22,19 +24,8 @@ module Journal =end - Config.register Config::StringValue.new('journal.storage', - :default => nil, - :requires_restart => true, - :desc => 'storage engine used by the journal') - Config.register Config::StringValue.new('journal.storage.uri', - :default => nil, - :requires_restart => true, - :desc => 'storage database uri') - class InvalidJournalMessage < StandardError end - class ConsumeInterrupt < StandardError - end class StorageError < StandardError end @@ -61,6 +52,7 @@ module Journal end end + # Access payload value by key. def get(pkey, default=:exception) # IDENTITY = Object.new instead of :ex..? value = pkey.split('.').reduce(@payload) do |hash, key| if hash.has_key?(key) or hash.has_key?(key.to_sym) @@ -75,6 +67,11 @@ module Journal end end + # Access payload value by key alias for get(key, nil). + def [](key) + get(key, nil) + end + def ==(other) (@id == other.id) rescue false end @@ -104,7 +101,7 @@ module Journal end # returns a array of message instances that match the query - def find(query=nil, limit=100, offset=0) + def find(query=nil, limit=100, offset=0, &block) end # returns the number of messages that match the query @@ -176,10 +173,20 @@ module Journal attr_reader :payload def initialize(query) - @id = query[:id] - @topic = query[:topic] - @timestamp = query[:timestamp] - @payload = query[:payload] + @id = query[:id] || [] + @id = [@id] if @id.is_a? String + @topic = query[:topic] || [] + @topic = [@topic] if @topic.is_a? String + @timestamp = { + from: nil, to: nil + } + if query[:timestamp] and query[:timestamp][:from] + @timestamp[:from] = query[:timestamp][:from] + end + if query[:timestamp] and query[:timestamp][:to] + @timestamp[:to] = query[:timestamp][:to] + end + @payload = query[:payload] || {} end # returns true if the given message matches the query @@ -270,11 +277,11 @@ module Journal class JournalBroker class Subscription - attr_reader :query + attr_reader :topic attr_reader :block - def initialize(broker, query, block) + def initialize(broker, topic, block) @broker = broker - @query = query + @topic = topic @block = block end def cancel @@ -285,36 +292,27 @@ module Journal def initialize(opts={}) # overrides the internal consumer with a block @consumer = opts[:consumer] - @bot = opts[:bot] # storage backend - if @bot - @storage = opts[:storage] || Storage.create( - @bot.config['journal.storage'], @bot.config['journal.storage.uri']) - else - @storage = opts[:storage] - end + @storage = opts[:storage] unless @storage warning 'journal broker: no storage set up, won\'t persist messages' end @queue = Queue.new # consumer thread: @thread = Thread.new do - loop do + while message = @queue.pop begin - consume @queue.pop + consume message # pop(true) ... rescue ThreadError => e - rescue ConsumeInterrupt => e - error 'journal broker: stop thread, consume interrupt raised' - break rescue Exception => e error 'journal broker: exception in consumer thread' error $! end end end - # TODO: this is a first naive implementation, later we do the - # message/query matching for incoming messages more efficiently @subscriptions = [] + # lookup-table for subscriptions by their topic + @topic_subs = {} end def consume(message) @@ -322,8 +320,8 @@ module Journal @consumer.call(message) if @consumer # notify subscribers - @subscriptions.each do |s| - if s.query.matches? message + if @topic_subs.has_key? message.topic + @topic_subs[message.topic].each do |s| s.block.call(message) end end @@ -335,36 +333,68 @@ module Journal true if @storage end - def join - @thread.join - end - def shutdown - @thread.raise ConsumeInterrupt.new + log 'journal shutdown' + @subscriptions.clear + @topic_subs.clear + @queue << nil + @thread.join + @thread = nil end def publish(topic, payload) - @queue.push JournalMessage::create(topic, payload) + @queue << JournalMessage::create(topic, payload) end - # subscribe to messages that match the given query - def subscribe(query, &block) + # Subscribe to receive messages from a topic. + # + # You can use this method to subscribe to messages that + # are published within a specified topic. You must provide + # a receiving block to receive messages one-by-one. + # The method returns an instance of Subscription that can + # be used to cancel the subscription by invoking cancel + # on it. + # + # journal.subscribe('irclog') do |message| + # # received irclog messages... + # end + # + def subscribe(topic=nil, &block) raise ArgumentError.new unless block_given? - s = Subscription.new(self, query, block) + s = Subscription.new(self, topic, block) @subscriptions << s + unless @topic_subs.has_key? topic + @topic_subs[topic] = [] + end + @topic_subs[topic] << s s end - def unsubscribe(subscription) - @subscriptions.delete subscription + def unsubscribe(s) + if @topic_subs.has_key? s.topic + @topic_subs[s.topic].delete(s) + end + @subscriptions.delete s end - def find(query=nil, limit=100, offset=0, &block) + # Find and return persisted messages by a query. + # + # This method will either return all messages or call the provided + # block for each message. It will filter the messages by the + # provided Query instance. Limit and offset might be used to + # constrain the result. + # The query might also be a hash or proc that is passed to + # Query.define first. + # + # @param query [Query] + # @param limit [Integer] how many items to return + # @param offset [Integer] relative offset in results + def find(query, limit=100, offset=0, &block) + unless query.is_a? Query + query = Query.define(query) + end if block_given? - begin - res = @storage.find(query, limit, offset) - block.call(res) - end until res.length > 0 + @storage.find(query, limit, offset, &block) else @storage.find(query, limit, offset) end |