diff options
-rw-r--r-- | lib/rbot/journal.rb | 10 | ||||
-rw-r--r-- | lib/rbot/journal/postgres.rb | 116 | ||||
-rw-r--r-- | test/test_journal.rb | 80 |
3 files changed, 195 insertions, 11 deletions
diff --git a/lib/rbot/journal.rb b/lib/rbot/journal.rb index 27565d06..cc0578de 100644 --- a/lib/rbot/journal.rb +++ b/lib/rbot/journal.rb @@ -75,10 +75,14 @@ module Journal end end - def self.create(topic, payload) + def ==(other) + @id == other.id + end + + def self.create(topic, payload, opt={}) JournalMessage.new( - id: SecureRandom.uuid, - timestamp: Time.now, + id: opt[:id] || SecureRandom.uuid, + timestamp: opt[:timestamp] || Time.now, topic: topic, payload: payload ) diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb index 85514fb2..57c539a7 100644 --- a/lib/rbot/journal/postgres.rb +++ b/lib/rbot/journal/postgres.rb @@ -5,12 +5,20 @@ # :title: journal backend for postgresql require 'pg' +require 'json' module Irc class Bot module Journal + + class Query + end + module Storage + class PostgresStorage < AbstractStorage + attr_reader :conn + def initialize(opts={}) @uri = opts[:uri] || 'postgresql://localhost/rbot_journal' @conn = PG.connect(@uri) @@ -25,12 +33,118 @@ module Journal 'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version]) end @jsonb = (version >= 940) - log 'journal storage: no jsonb support, consider upgrading postgres' + log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb + drop if opts[:drop] create_table end def create_table + @conn.exec(' + CREATE TABLE IF NOT EXISTS journal + (id UUID PRIMARY KEY, + topic TEXT NOT NULL, + timestamp TIMESTAMP WITH TIME ZONE NOT NULL, + payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON']) + end + + def insert(m) + @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);', + [m.id, m.topic, m.timestamp, JSON.generate(m.payload)]) + end + + def find(query, limit=100, offset=0) + sql, params = query_to_sql(query) + sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i] + res = @conn.exec_params(sql, params) + res.map do |row| + timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z') + JournalMessage.new(id: row['id'], timestamp: timestamp, + topic: row['topic'], payload: JSON.parse(row['payload'])) + end + end + + # returns the number of messages that match the query + def count(query) + sql, params = query_to_sql(query) + sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql + res = @conn.exec_params(sql, params) + res[0]['count'].to_i + end + + def drop + @conn.exec('DROP TABLE journal;') rescue nil + end + + def query_to_sql(query) + params = [] + placeholder = Proc.new do |value| + params << value + '$%d' % [params.length] + end + sql = {op: 'AND', list: []} + + # ID query OR condition + unless query.id.empty? + sql[:list] << { + op: 'OR', + list: query.id.map { |id| + 'id = ' + placeholder.call(id) + } + } + end + + # Topic query OR condition + unless query.topic.empty? + sql[:list] << { + op: 'OR', + list: query.topic.map { |topic| + 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%')) + } + } + end + + # Timestamp range query AND condition + if query.timestamp[:from] or query.timestamp[:to] + list = [] + if query.timestamp[:from] + list << 'timestamp >= ' + placeholder.call(query.timestamp[:from]) + end + if query.timestamp[:to] + list << 'timestamp <= ' + placeholder.call(query.timestamp[:to]) + end + sql[:list] << { + op: 'AND', + list: list + } + end + + # Payload query + unless query.payload.empty? + list = [] + query.payload.each_pair do |key, value| + selector = 'payload' + k = key.to_s.split('.') + k.each_index { |i| + if i >= k.length-1 + selector += '->>\'%s\'' % [@conn.escape_string(k[i])] + else + selector += '->\'%s\'' % [@conn.escape_string(k[i])] + end + } + list << selector + ' = ' + placeholder.call(value) + end + sql[:list] << { + op: 'OR', + list: list + } + end + + sql = sql[:list].map { |stmt| + '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')' + }.join(' %s ' % [sql[:op]]) + + [sql, params] end end end diff --git a/test/test_journal.rb b/test/test_journal.rb index 1e55f54b..2a522aa7 100644 --- a/test/test_journal.rb +++ b/test/test_journal.rb @@ -5,6 +5,8 @@ require 'rbot/ircbot' require 'rbot/journal' require 'rbot/journal/postgres.rb' +DAY=60*60*24 + class JournalMessageTest < Test::Unit::TestCase include Irc::Bot::Journal @@ -100,8 +102,6 @@ class QueryTest < Test::Unit::TestCase assert_false(q.topic_matches?('baz.alice.bob..foo')) end - - DAY=60*60*24 def test_matches q = Query.define do #id 'foo', 'bar' @@ -186,14 +186,80 @@ class JournalBrokerTest < Test::Unit::TestCase end -class JournalStorageTest < Test::Unit::TestCase +class JournalStoragePostgresTest < Test::Unit::TestCase include Irc::Bot::Journal - def test_storage - s = Storage::PostgresStorage.new( - uri: 'postgresql://apoc:seed42@localhost/rbot_journal') - assert_equal(true, true) + def setup + @storage = Storage::PostgresStorage.new( + uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal', + drop: true) + end + + def teardown + @storage.drop + end + + def test_query_to_sql + q = Query.define do + id 'foo' + id 'bar', 'baz' + topic 'log.irc.*' + topic 'log.core', 'baz' + timestamp from: Time.now, to: Time.now + 60 * 10 + payload 'action': :privmsg, 'alice': 'bob' + payload 'channel': '#rbot' + payload 'foo.bar': 'baz' + end + sql = @storage.query_to_sql(q) + assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0]) + q = Query.define do + id 'foo' + end + assert_equal('(id = $1)', @storage.query_to_sql(q)[0]) + q = Query.define do + topic 'foo.*.bar' + end + assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0]) + assert_equal(['foo.%.bar'], @storage.query_to_sql(q)[1]) + end + + def test_insert + # the test message to persist + m = JournalMessage.create('log.core', {foo: {bar: 'baz'}}) + # insert the test message: + @storage.insert(m) + + # find the test message by query: + q = Query.define do + topic 'log.core' + end + res = @storage.find(q) + _m = res.first + assert_equal(m, _m) # this only checks id + assert_equal(m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'), + _m.timestamp.strftime('%Y-%m-%d %H:%M:%S%z')) + assert_equal('log.core', _m.topic) + assert_equal({'foo' => {'bar' => 'baz'}}, _m.payload) + assert_equal(1, @storage.count(q)) + end + + def test_query_range + timestamp = Time.now - DAY*7 + m = JournalMessage.create('log.core', {foo: {bar: 'baz'}}, + timestamp: timestamp) + assert_equal(timestamp, m.timestamp) + + @storage.insert(m) + @storage.insert(JournalMessage.create('a.foo', {})) + @storage.insert(JournalMessage.create('b.bar', {})) + @storage.insert(JournalMessage.create('b.baz', {})) + + r = @storage.find(Query.define { timestamp(from: timestamp-DAY, to: timestamp+DAY) }) + + assert_equal(1, r.length) + assert_equal(m, r.first) + end end |