From b950e46bfeef5643ff68c8c78530c1eff25d024e Mon Sep 17 00:00:00 2001 From: w00t Date: Thu, 28 Aug 2008 20:28:39 +0000 Subject: Newly revamped ziplinks module, work of psychon.. resolves (a lot) of problems with ziplinks and large data amounts, fixes a few bugs, etc. (thanks for your work). - Yes, this missed the alpha. git-svn-id: http://svn.inspircd.org/repository/trunk/inspircd@10342 e03df62e-2008-0410-955e-edbf42e46eb7 --- src/modules/extra/m_ziplink.cpp | 375 +++++++++++++++++++--------------------- 1 file changed, 179 insertions(+), 196 deletions(-) (limited to 'src/modules/extra') diff --git a/src/modules/extra/m_ziplink.cpp b/src/modules/extra/m_ziplink.cpp index 483eaea3b..cb19157fa 100644 --- a/src/modules/extra/m_ziplink.cpp +++ b/src/modules/extra/m_ziplink.cpp @@ -20,119 +20,34 @@ #include "hashcomp.h" #include "transport.h" +#include + /* $ModDesc: Provides zlib link support for servers */ /* $LinkerFlags: -lz */ /* $ModDep: transport.h */ /* - * Compressed data is transmitted across the link in the following format: - * - * 0 1 2 3 4 ... n - * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ - * | n | Z0 -> Zn | - * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ - * - * Where: n is the size of a frame, in network byte order, 4 bytes. - * Z0 through Zn are Zlib compressed data, n bytes in length. - * - * If the module fails to read the entire frame, then it will buffer - * the portion of the last frame it received, then attempt to read - * the next part of the frame next time a write notification arrives. - * * ZLIB_BEST_COMPRESSION (9) is used for all sending of data with - * a flush after each frame. A frame may contain multiple lines + * a flush after each chunk. A frame may contain multiple lines * and should be treated as raw binary data. - * */ /* Status of a connection */ -enum izip_status { IZIP_OPEN, IZIP_CLOSED }; +enum izip_status { IZIP_CLOSED = 0, IZIP_OPEN }; /* Maximum transfer size per read operation */ const unsigned int CHUNK = 128 * 1024; -/* This class manages a compressed chunk of data preceeded by - * a length count. - * - * It can handle having multiple chunks of data in the buffer - * at any time. - */ -class CountedBuffer : public classbase -{ - std::string buffer; /* Current buffer contents */ - unsigned int amount_expected; /* Amount of data expected */ - public: - CountedBuffer() - { - amount_expected = 0; - } - - /** Adds arbitrary compressed data to the buffer. - * - Binsry safe, of course. - */ - void AddData(unsigned char* data, int data_length) - { - buffer.append((const char*)data, data_length); - this->NextFrameSize(); - } - - /** Works out the size of the next compressed frame - */ - void NextFrameSize() - { - if ((!amount_expected) && (buffer.length() >= 4)) - { - /* We have enough to read an int - - * Yes, this is safe, but its ugly. Give me - * a nicer way to read 4 bytes from a binary - * stream, and push them into a 32 bit int, - * and i'll consider replacing this. - */ - amount_expected = ntohl((buffer[3] << 24) | (buffer[2] << 16) | (buffer[1] << 8) | buffer[0]); - buffer = buffer.substr(4); - } - } - - /** Gets the next frame and returns its size, or returns - * zero if there isnt one available yet. - * A frame can contain multiple plaintext lines. - * - Binary safe. - */ - int GetFrame(unsigned char* frame, int maxsize) - { - if (amount_expected) - { - /* We know how much we're expecting... - * Do we have enough yet? - */ - if (buffer.length() >= amount_expected) - { - int j = 0; - for (unsigned int i = 0; i < amount_expected; i++, j++) - frame[i] = buffer[i]; - - buffer = buffer.substr(j); - amount_expected = 0; - NextFrameSize(); - return j; - } - } - /* Not enough for a frame yet, COME AGAIN! */ - return 0; - } -}; - /** Represents an zipped connections extra data */ class izip_session : public classbase { public: z_stream c_stream; /* compression stream */ - z_stream d_stream; /* decompress stream */ + z_stream d_stream; /* uncompress stream */ izip_status status; /* Connection status */ - int fd; /* File descriptor */ - CountedBuffer* inbuf; /* Holds input buffer */ - std::string outbuf; /* Holds output buffer */ + std::string outbuf; /* Holds output buffer (compressed) */ + std::string inbuf; /* Holds input buffer (compressed) */ }; class ModuleZLib : public Module @@ -145,6 +60,12 @@ class ModuleZLib : public Module float total_out_uncompressed; float total_in_uncompressed; + /* Used for reading data from the wire. + * We need a smaller buffer than CHUNK, else we could read more data + * from the wire than we can pass to the socket code. + */ + char *read_buffer; + unsigned int read_buffer_size; public: ModuleZLib(InspIRCd* Me) @@ -153,17 +74,23 @@ class ModuleZLib : public Module ServerInstance->Modules->PublishInterface("BufferedSocketHook", this); sessions = new izip_session[ServerInstance->SE->GetMaxFds()]; + for (int i = 0; i < ServerInstance->SE->GetMaxFds(); i++) + sessions[i].status = IZIP_CLOSED; total_out_compressed = total_in_compressed = 0; - total_out_uncompressed = total_out_uncompressed = 0; + total_out_uncompressed = total_in_uncompressed = 0; Implementation eventlist[] = { I_OnRawSocketConnect, I_OnRawSocketAccept, I_OnRawSocketClose, I_OnRawSocketRead, I_OnRawSocketWrite, I_OnStats, I_OnRequest }; ServerInstance->Modules->Attach(eventlist, this, 7); + + read_buffer_size = ServerInstance->Config->NetBufferSize / 4; + read_buffer = new char[read_buffer_size]; } virtual ~ModuleZLib() { ServerInstance->Modules->UnpublishInterface("BufferedSocketHook", this); delete[] sessions; + delete[] read_buffer; } virtual Version GetVersion() @@ -197,7 +124,7 @@ class ModuleZLib : public Module } else if (strcmp("IS_UNHOOK", request->GetId()) == 0) { - /* Detatch from an inspsocket */ + /* Detach from an inspsocket */ return ServerInstance->Config->DelIOHook((BufferedSocket*)ISR->Sock) ? "OK" : NULL; } else if (strcmp("IS_HSDONE", request->GetId()) == 0) @@ -231,13 +158,13 @@ class ModuleZLib : public Module * (we dont count 64 bit ints because not all systems have 64 bit ints, and floats * can still hold more. */ - float outbound_r = 100 - ((total_out_compressed / (total_out_uncompressed + 0.001)) * 100); - float inbound_r = 100 - ((total_in_compressed / (total_in_uncompressed + 0.001)) * 100); + float outbound_r = (total_out_compressed / (total_out_uncompressed + 0.001)) * 100; + float inbound_r = (total_in_compressed / (total_in_uncompressed + 0.001)) * 100; float total_compressed = total_in_compressed + total_out_compressed; float total_uncompressed = total_in_uncompressed + total_out_uncompressed; - float total_r = 100 - ((total_compressed / (total_uncompressed + 0.001)) * 100); + float total_r = (total_compressed / (total_uncompressed + 0.001)) * 100; char outbound_ratio[MAXBUF], inbound_ratio[MAXBUF], combined_ratio[MAXBUF]; @@ -258,14 +185,18 @@ class ModuleZLib : public Module return 0; } - virtual void OnRawSocketAccept(int fd, const std::string &ip, int localport) + virtual void OnRawSocketConnect(int fd) { + if ((fd < 0) || (fd > ServerInstance->SE->GetMaxFds() - 1)) + return; + izip_session* session = &sessions[fd]; /* allocate state and buffers */ - session->fd = fd; session->status = IZIP_OPEN; - session->inbuf = new CountedBuffer(); + + /* Just in case... */ + session->outbuf.clear(); session->c_stream.zalloc = (alloc_func)0; session->c_stream.zfree = (free_func)0; @@ -274,12 +205,27 @@ class ModuleZLib : public Module session->d_stream.zalloc = (alloc_func)0; session->d_stream.zfree = (free_func)0; session->d_stream.opaque = (voidpf)0; + + /* If we cant call this, well, we're boned. */ + if (inflateInit(&session->d_stream) != Z_OK) + { + session->status = IZIP_CLOSED; + return; + } + + /* Same here */ + if (deflateInit(&session->c_stream, Z_BEST_COMPRESSION) != Z_OK) + { + inflateEnd(&session->d_stream); + session->status = IZIP_CLOSED; + return; + } } - virtual void OnRawSocketConnect(int fd) + virtual void OnRawSocketAccept(int fd, const std::string &ip, int localport) { - /* Nothing special needs doing here compared to accept() */ - OnRawSocketAccept(fd, "", 0); + /* Nothing special needs doing here compared to connect() */ + OnRawSocketConnect(fd); } virtual void OnRawSocketClose(int fd) @@ -295,137 +241,168 @@ class ModuleZLib : public Module if (session->status == IZIP_CLOSED) return 0; - unsigned char compr[CHUNK + 4]; - unsigned int offset = 0; - unsigned int total_size = 0; - - /* Read CHUNK bytes at a time to the buffer (usually 128k) */ - readresult = read(fd, compr, CHUNK); + /* Read read_buffer_size bytes at a time to the buffer (usually 128k) */ + readresult = read(fd, read_buffer, read_buffer_size); /* Did we get anything? */ - if (readresult > 0) - { - /* Add it to the frame queue */ - session->inbuf->AddData(compr, readresult); - total_in_compressed += readresult; + if (readresult <= 0) + return 0; - /* Parse all completed frames */ - int size = 0; - while ((size = session->inbuf->GetFrame(compr, CHUNK)) != 0) - { - session->d_stream.next_in = (Bytef*)compr; - session->d_stream.avail_in = 0; - session->d_stream.next_out = (Bytef*)(buffer + offset); - - /* If we cant call this, well, we're boned. */ - if (inflateInit(&session->d_stream) != Z_OK) - return 0; - - while ((session->d_stream.total_out < count) && (session->d_stream.total_in < (unsigned int)size)) - { - session->d_stream.avail_in = session->d_stream.avail_out = 1; - if (inflate(&session->d_stream, Z_NO_FLUSH) == Z_STREAM_END) - break; - } - - /* Stick a fork in me, i'm done */ - inflateEnd(&session->d_stream); - - /* Update counters and offsets */ - total_size += session->d_stream.total_out; - total_in_uncompressed += session->d_stream.total_out; - offset += session->d_stream.total_out; - } + total_in_compressed += readresult; + + /* Copy the compressed data into out input buffer */ + session->inbuf.append(read_buffer, readresult); + size_t in_len = session->inbuf.length(); - /* Null-terminate the buffer -- this doesnt harm binary data */ - buffer[total_size] = 0; + /* Prepare decompression */ + session->d_stream.next_in = (Bytef *)session->inbuf.c_str(); + session->d_stream.avail_in = in_len; - /* Set the read size to the correct total size */ - readresult = total_size; + session->d_stream.next_out = (Bytef*)buffer; + /* Last byte is reserved for NULL terminating that beast */ + session->d_stream.avail_out = count - 1; + /* Z_SYNC_FLUSH: Do as much as possible */ + int ret = inflate(&session->d_stream, Z_SYNC_FLUSH); + /* TODO CloseStream() in here at random places */ + switch (ret) + { + case Z_NEED_DICT: + case Z_STREAM_ERROR: + /* This is one of the 'not supposed to happen' things. + * Memory corruption, anyone? + */ + Error(session, "General Error. This is not supposed to happen :/"); + break; + case Z_DATA_ERROR: + Error(session, "Decompression failed, malformed data"); + break; + case Z_MEM_ERROR: + Error(session, "Out of memory"); + break; + case Z_BUF_ERROR: + /* This one is non-fatal, buffer is just full + * (can't happen here). + */ + Error(session, "Internal error. This is not supposed to happen."); + break; + case Z_STREAM_END: + /* This module *never* generates these :/ */ + Error(session, "End-of-stream marker received"); + break; + case Z_OK: + break; + default: + /* NO WAI! This can't happen. All errors are handled above. */ + Error(session, "Unknown error"); + break; } - return (readresult > 0); + if (ret != Z_OK) + { + readresult = 0; + return 0; + } + + /* Update the inbut buffer */ + unsigned int input_compressed = in_len - session->d_stream.avail_in; + session->inbuf = session->inbuf.substr(input_compressed); + + /* Update counters (Old size - new size) */ + unsigned int uncompressed_length = (count - 1) - session->d_stream.avail_out; + total_in_uncompressed += uncompressed_length; + + /* Null-terminate the buffer -- this doesnt harm binary data */ + buffer[uncompressed_length] = 0; + + /* Set the read size to the correct total size */ + readresult = uncompressed_length; + + return 1; } virtual int OnRawSocketWrite(int fd, const char* buffer, int count) { izip_session* session = &sessions[fd]; - int ocount = count; if (!count) /* Nothing to do! */ return 0; if(session->status != IZIP_OPEN) - { /* Seriously, wtf? */ - CloseSession(session); return 0; - } - unsigned char compr[CHUNK + 4]; + unsigned char compr[CHUNK]; + int ret; - /* Gentlemen, start your engines! */ - if (deflateInit(&session->c_stream, Z_BEST_COMPRESSION) != Z_OK) + /* This loop is really only supposed to run once, but in case 'compr' + * is filled up somehow we are prepared to handle this situation. + */ + unsigned int offset = 0; + do { - CloseSession(session); - return 0; - } + /* Prepare compression */ + session->c_stream.next_in = (Bytef*)buffer + offset; + session->c_stream.avail_in = count - offset; - /* Set buffer sizes (we reserve 4 bytes at the start of the - * buffer for the length counters) - */ - session->c_stream.next_in = (Bytef*)buffer; - session->c_stream.next_out = compr + 4; + session->c_stream.next_out = compr; + session->c_stream.avail_out = CHUNK; - /* Compress the text */ - while ((session->c_stream.total_in < (unsigned int)count) && (session->c_stream.total_out < CHUNK)) - { - session->c_stream.avail_in = session->c_stream.avail_out = 1; - if (deflate(&session->c_stream, Z_NO_FLUSH) != Z_OK) + /* Compress the text */ + ret = deflate(&session->c_stream, Z_SYNC_FLUSH); + /* TODO CloseStream() in here at random places */ + switch (ret) { - CloseSession(session); - return 0; + case Z_OK: + break; + case Z_BUF_ERROR: + /* This one is non-fatal, buffer is just full + * (can't happen here). + */ + Error(session, "Internal error. This is not supposed to happen."); + break; + case Z_STREAM_ERROR: + /* This is one of the 'not supposed to happen' things. + * Memory corruption, anyone? + */ + Error(session, "General Error. This is also not supposed to happen."); + break; + default: + Error(session, "Unknown error"); + break; } - } - /* Finish the stream */ - for (session->c_stream.avail_out = 1; deflate(&session->c_stream, Z_FINISH) != Z_STREAM_END; session->c_stream.avail_out = 1); - deflateEnd(&session->c_stream); - total_out_uncompressed += ocount; - total_out_compressed += session->c_stream.total_out; + if (ret != Z_OK) + return 0; - /** Assemble the frame length onto the frame, in network byte order */ - compr[0] = (session->c_stream.total_out >> 24); - compr[1] = (session->c_stream.total_out >> 16); - compr[2] = (session->c_stream.total_out >> 8); - compr[3] = (session->c_stream.total_out & 0xFF); + /* Space before - space after stuff was added to this */ + unsigned int compressed = CHUNK - session->c_stream.avail_out; + unsigned int uncompressed = count - session->c_stream.avail_in; - /* Add compressed data plus leading length to the output buffer - - * Note, we may have incomplete half-sent frames in here. - */ - session->outbuf.append((const char*)compr, session->c_stream.total_out + 4); + /* Make it skip the data which was compressed already */ + offset += uncompressed; + + /* Update stats */ + total_out_uncompressed += uncompressed; + total_out_compressed += compressed; + + /* Add compressed to the output buffer */ + session->outbuf.append((const char*)compr, compressed); + } while (session->c_stream.avail_in != 0); /* Lets see how much we can send out */ - int ret = write(fd, session->outbuf.data(), session->outbuf.length()); + ret = write(fd, session->outbuf.data(), session->outbuf.length()); /* Check for errors, and advance the buffer if any was sent */ if (ret > 0) session->outbuf = session->outbuf.substr(ret); else if (ret < 1) { - if (ret == -1) - { - if (errno == EAGAIN) - return 0; - else - { - session->outbuf.clear(); - return 0; - } - } + if (errno == EAGAIN) + return 0; else { session->outbuf.clear(); + /* TODO pass errors down? */ return 0; } } @@ -433,7 +410,12 @@ class ModuleZLib : public Module /* ALL LIES the lot of it, we havent really written * this amount, but the layer above doesnt need to know. */ - return ocount; + return count; + } + + void Error(izip_session* session, const std::string &text) + { + ServerInstance->SNO->WriteToSnoMask('l', "ziplink error: " + text); } void CloseSession(izip_session* session) @@ -442,7 +424,8 @@ class ModuleZLib : public Module { session->status = IZIP_CLOSED; session->outbuf.clear(); - delete session->inbuf; + inflateEnd(&session->d_stream); + deflateEnd(&session->c_stream); } } -- cgit v1.2.3