diff options
Diffstat (limited to 'src/modules')
-rw-r--r-- | src/modules/extra/m_mysql.cpp | 312 |
1 files changed, 178 insertions, 134 deletions
diff --git a/src/modules/extra/m_mysql.cpp b/src/modules/extra/m_mysql.cpp index aef6abca9..d1329151c 100644 --- a/src/modules/extra/m_mysql.cpp +++ b/src/modules/extra/m_mysql.cpp @@ -3,13 +3,13 @@ * +------------------------------------+ * * InspIRCd is copyright (C) 2002-2004 ChatSpike-Dev. - * E-mail: - * <brain@chatspike.net> - * <Craig@chatspike.net> + * E-mail: + * <brain@chatspike.net> + * <Craig@chatspike.net> * * Written by Craig Edwards, Craig McLure, and others. * This program is free but copyrighted software; see - * the file COPYING for details. + * the file COPYING for details. * * --------------------------------------------------- */ @@ -29,17 +29,132 @@ using namespace std; /* VERSION 2 API: With nonblocking (threaded) requests */ /* $ModDesc: SQL Service Provider module for all other m_sql* modules */ -/* $CompileFlags: -pthread `mysql_config --include` */ -/* $LinkerFlags: -pthread `mysql_config --libs_r` `perl ../mysql_rpath.pl` */ +/* $CompileFlags: `mysql_config --include` */ +/* $LinkerFlags: `mysql_config --libs_r` `perl ../mysql_rpath.pl` */ + + +class SQLConnection; + +extern InspIRCd* ServerInstance; +typedef std::map<std::string, SQLConnection*> ConnMap; -/** SQLConnection represents one mysql session. - * Each session has its own persistent connection to the database. - */ #if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224 #define mysql_field_count mysql_num_fields #endif +class QueryQueue : public classbase +{ +private: + typedef std::deque<SQLrequest> ReqDeque; + + ReqDeque priority; /* The priority queue */ + ReqDeque normal; /* The 'normal' queue */ + enum { PRI, NOR, NON } which; /* Which queue the currently active element is at the front of */ + +public: + QueryQueue() + : which(NON) + { + } + + void push(const SQLrequest &q) + { + log(DEBUG, "QueryQueue::push(): Adding %s query to queue: %s", ((q.pri) ? "priority" : "non-priority"), q.query.q.c_str()); + + if(q.pri) + priority.push_back(q); + else + normal.push_back(q); + } + + void pop() + { + if((which == PRI) && priority.size()) + { + priority.pop_front(); + } + else if((which == NOR) && normal.size()) + { + normal.pop_front(); + } + + /* Reset this */ + which = NON; + + /* Silently do nothing if there was no element to pop() */ + } + + SQLrequest& front() + { + switch(which) + { + case PRI: + return priority.front(); + case NOR: + return normal.front(); + default: + if(priority.size()) + { + which = PRI; + return priority.front(); + } + + if(normal.size()) + { + which = NOR; + return normal.front(); + } + + /* This will probably result in a segfault, + * but the caller should have checked totalsize() + * first so..meh - moron :p + */ + + return priority.front(); + } + } + + std::pair<int, int> size() + { + return std::make_pair(priority.size(), normal.size()); + } + + int totalsize() + { + return priority.size() + normal.size(); + } + + void PurgeModule(Module* mod) + { + DoPurgeModule(mod, priority); + DoPurgeModule(mod, normal); + } + +private: + void DoPurgeModule(Module* mod, ReqDeque& q) + { + for(ReqDeque::iterator iter = q.begin(); iter != q.end(); iter++) + { + if(iter->GetSource() == mod) + { + if(iter->id == front().id) + { + /* It's the currently active query.. :x */ + iter->SetSource(NULL); + } + else + { + /* It hasn't been executed yet..just remove it */ + iter = q.erase(iter); + } + } + } + } +}; + + + class SQLConnection : public classbase { protected: @@ -201,120 +316,58 @@ class SQLConnection : public classbase }; -typedef std::vector<SQLConnection> ConnectionList; +ConnMap Connections; -class ModuleSQL : public Module +void ConnectDatabases(Server* Srv) { - Server *Srv; - ConfigReader *Conf; - ConnectionList Connections; - - public: - void ConnectDatabases() + for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++) { - for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++) + i->second->Enable(); + if (i->second->Connect()) { - i->Enable(); - if (i->Connect()) - { - Srv->Log(DEFAULT,"SQL: Successfully connected database "+i->GetHost()); - } - else - { - Srv->Log(DEFAULT,"SQL: Failed to connect database "+i->GetHost()+": Error: "+i->GetError()); - i->Disable(); - } + Srv->Log(DEFAULT,"SQL: Successfully connected database "+i->second->GetHost()); } - } - - void LoadDatabases(ConfigReader* ThisConf) - { - Srv->Log(DEFAULT,"SQL: Loading database settings"); - Connections.clear(); - Srv->Log(DEBUG,"Cleared connections"); - for (int j =0; j < ThisConf->Enumerate("database"); j++) + else { - std::string db = ThisConf->ReadValue("database","name",j); - std::string user = ThisConf->ReadValue("database","username",j); - std::string pass = ThisConf->ReadValue("database","password",j); - std::string host = ThisConf->ReadValue("database","hostname",j); - std::string id = ThisConf->ReadValue("database","id",j); - Srv->Log(DEBUG,"Read database settings"); - if ((db != "") && (host != "") && (user != "") && (id != "") && (pass != "")) - { - SQLConnection ThisSQL(host,user,pass,db,atoi(id.c_str())); - Srv->Log(DEFAULT,"Loaded database: "+ThisSQL.GetHost()); - Connections.push_back(ThisSQL); - Srv->Log(DEBUG,"Pushed back connection"); - } + Srv->Log(DEFAULT,"SQL: Failed to connect database "+i->second->GetHost()+": Error: "+i->second->GetError()); + i->second->Disable(); } - ConnectDatabases(); } +} - void ResultType(SQLRequest *r, SQLResult *res) - { - for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++) - { - if ((i->GetID() == r->GetConnID()) && (i->IsEnabled())) - { - bool xr = i->QueryResult(r->GetQuery()); - if (!xr) - { - res->SetType(SQL_ERROR); - res->SetError(i->GetError()); - return; - } - res->SetType(SQL_OK); - return; - } - } - } - void CountType(SQLRequest *r, SQLResult* res) +void LoadDatabases(ConfigReader* ThisConf, Server* Srv) +{ + Srv->Log(DEFAULT,"SQL: Loading database settings"); + Connections.clear(); + Srv->Log(DEBUG,"Cleared connections"); + for (int j =0; j < ThisConf->Enumerate("database"); j++) { - for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++) + std::string db = ThisConf->ReadValue("database","name",j); + std::string user = ThisConf->ReadValue("database","username",j); + std::string pass = ThisConf->ReadValue("database","password",j); + std::string host = ThisConf->ReadValue("database","hostname",j); + std::string id = ThisConf->ReadValue("database","id",j); + Srv->Log(DEBUG,"Read database settings"); + if ((db != "") && (host != "") && (user != "") && (id != "") && (pass != "")) { - if ((i->GetID() == r->GetConnID()) && (i->IsEnabled())) - { - res->SetType(SQL_COUNT); - res->SetCount(i->QueryCount(r->GetQuery())); - return; - } + SQLConnection* ThisSQL = new SQLConnection(host,user,pass,db,atoi(id.c_str())); + Srv->Log(DEFAULT,"Loaded database: "+ThisSQL->GetHost()); + Connections[id] = ThisSQL; + Srv->Log(DEBUG,"Pushed back connection"); } } + ConnectDatabases(Srv); +} - void DoneType(SQLRequest *r, SQLResult* res) - { - for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++) - { - if ((i->GetID() == r->GetConnID()) && (i->IsEnabled())) - { - res->SetType(SQL_DONE); - if (!i->QueryDone()) - res->SetType(SQL_ERROR); - } - } - } +void* DispatcherThread(void* arg); - void RowType(SQLRequest *r, SQLResult* res) - { - for (ConnectionList::iterator i = Connections.begin(); i != Connections.end(); i++) - { - if ((i->GetID() == r->GetConnID()) && (i->IsEnabled())) - { - log(DEBUG,"*** FOUND MATCHING ROW"); - std::map<std::string,std::string> row = i->GetRow(); - res->SetRow(row); - res->SetType(SQL_ROW); - if (!row.size()) - { - log(DEBUG,"ROW SIZE IS 0"); - res->SetType(SQL_END); - } - return; - } - } - } +class ModuleSQL : public Module +{ + public: + Server *Srv; + ConfigReader *Conf; + pthread_t Dispatcher; void Implements(char* List) { @@ -323,27 +376,6 @@ class ModuleSQL : public Module char* OnRequest(Request* request) { - if (request) - { - SQLResult* Result = new SQLResult(); - SQLRequest *r = (SQLRequest*)request->GetData(); - switch (r->GetQueryType()) - { - case SQL_RESULT: - ResultType(r,Result); - break; - case SQL_COUNT: - CountType(r,Result); - break; - case SQL_ROW: - RowType(r,Result); - break; - case SQL_DONE: - DoneType(r,Result); - break; - } - return (char*)Result; - } return NULL; } @@ -352,29 +384,41 @@ class ModuleSQL : public Module { Srv = Me; Conf = new ConfigReader(); - LoadDatabases(Conf); + pthread_attr_t attribs; + pthread_attr_init(&attribs); + pthread_attr_setdetachstate(&attribs, PTHREAD_CREATE_DETACHED); + if (pthread_create(&this->Dispatcher, &attribs, DispatcherThread, (void *)this) != 0) + { + log(DEBUG,"m_mysql: Failed to create dispatcher thread: %s", strerror(errno)); + } } virtual ~ModuleSQL() { - Connections.clear(); DELETE(Conf); } virtual void OnRehash(const std::string ¶meter) { - DELETE(Conf); - Conf = new ConfigReader(); - LoadDatabases(Conf); + /* TODO: set rehash bool here, which makes the dispatcher thread rehash at next opportunity */ } virtual Version GetVersion() { - return Version(1,0,0,0,VF_VENDOR|VF_SERVICEPROVIDER); + return Version(1,1,0,0,VF_VENDOR|VF_SERVICEPROVIDER); } }; +void* DispatcherThread(void* arg) +{ + ModuleSQL* thismodule = (ModuleSQL*)arg; + LoadDatabases(thismodule->Conf, thismodule->Srv); + + return NULL; +} + + // stuff down here is the module-factory stuff. For basic modules you can ignore this. class ModuleSQLFactory : public ModuleFactory |