summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorpeavey <peavey@e03df62e-2008-0410-955e-edbf42e46eb7>2008-09-05 22:29:45 +0000
committerpeavey <peavey@e03df62e-2008-0410-955e-edbf42e46eb7>2008-09-05 22:29:45 +0000
commit09262ea7e490f75ff9770edca5099ed7e276e270 (patch)
tree1be3959f9f471ee7300e23763f674aef23a5ce39 /src
parentff11847cea53bc63b0c489b423a4e4fa6c05184b (diff)
Now with threaded queries.
git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@10401 e03df62e-2008-0410-955e-edbf42e46eb7
Diffstat (limited to 'src')
-rw-r--r--src/modules/extra/m_mssql.cpp106
1 files changed, 100 insertions, 6 deletions
diff --git a/src/modules/extra/m_mssql.cpp b/src/modules/extra/m_mssql.cpp
index 97e5157b0..c59f02260 100644
--- a/src/modules/extra/m_mssql.cpp
+++ b/src/modules/extra/m_mssql.cpp
@@ -27,14 +27,33 @@
class SQLConn;
class MsSQLResult;
class ResultNotifier;
+class ModuleMsSQL;
typedef std::map<std::string, SQLConn*> ConnMap;
-typedef std::deque<classbase*> paramlist;
typedef std::deque<MsSQLResult*> ResultQueue;
ResultNotifier* resultnotify = NULL;
ResultNotifier* resultdispatch = NULL;
int QueueFD = -1;
+ConnMap connections;
+Mutex* QueueMutex;
+Mutex* ResultsMutex;
+Mutex* LoggingMutex;
+
+
+class QueryThread : public Thread
+{
+ private:
+ ModuleMsSQL* Parent;
+ InspIRCd* Instance;
+ public:
+ QueryThread(InspIRCd* si, ModuleMsSQL* mod)
+ : Thread(), Parent(mod), Instance(si)
+ {
+ }
+ ~QueryThread() { }
+ virtual void Run();
+};
class ResultNotifier : public BufferedSocket
{
@@ -272,6 +291,8 @@ class SQLConn : public classbase
TDSCONTEXT* context;
public:
+ QueryQueue queue;
+
SQLConn(InspIRCd* SI, Module* m, const SQLhost& hi)
: Instance(SI), mod(m), host(hi), login(NULL), sock(NULL), context(NULL)
{
@@ -282,19 +303,25 @@ class SQLConn : public classbase
{
if (tds_process_simple_query(sock) != TDS_SUCCEED)
{
+ LoggingMutex->Enable(true);
Instance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
+ LoggingMutex->Enable(false);
CloseDB();
}
}
else
{
+ LoggingMutex->Enable(true);
Instance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id);
+ LoggingMutex->Enable(false);
CloseDB();
}
}
else
{
+ LoggingMutex->Enable(true);
Instance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not connect to DB with id: " + host.id);
+ LoggingMutex->Enable(false);
CloseDB();
}
}
@@ -379,12 +406,14 @@ class SQLConn : public classbase
*queryend = 0;
req.query.q = query;
- MsSQLResult* res = new MsSQLResult(mod, req.GetSource(), req.id);
+ MsSQLResult* res = new MsSQLResult((Module*)mod, req.GetSource(), req.id);
res->dbid = host.id;
res->query = req.query.q;
char* msquery = strdup(req.query.q.data());
+ LoggingMutex->Enable(true);
Instance->Logs->Log("m_mssql",DEBUG,"doing Query: %s",msquery);
+ LoggingMutex->Enable(false);
if (tds_submit_query(sock, msquery) != TDS_SUCCEED)
{
std::string error("failed to execute: "+std::string(req.query.q.data()));
@@ -452,7 +481,9 @@ class SQLConn : public classbase
break;
}
}
+ ResultsMutex->Enable(true);
results.push_back(res);
+ ResultsMutex->Enable(false);
SendNotify();
return SQLerror();
}
@@ -460,14 +491,18 @@ class SQLConn : public classbase
static int HandleMessage(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
{
SQLConn* sc = (SQLConn*)pContext->parent;
+ LoggingMutex->Enable(true);
sc->Instance->Logs->Log("m_mssql", DEBUG, "Message for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
+ LoggingMutex->Enable(false);
return 0;
}
static int HandleError(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage)
{
SQLConn* sc = (SQLConn*)pContext->parent;
+ LoggingMutex->Enable(true);
sc->Instance->Logs->Log("m_mssql", DEFAULT, "Error for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message);
+ LoggingMutex->Enable(false);
return 0;
}
@@ -545,6 +580,7 @@ class SQLConn : public classbase
while (results.size())
{
MsSQLResult* res = results[0];
+ ResultsMutex->Enable(true);
if (res->GetDest())
{
res->Send();
@@ -558,6 +594,7 @@ class SQLConn : public classbase
delete res;
}
results.pop_front();
+ ResultsMutex->Enable(false);
}
}
@@ -605,19 +642,29 @@ class SQLConn : public classbase
send(QueueFD, &id, 1, 0);
}
+ void DoLeadingQuery()
+ {
+ SQLrequest& req = queue.front();
+ req.error = Query(req);
+ }
+
};
class ModuleMsSQL : public Module
{
private:
- ConnMap connections;
unsigned long currid;
+ QueryThread* Dispatcher;
public:
ModuleMsSQL(InspIRCd* Me)
: Module(Me), currid(0)
{
+ LoggingMutex = ServerInstance->Mutexes->CreateMutex();
+ ResultsMutex = ServerInstance->Mutexes->CreateMutex();
+ QueueMutex = ServerInstance->Mutexes->CreateMutex();
+
ServerInstance->Modules->UseInterface("SQLutils");
if (!ServerInstance->Modules->PublishFeature("SQL", this))
@@ -629,6 +676,9 @@ class ModuleMsSQL : public Module
ReadConf();
+ Dispatcher = new QueryThread(ServerInstance, this);
+ ServerInstance->Threads->Create(Dispatcher);
+
ServerInstance->Modules->PublishInterface("SQL", this);
Implementation eventlist[] = { I_OnRequest, I_OnRehash };
ServerInstance->Modules->Attach(eventlist, this, 2);
@@ -636,6 +686,7 @@ class ModuleMsSQL : public Module
virtual ~ModuleMsSQL()
{
+ delete Dispatcher;
ClearQueue();
ClearAllConnections();
@@ -659,6 +710,10 @@ class ModuleMsSQL : public Module
ServerInstance->Modules->UnpublishInterface("SQL", this);
ServerInstance->Modules->UnpublishFeature("SQL");
ServerInstance->Modules->DoneWithInterface("SQLutils");
+
+ delete LoggingMutex;
+ delete ResultsMutex;
+ delete QueueMutex;
}
@@ -733,7 +788,9 @@ class ModuleMsSQL : public Module
{
if (HasHost(hi))
{
+ LoggingMutex->Enable(true);
ServerInstance->Logs->Log("m_mssql",DEFAULT, "WARNING: A MsSQL connection with id: %s already exists. Aborting database open attempt.", hi.id.c_str());
+ LoggingMutex->Enable(false);
return;
}
@@ -771,7 +828,9 @@ class ModuleMsSQL : public Module
virtual void OnRehash(User* user, const std::string &parameter)
{
+ QueueMutex->Enable(true);
ReadConf();
+ QueueMutex->Enable(false);
}
virtual const char* OnRequest(Request* request)
@@ -779,18 +838,27 @@ class ModuleMsSQL : public Module
if(strcmp(SQLREQID, request->GetId()) == 0)
{
SQLrequest* req = (SQLrequest*)request;
+
+ QueueMutex->Enable(true);
+
ConnMap::iterator iter;
+
+ const char* returnval = NULL;
+
if((iter = connections.find(req->dbid)) != connections.end())
{
req->id = NewID();
- req->error = iter->second->Query(*req);
- return SQLSUCCESS;
+ iter->second->queue.push(*req);
+ returnval= SQLSUCCESS;
}
else
{
req->error.Id(SQL_BAD_DBID);
- return NULL;
}
+
+ QueueMutex->Enable(false);
+
+ return returnval;
}
return NULL;
}
@@ -815,4 +883,30 @@ void ResultNotifier::Dispatch()
((ModuleMsSQL*)mod)->SendQueue();
}
+void QueryThread::Run()
+{
+ while (this->GetExitFlag() == false)
+ {
+ SQLConn* conn = NULL;
+ QueueMutex->Enable(true);
+ for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
+ {
+ if (i->second->queue.totalsize())
+ {
+ conn = i->second;
+ break;
+ }
+ }
+ QueueMutex->Enable(false);
+ if (conn)
+ {
+ conn->DoLeadingQuery();
+ QueueMutex->Enable(true);
+ conn->queue.pop();
+ QueueMutex->Enable(false);
+ }
+ usleep(1000);
+ }
+}
+
MODULE_INIT(ModuleMsSQL)