diff options
3 files changed, 273 insertions, 43 deletions
diff --git a/lib/rbot/journal/mongo.rb b/lib/rbot/journal/mongo.rb
new file mode 100644
index 00000000..24e9cfcc
--- /dev/null
+++ b/lib/rbot/journal/mongo.rb
@@ -0,0 +1,120 @@
+# encoding: UTF-8
+#-- vim:sw=2:et
+# :title: journal backend for mongoDB
+require 'mongo'
+require 'json'
+module Irc
+class Bot
+module Journal
+ module Storage
+ class MongoStorage < AbstractStorage
+ attr_reader :client
+ def initialize(opts={})
+ Mongo::Logger.logger.level = Logger::WARN
+ @uri = opts[:uri] || 'mongodb://'
+ @client =
+ @collection = @client['journal']
+ log 'journal storage: mongodb connected to ' + @uri
+ drop if opts[:drop]
+ @collection.indexes.create_one({topic: 1})
+ end
+ def ensure_index(key)
+ @collection.indexes.create_one({'payload.'+key => 1})
+ end
+ def insert(m)
+ @collection.insert_one({
+ '_id' =>,
+ 'topic' => m.topic,
+ 'timestamp' => m.timestamp,
+ 'payload' => m.payload
+ })
+ end
+ def find(query=nil, limit=100, offset=0)
+ query_cursor(query).skip(offset).limit(limit).map do |document|
+ document['_id'], timestamp: document['timestamp'].localtime,
+ topic: document['topic'], payload: document['payload'].to_h)
+ end
+ end
+ # returns the number of messages that match the query
+ def count(query=nil)
+ query_cursor(query).count
+ end
+ def remove(query=nil)
+ query_cursor(query).delete_many
+ end
+ def drop
+ @collection.drop
+ end
+ def query_cursor(query)
+ unless query
+ return @collection.find()
+ end
+ query_and = []
+ # ID query OR condition
+ unless
+ query_and << {
+ '$or' => { |_id|
+ {'_id' => _id}
+ }
+ }
+ end
+ unless query.topic.empty?
+ query_and << {
+ '$or' => { |topic|
+ if topic.include?('*')
+ pattern = topic.gsub('.', '\.').gsub('*', '.*')
+ {'topic' => {'$regex' => pattern}}
+ else
+ {'topic' => topic}
+ end
+ }
+ }
+ end
+ if query.timestamp[:from] or query.timestamp[:to]
+ where = {}
+ if query.timestamp[:from]
+ where['$gte'] = query.timestamp[:from]
+ end
+ if query.timestamp[:to]
+ where['$lte'] = query.timestamp[:to]
+ end
+ query_and << {'timestamp' => where}
+ end
+ unless query.payload.empty?
+ query_and << {
+ '$or' => { |key, value|
+ key = 'payload.' + key
+ {key => value}
+ }
+ }
+ end
+ @collection.find({
+ '$and' => query_and
+ })
+ end
+ end
+ end
+end # Journal
+end # Bot
+end # Irc
diff --git a/lib/rbot/journal/postgres.rb b/lib/rbot/journal/postgres.rb
index 52f5fb36..e63aefee 100644
--- a/lib/rbot/journal/postgres.rb
+++ b/lib/rbot/journal/postgres.rb
@@ -7,6 +7,28 @@
require 'pg'
require 'json'
+# as a replacement for CREATE INDEX IF NOT EXIST that is not in postgres.
+# define function to be able to create an index in case it doesnt exist:
+# source:
+CREATE OR REPLACE FUNCTION create_index(table_name text, index_name text, column_name text) RETURNS void AS $$
+ l_count integer;
+ select count(*)
+ into l_count
+ from pg_indexes
+ where schemaname = 'public'
+ and tablename = lower(table_name)
+ and indexname = lower(index_name);
+ if l_count = 0 then
+ execute 'create index ' || index_name || ' on ' || table_name || '(' || column_name || ')';
+ end if;
+$$ LANGUAGE plpgsql;
module Irc
class Bot
module Journal
@@ -17,9 +39,10 @@ module Journal
attr_reader :conn
def initialize(opts={})
- @uri = opts[:uri] || 'postgresql://localhost/rbot_journal'
+ @uri = opts[:uri] || 'postgresql://localhost/rbot'
@conn = PG.connect(@uri)
@conn.exec('set client_min_messages = warning')
+ @conn.exec(CREATE_INDEX)
@version = @conn.exec('SHOW server_version;')[0]['server_version']
@version.gsub!(/^(\d+\.\d+)$/, '\1.0')
@@ -35,6 +58,7 @@ module Journal
drop if opts[:drop]
+ create_index('topic_index', 'topic')
def create_table
@@ -46,6 +70,23 @@ module Journal
payload %s NOT NULL)' % [@jsonb ? 'JSONB' : 'JSON'])
+ def create_index(index_name, column_name)
+ debug 'journal postges backend: create index %s for %s' % [
+ index_name, column_name]
+ @conn.exec_params('SELECT create_index($1, $2, $3)', [
+ 'journal', index_name, column_name])
+ end
+ def create_payload_index(key)
+ index_name = 'idx_payload_' + key.gsub('.', '_')
+ column = sql_payload_selector(key)
+ create_index(index_name, column)
+ end
+ def ensure_index(key)
+ create_payload_index(key)
+ end
def insert(m)
@conn.exec_params('INSERT INTO journal VALUES ($1, $2, $3, $4);',
[, m.topic, m.timestamp, JSON.generate(m.payload)])
@@ -95,6 +136,19 @@ module Journal
@conn.exec('DROP TABLE journal;') rescue nil
+ def sql_payload_selector(key)
+ selector = 'payload'
+ k = key.to_s.split('.')
+ k.each_index { |i|
+ if i >= k.length-1
+ selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
+ else
+ selector += '->\'%s\'' % [@conn.escape_string(k[i])]
+ end
+ }
+ selector
+ end
def query_to_sql(query)
params = []
placeholder = do |value|
@@ -142,15 +196,7 @@ module Journal
unless query.payload.empty?
list = []
query.payload.each_pair do |key, value|
- selector = 'payload'
- k = key.to_s.split('.')
- k.each_index { |i|
- if i >= k.length-1
- selector += '->>\'%s\'' % [@conn.escape_string(k[i])]
- else
- selector += '->\'%s\'' % [@conn.escape_string(k[i])]
- end
- }
+ selector = sql_payload_selector(key)
list << selector + ' = ' +
sql[:list] << {
diff --git a/test/test_journal.rb b/test/test_journal.rb
index 9e8f0654..b9f5c612 100644
--- a/test/test_journal.rb
+++ b/test/test_journal.rb
@@ -4,6 +4,9 @@ require 'test/unit'
require 'rbot/ircbot'
require 'rbot/journal'
require 'rbot/journal/postgres.rb'
+require 'rbot/journal/mongo.rb'
+require 'benchmark'
@@ -186,47 +189,17 @@ class JournalBrokerTest < Test::Unit::TestCase
-class JournalStoragePostgresTest < Test::Unit::TestCase
+module JournalStorageTestMixin
include Irc::Bot::Journal
- def setup
- @storage =
- uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal',
- drop: true)
- end
def teardown
- def test_query_to_sql
- q = Query.define do
- id 'foo'
- id 'bar', 'baz'
- topic 'log.irc.*'
- topic 'log.core', 'baz'
- timestamp from:, to: + 60 * 10
- payload 'action': :privmsg, 'alice': 'bob'
- payload 'channel': '#rbot'
- payload '': 'baz'
- end
- sql = @storage.query_to_sql(q)
- assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0])
- q = Query.define do
- id 'foo'
- end
- assert_equal('(id = $1)', @storage.query_to_sql(q)[0])
- q = Query.define do
- topic 'foo.*.bar'
- end
- assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0])
- assert_equal([''], @storage.query_to_sql(q)[1])
- end
def test_operations
# insertion
- m = JournalMessage.create('log.core', {foo: {bar: 'baz'}})
+ m = JournalMessage.create('log.core', {foo: {bar: 'baz', qux: 42}})
# query by id
@@ -239,7 +212,7 @@ class JournalStoragePostgresTest < Test::Unit::TestCase
res.first.timestamp.strftime('%Y-%m-%d %H:%M:%S%z'))
# check if payload was returned correctly:
- assert_equal({'foo' => {'bar' => 'baz'}}, res.first.payload)
+ assert_equal({'foo' => {'bar' => 'baz', 'qux' => 42}}, res.first.payload)
# query by topic
assert_equal(m, @storage.find(Query.define { topic('log.core') }).first)
@@ -304,5 +277,96 @@ class JournalStoragePostgresTest < Test::Unit::TestCase
assert_equal(1, journal.count)
+ NUM=150_000
+ def test_benchmark
+ puts
+ assert_equal(0, @storage.count)
+ # prepare messages to insert, we benchmark the storage backend not ruby
+ num = 0
+ messages = (0...NUM).map do
+ num += 1
+ JournalMessage.create(
+ 'test.topic.num_'+num.to_s, {answer: {number: '42', word: 'forty-two'}})
+ end
+ # iter is the number of operations performed WITHIN block
+ def benchmark(label, iter, &block)
+ time = Benchmark.realtime do
+ yield
+ end
+ puts label + ' %d iterations, duration: %.3fms (%.3fms / iteration)' % [iter, time*1000, (time*1000) / iter]
+ end
+ benchmark(@storage.class.to_s+'~insert', messages.length) do
+ messages.each { |m|
+ @storage.insert(m)
+ }
+ end
+ benchmark(@storage.class.to_s+'~find_by_id', messages.length) do
+ messages.each { |m|
+ @storage.find(Query.define { id })
+ }
+ end
+ benchmark(@storage.class.to_s+'~find_by_topic', messages.length) do
+ messages.each { |m|
+ @storage.find(Query.define { topic m.topic })
+ }
+ end
+ benchmark(@storage.class.to_s+'~find_by_topic_wildcard', messages.length) do
+ messages.each { |m|
+ @storage.find(Query.define { topic m.topic.gsub('topic', '*') })
+ }
+ end
+ end
+class JournalStoragePostgresTest < Test::Unit::TestCase
+ include JournalStorageTestMixin
+ def setup
+ @storage =
+ uri: ENV['DB_URI'] || 'postgresql://localhost/rbot_journal',
+ drop: true)
+ end
+ def test_query_to_sql
+ q = Query.define do
+ id 'foo'
+ id 'bar', 'baz'
+ topic 'log.irc.*'
+ topic 'log.core', 'baz'
+ timestamp from:, to: + 60 * 10
+ payload 'action': :privmsg, 'alice': 'bob'
+ payload 'channel': '#rbot'
+ payload '': 'baz'
+ end
+ sql = @storage.query_to_sql(q)
+ assert_equal("(id = $1 OR id = $2 OR id = $3) AND (topic ILIKE $4 OR topic ILIKE $5 OR topic ILIKE $6) AND (timestamp >= $7 AND timestamp <= $8) AND (payload->>'action' = $9 OR payload->>'alice' = $10 OR payload->>'channel' = $11 OR payload->'foo'->>'bar' = $12)", sql[0])
+ q = Query.define do
+ id 'foo'
+ end
+ assert_equal('(id = $1)', @storage.query_to_sql(q)[0])
+ q = Query.define do
+ topic 'foo.*.bar'
+ end
+ assert_equal('(topic ILIKE $1)', @storage.query_to_sql(q)[0])
+ assert_equal([''], @storage.query_to_sql(q)[1])
+ end
+class JournalStorageMongoTest < Test::Unit::TestCase
+ include JournalStorageTestMixin
+ def setup
+ @storage =
+ drop: true)
+ end