summaryrefslogtreecommitdiff
path: root/src/modules/extra
diff options
context:
space:
mode:
Diffstat (limited to 'src/modules/extra')
-rw-r--r--src/modules/extra/m_mysql.cpp87
1 files changed, 81 insertions, 6 deletions
diff --git a/src/modules/extra/m_mysql.cpp b/src/modules/extra/m_mysql.cpp
index 844e74c30..a6105c33d 100644
--- a/src/modules/extra/m_mysql.cpp
+++ b/src/modules/extra/m_mysql.cpp
@@ -41,6 +41,7 @@ typedef std::map<std::string, SQLConnection*> ConnMap;
bool giveup = false;
static Module* SQLModule = NULL;
static Notifier* MessagePipe = NULL;
+int QueueFD = -1;
#if !defined(MYSQL_VERSION_ID) || MYSQL_VERSION_ID<32224
@@ -571,7 +572,8 @@ void LoadDatabases(ConfigReader* ThisConf, Server* Srv)
void NotifyMainThread(SQLConnection* connection_with_new_result)
{
- /* Here we connect() to the socket the main thread has open.
+ /* Here we write() to the socket the main thread has open
+ * and we connect()ed back to before our thread became active.
* The main thread is using a nonblocking socket tied into
* the socket engine, so they wont block and they'll receive
* nearly instant notification. Because we're in a seperate
@@ -580,6 +582,8 @@ void NotifyMainThread(SQLConnection* connection_with_new_result)
* connection back.
*/
log(DEBUG,"Notify of result on connection: %s",connection_with_new_result->GetID().c_str());
+ write(QueueFD, connection_with_new_result->GetID().c_str(), connection_with_new_result->GetID().length()+1); // add one for null terminator
+ log(DEBUG,"Sent it on its way via fd=%d",QueueFD);
}
void* DispatcherThread(void* arg);
@@ -588,20 +592,64 @@ class Notifier : public InspSocket
{
sockaddr_in sock_us;
socklen_t uslen;
+ Server* Srv;
public:
- Notifier() : InspSocket("127.0.0.1", 0, true, 3000)
+ /* Create a socket on a random port. Let the tcp stack allocate us an available port */
+ Notifier(Server* S) : InspSocket("127.0.0.1", 0, true, 3000), Srv(S)
{
+ uslen = sizeof(sock_us);
if (getsockname(this->fd,(sockaddr*)&sock_us,&uslen))
{
throw ModuleException("Could not create random listening port on localhost");
}
}
- std::string GetPort()
+ Notifier(int newfd, char* ip, Server* S) : Srv(S)
{
- return ConvToStr(ntohs(sock_us.sin_port));
+ log(DEBUG,"Constructor of new socket");
+ }
+
+ /* Using getsockname and ntohs, we can determine which port number we were allocated */
+ int GetPort()
+ {
+ return ntohs(sock_us.sin_port);
+ }
+
+ virtual int OnIncomingConnection(int newsock, char* ip)
+ {
+ log(DEBUG,"Inbound connection!");
+ Notifier* n = new Notifier(newsock, ip, Srv);
+ Srv->AddSocket(n);
+ return true;
+ }
+
+ virtual bool OnDataReady()
+ {
+ log(DEBUG,"Inbound data!");
+ char* data = this->Read();
+ ConnMap::iterator iter;
+
+ if (data && *data)
+ {
+ log(DEBUG,"Looking for connection %s",data);
+ /* We expect to be sent a null terminated string */
+ if((iter = Connections.find(data)) != Connections.end())
+ {
+ log(DEBUG,"Found it!");
+
+ /* Lock the mutex, send back the data */
+ pthread_mutex_lock(&results_mutex);
+ ResultQueue::iterator n = iter->second->rq.begin();
+ (*n)->Send();
+ iter->second->rq.pop_front();
+ pthread_mutex_unlock(&results_mutex);
+ return true;
+ }
+ }
+
+ return false;
}
};
@@ -670,8 +718,9 @@ class ModuleSQL : public Module
currid = 0;
SQLModule = this;
- MessagePipe = new Notifier();
- log(DEBUG,"Bound notifier to 127.0.0.1:%s",MessagePipe->GetPort().c_str());
+ MessagePipe = new Notifier(Srv);
+ Srv->AddSocket(MessagePipe);
+ log(DEBUG,"Bound notifier to 127.0.0.1:%d",MessagePipe->GetPort());
pthread_attr_t attribs;
pthread_attr_init(&attribs);
@@ -706,6 +755,31 @@ void* DispatcherThread(void* arg)
ModuleSQL* thismodule = (ModuleSQL*)arg;
LoadDatabases(thismodule->Conf, thismodule->Srv);
+ /* Connect back to the Notifier */
+
+ if ((QueueFD = socket(AF_INET, SOCK_STREAM, 0)) == -1)
+ {
+ /* crap, we're out of sockets... */
+ log(DEBUG,"QueueFD cant be created");
+ return NULL;
+ }
+
+ log(DEBUG,"Initialize QueueFD to %d",QueueFD);
+
+ sockaddr_in addr;
+ in_addr ia;
+ inet_aton("127.0.0.1", &ia);
+ addr.sin_family = AF_INET;
+ addr.sin_addr = ia;
+ addr.sin_port = htons(MessagePipe->GetPort());
+
+ if (connect(QueueFD, (sockaddr*)&addr,sizeof(addr)) == -1)
+ {
+ /* wtf, we cant connect to it, but we just created it! */
+ log(DEBUG,"QueueFD cant connect!");
+ return NULL;
+ }
+
while (!giveup)
{
SQLConnection* conn = NULL;
@@ -725,6 +799,7 @@ void* DispatcherThread(void* arg)
/* Theres an item! */
if (conn)
{
+ log(DEBUG,"Process Leading query");
conn->DoLeadingQuery();
/* XXX: Lock */