summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/modules/extra/m_ziplink.cpp220
-rw-r--r--src/socketengine_epoll.cpp6
-rw-r--r--src/socketengine_kqueue.cpp3
-rw-r--r--src/socketengine_select.cpp28
4 files changed, 161 insertions, 96 deletions
diff --git a/src/modules/extra/m_ziplink.cpp b/src/modules/extra/m_ziplink.cpp
index 27902add7..0647517c2 100644
--- a/src/modules/extra/m_ziplink.cpp
+++ b/src/modules/extra/m_ziplink.cpp
@@ -41,10 +41,105 @@
*/
-enum izip_status { IZIP_WAITFIRST, IZIP_OPEN, IZIP_CLOSED };
+enum izip_status { IZIP_OPEN, IZIP_CLOSED };
const unsigned int CHUNK = 16384;
+class CountedBuffer : public classbase
+{
+ int bufptr; /* Current tail location */
+ unsigned char* buffer; /* Current buffer contents */
+ int bufsz; /* Current buffer size */
+ int amount_expected; /* Amount of data expected */
+ int amount_read; /* Amount of data read so far */
+ public:
+ CountedBuffer()
+ {
+ bufsz = 1024;
+ buffer = new unsigned char[bufsz + 1];
+ bufptr = 0;
+ amount_read = 0;
+ }
+
+ ~CountedBuffer()
+ {
+ delete[] buffer;
+ }
+
+ void AddData(unsigned char* data, int data_length)
+ {
+ if ((data_length + bufptr) > bufsz)
+ {
+ /* Buffer is too small, enlarge it and copy contents */
+ int old_bufsz = bufsz;
+ unsigned char* temp = buffer;
+
+ bufsz += data_length;
+ buffer = new unsigned char[bufsz + 1];
+
+ memcpy(buffer, temp, old_bufsz);
+
+ delete[] temp;
+ }
+
+ memcpy(buffer + bufptr, data, data_length);
+ bufptr += data_length;
+ amount_read += data_length;
+
+ if ((!amount_expected) && (amount_read >= 4))
+ {
+ /* We have enough to read an int */
+ int* size = (int*)buffer;
+ amount_expected = ntohl(*size);
+ }
+ }
+
+ int GetFrame(unsigned char* frame, int maxsize)
+ {
+ if (amount_expected)
+ {
+ /* We know how much we're expecting...
+ * Do we have enough yet?
+ */
+ if ((amount_read - 4) >= amount_expected)
+ {
+ int amt_ex = amount_expected;
+ /* Yes, we have enough now */
+ memcpy(frame, buffer + 4, amount_expected > maxsize ? maxsize : amount_expected);
+ RemoveFirstFrame();
+ return (amt_ex > maxsize) ? maxsize : amt_ex;
+ }
+ }
+ /* Not enough for a frame yet, COME AGAIN! */
+ return 0;
+ }
+
+ void RemoveFirstFrame()
+ {
+ unsigned char* temp = buffer;
+
+ bufsz -= (amount_expected + 4);
+ buffer = new unsigned char[bufsz + 1];
+
+ memcpy(buffer, temp + amount_expected, bufsz);
+
+ amount_read -= (amount_expected + 4);
+
+ if (amount_read >= 4)
+ {
+ /* We have enough to read an int */
+ int* size = (int*)buffer;
+ amount_expected = ntohl(*size);
+ }
+ else
+ amount_expected = 0;
+
+ bufptr = 0;
+
+ delete[] temp;
+ }
+};
+
/** Represents an ZIP user's extra data
*/
class izip_session : public classbase
@@ -55,7 +150,7 @@ class izip_session : public classbase
izip_status status;
int need_bytes;
int fd;
- std::string inbuf;
+ CountedBuffer* inbuf;
};
class ModuleZLib : public Module
@@ -157,9 +252,9 @@ class ModuleZLib : public Module
/* allocate deflate state */
session->fd = fd;
- session->status = IZIP_WAITFIRST;
+ session->status = IZIP_OPEN;
- session->need_bytes = 0;
+ session->inbuf = new CountedBuffer();
session->c_stream.zalloc = (alloc_func)0;
session->c_stream.zfree = (free_func)0;
@@ -188,108 +283,67 @@ class ModuleZLib : public Module
if (session->status == IZIP_CLOSED)
return 1;
- int size = 0;
-
- if (session->need_bytes)
- {
- size = session->need_bytes;
- }
- else
- {
- if (read(fd, &size, sizeof(size)) != sizeof(size))
- return 0;
- size = ntohl(size);
- }
-
- ServerInstance->Log(DEBUG,"Size of frame to read: %d%s", size, session->need_bytes ? " (remainder of last frame)" : "");
+ unsigned char compr[CHUNK + 1];
- unsigned char compr[size+1+session->need_bytes];
+ readresult = read(fd, compr, CHUNK);
- readresult = read(fd, compr + session->need_bytes, size);
-
- if (readresult == size)
+ if (readresult > 0)
{
- if(session->status == IZIP_WAITFIRST)
- {
- session->status = IZIP_OPEN;
- }
-
- /* Reassemble first part of last frame */
- if (session->need_bytes)
- {
- for (size_t i = 0; i < session->inbuf.length(); i++)
- compr[i] = session->inbuf[i];
- }
-
- session->d_stream.next_in = (Bytef*)compr;
- session->d_stream.avail_in = 0;
- session->d_stream.next_out = (Bytef*)buffer;
- if (inflateInit(&session->d_stream) != Z_OK)
- return -EBADF;
- session->status = IZIP_OPEN;
-
- while ((session->d_stream.total_out < count) && (session->d_stream.total_in < (unsigned int)readresult))
- {
- session->d_stream.avail_in = session->d_stream.avail_out = 1; /* force small buffers */
- if (inflate(&session->d_stream, Z_NO_FLUSH) == Z_STREAM_END)
- break;
- }
-
- inflateEnd(&session->d_stream);
-
- total_in_compressed += readresult;
- readresult = session->d_stream.total_out;
- total_in_uncompressed += session->d_stream.total_out;
-
- buffer[readresult] = 0;
- session->need_bytes = 0;
- }
- else
- {
- /* We need to buffer here */
- ServerInstance->Log(DEBUG,"Didnt read whole frame, got %d bytes of %d!", readresult, size);
- session->need_bytes = ((readresult > -1) ? (size - readresult) : (size));
- if (readresult > 0)
+ session->inbuf->AddData(compr, readresult);
+
+ int size = session->inbuf->GetFrame(compr, CHUNK);
+ if (size)
{
- /* Do it this way because it needs to be binary safe */
- for (int i = 0; i < readresult; i++)
- session->inbuf += compr[i];
+
+ session->d_stream.next_in = (Bytef*)compr;
+ session->d_stream.avail_in = 0;
+ session->d_stream.next_out = (Bytef*)buffer;
+ if (inflateInit(&session->d_stream) != Z_OK)
+ return -EBADF;
+
+ while ((session->d_stream.total_out < count) && (session->d_stream.total_in < (unsigned int)size))
+ {
+ session->d_stream.avail_in = session->d_stream.avail_out = 1;
+ if (inflate(&session->d_stream, Z_NO_FLUSH) == Z_STREAM_END)
+ break;
+ }
+
+ inflateEnd(&session->d_stream);
+
+ total_in_compressed += readresult;
+ readresult = session->d_stream.total_out;
+ total_in_uncompressed += session->d_stream.total_out;
+
+ buffer[session->d_stream.total_out] = 0;
}
}
-
return (readresult > 0);
}
virtual int OnRawSocketWrite(int fd, const char* buffer, int count)
{
+ izip_session* session = &sessions[fd];
int ocount = count;
+
if (!count)
{
ServerInstance->Log(DEBUG,"Nothing to do!");
return 1;
}
- unsigned char compr[count*2+4];
-
- izip_session* session = &sessions[fd];
-
- if(session->status == IZIP_WAITFIRST)
+ if(session->status != IZIP_OPEN)
{
- session->status = IZIP_OPEN;
+ CloseSession(session);
+ return 0;
}
+ unsigned char compr[count*2+4];
+
if (deflateInit(&session->c_stream, Z_BEST_COMPRESSION) != Z_OK)
{
ServerInstance->Log(DEBUG,"Deflate init failed");
}
- if(session->status != IZIP_OPEN)
- {
- ServerInstance->Log(DEBUG,"State not open!");
- CloseSession(session);
- return 0;
- }
-
session->c_stream.next_in = (Bytef*)buffer;
session->c_stream.next_out = compr+4;
@@ -329,7 +383,11 @@ class ModuleZLib : public Module
void CloseSession(izip_session* session)
{
- session->status = IZIP_CLOSED;
+ if (session->status = IZIP_OPEN)
+ {
+ session->status = IZIP_CLOSED;
+ delete session->inbuf;
+ }
}
};
diff --git a/src/socketengine_epoll.cpp b/src/socketengine_epoll.cpp
index 06124a48c..bc0013b4c 100644
--- a/src/socketengine_epoll.cpp
+++ b/src/socketengine_epoll.cpp
@@ -140,7 +140,8 @@ int EPollEngine::DispatchEvents()
if (events[j].events & EPOLLHUP)
{
ServerInstance->Log(DEBUG,"Handle error event on fd %d", events[j].data.fd);
- ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, 0);
+ if (ref[events[j].data.fd])
+ ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, 0);
continue;
}
if (events[j].events & EPOLLERR)
@@ -149,7 +150,8 @@ int EPollEngine::DispatchEvents()
if (getsockopt(events[j].data.fd, SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0)
errcode = errno;
ServerInstance->Log(DEBUG,"Handle error event on fd %d: %s", events[j].data.fd, strerror(errcode));
- ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, errcode);
+ if (ref[events[j].data.fd])
+ ref[events[j].data.fd]->HandleEvent(EVENT_ERROR, errcode);
continue;
}
if (events[j].events & EPOLLOUT)
diff --git a/src/socketengine_kqueue.cpp b/src/socketengine_kqueue.cpp
index cad88d393..ed26faa27 100644
--- a/src/socketengine_kqueue.cpp
+++ b/src/socketengine_kqueue.cpp
@@ -150,7 +150,8 @@ int KQueueEngine::DispatchEvents()
* Unlike smelly epoll and select, where we have to getsockopt
* to get the error, this saves us time and cpu cycles. Go BSD!
*/
- ref[ke_list[j].ident]->HandleEvent(EVENT_ERROR, ke_list[j].fflags);
+ if (ref[ke_list[j].ident])
+ ref[ke_list[j].ident]->HandleEvent(EVENT_ERROR, ke_list[j].fflags);
continue;
}
if (ke_list[j].flags & EVFILT_WRITE)
diff --git a/src/socketengine_select.cpp b/src/socketengine_select.cpp
index 19a2ec882..b7a9c3597 100644
--- a/src/socketengine_select.cpp
+++ b/src/socketengine_select.cpp
@@ -147,22 +147,26 @@ int SelectEngine::DispatchEvents()
{
if (getsockopt(ev[i]->GetFd(), SOL_SOCKET, SO_ERROR, &errcode, &codesize) < 0)
errcode = errno;
- ev[i]->HandleEvent(EVENT_ERROR, errcode);
+
+ v[i]->HandleEvent(EVENT_ERROR, errcode);
}
continue;
}
- ServerInstance->Log(DEBUG,"Handle %s event on fd %d",writeable[ev[i]->GetFd()] || !ev[i]->Readable() ? "write" : "read", ev[i]->GetFd());
- if (writeable[ev[i]->GetFd()])
+ if (ev[i])
{
- if (ev[i])
- ev[i]->HandleEvent(EVENT_WRITE);
- writeable[ev[i]->GetFd()] = false;
-
- }
- else
- {
- if (ev[i])
- ev[i]->HandleEvent(ev[i]->Readable() ? EVENT_READ : EVENT_WRITE);
+ ServerInstance->Log(DEBUG,"Handle %s event on fd %d",writeable[ev[i]->GetFd()] || !ev[i]->Readable() ? "write" : "read", ev[i]->GetFd());
+ if (writeable[ev[i]->GetFd()])
+ {
+ if (ev[i])
+ ev[i]->HandleEvent(EVENT_WRITE);
+ writeable[ev[i]->GetFd()] = false;
+
+ }
+ else
+ {
+ if (ev[i])
+ ev[i]->HandleEvent(ev[i]->Readable() ? EVENT_READ : EVENT_WRITE);
+ }
}
}
}