summaryrefslogtreecommitdiff
path: root/src/modules/extra/m_pgsql.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/extra/m_pgsql.cpp')
-rw-r--r--src/modules/extra/m_pgsql.cpp161
1 files changed, 103 insertions, 58 deletions
diff --git a/src/modules/extra/m_pgsql.cpp b/src/modules/extra/m_pgsql.cpp
index c44c66bc8..14e32ac36 100644
--- a/src/modules/extra/m_pgsql.cpp
+++ b/src/modules/extra/m_pgsql.cpp
@@ -57,14 +57,6 @@ typedef std::map<std::string, SQLConn*> ConnMap;
*/
enum SQLstatus { CREAD, CWRITE, WREAD, WWRITE };
-inline std::string pop_front_r(std::deque<std::string> &d)
-{
- std::string r = d.front();
- d.pop_front();
- return r;
-
-}
-
/** QueryQueue, a queue of queries waiting to be executed.
* This maintains two queues internally, one for 'priority'
* queries and one for less important ones. Each queue has
@@ -74,21 +66,38 @@ inline std::string pop_front_r(std::deque<std::string> &d)
* queries in the priority queue they will be executed first,
* 'unimportant' queries will only be executed when the
* priority queue is empty.
+ *
+ * These are lists of SQLresult so we can, from the moment the
+ * SQLrequest is recieved, be beginning to construct the result
+ * object. The copy in the deque can then be submitted in-situ
+ * and finally deleted from this queue. No copies of the SQLresult :)
+ *
+ * Because we work on the SQLresult in-situ, we need a way of accessing the
+ * result we are currently processing, QueryQueue::front(), but that call
+ * needs to always return the same element until that element is removed
+ * from the queue, this is what the 'which' variable is. New queries are
+ * always added to the back of one of the two queues, but if when front()
+ * is first called then the priority queue is empty then front() will return
+ * a query from the normal queue, but if a query is then added to the priority
+ * queue then front() must continue to return the front of the *normal* queue
+ * until pop() is called.
*/
class QueryQueue
{
private:
- std::deque<std::string> priority; /* The priority queue */
- std::deque<std::string> normal; /* The 'normal' queue */
+ std::deque<SQLresult> priority; /* The priority queue */
+ std::deque<SQLresult> normal; /* The 'normal' queue */
+ enum { PRI, NOR, NON } which; /* Which queue the currently active element is at the front of */
public:
QueryQueue()
+ : which(NON)
{
}
- void push_back(const std::string &q, bool pri = false)
+ void push(const Query &q, bool pri = false)
{
log(DEBUG, "QueryQueue::push_back(): Adding %s query to queue: %s", ((pri) ? "priority" : "non-priority"), q.c_str());
@@ -98,21 +107,47 @@ public:
normal.push_back(q);
}
- inline std::string pop_front()
+ void pop()
{
- std::string res;
-
- if(priority.size())
+ if((which == PRI) && priority.size())
{
- return pop_front_r(priority);
+ priority.pop_front();
}
- else if(normal.size())
+ else if((which == NOR) && normal.size())
{
- return pop_front_r(normal);
+ normal.pop_front();
}
- else
+
+ /* Silently do nothing if there was no element to pop() */
+ }
+
+ SQLresult& front()
+ {
+ switch(which)
{
- return "";
+ case PRI:
+ return priority.front();
+ case NOR:
+ return normal.front();
+ default:
+ if(priority.size())
+ {
+ which = PRI;
+ return priority.front();
+ }
+
+ if(normal.size())
+ {
+ which = NOR;
+ return normal.front();
+ }
+
+ /* This will probably result in a segfault,
+ * but the caller should have checked totalsize()
+ * first so..meh - moron :p
+ */
+
+ return priority.front();
}
}
@@ -120,6 +155,11 @@ public:
{
return std::make_pair(priority.size(), normal.size());
}
+
+ int totalsize()
+ {
+ return priority.size() + normal.size();
+ }
};
/** SQLConn represents one SQL session.
@@ -141,14 +181,16 @@ private:
bool ssl; /* If we should require SSL */
PGconn* sql; /* PgSQL database connection handle */
SQLstatus status; /* PgSQL database connection status */
+ bool qinprog;/* If there is currently a query in progress */
+ QueryQueue queue; /* Queue of queries waiting to be executed on this connection */
+ Query query; /* The currently active query on this connection */
public:
- QueryQueue queue; /* Queue of queries waiting to be executed on this connection */
/* This class should only ever be created inside this module, using this constructor, so we don't have to worry about the default ones */
SQLConn(Server* srv, const std::string &h, unsigned int p, const std::string &d, const std::string &u, const std::string &pwd, bool s)
- : InspSocket::InspSocket(), Srv(srv), dbhost(h), dbport(p), dbname(d), dbuser(u), dbpass(pwd), ssl(s), sql(NULL), status(CWRITE)
+ : InspSocket::InspSocket(), Srv(srv), dbhost(h), dbport(p), dbname(d), dbuser(u), dbpass(pwd), ssl(s), sql(NULL), status(CWRITE), qinprog(false)
{
log(DEBUG, "Creating new PgSQL connection to database %s on %s:%u (%s/%s)", dbname.c_str(), dbhost.c_str(), dbport, dbuser.c_str(), dbpass.c_str());
@@ -191,7 +233,7 @@ public:
~SQLConn()
{
-
+ Close();
}
bool DoResolve()
@@ -280,11 +322,14 @@ public:
virtual void Close()
{
+ log(DEBUG,"SQLConn::Close");
+
+ if(this->fd > 01)
+ socket_ref[this->fd] = NULL;
this->fd = -1;
this->state = I_ERROR;
this->OnError(I_ERR_SOCKET);
this->ClosePending = true;
- log(DEBUG,"SQLConn::Close");
if(sql)
{
@@ -303,20 +348,18 @@ public:
log(DEBUG, "PGconnectPoll: PGRES_POLLING_WRITING");
WantWrite();
status = CWRITE;
- DoPoll();
- break;
+ return DoPoll();
case PGRES_POLLING_READING:
log(DEBUG, "PGconnectPoll: PGRES_POLLING_READING");
status = CREAD;
break;
case PGRES_POLLING_FAILED:
log(DEBUG, "PGconnectPoll: PGRES_POLLING_FAILED: %s", PQerrorMessage(sql));
- Close();
return false;
case PGRES_POLLING_OK:
log(DEBUG, "PGconnectPoll: PGRES_POLLING_OK");
status = WWRITE;
- break;
+ return DoConnectedPoll()
default:
log(DEBUG, "PGconnectPoll: wtf?");
break;
@@ -325,8 +368,15 @@ public:
return true;
}
- bool ProcessData()
+ bool DoConnectedPoll()
{
+ if(!qinprog && queue.totalsize())
+ {
+ /* There's no query currently in progress, and there's queries in the queue. */
+ query = queue.pop_front();
+ DoQuery();
+ }
+
if(PQconsumeInput(sql))
{
log(DEBUG, "PQconsumeInput succeeded");
@@ -353,6 +403,8 @@ public:
PQclear(result);
}
+
+ qinprog = false;
}
return true;
@@ -420,13 +472,15 @@ public:
bool DoEvent()
{
+ bool ret;
+
if((status == CREAD) || (status == CWRITE))
{
- DoPoll();
+ ret = DoPoll();
}
else
{
- ProcessData();
+ ret = DoConnectedPoll();
}
switch(PQflush(sql))
@@ -443,7 +497,7 @@ public:
break;
}
- return true;
+ return ret;
}
std::string MkInfoStr()
@@ -482,39 +536,30 @@ public:
return "Err...what, erm..BUG!";
}
- bool Query(const std::string &query)
+ SQLerror Query(const Query &query, bool pri)
{
+ queue.push_back(query, pri);
+
if((status == WREAD) || (status == WWRITE))
{
- if(PQsendQuery(sql, query.c_str()))
+ if(!qinprog)
{
- log(DEBUG, "Dispatched query: %s", query.c_str());
- return true;
- }
- else
- {
- log(DEBUG, "Failed to dispatch query: %s", PQerrorMessage(sql));
- return false;
+ if(PQsendQuery(sql, query.c_str()))
+ {
+ log(DEBUG, "Dispatched query: %s", query.c_str());
+ qinprog = true;
+ return SQLerror();
+ }
+ else
+ {
+ log(DEBUG, "Failed to dispatch query: %s", PQerrorMessage(sql));
+ return SQLerror(QSEND_FAIL, PQerrorMessage(sql));
+ }
}
}
log(DEBUG, "Can't query until connection is complete");
- return false;
- }
-
- virtual void OnClose()
- {
- /* Close PgSQL connection */
- }
-
- virtual void OnError(InspSocketError e)
- {
- /* Unsure if we need this, we should be reading/writing via the PgSQL API rather than the insp one... */
- }
-
- virtual void OnTimeout()
- {
- /* Unused, I think */
+ return SQLerror(BAD_CONN, "Can't query until connection is complete");
}
};
@@ -584,7 +629,7 @@ public:
if((iter = connections.find(req->dbid)) != connections.end())
{
/* Execute query */
- iter->second->queue.push_back(req->query, req->pri);
+ req->error = iter->second->Query(Query(req->query, req->GetSource(), this), req->pri);
return SQLSUCCESS;
}