summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/src/deliver.c122
-rw-r--r--src/src/globals.c1
-rw-r--r--src/src/structs.h1
-rw-r--r--src/src/transport.c2
4 files changed, 108 insertions, 18 deletions
diff --git a/src/src/deliver.c b/src/src/deliver.c
index a1d16eced..6a3df89bb 100644
--- a/src/src/deliver.c
+++ b/src/src/deliver.c
@@ -1945,9 +1945,6 @@ if ( !shadowing
}
}
-/*XXX prefer to do max_parallel check before we fork. Are we allowed to defer
-this late (we could be a shadow tpt)? */
-
/* Create the pipe for inter-process communication. */
if (pipe(pfd) != 0)
@@ -2317,6 +2314,22 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message)
+
+/* Put the chain of addrs on the defer list. Retry will happen
+on the next queue run, earlier if triggered by a new message.
+Loop for the next set of addresses. */
+
+static void
+deferlist_chain(address_item * addr)
+{
+address_item * next;
+for (next = addr; next->next; next = next->next) ;
+next->next = addr_defer;
+addr_defer = addr;
+}
+
+
+
/*************************************************
* Do local deliveries *
*************************************************/
@@ -2348,6 +2361,7 @@ while (addr_local)
int logflags = LOG_MAIN;
int logchar = dont_deliver? '*' : '=';
transport_instance *tp;
+ uschar * serialize_key = NULL;
/* Pick the first undelivered address off the chain */
@@ -2483,7 +2497,7 @@ while (addr_local)
last = next;
batch_count++;
}
- else anchor = &(next->next); /* Skip the address */
+ else anchor = &next->next; /* Skip the address */
}
}
@@ -2614,6 +2628,40 @@ while (addr_local)
if (!addr) continue;
+ /* If the transport is limited for parallellism, enforce that here.
+ We use a hints DB entry, incremented here and decremented after
+ the transport (and any shadow transport) completes. */
+
+ if (tp->max_parallel)
+ {
+ int_eximarith_t max_parallel =
+ expand_string_integer(tp->max_parallel, TRUE);
+ if (expand_string_message)
+ {
+ logflags |= LOG_PANIC;
+ log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+ "in %s transport (%s): %s", tp->name, addr->address,
+ expand_string_message);
+ for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next)
+ post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
+ continue;
+ }
+ if ( max_parallel > 0
+ && !enq_start(
+ serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+ (unsigned) max_parallel)
+ )
+ {
+ DEBUG(D_transport)
+ debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+ tp->name, (unsigned) max_parallel);
+
+ deferlist_chain(addr);
+ continue;
+ }
+ }
+
+
/* So, finally, we do have some addresses that can be passed to the
transport. Before doing so, set up variables that are relevant to a
single delivery. */
@@ -2719,6 +2767,10 @@ while (addr_local)
deliver_set_expansions(NULL);
+ /* If the transport was parallelism-limited, decrement the hints DB record. */
+
+ if (serialize_key) enq_end(serialize_key);
+
/* Now we can process the results of the real transport. We must take each
address off the chain first, because post_process_one() puts it on another
chain. */
@@ -3730,7 +3782,14 @@ while (parcount > max)
"remote delivery process count got out of step");
parcount = 0;
}
- else remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+ else
+ {
+ transport_instance * tp = doneaddr->transport;
+ if (tp->max_parallel)
+ enq_end(string_sprintf("tpt-serialize-%s", tp->name));
+
+ remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+ }
}
}
@@ -3853,6 +3912,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
address_item *last = addr;
address_item *next;
uschar * panicmsg;
+ uschar * serialize_key = NULL;
/* Pull the first address right off the list. */
@@ -4027,6 +4087,34 @@ for (delivery_count = 0; addr_remote; delivery_count++)
return FALSE;
}
+ /* If the transport is limited for parallellism, enforce that here.
+ The hints DB entry is decremented in par_reduce(), when we reap the
+ transport process. */
+
+ if (tp->max_parallel)
+ {
+ int_eximarith_t max_parallel =
+ expand_string_integer(tp->max_parallel, TRUE);
+ if (expand_string_message)
+ {
+ panicmsg = expand_string_message;
+ goto panic_continue;
+ }
+ if ( max_parallel > 0
+ && !enq_start(
+ serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+ (unsigned) max_parallel)
+ )
+ {
+ DEBUG(D_transport)
+ debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+ tp->name, (unsigned) max_parallel);
+
+ deferlist_chain(addr);
+ continue;
+ }
+ }
+
/* Set up the expansion variables for this set of addresses */
deliver_set_expansions(addr);
@@ -4055,7 +4143,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
{
panicmsg = string_sprintf("Failed to expand return path \"%s\": %s",
tp->return_path, expand_string_message);
- goto panic_continue;
+ goto enq_continue;
}
}
@@ -4066,7 +4154,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
if (!findugid(addr, tp, &uid, &gid, &use_initgroups))
{
panicmsg = NULL;
- goto panic_continue;
+ goto enq_continue;
}
/* If this transport has a setup function, call it now so that it gets
@@ -4104,11 +4192,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
if (!ok)
{
DEBUG(D_deliver) debug_printf("not suitable for continue_transport\n");
- next = addr;
+ if (serialize_key) enq_end(serialize_key);
if (addr->fallback_hosts && !fallback)
{
- for (;; next = next->next)
+ for (next = addr; ; next = next->next)
{
next->host_list = next->fallback_hosts;
DEBUG(D_deliver) debug_printf("%s queued for fallback host(s)\n", next->address);
@@ -4119,11 +4207,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
}
else
- {
- while (next->next) next = next->next;
- next->next = addr_defer;
- addr_defer = addr;
- }
+ deferlist_chain(addr);
continue;
}
@@ -4185,7 +4269,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
if (!pipe_done)
{
panicmsg = string_sprintf("unable to create pipe: %s", strerror(errno));
- goto panic_continue;
+ goto enq_continue;
}
/* Find a free slot in the pardata list. Must do this after the possible
@@ -4203,7 +4287,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
(void)close(pfd[pipe_write]);
(void)close(pfd[pipe_read]);
panicmsg = US"Unexpectedly no free subprocess slot";
- goto panic_continue;
+ goto enq_continue;
}
/* Now fork a subprocess to do the remote delivery, but before doing so,
@@ -4532,7 +4616,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
(void)close(pfd[pipe_read]);
panicmsg = string_sprintf("fork failed for remote delivery to %s: %s",
addr->domain, strerror(errno));
- goto panic_continue;
+ goto enq_continue;
}
/* Fork succeeded; increment the count, and remember relevant data for
@@ -4567,6 +4651,8 @@ for (delivery_count = 0; addr_remote; delivery_count++)
continue;
+enq_continue:
+ if (serialize_key) enq_end(serialize_key);
panic_continue:
remote_post_process(addr, LOG_MAIN|LOG_PANIC, panicmsg, fallback);
continue;
@@ -7448,7 +7534,7 @@ if (!addr_defer)
#ifdef EXPERIMENTAL_EVENT
(void) event_raise(event_action, US"msg:complete", NULL);
#endif
-}
+ }
/* If there are deferred addresses, we are keeping this message because it is
not yet completed. Lose any temporary files that were catching output from
diff --git a/src/src/globals.c b/src/src/globals.c
index 55a101ffa..20e578e27 100644
--- a/src/src/globals.c
+++ b/src/src/globals.c
@@ -1424,6 +1424,7 @@ transport_instance transport_defaults = {
NULL, /* remove_headers */
NULL, /* return_path */
NULL, /* debug_string */
+ NULL, /* max_parallel */
NULL, /* message_size_limit */
NULL, /* headers_rewrite */
NULL, /* rewrite_rules */
diff --git a/src/src/structs.h b/src/src/structs.h
index c36d08ca7..713702ea5 100644
--- a/src/src/structs.h
+++ b/src/src/structs.h
@@ -171,6 +171,7 @@ typedef struct transport_instance {
uschar *remove_headers; /* Remove these headers */
uschar *return_path; /* Overriding (rewriting) return path */
uschar *debug_string; /* Debugging output */
+ uschar *max_parallel; /* Number of concurrent instances */
uschar *message_size_limit; /* Biggest message this transport handles */
uschar *headers_rewrite; /* Rules for rewriting headers */
rewrite_rule *rewrite_rules; /* Parsed rewriting rules */
diff --git a/src/src/transport.c b/src/src/transport.c
index a6ad3ed34..c258bfd9d 100644
--- a/src/src/transport.c
+++ b/src/src/transport.c
@@ -84,6 +84,8 @@ optionlist optionlist_transports[] = {
(void *)offsetof(transport_instance, home_dir) },
{ "initgroups", opt_bool|opt_public,
(void *)offsetof(transport_instance, initgroups) },
+ { "max_parallel", opt_stringptr|opt_public,
+ (void *)offsetof(transport_instance, max_parallel) },
{ "message_size_limit", opt_stringptr|opt_public,
(void *)offsetof(transport_instance, message_size_limit) },
{ "rcpt_include_affixes", opt_bool|opt_public,