summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGiuseppe Bilotta <giuseppe.bilotta@gmail.com>2006-07-21 15:43:52 +0000
committerGiuseppe Bilotta <giuseppe.bilotta@gmail.com>2006-07-21 15:43:52 +0000
commitea96f9df9b9499614756dbb035205ecb68c5bfd8 (patch)
tree479e33e89d3a292da6190baf691ed6321c5da395
parent35601ffcbdd7dd6954bd10cc93955f9ae3825215 (diff)
Implement new message queue system. Messages in ring 0 have top priority, other messages are satisfied round-robin
-rw-r--r--lib/rbot/ircbot.rb99
-rw-r--r--lib/rbot/ircsocket.rb259
2 files changed, 279 insertions, 79 deletions
diff --git a/lib/rbot/ircbot.rb b/lib/rbot/ircbot.rb
index 6d76cc49..e82a1c98 100644
--- a/lib/rbot/ircbot.rb
+++ b/lib/rbot/ircbot.rb
@@ -272,8 +272,7 @@ class IrcBot
warning "bad nick (#{data[:nick]})"
}
@client[:ping] = proc {|data|
- # (jump the queue for pongs)
- @socket.puts "PONG #{data[:pingid]}"
+ @socket.queue "PONG #{data[:pingid]}"
}
@client[:pong] = proc {|data|
@last_ping = nil
@@ -428,8 +427,8 @@ class IrcBot
rescue => e
raise e.class, "failed to connect to IRC server at #{@config['server.name']} #{@config['server.port']}: " + e
end
- @socket.puts "PASS " + @config['server.password'] if @config['server.password']
- @socket.puts "NICK #{@nick}\nUSER #{@config['irc.user']} 4 #{@config['server.name']} :Ruby bot. (c) Tom Gilbert"
+ @socket.emergency_puts "PASS " + @config['server.password'] if @config['server.password']
+ @socket.emergency_puts "NICK #{@nick}\nUSER #{@config['irc.user']} 4 #{@config['server.name']} :Ruby bot. (c) Tom Gilbert"
start_server_pings
end
@@ -468,7 +467,6 @@ class IrcBot
error "non-net exception: #{e.class}: #{e}"
error e.inspect
error e.backtrace.join("\n")
- @socket.shutdown # now we reconnect
rescue => e
error "unexpected exception: #{e.class}: #{e}"
error e.inspect
@@ -477,11 +475,14 @@ class IrcBot
exit 2
end
- log "disconnected"
-
stop_server_pings
@channels.clear
- @socket.clearq
+ if @socket.connected?
+ @socket.clearq
+ @socket.shutdown
+ end
+
+ log "disconnected"
log "waiting to reconnect"
sleep @config['server.reconnect_wait']
@@ -495,7 +496,7 @@ class IrcBot
# Type can be PRIVMSG, NOTICE, etc, but those you should really use the
# relevant say() or notice() methods. This one should be used for IRCd
# extensions you want to use in modules.
- def sendmsg(type, where, message)
+ def sendmsg(type, where, message, chan=nil, ring=0)
# limit it according to the byterate, splitting the message
# taking into consideration the actual message length
# and all the extra stuff
@@ -504,7 +505,7 @@ class IrcBot
left = @socket.bytes_per - type.length - where.length - 4
begin
if(left >= message.length)
- sendq("#{type} #{where} :#{message}")
+ sendq "#{type} #{where} :#{message}", chan, ring
log_sent(type, where, message)
return
end
@@ -514,46 +515,88 @@ class IrcBot
message = line.slice!(lastspace, line.length) + message
message.gsub!(/^\s+/, "")
end
- sendq("#{type} #{where} :#{line}")
+ sendq "#{type} #{where} :#{line}", chan, ring
log_sent(type, where, line)
end while(message.length > 0)
end
# queue an arbitraty message for the server
- def sendq(message="")
+ def sendq(message="", chan=nil, ring=0)
# temporary
- @socket.queue(message)
+ @socket.queue(message, chan, ring)
end
# send a notice message to channel/nick +where+
- def notice(where, message)
+ def notice(where, message, mchan=nil, mring=-1)
+ if mchan == ""
+ chan = mchan
+ else
+ chan = where
+ end
+ if mring < 0
+ if where =~ /^#/
+ ring = 2
+ else
+ ring = 1
+ end
+ else
+ ring = mring
+ end
message.each_line { |line|
line.chomp!
next unless(line.length > 0)
- sendmsg("NOTICE", where, line)
+ sendmsg "NOTICE", where, line, chan, ring
}
end
# say something (PRIVMSG) to channel/nick +where+
- def say(where, message)
+ def say(where, message, mchan="", mring=-1)
+ if mchan == ""
+ chan = mchan
+ else
+ chan = where
+ end
+ if mring < 0
+ if where =~ /^#/
+ ring = 2
+ else
+ ring = 1
+ end
+ else
+ ring = mring
+ end
message.to_s.gsub(/[\r\n]+/, "\n").each_line { |line|
line.chomp!
next unless(line.length > 0)
unless((where =~ /^#/) && (@channels.has_key?(where) && @channels[where].quiet))
- sendmsg("PRIVMSG", where, line)
+ sendmsg "PRIVMSG", where, line, chan, ring
end
}
end
# perform a CTCP action with message +message+ to channel/nick +where+
- def action(where, message)
- sendq("PRIVMSG #{where} :\001ACTION #{message}\001")
+ def action(where, message, mchan="", mring=-1)
+ if mchan == ""
+ chan = mchan
+ else
+ chan = where
+ end
+ if mring < 0
+ if where =~ /^#/
+ ring = 2
+ else
+ ring = 1
+ end
+ else
+ ring = mring
+ end
+ sendq "PRIVMSG #{where} :\001ACTION #{message}\001", chan, ring
if(where =~ /^#/)
irclog "* #{@nick} #{message}", where
elsif (where =~ /^(\S*)!.*$/)
- irclog "* #{@nick}[#{where}] #{message}", $1
+ irclog "* #{@nick}[#{where}] #{message}", $1
else
- irclog "* #{@nick}[#{where}] #{message}", where
+ irclog "* #{@nick}[#{where}] #{message}", where
end
end
@@ -578,7 +621,7 @@ class IrcBot
# set topic of channel +where+ to +topic+
def topic(where, topic)
- sendq "TOPIC #{where} :#{topic}"
+ sendq "TOPIC #{where} :#{topic}", where, 2
end
# disconnect from the server and cleanup all plugins and modules
@@ -597,7 +640,7 @@ class IrcBot
debug "Clearing socket"
@socket.clearq
debug "Sending quit message"
- @socket.puts "QUIT :#{message}"
+ @socket.emergency_puts "QUIT :#{message}"
debug "Flushing socket"
@socket.flush
debug "Shutting down socket"
@@ -660,15 +703,15 @@ class IrcBot
# join a channel
def join(channel, key=nil)
if(key)
- sendq "JOIN #{channel} :#{key}"
+ sendq "JOIN #{channel} :#{key}", channel, 2
else
- sendq "JOIN #{channel}"
+ sendq "JOIN #{channel}", channel, 2
end
end
# part a channel
def part(channel, message="")
- sendq "PART #{channel} :#{message}"
+ sendq "PART #{channel} :#{message}", channel, 2
end
# attempt to change bot's nick to +name+
@@ -678,7 +721,7 @@ class IrcBot
# changing mode
def mode(channel, mode, target)
- sendq "MODE #{channel} #{mode} #{target}"
+ sendq "MODE #{channel} #{mode} #{target}", channel, 2
end
# m:: message asking for help
@@ -727,7 +770,7 @@ class IrcBot
# we want to respond to a hung server within 30 secs or so
@ping_timer = @timer.add(30) {
@last_ping = Time.now
- @socket.puts "PING :rbot"
+ @socket.queue "PING :rbot"
}
@pong_timer = @timer.add(10) {
unless @last_ping.nil?
diff --git a/lib/rbot/ircsocket.rb b/lib/rbot/ircsocket.rb
index 4ee3be23..23f29086 100644
--- a/lib/rbot/ircsocket.rb
+++ b/lib/rbot/ircsocket.rb
@@ -4,6 +4,158 @@ module Irc
require 'thread'
require 'rbot/timer'
+ class QueueRing
+ # A QueueRing is implemented as an array with elements in the form
+ # [chan, [message1, message2, ...]
+ # Note that the channel +chan+ has no actual bearing with the channels
+ # to which messages will be sent
+
+ def initialize
+ @storage = Array.new
+ @last_idx = -1
+ end
+
+ def clear
+ @storage.clear
+ @last_idx = -1
+ end
+
+ def length
+ @storage.length
+ end
+
+ def empty?
+ @storage.empty?
+ end
+
+ def push(mess, chan)
+ cmess = @storage.assoc(chan)
+ if cmess
+ idx = @storage.index(cmess)
+ cmess[1] << mess
+ @storage[idx] = cmess
+ else
+ @storage << [chan, [mess]]
+ end
+ end
+
+ def next
+ if empty?
+ warning "trying to access empty ring"
+ return nil
+ end
+ save_idx = @last_idx
+ @last_idx = (@last_idx + 1) % @storage.length
+ mess = @storage[@last_idx][1].first
+ @last_idx = save_idx
+ mess
+ end
+
+ def shift
+ if empty?
+ warning "trying to access empty ring"
+ return nil
+ end
+ @last_idx = (@last_idx + 1) % @storage.length
+ mess = @storage[@last_idx][1].shift
+ @storage.delete(@storage[@last_idx]) if @storage[@last_idx][1] == []
+ mess
+ end
+
+ end
+
+ class MessageQueue
+ def initialize
+ # a MessageQueue is an array of QueueRings
+ # rings have decreasing priority, so messages in ring 0
+ # are more important than messages in ring 1, and so on
+ @rings = Array.new(3) { |i|
+ if i > 0
+ QueueRing.new
+ else
+ # ring 0 is special in that if it's not empty, it will
+ # be popped. IOW, ring 0 can starve the other rings
+ # ring 0 is strictly FIFO and is therefore implemented
+ # as an array
+ Array.new
+ end
+ }
+ # the other rings are satisfied round-robin
+ @last_ring = 0
+ end
+
+ def clear
+ @rings.each { |r|
+ r.clear
+ }
+ @last_ring = 0
+ end
+
+ def push(mess, chan=nil, cring=0)
+ ring = cring
+ if ring == 0
+ warning "message #{mess} at ring 0 has channel #{chan}: channel will be ignored" if !chan.nil?
+ @rings[0] << mess
+ else
+ error "message #{mess} at ring #{ring} must have a channel" if chan.nil?
+ @rings[ring].push mess, chan
+ end
+ end
+
+ def empty?
+ @rings.each { |r|
+ return false unless r.empty?
+ }
+ return true
+ end
+
+ def length
+ len = 0
+ @rings.each { |r|
+ len += r.length
+ }
+ len
+ end
+
+ def next
+ if empty?
+ warning "trying to access empty ring"
+ return nil
+ end
+ if !@rings[0].empty?
+ mess = @rings[0].first
+ else
+ save_ring = @last_ring
+ (@rings.length - 1).times {
+ @last_ring = ((@last_ring + 1) % (@rings.length - 1)) + 1
+ if !@rings[@last_ring].empty?
+ mess = @rings[@last_ring].next
+ break
+ end
+ }
+ @last_ring = save_ring
+ end
+ return mess
+ end
+
+ def shift
+ if empty?
+ warning "trying to access empty ring"
+ return nil
+ end
+ if !@rings[0].empty?
+ return @rings[0].shift
+ end
+ (@rings.length - 1).times {
+ @last_ring = ((@last_ring + 1) % (@rings.length - 1)) + 1
+ if !@rings[@last_ring].empty?
+ return @rings[@last_ring].shift
+ end
+ }
+ end
+
+ end
+
# wrapped TCPSocket for communication with the server.
# emulates a subset of TCPSocket functionality
class IrcSocket
@@ -87,8 +239,8 @@ module Irc
# open a TCP connection to the server
def connect
if connected?
- debug "reconnecting socket while connected"
- shutdown
+ warning "reconnecting while connected"
+ return
end
if(@host)
begin
@@ -105,7 +257,7 @@ module Irc
end
@qthread = false
@qmutex = Mutex.new
- @sendq = Array.new
+ @sendq = MessageQueue.new
end
def sendq_delay=(newfreq)
@@ -142,17 +294,6 @@ module Irc
if @throttle_bytes > 0
# If we ever reach the limit, we halve the actual allowed byterate
# until we manage to reset the throttle.
- # I don't know if this is the best way, though, because the real
- # problem is probably non-queued messages like PINGs and PONGs.
- # A better solution would probably be to have two queues,
- # one for priority messages and another one for normal messages.
- # Even better, we should have:
- # * one queue for server stuff
- # * one for each channel
- # * one for each private communication
- # The server queue would have priority, everything else would be served
- # round-robin, so that someone making the bot flood one channel wouldn't
- # prevent the bot from working on other channels (or private communications)
if @throttle_bytes >= @bytes_per
@throttle_div = 0.5
end
@@ -168,9 +309,12 @@ module Irc
@throttle_bytes += more
end
- # used to send lines to the remote IRCd
+ # used to send lines to the remote IRCd by skipping the queue
# message: IRC message to send
- def puts(message)
+ # it should only be used for stuff that *must not* be queued,
+ # i.e. the initial PASS, NICK and USER command
+ # or the final QUIT message
+ def emergency_puts(message)
@qmutex.synchronize do
# debug "In puts - got mutex"
puts_critical(message)
@@ -195,60 +339,64 @@ module Irc
end
end
- def queue(msg)
+ def queue(msg, chan=nil, ring=0)
if @sendq_delay > 0
@qmutex.synchronize do
- @sendq.push msg
+ @sendq.push msg, chan, ring
+ @timer.start
end
- @timer.start
else
# just send it if queueing is disabled
- self.puts(msg)
+ self.emergency_puts(msg)
end
end
# pop a message off the queue, send it
def spool
- if @sendq.empty?
- @timer.stop
- return
- end
- now = Time.new
- if (now >= (@last_send + @sendq_delay))
- # reset burst counter after @sendq_delay has passed
- @burst = 0
- debug "in spool, resetting @burst"
- elsif (@burst >= @sendq_burst)
- # nope. can't send anything, come back to us next tick...
- @timer.start
- return
- end
@qmutex.synchronize do
- debug "(can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send)"
+ debug "in spooler"
+ if @sendq.empty?
+ @timer.stop
+ return
+ end
+ now = Time.new
+ if (now >= (@last_send + @sendq_delay))
+ # reset burst counter after @sendq_delay has passed
+ debug "resetting @burst"
+ @burst = 0
+ elsif (@burst >= @sendq_burst)
+ # nope. can't send anything, come back to us next tick...
+ debug "can't send yet"
+ @timer.start
+ return
+ end
+ # debug "Queue: #{@sendq.inspect}"
+ debug "can send #{@sendq_burst - @burst} lines, there are #{@sendq.length} to send"
(@sendq_burst - @burst).times do
break if @sendq.empty?
- mess = @sendq[0]
+ mess = @sendq.next
+ # debug "Next message is #{mess.inspect}"
if @throttle_bytes == 0 or mess.length+@throttle_bytes < @bytes_per
- debug "(flood protection: sending message of length #{mess.length})"
- debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
+ debug "flood protection: sending message of length #{mess.length}"
+ debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
puts_critical(@sendq.shift)
else
- debug "(flood protection: throttling message of length #{mess.length})"
- debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
+ debug "flood protection: throttling message of length #{mess.length}"
+ debug "(byterate: #{byterate}, throttle bytes: #{@throttle_bytes})"
run_throttle
break
end
end
- end
- if @sendq.empty?
- @timer.stop
+ if @sendq.empty?
+ @timer.stop
+ end
end
end
def clearq
if @sock
- unless @sendq.empty?
- @qmutex.synchronize do
+ @qmutex.synchronize do
+ unless @sendq.empty?
@sendq.clear
end
end
@@ -271,6 +419,7 @@ module Irc
def shutdown(how=2)
@sock.shutdown(how) unless @sock.nil?
@sock = nil
+ @burst = 0
end
private
@@ -278,12 +427,20 @@ module Irc
# same as puts, but expects to be called with a mutex held on @qmutex
def puts_critical(message)
# debug "in puts_critical"
- debug "SEND: #{message.inspect}"
- @sock.send(message + "\n",0)
- @last_send = Time.new
- @lines_sent += 1
- @burst += 1
- run_throttle(message.length + 1)
+ begin
+ debug "SEND: #{message.inspect}"
+ if @sock.nil?
+ error "SEND attempted on closed socket"
+ else
+ @sock.send(message + "\n",0)
+ @last_send = Time.new
+ @lines_sent += 1
+ @burst += 1
+ run_throttle(message.length + 1)
+ end
+ rescue => e
+ error "SEND failed: #{e.inspect}"
+ end
end
end