From 09262ea7e490f75ff9770edca5099ed7e276e270 Mon Sep 17 00:00:00 2001 From: peavey Date: Fri, 5 Sep 2008 22:29:45 +0000 Subject: Now with threaded queries. git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@10401 e03df62e-2008-0410-955e-edbf42e46eb7 --- src/modules/extra/m_mssql.cpp | 106 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 100 insertions(+), 6 deletions(-) (limited to 'src/modules') diff --git a/src/modules/extra/m_mssql.cpp b/src/modules/extra/m_mssql.cpp index 97e5157b0..c59f02260 100644 --- a/src/modules/extra/m_mssql.cpp +++ b/src/modules/extra/m_mssql.cpp @@ -27,14 +27,33 @@ class SQLConn; class MsSQLResult; class ResultNotifier; +class ModuleMsSQL; typedef std::map ConnMap; -typedef std::deque paramlist; typedef std::deque ResultQueue; ResultNotifier* resultnotify = NULL; ResultNotifier* resultdispatch = NULL; int QueueFD = -1; +ConnMap connections; +Mutex* QueueMutex; +Mutex* ResultsMutex; +Mutex* LoggingMutex; + + +class QueryThread : public Thread +{ + private: + ModuleMsSQL* Parent; + InspIRCd* Instance; + public: + QueryThread(InspIRCd* si, ModuleMsSQL* mod) + : Thread(), Parent(mod), Instance(si) + { + } + ~QueryThread() { } + virtual void Run(); +}; class ResultNotifier : public BufferedSocket { @@ -272,6 +291,8 @@ class SQLConn : public classbase TDSCONTEXT* context; public: + QueryQueue queue; + SQLConn(InspIRCd* SI, Module* m, const SQLhost& hi) : Instance(SI), mod(m), host(hi), login(NULL), sock(NULL), context(NULL) { @@ -282,19 +303,25 @@ class SQLConn : public classbase { if (tds_process_simple_query(sock) != TDS_SUCCEED) { + LoggingMutex->Enable(true); Instance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id); + LoggingMutex->Enable(false); CloseDB(); } } else { + LoggingMutex->Enable(true); Instance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not select database " + host.name + " for DB with id: " + host.id); + LoggingMutex->Enable(false); CloseDB(); } } else { + LoggingMutex->Enable(true); Instance->Logs->Log("m_mssql",DEFAULT, "WARNING: Could not connect to DB with id: " + host.id); + LoggingMutex->Enable(false); CloseDB(); } } @@ -379,12 +406,14 @@ class SQLConn : public classbase *queryend = 0; req.query.q = query; - MsSQLResult* res = new MsSQLResult(mod, req.GetSource(), req.id); + MsSQLResult* res = new MsSQLResult((Module*)mod, req.GetSource(), req.id); res->dbid = host.id; res->query = req.query.q; char* msquery = strdup(req.query.q.data()); + LoggingMutex->Enable(true); Instance->Logs->Log("m_mssql",DEBUG,"doing Query: %s",msquery); + LoggingMutex->Enable(false); if (tds_submit_query(sock, msquery) != TDS_SUCCEED) { std::string error("failed to execute: "+std::string(req.query.q.data())); @@ -452,7 +481,9 @@ class SQLConn : public classbase break; } } + ResultsMutex->Enable(true); results.push_back(res); + ResultsMutex->Enable(false); SendNotify(); return SQLerror(); } @@ -460,14 +491,18 @@ class SQLConn : public classbase static int HandleMessage(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage) { SQLConn* sc = (SQLConn*)pContext->parent; + LoggingMutex->Enable(true); sc->Instance->Logs->Log("m_mssql", DEBUG, "Message for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message); + LoggingMutex->Enable(false); return 0; } static int HandleError(const TDSCONTEXT * pContext, TDSSOCKET * pTdsSocket, TDSMESSAGE * pMessage) { SQLConn* sc = (SQLConn*)pContext->parent; + LoggingMutex->Enable(true); sc->Instance->Logs->Log("m_mssql", DEFAULT, "Error for DB with id: %s -> %s", sc->host.id.c_str(), pMessage->message); + LoggingMutex->Enable(false); return 0; } @@ -545,6 +580,7 @@ class SQLConn : public classbase while (results.size()) { MsSQLResult* res = results[0]; + ResultsMutex->Enable(true); if (res->GetDest()) { res->Send(); @@ -558,6 +594,7 @@ class SQLConn : public classbase delete res; } results.pop_front(); + ResultsMutex->Enable(false); } } @@ -605,19 +642,29 @@ class SQLConn : public classbase send(QueueFD, &id, 1, 0); } + void DoLeadingQuery() + { + SQLrequest& req = queue.front(); + req.error = Query(req); + } + }; class ModuleMsSQL : public Module { private: - ConnMap connections; unsigned long currid; + QueryThread* Dispatcher; public: ModuleMsSQL(InspIRCd* Me) : Module(Me), currid(0) { + LoggingMutex = ServerInstance->Mutexes->CreateMutex(); + ResultsMutex = ServerInstance->Mutexes->CreateMutex(); + QueueMutex = ServerInstance->Mutexes->CreateMutex(); + ServerInstance->Modules->UseInterface("SQLutils"); if (!ServerInstance->Modules->PublishFeature("SQL", this)) @@ -629,6 +676,9 @@ class ModuleMsSQL : public Module ReadConf(); + Dispatcher = new QueryThread(ServerInstance, this); + ServerInstance->Threads->Create(Dispatcher); + ServerInstance->Modules->PublishInterface("SQL", this); Implementation eventlist[] = { I_OnRequest, I_OnRehash }; ServerInstance->Modules->Attach(eventlist, this, 2); @@ -636,6 +686,7 @@ class ModuleMsSQL : public Module virtual ~ModuleMsSQL() { + delete Dispatcher; ClearQueue(); ClearAllConnections(); @@ -659,6 +710,10 @@ class ModuleMsSQL : public Module ServerInstance->Modules->UnpublishInterface("SQL", this); ServerInstance->Modules->UnpublishFeature("SQL"); ServerInstance->Modules->DoneWithInterface("SQLutils"); + + delete LoggingMutex; + delete ResultsMutex; + delete QueueMutex; } @@ -733,7 +788,9 @@ class ModuleMsSQL : public Module { if (HasHost(hi)) { + LoggingMutex->Enable(true); ServerInstance->Logs->Log("m_mssql",DEFAULT, "WARNING: A MsSQL connection with id: %s already exists. Aborting database open attempt.", hi.id.c_str()); + LoggingMutex->Enable(false); return; } @@ -771,7 +828,9 @@ class ModuleMsSQL : public Module virtual void OnRehash(User* user, const std::string ¶meter) { + QueueMutex->Enable(true); ReadConf(); + QueueMutex->Enable(false); } virtual const char* OnRequest(Request* request) @@ -779,18 +838,27 @@ class ModuleMsSQL : public Module if(strcmp(SQLREQID, request->GetId()) == 0) { SQLrequest* req = (SQLrequest*)request; + + QueueMutex->Enable(true); + ConnMap::iterator iter; + + const char* returnval = NULL; + if((iter = connections.find(req->dbid)) != connections.end()) { req->id = NewID(); - req->error = iter->second->Query(*req); - return SQLSUCCESS; + iter->second->queue.push(*req); + returnval= SQLSUCCESS; } else { req->error.Id(SQL_BAD_DBID); - return NULL; } + + QueueMutex->Enable(false); + + return returnval; } return NULL; } @@ -815,4 +883,30 @@ void ResultNotifier::Dispatch() ((ModuleMsSQL*)mod)->SendQueue(); } +void QueryThread::Run() +{ + while (this->GetExitFlag() == false) + { + SQLConn* conn = NULL; + QueueMutex->Enable(true); + for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++) + { + if (i->second->queue.totalsize()) + { + conn = i->second; + break; + } + } + QueueMutex->Enable(false); + if (conn) + { + conn->DoLeadingQuery(); + QueueMutex->Enable(true); + conn->queue.pop(); + QueueMutex->Enable(false); + } + usleep(1000); + } +} + MODULE_INIT(ModuleMsSQL) -- cgit v1.2.3