From a39bd74d3e942db1b465cae643e1a4766ffb304e Mon Sep 17 00:00:00 2001 From: Julian Brown Date: Sun, 15 Mar 2015 13:36:54 +0000 Subject: When trying spooled messages, account for the local-interface in grouping for a connection. Bug 1141 --- src/src/deliver.c | 16 +++++ src/src/functions.h | 5 +- src/src/ip.c | 88 +++++++++++------------ src/src/structs.h | 3 + src/src/transport.c | 173 ++++++++++++++++++++++++++++++++++++---------- src/src/transports/smtp.c | 87 ++++++++++++++++++++++- 6 files changed, 286 insertions(+), 86 deletions(-) (limited to 'src') diff --git a/src/src/deliver.c b/src/src/deliver.c index 65b4824ab..1cdecc6e9 100644 --- a/src/src/deliver.c +++ b/src/src/deliver.c @@ -7865,6 +7865,22 @@ if (!regex_IGNOREQUOTA) regex_IGNOREQUOTA = } +uschar * +deliver_get_sender_address (uschar * id) +{ +if (!spool_open_datafile(id)) + return NULL; + +sprintf(CS spoolname, "%s-H", id); +if (spool_read_header(spoolname, TRUE, TRUE) != spool_read_OK) + return NULL; + +(void)close(deliver_datafile); +deliver_datafile = -1; + +return sender_address; +} + /* vi: aw ai sw=2 */ /* End of deliver.c */ diff --git a/src/src/functions.h b/src/src/functions.h index df40d6456..7195afa88 100644 --- a/src/src/functions.h +++ b/src/src/functions.h @@ -133,6 +133,9 @@ extern void deliver_msglog(const char *, ...) PRINTF_FUNCTION(1,2); extern void deliver_set_expansions(address_item *); extern int deliver_split_address(address_item *); extern void deliver_succeeded(address_item *); + +extern uschar *deliver_get_sender_address (uschar *id); + #ifdef WITH_OLD_DEMIME extern int demime(uschar **); #endif @@ -424,7 +427,7 @@ extern uschar *tod_stamp(int); extern void tls_modify_variables(tls_support *); extern BOOL transport_check_waiting(const uschar *, const uschar *, int, uschar *, - BOOL *); + BOOL *, oicf, void*); extern void transport_init(void); extern BOOL transport_pass_socket(const uschar *, const uschar *, const uschar *, uschar *, int); diff --git a/src/src/ip.c b/src/src/ip.c index 83c8d167b..5b37e3898 100644 --- a/src/src/ip.c +++ b/src/src/ip.c @@ -228,14 +228,11 @@ alarm(0); can't think of any other way of doing this. It converts a connection refused into a timeout if the timeout is set to 999999. */ -if (running_in_test_harness) +if (running_in_test_harness && save_errno == ECONNREFUSED && timeout == 999999) { - if (save_errno == ECONNREFUSED && timeout == 999999) - { - rc = -1; - save_errno = EINTR; - sigalrm_seen = TRUE; - } + rc = -1; + save_errno = EINTR; + sigalrm_seen = TRUE; } /* Success */ @@ -245,7 +242,7 @@ if (rc >= 0) return 0; /* A failure whose error code is "Interrupted system call" is in fact an externally applied timeout if the signal handler has been run. */ -errno = (save_errno == EINTR && sigalrm_seen)? ETIMEDOUT : save_errno; +errno = save_errno == EINTR && sigalrm_seen ? ETIMEDOUT : save_errno; return -1; } @@ -360,53 +357,57 @@ bad: int ip_tcpsocket(const uschar * hostport, uschar ** errstr, int tmo) { - int scan; - uschar hostname[256]; - unsigned int portlow, porthigh; - - /* extract host and port part */ - scan = sscanf(CS hostport, "%255s %u-%u", hostname, &portlow, &porthigh); - if ( scan != 3 ) { - if ( scan != 2 ) { - *errstr = string_sprintf("invalid socket '%s'", hostport); - return -1; +int scan; +uschar hostname[256]; +unsigned int portlow, porthigh; + +/* extract host and port part */ +scan = sscanf(CS hostport, "%255s %u-%u", hostname, &portlow, &porthigh); +if (scan != 3) + { + if (scan != 2) + { + *errstr = string_sprintf("invalid socket '%s'", hostport); + return -1; } - porthigh = portlow; + porthigh = portlow; } - return ip_connectedsocket(SOCK_STREAM, hostname, portlow, porthigh, - tmo, NULL, errstr); +return ip_connectedsocket(SOCK_STREAM, hostname, portlow, porthigh, + tmo, NULL, errstr); } int ip_unixsocket(const uschar * path, uschar ** errstr) { - int sock; - struct sockaddr_un server; +int sock; +struct sockaddr_un server; - if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { - *errstr = US"can't open UNIX socket."; - return -1; +if ((sock = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) + { + *errstr = US"can't open UNIX socket."; + return -1; } - server.sun_family = AF_UNIX; - Ustrncpy(server.sun_path, path, sizeof(server.sun_path)-1); - server.sun_path[sizeof(server.sun_path)-1] = '\0'; - if (connect(sock, (struct sockaddr *) &server, sizeof(server)) < 0) { - int err = errno; - (void)close(sock); - *errstr = string_sprintf("unable to connect to UNIX socket (%s): %s", - path, strerror(err)); - return -1; - } - return sock; +server.sun_family = AF_UNIX; +Ustrncpy(server.sun_path, path, sizeof(server.sun_path)-1); +server.sun_path[sizeof(server.sun_path)-1] = '\0'; +if (connect(sock, (struct sockaddr *) &server, sizeof(server)) < 0) + { + int err = errno; + (void)close(sock); + *errstr = string_sprintf("unable to connect to UNIX socket (%s): %s", + path, strerror(err)); + return -1; + } +return sock; } int ip_streamsocket(const uschar * spec, uschar ** errstr, int tmo) { - return *spec == '/' - ? ip_unixsocket(spec, errstr) : ip_tcpsocket(spec, errstr, tmo); +return *spec == '/' + ? ip_unixsocket(spec, errstr) : ip_tcpsocket(spec, errstr, tmo); } /************************************************* @@ -461,7 +462,7 @@ if (timeout <= 0) } /* Wait until the socket is ready */ -for (;;) +do { FD_ZERO (&select_inset); FD_SET (fd, &select_inset); @@ -497,9 +498,8 @@ for (;;) } /* If the socket is ready, break out of the loop. */ - - if (FD_ISSET(fd, &select_inset)) break; } +while (!FD_ISSET(fd, &select_inset)); return TRUE; } @@ -702,13 +702,9 @@ while (last > first) return TRUE; } else if (c > 0) - { first = middle + 1; - } else - { last = middle; - } } return FALSE; } diff --git a/src/src/structs.h b/src/src/structs.h index 85207b52f..6ec52e1ec 100644 --- a/src/src/structs.h +++ b/src/src/structs.h @@ -791,4 +791,7 @@ typedef struct acl_block { int verb; } acl_block; +/* smtp transport calc outbound_ip */ +typedef BOOL (*oicf) (uschar *message_id, void *data); + /* End of structs.h */ diff --git a/src/src/transport.c b/src/src/transport.c index fea914646..7b3d46908 100644 --- a/src/src/transport.c +++ b/src/src/transport.c @@ -1625,13 +1625,24 @@ Arguments: as set by the caller transport new_message_id set to the message id of a waiting message more set TRUE if there are yet more messages waiting + oicf_func function to call to validate if it is ok to send + to this message_id from the current instance. + oicf_data opaque data for oicf_func Returns: TRUE if new_message_id set; FALSE otherwise */ +typedef struct msgq_s msgq_t; + +typedef struct msgq_s +{ + uschar message_id [MESSAGE_ID_LENGTH + 1]; + BOOL bKeep; +} msgq_t; + BOOL transport_check_waiting(const uschar *transport_name, const uschar *hostname, - int local_message_max, uschar *new_message_id, BOOL *more) + int local_message_max, uschar *new_message_id, BOOL *more, oicf oicf_func, void *oicf_data) { dbdata_wait *host_record; int host_length, path_len; @@ -1639,6 +1650,16 @@ open_db dbblock; open_db *dbm_file; uschar buffer[256]; +msgq_t *msgq = NULL; +int msgq_count = 0; +int msgq_actual = 0; +int i; +BOOL bFound = FALSE; +uschar spool_dir [PATH_MAX]; +uschar spool_file [PATH_MAX]; +struct stat statbuf; +BOOL bContinuation = FALSE; + *more = FALSE; DEBUG(D_transport) @@ -1691,58 +1712,107 @@ until one is found for which a spool file actually exists. If the record gets emptied, delete it and continue with any continuation records that may exist. */ -host_length = host_record->count * MESSAGE_ID_LENGTH; +/* For Bug 1141, I refactored this major portion of the routine, it is risky +but the 1 off will remain without it. This code now allows me to SKIP over +a message I do not want to send out on this run. */ -/* Loop to handle continuation host records in the database */ +sprintf(CS spool_dir, "%s/input/", spool_directory); -for (;;) +host_length = host_record->count * MESSAGE_ID_LENGTH; + +while (1) { - BOOL found = FALSE; + /* create an array to read entire message queue into memory for processing */ - sprintf(CS buffer, "%s/input/", spool_directory); - path_len = Ustrlen(buffer); + msgq = (msgq_t*) malloc(sizeof(msgq_t) * host_record->count); + msgq_count = host_record->count; + msgq_actual = msgq_count; - for (host_length -= MESSAGE_ID_LENGTH; host_length >= 0; - host_length -= MESSAGE_ID_LENGTH) + for (i = 0; i < host_record->count; ++i) { - struct stat statbuf; - Ustrncpy(new_message_id, host_record->text + host_length, + msgq[i].bKeep = TRUE; + + Ustrncpy(msgq[i].message_id, host_record->text + (i * MESSAGE_ID_LENGTH), MESSAGE_ID_LENGTH); - new_message_id[MESSAGE_ID_LENGTH] = 0; + msgq[i].message_id[MESSAGE_ID_LENGTH] = 0; + } + + /* first thing remove current message id if it exists */ + for (i = 0; i < msgq_count; ++i) + if (Ustrcmp(msgq[i].message_id, message_id) == 0) + { + msgq[i].bKeep = FALSE; + break; + } + + /* now find the next acceptable message_id */ + + bFound = FALSE; + + for (i = msgq_count - 1; i >= 0; --i) if (msgq[i].bKeep) + { if (split_spool_directory) - sprintf(CS(buffer + path_len), "%c/%s-D", new_message_id[5], new_message_id); + sprintf(CS spool_file, "%s%c/%s-D", + spool_dir, new_message_id[5], msgq[i].message_id); else - sprintf(CS(buffer + path_len), "%s-D", new_message_id); + sprintf(CS spool_file, "%s%s-D", spool_dir, msgq[i].message_id); - /* The listed message may be the one we are currently processing. If - so, we want to remove it from the list without doing anything else. - If not, do a stat to see if it is an existing message. If it is, break - the loop to handle it. No need to bother about locks; as this is all - "hint" processing, it won't matter if it doesn't exist by the time exim - actually tries to deliver it. */ - - if (Ustrcmp(new_message_id, message_id) != 0 && - Ustat(buffer, &statbuf) == 0) + if (Ustat(spool_file, &statbuf) != 0) + msgq[i].bKeep = FALSE; + else if (!oicf_func || oicf_func(msgq[i].message_id, oicf_data)) { - found = TRUE; + Ustrcpy(new_message_id, msgq[i].message_id); + msgq[i].bKeep = FALSE; + bFound = TRUE; break; } } - /* If we have removed all the message ids from the record delete the record. - If there is a continuation record, fetch it and remove it from the file, - as it will be rewritten as the main record. Repeat in the case of an - empty continuation. */ + /* re-count */ + for (msgq_actual = 0, i = 0; i < msgq_count; ++i) + if (msgq[i].bKeep) + msgq_actual++; + + /* reassemble the host record, based on removed message ids, from in + * memory queue. + */ + + if (msgq_actual <= 0) + { + host_length = 0; + host_record->count = 0; + } + else + { + host_length = msgq_actual * MESSAGE_ID_LENGTH; + host_record->count = msgq_actual; + + if (msgq_actual < msgq_count) + { + int new_count; + for (new_count = 0, i = 0; i < msgq_count; ++i) + if (msgq[i].bKeep) + Ustrncpy(&host_record->text[new_count++ * MESSAGE_ID_LENGTH], + msgq[i].message_id, MESSAGE_ID_LENGTH); + + host_record->text[new_count * MESSAGE_ID_LENGTH] = 0; + } + } + +/* Jeremy: check for a continuation record, this code I do not know how to +test but the code should work */ + + bContinuation = FALSE; while (host_length <= 0) { int i; - dbdata_wait *newr = NULL; + dbdata_wait * newr = NULL; /* Search for a continuation */ - for (i = host_record->sequence - 1; i >= 0 && newr == NULL; i--) + for (i = host_record->sequence - 1; i >= 0 && !newr; i--) { sprintf(CS buffer, "%.200s:%d", hostname, i); newr = dbfn_read(dbm_file, buffer); @@ -1750,7 +1820,7 @@ for (;;) /* If no continuation, delete the current and break the loop */ - if (newr == NULL) + if (!newr) { dbfn_delete(dbm_file, hostname); break; @@ -1761,11 +1831,12 @@ for (;;) dbfn_delete(dbm_file, buffer); host_record = newr; host_length = host_record->count * MESSAGE_ID_LENGTH; - } - /* If we found an existing message, break the continuation loop. */ + bContinuation = TRUE; + } - if (found) break; + if (bFound) + break; /* If host_length <= 0 we have emptied a record and not found a good message, and there are no continuation records. Otherwise there is a continuation @@ -1777,6 +1848,26 @@ for (;;) DEBUG(D_transport) debug_printf("waiting messages already delivered\n"); return FALSE; } + + /* we were not able to find an acceptable message, nor was there a + * continuation record. So bug out, outer logic will clean this up. + */ + + if (!bContinuation) + { + Ustrcpy (new_message_id, message_id); + dbfn_close(dbm_file); + return FALSE; + } + } /* we need to process a continuation record */ + +/* clean up in memory queue */ +if (msgq) + { + free (msgq); + msgq = NULL; + msgq_count = 0; + msgq_actual = 0; } /* Control gets here when an existing message has been encountered; its @@ -1786,7 +1877,19 @@ record if required, close the database, and return TRUE. */ if (host_length > 0) { + uschar msg [MESSAGE_ID_LENGTH + 1]; + int i; + host_record->count = host_length/MESSAGE_ID_LENGTH; + + /* rebuild the host_record->text */ + + for (i = 0; i < host_record->count; ++i) + { + Ustrncpy(msg, host_record->text + (i*MESSAGE_ID_LENGTH), MESSAGE_ID_LENGTH); + msg[MESSAGE_ID_LENGTH] = 0; + } + dbfn_write(dbm_file, hostname, host_record, (int)sizeof(dbdata_wait) + host_length); *more = TRUE; } @@ -1795,8 +1898,6 @@ dbfn_close(dbm_file); return TRUE; } - - /************************************************* * Deliver waiting message down same socket * *************************************************/ diff --git a/src/src/transports/smtp.c b/src/src/transports/smtp.c index 11d7fdd12..446b3702b 100644 --- a/src/src/transports/smtp.c +++ b/src/src/transports/smtp.c @@ -700,8 +700,6 @@ router_name = transport_name = NULL; } #endif - - /************************************************* * Synchronize SMTP responses * *************************************************/ @@ -1210,6 +1208,80 @@ return OK; #endif + +typedef struct smtp_compare_s +{ + uschar *current_sender_address; + struct transport_instance *tblock; +} smtp_compare_t; + +/* +Create a unique string that identifies this message, it is based on +sender_address, helo_data and tls_certificate if enabled. */ + +static uschar * +smtp_local_identity(uschar * sender, struct transport_instance * tblock) +{ +address_item * addr1; +uschar * if1 = US""; +uschar * helo1 = US""; +#ifdef SUPPORT_TLS +uschar * tlsc1 = US""; +#endif +uschar * save_sender_address = sender_address; +uschar * local_identity = NULL; +smtp_transport_options_block * ob = + (smtp_transport_options_block *)tblock->options_block; + +sender_address = sender; + +addr1 = deliver_make_addr (sender, TRUE); +deliver_set_expansions(addr1); + +if (ob->interface) + if1 = expand_string(ob->interface); + +if (ob->helo_data) + helo1 = expand_string(ob->helo_data); + +#ifdef SUPPORT_TLS +if (ob->tls_certificate) + tlsc1 = expand_string(ob->tls_certificate); +local_identity = string_sprintf ("%s^%s^%s", if1, helo1, tlsc1); +#else +local_identity = string_sprintf ("%s^%s", if1, helo1); +#endif + +deliver_set_expansions(NULL); +sender_address = save_sender_address; + +return local_identity; +} + + + +/* This routine is a callback that is called from transport_check_waiting. +This function will evaluate the incoming message versus the previous +message. If the incoming message is using a different local identity then +we will veto this new message. */ + +static BOOL +smtp_are_same_identities(uschar * message_id, smtp_compare_t * s_compare) +{ +uschar * save_sender_address = sender_address; +uschar * current_local_identity = + smtp_local_identity(s_compare->current_sender_address, s_compare->tblock); +uschar * new_sender_address = deliver_get_sender_address(message_id); +uschar * message_local_identity = + smtp_local_identity(new_sender_address, s_compare->tblock); + +sender_address = save_sender_address; + +return Ustrcmp(current_local_identity, message_local_identity) == 0; +} + + + /************************************************* * Deliver address list to given host * *************************************************/ @@ -1293,13 +1365,16 @@ smtp_inblock inblock; smtp_outblock outblock; int max_rcpt = tblock->max_addresses; uschar *igquotstr = US""; + uschar *helo_data = NULL; + uschar *message = NULL; uschar new_message_id[MESSAGE_ID_LENGTH + 1]; uschar *p; uschar buffer[4096]; uschar inbuffer[4096]; uschar outbuffer[4096]; +address_item * current_address; suppress_tls = suppress_tls; /* stop compiler warning when no TLS support */ @@ -2520,6 +2595,11 @@ DEBUG(D_transport) if (completed_address && ok && send_quit) { BOOL more; + smtp_compare_t t_compare; + + t_compare.tblock = tblock; + t_compare.current_sender_address = sender_address; + if ( first_addr != NULL || continue_more || ( ( tls_out.active < 0 @@ -2527,7 +2607,8 @@ if (completed_address && ok && send_quit) ) && transport_check_waiting(tblock->name, host->name, - tblock->connection_max_messages, new_message_id, &more) + tblock->connection_max_messages, new_message_id, &more, + (oicf)smtp_are_same_identities, (void*)&t_compare) ) ) { uschar *msg; -- cgit v1.2.3