diff options
Diffstat (limited to 'src/inspsocket.cpp')
-rw-r--r-- | src/inspsocket.cpp | 414 |
1 files changed, 204 insertions, 210 deletions
diff --git a/src/inspsocket.cpp b/src/inspsocket.cpp index f22645168..9bfc6a73e 100644 --- a/src/inspsocket.cpp +++ b/src/inspsocket.cpp @@ -23,18 +23,15 @@ #include "inspircd.h" -#include "socket.h" -#include "inspstring.h" -#include "socketengine.h" #include "iohook.h" -#ifndef DISABLE_WRITEV -#include <sys/uio.h> -#endif - -#ifndef IOV_MAX -#define IOV_MAX 1024 -#endif +static IOHook* GetNextHook(IOHook* hook) +{ + IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook); + if (iohm) + return iohm->GetNextHook(); + return NULL; +} BufferedSocket::BufferedSocket() { @@ -110,7 +107,7 @@ BufferedSocketError BufferedSocket::BeginConnect(const irc::sockets::sockaddrs& if (!SocketEngine::AddFd(this, FD_WANT_NO_READ | FD_WANT_SINGLE_WRITE | FD_WRITE_WILL_BLOCK)) return I_ERR_NOMOREFDS; - this->Timeout = new SocketTimeout(this->GetFd(), this, timeout, ServerInstance->Time()); + this->Timeout = new SocketTimeout(this->GetFd(), this, timeout); ServerInstance->Timers.AddTimer(this->Timeout); ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "BufferedSocket::DoConnect success"); @@ -123,19 +120,15 @@ void StreamSocket::Close() { // final chance, dump as much of the sendq as we can DoWrite(); - if (GetIOHook()) + + IOHook* hook = GetIOHook(); + DelIOHook(); + while (hook) { - try - { - GetIOHook()->OnStreamSocketClose(this); - } - catch (CoreException& modexcept) - { - ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "%s threw an exception: %s", - modexcept.GetSource().c_str(), modexcept.GetReason().c_str()); - } - delete iohook; - DelIOHook(); + hook->OnStreamSocketClose(this); + IOHook* const nexthook = GetNextHook(hook); + delete hook; + hook = nexthook; } SocketEngine::Shutdown(this, 2); SocketEngine::Close(this); @@ -153,67 +146,79 @@ bool StreamSocket::GetNextLine(std::string& line, char delim) std::string::size_type i = recvq.find(delim); if (i == std::string::npos) return false; - line = recvq.substr(0, i); - // TODO is this the most efficient way to split? - recvq = recvq.substr(i + 1); + line.assign(recvq, 0, i); + recvq.erase(0, i + 1); return true; } -void StreamSocket::DoRead() +int StreamSocket::HookChainRead(IOHook* hook, std::string& rq) { - if (GetIOHook()) + if (!hook) + return ReadToRecvQ(rq); + + IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook); + if (iohm) { - int rv = -1; - try - { - rv = GetIOHook()->OnStreamSocketRead(this, recvq); - } - catch (CoreException& modexcept) - { - ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "%s threw an exception: %s", - modexcept.GetSource().c_str(), modexcept.GetReason().c_str()); - return; - } - if (rv > 0) - OnDataReady(); - if (rv < 0) - SetError("Read Error"); // will not overwrite a better error message + // Call the next hook to put data into the recvq of the current hook + const int ret = HookChainRead(iohm->GetNextHook(), iohm->GetRecvQ()); + if (ret <= 0) + return ret; } - else + return hook->OnStreamSocketRead(this, rq); +} + +void StreamSocket::DoRead() +{ + const std::string::size_type prevrecvqsize = recvq.size(); + + const int result = HookChainRead(GetIOHook(), recvq); + if (result < 0) { + SetError("Read Error"); // will not overwrite a better error message + return; + } + + if (recvq.size() > prevrecvqsize) + OnDataReady(); +} + +int StreamSocket::ReadToRecvQ(std::string& rq) +{ char* ReadBuffer = ServerInstance->GetReadBuffer(); int n = SocketEngine::Recv(this, ReadBuffer, ServerInstance->Config->NetBufferSize, 0); if (n == ServerInstance->Config->NetBufferSize) { SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ); - recvq.append(ReadBuffer, n); - OnDataReady(); + rq.append(ReadBuffer, n); } else if (n > 0) { SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ); - recvq.append(ReadBuffer, n); - OnDataReady(); + rq.append(ReadBuffer, n); } else if (n == 0) { error = "Connection closed"; SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE); + return -1; } else if (SocketEngine::IgnoreError()) { SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_READ_WILL_BLOCK); + return 0; } else if (errno == EINTR) { SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_ADD_TRIAL_READ); + return 0; } else { error = SocketEngine::LastError(); SocketEngine::ChangeEventMask(this, FD_WANT_NO_READ | FD_WANT_NO_WRITE); + return -1; } - } + return n; } /* Don't try to prepare huge blobs of data to send to a blocked socket */ @@ -221,120 +226,56 @@ static const int MYIOV_MAX = IOV_MAX < 128 ? IOV_MAX : 128; void StreamSocket::DoWrite() { - if (sendq.empty()) + if (getSendQSize() == 0) return; - if (!error.empty() || fd < 0 || fd == INT_MAX) + if (!error.empty() || fd < 0) { ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "DoWrite on errored or closed socket"); return; } -#ifndef DISABLE_WRITEV - if (GetIOHook()) -#endif + SendQueue* psendq = &sendq; + IOHook* hook = GetIOHook(); + while (hook) { - int rv = -1; - try - { - while (error.empty() && !sendq.empty()) - { - if (sendq.size() > 1 && sendq[0].length() < 1024) - { - // Avoid multiple repeated SSL encryption invocations - // This adds a single copy of the queue, but avoids - // much more overhead in terms of system calls invoked - // by the IOHook. - // - // The length limit of 1024 is to prevent merging strings - // more than once when writes begin to block. - std::string tmp; - tmp.reserve(1280); - while (!sendq.empty() && tmp.length() < 1024) - { - tmp.append(sendq.front()); - sendq.pop_front(); - } - sendq.push_front(tmp); - } - std::string& front = sendq.front(); - int itemlen = front.length(); - if (GetIOHook()) - { - rv = GetIOHook()->OnStreamSocketWrite(this, front); - if (rv > 0) - { - // consumed the entire string, and is ready for more - sendq_len -= itemlen; - sendq.pop_front(); - } - else if (rv == 0) - { - // socket has blocked. Stop trying to send data. - // IOHook has requested unblock notification from the socketengine + int rv = hook->OnStreamSocketWrite(this, *psendq); + psendq = NULL; - // Since it is possible that a partial write took place, adjust sendq_len - sendq_len = sendq_len - itemlen + front.length(); - return; - } - else - { - SetError("Write Error"); // will not overwrite a better error message - return; - } - } -#ifdef DISABLE_WRITEV - else - { - rv = SocketEngine::Send(this, front.data(), itemlen, 0); - if (rv == 0) - { - SetError("Connection closed"); - return; - } - else if (rv < 0) - { - if (errno == EINTR || SocketEngine::IgnoreError()) - SocketEngine::ChangeEventMask(this, FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK); - else - SetError(SocketEngine::LastError()); - return; - } - else if (rv < itemlen) - { - SocketEngine::ChangeEventMask(this, FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK); - front = front.substr(rv); - sendq_len -= rv; - return; - } - else - { - sendq_len -= itemlen; - sendq.pop_front(); - if (sendq.empty()) - SocketEngine::ChangeEventMask(this, FD_WANT_EDGE_WRITE); - } - } -#endif - } + // rv == 0 means the socket has blocked. Stop trying to send data. + // IOHook has requested unblock notification from the socketengine. + if (rv == 0) + break; + + if (rv < 0) + { + SetError("Write Error"); // will not overwrite a better error message + break; } - catch (CoreException& modexcept) + + IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(hook); + hook = NULL; + if (iohm) { - ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "%s threw an exception: %s", - modexcept.GetSource().c_str(), modexcept.GetReason().c_str()); + psendq = &iohm->GetSendQ(); + hook = iohm->GetNextHook(); } } -#ifndef DISABLE_WRITEV - else - { + + if (psendq) + FlushSendQ(*psendq); +} + +void StreamSocket::FlushSendQ(SendQueue& sq) +{ // don't even try if we are known to be blocking if (GetEventMask() & FD_WRITE_WILL_BLOCK) return; // start out optimistic - we won't need to write any more int eventChange = FD_WANT_EDGE_WRITE; - while (error.empty() && sendq_len && eventChange == FD_WANT_EDGE_WRITE) + while (error.empty() && !sq.empty() && eventChange == FD_WANT_EDGE_WRITE) { // Prepare a writev() call to write all buffers efficiently - int bufcount = sendq.size(); + int bufcount = sq.size(); // cap the number of buffers at MYIOV_MAX if (bufcount > MYIOV_MAX) @@ -343,22 +284,25 @@ void StreamSocket::DoWrite() } int rv_max = 0; - iovec* iovecs = new iovec[bufcount]; - for(int i=0; i < bufcount; i++) + int rv; { - iovecs[i].iov_base = const_cast<char*>(sendq[i].data()); - iovecs[i].iov_len = sendq[i].length(); - rv_max += sendq[i].length(); + SocketEngine::IOVector iovecs[MYIOV_MAX]; + size_t j = 0; + for (SendQueue::const_iterator i = sq.begin(), end = i+bufcount; i != end; ++i, j++) + { + const SendQueue::Element& elem = *i; + iovecs[j].iov_base = const_cast<char*>(elem.data()); + iovecs[j].iov_len = elem.length(); + rv_max += elem.length(); + } + rv = SocketEngine::WriteV(this, iovecs, bufcount); } - int rv = writev(fd, iovecs, bufcount); - delete[] iovecs; - if (rv == (int)sendq_len) + if (rv == (int)sq.bytes()) { // it's our lucky day, everything got written out. Fast cleanup. // This won't ever happen if the number of buffers got capped. - sendq_len = 0; - sendq.clear(); + sq.clear(); } else if (rv > 0) { @@ -368,20 +312,19 @@ void StreamSocket::DoWrite() // it's going to block now eventChange = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK; } - sendq_len -= rv; - while (rv > 0 && !sendq.empty()) + while (rv > 0 && !sq.empty()) { - std::string& front = sendq.front(); + const SendQueue::Element& front = sq.front(); if (front.length() <= (size_t)rv) { // this string got fully written out rv -= front.length(); - sendq.pop_front(); + sq.pop_front(); } else { // stopped in the middle of this string - front = front.substr(rv); + sq.erase_front(rv); rv = 0; } } @@ -413,8 +356,6 @@ void StreamSocket::DoWrite() { SocketEngine::ChangeEventMask(this, eventChange); } - } -#endif } void StreamSocket::WriteData(const std::string &data) @@ -428,7 +369,6 @@ void StreamSocket::WriteData(const std::string &data) /* Append the data to the back of the queue ready for writing */ sendq.push_back(data); - sendq_len += data.length(); SocketEngine::ChangeEventMask(this, FD_ADD_TRIAL_WRITE); } @@ -464,7 +404,7 @@ bool SocketTimeout::Tick(time_t) void BufferedSocket::OnConnected() { } void BufferedSocket::OnTimeout() { return; } -void BufferedSocket::DoWrite() +void BufferedSocket::OnEventHandlerWrite() { if (state == I_CONNECTING) { @@ -473,73 +413,127 @@ void BufferedSocket::DoWrite() if (!GetIOHook()) SocketEngine::ChangeEventMask(this, FD_WANT_FAST_READ | FD_WANT_EDGE_WRITE); } - this->StreamSocket::DoWrite(); + this->StreamSocket::OnEventHandlerWrite(); } BufferedSocket::~BufferedSocket() { this->Close(); - if (Timeout) + // The timer is removed from the TimerManager in Timer::~Timer() + delete Timeout; +} + +void StreamSocket::OnEventHandlerError(int errornum) +{ + if (!error.empty()) + return; + + if (errornum == 0) + SetError("Connection closed"); + else + SetError(SocketEngine::GetError(errornum)); + + BufferedSocketError errcode = I_ERR_OTHER; + switch (errornum) { - // The timer is removed from the TimerManager in Timer::~Timer() - delete Timeout; + case ETIMEDOUT: + errcode = I_ERR_TIMEOUT; + break; + case ECONNREFUSED: + case 0: + errcode = I_ERR_CONNECT; + break; + case EADDRINUSE: + errcode = I_ERR_BIND; + break; + case EPIPE: + case EIO: + errcode = I_ERR_WRITE; + break; } + + // Log and call OnError() + CheckError(errcode); } -void StreamSocket::HandleEvent(EventType et, int errornum) +void StreamSocket::OnEventHandlerRead() { if (!error.empty()) return; - BufferedSocketError errcode = I_ERR_OTHER; - try { - switch (et) - { - case EVENT_ERROR: - { - if (errornum == 0) - SetError("Connection closed"); - else - SetError(SocketEngine::GetError(errornum)); - switch (errornum) - { - case ETIMEDOUT: - errcode = I_ERR_TIMEOUT; - break; - case ECONNREFUSED: - case 0: - errcode = I_ERR_CONNECT; - break; - case EADDRINUSE: - errcode = I_ERR_BIND; - break; - case EPIPE: - case EIO: - errcode = I_ERR_WRITE; - break; - } - break; - } - case EVENT_READ: - { - DoRead(); - break; - } - case EVENT_WRITE: - { - DoWrite(); - break; - } - } + + try + { + DoRead(); } catch (CoreException& ex) { - ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "Caught exception in socket processing on FD %d - '%s'", - fd, ex.GetReason().c_str()); + ServerInstance->Logs->Log("SOCKET", LOG_DEFAULT, "Caught exception in socket processing on FD %d - '%s'", fd, ex.GetReason().c_str()); SetError(ex.GetReason()); } + CheckError(I_ERR_OTHER); +} + +void StreamSocket::OnEventHandlerWrite() +{ + if (!error.empty()) + return; + + DoWrite(); + CheckError(I_ERR_OTHER); +} + +void StreamSocket::CheckError(BufferedSocketError errcode) +{ if (!error.empty()) { ServerInstance->Logs->Log("SOCKET", LOG_DEBUG, "Error on FD %d - '%s'", fd, error.c_str()); OnError(errcode); } } + +IOHook* StreamSocket::GetModHook(Module* mod) const +{ + for (IOHook* curr = GetIOHook(); curr; curr = GetNextHook(curr)) + { + if (curr->prov->creator == mod) + return curr; + } + return NULL; +} + +void StreamSocket::AddIOHook(IOHook* newhook) +{ + IOHook* curr = GetIOHook(); + if (!curr) + { + iohook = newhook; + return; + } + + IOHookMiddle* lasthook; + while (curr) + { + lasthook = IOHookMiddle::ToMiddleHook(curr); + if (!lasthook) + return; + curr = lasthook->GetNextHook(); + } + + lasthook->SetNextHook(newhook); +} + +size_t StreamSocket::getSendQSize() const +{ + size_t ret = sendq.bytes(); + IOHook* curr = GetIOHook(); + while (curr) + { + const IOHookMiddle* const iohm = IOHookMiddle::ToMiddleHook(curr); + if (!iohm) + break; + + ret += iohm->GetSendQ().bytes(); + curr = iohm->GetNextHook(); + } + return ret; +} |