From fa41615da7020d4d951ed3a0b98464bed66ff58b Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Thu, 8 Oct 2015 23:59:27 +0100 Subject: max_parallel transport option --- src/src/deliver.c | 122 ++++++++++++++++++++++++++++++++++++++++++++-------- src/src/globals.c | 1 + src/src/structs.h | 1 + src/src/transport.c | 2 + 4 files changed, 108 insertions(+), 18 deletions(-) (limited to 'src') diff --git a/src/src/deliver.c b/src/src/deliver.c index a1d16eced..6a3df89bb 100644 --- a/src/src/deliver.c +++ b/src/src/deliver.c @@ -1945,9 +1945,6 @@ if ( !shadowing } } -/*XXX prefer to do max_parallel check before we fork. Are we allowed to defer -this late (we could be a shadow tpt)? */ - /* Create the pipe for inter-process communication. */ if (pipe(pfd) != 0) @@ -2317,6 +2314,22 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message) + +/* Put the chain of addrs on the defer list. Retry will happen +on the next queue run, earlier if triggered by a new message. +Loop for the next set of addresses. */ + +static void +deferlist_chain(address_item * addr) +{ +address_item * next; +for (next = addr; next->next; next = next->next) ; +next->next = addr_defer; +addr_defer = addr; +} + + + /************************************************* * Do local deliveries * *************************************************/ @@ -2348,6 +2361,7 @@ while (addr_local) int logflags = LOG_MAIN; int logchar = dont_deliver? '*' : '='; transport_instance *tp; + uschar * serialize_key = NULL; /* Pick the first undelivered address off the chain */ @@ -2483,7 +2497,7 @@ while (addr_local) last = next; batch_count++; } - else anchor = &(next->next); /* Skip the address */ + else anchor = &next->next; /* Skip the address */ } } @@ -2614,6 +2628,40 @@ while (addr_local) if (!addr) continue; + /* If the transport is limited for parallellism, enforce that here. + We use a hints DB entry, incremented here and decremented after + the transport (and any shadow transport) completes. */ + + if (tp->max_parallel) + { + int_eximarith_t max_parallel = + expand_string_integer(tp->max_parallel, TRUE); + if (expand_string_message) + { + logflags |= LOG_PANIC; + log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option " + "in %s transport (%s): %s", tp->name, addr->address, + expand_string_message); + for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next) + post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0); + continue; + } + if ( max_parallel > 0 + && !enq_start( + serialize_key = string_sprintf("tpt-serialize-%s", tp->name), + (unsigned) max_parallel) + ) + { + DEBUG(D_transport) + debug_printf("skipping tpt %s because parallelism limit %u reached\n", + tp->name, (unsigned) max_parallel); + + deferlist_chain(addr); + continue; + } + } + + /* So, finally, we do have some addresses that can be passed to the transport. Before doing so, set up variables that are relevant to a single delivery. */ @@ -2719,6 +2767,10 @@ while (addr_local) deliver_set_expansions(NULL); + /* If the transport was parallelism-limited, decrement the hints DB record. */ + + if (serialize_key) enq_end(serialize_key); + /* Now we can process the results of the real transport. We must take each address off the chain first, because post_process_one() puts it on another chain. */ @@ -3730,7 +3782,14 @@ while (parcount > max) "remote delivery process count got out of step"); parcount = 0; } - else remote_post_process(doneaddr, LOG_MAIN, NULL, fallback); + else + { + transport_instance * tp = doneaddr->transport; + if (tp->max_parallel) + enq_end(string_sprintf("tpt-serialize-%s", tp->name)); + + remote_post_process(doneaddr, LOG_MAIN, NULL, fallback); + } } } @@ -3853,6 +3912,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) address_item *last = addr; address_item *next; uschar * panicmsg; + uschar * serialize_key = NULL; /* Pull the first address right off the list. */ @@ -4027,6 +4087,34 @@ for (delivery_count = 0; addr_remote; delivery_count++) return FALSE; } + /* If the transport is limited for parallellism, enforce that here. + The hints DB entry is decremented in par_reduce(), when we reap the + transport process. */ + + if (tp->max_parallel) + { + int_eximarith_t max_parallel = + expand_string_integer(tp->max_parallel, TRUE); + if (expand_string_message) + { + panicmsg = expand_string_message; + goto panic_continue; + } + if ( max_parallel > 0 + && !enq_start( + serialize_key = string_sprintf("tpt-serialize-%s", tp->name), + (unsigned) max_parallel) + ) + { + DEBUG(D_transport) + debug_printf("skipping tpt %s because parallelism limit %u reached\n", + tp->name, (unsigned) max_parallel); + + deferlist_chain(addr); + continue; + } + } + /* Set up the expansion variables for this set of addresses */ deliver_set_expansions(addr); @@ -4055,7 +4143,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) { panicmsg = string_sprintf("Failed to expand return path \"%s\": %s", tp->return_path, expand_string_message); - goto panic_continue; + goto enq_continue; } } @@ -4066,7 +4154,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) if (!findugid(addr, tp, &uid, &gid, &use_initgroups)) { panicmsg = NULL; - goto panic_continue; + goto enq_continue; } /* If this transport has a setup function, call it now so that it gets @@ -4104,11 +4192,11 @@ for (delivery_count = 0; addr_remote; delivery_count++) if (!ok) { DEBUG(D_deliver) debug_printf("not suitable for continue_transport\n"); - next = addr; + if (serialize_key) enq_end(serialize_key); if (addr->fallback_hosts && !fallback) { - for (;; next = next->next) + for (next = addr; ; next = next->next) { next->host_list = next->fallback_hosts; DEBUG(D_deliver) debug_printf("%s queued for fallback host(s)\n", next->address); @@ -4119,11 +4207,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) } else - { - while (next->next) next = next->next; - next->next = addr_defer; - addr_defer = addr; - } + deferlist_chain(addr); continue; } @@ -4185,7 +4269,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) if (!pipe_done) { panicmsg = string_sprintf("unable to create pipe: %s", strerror(errno)); - goto panic_continue; + goto enq_continue; } /* Find a free slot in the pardata list. Must do this after the possible @@ -4203,7 +4287,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) (void)close(pfd[pipe_write]); (void)close(pfd[pipe_read]); panicmsg = US"Unexpectedly no free subprocess slot"; - goto panic_continue; + goto enq_continue; } /* Now fork a subprocess to do the remote delivery, but before doing so, @@ -4532,7 +4616,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) (void)close(pfd[pipe_read]); panicmsg = string_sprintf("fork failed for remote delivery to %s: %s", addr->domain, strerror(errno)); - goto panic_continue; + goto enq_continue; } /* Fork succeeded; increment the count, and remember relevant data for @@ -4567,6 +4651,8 @@ for (delivery_count = 0; addr_remote; delivery_count++) continue; +enq_continue: + if (serialize_key) enq_end(serialize_key); panic_continue: remote_post_process(addr, LOG_MAIN|LOG_PANIC, panicmsg, fallback); continue; @@ -7448,7 +7534,7 @@ if (!addr_defer) #ifdef EXPERIMENTAL_EVENT (void) event_raise(event_action, US"msg:complete", NULL); #endif -} + } /* If there are deferred addresses, we are keeping this message because it is not yet completed. Lose any temporary files that were catching output from diff --git a/src/src/globals.c b/src/src/globals.c index 55a101ffa..20e578e27 100644 --- a/src/src/globals.c +++ b/src/src/globals.c @@ -1424,6 +1424,7 @@ transport_instance transport_defaults = { NULL, /* remove_headers */ NULL, /* return_path */ NULL, /* debug_string */ + NULL, /* max_parallel */ NULL, /* message_size_limit */ NULL, /* headers_rewrite */ NULL, /* rewrite_rules */ diff --git a/src/src/structs.h b/src/src/structs.h index c36d08ca7..713702ea5 100644 --- a/src/src/structs.h +++ b/src/src/structs.h @@ -171,6 +171,7 @@ typedef struct transport_instance { uschar *remove_headers; /* Remove these headers */ uschar *return_path; /* Overriding (rewriting) return path */ uschar *debug_string; /* Debugging output */ + uschar *max_parallel; /* Number of concurrent instances */ uschar *message_size_limit; /* Biggest message this transport handles */ uschar *headers_rewrite; /* Rules for rewriting headers */ rewrite_rule *rewrite_rules; /* Parsed rewriting rules */ diff --git a/src/src/transport.c b/src/src/transport.c index a6ad3ed34..c258bfd9d 100644 --- a/src/src/transport.c +++ b/src/src/transport.c @@ -84,6 +84,8 @@ optionlist optionlist_transports[] = { (void *)offsetof(transport_instance, home_dir) }, { "initgroups", opt_bool|opt_public, (void *)offsetof(transport_instance, initgroups) }, + { "max_parallel", opt_stringptr|opt_public, + (void *)offsetof(transport_instance, max_parallel) }, { "message_size_limit", opt_stringptr|opt_public, (void *)offsetof(transport_instance, message_size_limit) }, { "rcpt_include_affixes", opt_bool|opt_public, -- cgit v1.2.3