diff options
Diffstat (limited to 'src/modules/extra/m_mssql.cpp')
-rw-r--r-- | src/modules/extra/m_mssql.cpp | 168 |
1 files changed, 17 insertions, 151 deletions
diff --git a/src/modules/extra/m_mssql.cpp b/src/modules/extra/m_mssql.cpp index 0f1442e59..f1f155e79 100644 --- a/src/modules/extra/m_mssql.cpp +++ b/src/modules/extra/m_mssql.cpp @@ -27,8 +27,6 @@ class SQLConn; class MsSQLResult; -class ResultNotifier; -class MsSQLListener; class ModuleMsSQL; typedef std::map<std::string, SQLConn*> ConnMap; @@ -45,86 +43,25 @@ unsigned long count(const char * const str, char a) return n; } -ResultNotifier* notifier = NULL; -MsSQLListener* listener = NULL; -int QueueFD = -1; - ConnMap connections; -Mutex* QueueMutex; Mutex* ResultsMutex; Mutex* LoggingMutex; -class QueryThread : public Thread +class QueryThread : public SocketThread { private: ModuleMsSQL* Parent; InspIRCd* ServerInstance; public: QueryThread(InspIRCd* si, ModuleMsSQL* mod) - : Thread(), Parent(mod), ServerInstance(si) + : SocketThread(si), Parent(mod), ServerInstance(si) { } ~QueryThread() { } virtual void Run(); + virtual void OnNotify(); }; -class ResultNotifier : public BufferedSocket -{ - ModuleMsSQL* mod; - - public: - ResultNotifier(ModuleMsSQL* m, InspIRCd* SI, int newfd, char* ip) : BufferedSocket(SI, newfd, ip), mod(m) - { - } - - virtual bool OnDataReady() - { - char data = 0; - if (ServerInstance->SE->Recv(this, &data, 1, 0) > 0) - { - Dispatch(); - return true; - } - return false; - } - - void Dispatch(); -}; - -class MsSQLListener : public ListenSocketBase -{ - ModuleMsSQL* Parent; - irc::sockets::insp_sockaddr sock_us; - socklen_t uslen; - FileReader* index; - - public: - MsSQLListener(ModuleMsSQL* P, InspIRCd* Instance, int port, const std::string &addr) : ListenSocketBase(Instance, port, addr), Parent(P) - { - uslen = sizeof(sock_us); - if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen)) - { - throw ModuleException("Could not getsockname() to find out port number for ITC port"); - } - } - - virtual void OnAcceptReady(const std::string &ipconnectedto, int nfd, const std::string &incomingip) - { - new ResultNotifier(this->Parent, this->ServerInstance, nfd, (char *)ipconnectedto.c_str()); // XXX unsafe casts suck - } - - /* Using getsockname and ntohs, we can determine which port number we were allocated */ - int GetPort() - { -#ifdef IPV6 - return ntohs(sock_us.sin6_port); -#else - return ntohs(sock_us.sin_port); -#endif - } -}; - - class MsSQLResult : public SQLresult { private: @@ -289,8 +226,6 @@ class MsSQLResult : public SQLresult { delete fl; } - - }; class SQLConn : public classbase @@ -569,7 +504,6 @@ class SQLConn : public classbase ResultsMutex->Lock(); results.push_back(res); ResultsMutex->Unlock(); - SendNotify(); return SQLerror(); } @@ -693,40 +627,6 @@ class SQLConn : public classbase } } - void SendNotify() - { - if (QueueFD < 0) - { - if ((QueueFD = socket(AF_FAMILY, SOCK_STREAM, 0)) == -1) - { - /* crap, we're out of sockets... */ - return; - } - - irc::sockets::insp_sockaddr addr; - -#ifdef IPV6 - irc::sockets::insp_aton("::1", &addr.sin6_addr); - addr.sin6_family = AF_FAMILY; - addr.sin6_port = htons(listener->GetPort()); -#else - irc::sockets::insp_inaddr ia; - irc::sockets::insp_aton("127.0.0.1", &ia); - addr.sin_family = AF_FAMILY; - addr.sin_addr = ia; - addr.sin_port = htons(listener->GetPort()); -#endif - - if (connect(QueueFD, (sockaddr*)&addr,sizeof(addr)) == -1) - { - /* wtf, we cant connect to it, but we just created it! */ - return; - } - } - char id = 0; - send(QueueFD, &id, 1, 0); - } - void DoLeadingQuery() { SQLrequest& req = queue.front(); @@ -748,7 +648,6 @@ class ModuleMsSQL : public Module { LoggingMutex = new Mutex(); ResultsMutex = new Mutex(); - QueueMutex = new Mutex(); ServerInstance->Modules->UseInterface("SQLutils"); @@ -757,25 +656,6 @@ class ModuleMsSQL : public Module throw ModuleException("m_mssql: Unable to publish feature 'SQL'"); } - /* Create a socket on a random port. Let the tcp stack allocate us an available port */ -#ifdef IPV6 - listener = new MsSQLListener(this, ServerInstance, 0, "::1"); -#else - listener = new MsSQLListener(this, ServerInstance, 0, "127.0.0.1"); -#endif - - if (listener->GetFd() == -1) - { - ServerInstance->Modules->DoneWithInterface("SQLutils"); - throw ModuleException("m_mssql: unable to create ITC pipe"); - } - else - { - LoggingMutex->Lock(); - ServerInstance->Logs->Log("m_mssql", DEBUG, "MsSQL: Interthread comms port is %d", listener->GetPort()); - LoggingMutex->Unlock(); - } - ReadConf(); queryDispatcher = new QueryThread(ServerInstance, this); @@ -792,32 +672,14 @@ class ModuleMsSQL : public Module ClearQueue(); ClearAllConnections(); - ServerInstance->SE->DelFd(listener); - ServerInstance->BufferedSocketCull(); - - if (QueueFD >= 0) - { - shutdown(QueueFD, 2); - close(QueueFD); - } - - if (notifier) - { - ServerInstance->SE->DelFd(notifier); - notifier->Close(); - ServerInstance->BufferedSocketCull(); - } - ServerInstance->Modules->UnpublishInterface("SQL", this); ServerInstance->Modules->UnpublishFeature("SQL"); ServerInstance->Modules->DoneWithInterface("SQLutils"); delete LoggingMutex; delete ResultsMutex; - delete QueueMutex; } - void SendQueue() { for (ConnMap::iterator iter = connections.begin(); iter != connections.end(); iter++) @@ -929,9 +791,9 @@ class ModuleMsSQL : public Module virtual void OnRehash(User* user, const std::string ¶meter) { - QueueMutex->Lock(); + queryDispatcher->LockQueue(); ReadConf(); - QueueMutex->Unlock(); + queryDispatcher->UnlockQueueWakeup(); } virtual const char* OnRequest(Request* request) @@ -940,7 +802,7 @@ class ModuleMsSQL : public Module { SQLrequest* req = (SQLrequest*)request; - QueueMutex->Lock(); + queryDispatcher->LockQueue(); ConnMap::iterator iter; @@ -957,7 +819,7 @@ class ModuleMsSQL : public Module req->error.Id(SQL_BAD_DBID); } - QueueMutex->Unlock(); + queryDispatcher->UnlockQueueWakeup(); return returnval; } @@ -979,17 +841,17 @@ class ModuleMsSQL : public Module }; -void ResultNotifier::Dispatch() +void QueryThread::OnNotify() { mod->SendQueue(); } void QueryThread::Run() { + this->LockQueue(); while (this->GetExitFlag() == false) { SQLConn* conn = NULL; - QueueMutex->Lock(); for (ConnMap::iterator i = connections.begin(); i != connections.end(); i++) { if (i->second->queue.totalsize()) @@ -998,16 +860,20 @@ void QueryThread::Run() break; } } - QueueMutex->Unlock(); if (conn) { + this->UnlockQueue(); conn->DoLeadingQuery(); - QueueMutex->Lock(); + this->NotifyParent(); + this->LockQueue(); conn->queue.pop(); - QueueMutex->Unlock(); } - usleep(1000); + else + { + this->WaitForQueue(); + } } + this->UnlockQueue(); } MODULE_INIT(ModuleMsSQL) |