summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/rbot/journal.rb44
-rw-r--r--lib/rbot/journal/postgres.rb33
-rw-r--r--test/test_journal.rb79
3 files changed, 118 insertions, 38 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
index cc0578de..71b0c7ff 100644
--- a/lib/rbot/journal.rb
+++ b/lib/rbot/journal.rb
@@ -76,7 +76,7 @@ module Journal
end
def ==(other)
- @id == other.id
+ (@id == other.id) rescue false
end
def self.create(topic, payload, opt={})
@@ -104,16 +104,32 @@ module Journal
end
# returns a array of message instances that match the query
- def find(query, limit=10, offset=0)
+ def find(query=nil, limit=100, offset=0)
end
# returns the number of messages that match the query
- def count(query)
+ def count(query=nil)
end
- # delete messages that match the query
- def delete(query)
+ # remove messages that match the query
+ def remove(query=nil)
end
+
+ # destroy the underlying table/collection
+ def drop
+ end
+
+ # Returns all classes from the namespace that implement this interface
+ def self.get_impl
+ ObjectSpace.each_object(Class).select { |klass| klass < self }
+ end
+ end
+
+ def create(name, uri)
+ log 'load journal storage adapter: ' + name
+ load File.join(File.dirname(__FILE__), 'journal', name + '.rb')
+ cls = AbstractStorage.get_impl.first
+ cls.new(uri: uri)
end
end
@@ -270,7 +286,12 @@ module Journal
# overrides the internal consumer with a block
@consumer = opts[:consumer]
# storage backend
- @storage = opts[:storage]
+ if @bot
+ @storage = opts[:storage] || Storage.create(
+ @bot.config['journal.storage'], @bot.config['journal.storage.uri'])
+ else
+ @storage = opts[:storage]
+ end
@queue = Queue.new
# consumer thread:
@thread = Thread.new do
@@ -334,6 +355,17 @@ module Journal
@subscriptions.delete subscription
end
+ def find(query, limit=100, offset=0, &block)
+ if block_given?
+ begin
+ res = @storage.find(query, limit, offset)
+ block.call(res)
+ end until res.length > 0
+ else
+ @storage.find(query, limit, offset)
+ end
+ end
+
end
end # Journal
diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb
index 57c539a7..65c67eb9 100644
--- a/lib/rbot/journal/postgres.rb
+++ b/lib/rbot/journal/postgres.rb
@@ -53,9 +53,14 @@ module Journal
[m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
end
- def find(query, limit=100, offset=0)
- sql, params = query_to_sql(query)
- sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ def find(query=nil, limit=100, offset=0)
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ else
+ sql = 'SELECT * FROM journal LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ params = []
+ end
res = @conn.exec_params(sql, params)
res.map do |row|
timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
@@ -65,13 +70,29 @@ module Journal
end
# returns the number of messages that match the query
- def count(query)
- sql, params = query_to_sql(query)
- sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+ def count(query=nil)
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+ else
+ sql = 'SELECT COUNT(*) FROM journal'
+ params = []
+ end
res = @conn.exec_params(sql, params)
res[0]['count'].to_i
end
+ def remove(query=nil)
+ if query
+ sql, params = query_to_sql(query)
+ sql = 'DELETE FROM journal WHERE ' + sql
+ else
+ sql = 'DELETE FROM journal;'
+ params = []
+ end
+ res = @conn.exec_params(sql, params)
+ end
+
def drop
@conn.exec('DROP TABLE journal;') rescue nil
end
diff --git a/test/test_journal.rb b/test/test_journal.rb
index 2a522aa7..3f1ce766 100644
--- a/test/test_journal.rb
+++ b/test/test_journal.rb
@@ -224,41 +224,68 @@ class JournalStoragePostgresTest < Test::Unit::TestCase
assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1])
end
- def test_insert
- # the test message to persist
+ def test_operations
+ # insertion
m = JournalMessage.create('log.core', {foo: {bar: 'baz'}})
- # insert the test message:
@storage.insert(m)
- # find the test message by query:
- q = Query.define do
- topic 'log.core'
- end
- res = @storage.find(q)
- _m = res.first
- assert_equal(m, _m) # this only checks id
+ # query by id
+ res = @storage.find(Query.define { id m.id })
+ assert_equal(1, res.length)
+ assert_equal(m, res.first)
+
+ # check timestamp was returned correctly:
assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'),
- _m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
- assert_equal('log.core', _m.topic)
- assert_equal({'foo' => {'bar' => 'baz'}}, _m.payload)
- assert_equal(1, @storage.count(q))
+ res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
+
+ # check if payload was returned correctly:
+ assert_equal({'foo' => {'bar' => 'baz'}}, res.first.payload)
+
+ # query by topic
+ assert_equal(m, @storage.find(Query.define { topic('log.core') }).first)
+ assert_equal(m, @storage.find(Query.define { topic('log.*') }).first)
+ assert_equal(m, @storage.find(Query.define { topic('*.*') }).first)
+
+ # query by timestamp range
+ assert_equal(1, @storage.find(Query.define {
+ timestamp(from: Time.now-DAY, to: Time.now+DAY) }).length)
+ assert_equal(0, @storage.find(Query.define {
+ timestamp(from: Time.now-DAY*2, to: Time.now-DAY) }).length)
+
+ # query by payload
+ res = @storage.find(Query.define { payload('foo.bar' => 'baz') })
+ assert_equal(m, res.first)
+ res = @storage.find(Query.define { payload('foo.bar' => 'x') })
+ assert_true(res.empty?)
+
+ # without arguments: find and count
+ assert_equal(1, @storage.count)
+ assert_equal(m, @storage.find.first)
end
- def test_query_range
- timestamp = Time.now - DAY*7
- m = JournalMessage.create('log.core', {foo: {bar: 'baz'}},
- timestamp: timestamp)
- assert_equal(timestamp, m.timestamp)
+ def test_operations_multiple
+ # test operations on multiple messages
+ # insert a bunch:
+ @storage.insert(JournalMessage.create('test.topic', {name: 'one'}))
+ @storage.insert(JournalMessage.create('test.topic', {name: 'two'}))
+ @storage.insert(JournalMessage.create('test.topic', {name: 'three'}))
+ @storage.insert(JournalMessage.create('archived.topic', {name: 'four'},
+ timestamp: Time.now - DAY*100))
+ @storage.insert(JournalMessage.create('complex', {name: 'five', country: {
+ name: 'Italy'
+ }}))
+ @storage.insert(JournalMessage.create('complex', {name: 'six', country: {
+ name: 'Austria'
+ }}))
- @storage.insert(m)
- @storage.insert(JournalMessage.create('a.foo', {}))
- @storage.insert(JournalMessage.create('b.bar', {}))
- @storage.insert(JournalMessage.create('b.baz', {}))
- r = @storage.find(Query.define { timestamp(from: timestamp-DAY, to: timestamp+DAY) })
+ end
+
+ def test_journal
+ received = []
+ # this journal persists messages in the test storage:
+ journal = JournalBroker.new(storage: @storage)
- assert_equal(1, r.length)
- assert_equal(m, r.first)
end