diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/src/deliver.c | 109 | ||||
-rw-r--r-- | src/src/macros.h | 1 |
2 files changed, 55 insertions, 55 deletions
diff --git a/src/src/deliver.c b/src/src/deliver.c index 6a3df89bb..87f9cfb06 100644 --- a/src/src/deliver.c +++ b/src/src/deliver.c @@ -2315,17 +2315,47 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message) -/* Put the chain of addrs on the defer list. Retry will happen -on the next queue run, earlier if triggered by a new message. -Loop for the next set of addresses. */ +/* Check transport for the given concurrency limit. Return TRUE if over +the limit (or an expansion failure), else FALSE and if there was a limit, +the key for the hints database used for the concurrency count. */ -static void -deferlist_chain(address_item * addr) +static BOOL +tpt_parallel_check(transport_instance * tp, address_item * addr, uschar ** key) { -address_item * next; -for (next = addr; next->next; next = next->next) ; -next->next = addr_defer; -addr_defer = addr; +unsigned max_parallel; + +if (!tp->max_parallel) return FALSE; + +max_parallel = (unsigned) expand_string_integer(tp->max_parallel, TRUE); +if (expand_string_message) + { + log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option " + "in %s transport (%s): %s", tp->name, addr->address, + expand_string_message); + return TRUE; + } + +if (max_parallel > 0) + { + uschar * serialize_key = string_sprintf("tpt-serialize-%s", tp->name); + if (!enq_start(serialize_key, max_parallel)) + { + address_item * next; + DEBUG(D_transport) + debug_printf("skipping tpt %s because concurrency limit %u reached\n", + tp->name, max_parallel); + do + { + next = addr->next; + addr->message = US"concurrency limit reached for transport"; + addr->basic_errno = ERRNO_TRETRY; + post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0); + } while ((addr = next)); + return TRUE; + } + *key = serialize_key; + } +return FALSE; } @@ -2632,33 +2662,18 @@ while (addr_local) We use a hints DB entry, incremented here and decremented after the transport (and any shadow transport) completes. */ - if (tp->max_parallel) + if (tpt_parallel_check(tp, addr, &serialize_key)) { - int_eximarith_t max_parallel = - expand_string_integer(tp->max_parallel, TRUE); if (expand_string_message) { logflags |= LOG_PANIC; - log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option " - "in %s transport (%s): %s", tp->name, addr->address, - expand_string_message); - for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next) + do + { + addr = addr->next; post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0); - continue; - } - if ( max_parallel > 0 - && !enq_start( - serialize_key = string_sprintf("tpt-serialize-%s", tp->name), - (unsigned) max_parallel) - ) - { - DEBUG(D_transport) - debug_printf("skipping tpt %s because parallelism limit %u reached\n", - tp->name, (unsigned) max_parallel); - - deferlist_chain(addr); - continue; + } while ((addr = addr2)); } + continue; /* Loop for the next set of addresses. */ } @@ -4091,29 +4106,11 @@ for (delivery_count = 0; addr_remote; delivery_count++) The hints DB entry is decremented in par_reduce(), when we reap the transport process. */ - if (tp->max_parallel) - { - int_eximarith_t max_parallel = - expand_string_integer(tp->max_parallel, TRUE); - if (expand_string_message) - { - panicmsg = expand_string_message; + if (tpt_parallel_check(tp, addr, &serialize_key)) + if ((panicmsg = expand_string_message)) goto panic_continue; - } - if ( max_parallel > 0 - && !enq_start( - serialize_key = string_sprintf("tpt-serialize-%s", tp->name), - (unsigned) max_parallel) - ) - { - DEBUG(D_transport) - debug_printf("skipping tpt %s because parallelism limit %u reached\n", - tp->name, (unsigned) max_parallel); - - deferlist_chain(addr); - continue; - } - } + else + continue; /* Loop for the next set of addresses. */ /* Set up the expansion variables for this set of addresses */ @@ -4207,7 +4204,11 @@ for (delivery_count = 0; addr_remote; delivery_count++) } else - deferlist_chain(addr); + { + while (next->next) next = next->next; + next->next = addr_defer; + addr_defer = addr; + } continue; } @@ -6674,7 +6675,6 @@ if (addr_local) so just queue them all. */ if (queue_run_local) - { while (addr_remote) { address_item *addr = addr_remote; @@ -6684,7 +6684,6 @@ if (queue_run_local) addr->message = US"remote deliveries suppressed"; (void)post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0); } - } /* Handle remote deliveries */ diff --git a/src/src/macros.h b/src/src/macros.h index 0ce24f8cb..5a35a9b56 100644 --- a/src/src/macros.h +++ b/src/src/macros.h @@ -545,6 +545,7 @@ to conflict with system errno values. */ #define ERRNO_HRETRY (-53) /* Not time for any remote host */ #define ERRNO_LOCAL_ONLY (-54) /* Local-only delivery */ #define ERRNO_QUEUE_DOMAIN (-55) /* Domain in queue_domains */ +#define ERRNO_TRETRY (-56) /* Transport concurrency limit */ /* Special actions to take after failure or deferment. */ |