summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrain <brain@e03df62e-2008-0410-955e-edbf42e46eb7>2005-05-23 17:52:46 +0000
committerbrain <brain@e03df62e-2008-0410-955e-edbf42e46eb7>2005-05-23 17:52:46 +0000
commit45b07a069108d661f7d3b63b040e4db5166a2dd8 (patch)
treea7905796cc4d41f5b10fe735232bed1d8ea55828
parentb8e97ead2880337ea270ed36b785b6103c46a5c2 (diff)
Output buffering on server connections
git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@1475 e03df62e-2008-0410-955e-edbf42e46eb7
-rw-r--r--include/connection.h35
-rw-r--r--include/servers.h4
-rw-r--r--src/connection.cpp68
-rw-r--r--src/inspircd.cpp2
-rw-r--r--src/servers.cpp37
5 files changed, 136 insertions, 10 deletions
diff --git a/include/connection.h b/include/connection.h
index ef18ec7c0..420839b0d 100644
--- a/include/connection.h
+++ b/include/connection.h
@@ -85,6 +85,14 @@ class ircd_connector : public Extensible
*/
std::string version;
+ /** SendQ of the outbound connector, does not have a limit
+ */
+ std::string sendq;
+
+ /** Write error of connection
+ */
+ std::string WriteError;
+
public:
/** IRCD Buffer for input characters, holds as many lines as are
@@ -111,7 +119,10 @@ class ircd_connector : public Extensible
* whilever both servers are connected to B.
*/
std::vector<std::string> routes;
-
+
+ /** Constructor clears the sendq and initialises the fd to -1
+ */
+ ircd_connector();
/** Create an outbound connection to a listening socket
*/
@@ -204,6 +215,28 @@ class ircd_connector : public Extensible
* If the server has no version string an empty string is returned.
*/
std::string GetVersionString();
+
+ /** Adds data to the connection's sendQ to be flushed later
+ * Fails if there is an error pending on the connection.
+ */
+ bool AddWriteBuf(std::string data);
+
+ /** Flushes as much of the data from the buffer as possible,
+ * and advances the queue pointer to what is left.
+ */
+ bool FlushWriteBuf();
+
+ /** Sets the error string for this connection
+ */
+ void SetWriteError(std::string error);
+
+ /** Gets the error string for this connection
+ */
+ std::string GetWriteError();
+
+ /** Returns true if there is data to be written that hasn't been sent yet
+ */
+ bool HasBufferedOutput();
};
diff --git a/include/servers.h b/include/servers.h
index e7d7f2547..e9940704c 100644
--- a/include/servers.h
+++ b/include/servers.h
@@ -116,6 +116,10 @@ class serverrec : public connection
* (reserved for core use)
*/
bool AddIncoming(int fd,char* targethost, int sourceport);
+
+ /** Flushes all data waiting to be written for all of this server's connections
+ */
+ void FlushWriteBuffers();
};
#endif
diff --git a/src/connection.cpp b/src/connection.cpp
index 222251bb4..7d5df66f9 100644
--- a/src/connection.cpp
+++ b/src/connection.cpp
@@ -72,10 +72,18 @@ std::string CreateSum()
connection::connection()
{
- fd = 0;
+ fd = -1;
}
+ircd_connector::ircd_connector()
+{
+ fd = -1;
+ port = 0;
+ sendq = "";
+ WriteError = "";
+}
+
char* ircd_connector::GetServerIP()
{
return this->host;
@@ -154,6 +162,59 @@ std::string ircd_connector::GetBuffer()
return ret;
}
+bool ircd_connector::AddWriteBuf(std::string data)
+{
+ log(DEBUG,"connector::AddWriteBuf(%s)",data.c_str());
+ if (this->GetWriteError() != "")
+ return false;
+ std::stringstream stream;
+ stream << sendq << data;
+ sendq = stream.str();
+ return true;
+}
+
+bool ircd_connector::HasBufferedOutput()
+{
+ return (sendq.length() > 0);
+}
+
+// send AS MUCH OF THE USERS SENDQ as we are able to (might not be all of it)
+bool ircd_connector::FlushWriteBuf()
+{
+ log(DEBUG,"connector::FlushWriteBuf()");
+ if (sendq.length())
+ {
+ char* tb = (char*)this->sendq.c_str();
+ int n_sent = write(this->fd,tb,this->sendq.length());
+ if (n_sent == -1)
+ {
+ this->SetWriteError(strerror(errno));
+ return false;
+ }
+ else
+ {
+ log(DEBUG,"Wrote %d chars to socket",n_sent);
+ // advance the queue
+ tb += n_sent;
+ this->sendq = tb;
+ return true;
+ }
+ }
+ return true;
+}
+
+void ircd_connector::SetWriteError(std::string error)
+{
+ if (this->WriteError == "")
+ this->WriteError = error;
+}
+
+std::string ircd_connector::GetWriteError()
+{
+ return this->WriteError;
+}
+
+
bool ircd_connector::MakeOutboundConnection(char* newhost, int newport)
{
log(DEBUG,"MakeOutboundConnection: Original param: %s",newhost);
@@ -255,11 +316,8 @@ void ircd_connector::SetState(int newstate)
void ircd_connector::CloseConnection()
{
- int flags = fcntl(this->fd, F_GETFL, 0);
- fcntl(this->fd, F_SETFL, flags ^ O_NONBLOCK);
+ shutdown(this->fd,2);
close(this->fd);
- flags = fcntl(this->fd, F_GETFL, 0);
- fcntl(this->fd, F_SETFL, flags | O_NONBLOCK);
}
void ircd_connector::SetDescriptor(int newfd)
diff --git a/src/inspircd.cpp b/src/inspircd.cpp
index ef9835fd4..132832bc1 100644
--- a/src/inspircd.cpp
+++ b/src/inspircd.cpp
@@ -2881,6 +2881,8 @@ int InspIRCd(char** argv, int argc)
std::deque<std::string> sums;
for (int x = 0; x < SERVERportCount; x++)
{
+ if (me[x])
+ me[x]->FlushWriteBuffers();
sums.clear();
msgs.clear();
while ((me[x]) && (me[x]->RecvPacket(msgs, tcp_host, sums))) // returns 0 or more lines (can be multiple lines!)
diff --git a/src/servers.cpp b/src/servers.cpp
index fe29948bc..d2ace18e8 100644
--- a/src/servers.cpp
+++ b/src/servers.cpp
@@ -246,6 +246,23 @@ ircd_connector* serverrec::FindHost(std::string findhost)
return NULL;
}
+void serverrec::FlushWriteBuffers()
+{
+ for (int i = 0; i < this->connectors.size(); i++)
+ {
+ if (this->connectors[i].HasBufferedOutput())
+ {
+ if (!this->connectors[i].FlushWriteBuf())
+ {
+ // if we're here the write() caused an error, we cannot proceed
+ WriteOpers("*** Lost single connection to %s, link inactive and retrying: %s",this->connectors[i].GetServerName().c_str(),this->connectors[i].GetWriteError().c_str());
+ this->connectors[i].CloseConnection();
+ this->connectors[i].SetState(STATE_DISCONNECTED);
+ }
+ }
+ }
+}
+
bool serverrec::SendPacket(char *message, const char* sendhost)
{
if ((!message) || (!sendhost))
@@ -264,7 +281,6 @@ bool serverrec::SendPacket(char *message, const char* sendhost)
if (cn->GetState() == STATE_DISCONNECTED)
{
- log(DEBUG,"\n\n\n\nMain route to %s is down, seeking alternative\n\n\n\n",sendhost);
// fix: can only route one hop to avoid a loop
if (strncmp(message,"R ",2))
{
@@ -289,22 +305,35 @@ bool serverrec::SendPacket(char *message, const char* sendhost)
}
char buffer[MAXBUF];
snprintf(buffer,MAXBUF,"& %s",sendhost);
+ WriteOpers("*** All connections to %s lost.",sendhost);
NetSendToAllExcept(sendhost,buffer);
- log(DEBUG,"\n\nThere are no routes to %s, we're gonna boot the server off!\n\n",sendhost);
DoSplit(sendhost);
return false;
}
// returns false if the packet could not be sent (e.g. target host down)
- if (send(cn->GetDescriptor(),message,strlen(message),0)<0)
+ if (!cn->AddWriteBuf(message))
{
- log(DEBUG,"send() failed for serverrec::SendPacket(): %s",strerror(errno));
+ // if we're here, there was an error pending, and the send cannot proceed
+ log(DEBUG,"cn->AddWriteBuf() failed for serverrec::SendPacket(): %s",cn->GetWriteError().c_str());
log(DEBUG,"Disabling connector: %s",cn->GetServerName().c_str());
cn->CloseConnection();
cn->SetState(STATE_DISCONNECTED);
+ WriteOpers("*** Lost single connection to %s, link inactive and retrying: %s",cn->GetServerName().c_str(),cn->GetWriteError().c_str());
// retry the packet along a new route so either arrival OR failure are gauranteed (bugfix)
return this->SendPacket(message,sendhost);
}
+ if (!cn->FlushWriteBuf())
+ {
+ // if we're here the write() caused an error, we cannot proceed
+ log(DEBUG,"cn->FlushWriteBuf() failed for serverrec::SendPacket(): %s",cn->GetWriteError().c_str());
+ log(DEBUG,"Disabling connector: %s",cn->GetServerName().c_str());
+ cn->CloseConnection();
+ cn->SetState(STATE_DISCONNECTED);
+ WriteOpers("*** Lost single connection to %s, link inactive and retrying: %s",cn->GetServerName().c_str(),cn->GetWriteError().c_str());
+ // retry the packet along a new route so either arrival OR failure are gauranteed
+ return this->SendPacket(message,sendhost);
+ }
return true;
}
}