summaryrefslogtreecommitdiff
path: root/src/modules
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules')
-rw-r--r--src/modules/extra/m_mysql.cpp139
1 files changed, 95 insertions, 44 deletions
diff --git a/src/modules/extra/m_mysql.cpp b/src/modules/extra/m_mysql.cpp
index 08f71c929..ae219df70 100644
--- a/src/modules/extra/m_mysql.cpp
+++ b/src/modules/extra/m_mysql.cpp
@@ -67,16 +67,23 @@ class SQLConnection;
class MySQLresult;
class DispatcherThread;
-struct QueueItem
+struct QQueueItem
{
SQLQuery* q;
SQLConnection* c;
- QueueItem(SQLQuery* Q, SQLConnection* C) : q(Q), c(C) {}
+ QQueueItem(SQLQuery* Q, SQLConnection* C) : q(Q), c(C) {}
+};
+
+struct RQueueItem
+{
+ SQLQuery* q;
+ MySQLresult* r;
+ RQueueItem(SQLQuery* Q, MySQLresult* R) : q(Q), r(R) {}
};
typedef std::map<std::string, SQLConnection*> ConnMap;
-typedef std::deque<QueueItem> QueryQueue;
-typedef std::deque<MySQLresult*> ResultQueue;
+typedef std::deque<QQueueItem> QueryQueue;
+typedef std::deque<RQueueItem> ResultQueue;
/** MySQL module
* */
@@ -84,14 +91,15 @@ class ModuleSQL : public Module
{
public:
DispatcherThread* Dispatcher;
- QueryQueue qq;
- ResultQueue rq;
- ConnMap connections;
+ QueryQueue qq; // MUST HOLD MUTEX
+ ResultQueue rq; // MUST HOLD MUTEX
+ ConnMap connections; // main thread only
ModuleSQL();
void init();
~ModuleSQL();
void OnRehash(User* user);
+ void OnUnloadModule(Module* mod);
Version GetVersion();
};
@@ -115,14 +123,13 @@ class DispatcherThread : public SocketThread
class MySQLresult : public SQLResult
{
public:
- SQLQuery* query;
SQLerror err;
int currentrow;
int rows;
std::vector<std::string> colnames;
std::vector<SQLEntries> fieldlists;
- MySQLresult(SQLQuery* q, MYSQL_RES* res, int affected_rows) : query(q), err(SQL_NO_ERROR), currentrow(0), rows(0)
+ MySQLresult(MYSQL_RES* res, int affected_rows) : err(SQL_NO_ERROR), currentrow(0), rows(0)
{
if (affected_rows >= 1)
{
@@ -166,7 +173,7 @@ class MySQLresult : public SQLResult
}
}
- MySQLresult(SQLQuery* q, SQLerror& e) : query(q), err(e)
+ MySQLresult(SQLerror& e) : err(e)
{
}
@@ -217,11 +224,11 @@ class SQLConnection : public SQLProvider
public:
reference<ConfigTag> config;
MYSQL *connection;
- bool active;
+ Mutex lock;
// This constructor creates an SQLConnection object with the given credentials, but does not connect yet.
SQLConnection(Module* p, ConfigTag* tag) : SQLProvider(p, "SQL/" + tag->getString("id")),
- config(tag), active(false)
+ config(tag)
{
}
@@ -310,30 +317,23 @@ class SQLConnection : public SQLProvider
return (ModuleSQL*)(Module*)creator;
}
- void DoBlockingQuery(SQLQuery* req)
+ MySQLresult* DoBlockingQuery(SQLQuery* req)
{
+
/* Parse the command string and dispatch it to mysql */
if (CheckConnection() && !mysql_real_query(connection, req->query.data(), req->query.length()))
{
/* Successfull query */
MYSQL_RES* res = mysql_use_result(connection);
unsigned long rows = mysql_affected_rows(connection);
- MySQLresult* r = new MySQLresult(req, res, rows);
- Parent()->Dispatcher->LockQueue();
- Parent()->rq.push_back(r);
- Parent()->Dispatcher->NotifyParent();
- Parent()->Dispatcher->UnlockQueue();
+ return new MySQLresult(res, rows);
}
else
{
/* XXX: See /usr/include/mysql/mysqld_error.h for a list of
* possible error numbers and error messages */
SQLerror e(SQL_QREPLY_FAIL, ConvToStr(mysql_errno(connection)) + std::string(": ") + mysql_error(connection));
- MySQLresult* r = new MySQLresult(req, e);
- Parent()->Dispatcher->LockQueue();
- Parent()->rq.push_back(r);
- Parent()->Dispatcher->NotifyParent();
- Parent()->Dispatcher->UnlockQueue();
+ return new MySQLresult(e);
}
}
@@ -359,7 +359,7 @@ class SQLConnection : public SQLProvider
void submit(SQLQuery* q)
{
Parent()->Dispatcher->LockQueue();
- Parent()->qq.push_back(QueueItem(q, this));
+ Parent()->qq.push_back(QQueueItem(q, this));
Parent()->Dispatcher->UnlockQueueWakeup();
}
};
@@ -374,8 +374,8 @@ void ModuleSQL::init()
Dispatcher = new DispatcherThread(this);
ServerInstance->Threads->Start(Dispatcher);
- Implementation eventlist[] = { I_OnRehash };
- ServerInstance->Modules->Attach(eventlist, this, 1);
+ Implementation eventlist[] = { I_OnRehash, I_OnUnloadModule };
+ ServerInstance->Modules->Attach(eventlist, this, 2);
}
ModuleSQL::~ModuleSQL()
@@ -394,7 +394,6 @@ ModuleSQL::~ModuleSQL()
void ModuleSQL::OnRehash(User* user)
{
- Dispatcher->LockQueue();
ConnMap conns;
ConfigTagList tags = ServerInstance->Config->ConfTags("database");
for(ConfigIter i = tags.first; i != tags.second; i++)
@@ -415,21 +414,56 @@ void ModuleSQL::OnRehash(User* user)
connections.erase(curr);
}
}
+
+ // now clean up the deleted databases
+ Dispatcher->LockQueue();
+ SQLerror err(SQL_BAD_DBID);
for(ConnMap::iterator i = connections.begin(); i != connections.end(); i++)
{
- if (i->second->active)
+ ServerInstance->Modules->DelService(*i->second);
+ // it might be running a query on this database. Wait for that to complete
+ i->second->lock.Lock();
+ i->second->lock.Unlock();
+ // now remove all active queries to this DB
+ for(unsigned int j = qq.size() - 1; j >= 0; j--)
{
- // can't delete it now. Next rehash will try to kill it again
- conns.insert(*i);
+ if (qq[j].c == i->second)
+ {
+ qq[j].q->OnError(err);
+ delete qq[j].q;
+ qq.erase(qq.begin() + j);
+ }
}
- else
+ // finally, nuke the connection
+ delete i->second;
+ }
+ Dispatcher->UnlockQueue();
+ connections.swap(conns);
+}
+
+void ModuleSQL::OnUnloadModule(Module* mod)
+{
+ SQLerror err(SQL_BAD_DBID);
+ Dispatcher->LockQueue();
+ for(unsigned int i = qq.size() - 1; i >= 0; i--)
+ {
+ if (qq[i].q->creator == mod)
{
- ServerInstance->Modules->DelService(*i->second);
- delete i->second;
+ if (i == 0)
+ {
+ // need to wait until the query is done
+ // (the result will be discarded)
+ qq[i].c->lock.Lock();
+ qq[i].c->lock.Unlock();
+ }
+ qq[i].q->OnError(err);
+ delete qq[i].q;
+ qq.erase(qq.begin() + i);
}
}
- connections.swap(conns);
Dispatcher->UnlockQueue();
+ // clean up any result queue entries
+ Dispatcher->OnNotify();
}
Version ModuleSQL::GetVersion()
@@ -444,13 +478,30 @@ void DispatcherThread::Run()
{
if (!Parent->qq.empty())
{
- QueueItem i = Parent->qq.front();
- Parent->qq.pop_front();
- i.c->active = true;
+ QQueueItem i = Parent->qq.front();
+ i.c->lock.Lock();
this->UnlockQueue();
- i.c->DoBlockingQuery(i.q);
+ MySQLresult* res = i.c->DoBlockingQuery(i.q);
+ i.c->lock.Unlock();
+
+ /*
+ * At this point, the main thread could be working on:
+ * Rehash - delete i.c out from under us. We don't care about that.
+ * UnloadModule - delete i.q and the qq item. Need to avoid reporting results.
+ */
+
this->LockQueue();
- i.c->active = false;
+ if (Parent->qq.front().q == i.q)
+ {
+ Parent->qq.pop_front();
+ Parent->rq.push_back(RQueueItem(i.q, res));
+ NotifyParent();
+ }
+ else
+ {
+ // UnloadModule ate the query
+ delete res;
+ }
}
else
{
@@ -469,13 +520,13 @@ void DispatcherThread::OnNotify()
this->LockQueue();
for(ResultQueue::iterator i = Parent->rq.begin(); i != Parent->rq.end(); i++)
{
- MySQLresult* res = *i;
+ MySQLresult* res = i->r;
if (res->err.id == SQL_NO_ERROR)
- res->query->OnResult(*res);
+ i->q->OnResult(*res);
else
- res->query->OnError(res->err);
- delete res->query;
- delete res;
+ i->q->OnError(res->err);
+ delete i->q;
+ delete i->r;
}
Parent->rq.clear();
this->UnlockQueue();