From 3569f75d893d43ce83369edd59948a45a758afb6 Mon Sep 17 00:00:00 2001 From: Giuseppe Bilotta Date: Mon, 29 Jan 2007 22:36:33 +0000 Subject: Use the bot timer instead of Threads for periodic rss retrievals --- data/rbot/plugins/rss.rb | 187 +++++++++++++++++++++-------------------------- 1 file changed, 85 insertions(+), 102 deletions(-) (limited to 'data/rbot') diff --git a/data/rbot/plugins/rss.rb b/data/rbot/plugins/rss.rb index 745dad0d..b5a44bd7 100644 --- a/data/rbot/plugins/rss.rb +++ b/data/rbot/plugins/rss.rb @@ -107,12 +107,8 @@ class RSSFeedsPlugin < Plugin :default => 300, :validate => Proc.new{|v| v > 30}, :desc => "How many seconds to sleep before checking RSS feeds again") - @@watchThreads = Hash.new - @@mutex = Mutex.new - def initialize super - kill_threads if @registry.has_key?(:feeds) @feeds = @registry[:feeds] @feeds.keys.grep(/[A-Z]/) { |k| @@ -125,6 +121,7 @@ class RSSFeedsPlugin < Plugin else @feeds = Hash.new end + @watch = Hash.new rewatch_rss end @@ -137,21 +134,28 @@ class RSSFeedsPlugin < Plugin end def cleanup - kill_threads + stop_watches end def save @registry[:feeds] = @feeds end - def kill_threads - @@mutex.synchronize { - # Abort all running threads. - @@watchThreads.each { |url, thread| - debug "Killing thread for #{url}" - thread.kill - } - @@watchThreads = Hash.new + def stop_watch(handle) + if @watch.has_key?(handle) + begin + debug "Stopping watch #{handle}" + @bot.timer.remove(@watch[handle]) + @watch.delete(handle) + rescue => e + report_problem("Failed to stop watch for #{handle}", e, nil) + end + end + end + + def stop_watches + @watch.each_key { |k| + stop_watch(k) } end @@ -216,9 +220,7 @@ class RSSFeedsPlugin < Plugin m.reply "lemme fetch it..." title = items = nil - @@mutex.synchronize { - title, items = fetchRss(feed, m) - } + title, items = fetchRss(feed, m) return unless items # We sort the feeds in freshness order (newer ones first) @@ -248,13 +250,11 @@ class RSSFeedsPlugin < Plugin def list_rss(m, params) wanted = params[:handle] reply = String.new - @@mutex.synchronize { - @feeds.each { |handle, feed| - next if wanted and !handle.match(/#{wanted}/i) - reply << "#{feed.handle}: #{feed.url} (in format: #{feed.type ? feed.type : 'default'})" - (reply << " (watched)") if feed.watched_by?(m.replyto) - reply << "\n" - } + @feeds.each { |handle, feed| + next if wanted and !handle.match(/#{wanted}/i) + reply << "#{feed.handle}: #{feed.url} (in format: #{feed.type ? feed.type : 'default'})" + (reply << " (watched)") if feed.watched_by?(m.replyto) + reply << "\n" } if reply.empty? reply = "no feeds found" @@ -266,12 +266,10 @@ class RSSFeedsPlugin < Plugin def watched_rss(m, params) wanted = params[:handle] reply = String.new - @@mutex.synchronize { - watchlist.each { |handle, feed| - next if wanted and !handle.match(/#{wanted}/i) - next unless feed.watched_by?(m.replyto) - reply << "#{feed.handle}: #{feed.url} (in format: #{feed.type ? feed.type : 'default'})\n" - } + watchlist.each { |handle, feed| + next if wanted and !handle.match(/#{wanted}/i) + next unless feed.watched_by?(m.replyto) + reply << "#{feed.handle}: #{feed.url} (in format: #{feed.type ? feed.type : 'default'})\n" } if reply.empty? reply = "no watched feeds" @@ -296,9 +294,7 @@ class RSSFeedsPlugin < Plugin m.reply "You must specify both a handle and an url to add an RSS feed" return end - @@mutex.synchronize { - @feeds[handle.downcase] = RssBlob.new(url,handle,type) - } + @feeds[handle.downcase] = RssBlob.new(url,handle,type) reply = "Added RSS #{url} named #{handle}" if type reply << " (format: #{type})" @@ -313,9 +309,7 @@ class RSSFeedsPlugin < Plugin m.reply "someone else is watching #{feed.handle}, I won't remove it from my list" return end - @@mutex.synchronize { - @feeds.delete(feed.handle.downcase) - } + @feeds.delete(feed.handle.downcase) m.okay unless pass return end @@ -343,19 +337,14 @@ class RSSFeedsPlugin < Plugin if url add_rss(m, params) end - feed = nil - @@mutex.synchronize { - feed = @feeds.fetch(handle.downcase, nil) - } + feed = @feeds.fetch(handle.downcase, nil) if feed - @@mutex.synchronize { - if feed.add_watch(m.replyto) - watchRss(feed, m) - m.okay - else - m.reply "Already watching #{feed.handle}" - end - } + if feed.add_watch(m.replyto) + watchRss(feed, m) + m.okay + else + m.reply "Already watching #{feed.handle}" + end else m.reply "Couldn't watch feed #{handle} (no such feed found)" end @@ -374,19 +363,13 @@ class RSSFeedsPlugin < Plugin m.reply("#{m.replyto} wasn't watching #{feed.handle}") unless pass end if !feed.watched? - @@mutex.synchronize { - if @@watchThreads[handle].kind_of? Thread - @@watchThreads[handle].kill - debug "rmwatch: Killed thread for #{handle}" - @@watchThreads.delete(handle) - end - } + stop_watch(handle) end return feed end def rewatch_rss(m=nil, params=nil) - kill_threads + stop_watches # Read watches from list. watchlist.each{ |handle, feed| @@ -397,61 +380,63 @@ class RSSFeedsPlugin < Plugin private def watchRss(feed, m=nil) - if @@watchThreads.has_key?(feed.handle) + if @watch.has_key?(feed.handle) report_problem("watcher thread for #{feed.handle} is already running", nil, m) return end - @@watchThreads[feed.handle] = Thread.new do + status = Hash.new + status[:oldItems] = [] + status[:firstRun] = true + status[:failures] = 0 + @watch[feed.handle] = @bot.timer.add(@bot.config['rss.thread_sleep'], status) { debug "watcher for #{feed} started" - oldItems = [] - firstRun = true - failures = 0 - loop do - begin - debug "fetching #{feed}" - title = newItems = nil - @@mutex.synchronize { - title, newItems = fetchRss(feed) - } - unless newItems - debug "no items in feed #{feed}" - failures +=1 + oldItems = status[:oldItems] + firstRun = status[:firstRun] + failures = status[:failures] + begin + debug "fetching #{feed}" + title = newItems = nil + title, newItems = fetchRss(feed) + unless newItems + debug "no items in feed #{feed}" + failures +=1 + else + debug "Checking if new items are available for #{feed}" + if firstRun + debug "First run, we'll see next time" + firstRun = false else - debug "Checking if new items are available for #{feed}" - if firstRun - debug "First run, we'll see next time" - firstRun = false - else - otxt = oldItems.map { |item| item.to_s } - dispItems = newItems.reject { |item| - otxt.include?(item.to_s) + otxt = oldItems.map { |item| item.to_s } + dispItems = newItems.reject { |item| + otxt.include?(item.to_s) + } + if dispItems.length > 0 + debug "Found #{dispItems.length} new items in #{feed}" + # When displaying watched feeds, publish them from older to newer + dispItems.reverse.each { |item| + printFormattedRss(feed, item) } - if dispItems.length > 0 - debug "Found #{dispItems.length} new items in #{feed}" - # When displaying watched feeds, publish them from older to newer - dispItems.reverse.each { |item| - @@mutex.synchronize { - printFormattedRss(feed, item) - } - } - else - debug "No new items found in #{feed}" - end + else + debug "No new items found in #{feed}" end - oldItems = newItems.dup end - rescue Exception => e - error "Error watching #{feed}: #{e.inspect}" - debug e.backtrace.join("\n") - failures += 1 + oldItems = newItems.dup end - - seconds = @bot.config['rss.thread_sleep'] * (failures + 1) - seconds += seconds * (rand(100)-50)/100 - debug "watcher for #{feed} going to sleep #{seconds} seconds.." - sleep seconds + rescue Exception => e + error "Error watching #{feed}: #{e.inspect}" + debug e.backtrace.join("\n") + failures += 1 end - end + + status[:oldItems] = oldItems + status[:firstRun] = firstRun + status[:failures] = failures + + seconds = @bot.config['rss.thread_sleep'] * (failures + 1) + seconds += seconds * (rand(100)-50)/100 + debug "watcher for #{feed} going to sleep #{seconds} seconds.." + @bot.timer.reschedule(@watch[feed.handle], seconds) + } end def printFormattedRss(feed, item, opts=nil) @@ -508,8 +493,6 @@ class RSSFeedsPlugin < Plugin def fetchRss(feed, m=nil) begin # Use 60 sec timeout, cause the default is too low - # Do not use get_cached for RSS until we have proper cache handling - # xml = @bot.httputil.get_cached(feed.url,60,60) xml = @bot.httputil.get_cached(feed.url, 60, 60) rescue URI::InvalidURIError, URI::BadURIError => e report_problem("invalid rss feed #{feed.url}", e, m) -- cgit v1.2.3