summaryrefslogtreecommitdiff
path: root/src/modules
diff options
context:
space:
mode:
authorbrain <brain@e03df62e-2008-0410-955e-edbf42e46eb7>2006-07-21 23:27:47 +0000
committerbrain <brain@e03df62e-2008-0410-955e-edbf42e46eb7>2006-07-21 23:27:47 +0000
commit328132ad334490db9e73c7f732c0b618090fa5b0 (patch)
tree6a76d181baf1cc5db70eb16ded897bc72af5051a /src/modules
parent21fe5137706628f0dc67202caae2a71d1e97e8f6 (diff)
Added notification socket
git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@4482 e03df62e-2008-0410-955e-edbf42e46eb7
Diffstat (limited to 'src/modules')
-rw-r--r--src/modules/extra/m_mysql.cpp177
1 files changed, 91 insertions, 86 deletions
diff --git a/src/modules/extra/m_mysql.cpp b/src/modules/extra/m_mysql.cpp
index 507702a4a..844e74c30 100644
--- a/src/modules/extra/m_mysql.cpp
+++ b/src/modules/extra/m_mysql.cpp
@@ -34,16 +34,21 @@ using namespace std;
class SQLConnection;
+class Notifier;
extern InspIRCd* ServerInstance;
typedef std::map<std::string, SQLConnection*> ConnMap;
bool giveup = false;
+static Module* SQLModule = NULL;
+static Notifier* MessagePipe = NULL;
#if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224
#define mysql_field_count mysql_num_fields
#endif
+typedef std::deque<SQLresult*> ResultQueue;
+
class QueryQueue : public classbase
{
private:
@@ -157,6 +162,8 @@ private:
/* A mutex to wrap around queue accesses */
pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t results_mutex = PTHREAD_MUTEX_INITIALIZER;
+
class MySQLresult : public SQLresult
{
int currentrow;
@@ -166,6 +173,7 @@ class MySQLresult : public SQLresult
SQLfieldMap* fieldmap;
int rows;
int cols;
+ public:
MySQLresult(Module* self, Module* to, MYSQL_RES* res, int affected_rows) : SQLresult(self, to), currentrow(0), fieldmap(NULL)
{
@@ -207,6 +215,11 @@ class MySQLresult : public SQLresult
log(DEBUG, "Created new MySQL result; %d rows, %d columns", rows, cols);
}
+ MySQLresult(Module* self, Module* to, SQLerror e) : SQLresult(self, to)
+ {
+ error = e;
+ }
+
~MySQLresult()
{
}
@@ -310,6 +323,10 @@ class MySQLresult : public SQLresult
}
};
+class SQLConnection;
+
+void NotifyMainThread(SQLConnection* connection_with_new_result);
+
class SQLConnection : public classbase
{
protected:
@@ -323,15 +340,16 @@ class SQLConnection : public classbase
std::string db;
std::map<std::string,std::string> thisrow;
bool Enabled;
- long id;
+ std::string id;
public:
QueryQueue queue;
+ ResultQueue rq;
// This constructor creates an SQLConnection object with the given credentials, and creates the underlying
// MYSQL struct, but does not connect yet.
- SQLConnection(std::string thishost, std::string thisuser, std::string thispass, std::string thisdb, long myid)
+ SQLConnection(std::string thishost, std::string thisuser, std::string thispass, std::string thisdb, const std::string &myid)
{
this->Enabled = true;
this->host = thishost;
@@ -367,7 +385,7 @@ class SQLConnection : public classbase
unsigned long paramlen;
/* Total length of query, used for binary-safety in mysql_real_query */
- unsigned long querylength;
+ unsigned long querylength = 0;
paramlen = 0;
@@ -432,83 +450,36 @@ class SQLConnection : public classbase
{
/* Successfull query */
res = mysql_use_result(&connection);
- }
-
-
- }
-
- // This method issues a query that expects multiple rows of results. Use GetRow() and QueryDone() to retrieve
- // multiple rows.
- bool QueryResult(std::string query)
- {
- if (!CheckConnection()) return false;
-
- int r = mysql_query(&connection, query.c_str());
- if (!r)
- {
- res = mysql_use_result(&connection);
- }
- return (!r);
- }
-
- // This method issues a query that just expects a number of 'effected' rows (e.g. UPDATE or DELETE FROM).
- // the number of effected rows is returned in the return value.
- long QueryCount(std::string query)
- {
- /* If the connection is down, we return a negative value - New to 1.1 */
- if (!CheckConnection()) return -1;
-
- int r = mysql_query(&connection, query.c_str());
- if (!r)
- {
- res = mysql_store_result(&connection);
unsigned long rows = mysql_affected_rows(&connection);
- mysql_free_result(res);
- return rows;
+ MySQLresult* r = new MySQLresult(SQLModule, req.GetSource(), res, rows);
+ r->dbid = this->GetID();
+ r->query = req.query.q;
+ /* Put this new result onto the results queue.
+ * XXX: Remember to mutex the queue!
+ */
+ pthread_mutex_lock(&results_mutex);
+ rq.push_back(r);
+ pthread_mutex_unlock(&results_mutex);
}
- return 0;
- }
-
- // This method fetches a row, if available from the database. You must issue a query
- // using QueryResult() first! The row's values are returned as a map of std::string
- // where each item is keyed by the column name.
- std::map<std::string,std::string> GetRow()
- {
- thisrow.clear();
- if (res)
+ else
{
- row = mysql_fetch_row(res);
- if (row)
- {
- unsigned int field_count = 0;
- MYSQL_FIELD *fields = mysql_fetch_fields(res);
- if(mysql_field_count(&connection) == 0)
- return thisrow;
- if (fields && mysql_field_count(&connection))
- {
- while (field_count < mysql_field_count(&connection))
- {
- std::string a = (fields[field_count].name ? fields[field_count].name : "");
- std::string b = (row[field_count] ? row[field_count] : "");
- thisrow[a] = b;
- field_count++;
- }
- return thisrow;
- }
- }
+ /* XXX: See /usr/include/mysql/mysqld_error.h for a list of
+ * possible error numbers and error messages */
+ SQLerror e((SQLerrorNum)mysql_errno(&connection), mysql_error(&connection));
+ MySQLresult* r = new MySQLresult(SQLModule, req.GetSource(), e);
+ r->dbid = this->GetID();
+ r->query = req.query.q;
+
+ pthread_mutex_lock(&results_mutex);
+ rq.push_back(r);
+ pthread_mutex_unlock(&results_mutex);
}
- return thisrow;
- }
- bool QueryDone()
- {
- if (res)
- {
- mysql_free_result(res);
- res = NULL;
- return true;
- }
- else return false;
+ /* Now signal the main thread that we've got a result to process.
+ * Pass them this connection id as what to examine
+ */
+
+ NotifyMainThread(this);
}
bool ConnectionLost()
@@ -532,7 +503,7 @@ class SQLConnection : public classbase
return mysql_error(&connection);
}
- long GetID()
+ const std::string& GetID()
{
return id;
}
@@ -542,14 +513,9 @@ class SQLConnection : public classbase
return host;
}
- void Enable()
+ void SetEnable(bool Enable)
{
- Enabled = true;
- }
-
- void Disable()
- {
- Enabled = false;
+ Enabled = Enable;
}
bool IsEnabled()
@@ -565,7 +531,7 @@ void ConnectDatabases(Server* Srv)
{
for (ConnMap::iterator i = Connections.begin(); i != Connections.end(); i++)
{
- i->second->Enable();
+ i->second->SetEnable(true);
if (i->second->Connect())
{
Srv->Log(DEFAULT,"SQL: Successfully connected database "+i->second->GetHost());
@@ -573,7 +539,7 @@ void ConnectDatabases(Server* Srv)
else
{
Srv->Log(DEFAULT,"SQL: Failed to connect database "+i->second->GetHost()+": Error: "+i->second->GetError());
- i->second->Disable();
+ i->second->SetEnable(false);
}
}
}
@@ -594,7 +560,7 @@ void LoadDatabases(ConfigReader* ThisConf, Server* Srv)
Srv->Log(DEBUG,"Read database settings");
if ((db != "") && (host != "") && (user != "") && (id != "") && (pass != ""))
{
- SQLConnection* ThisSQL = new SQLConnection(host,user,pass,db,atoi(id.c_str()));
+ SQLConnection* ThisSQL = new SQLConnection(host,user,pass,db,id);
Srv->Log(DEFAULT,"Loaded database: "+ThisSQL->GetHost());
Connections[id] = ThisSQL;
Srv->Log(DEBUG,"Pushed back connection");
@@ -603,8 +569,42 @@ void LoadDatabases(ConfigReader* ThisConf, Server* Srv)
ConnectDatabases(Srv);
}
+void NotifyMainThread(SQLConnection* connection_with_new_result)
+{
+ /* Here we connect() to the socket the main thread has open.
+ * The main thread is using a nonblocking socket tied into
+ * the socket engine, so they wont block and they'll receive
+ * nearly instant notification. Because we're in a seperate
+ * thread, we can just use standard connect(), and we can
+ * block if we like. We just send the connection id of the
+ * connection back.
+ */
+ log(DEBUG,"Notify of result on connection: %s",connection_with_new_result->GetID().c_str());
+}
+
void* DispatcherThread(void* arg);
+class Notifier : public InspSocket
+{
+ sockaddr_in sock_us;
+ socklen_t uslen;
+
+ public:
+
+ Notifier() : InspSocket("127.0.0.1", 0, true, 3000)
+ {
+ if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen))
+ {
+ throw ModuleException("Could not create random listening port on localhost");
+ }
+ }
+
+ std::string GetPort()
+ {
+ return ConvToStr(ntohs(sock_us.sin_port));
+ }
+};
+
class ModuleSQL : public Module
{
public:
@@ -668,6 +668,11 @@ class ModuleSQL : public Module
Srv = Me;
Conf = new ConfigReader();
currid = 0;
+ SQLModule = this;
+
+ MessagePipe = new Notifier();
+ log(DEBUG,"Bound notifier to 127.0.0.1:%s",MessagePipe->GetPort().c_str());
+
pthread_attr_t attribs;
pthread_attr_init(&attribs);
pthread_attr_setdetachstate(&attribs, PTHREAD_CREATE_DETACHED);