summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJeremy Harris <jgh146exb@wizmail.org>2020-01-29 13:30:24 +0000
committerJeremy Harris <jgh146exb@wizmail.org>2020-01-29 13:30:24 +0000
commit92562f63be6fae2526d68171d60bf87027551f88 (patch)
tree8abca413700a4ffb53401afcc3b70756ed41c531 /src
parent60e2e80a81b642a78a58997719dc26828eed2c2b (diff)
Two-phase queue run perf: parallel processes for phase one
Diffstat (limited to 'src')
-rw-r--r--src/src/queue.c58
1 files changed, 53 insertions, 5 deletions
diff --git a/src/src/queue.c b/src/src/queue.c
index f808ed103..d472b9851 100644
--- a/src/src/queue.c
+++ b/src/src/queue.c
@@ -346,6 +346,7 @@ const pcre *selectstring_regex_sender = NULL;
uschar *log_detail = NULL;
int subcount = 0;
uschar subdirs[64];
+pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */
#ifdef MEASURE_TIMING
report_time_since(&timestamp_startup, US"queue_run start");
@@ -468,18 +469,40 @@ for (int i = queue_run_in_order ? -1 : 0;
(double)load_average/1000.0,
(double)deliver_queue_load_max/1000.0);
+ /* If initial of a 2-phase run, maintain a set of child procs
+ to get disk parallelism */
+
+ if (f.queue_2stage && !queue_run_in_order)
+ {
+ int i;
+ if (qpid[nelem(qpid) - 1])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n");
+ waitpid(qpid[0], NULL, 0);
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+ for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+ qpid[i] = 0;
+ }
+ else
+ for (i = 0; qpid[i]; ) i++;
+ DEBUG(D_queue_run) debug_printf("q2stage forking\n");
+ if ((qpid[i] = fork()))
+ continue; /* parent loops around */
+ DEBUG(D_queue_run) debug_printf("q2stage child\n");
+ }
+
/* Skip this message unless it's within the ID limits */
if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
- continue;
+ goto go_around;
if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
- continue;
+ goto go_around;
/* Check that the message still exists */
message_subdir[0] = fq->dir_uschar;
if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
- continue;
+ goto go_around;
/* There are some tests that require the reading of the header file. Ensure
the store used is scavenged afterwards so that this process doesn't keep
@@ -499,7 +522,7 @@ for (int i = queue_run_in_order ? -1 : 0;
follow. If the message is chosen for delivery, the header is read again
in the deliver_message() function, in a subprocess. */
- if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+ if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
f.dont_deliver = orig_dont_deliver;
/* Now decide if we want to deliver this message. As we have read the
@@ -571,7 +594,7 @@ for (int i = queue_run_in_order ? -1 : 0;
spool_clear_header_globals();
store_reset(reset_point2);
- if (!wanted) continue; /* With next message */
+ if (!wanted) goto go_around; /* With next message */
}
/* OK, got a message we want to deliver. Create a pipe which will
@@ -660,6 +683,10 @@ for (int i = queue_run_in_order ? -1 : 0;
(void)close(pfd[pipe_read]);
set_process_info("running queue");
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
+
/* If we are in the test harness, and this is not the first of a 2-stage
queue run, update fudged queue times. */
@@ -668,6 +695,14 @@ for (int i = queue_run_in_order ? -1 : 0;
uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
if (fqtnext) fudged_queue_times = fqtnext + 1;
}
+
+
+ continue;
+
+ go_around:
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
} /* End loop for list of messages */
tree_nonrecipients = NULL;
@@ -695,6 +730,19 @@ turned off. */
if (f.queue_2stage)
{
+
+ /* wait for last children */
+ for (int i = 0; i < nelem(qpid); i++)
+ if (qpid[i])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+ waitpid(qpid[i], NULL, 0);
+ }
+ else break;
+
+#ifdef MEASURE_TIMING
+ report_time_since(&timestamp_startup, US"queue_run 1st phase done");
+#endif
f.queue_2stage = FALSE;
queue_run(start_id, stop_id, TRUE);
}