diff options
-rw-r--r-- | src/modules/extra/m_ziplink.cpp | 220 | ||||
-rw-r--r-- | src/socketengine_epoll.cpp | 6 | ||||
-rw-r--r-- | src/socketengine_kqueue.cpp | 3 | ||||
-rw-r--r-- | src/socketengine_select.cpp | 28 |
4 files changed, 161 insertions, 96 deletions
diff --git a/src/modules/extra/m_ziplink.cpp b/src/modules/extra/m_ziplink.cpp index 27902add7..0647517c2 100644 --- a/src/modules/extra/m_ziplink.cpp +++ b/src/modules/extra/m_ziplink.cpp @@ -41,10 +41,105 @@ */ -enum izip_status { IZIP_WAITFIRST, IZIP_OPEN, IZIP_CLOSED }; +enum izip_status { IZIP_OPEN, IZIP_CLOSED }; const unsigned int CHUNK = 16384; +class CountedBuffer : public classbase +{ + int bufptr; /* Current tail location */ + unsigned char* buffer; /* Current buffer contents */ + int bufsz; /* Current buffer size */ + int amount_expected; /* Amount of data expected */ + int amount_read; /* Amount of data read so far */ + public: + CountedBuffer() + { + bufsz = 1024; + buffer = new unsigned char[bufsz + 1]; + bufptr = 0; + amount_read = 0; + } + + ~CountedBuffer() + { + delete[] buffer; + } + + void AddData(unsigned char* data, int data_length) + { + if ((data_length + bufptr) > bufsz) + { + /* Buffer is too small, enlarge it and copy contents */ + int old_bufsz = bufsz; + unsigned char* temp = buffer; + + bufsz += data_length; + buffer = new unsigned char[bufsz + 1]; + + memcpy(buffer, temp, old_bufsz); + + delete[] temp; + } + + memcpy(buffer + bufptr, data, data_length); + bufptr += data_length; + amount_read += data_length; + + if ((!amount_expected) && (amount_read >= 4)) + { + /* We have enough to read an int */ + int* size = (int*)buffer; + amount_expected = ntohl(*size); + } + } + + int GetFrame(unsigned char* frame, int maxsize) + { + if (amount_expected) + { + /* We know how much we're expecting... + * Do we have enough yet? + */ + if ((amount_read - 4) >= amount_expected) + { + int amt_ex = amount_expected; + /* Yes, we have enough now */ + memcpy(frame, buffer + 4, amount_expected > maxsize ? maxsize : amount_expected); + RemoveFirstFrame(); + return (amt_ex > maxsize) ? maxsize : amt_ex; + } + } + /* Not enough for a frame yet, COME AGAIN! */ + return 0; + } + + void RemoveFirstFrame() + { + unsigned char* temp = buffer; + + bufsz -= (amount_expected + 4); + buffer = new unsigned char[bufsz + 1]; + + memcpy(buffer, temp + amount_expected, bufsz); + + amount_read -= (amount_expected + 4); + + if (amount_read >= 4) + { + /* We have enough to read an int */ + int* size = (int*)buffer; + amount_expected = ntohl(*size); + } + else + amount_expected = 0; + + bufptr = 0; + + delete[] temp; + } +}; + /** Represents an ZIP user's extra data */ class izip_session : public classbase @@ -55,7 +150,7 @@ class izip_session : public classbase izip_status status; int need_bytes; int fd; - std::string inbuf; + CountedBuffer* inbuf; }; class ModuleZLib : public Module @@ -157,9 +252,9 @@ class ModuleZLib : public Module /* allocate deflate state */ session->fd = fd; - session->status = IZIP_WAITFIRST; + session->status = IZIP_OPEN; - session->need_bytes = 0; + session->inbuf = new CountedBuffer(); session->c_stream.zalloc = (alloc_func)0; session->c_stream.zfree = (free_func)0; @@ -188,108 +283,67 @@ class ModuleZLib : public Module if (session->status == IZIP_CLOSED) return 1; - int size = 0; - - if (session->need_bytes) - { - size = session->need_bytes; - } - else - { - if (read(fd, &size, sizeof(size)) != sizeof(size)) - return 0; - size = ntohl(size); - } - - ServerInstance->Log(DEBUG,"Size of frame to read: %d%s", size, session->need_bytes ? " (remainder of last frame)" : ""); + unsigned char compr[CHUNK + 1]; - unsigned char compr[size+1+session->need_bytes]; + readresult = read(fd, compr, CHUNK); - readresult = read(fd, compr + session->need_bytes, size); - - if (readresult == size) + if (readresult > 0) { - if(session->status == IZIP_WAITFIRST) - { - session->status = IZIP_OPEN; - } - - /* Reassemble first part of last frame */ - if (session->need_bytes) - { - for (size_t i = 0; i < session->inbuf.length(); i++) - compr[i] = session->inbuf[i]; - } - - session->d_stream.next_in = (Bytef*)compr; - session->d_stream.avail_in = 0; - session->d_stream.next_out = (Bytef*)buffer; - if (inflateInit(&session->d_stream) != Z_OK) - return -EBADF; - session->status = IZIP_OPEN; - - while ((session->d_stream.total_out < count) && (session->d_stream.total_in < (unsigned int)readresult)) - { - session->d_stream.avail_in = session->d_stream.avail_out = 1; /* force small buffers */ - if (inflate(&session->d_stream, Z_NO_FLUSH) == Z_STREAM_END) - break; - } - - inflateEnd(&session->d_stream); - - total_in_compressed += readresult; - readresult = session->d_stream.total_out; - total_in_uncompressed += session->d_stream.total_out; - - buffer[readresult] = 0; - session->need_bytes = 0; - } - else - { - /* We need to buffer here */ - ServerInstance->Log(DEBUG,"Didnt read whole frame, got %d bytes of %d!", readresult, size); - session->need_bytes = ((readresult > -1) ? (size - readresult) : (size)); - if (readresult > 0) + session->inbuf->AddData(compr, readresult); + + int size = session->inbuf->GetFrame(compr, CHUNK); + if (size) { - /* Do it this way because it needs to be binary safe */ - for (int i = 0; i < readresult; i++) - session->inbuf += compr[i]; + + session->d_stream.next_in = (Bytef*)compr; + session->d_stream.avail_in = 0; + session->d_stream.next_out = (Bytef*)buffer; + if (inflateInit(&session->d_stream) != Z_OK) + return -EBADF; + + while ((session->d_stream.total_out < count) && (session->d_stream.total_in < (unsigned int)size)) + { + session->d_stream.avail_in = session->d_stream.avail_out = 1; + if (inflate(&session->d_stream, Z_NO_FLUSH) == Z_STREAM_END) + break; + } + + inflateEnd(&session->d_stream); + + total_in_compressed += readresult; + readresult = session->d_stream.total_out; + total_in_uncompressed += session->d_stream.total_out; + + buffer[session->d_stream.total_out] = 0; } } - return (readresult > 0); } virtual int OnRawSocketWrite(int fd, const char* buffer, int count) { + izip_session* session = &sessions[fd]; int ocount = count; + if (!count) { ServerInstance->Log(DEBUG,"Nothing to do!"); return 1; } - unsigned char compr[count*2+4]; - - izip_session* session = &sessions[fd]; - - if(session->status == IZIP_WAITFIRST) + if(session->status != IZIP_OPEN) { - session->status = IZIP_OPEN; + CloseSession(session); + return 0; } + unsigned char compr[count*2+4]; + if (deflateInit(&session->c_stream, Z_BEST_COMPRESSION) != Z_OK) { ServerInstance->Log(DEBUG,"Deflate init failed"); } - if(session->status != IZIP_OPEN) - { - ServerInstance->Log(DEBUG,"State not open!"); - CloseSession(session); - return 0; - } - session->c_stream.next_in = (Bytef*)buffer; session->c_stream.next_out = compr+4; @@ -329,7 +383,11 @@ class ModuleZLib : public Module void CloseSession(izip_session* session) { - session->status = IZIP_CLOSED; + if (session->status = IZIP_OPEN) + { + session->status = IZIP_CLOSED; + delete session->inbuf; + } } }; diff --git a/src/socketengine_epoll.cpp b/src/socketengine_epoll.cpp index 06124a48c..bc0013b4c 100644 --- a/src/socketengine_epoll.cpp +++ b/src/socketengine_epoll.cpp @@ -140,7 +140,8 @@ int EPollEngine::DispatchEvents() if (events[j].events & EPOLLHUP) { ServerInstance->Log(DEBUG,"Handle error event on fd %d", events[j].data.fd); - ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, 0); + if (ref[events[j].data.fd]) + ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, 0); continue; } if (events[j].events & EPOLLERR) @@ -149,7 +150,8 @@ int EPollEngine::DispatchEvents() if (getsockopt(events[j].data.fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) errcode = errno; ServerInstance->Log(DEBUG,"Handle error event on fd %d: %s", events[j].data.fd, strerror(errcode)); - ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, errcode); + if (ref[events[j].data.fd]) + ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, errcode); continue; } if (events[j].events & EPOLLOUT) diff --git a/src/socketengine_kqueue.cpp b/src/socketengine_kqueue.cpp index cad88d393..ed26faa27 100644 --- a/src/socketengine_kqueue.cpp +++ b/src/socketengine_kqueue.cpp @@ -150,7 +150,8 @@ int KQueueEngine::DispatchEvents() * Unlike smelly epoll and select, where we have to getsockopt * to get the error, this saves us time and cpu cycles. Go BSD! */ - ref[ke_list[j].ident]->HandleEvent(EVENT_ERROR, ke_list[j].fflags); + if (ref[ke_list[j].ident]) + ref[ke_list[j].ident]->HandleEvent(EVENT_ERROR, ke_list[j].fflags); continue; } if (ke_list[j].flags & EVFILT_WRITE) diff --git a/src/socketengine_select.cpp b/src/socketengine_select.cpp index 19a2ec882..b7a9c3597 100644 --- a/src/socketengine_select.cpp +++ b/src/socketengine_select.cpp @@ -147,22 +147,26 @@ int SelectEngine::DispatchEvents() { if (getsockopt(ev[i]->GetFd(), SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0) errcode = errno; - ev[i]->HandleEvent(EVENT_ERROR, errcode); + + v[i]->HandleEvent(EVENT_ERROR, errcode); } continue; } - ServerInstance->Log(DEBUG,"Handle %s event on fd %d",writeable[ev[i]->GetFd()] || !ev[i]->Readable() ? "write" : "read", ev[i]->GetFd()); - if (writeable[ev[i]->GetFd()]) + if (ev[i]) { - if (ev[i]) - ev[i]->HandleEvent(EVENT_WRITE); - writeable[ev[i]->GetFd()] = false; - - } - else - { - if (ev[i]) - ev[i]->HandleEvent(ev[i]->Readable() ? EVENT_READ : EVENT_WRITE); + ServerInstance->Log(DEBUG,"Handle %s event on fd %d",writeable[ev[i]->GetFd()] || !ev[i]->Readable() ? "write" : "read", ev[i]->GetFd()); + if (writeable[ev[i]->GetFd()]) + { + if (ev[i]) + ev[i]->HandleEvent(EVENT_WRITE); + writeable[ev[i]->GetFd()] = false; + + } + else + { + if (ev[i]) + ev[i]->HandleEvent(ev[i]->Readable() ? EVENT_READ : EVENT_WRITE); + } } } } |