diff options
-rw-r--r-- | lib/rbot/journal.rb | 44 | ||||
-rw-r--r-- | lib/rbot/journal/postgres.rb | 33 | ||||
-rw-r--r-- | test/test_journal.rb | 79 |
3 files changed, 118 insertions, 38 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb index cc0578de..71b0c7ff 100644 --- a/lib/rbot/journal.rb +++ b/lib/rbot/journal.rb @@ -76,7 +76,7 @@ module Journal end def ==(other) - @id == other.id + (@id == other.id) rescue false end def self.create(topic, payload, opt={}) @@ -104,16 +104,32 @@ module Journal end # returns a array of message instances that match the query - def find(query, limit=10, offset=0) + def find(query=nil, limit=100, offset=0) end # returns the number of messages that match the query - def count(query) + def count(query=nil) end - # delete messages that match the query - def delete(query) + # remove messages that match the query + def remove(query=nil) end + + # destroy the underlying table/collection + def drop + end + + # Returns all classes from the namespace that implement this interface + def self.get_impl + ObjectSpace.each_object(Class).select { |klass| klass < self } + end + end + + def create(name, uri) + log 'load journal storage adapter: ' + name + load File.join(File.dirname(__FILE__), 'journal', name + '.rb') + cls = AbstractStorage.get_impl.first + cls.new(uri: uri) end end @@ -270,7 +286,12 @@ module Journal # overrides the internal consumer with a block @consumer = opts[:consumer] # storage backend - @storage = opts[:storage] + if @bot + @storage = opts[:storage] || Storage.create( + @bot.config['journal.storage'], @bot.config['journal.storage.uri']) + else + @storage = opts[:storage] + end @queue = Queue.new # consumer thread: @thread = Thread.new do @@ -334,6 +355,17 @@ module Journal @subscriptions.delete subscription end + def find(query, limit=100, offset=0, &block) + if block_given? + begin + res = @storage.find(query, limit, offset) + block.call(res) + end until res.length > 0 + else + @storage.find(query, limit, offset) + end + end + end end # Journal diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb index 57c539a7..65c67eb9 100644 --- a/lib/rbot/journal/postgres.rb +++ b/lib/rbot/journal/postgres.rb @@ -53,9 +53,14 @@ module Journal [m.id, m.topic, m.timestamp, JSON.generate(m.payload)]) end - def find(query, limit=100, offset=0) - sql, params = query_to_sql(query) - sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] + def find(query=nil, limit=100, offset=0) + if query + sql, params = query_to_sql(query) + sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] + else + sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] + params = [] + end res = @conn.exec_params(sql, params) res.map do |row| timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z') @@ -65,13 +70,29 @@ module Journal end # returns the number of messages that match the query - def count(query) - sql, params = query_to_sql(query) - sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql + def count(query=nil) + if query + sql, params = query_to_sql(query) + sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql + else + sql = 'SELECT COUNT(*) FROM journal' + params = [] + end res = @conn.exec_params(sql, params) res[0]['count'].to_i end + def remove(query=nil) + if query + sql, params = query_to_sql(query) + sql = 'DELETE FROM journal WHERE ' + sql + else + sql = 'DELETE FROM journal;' + params = [] + end + res = @conn.exec_params(sql, params) + end + def drop @conn.exec('DROP TABLE journal;') rescue nil end diff --git a/test/test_journal.rb b/test/test_journal.rb index 2a522aa7..3f1ce766 100644 --- a/test/test_journal.rb +++ b/test/test_journal.rb @@ -224,41 +224,68 @@ class JournalStoragePostgresTest < Test::Unit::TestCase assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1]) end - def test_insert - # the test message to persist + def test_operations + # insertion m = JournalMessage.create('log.core', {foo: {bar: 'baz'}}) - # insert the test message: @storage.insert(m) - # find the test message by query: - q = Query.define do - topic 'log.core' - end - res = @storage.find(q) - _m = res.first - assert_equal(m, _m) # this only checks id + # query by id + res = @storage.find(Query.define { id m.id }) + assert_equal(1, res.length) + assert_equal(m, res.first) + + # check timestamp was returned correctly: assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'), - _m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z')) - assert_equal('log.core', _m.topic) - assert_equal({'foo' => {'bar' => 'baz'}}, _m.payload) - assert_equal(1, @storage.count(q)) + res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z')) + + # check if payload was returned correctly: + assert_equal({'foo' => {'bar' => 'baz'}}, res.first.payload) + + # query by topic + assert_equal(m, @storage.find(Query.define { topic('log.core') }).first) + assert_equal(m, @storage.find(Query.define { topic('log.*') }).first) + assert_equal(m, @storage.find(Query.define { topic('*.*') }).first) + + # query by timestamp range + assert_equal(1, @storage.find(Query.define { + timestamp(from: Time.now-DAY, to: Time.now+DAY) }).length) + assert_equal(0, @storage.find(Query.define { + timestamp(from: Time.now-DAY*2, to: Time.now-DAY) }).length) + + # query by payload + res = @storage.find(Query.define { payload('foo.bar' => 'baz') }) + assert_equal(m, res.first) + res = @storage.find(Query.define { payload('foo.bar' => 'x') }) + assert_true(res.empty?) + + # without arguments: find and count + assert_equal(1, @storage.count) + assert_equal(m, @storage.find.first) end - def test_query_range - timestamp = Time.now - DAY*7 - m = JournalMessage.create('log.core', {foo: {bar: 'baz'}}, - timestamp: timestamp) - assert_equal(timestamp, m.timestamp) + def test_operations_multiple + # test operations on multiple messages + # insert a bunch: + @storage.insert(JournalMessage.create('test.topic', {name: 'one'})) + @storage.insert(JournalMessage.create('test.topic', {name: 'two'})) + @storage.insert(JournalMessage.create('test.topic', {name: 'three'})) + @storage.insert(JournalMessage.create('archived.topic', {name: 'four'}, + timestamp: Time.now - DAY*100)) + @storage.insert(JournalMessage.create('complex', {name: 'five', country: { + name: 'Italy' + }})) + @storage.insert(JournalMessage.create('complex', {name: 'six', country: { + name: 'Austria' + }})) - @storage.insert(m) - @storage.insert(JournalMessage.create('a.foo', {})) - @storage.insert(JournalMessage.create('b.bar', {})) - @storage.insert(JournalMessage.create('b.baz', {})) - r = @storage.find(Query.define { timestamp(from: timestamp-DAY, to: timestamp+DAY) }) + end + + def test_journal + received = [] + # this journal persists messages in the test storage: + journal = JournalBroker.new(storage: @storage) - assert_equal(1, r.length) - assert_equal(m, r.first) end |