summaryrefslogtreecommitdiff
path: root/lib/rbot/journal/postgres.rb
diff options
context:
space:
mode:
authorMatthias Hecker <apoc@geekosphere.org>2015-06-14 18:27:32 +0200
committerMatthias Hecker <apoc@geekosphere.org>2015-06-14 18:27:32 +0200
commit65de5ebca22a2d17729a63589240c734b5ca4de1 (patch)
treea589c55ffadfb719d6c00ef89023bb8dc10c012c /lib/rbot/journal/postgres.rb
parent830c8d7d6be5d0bda2df05ff5a2383362e39f8d4 (diff)
journal: finishing postgres adapter
Diffstat (limited to 'lib/rbot/journal/postgres.rb')
-rw-r--r--lib/rbot/journal/postgres.rb116
1 files changed, 115 insertions, 1 deletions
diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb
index 85514fb2..57c539a7 100644
--- a/lib/rbot/journal/postgres.rb
+++ b/lib/rbot/journal/postgres.rb
@@ -5,12 +5,20 @@
# :title: journal backend for postgresql
require 'pg'
+require 'json'
module Irc
class Bot
module Journal
+
+ class Query
+ end
+
module Storage
+
class PostgresStorage < AbstractStorage
+ attr_reader :conn
+
def initialize(opts={})
@uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
@conn = PG.connect(@uri)
@@ -25,12 +33,118 @@ module Journal
'PostgreSQL Version too old: %s, supported: >= 9.3' % [@version])
end
@jsonb = (version >= 940)
- log 'journal storage: no jsonb support, consider upgrading postgres'
+ log 'journal storage: no jsonb support, consider upgrading postgres' unless @jsonb
+ drop if opts[:drop]
create_table
end
def create_table
+ @conn.exec('
+ CREATE TABLE IF NOT EXISTS journal
+ (id UUID PRIMARY KEY,
+ topic TEXT NOT NULL,
+ timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
+ payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
+ end
+
+ def insert(m)
+ @conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
+ [m.id, m.topic, m.timestamp, JSON.generate(m.payload)])
+ end
+
+ def find(query, limit=100, offset=0)
+ sql, params = query_to_sql(query)
+ sql = 'SELECT * FROM journal WHERE ' + sql + ' LIMIT %d OFFSET %d' % [limit.to_i, offset.to_i]
+ res = @conn.exec_params(sql, params)
+ res.map do |row|
+ timestamp = DateTime.strptime(row['timestamp'], '%Y-%m-%d %H:%M:%S%z')
+ JournalMessage.new(id: row['id'], timestamp: timestamp,
+ topic: row['topic'], payload: JSON.parse(row['payload']))
+ end
+ end
+
+ # returns the number of messages that match the query
+ def count(query)
+ sql, params = query_to_sql(query)
+ sql = 'SELECT COUNT(*) FROM journal WHERE ' + sql
+ res = @conn.exec_params(sql, params)
+ res[0]['count'].to_i
+ end
+
+ def drop
+ @conn.exec('DROP TABLE journal;') rescue nil
+ end
+
+ def query_to_sql(query)
+ params = []
+ placeholder = Proc.new do |value|
+ params << value
+ '$%d' % [params.length]
+ end
+ sql = {op: 'AND', list: []}
+
+ # ID query OR condition
+ unless query.id.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.id.map { |id|
+ 'id = ' + placeholder.call(id)
+ }
+ }
+ end
+
+ # Topic query OR condition
+ unless query.topic.empty?
+ sql[:list] << {
+ op: 'OR',
+ list: query.topic.map { |topic|
+ 'topic ILIKE ' + placeholder.call(topic.gsub('*', '%'))
+ }
+ }
+ end
+
+ # Timestamp range query AND condition
+ if query.timestamp[:from] or query.timestamp[:to]
+ list = []
+ if query.timestamp[:from]
+ list << 'timestamp >= ' + placeholder.call(query.timestamp[:from])
+ end
+ if query.timestamp[:to]
+ list << 'timestamp <= ' + placeholder.call(query.timestamp[:to])
+ end
+ sql[:list] << {
+ op: 'AND',
+ list: list
+ }
+ end
+
+ # Payload query
+ unless query.payload.empty?
+ list = []
+ query.payload.each_pair do |key, value|
+ selector = 'payload'
+ k = key.to_s.split('.')
+ k.each_index { |i|
+ if i >= k.length-1
+ selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+ else
+ selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+ end
+ }
+ list << selector + ' = ' + placeholder.call(value)
+ end
+ sql[:list] << {
+ op: 'OR',
+ list: list
+ }
+ end
+
+ sql = sql[:list].map { |stmt|
+ '(' + stmt[:list].join(' %s ' % [stmt[:op]]) + ')'
+ }.join(' %s ' % [sql[:op]])
+
+ [sql, params]
end
end
end