diff options
author | Jeremy Harris <jgh146exb@wizmail.org> | 2020-01-29 13:30:24 +0000 |
---|---|---|
committer | Jeremy Harris <jgh146exb@wizmail.org> | 2020-01-29 13:30:24 +0000 |
commit | 92562f63be6fae2526d68171d60bf87027551f88 (patch) | |
tree | 8abca413700a4ffb53401afcc3b70756ed41c531 /src | |
parent | 60e2e80a81b642a78a58997719dc26828eed2c2b (diff) |
Two-phase queue run perf: parallel processes for phase one
Diffstat (limited to 'src')
-rw-r--r-- | src/src/queue.c | 58 |
1 files changed, 53 insertions, 5 deletions
diff --git a/src/src/queue.c b/src/src/queue.c index f808ed103..d472b9851 100644 --- a/src/src/queue.c +++ b/src/src/queue.c @@ -346,6 +346,7 @@ const pcre *selectstring_regex_sender = NULL; uschar *log_detail = NULL; int subcount = 0; uschar subdirs[64]; +pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */ #ifdef MEASURE_TIMING report_time_since(×tamp_startup, US"queue_run start"); @@ -468,18 +469,40 @@ for (int i = queue_run_in_order ? -1 : 0; (double)load_average/1000.0, (double)deliver_queue_load_max/1000.0); + /* If initial of a 2-phase run, maintain a set of child procs + to get disk parallelism */ + + if (f.queue_2stage && !queue_run_in_order) + { + int i; + if (qpid[nelem(qpid) - 1]) + { + DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n"); + waitpid(qpid[0], NULL, 0); + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]); + for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1]; + qpid[i] = 0; + } + else + for (i = 0; qpid[i]; ) i++; + DEBUG(D_queue_run) debug_printf("q2stage forking\n"); + if ((qpid[i] = fork())) + continue; /* parent loops around */ + DEBUG(D_queue_run) debug_printf("q2stage child\n"); + } + /* Skip this message unless it's within the ID limits */ if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0) - continue; + goto go_around; if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0) - continue; + goto go_around; /* Check that the message still exists */ message_subdir[0] = fq->dir_uschar; if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0) - continue; + goto go_around; /* There are some tests that require the reading of the header file. Ensure the store used is scavenged afterwards so that this process doesn't keep @@ -499,7 +522,7 @@ for (int i = queue_run_in_order ? -1 : 0; follow. If the message is chosen for delivery, the header is read again in the deliver_message() function, in a subprocess. */ - if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue; + if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around; f.dont_deliver = orig_dont_deliver; /* Now decide if we want to deliver this message. As we have read the @@ -571,7 +594,7 @@ for (int i = queue_run_in_order ? -1 : 0; spool_clear_header_globals(); store_reset(reset_point2); - if (!wanted) continue; /* With next message */ + if (!wanted) goto go_around; /* With next message */ } /* OK, got a message we want to deliver. Create a pipe which will @@ -660,6 +683,10 @@ for (int i = queue_run_in_order ? -1 : 0; (void)close(pfd[pipe_read]); set_process_info("running queue"); + /* If initial of a 2-phase run, we are a child - so just exit */ + if (f.queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS, US"2-phase child"); + /* If we are in the test harness, and this is not the first of a 2-stage queue run, update fudged queue times. */ @@ -668,6 +695,14 @@ for (int i = queue_run_in_order ? -1 : 0; uschar * fqtnext = Ustrchr(fudged_queue_times, '/'); if (fqtnext) fudged_queue_times = fqtnext + 1; } + + + continue; + + go_around: + /* If initial of a 2-phase run, we are a child - so just exit */ + if (f.queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS, US"2-phase child"); } /* End loop for list of messages */ tree_nonrecipients = NULL; @@ -695,6 +730,19 @@ turned off. */ if (f.queue_2stage) { + + /* wait for last children */ + for (int i = 0; i < nelem(qpid); i++) + if (qpid[i]) + { + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]); + waitpid(qpid[i], NULL, 0); + } + else break; + +#ifdef MEASURE_TIMING + report_time_since(×tamp_startup, US"queue_run 1st phase done"); +#endif f.queue_2stage = FALSE; queue_run(start_id, stop_id, TRUE); } |