summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/modules/extra/m_mysql.cpp312
1 files changed, 178 insertions, 134 deletions
diff --git a/src/modules/extra/m_mysql.cpp b/src/modules/extra/m_mysql.cpp
index aef6abca9..d1329151c 100644
--- a/src/modules/extra/m_mysql.cpp
+++ b/src/modules/extra/m_mysql.cpp
@@ -3,13 +3,13 @@
* +------------------------------------+
*
* InspIRCd is copyright (C) 2002-2004 ChatSpike-Dev.
- * E-mail:
- * <brain@chatspike.net>
- * <Craig@chatspike.net>
+ * E-mail:
+ * <brain@chatspike.net>
+ * <Craig@chatspike.net>
*
* Written by Craig Edwards, Craig McLure, and others.
* This program is free but copyrighted software; see
- * the file COPYING for details.
+ * the file COPYING for details.
*
* ---------------------------------------------------
*/
@@ -29,17 +29,132 @@ using namespace std;
/* VERSION 2 API: With nonblocking (threaded) requests */
/* $ModDesc: SQL Service Provider module for all other m_sql* modules */
-/* $CompileFlags: -pthread `mysql_config --include` */
-/* $LinkerFlags: -pthread `mysql_config --libs_r` `perl ../mysql_rpath.pl` */
+/* $CompileFlags: `mysql_config --include` */
+/* $LinkerFlags: `mysql_config --libs_r` `perl ../mysql_rpath.pl` */
+
+
+class SQLConnection;
+
+extern InspIRCd* ServerInstance;
+typedef std::map<std::string, SQLConnection*> ConnMap;
-/** SQLConnection represents one mysql session.
- * Each session has its own persistent connection to the database.
- */
#if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224
#define mysql_field_count mysql_num_fields
#endif
+class QueryQueue : public classbase
+{
+private:
+ typedef std::deque<SQLrequest> ReqDeque;
+
+ ReqDeque priority; /* The priority queue */
+ ReqDeque normal; /* The 'normal' queue */
+ enum { PRI, NOR, NON } which; /* Which queue the currently active element is at the front of */
+
+public:
+ QueryQueue()
+ : which(NON)
+ {
+ }
+
+ void push(const SQLrequest &q)
+ {
+ log(DEBUG, "QueryQueue::push(): Adding %s query to queue: %s", ((q.pri) ? "priority" : "non-priority"), q.query.q.c_str());
+
+ if(q.pri)
+ priority.push_back(q);
+ else
+ normal.push_back(q);
+ }
+
+ void pop()
+ {
+ if((which == PRI) && priority.size())
+ {
+ priority.pop_front();
+ }
+ else if((which == NOR) && normal.size())
+ {
+ normal.pop_front();
+ }
+
+ /* Reset this */
+ which = NON;
+
+ /* Silently do nothing if there was no element to pop() */
+ }
+
+ SQLrequest& front()
+ {
+ switch(which)
+ {
+ case PRI:
+ return priority.front();
+ case NOR:
+ return normal.front();
+ default:
+ if(priority.size())
+ {
+ which = PRI;
+ return priority.front();
+ }
+
+ if(normal.size())
+ {
+ which = NOR;
+ return normal.front();
+ }
+
+ /* This will probably result in a segfault,
+ * but the caller should have checked totalsize()
+ * first so..meh - moron :p
+ */
+
+ return priority.front();
+ }
+ }
+
+ std::pair<int, int> size()
+ {
+ return std::make_pair(priority.size(), normal.size());
+ }
+
+ int totalsize()
+ {
+ return priority.size() + normal.size();
+ }
+
+ void PurgeModule(Module* mod)
+ {
+ DoPurgeModule(mod, priority);
+ DoPurgeModule(mod, normal);
+ }
+
+private:
+ void DoPurgeModule(Module* mod, ReqDeque& q)
+ {
+ for(ReqDeque::iterator iter = q.begin(); iter != q.end(); iter++)
+ {
+ if(iter->GetSource() == mod)
+ {
+ if(iter->id == front().id)
+ {
+ /* It's the currently active query.. :x */
+ iter->SetSource(NULL);
+ }
+ else
+ {
+ /* It hasn't been executed yet..just remove it */
+ iter = q.erase(iter);
+ }
+ }
+ }
+ }
+};
+
+
+
class SQLConnection : public classbase
{
protected:
@@ -201,120 +316,58 @@ class SQLConnection : public classbase
};
-typedef std::vector<SQLConnection> ConnectionList;
+ConnMap Connections;
-class ModuleSQL : public Module
+void ConnectDatabases(Server* Srv)
{
- Server *Srv;
- ConfigReader *Conf;
- ConnectionList Connections;
-
- public:
- void ConnectDatabases()
+ for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++)
{
- for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++)
+ i->second->Enable();
+ if (i->second->Connect())
{
- i->Enable();
- if (i->Connect())
- {
- Srv->Log(DEFAULT,"SQL: Successfully connected database "+i->GetHost());
- }
- else
- {
- Srv->Log(DEFAULT,"SQL: Failed to connect database "+i->GetHost()+": Error: "+i->GetError());
- i->Disable();
- }
+ Srv->Log(DEFAULT,"SQL: Successfully connected database "+i->second->GetHost());
}
- }
-
- void LoadDatabases(ConfigReader* ThisConf)
- {
- Srv->Log(DEFAULT,"SQL: Loading database settings");
- Connections.clear();
- Srv->Log(DEBUG,"Cleared connections");
- for (int j =0; j < ThisConf->Enumerate("database"); j++)
+ else
{
- std::string db = ThisConf->ReadValue("database","name",j);
- std::string user = ThisConf->ReadValue("database","username",j);
- std::string pass = ThisConf->ReadValue("database","password",j);
- std::string host = ThisConf->ReadValue("database","hostname",j);
- std::string id = ThisConf->ReadValue("database","id",j);
- Srv->Log(DEBUG,"Read database settings");
- if ((db != "") && (host != "") && (user != "") && (id != "") && (pass != ""))
- {
- SQLConnection ThisSQL(host,user,pass,db,atoi(id.c_str()));
- Srv->Log(DEFAULT,"Loaded database: "+ThisSQL.GetHost());
- Connections.push_back(ThisSQL);
- Srv->Log(DEBUG,"Pushed back connection");
- }
+ Srv->Log(DEFAULT,"SQL: Failed to connect database "+i->second->GetHost()+": Error: "+i->second->GetError());
+ i->second->Disable();
}
- ConnectDatabases();
}
+}
- void ResultType(SQLRequest *r, SQLResult *res)
- {
- for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++)
- {
- if ((i->GetID() == r->GetConnID()) && (i->IsEnabled()))
- {
- bool xr = i->QueryResult(r->GetQuery());
- if (!xr)
- {
- res->SetType(SQL_ERROR);
- res->SetError(i->GetError());
- return;
- }
- res->SetType(SQL_OK);
- return;
- }
- }
- }
- void CountType(SQLRequest *r, SQLResult* res)
+void LoadDatabases(ConfigReader* ThisConf, Server* Srv)
+{
+ Srv->Log(DEFAULT,"SQL: Loading database settings");
+ Connections.clear();
+ Srv->Log(DEBUG,"Cleared connections");
+ for (int j =0; j < ThisConf->Enumerate("database"); j++)
{
- for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++)
+ std::string db = ThisConf->ReadValue("database","name",j);
+ std::string user = ThisConf->ReadValue("database","username",j);
+ std::string pass = ThisConf->ReadValue("database","password",j);
+ std::string host = ThisConf->ReadValue("database","hostname",j);
+ std::string id = ThisConf->ReadValue("database","id",j);
+ Srv->Log(DEBUG,"Read database settings");
+ if ((db != "") && (host != "") && (user != "") && (id != "") && (pass != ""))
{
- if ((i->GetID() == r->GetConnID()) && (i->IsEnabled()))
- {
- res->SetType(SQL_COUNT);
- res->SetCount(i->QueryCount(r->GetQuery()));
- return;
- }
+ SQLConnection* ThisSQL = new SQLConnection(host,user,pass,db,atoi(id.c_str()));
+ Srv->Log(DEFAULT,"Loaded database: "+ThisSQL->GetHost());
+ Connections[id] = ThisSQL;
+ Srv->Log(DEBUG,"Pushed back connection");
}
}
+ ConnectDatabases(Srv);
+}
- void DoneType(SQLRequest *r, SQLResult* res)
- {
- for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++)
- {
- if ((i->GetID() == r->GetConnID()) && (i->IsEnabled()))
- {
- res->SetType(SQL_DONE);
- if (!i->QueryDone())
- res->SetType(SQL_ERROR);
- }
- }
- }
+void* DispatcherThread(void* arg);
- void RowType(SQLRequest *r, SQLResult* res)
- {
- for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++)
- {
- if ((i->GetID() == r->GetConnID()) && (i->IsEnabled()))
- {
- log(DEBUG,"*** FOUND MATCHING ROW");
- std::map<std::string,std::string> row = i->GetRow();
- res->SetRow(row);
- res->SetType(SQL_ROW);
- if (!row.size())
- {
- log(DEBUG,"ROW SIZE IS 0");
- res->SetType(SQL_END);
- }
- return;
- }
- }
- }
+class ModuleSQL : public Module
+{
+ public:
+ Server *Srv;
+ ConfigReader *Conf;
+ pthread_t Dispatcher;
void Implements(char* List)
{
@@ -323,27 +376,6 @@ class ModuleSQL : public Module
char* OnRequest(Request* request)
{
- if (request)
- {
- SQLResult* Result = new SQLResult();
- SQLRequest *r = (SQLRequest*)request->GetData();
- switch (r->GetQueryType())
- {
- case SQL_RESULT:
- ResultType(r,Result);
- break;
- case SQL_COUNT:
- CountType(r,Result);
- break;
- case SQL_ROW:
- RowType(r,Result);
- break;
- case SQL_DONE:
- DoneType(r,Result);
- break;
- }
- return (char*)Result;
- }
return NULL;
}
@@ -352,29 +384,41 @@ class ModuleSQL : public Module
{
Srv = Me;
Conf = new ConfigReader();
- LoadDatabases(Conf);
+ pthread_attr_t attribs;
+ pthread_attr_init(&attribs);
+ pthread_attr_setdetachstate(&attribs, PTHREAD_CREATE_DETACHED);
+ if (pthread_create(&this->Dispatcher, &attribs, DispatcherThread, (void *)this) != 0)
+ {
+ log(DEBUG,"m_mysql: Failed to create dispatcher thread: %s", strerror(errno));
+ }
}
virtual ~ModuleSQL()
{
- Connections.clear();
DELETE(Conf);
}
virtual void OnRehash(const std::string &parameter)
{
- DELETE(Conf);
- Conf = new ConfigReader();
- LoadDatabases(Conf);
+ /* TODO: set rehash bool here, which makes the dispatcher thread rehash at next opportunity */
}
virtual Version GetVersion()
{
- return Version(1,0,0,0,VF_VENDOR|VF_SERVICEPROVIDER);
+ return Version(1,1,0,0,VF_VENDOR|VF_SERVICEPROVIDER);
}
};
+void* DispatcherThread(void* arg)
+{
+ ModuleSQL* thismodule = (ModuleSQL*)arg;
+ LoadDatabases(thismodule->Conf, thismodule->Srv);
+
+ return NULL;
+}
+
+
// stuff down here is the module-factory stuff. For basic modules you can ignore this.
class ModuleSQLFactory : public ModuleFactory