summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/rbot/journal.rb10
-rw-r--r--lib/rbot/journal/postgres.rb116
-rw-r--r--test/test_journal.rb80
3 files changed, 195 insertions, 11 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb
index 27565d06..cc0578de 100644
--- a/lib/rbot/journal.rb
+++ b/lib/rbot/journal.rb
@@ -75,10 +75,14 @@ module Journal
end
end
- def self.create(topic, payload)
+ def ==(other)
+ @id == other.id
+ end
+
+ def self.create(topic, payload, opt={})
JournalMessage.new(
- id: SecureRandom.uuid,
- timestamp: Time.now,
+ id: opt[:id] || SecureRandom.uuid,
+ timestamp: opt[:timestamp] || Time.now,
topic: topic,
payload: payload
)
diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb
index 85514fb2..57c539a7 100644
--- a/lib/rbot/journal/postgres.rb
+++ b/lib/rbot/journal/postgres.rb
@@ -5,12 +5,20 @@
# :title: journal backend for postgresql
require 'pg'
+require 'json'
module Irc
class Bot
module Journal
+
+ class Query
+ end
+
module Storage
+
class PostgresStorage < AbstractStorage
+ attr_reader :conn
+
def initialize(opts={})
@uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
@conn = PG.connect(@uri)
@@ -25,12 +33,118 @@ module Journal
'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
end
@jsonb = (version >= 940)
- log 'journal storage: no jsonb support, consider upgrading postgres'
+ log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
+ drop if opts[:drop]
create_table
end
def create_table
+ @conn.exec('
+ CREATE TABLE IF NOT EXISTS journal
+ (id UUID PRIMARY KEY,
+ topic TEXT NOT NULL,
+ timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
+ payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
+ end
+
+ def insert(m)
+ @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
+ [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
+ end
+
+ def find(query, limit=100, offset=0)
+ sql, params = query_to_sql(query)
+ sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ res = @conn.exec_params(sql, params)
+ res.map do |row|
+ timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+ JournalMessage.new(id: row['id'], timestamp: timestamp,
+ topic: row['topic'], payload: JSON.parse(row['payload']))
+ end
+ end
+
+ # returns the number of messages that match the query
+ def count(query)
+ sql, params = query_to_sql(query)
+ sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+ res = @conn.exec_params(sql, params)
+ res[0]['count'].to_i
+ end
+
+ def drop
+ @conn.exec('DROP TABLE journal;') rescue nil
+ end
+
+ def query_to_sql(query)
+ params = []
+ placeholder = Proc.new do |value|
+ params << value
+ '$%d' % [params.length]
+ end
+ sql = {op: 'AND', list: []}
+
+ # ID query OR condition
+ unless query.id.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.id.map { |id|
+ 'id = ' + placeholder.call(id)
+ }
+ }
+ end
+
+ # Topic query OR condition
+ unless query.topic.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.topic.map { |topic|
+ 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
+ }
+ }
+ end
+
+ # Timestamp range query AND condition
+ if query.timestamp[:from] or query.timestamp[:to]
+ list = []
+ if query.timestamp[:from]
+ list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
+ end
+ if query.timestamp[:to]
+ list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
+ end
+ sql[:list] << {
+ op: 'AND',
+ list: list
+ }
+ end
+
+ # Payload query
+ unless query.payload.empty?
+ list = []
+ query.payload.each_pair do |key, value|
+ selector = 'payload'
+ k = key.to_s.split('.')
+ k.each_index { |i|
+ if i >= k.length-1
+ selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+ else
+ selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+ end
+ }
+ list << selector + ' = ' + placeholder.call(value)
+ end
+ sql[:list] << {
+ op: 'OR',
+ list: list
+ }
+ end
+
+ sql = sql[:list].map { |stmt|
+ '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
+ }.join(' %s ' % [sql[:op]])
+
+ [sql, params]
end
end
end
diff --git a/test/test_journal.rb b/test/test_journal.rb
index 1e55f54b..2a522aa7 100644
--- a/test/test_journal.rb
+++ b/test/test_journal.rb
@@ -5,6 +5,8 @@ require 'rbot/ircbot'
require 'rbot/journal'
require 'rbot/journal/postgres.rb'
+DAY=60*60*24
+
class JournalMessageTest < Test::Unit::TestCase
include Irc::Bot::Journal
@@ -100,8 +102,6 @@ class QueryTest < Test::Unit::TestCase
assert_false(q.topic_matches?('baz.alice.bob..foo'))
end
-
- DAY=60*60*24
def test_matches
q = Query.define do
#id 'foo', 'bar'
@@ -186,14 +186,80 @@ class JournalBrokerTest < Test::Unit::TestCase
end
-class JournalStorageTest < Test::Unit::TestCase
+class JournalStoragePostgresTest < Test::Unit::TestCase
include Irc::Bot::Journal
- def test_storage
- s = Storage::PostgresStorage.new(
- uri: 'postgresql://apoc:seed42@localhost/rbot_journal')
- assert_equal(true, true)
+ def setup
+ @storage = Storage::PostgresStorage.new(
+ uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal',
+ drop: true)
+ end
+
+ def teardown
+ @storage.drop
+ end
+
+ def test_query_to_sql
+ q = Query.define do
+ id 'foo'
+ id 'bar', 'baz'
+ topic 'log.irc.*'
+ topic 'log.core', 'baz'
+ timestamp from: Time.now, to: Time.now + 60 * 10
+ payload 'action': :privmsg, 'alice': 'bob'
+ payload 'channel': '#rbot'
+ payload 'foo.bar': 'baz'
+ end
+ sql = @storage.query_to_sql(q)
+ assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0])
+ q = Query.define do
+ id 'foo'
+ end
+ assert_equal('(id = $1)', @storage.query_to_sql(q)[0])
+ q = Query.define do
+ topic 'foo.*.bar'
+ end
+ assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0])
+ assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1])
+ end
+
+ def test_insert
+ # the test message to persist
+ m = JournalMessage.create('log.core', {foo: {bar: 'baz'}})
+ # insert the test message:
+ @storage.insert(m)
+
+ # find the test message by query:
+ q = Query.define do
+ topic 'log.core'
+ end
+ res = @storage.find(q)
+ _m = res.first
+ assert_equal(m, _m) # this only checks id
+ assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'),
+ _m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
+ assert_equal('log.core', _m.topic)
+ assert_equal({'foo' => {'bar' => 'baz'}}, _m.payload)
+ assert_equal(1, @storage.count(q))
+ end
+
+ def test_query_range
+ timestamp = Time.now - DAY*7
+ m = JournalMessage.create('log.core', {foo: {bar: 'baz'}},
+ timestamp: timestamp)
+ assert_equal(timestamp, m.timestamp)
+
+ @storage.insert(m)
+ @storage.insert(JournalMessage.create('a.foo', {}))
+ @storage.insert(JournalMessage.create('b.bar', {}))
+ @storage.insert(JournalMessage.create('b.baz', {}))
+
+ r = @storage.find(Query.define { timestamp(from: timestamp-DAY, to: timestamp+DAY) })
+
+ assert_equal(1, r.length)
+ assert_equal(m, r.first)
+
end
end