diff options
Diffstat (limited to 'src/modules/extra/m_pgsql.cpp')
-rw-r--r-- | src/modules/extra/m_pgsql.cpp | 161 |
1 files changed, 103 insertions, 58 deletions
diff --git a/src/modules/extra/m_pgsql.cpp b/src/modules/extra/m_pgsql.cpp index c44c66bc8..14e32ac36 100644 --- a/src/modules/extra/m_pgsql.cpp +++ b/src/modules/extra/m_pgsql.cpp @@ -57,14 +57,6 @@ typedef std::map<std::string, SQLConn*> ConnMap; */ enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE }; -inline std::string pop_front_r(std::deque<std::string> &d) -{ - std::string r = d.front(); - d.pop_front(); - return r; - -} - /** QueryQueue, a queue of queries waiting to be executed. * This maintains two queues internally, one for 'priority' * queries and one for less important ones. Each queue has @@ -74,21 +66,38 @@ inline std::string pop_front_r(std::deque<std::string> &d) * queries in the priority queue they will be executed first, * 'unimportant' queries will only be executed when the * priority queue is empty. + * + * These are lists of SQLresult so we can, from the moment the + * SQLrequest is recieved, be beginning to construct the result + * object. The copy in the deque can then be submitted in-situ + * and finally deleted from this queue. No copies of the SQLresult :) + * + * Because we work on the SQLresult in-situ, we need a way of accessing the + * result we are currently processing, QueryQueue::front(), but that call + * needs to always return the same element until that element is removed + * from the queue, this is what the 'which' variable is. New queries are + * always added to the back of one of the two queues, but if when front() + * is first called then the priority queue is empty then front() will return + * a query from the normal queue, but if a query is then added to the priority + * queue then front() must continue to return the front of the *normal* queue + * until pop() is called. */ class QueryQueue { private: - std::deque<std::string> priority; /* The priority queue */ - std::deque<std::string> normal; /* The 'normal' queue */ + std::deque<SQLresult> priority; /* The priority queue */ + std::deque<SQLresult> normal; /* The 'normal' queue */ + enum { PRI, NOR, NON } which; /* Which queue the currently active element is at the front of */ public: QueryQueue() + : which(NON) { } - void push_back(const std::string &q, bool pri = false) + void push(const Query &q, bool pri = false) { log(DEBUG, "QueryQueue::push_back(): Adding %s query to queue: %s", ((pri) ? "priority" : "non-priority"), q.c_str()); @@ -98,21 +107,47 @@ public: normal.push_back(q); } - inline std::string pop_front() + void pop() { - std::string res; - - if(priority.size()) + if((which == PRI) && priority.size()) { - return pop_front_r(priority); + priority.pop_front(); } - else if(normal.size()) + else if((which == NOR) && normal.size()) { - return pop_front_r(normal); + normal.pop_front(); } - else + + /* Silently do nothing if there was no element to pop() */ + } + + SQLresult& front() + { + switch(which) { - return ""; + case PRI: + return priority.front(); + case NOR: + return normal.front(); + default: + if(priority.size()) + { + which = PRI; + return priority.front(); + } + + if(normal.size()) + { + which = NOR; + return normal.front(); + } + + /* This will probably result in a segfault, + * but the caller should have checked totalsize() + * first so..meh - moron :p + */ + + return priority.front(); } } @@ -120,6 +155,11 @@ public: { return std::make_pair(priority.size(), normal.size()); } + + int totalsize() + { + return priority.size() + normal.size(); + } }; /** SQLConn represents one SQL session. @@ -141,14 +181,16 @@ private: bool ssl; /* If we should require SSL */ PGconn* sql; /* PgSQL database connection handle */ SQLstatus status; /* PgSQL database connection status */ + bool qinprog;/* If there is currently a query in progress */ + QueryQueue queue; /* Queue of queries waiting to be executed on this connection */ + Query query; /* The currently active query on this connection */ public: - QueryQueue queue; /* Queue of queries waiting to be executed on this connection */ /* This class should only ever be created inside this module, using this constructor, so we don't have to worry about the default ones */ SQLConn(Server* srv, const std::string &h, unsigned int p, const std::string &d, const std::string &u, const std::string &pwd, bool s) - : InspSocket::InspSocket(), Srv(srv), dbhost(h), dbport(p), dbname(d), dbuser(u), dbpass(pwd), ssl(s), sql(NULL), status(CWRITE) + : InspSocket::InspSocket(), Srv(srv), dbhost(h), dbport(p), dbname(d), dbuser(u), dbpass(pwd), ssl(s), sql(NULL), status(CWRITE), qinprog(false) { log(DEBUG, "Creating new PgSQL connection to database %s on %s:%u (%s/%s)", dbname.c_str(), dbhost.c_str(), dbport, dbuser.c_str(), dbpass.c_str()); @@ -191,7 +233,7 @@ public: ~SQLConn() { - + Close(); } bool DoResolve() @@ -280,11 +322,14 @@ public: virtual void Close() { + log(DEBUG,"SQLConn::Close"); + + if(this->fd > 01) + socket_ref[this->fd] = NULL; this->fd = -1; this->state = I_ERROR; this->OnError(I_ERR_SOCKET); this->ClosePending = true; - log(DEBUG,"SQLConn::Close"); if(sql) { @@ -303,20 +348,18 @@ public: log(DEBUG, "PGconnectPoll: PGRES_POLLING_WRITING"); WantWrite(); status = CWRITE; - DoPoll(); - break; + return DoPoll(); case PGRES_POLLING_READING: log(DEBUG, "PGconnectPoll: PGRES_POLLING_READING"); status = CREAD; break; case PGRES_POLLING_FAILED: log(DEBUG, "PGconnectPoll: PGRES_POLLING_FAILED: %s", PQerrorMessage(sql)); - Close(); return false; case PGRES_POLLING_OK: log(DEBUG, "PGconnectPoll: PGRES_POLLING_OK"); status = WWRITE; - break; + return DoConnectedPoll() default: log(DEBUG, "PGconnectPoll: wtf?"); break; @@ -325,8 +368,15 @@ public: return true; } - bool ProcessData() + bool DoConnectedPoll() { + if(!qinprog && queue.totalsize()) + { + /* There's no query currently in progress, and there's queries in the queue. */ + query = queue.pop_front(); + DoQuery(); + } + if(PQconsumeInput(sql)) { log(DEBUG, "PQconsumeInput succeeded"); @@ -353,6 +403,8 @@ public: PQclear(result); } + + qinprog = false; } return true; @@ -420,13 +472,15 @@ public: bool DoEvent() { + bool ret; + if((status == CREAD) || (status == CWRITE)) { - DoPoll(); + ret = DoPoll(); } else { - ProcessData(); + ret = DoConnectedPoll(); } switch(PQflush(sql)) @@ -443,7 +497,7 @@ public: break; } - return true; + return ret; } std::string MkInfoStr() @@ -482,39 +536,30 @@ public: return "Err...what, erm..BUG!"; } - bool Query(const std::string &query) + SQLerror Query(const Query &query, bool pri) { + queue.push_back(query, pri); + if((status == WREAD) || (status == WWRITE)) { - if(PQsendQuery(sql, query.c_str())) + if(!qinprog) { - log(DEBUG, "Dispatched query: %s", query.c_str()); - return true; - } - else - { - log(DEBUG, "Failed to dispatch query: %s", PQerrorMessage(sql)); - return false; + if(PQsendQuery(sql, query.c_str())) + { + log(DEBUG, "Dispatched query: %s", query.c_str()); + qinprog = true; + return SQLerror(); + } + else + { + log(DEBUG, "Failed to dispatch query: %s", PQerrorMessage(sql)); + return SQLerror(QSEND_FAIL, PQerrorMessage(sql)); + } } } log(DEBUG, "Can't query until connection is complete"); - return false; - } - - virtual void OnClose() - { - /* Close PgSQL connection */ - } - - virtual void OnError(InspSocketError e) - { - /* Unsure if we need this, we should be reading/writing via the PgSQL API rather than the insp one... */ - } - - virtual void OnTimeout() - { - /* Unused, I think */ + return SQLerror(BAD_CONN, "Can't query until connection is complete"); } }; @@ -584,7 +629,7 @@ public: if((iter = connections.find(req->dbid)) != connections.end()) { /* Execute query */ - iter->second->queue.push_back(req->query, req->pri); + req->error = iter->second->Query(Query(req->query, req->GetSource(), this), req->pri); return SQLSUCCESS; } |