summaryrefslogtreecommitdiff
path: root/src/inspsocket.cpp
diff options
context:
space:
mode:
authordanieldg <danieldg@e03df62e-2008-0410-955e-edbf42e46eb7>2009-09-26 01:43:09 +0000
committerdanieldg <danieldg@e03df62e-2008-0410-955e-edbf42e46eb7>2009-09-26 01:43:09 +0000
commit939cb0ba987f927f1ad900d45f98ba6d8e03e9d4 (patch)
treef35510ff56e01ece2a0417c9a6fbad17fb7acfee /src/inspsocket.cpp
parentc8026bc2d73344e1df526f0a80694046efa4b22a (diff)
Clean up SocketEngine interface to allow edge-triggered I/O and sockets that do not force readability.
git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@11760 e03df62e-2008-0410-955e-edbf42e46eb7
Diffstat (limited to 'src/inspsocket.cpp')
-rw-r--r--src/inspsocket.cpp147
1 files changed, 95 insertions, 52 deletions
diff --git a/src/inspsocket.cpp b/src/inspsocket.cpp
index 907acea67..6348d7982 100644
--- a/src/inspsocket.cpp
+++ b/src/inspsocket.cpp
@@ -28,7 +28,7 @@ BufferedSocket::BufferedSocket(int newfd)
this->fd = newfd;
this->state = I_CONNECTED;
if (fd > -1)
- ServerInstance->SE->AddFd(this);
+ ServerInstance->SE->AddFd(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE);
}
void BufferedSocket::DoConnect(const std::string &ipaddr, int aport, unsigned long maxtime, const std::string &connectbindip)
@@ -97,7 +97,7 @@ BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs&
this->state = I_CONNECTING;
- if (!ServerInstance->SE->AddFd(this, true))
+ if (!ServerInstance->SE->AddFd(this, FD_WANT_NO_READ | FD_WANT_POLL_WRITE))
return I_ERR_NOMOREFDS;
this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time());
@@ -178,18 +178,35 @@ void StreamSocket::DoRead()
{
char* ReadBuffer = ServerInstance->GetReadBuffer();
int n = recv(fd, ReadBuffer, ServerInstance->Config->NetBufferSize, 0);
- if (n > 0)
+ if (n == ServerInstance->Config->NetBufferSize)
{
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
+ recvq.append(ReadBuffer, n);
+ OnDataReady();
+ }
+ else if (n > 0)
+ {
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ);
recvq.append(ReadBuffer, n);
OnDataReady();
}
else if (n == 0)
{
error = "Connection closed";
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
+ }
+ else if (errno == EAGAIN)
+ {
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK);
}
- else if (errno != EAGAIN && errno != EINTR)
+ else if (errno == EINTR)
+ {
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ);
+ }
+ else
{
error = strerror(errno);
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
}
}
}
@@ -255,58 +272,90 @@ void StreamSocket::DoWrite()
}
else
{
- // Prepare a writev() call to write all buffers efficiently
- int bufcount = sendq.size();
+ bool again = true;
+ while (again)
+ {
+ again = false;
+
+ // Prepare a writev() call to write all buffers efficiently
+ int bufcount = sendq.size();
- // cap the number of buffers at IOV_MAX
- if (bufcount > IOV_MAX)
- bufcount = IOV_MAX;
+ // cap the number of buffers at IOV_MAX
+ if (bufcount > IOV_MAX)
+ {
+ bufcount = IOV_MAX;
+ again = true;
+ }
- iovec* iovecs = new iovec[bufcount];
- for(int i=0; i < bufcount; i++)
- {
- iovecs[i].iov_base = const_cast<char*>(sendq[i].data());
- iovecs[i].iov_len = sendq[i].length();
- }
- int rv = writev(fd, iovecs, bufcount);
- delete[] iovecs;
- if (rv == (int)sendq_len)
- {
- // it's our lucky day, everything got written out. Fast cleanup.
- sendq_len = 0;
- sendq.clear();
- }
- else if (rv > 0)
- {
- // Partial write. Clean out strings from the sendq
- sendq_len -= rv;
- while (rv > 0 && !sendq.empty())
+ iovec* iovecs = new iovec[bufcount];
+ for(int i=0; i < bufcount; i++)
{
- std::string& front = sendq.front();
- if (front.length() < (size_t)rv)
- {
- // this string got fully written out
- rv -= front.length();
- sendq.pop_front();
- }
- else
+ iovecs[i].iov_base = const_cast<char*>(sendq[i].data());
+ iovecs[i].iov_len = sendq[i].length();
+ }
+ int rv = writev(fd, iovecs, bufcount);
+ delete[] iovecs;
+
+ if (rv == (int)sendq_len)
+ {
+ // it's our lucky day, everything got written out. Fast cleanup.
+ // This won't ever happen if the number of buffers got capped.
+ sendq_len = 0;
+ sendq.clear();
+ }
+ else if (rv > 0)
+ {
+ // Partial write. Clean out strings from the sendq
+ sendq_len -= rv;
+ while (rv > 0 && !sendq.empty())
{
- // stopped in the middle of this string
- front = front.substr(rv);
- rv = 0;
+ std::string& front = sendq.front();
+ if (front.length() < (size_t)rv)
+ {
+ // this string got fully written out
+ rv -= front.length();
+ sendq.pop_front();
+ }
+ else
+ {
+ // stopped in the middle of this string
+ front = front.substr(rv);
+ rv = 0;
+ }
}
}
+ else if (rv == 0)
+ {
+ error = "Connection closed";
+ }
+ else if (errno == EAGAIN)
+ {
+ again = false;
+ }
+ else if (errno == EINTR)
+ {
+ again = true;
+ }
+ else
+ {
+ error = strerror(errno);
+ }
}
- else if (rv == 0)
+ if (!error.empty())
{
- error = "Connection closed";
+ // error - kill all events
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE);
}
- else if (errno != EAGAIN && errno != EINTR)
+ else if (sendq_len)
{
- error = strerror(errno);
+ // writes have blocked, we can use FAST_WRITE to find when they unblock
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK);
+ }
+ else
+ {
+ // writes are done, we can use EDGE_WRITE to stop asking for write
+ ServerInstance->SE->ChangeEventMask(this, FD_WANT_EDGE_WRITE);
}
- if (sendq_len && error.empty())
- ServerInstance->SE->WantWrite(this);
}
}
@@ -318,18 +367,12 @@ void StreamSocket::WriteData(const std::string &data)
data.c_str());
return;
}
- bool newWrite = sendq.empty() && !data.empty();
/* Append the data to the back of the queue ready for writing */
sendq.push_back(data);
sendq_len += data.length();
- if (newWrite)
- {
- // TODO perhaps we should try writing first, before asking SE about writes?
- // DoWrite();
- ServerInstance->SE->WantWrite(this);
- }
+ ServerInstance->SE->ChangeEventMask(this, FD_ADD_TRIAL_WRITE);
}
void SocketTimeout::Tick(time_t)