From fa41615da7020d4d951ed3a0b98464bed66ff58b Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Thu, 8 Oct 2015 23:59:27 +0100 Subject: max_parallel transport option --- doc/doc-docbook/spec.xfpt | 39 +++++++++++++- doc/doc-txt/NewStuff | 2 + src/src/deliver.c | 122 ++++++++++++++++++++++++++++++++++++------- src/src/globals.c | 1 + src/src/structs.h | 1 + src/src/transport.c | 2 + test/confs/0288 | 1 + test/confs/0611 | 68 ++++++++++++++++++++++++ test/log/0288 | 4 ++ test/log/0611 | 27 ++++++++++ test/scripts/0000-Basic/0288 | 31 ++++++++++- test/scripts/0000-Basic/0611 | 43 +++++++++++++++ test/stdout/0288 | 24 +++++++++ test/stdout/0390 | 2 + 14 files changed, 347 insertions(+), 20 deletions(-) create mode 100644 test/confs/0611 create mode 100644 test/log/0611 create mode 100644 test/scripts/0000-Basic/0611 diff --git a/doc/doc-docbook/spec.xfpt b/doc/doc-docbook/spec.xfpt index ed3533a0c..f9a8efa98 100644 --- a/doc/doc-docbook/spec.xfpt +++ b/doc/doc-docbook/spec.xfpt @@ -1575,7 +1575,7 @@ If a host is unreachable for a period of time, a number of messages may be waiting for it by the time it recovers, and sending them in a single SMTP connection is clearly beneficial. Whenever a delivery to a remote host is deferred, -.cindex "hints database" +.cindex "hints database" "deferred deliveries" Exim makes a note in its hints database, and whenever a successful SMTP delivery has happened, it looks to see if any other messages are waiting for the same host. If any are found, they are sent over the same SMTP @@ -20539,6 +20539,32 @@ transport, the &[initgroups()]& function is called when running the transport to ensure that any additional groups associated with the uid are set up. +.new +.option max_parallel transports integer&!! unset +.cindex limit "transport parallelism" +.cindex transport "parallel processes" +.cindex transport "concurrency limit" +.cindex "delivery" "parallelism for transport" +If this option is set and expands to an integer greater than zero +it limits the number of concurrent runs of the transport. +The control does not apply to shadow transports. + +.cindex "hints database" "transport concurrency control" +Exim implements this control by means of a hints database in which a record is +incremented whenever a transport process is beaing created. The record +is decremented and possibly removed when the process terminates. +Obviously there is scope for +records to get left lying around if there is a system or program crash. To +guard against this, Exim ignores any records that are more than six hours old. + +If you use this option, you should also arrange to delete the +relevant hints database whenever your system reboots. The names of the files +start with &_misc_& and they are kept in the &_spool/db_& directory. There +may be one or two files, depending on the type of DBM in use. The same files +are used for ETRN and smtp transport serialization. +.wen + + .option message_size_limit transports string&!! 0 .cindex "limit" "message size per transport" .cindex "size" "of message, limit" @@ -22436,6 +22462,10 @@ If two messages arrive at almost the same time, and both are routed to a pipe delivery, the two pipe transports may be run concurrently. You must ensure that any pipe commands you set up are robust against this happening. If the commands write to a file, the &%exim_lock%& utility might be of use. +.new +Alternatively the &%max_parallel%& option could be used with a value +of "1" to enforce serialization. +.wen @@ -23618,6 +23648,10 @@ start with &_misc_& and they are kept in the &_spool/db_& directory. There may be one or two files, depending on the type of DBM in use. The same files are used for ETRN serialization. +.new +See also the &%max_parallel%& generic transport option. +.wen + .option size_addition smtp integer 1024 .cindex "SMTP" "SIZE" @@ -36265,6 +36299,9 @@ Serializing ETRN runs (when &%smtp_etrn_serialize%& is set) .next Serializing delivery to a specific host (when &%serialize_hosts%& is set in an &(smtp)& transport) +.next +Limiting the concurrency of specific transports (when &%max_parallel%& is set +in a transport) .endlist diff --git a/doc/doc-txt/NewStuff b/doc/doc-txt/NewStuff index e7d42d43b..a7185a4e2 100644 --- a/doc/doc-txt/NewStuff +++ b/doc/doc-txt/NewStuff @@ -15,6 +15,8 @@ Version 4.87 2. New $callout_address variable records the address used for a spam=, malware= or verify= callout. + 3. Transports now take a "max_parallel" option, to limit concurrency. + Version 4.86 ------------ diff --git a/src/src/deliver.c b/src/src/deliver.c index a1d16eced..6a3df89bb 100644 --- a/src/src/deliver.c +++ b/src/src/deliver.c @@ -1945,9 +1945,6 @@ if ( !shadowing } } -/*XXX prefer to do max_parallel check before we fork. Are we allowed to defer -this late (we could be a shadow tpt)? */ - /* Create the pipe for inter-process communication. */ if (pipe(pfd) != 0) @@ -2317,6 +2314,22 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message) + +/* Put the chain of addrs on the defer list. Retry will happen +on the next queue run, earlier if triggered by a new message. +Loop for the next set of addresses. */ + +static void +deferlist_chain(address_item * addr) +{ +address_item * next; +for (next = addr; next->next; next = next->next) ; +next->next = addr_defer; +addr_defer = addr; +} + + + /************************************************* * Do local deliveries * *************************************************/ @@ -2348,6 +2361,7 @@ while (addr_local) int logflags = LOG_MAIN; int logchar = dont_deliver? '*' : '='; transport_instance *tp; + uschar * serialize_key = NULL; /* Pick the first undelivered address off the chain */ @@ -2483,7 +2497,7 @@ while (addr_local) last = next; batch_count++; } - else anchor = &(next->next); /* Skip the address */ + else anchor = &next->next; /* Skip the address */ } } @@ -2614,6 +2628,40 @@ while (addr_local) if (!addr) continue; + /* If the transport is limited for parallellism, enforce that here. + We use a hints DB entry, incremented here and decremented after + the transport (and any shadow transport) completes. */ + + if (tp->max_parallel) + { + int_eximarith_t max_parallel = + expand_string_integer(tp->max_parallel, TRUE); + if (expand_string_message) + { + logflags |= LOG_PANIC; + log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option " + "in %s transport (%s): %s", tp->name, addr->address, + expand_string_message); + for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next) + post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0); + continue; + } + if ( max_parallel > 0 + && !enq_start( + serialize_key = string_sprintf("tpt-serialize-%s", tp->name), + (unsigned) max_parallel) + ) + { + DEBUG(D_transport) + debug_printf("skipping tpt %s because parallelism limit %u reached\n", + tp->name, (unsigned) max_parallel); + + deferlist_chain(addr); + continue; + } + } + + /* So, finally, we do have some addresses that can be passed to the transport. Before doing so, set up variables that are relevant to a single delivery. */ @@ -2719,6 +2767,10 @@ while (addr_local) deliver_set_expansions(NULL); + /* If the transport was parallelism-limited, decrement the hints DB record. */ + + if (serialize_key) enq_end(serialize_key); + /* Now we can process the results of the real transport. We must take each address off the chain first, because post_process_one() puts it on another chain. */ @@ -3730,7 +3782,14 @@ while (parcount > max) "remote delivery process count got out of step"); parcount = 0; } - else remote_post_process(doneaddr, LOG_MAIN, NULL, fallback); + else + { + transport_instance * tp = doneaddr->transport; + if (tp->max_parallel) + enq_end(string_sprintf("tpt-serialize-%s", tp->name)); + + remote_post_process(doneaddr, LOG_MAIN, NULL, fallback); + } } } @@ -3853,6 +3912,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) address_item *last = addr; address_item *next; uschar * panicmsg; + uschar * serialize_key = NULL; /* Pull the first address right off the list. */ @@ -4027,6 +4087,34 @@ for (delivery_count = 0; addr_remote; delivery_count++) return FALSE; } + /* If the transport is limited for parallellism, enforce that here. + The hints DB entry is decremented in par_reduce(), when we reap the + transport process. */ + + if (tp->max_parallel) + { + int_eximarith_t max_parallel = + expand_string_integer(tp->max_parallel, TRUE); + if (expand_string_message) + { + panicmsg = expand_string_message; + goto panic_continue; + } + if ( max_parallel > 0 + && !enq_start( + serialize_key = string_sprintf("tpt-serialize-%s", tp->name), + (unsigned) max_parallel) + ) + { + DEBUG(D_transport) + debug_printf("skipping tpt %s because parallelism limit %u reached\n", + tp->name, (unsigned) max_parallel); + + deferlist_chain(addr); + continue; + } + } + /* Set up the expansion variables for this set of addresses */ deliver_set_expansions(addr); @@ -4055,7 +4143,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) { panicmsg = string_sprintf("Failed to expand return path \"%s\": %s", tp->return_path, expand_string_message); - goto panic_continue; + goto enq_continue; } } @@ -4066,7 +4154,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) if (!findugid(addr, tp, &uid, &gid, &use_initgroups)) { panicmsg = NULL; - goto panic_continue; + goto enq_continue; } /* If this transport has a setup function, call it now so that it gets @@ -4104,11 +4192,11 @@ for (delivery_count = 0; addr_remote; delivery_count++) if (!ok) { DEBUG(D_deliver) debug_printf("not suitable for continue_transport\n"); - next = addr; + if (serialize_key) enq_end(serialize_key); if (addr->fallback_hosts && !fallback) { - for (;; next = next->next) + for (next = addr; ; next = next->next) { next->host_list = next->fallback_hosts; DEBUG(D_deliver) debug_printf("%s queued for fallback host(s)\n", next->address); @@ -4119,11 +4207,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) } else - { - while (next->next) next = next->next; - next->next = addr_defer; - addr_defer = addr; - } + deferlist_chain(addr); continue; } @@ -4185,7 +4269,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) if (!pipe_done) { panicmsg = string_sprintf("unable to create pipe: %s", strerror(errno)); - goto panic_continue; + goto enq_continue; } /* Find a free slot in the pardata list. Must do this after the possible @@ -4203,7 +4287,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) (void)close(pfd[pipe_write]); (void)close(pfd[pipe_read]); panicmsg = US"Unexpectedly no free subprocess slot"; - goto panic_continue; + goto enq_continue; } /* Now fork a subprocess to do the remote delivery, but before doing so, @@ -4532,7 +4616,7 @@ for (delivery_count = 0; addr_remote; delivery_count++) (void)close(pfd[pipe_read]); panicmsg = string_sprintf("fork failed for remote delivery to %s: %s", addr->domain, strerror(errno)); - goto panic_continue; + goto enq_continue; } /* Fork succeeded; increment the count, and remember relevant data for @@ -4567,6 +4651,8 @@ for (delivery_count = 0; addr_remote; delivery_count++) continue; +enq_continue: + if (serialize_key) enq_end(serialize_key); panic_continue: remote_post_process(addr, LOG_MAIN|LOG_PANIC, panicmsg, fallback); continue; @@ -7448,7 +7534,7 @@ if (!addr_defer) #ifdef EXPERIMENTAL_EVENT (void) event_raise(event_action, US"msg:complete", NULL); #endif -} + } /* If there are deferred addresses, we are keeping this message because it is not yet completed. Lose any temporary files that were catching output from diff --git a/src/src/globals.c b/src/src/globals.c index 55a101ffa..20e578e27 100644 --- a/src/src/globals.c +++ b/src/src/globals.c @@ -1424,6 +1424,7 @@ transport_instance transport_defaults = { NULL, /* remove_headers */ NULL, /* return_path */ NULL, /* debug_string */ + NULL, /* max_parallel */ NULL, /* message_size_limit */ NULL, /* headers_rewrite */ NULL, /* rewrite_rules */ diff --git a/src/src/structs.h b/src/src/structs.h index c36d08ca7..713702ea5 100644 --- a/src/src/structs.h +++ b/src/src/structs.h @@ -171,6 +171,7 @@ typedef struct transport_instance { uschar *remove_headers; /* Remove these headers */ uschar *return_path; /* Overriding (rewriting) return path */ uschar *debug_string; /* Debugging output */ + uschar *max_parallel; /* Number of concurrent instances */ uschar *message_size_limit; /* Biggest message this transport handles */ uschar *headers_rewrite; /* Rules for rewriting headers */ rewrite_rule *rewrite_rules; /* Parsed rewriting rules */ diff --git a/src/src/transport.c b/src/src/transport.c index a6ad3ed34..c258bfd9d 100644 --- a/src/src/transport.c +++ b/src/src/transport.c @@ -84,6 +84,8 @@ optionlist optionlist_transports[] = { (void *)offsetof(transport_instance, home_dir) }, { "initgroups", opt_bool|opt_public, (void *)offsetof(transport_instance, initgroups) }, + { "max_parallel", opt_stringptr|opt_public, + (void *)offsetof(transport_instance, max_parallel) }, { "message_size_limit", opt_stringptr|opt_public, (void *)offsetof(transport_instance, message_size_limit) }, { "rcpt_include_affixes", opt_bool|opt_public, diff --git a/test/confs/0288 b/test/confs/0288 index 270ffb29e..1d41d0804 100644 --- a/test/confs/0288 +++ b/test/confs/0288 @@ -1,4 +1,5 @@ # Exim test configuration 0288 +# serialize_hosts option on smtp transport exim_path = EXIM_PATH host_lookup_order = bydns diff --git a/test/confs/0611 b/test/confs/0611 new file mode 100644 index 000000000..b1bff27a1 --- /dev/null +++ b/test/confs/0611 @@ -0,0 +1,68 @@ +# Exim test configuration 0611 +# max_parallel on transport + +SERVER= + +exim_path = EXIM_PATH +host_lookup_order = bydns +primary_hostname = myhost.test.ex +spool_directory = DIR/spool +log_file_path = DIR/spool/log/%slog +gecos_pattern = "" +gecos_name = CALLER_NAME + +# ----- Main settings ----- + +qualify_domain = test.ex +queue_run_in_order +log_selector = +received_recipients + +acl_smtp_rcpt = accept ${if eq {SERVER}{server} {delay = 2s}} + +# ----- Routers ----- + +begin routers + +server: + condition = ${if eq {SERVER}{server} {yes}{no}} + driver = redirect + data = :blackhole: + +rmt_client: + local_parts = a:b:c + driver = manualroute + route_list = * 127.0.0.1 + self = send + transport = smtp + +lcl_client: + local_parts = x:y:z + driver = accept + transport = pipe + +# ----- Transports ----- + +begin transports + +smtp: + driver = smtp + port = PORT_D + max_rcpt = 1 + connection_max_messages = 1 + max_parallel = 2 + +pipe: + driver = pipe + command = "sleep 2; cat > /dev/null" + use_shell = true + max_parallel = 1 + +# ----- Retry ----- + + +begin retry + +* * F,1h,10m + + +# End diff --git a/test/log/0288 b/test/log/0288 index 6e082602a..5ca2885a4 100644 --- a/test/log/0288 +++ b/test/log/0288 @@ -3,3 +3,7 @@ 1999-03-02 09:44:33 10HmaX-0005vi-00 == b@test.ex R=all T=smtp defer (-53): connection limit reached for all hosts 1999-03-02 09:44:33 10HmaX-0005vi-00 => a@test.ex R=all T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK" 1999-03-02 09:44:33 End queue run: pid=pppp +1999-03-02 09:44:33 Start queue run: pid=pppp +1999-03-02 09:44:33 10HmaX-0005vi-00 => b@test.ex R=all T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK" +1999-03-02 09:44:33 10HmaX-0005vi-00 Completed +1999-03-02 09:44:33 End queue run: pid=pppp diff --git a/test/log/0611 b/test/log/0611 new file mode 100644 index 000000000..39c10bc12 --- /dev/null +++ b/test/log/0611 @@ -0,0 +1,27 @@ +1999-03-02 09:44:33 10HmaX-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for a b c +1999-03-02 09:44:33 exim x.yz daemon started: pid=pppp, no queue runs, listening for SMTP on port 1225 +1999-03-02 09:44:33 Start queue run: pid=pppp +1999-03-02 09:44:33 10HmaY-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for a@test.ex +1999-03-02 09:44:33 10HmaY-0005vi-00 => :blackhole: R=server +1999-03-02 09:44:33 10HmaY-0005vi-00 Completed +1999-03-02 09:44:33 10HmaX-0005vi-00 => a@test.ex R=rmt_client T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK id=10HmaY-0005vi-00" +1999-03-02 09:44:33 10HmaZ-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for b@test.ex +1999-03-02 09:44:33 10HmaZ-0005vi-00 => :blackhole: R=server +1999-03-02 09:44:33 10HmaZ-0005vi-00 Completed +1999-03-02 09:44:33 10HmaX-0005vi-00 => b@test.ex R=rmt_client T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK id=10HmaZ-0005vi-00" +1999-03-02 09:44:33 End queue run: pid=pppp +1999-03-02 09:44:33 Start queue run: pid=pppp +1999-03-02 09:44:33 10HmbA-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for c@test.ex +1999-03-02 09:44:33 10HmbA-0005vi-00 => :blackhole: R=server +1999-03-02 09:44:33 10HmbA-0005vi-00 Completed +1999-03-02 09:44:33 10HmaX-0005vi-00 => c@test.ex R=rmt_client T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK id=10HmbA-0005vi-00" +1999-03-02 09:44:33 10HmaX-0005vi-00 Completed +1999-03-02 09:44:33 End queue run: pid=pppp +1999-03-02 09:44:33 10HmbB-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for y +1999-03-02 09:44:33 10HmbC-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for z +1999-03-02 09:44:33 10HmbB-0005vi-00 => y R=lcl_client T=pipe +1999-03-02 09:44:33 10HmbB-0005vi-00 Completed +1999-03-02 09:44:33 Start queue run: pid=pppp +1999-03-02 09:44:33 10HmbC-0005vi-00 => z R=lcl_client T=pipe +1999-03-02 09:44:33 10HmbC-0005vi-00 Completed +1999-03-02 09:44:33 End queue run: pid=pppp diff --git a/test/scripts/0000-Basic/0288 b/test/scripts/0000-Basic/0288 index 7b867cedb..ed381dad7 100644 --- a/test/scripts/0000-Basic/0288 +++ b/test/scripts/0000-Basic/0288 @@ -1,9 +1,12 @@ # serialize_hosts need_ipv4 # +# preload the spool exim -odq a b . **** +# +# a slow server as a test target server PORT_S 220 ESMTP EHLO @@ -21,6 +24,32 @@ DATA QUIT 250 OK **** +# +# First message should go; second does not wait for 1st complete +# on same conn due to connection_max_messages, then is deferred +# as second transport run aborted by serialize_hosts. +exim -q +**** +# +# a server as a test target +server PORT_S +220 ESMTP +EHLO +250-OK +250 HELP +MAIL FROM: +250 Sender OK +RCPT TO: +250 Recipient OK +DATA +354 Send data +. +250 OK +QUIT +250 OK +**** +# +# Remaining message on queue should go immediately; no delay +# associated with retry rules exim -q **** -no_msglog_check diff --git a/test/scripts/0000-Basic/0611 b/test/scripts/0000-Basic/0611 new file mode 100644 index 000000000..c35284147 --- /dev/null +++ b/test/scripts/0000-Basic/0611 @@ -0,0 +1,43 @@ +# max_parallel on transport +need_ipv4 +# +# Remote transport: +# preload the spool +exim -odq a b c +. +**** +# +# a slow server as a test target +exim -DSERVER=server -bd -oX PORT_D +**** +# +# First and second messages should go, as separate conns due to +# connection_max_messages, third is deferred +# as third transport run denied by max_parallel +exim -q +**** +# +# +# Remaining message on queue should go immediately; no delay +# associated with retry rules +exim -q +**** +killdaemon +# +######## +# +# +# Local transport: +# Only one message should go as the transport takes a long +# time and we set max_parallel=1 to serialize it +exim y +**** +exim z +**** +# +# +sleep 3 +# +# Remaining message on queue should go immediately; no delay +# associated with retry rules +exim -q diff --git a/test/stdout/0288 b/test/stdout/0288 index 8f36a023d..3eab9dfe7 100644 --- a/test/stdout/0288 +++ b/test/stdout/0288 @@ -25,3 +25,27 @@ Date: Tue, 2 Mar 1999 09:44:33 +0000 QUIT 250 OK End of script +Listening on port 1224 ... +Connection request from [127.0.0.1] +220 ESMTP +EHLO myhost.test.ex +250-OK +250 HELP +MAIL FROM: +250 Sender OK +RCPT TO: +250 Recipient OK +DATA +354 Send data +Received: from CALLER by myhost.test.ex with local (Exim x.yz) + (envelope-from ) + id 10HmaX-0005vi-00; Tue, 2 Mar 1999 09:44:33 +0000 +Message-Id: +From: CALLER_NAME +Date: Tue, 2 Mar 1999 09:44:33 +0000 + +. +250 OK +QUIT +250 OK +End of script diff --git a/test/stdout/0390 b/test/stdout/0390 index a80e34c38..ca6f90cbd 100644 --- a/test/stdout/0390 +++ b/test/stdout/0390 @@ -52,6 +52,7 @@ headers_remove = headers_rewrite = home_directory = no_initgroups +max_parallel = message_size_limit = no_rcpt_include_affixes retry_use_local_part @@ -106,6 +107,7 @@ headers_remove = headers_rewrite = home_directory = no_initgroups +max_parallel = message_size_limit = no_rcpt_include_affixes retry_use_local_part -- cgit v1.2.3