summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJeremy Harris <jgh146exb@wizmail.org>2015-10-13 11:43:34 +0100
committerJeremy Harris <jgh146exb@wizmail.org>2015-10-13 11:43:34 +0100
commit3070ceeeed0574aab5e69f5026b99ca19bdf2fcc (patch)
tree286781096d4616ccb253358e70e96370f5600936 /src
parentac0cac6839368273e14e574ca5b9f817edcd6208 (diff)
Log deferred deliveries for transport max_parallel
Diffstat (limited to 'src')
-rw-r--r--src/src/deliver.c109
-rw-r--r--src/src/macros.h1
2 files changed, 55 insertions, 55 deletions
diff --git a/src/src/deliver.c b/src/src/deliver.c
index 6a3df89bb..87f9cfb06 100644
--- a/src/src/deliver.c
+++ b/src/src/deliver.c
@@ -2315,17 +2315,47 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message)
-/* Put the chain of addrs on the defer list. Retry will happen
-on the next queue run, earlier if triggered by a new message.
-Loop for the next set of addresses. */
+/* Check transport for the given concurrency limit. Return TRUE if over
+the limit (or an expansion failure), else FALSE and if there was a limit,
+the key for the hints database used for the concurrency count. */
-static void
-deferlist_chain(address_item * addr)
+static BOOL
+tpt_parallel_check(transport_instance * tp, address_item * addr, uschar ** key)
{
-address_item * next;
-for (next = addr; next->next; next = next->next) ;
-next->next = addr_defer;
-addr_defer = addr;
+unsigned max_parallel;
+
+if (!tp->max_parallel) return FALSE;
+
+max_parallel = (unsigned) expand_string_integer(tp->max_parallel, TRUE);
+if (expand_string_message)
+ {
+ log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+ "in %s transport (%s): %s", tp->name, addr->address,
+ expand_string_message);
+ return TRUE;
+ }
+
+if (max_parallel > 0)
+ {
+ uschar * serialize_key = string_sprintf("tpt-serialize-%s", tp->name);
+ if (!enq_start(serialize_key, max_parallel))
+ {
+ address_item * next;
+ DEBUG(D_transport)
+ debug_printf("skipping tpt %s because concurrency limit %u reached\n",
+ tp->name, max_parallel);
+ do
+ {
+ next = addr->next;
+ addr->message = US"concurrency limit reached for transport";
+ addr->basic_errno = ERRNO_TRETRY;
+ post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
+ } while ((addr = next));
+ return TRUE;
+ }
+ *key = serialize_key;
+ }
+return FALSE;
}
@@ -2632,33 +2662,18 @@ while (addr_local)
We use a hints DB entry, incremented here and decremented after
the transport (and any shadow transport) completes. */
- if (tp->max_parallel)
+ if (tpt_parallel_check(tp, addr, &serialize_key))
{
- int_eximarith_t max_parallel =
- expand_string_integer(tp->max_parallel, TRUE);
if (expand_string_message)
{
logflags |= LOG_PANIC;
- log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
- "in %s transport (%s): %s", tp->name, addr->address,
- expand_string_message);
- for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next)
+ do
+ {
+ addr = addr->next;
post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
- continue;
- }
- if ( max_parallel > 0
- && !enq_start(
- serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
- (unsigned) max_parallel)
- )
- {
- DEBUG(D_transport)
- debug_printf("skipping tpt %s because parallelism limit %u reached\n",
- tp->name, (unsigned) max_parallel);
-
- deferlist_chain(addr);
- continue;
+ } while ((addr = addr2));
}
+ continue; /* Loop for the next set of addresses. */
}
@@ -4091,29 +4106,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
The hints DB entry is decremented in par_reduce(), when we reap the
transport process. */
- if (tp->max_parallel)
- {
- int_eximarith_t max_parallel =
- expand_string_integer(tp->max_parallel, TRUE);
- if (expand_string_message)
- {
- panicmsg = expand_string_message;
+ if (tpt_parallel_check(tp, addr, &serialize_key))
+ if ((panicmsg = expand_string_message))
goto panic_continue;
- }
- if ( max_parallel > 0
- && !enq_start(
- serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
- (unsigned) max_parallel)
- )
- {
- DEBUG(D_transport)
- debug_printf("skipping tpt %s because parallelism limit %u reached\n",
- tp->name, (unsigned) max_parallel);
-
- deferlist_chain(addr);
- continue;
- }
- }
+ else
+ continue; /* Loop for the next set of addresses. */
/* Set up the expansion variables for this set of addresses */
@@ -4207,7 +4204,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
}
else
- deferlist_chain(addr);
+ {
+ while (next->next) next = next->next;
+ next->next = addr_defer;
+ addr_defer = addr;
+ }
continue;
}
@@ -6674,7 +6675,6 @@ if (addr_local)
so just queue them all. */
if (queue_run_local)
- {
while (addr_remote)
{
address_item *addr = addr_remote;
@@ -6684,7 +6684,6 @@ if (queue_run_local)
addr->message = US"remote deliveries suppressed";
(void)post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
}
- }
/* Handle remote deliveries */
diff --git a/src/src/macros.h b/src/src/macros.h
index 0ce24f8cb..5a35a9b56 100644
--- a/src/src/macros.h
+++ b/src/src/macros.h
@@ -545,6 +545,7 @@ to conflict with system errno values. */
#define ERRNO_HRETRY (-53) /* Not time for any remote host */
#define ERRNO_LOCAL_ONLY (-54) /* Local-only delivery */
#define ERRNO_QUEUE_DOMAIN (-55) /* Domain in queue_domains */
+#define ERRNO_TRETRY (-56) /* Transport concurrency limit */
/* Special actions to take after failure or deferment. */