From 939cb0ba987f927f1ad900d45f98ba6d8e03e9d4 Mon Sep 17 00:00:00 2001 From: danieldg Date: Sat, 26 Sep 2009 01:43:09 +0000 Subject: Clean up SocketEngine interface to allow edge-triggered I/O and sockets that do not force readability. git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@11760 e03df62e-2008-0410-955e-edbf42e46eb7 --- src/socketengines/socketengine_epoll.cpp | 128 +++++++++++++++--------------- src/socketengines/socketengine_iocp.cpp | 12 +-- src/socketengines/socketengine_kqueue.cpp | 96 +++++++++++----------- src/socketengines/socketengine_poll.cpp | 126 ++++++++++++----------------- src/socketengines/socketengine_ports.cpp | 85 +++++++++++--------- src/socketengines/socketengine_select.cpp | 77 +++++++----------- 6 files changed, 243 insertions(+), 281 deletions(-) (limited to 'src/socketengines') diff --git a/src/socketengines/socketengine_epoll.cpp b/src/socketengines/socketengine_epoll.cpp index 7fed6f250..672ff4a7b 100644 --- a/src/socketengines/socketengine_epoll.cpp +++ b/src/socketengines/socketengine_epoll.cpp @@ -18,7 +18,18 @@ EPollEngine::EPollEngine() { - MAX_DESCRIPTORS = 0; + int max = ulimit(4, 0); + if (max > 0) + { + MAX_DESCRIPTORS = max; + } + else + { + ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets!"); + printf("ERROR: Can't determine maximum number of open sockets!\n"); + ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); + } + // This is not a maximum, just a hint at the eventual number of sockets that may be polled. EngineHandle = epoll_create(GetMaxFds() / 4); @@ -26,11 +37,10 @@ EPollEngine::EPollEngine() { ServerInstance->Logs->Log("SOCKET",DEFAULT, "ERROR: Could not initialize socket engine: %s", strerror(errno)); ServerInstance->Logs->Log("SOCKET",DEFAULT, "ERROR: Your kernel probably does not have the proper features. This is a fatal error, exiting now."); - printf("ERROR: Could not initialize socket engine: %s\n", strerror(errno)); + printf("ERROR: Could not initialize epoll socket engine: %s\n", strerror(errno)); printf("ERROR: Your kernel probably does not have the proper features. This is a fatal error, exiting now.\n"); ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); } - CurrentSetSize = 0; ref = new EventHandler* [GetMaxFds()]; events = new struct epoll_event[GetMaxFds()]; @@ -45,18 +55,35 @@ EPollEngine::~EPollEngine() delete[] events; } -bool EPollEngine::AddFd(EventHandler* eh, bool writeFirst) +static int mask_to_epoll(int event_mask) { - int fd = eh->GetFd(); - if ((fd < 0) || (fd > GetMaxFds() - 1)) + int rv = 0; + if (event_mask & (FD_WANT_POLL_READ | FD_WANT_POLL_WRITE)) { - ServerInstance->Logs->Log("SOCKET",DEBUG,"AddFd out of range: (fd: %d, max: %d)", fd, GetMaxFds()); - return false; + // we need to use standard polling on this FD + if (event_mask & (FD_WANT_POLL_READ | FD_WANT_FAST_READ)) + rv |= EPOLLIN; + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) + rv |= EPOLLOUT; } + else + { + // we can use edge-triggered polling on this FD + rv = EPOLLET; + if (event_mask & (FD_WANT_FAST_READ | FD_WANT_EDGE_READ)) + rv |= EPOLLIN; + if (event_mask & (FD_WANT_FAST_WRITE | FD_WANT_EDGE_WRITE)) + rv |= EPOLLOUT; + } + return rv; +} - if (GetRemainingFds() <= 1) +bool EPollEngine::AddFd(EventHandler* eh, int event_mask) +{ + int fd = eh->GetFd(); + if ((fd < 0) || (fd > GetMaxFds() - 1)) { - ServerInstance->Logs->Log("SOCKET",DEBUG,"No remaining FDs cannot add fd: %d", fd); + ServerInstance->Logs->Log("SOCKET",DEBUG,"AddFd out of range: (fd: %d, max: %d)", fd, GetMaxFds()); return false; } @@ -68,7 +95,7 @@ bool EPollEngine::AddFd(EventHandler* eh, bool writeFirst) struct epoll_event ev; memset(&ev,0,sizeof(ev)); - ev.events = writeFirst ? EPOLLOUT : EPOLLIN; + ev.events = mask_to_epoll(event_mask); ev.data.fd = fd; int i = epoll_ctl(EngineHandle, EPOLL_CTL_ADD, fd, &ev); if (i < 0) @@ -80,20 +107,24 @@ bool EPollEngine::AddFd(EventHandler* eh, bool writeFirst) ServerInstance->Logs->Log("SOCKET",DEBUG,"New file descriptor: %d", fd); ref[fd] = eh; + SocketEngine::SetEventMask(eh, event_mask); CurrentSetSize++; return true; } -void EPollEngine::WantWrite(EventHandler* eh) +void EPollEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { - /** Use oneshot so that the system removes the writeable - * status for us and saves us a call. - */ - struct epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = EPOLLIN | EPOLLOUT; - ev.data.fd = eh->GetFd(); - epoll_ctl(EngineHandle, EPOLL_CTL_MOD, eh->GetFd(), &ev); + int old_events = mask_to_epoll(old_mask); + int new_events = mask_to_epoll(new_mask); + if (old_events != new_events) + { + // ok, we actually have something to tell the kernel about + struct epoll_event ev; + memset(&ev,0,sizeof(ev)); + ev.events = new_events; + ev.data.fd = eh->GetFd(); + epoll_ctl(EngineHandle, EPOLL_CTL_MOD, eh->GetFd(), &ev); + } } bool EPollEngine::DelFd(EventHandler* eh, bool force) @@ -117,37 +148,12 @@ bool EPollEngine::DelFd(EventHandler* eh, bool force) } ref[fd] = NULL; - CurrentSetSize--; ServerInstance->Logs->Log("SOCKET",DEBUG,"Remove file descriptor: %d", fd); + CurrentSetSize--; return true; } -int EPollEngine::GetMaxFds() -{ - if (MAX_DESCRIPTORS) - return MAX_DESCRIPTORS; - - int max = ulimit(4, 0); - if (max > 0) - { - MAX_DESCRIPTORS = max; - return max; - } - else - { - ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets!"); - printf("ERROR: Can't determine maximum number of open sockets!\n"); - ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); - } - return 0; -} - -int EPollEngine::GetRemainingFds() -{ - return GetMaxFds() - CurrentSetSize; -} - int EPollEngine::DispatchEvents() { socklen_t codesize = sizeof(int); @@ -158,11 +164,13 @@ int EPollEngine::DispatchEvents() for (int j = 0; j < i; j++) { + EventHandler* eh = ref[events[j].data.fd]; + if (!eh) + continue; if (events[j].events & EPOLLHUP) { ErrorEvents++; - if (ref[events[j].data.fd]) - ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, 0); + eh->HandleEvent(EVENT_ERROR, 0); continue; } if (events[j].events & EPOLLERR) @@ -171,26 +179,20 @@ int EPollEngine::DispatchEvents() /* Get error number */ if (getsockopt(events[j].data.fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) errcode = errno; - if (ref[events[j].data.fd]) - ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, errcode); + eh->HandleEvent(EVENT_ERROR, errcode); continue; } - if (events[j].events & EPOLLOUT) + if (events[j].events & EPOLLIN) { - WriteEvents++; - struct epoll_event ev; - memset(&ev,0,sizeof(ev)); - ev.events = EPOLLIN; - ev.data.fd = events[j].data.fd; - epoll_ctl(EngineHandle, EPOLL_CTL_MOD, events[j].data.fd, &ev); - if (ref[events[j].data.fd]) - ref[events[j].data.fd]->HandleEvent(EVENT_WRITE); + ReadEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); + eh->HandleEvent(EVENT_READ); } - else + if (events[j].events & EPOLLOUT) { - ReadEvents++; - if (ref[events[j].data.fd]) - ref[events[j].data.fd]->HandleEvent(EVENT_READ); + WriteEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_WRITE_WILL_BLOCK); + eh->HandleEvent(EVENT_WRITE); } } diff --git a/src/socketengines/socketengine_iocp.cpp b/src/socketengines/socketengine_iocp.cpp index 3c3181909..e09fb4d0a 100644 --- a/src/socketengines/socketengine_iocp.cpp +++ b/src/socketengines/socketengine_iocp.cpp @@ -33,7 +33,6 @@ IOCPEngine::IOCPEngine() /* Null variables out. */ CurrentSetSize = 0; - EngineHandle = 0; MAX_DESCRIPTORS = 10240; ref = new EventHandler* [10240]; memset(ref, 0, sizeof(EventHandler*) * MAX_DESCRIPTORS); @@ -47,7 +46,7 @@ IOCPEngine::~IOCPEngine() delete[] ref; } -bool IOCPEngine::AddFd(EventHandler* eh, bool writeFirst) +bool IOCPEngine::AddFd(EventHandler* eh, int event_mask) { /* Does it at least look valid? */ if (!eh) @@ -92,7 +91,7 @@ bool IOCPEngine::AddFd(EventHandler* eh, bool writeFirst) ServerInstance->Logs->Log("SOCKET",DEBUG, "New fake fd: %u, real fd: %u, address 0x%p", *fake_fd, eh->GetFd(), eh); /* post a write event if there is data to be written */ - if(writeFirst) + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) WantWrite(eh); /* we're all good =) */ @@ -107,6 +106,7 @@ bool IOCPEngine::AddFd(EventHandler* eh, bool writeFirst) } ++CurrentSetSize; + SocketEngine::SetEventMask(eh, event_mask); ref[*fake_fd] = eh; return true; @@ -171,7 +171,7 @@ bool IOCPEngine::DelFd(EventHandler* eh, bool force /* = false */) return true; } -void IOCPEngine::WantWrite(EventHandler* eh) +void IOCPEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { if (!eh) return; @@ -183,7 +183,7 @@ void IOCPEngine::WantWrite(EventHandler* eh) return; /* Post event - write begin */ - if(!eh->GetExt("windows_writeevent", m_writeEvent)) + if((new_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) && !eh->GetExt("windows_writeevent", m_writeEvent)) { ULONG_PTR completion_key = (ULONG_PTR)*fake_fd; Overlapped * ov = new Overlapped(SOCKET_IO_EVENT_WRITE_READY, 0); @@ -315,6 +315,7 @@ int IOCPEngine::DispatchEvents() { WriteEvents++; eh->Shrink("windows_writeevent"); + SetEventMask(eh, eh->GetEventMask() & ~FD_WRITE_WILL_BLOCK); eh->HandleEvent(EVENT_WRITE, 0); } break; @@ -322,6 +323,7 @@ int IOCPEngine::DispatchEvents() case SOCKET_IO_EVENT_READ_READY: { ReadEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); if(ov->m_params) { // if we had params, it means we are a udp socket with a udp_overlap pointer in this long. diff --git a/src/socketengines/socketengine_kqueue.cpp b/src/socketengines/socketengine_kqueue.cpp index cbe3e959d..c9734e85d 100644 --- a/src/socketengines/socketengine_kqueue.cpp +++ b/src/socketengines/socketengine_kqueue.cpp @@ -54,16 +54,13 @@ KQueueEngine::~KQueueEngine() delete[] ke_list; } -bool KQueueEngine::AddFd(EventHandler* eh, bool writeFirst) +bool KQueueEngine::AddFd(EventHandler* eh, int event_mask) { int fd = eh->GetFd(); if ((fd < 0) || (fd > GetMaxFds() - 1)) return false; - if (GetRemainingFds() <= 1) - return false; - if (ref[fd]) return false; @@ -79,12 +76,13 @@ bool KQueueEngine::AddFd(EventHandler* eh, bool writeFirst) return false; } - if (writeFirst) { + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) { // ...and sometimes want to write WantWrite(eh); } ref[fd] = eh; + SocketEngine::SetEventMask(eh, event_mask); CurrentSetSize++; ServerInstance->Logs->Log("SOCKET",DEBUG,"New file descriptor: %d", fd); @@ -126,38 +124,41 @@ bool KQueueEngine::DelFd(EventHandler* eh, bool force) return true; } -void KQueueEngine::WantWrite(EventHandler* eh) +void KQueueEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { - struct kevent ke; - // EV_ONESHOT since we only ever want one write event - EV_SET(&ke, eh->GetFd(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL); - int i = kevent(EngineHandle, &ke, 1, 0, 0, NULL); - if (i < 0) { - ServerInstance->Logs->Log("SOCKET",DEFAULT,"Failed to mark for writing: %d %s", - eh->GetFd(), strerror(errno)); + if ((new_mask & FD_WANT_POLL_WRITE) && !(old_mask & FD_WANT_POLL_WRITE)) + { + // new poll-style write + struct kevent ke; + EV_SET(&ke, eh->GetFd(), EVFILT_WRITE, EV_ADD, 0, 0, NULL); + int i = kevent(EngineHandle, &ke, 1, 0, 0, NULL); + if (i < 0) { + ServerInstance->Logs->Log("SOCKET",DEFAULT,"Failed to mark for writing: %d %s", + eh->GetFd(), strerror(errno)); + } } -} - -int KQueueEngine::GetMaxFds() -{ - if (!MAX_DESCRIPTORS) + else if ((old_mask & FD_WANT_POLL_WRITE) && !(new_mask & FD_WANT_POLL_WRITE)) { - int mib[2], maxfiles; - size_t len; - - mib[0] = CTL_KERN; - mib[1] = KERN_MAXFILES; - len = sizeof(maxfiles); - sysctl(mib, 2, &maxfiles, &len, NULL, 0); - MAX_DESCRIPTORS = maxfiles; - return maxfiles; + // removing poll-style write + struct kevent ke; + EV_SET(&ke, eh->GetFd(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + int i = kevent(EngineHandle, &ke, 1, 0, 0, NULL); + if (i < 0) { + ServerInstance->Logs->Log("SOCKET",DEFAULT,"Failed to mark for writing: %d %s", + eh->GetFd(), strerror(errno)); + } + } + if ((new_mask & FD_WANT_EDGE_WRITE) && !(old_mask & FD_WANT_EDGE_WRITE)) + { + // new one-shot write + struct kevent ke; + EV_SET(&ke, eh->GetFd(), EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL); + int i = kevent(EngineHandle, &ke, 1, 0, 0, NULL); + if (i < 0) { + ServerInstance->Logs->Log("SOCKET",DEFAULT,"Failed to mark for writing: %d %s", + eh->GetFd(), strerror(errno)); + } } - return MAX_DESCRIPTORS; -} - -int KQueueEngine::GetRemainingFds() -{ - return GetMaxFds() - CurrentSetSize; } int KQueueEngine::DispatchEvents() @@ -171,34 +172,31 @@ int KQueueEngine::DispatchEvents() for (int j = 0; j < i; j++) { + EventHandler* eh = ref[ke_list[j].ident]; + if (!eh) + continue; if (ke_list[j].flags & EV_EOF) { - /* We love you kqueue, oh yes we do *sings*! - * kqueue gives us the error number directly in the EOF state! - * Unlike smelly epoll and select, where we have to getsockopt - * to get the error, this saves us time and cpu cycles. Go BSD! - */ ErrorEvents++; - if (ref[ke_list[j].ident]) - ref[ke_list[j].ident]->HandleEvent(EVENT_ERROR, ke_list[j].fflags); + eh->HandleEvent(EVENT_ERROR, ke_list[j].fflags); continue; } if (ke_list[j].filter == EVFILT_WRITE) { - /* We only ever add write events with EV_ONESHOT, which - * means they are automatically removed once such a - * event fires, so nothing to do here. - */ - WriteEvents++; - if (ref[ke_list[j].ident]) - ref[ke_list[j].ident]->HandleEvent(EVENT_WRITE); + /* When mask is FD_WANT_FAST_WRITE, we set a one-shot + * write, so we need to clear that bit to detect when it + * set again. + */ + const int bits_to_clr = FD_WANT_FAST_WRITE | FD_WRITE_WILL_BLOCK; + SetEventMask(eh, eh->GetEventMask() & ~bits_to_clr); + eh->HandleEvent(EVENT_WRITE); } if (ke_list[j].filter == EVFILT_READ) { ReadEvents++; - if (ref[ke_list[j].ident]) - ref[ke_list[j].ident]->HandleEvent(EVENT_READ); + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); + eh->HandleEvent(EVENT_READ); } } diff --git a/src/socketengines/socketengine_poll.cpp b/src/socketengines/socketengine_poll.cpp index 6d5ddb9f5..6f50e2798 100644 --- a/src/socketengines/socketengine_poll.cpp +++ b/src/socketengines/socketengine_poll.cpp @@ -21,9 +21,28 @@ PollEngine::PollEngine() { - // Poll requires no special setup (which is nice). CurrentSetSize = 0; - MAX_DESCRIPTORS = 0; +#ifndef __FreeBSD__ + int max = ulimit(4, 0); + if (max > 0) + { + MAX_DESCRIPTORS = max; + } + else + { + ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets: %s", strerror(errno)); + printf("ERROR: Can't determine maximum number of open sockets: %s\n", strerror(errno)); + ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); + } +#else + int mib[2]; + size_t len; + + mib[0] = CTL_KERN; + mib[1] = KERN_MAXFILES; + len = sizeof(MAX_DESCRIPTORS); + sysctl(mib, 2, &MAX_DESCRIPTORS, &len, NULL, 0); +#endif ref = new EventHandler* [GetMaxFds()]; events = new struct pollfd[GetMaxFds()]; @@ -39,7 +58,17 @@ PollEngine::~PollEngine() delete[] events; } -bool PollEngine::AddFd(EventHandler* eh, bool writeFirst) +static int mask_to_poll(int event_mask) +{ + int rv = 0; + if (event_mask & (FD_WANT_POLL_READ | FD_WANT_FAST_READ)) + rv |= POLLIN; + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) + rv |= POLLOUT; + return rv; +} + +bool PollEngine::AddFd(EventHandler* eh, int event_mask) { int fd = eh->GetFd(); if ((fd < 0) || (fd > GetMaxFds() - 1)) @@ -48,12 +77,6 @@ bool PollEngine::AddFd(EventHandler* eh, bool writeFirst) return false; } - if (GetRemainingFds() <= 1) - { - ServerInstance->Logs->Log("SOCKET",DEBUG,"No remaining FDs cannot add fd: %d", fd); - return false; - } - if (fd_mappings.find(fd) != fd_mappings.end()) { ServerInstance->Logs->Log("SOCKET",DEBUG,"Attempt to add duplicate fd: %d", fd); @@ -65,16 +88,10 @@ bool PollEngine::AddFd(EventHandler* eh, bool writeFirst) fd_mappings[fd] = index; ref[index] = eh; events[index].fd = fd; - if (writeFirst) - { - events[index].events = POLLOUT; - } - else - { - events[index].events = POLLIN; - } + events[index].events = mask_to_poll(event_mask); ServerInstance->Logs->Log("SOCKET", DEBUG,"New file descriptor: %d (%d; index %d)", fd, events[fd].events, index); + SocketEngine::SetEventMask(eh, event_mask); CurrentSetSize++; return true; } @@ -87,16 +104,16 @@ EventHandler* PollEngine::GetRef(int fd) return ref[it->second]; } -void PollEngine::WantWrite(EventHandler* eh) +void PollEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { std::map::iterator it = fd_mappings.find(eh->GetFd()); if (it == fd_mappings.end()) { - ServerInstance->Logs->Log("SOCKET",DEBUG,"WantWrite() on unknown fd: %d", eh->GetFd()); + ServerInstance->Logs->Log("SOCKET",DEBUG,"SetEvents() on unknown fd: %d", eh->GetFd()); return; } - events[it->second].events = POLLIN | POLLOUT; + events[it->second].events = mask_to_poll(new_mask); } bool PollEngine::DelFd(EventHandler* eh, bool force) @@ -147,48 +164,6 @@ bool PollEngine::DelFd(EventHandler* eh, bool force) return true; } -int PollEngine::GetMaxFds() -{ -#ifndef __FreeBSD__ - if (MAX_DESCRIPTORS) - return MAX_DESCRIPTORS; - - int max = ulimit(4, 0); - if (max > 0) - { - MAX_DESCRIPTORS = max; - return max; - } - else - { - MAX_DESCRIPTORS = 0; - ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets: %s", strerror(errno)); - printf("ERROR: Can't determine maximum number of open sockets: %s\n", strerror(errno)); - ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); - } - return 0; -#else - if (!MAX_DESCRIPTORS) - { - int mib[2], maxfiles; - size_t len; - - mib[0] = CTL_KERN; - mib[1] = KERN_MAXFILES; - len = sizeof(maxfiles); - sysctl(mib, 2, &maxfiles, &len, NULL, 0); - MAX_DESCRIPTORS = maxfiles; - return maxfiles; - } - return MAX_DESCRIPTORS; -#endif -} - -int PollEngine::GetRemainingFds() -{ - return MAX_DESCRIPTORS - CurrentSetSize; -} - int PollEngine::DispatchEvents() { int i = poll(events, CurrentSetSize, 1000); @@ -203,11 +178,13 @@ int PollEngine::DispatchEvents() { if (events[index].revents) processed++; + EventHandler* eh = ref[index]; + if (!eh) + continue; if (events[index].revents & POLLHUP) { - if (ref[index]) - ref[index]->HandleEvent(EVENT_ERROR, 0); + eh->HandleEvent(EVENT_ERROR, 0); continue; } @@ -219,25 +196,20 @@ int PollEngine::DispatchEvents() // Get error number if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) errcode = errno; - if (ref[index]) - ref[index]->HandleEvent(EVENT_ERROR, errcode); + eh->HandleEvent(EVENT_ERROR, errcode); continue; } - if (events[index].revents & POLLOUT) + if (events[index].revents & POLLIN) { - // Switch to wanting read again - // event handlers have to request to write again if they need it - events[index].events = POLLIN; - - if (ref[index]) - ref[index]->HandleEvent(EVENT_WRITE); + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); + eh->HandleEvent(EVENT_READ); } - - if (events[index].revents & POLLIN) + + if (events[index].revents & POLLOUT) { - if (ref[index]) - ref[index]->HandleEvent(EVENT_READ); + SetEventMask(eh, eh->GetEventMask() & ~FD_WRITE_WILL_BLOCK); + eh->HandleEvent(EVENT_WRITE); } } } diff --git a/src/socketengines/socketengine_ports.cpp b/src/socketengines/socketengine_ports.cpp index eb08839d0..a99806fc4 100644 --- a/src/socketengines/socketengine_ports.cpp +++ b/src/socketengines/socketengine_ports.cpp @@ -19,7 +19,18 @@ PortsEngine::PortsEngine() { - MAX_DESCRIPTORS = 0; + int max = ulimit(4, 0); + if (max > 0) + { + MAX_DESCRIPTORS = max; + return max; + } + else + { + ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets!"); + printf("ERROR: Can't determine maximum number of open sockets!\n"); + ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); + } EngineHandle = port_create(); if (EngineHandle == -1) @@ -44,29 +55,38 @@ PortsEngine::~PortsEngine() delete[] events; } -bool PortsEngine::AddFd(EventHandler* eh, bool writeFirst) +static int mask_to_events(int event_mask) +{ + int rv = 0; + if (event_mask & (FD_WANT_POLL_READ | FD_WANT_FAST_READ)) + rv |= POLLRDNORM; + if (event_mask & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) + rv |= POLLWRNORM; + return rv; +} + +bool PortsEngine::AddFd(EventHandler* eh, int event_mask) { int fd = eh->GetFd(); if ((fd < 0) || (fd > GetMaxFds() - 1)) return false; - if (GetRemainingFds() <= 1) - return false; - if (ref[fd]) return false; ref[fd] = eh; - port_associate(EngineHandle, PORT_SOURCE_FD, fd, writeFirst ? POLLWRNORM : POLLRDNORM, eh); + SocketEngine::SetEventMask(eh, event_mask); + port_associate(EngineHandle, PORT_SOURCE_FD, fd, mask_to_events(event_mask), eh); ServerInstance->Logs->Log("SOCKET",DEBUG,"New file descriptor: %d", fd); CurrentSetSize++; return true; } -void PortsEngine::WantWrite(EventHandler* eh) +void PortsEngine::WantWrite(EventHandler* eh, int old_mask, int new_mask) { - port_associate(EngineHandle, PORT_SOURCE_FD, eh->GetFd(), POLLRDNORM | POLLWRNORM, eh); + if (mask_to_events(new_mask) != mask_to_events(old_mask)) + port_associate(EngineHandle, PORT_SOURCE_FD, eh->GetFd(), mask_to_events(new_mask), eh); } bool PortsEngine::DelFd(EventHandler* eh, bool force) @@ -84,31 +104,6 @@ bool PortsEngine::DelFd(EventHandler* eh, bool force) return true; } -int PortsEngine::GetMaxFds() -{ - if (MAX_DESCRIPTORS) - return MAX_DESCRIPTORS; - - int max = ulimit(4, 0); - if (max > 0) - { - MAX_DESCRIPTORS = max; - return max; - } - else - { - ServerInstance->Logs->Log("SOCKET", DEFAULT, "ERROR: Can't determine maximum number of open sockets!"); - printf("ERROR: Can't determine maximum number of open sockets!\n"); - ServerInstance->Exit(EXIT_STATUS_SOCKETENGINE); - } -#include -} - -int PortsEngine::GetRemainingFds() -{ - return GetMaxFds() - CurrentSetSize; -} - int PortsEngine::DispatchEvents() { struct timespec poll_time; @@ -132,15 +127,27 @@ int PortsEngine::DispatchEvents() case PORT_SOURCE_FD: { int fd = this->events[i].portev_object; - if (ref[fd]) + EventHandler* eh = ref[fd]; + if (eh) { - // reinsert port for next time around - port_associate(EngineHandle, PORT_SOURCE_FD, fd, POLLRDNORM, ref[fd]); - if ((this->events[i].portev_events & POLLRDNORM)) + int mask = eh->GetEventMask(); + if (events[i].portev_events & POLLWRNORM) + mask &= ~(FD_WRITE_WILL_BLOCK | FD_WANT_FAST_WRITE); + if (events[i].portev_events & POLLRDNORM) + mask &= ~FD_READ_WILL_BLOCK; + // reinsert port for next time around, pretending to be one-shot for writes + SetEventMask(ev, mask); + port_associate(EngineHandle, PORT_SOURCE_FD, fd, mask_to_events(mask), eh); + if (events[i].portev_events & POLLRDNORM) + { ReadEvents++; - else + eh->HandleEvent(EVENT_READ); + } + if (events[i].portev_events & POLLWRNORM) + { WriteEvents++; - ref[fd]->HandleEvent((this->events[i].portev_events & POLLRDNORM) ? EVENT_READ : EVENT_WRITE); + eh->HandleEvent(EVENT_WRITE); + } } } default: diff --git a/src/socketengines/socketengine_select.cpp b/src/socketengines/socketengine_select.cpp index 7f6a4e283..f089fd698 100644 --- a/src/socketengines/socketengine_select.cpp +++ b/src/socketengines/socketengine_select.cpp @@ -21,10 +21,8 @@ SelectEngine::SelectEngine() { MAX_DESCRIPTORS = FD_SETSIZE; - EngineHandle = 0; CurrentSetSize = 0; - writeable.assign(GetMaxFds(), false); ref = new EventHandler* [GetMaxFds()]; memset(ref, 0, GetMaxFds() * sizeof(EventHandler*)); } @@ -34,33 +32,23 @@ SelectEngine::~SelectEngine() delete[] ref; } -bool SelectEngine::AddFd(EventHandler* eh, bool writeFirst) +bool SelectEngine::AddFd(EventHandler* eh, int) { int fd = eh->GetFd(); if ((fd < 0) || (fd > GetMaxFds() - 1)) return false; - if (GetRemainingFds() <= 1) - return false; - if (ref[fd]) return false; - fds.insert(fd); ref[fd] = eh; + SocketEngine::SetEventMask(eh, event_mask); CurrentSetSize++; - writeable[eh->GetFd()] = writeFirst; - ServerInstance->Logs->Log("SOCKET",DEBUG,"New file descriptor: %d", fd); return true; } -void SelectEngine::WantWrite(EventHandler* eh) -{ - writeable[eh->GetFd()] = true; -} - bool SelectEngine::DelFd(EventHandler* eh, bool force) { int fd = eh->GetFd(); @@ -68,10 +56,6 @@ bool SelectEngine::DelFd(EventHandler* eh, bool force) if ((fd < 0) || (fd > GetMaxFds() - 1)) return false; - std::set::iterator t = fds.find(fd); - if (t != fds.end()) - fds.erase(t); - CurrentSetSize--; ref[fd] = NULL; @@ -79,14 +63,9 @@ bool SelectEngine::DelFd(EventHandler* eh, bool force) return true; } -int SelectEngine::GetMaxFds() +void SelectEngine::OnSetEvent(EventHandler* eh, int old_mask, int new_mask) { - return FD_SETSIZE; -} - -int SelectEngine::GetRemainingFds() -{ - return GetMaxFds() - CurrentSetSize; + // deal with it later } int SelectEngine::DispatchEvents() @@ -96,24 +75,26 @@ int SelectEngine::DispatchEvents() socklen_t codesize = sizeof(int); int errcode = 0; + fd_set wfdset, rfdset, errfdset; FD_ZERO(&wfdset); FD_ZERO(&rfdset); FD_ZERO(&errfdset); - /* Populate the select FD set (this is why select sucks compared to epoll, kqueue, IOCP) */ - for (std::set::iterator a = fds.begin(); a != fds.end(); a++) + /* Populate the select FD sets (this is why select sucks compared to epoll, kqueue, IOCP) */ + for (int i = 0; i < FD_SETSIZE; i++) { - /* Explicitly one-time writeable */ - if (writeable[*a]) - FD_SET (*a, &wfdset); - else - FD_SET (*a, &rfdset); - - /* All sockets must receive error notifications regardless */ - FD_SET (*a, &errfdset); + EventHandler* eh = ref[i]; + if (!eh) + continue; + int state = eh->GetEventMask(); + if (state & (FD_WANT_POLL_READ | FD_WANT_FAST_READ)) + FD_SET (i, &rfdset); + if (state & (FD_WANT_POLL_WRITE | FD_WANT_FAST_WRITE)) + FD_SET (i, &wfdset); + FD_SET (i, &errfdset); } - /* One second waits */ + /* One second wait */ tval.tv_sec = 1; tval.tv_usec = 0; @@ -123,16 +104,15 @@ int SelectEngine::DispatchEvents() if (sresult < 1) return 0; - std::vector copy(fds.begin(), fds.end()); - for (std::vector::iterator a = copy.begin(); a != copy.end(); a++) + for (int i = 0; i < FD_SETSIZE; i++) { - EventHandler* ev = ref[*a]; + EventHandler* ev = ref[i]; if (ev) { - if (FD_ISSET (ev->GetFd(), &errfdset)) + if (FD_ISSET (i, &errfdset)) { ErrorEvents++; - if (getsockopt(ev->GetFd(), SOL_SOCKET, SO_ERROR, (char*)&errcode, &codesize) < 0) + if (getsockopt(i, SOL_SOCKET, SO_ERROR, (char*)&errcode, &codesize) < 0) errcode = errno; ev->HandleEvent(EVENT_ERROR, errcode); @@ -145,16 +125,17 @@ int SelectEngine::DispatchEvents() * If an error event occurs above it is not worth processing the * read and write states even if set. */ - if (FD_ISSET (ev->GetFd(), &wfdset)) + if (FD_ISSET (i, &rfdset)) { - WriteEvents++; - writeable[ev->GetFd()] = false; - ev->HandleEvent(EVENT_WRITE); + ReadEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_READ_WILL_BLOCK); + ev->HandleEvent(EVENT_READ); } - if (FD_ISSET (ev->GetFd(), &rfdset)) + if (FD_ISSET (i, &wfdset)) { - ReadEvents++; - ev->HandleEvent(EVENT_READ); + WriteEvents++; + SetEventMask(eh, eh->GetEventMask() & ~FD_WRITE_WILL_BLOCK); + ev->HandleEvent(EVENT_WRITE); } } } -- cgit v1.2.3