diff options
author | Jeremy Harris <jgh146exb@wizmail.org> | 2020-02-18 11:30:57 +0000 |
---|---|---|
committer | Jeremy Harris <jgh146exb@wizmail.org> | 2020-02-18 11:30:57 +0000 |
commit | ff9663026d1a318d385730c4a2c3e85508b4b00b (patch) | |
tree | 70d594f5874bd70138ead18fe287599fa3278ca4 /src | |
parent | 5bf8a51681e171328e72f5d5b5ef8fd8a67d5f05 (diff) |
Overlapped twophase-queue-run and delivery. Experimental.
Diffstat (limited to 'src')
-rw-r--r-- | src/src/EDITME | 3 | ||||
-rw-r--r-- | src/src/child.c | 3 | ||||
-rw-r--r-- | src/src/config.h.defaults | 1 | ||||
-rw-r--r-- | src/src/daemon.c | 167 | ||||
-rw-r--r-- | src/src/deliver.c | 2 | ||||
-rw-r--r-- | src/src/exim.c | 26 | ||||
-rw-r--r-- | src/src/functions.h | 7 | ||||
-rw-r--r-- | src/src/globals.c | 7 | ||||
-rw-r--r-- | src/src/globals.h | 7 | ||||
-rw-r--r-- | src/src/macros.h | 5 | ||||
-rw-r--r-- | src/src/queue.c | 35 | ||||
-rw-r--r-- | src/src/readconf.c | 3 | ||||
-rw-r--r-- | src/src/smtp_out.c | 3 | ||||
-rw-r--r-- | src/src/spool_in.c | 6 | ||||
-rw-r--r-- | src/src/transport.c | 7 |
15 files changed, 255 insertions, 27 deletions
diff --git a/src/src/EDITME b/src/src/EDITME index 352bc7d7a..8d8552346 100644 --- a/src/src/EDITME +++ b/src/src/EDITME @@ -632,6 +632,9 @@ DISABLE_MAL_MKS=yes # Uncomment the following line to include support for TLS Resumption # EXPERIMENTAL_TLS_RESUME=yes +# Uncomment the following to include the fast-ramp two-phase-queue-run support +# EXPERIMENTAL_QUEUE_RAMP=yes + ############################################################################### # THESE ARE THINGS YOU MIGHT WANT TO SPECIFY # ############################################################################### diff --git a/src/src/child.c b/src/src/child.c index d3cd88201..c5054b6fb 100644 --- a/src/src/child.c +++ b/src/src/child.c @@ -75,7 +75,7 @@ int n = 0; int extra = pcount ? *pcount : 0; uschar **argv; -argv = store_get((extra + acount + MAX_CLMACROS + 18) * sizeof(char *), FALSE); +argv = store_get((extra + acount + MAX_CLMACROS + 19) * sizeof(char *), FALSE); /* In all case, the list starts out with the path, any macros, and a changed config file. */ @@ -109,6 +109,7 @@ if (!minimal) if (debug_selector != 0) argv[n++] = string_sprintf("-d=0x%x", debug_selector); } + if (!f.testsuite_delays) argv[n++] = US"-odd"; if (f.dont_deliver) argv[n++] = US"-N"; if (f.queue_smtp) argv[n++] = US"-odqs"; if (f.synchronous_delivery) argv[n++] = US"-odi"; diff --git a/src/src/config.h.defaults b/src/src/config.h.defaults index 223e2d645..9d77f3054 100644 --- a/src/src/config.h.defaults +++ b/src/src/config.h.defaults @@ -202,6 +202,7 @@ Do not put spaces between # and the 'define'. #define EXPERIMENTAL_DCC #define EXPERIMENTAL_DSN_INFO #define EXPERIMENTAL_LMDB +#define EXPERIMENTAL_QUEUE_RAMP #define EXPERIMENTAL_QUEUEFILE #define EXPERIMENTAL_SRS #define EXPERIMENTAL_SRS_NATIVE diff --git a/src/src/daemon.c b/src/src/daemon.c index ddfd8e7dc..aedd3fb84 100644 --- a/src/src/daemon.c +++ b/src/src/daemon.c @@ -973,6 +973,102 @@ exim_exit(EXIT_SUCCESS, US"daemon"); } +#ifdef EXPERIMENTAL_QUEUE_RAMP +/************************************************* +* Listener socket for local work prompts * +*************************************************/ + +static void +daemon_notifier_socket(void) +{ +int fd; +const uschar * where; +struct sockaddr_un sun = {.sun_family = AF_UNIX}; + +DEBUG(D_any) debug_printf("creating notifier socket\n"); + +where = US"socket"; +#ifdef SOCK_CLOEXEC +if ((fd = socket(AF_UNIX, SOCK_DGRAM|SOCK_CLOEXEC, 0)) < 0) + goto bad; +#else +if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0))) < 0) + goto bad; +(void)fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC); +#endif + +snprintf(sun.sun_path, sizeof(sun.sun_path), "%s/%s", + spool_directory, NOTIFIER_SOCKET_NAME); +where = US"bind"; +if (bind(fd, (const struct sockaddr *)&sun, sizeof(sun)) < 0) + goto bad; + +where = US"SO_PASSCRED"; +if (setsockopt(fd, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on)) < 0) + goto bad; + +/* debug_printf("%s: fd %d\n", __FUNCTION__, fd); */ +daemon_notifier_fd = fd; +return; + +bad: + log_write(0, LOG_MAIN|LOG_PANIC, "%s: %s: %s", + __FUNCTION__, where, strerror(errno)); +} + + +static uschar queuerun_msgid[MESSAGE_ID_LENGTH+1]; + +/* Return TRUE if a sigalrm should be emulated */ +static BOOL +daemon_notification(void) +{ +uschar buf[256], cbuf[256]; +struct iovec iov = {.iov_base = buf, .iov_len = sizeof(buf)-1}; +struct msghdr msg = { .msg_name = NULL, + .msg_namelen = 0, + .msg_iov = &iov, + .msg_iovlen = 1, + .msg_control = cbuf, + .msg_controllen = sizeof(cbuf) + }; +ssize_t sz; +struct cmsghdr * cp; + +buf[sizeof(buf)-1] = 0; +if ((sz = recvmsg(daemon_notifier_fd, &msg, 0)) <= 0) return FALSE; +if (sz >= sizeof(buf)) return FALSE; + +for (struct cmsghdr * cp = CMSG_FIRSTHDR(&msg); + cp; + cp = CMSG_NXTHDR(&msg, cp)) + if (cp->cmsg_level == SOL_SOCKET && cp->cmsg_type == SCM_CREDENTIALS) + { + struct ucred * cr = (struct ucred *) CMSG_DATA(cp); + if (cr->uid && cr->uid != exim_uid) + { + DEBUG(D_queue_run) debug_printf("%s: sender creds pid %d uid %d gid %d\n", + __FUNCTION__, (int)cr->pid, (int)cr->uid, (int)cr->gid); + return FALSE; + } + break; + } + +buf[sz] = 0; +switch (buf[0]) + { + case NOTIFY_MSG_QRUN: + /* this should be a message_id */ + DEBUG(D_queue_run) + debug_printf("%s: qrunner trigger: %s\n", __FUNCTION__, buf+1); + memcpy(queuerun_msgid, buf+1, MESSAGE_ID_LENGTH+1); + return TRUE; + } +return FALSE; +} +#endif /*EXPERIMENTAL_QUEUE_RAMP*/ + + /************************************************* * Exim Daemon Mainline * *************************************************/ @@ -1418,6 +1514,11 @@ if (f.background_daemon) /* We are now in the disconnected, daemon process (unless debugging). Set up the listening sockets if required. */ +#ifdef EXPERIMENTAL_QUEUE_RAMP +if (queue_fast_ramp) + daemon_notifier_socket(); +#endif + if (f.daemon_listen && !f.inetd_wait_mode) { int sk; @@ -1693,7 +1794,7 @@ if (f.inetd_wait_mode) set_process_info("daemon(%s): pre-listening socket", version_string); /* set up the timeout logic */ - sigalrm_seen = 1; + sigalrm_seen = TRUE; } else if (f.daemon_listen) @@ -1921,7 +2022,11 @@ for (;;) else { - DEBUG(D_any) debug_printf("SIGALRM received\n"); + DEBUG(D_any) debug_printf("%s received\n", +#ifdef EXPERIMENTAL_QUEUE_RAMP + *queuerun_msgid ? "qrun notification" : +#endif + "SIGALRM"); /* Do a full queue run in a child process, if required, unless we already have enough queue runners on the go. If we are not running as root, a @@ -1943,8 +2048,12 @@ for (;;) /* Close any open listening sockets in the child */ +#ifdef EXPERIMENTAL_QUEUE_RAMP + if (daemon_notifier_fd >= 0) + (void) close(daemon_notifier_fd); +#endif for (int sk = 0; sk < listen_socket_count; sk++) - (void)close(listen_sockets[sk]); + (void) close(listen_sockets[sk]); /* Reset SIGHUP and SIGCHLD in the child in both cases. */ @@ -1959,13 +2068,17 @@ for (;;) { uschar opt[8]; uschar *p = opt; - uschar *extra[5]; + uschar *extra[7]; int extracount = 1; signal(SIGALRM, SIG_DFL); *p++ = '-'; *p++ = 'q'; - if (f.queue_2stage) *p++ = 'q'; + if ( f.queue_2stage +#ifdef EXPERIMENTAL_QUEUE_RAMP + && !*queuerun_msgid +#endif + ) *p++ = 'q'; if (f.queue_run_first_delivery) *p++ = 'i'; if (f.queue_run_force) *p++ = 'f'; if (f.deliver_force_thaw) *p++ = 'f'; @@ -1974,6 +2087,14 @@ for (;;) extra[0] = *queue_name ? string_sprintf("%sG%s", opt, queue_name) : opt; +#ifdef EXPERIMENTAL_QUEUE_RAMP + if (*queuerun_msgid) + { + extra[extracount++] = queuerun_msgid; /* Trigger only the */ + extra[extracount++] = queuerun_msgid; /* one message */ + } +#endif + /* If -R or -S were on the original command line, ensure they get passed on. */ @@ -1992,15 +2113,23 @@ for (;;) /* Overlay this process with a new execution. */ - (void)child_exec_exim(CEE_EXEC_PANIC, FALSE, NULL, TRUE, extracount, - extra[0], extra[1], extra[2], extra[3], extra[4]); + (void)child_exec_exim(CEE_EXEC_PANIC, FALSE, NULL, FALSE, extracount, + extra[0], extra[1], extra[2], extra[3], extra[4], extra[5], extra[6]); /* Control never returns here. */ } /* No need to re-exec; SIGALRM remains set to the default handler */ - queue_run(NULL, NULL, FALSE); +#ifdef EXPERIMENTAL_QUEUE_RAMP + if (*queuerun_msgid) + { + f.queue_2stage = FALSE; + queue_run(queuerun_msgid, queuerun_msgid, FALSE); + } + else +#endif + queue_run(NULL, NULL, FALSE); exim_underbar_exit(EXIT_SUCCESS); } @@ -2027,7 +2156,12 @@ for (;;) /* Reset the alarm clock */ sigalrm_seen = FALSE; - ALARM(queue_interval); +#ifdef EXPERIMENTAL_QUEUE_RAMP + if (*queuerun_msgid) + *queuerun_msgid = 0; + else +#endif + ALARM(queue_interval); } } /* sigalrm_seen */ @@ -2050,6 +2184,10 @@ for (;;) fd_set select_listen; FD_ZERO(&select_listen); +#ifdef EXPERIMENTAL_QUEUE_RAMP + if (daemon_notifier_fd >= 0) + FD_SET(daemon_notifier_fd, &select_listen); +#endif for (int sk = 0; sk < listen_socket_count; sk++) { FD_SET(listen_sockets[sk], &select_listen); @@ -2105,6 +2243,16 @@ for (;;) int accept_socket = -1; if (!select_failed) + { +#ifdef EXPERIMENTAL_QUEUE_RAMP + if ( daemon_notifier_fd >= 0 + && FD_ISSET(daemon_notifier_fd, &select_listen)) + { + FD_CLR(daemon_notifier_fd, &select_listen); + sigalrm_seen = daemon_notification(); + break; /* to top of daemon loop */ + } +#endif for (int sk = 0; sk < listen_socket_count; sk++) if (FD_ISSET(listen_sockets[sk], &select_listen)) { @@ -2114,6 +2262,7 @@ for (;;) FD_CLR(listen_sockets[sk], &select_listen); break; } + } /* If select or accept has failed and this was not caused by an interruption, log the incident and try again. With asymmetric TCP/IP diff --git a/src/src/deliver.c b/src/src/deliver.c index c4160a50c..467813800 100644 --- a/src/src/deliver.c +++ b/src/src/deliver.c @@ -4642,6 +4642,7 @@ all pipes, so I do not see a reason to use non-blocking IO here search_tidyup(); + DEBUG(D_deliver) debug_printf("forking transport process\n"); if ((pid = fork()) == 0) { int fd = pfd[pipe_write]; @@ -4972,6 +4973,7 @@ all pipes, so I do not see a reason to use non-blocking IO here (void)close(fd); exit(EXIT_SUCCESS); } + DEBUG(D_deliver) debug_printf("forked transport process (%d)\n", pid); /* Back in the mainline: close the unwanted half of the pipe. */ diff --git a/src/src/exim.c b/src/src/exim.c index 98174d61a..a8f3c2248 100644 --- a/src/src/exim.c +++ b/src/src/exim.c @@ -972,15 +972,6 @@ fprintf(fp, "Support for:"); tcp_init(); if (f.tcp_fastopen_ok) fprintf(fp, " TCP_Fast_Open"); #endif -#ifdef EXPERIMENTAL_LMDB - fprintf(fp, " Experimental_LMDB"); -#endif -#ifdef EXPERIMENTAL_QUEUEFILE - fprintf(fp, " Experimental_QUEUEFILE"); -#endif -#if defined(EXPERIMENTAL_SRS) || defined(EXPERIMENTAL_SRS_NATIVE) - fprintf(fp, " Experimental_SRS"); -#endif #ifdef EXPERIMENTAL_ARC fprintf(fp, " Experimental_ARC"); #endif @@ -993,6 +984,18 @@ fprintf(fp, "Support for:"); #ifdef EXPERIMENTAL_DSN_INFO fprintf(fp, " Experimental_DSN_info"); #endif +#ifdef EXPERIMENTAL_LMDB + fprintf(fp, " Experimental_LMDB"); +#endif +#ifdef EXPERIMENTAL_QUEUE_RAMP + fprintf(fp, " Experimental_Queue_Ramp"); +#endif +#ifdef EXPERIMENTAL_QUEUEFILE + fprintf(fp, " Experimental_QUEUEFILE"); +#endif +#if defined(EXPERIMENTAL_SRS) || defined(EXPERIMENTAL_SRS_NATIVE) + fprintf(fp, " Experimental_SRS"); +#endif #ifdef EXPERIMENTAL_TLS_RESUME fprintf(fp, " Experimental_TLS_resume"); #endif @@ -3006,6 +3009,11 @@ for (i = 1; i < argc; i++) queue_only_set = TRUE; } + /* -odd: testsuite-only: add no inter-process delays */ + + else if (Ustrcmp(argrest, "d") == 0) + f.testsuite_delays = FALSE; + /* -odf: foreground delivery (smail-compatible option); same effect as -odi: interactive (synchronous) delivery (sendmail-compatible option) */ diff --git a/src/src/functions.h b/src/src/functions.h index 8b04d587d..9716a02b4 100644 --- a/src/src/functions.h +++ b/src/src/functions.h @@ -363,8 +363,11 @@ extern int vaguely_random_number_fallback(int); extern BOOL queue_action(uschar *, int, uschar **, int, int); extern void queue_check_only(void); -extern void queue_list(int, uschar **, int); extern void queue_count(void); +extern void queue_list(int, uschar **, int); +#ifdef EXPERIMENTAL_QUEUE_RAMP +extern void queue_notify_daemon(const uschar * hostname); +#endif extern void queue_run(uschar *, uschar *, BOOL); extern int random_number(int); @@ -1043,7 +1046,7 @@ static inline void testharness_pause_ms(int millisec) { #ifndef MEASURE_TIMING -if (f.running_in_test_harness) millisleep(millisec); +if (f.running_in_test_harness && f.testsuite_delays) millisleep(millisec); #endif } diff --git a/src/src/globals.c b/src/src/globals.c index 53a4d12c6..458ab487e 100644 --- a/src/src/globals.c +++ b/src/src/globals.c @@ -313,6 +313,7 @@ struct global_flags f = .system_filtering = FALSE, .taint_check_slow = FALSE, + .testsuite_delays = TRUE, .tcp_fastopen_ok = FALSE, .tcp_in_fastopen = FALSE, .tcp_in_fastopen_data = FALSE, @@ -379,6 +380,9 @@ BOOL prod_requires_admin = TRUE; BOOL proxy_session = FALSE; #endif +#ifdef EXPERIMENTAL_QUEUE_RAMP +BOOL queue_fast_ramp = FALSE; +#endif BOOL queue_list_requires_admin = TRUE; BOOL queue_only = FALSE; BOOL queue_only_load_latch = TRUE; @@ -736,6 +740,9 @@ cut_t cutthrough = { .nrcpt = 0, /* number of addresses */ }; +#ifdef EXPERIMENTAL_QUEUE_RAMP +int daemon_notifier_fd = -1; +#endif uschar *daemon_smtp_port = US"smtp"; int daemon_startup_retries = 9; int daemon_startup_sleep = 30; diff --git a/src/src/globals.h b/src/src/globals.h index 74af185ac..88751f372 100644 --- a/src/src/globals.h +++ b/src/src/globals.h @@ -275,6 +275,7 @@ extern struct global_flags { BOOL system_filtering :1; /* TRUE when running system filter */ BOOL taint_check_slow :1; /* malloc/mmap are not returning distinct ranges */ + BOOL testsuite_delays :1; /* interprocess sequencing delays, under testsuite */ BOOL tcp_fastopen_ok :1; /* appears to be supported by kernel */ BOOL tcp_in_fastopen :1; /* conn usefully used fastopen */ BOOL tcp_in_fastopen_data :1; /* fastopen carried data */ @@ -446,6 +447,9 @@ typedef struct { } cut_t; extern cut_t cutthrough; /* Deliver-concurrently */ +#ifdef EXPERIMENTAL_QUEUE_RAMP +extern int daemon_notifier_fd; /* Unix socket for notifications */ +#endif extern uschar *daemon_smtp_port; /* Can be a list of ports */ extern int daemon_startup_retries; /* Number of times to retry */ extern int daemon_startup_sleep; /* Sleep between retries */ @@ -786,6 +790,9 @@ extern uschar *prvscheck_result; /* Set during prvscheck expansion item */ extern const uschar *qualify_domain_recipient; /* Domain to qualify recipients with */ extern uschar *qualify_domain_sender; /* Domain to qualify senders with */ extern uschar *queue_domains; /* Queue these domains */ +#ifdef EXPERIMENTAL_QUEUE_RAMP +extern BOOL queue_fast_ramp; /* 2-phase queue-run overlap */ +#endif extern BOOL queue_list_requires_admin; /* TRUE if -bp requires admin */ /* immediate children */ extern pid_t queue_run_pid; /* PID of the queue running process or 0 */ diff --git a/src/src/macros.h b/src/src/macros.h index c99b152d5..ca61f530b 100644 --- a/src/src/macros.h +++ b/src/src/macros.h @@ -1100,4 +1100,9 @@ should not be one active. */ #define SVFMT_TAINT_NOCHK BIT(2) +#ifdef EXPERIMENTAL_QUEUE_RAMP +# define NOTIFIER_SOCKET_NAME "exim_daemon_notify" +# define NOTIFY_MSG_QRUN 1 /* Notify message types */ +#endif + /* End of macros.h */ diff --git a/src/src/queue.c b/src/src/queue.c index d472b9851..3c72eade6 100644 --- a/src/src/queue.c +++ b/src/src/queue.c @@ -346,7 +346,7 @@ const pcre *selectstring_regex_sender = NULL; uschar *log_detail = NULL; int subcount = 0; uschar subdirs[64]; -pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */ +pid_t qpid[1] = {0}; /* Parallelism factor for q2stage 1st phase */ #ifdef MEASURE_TIMING report_time_since(×tamp_startup, US"queue_run start"); @@ -1491,6 +1491,39 @@ if (s) } } + + +/******************************************************************************/ +/******************************************************************************/ + +#ifdef EXPERIMENTAL_QUEUE_RAMP +void +queue_notify_daemon(const uschar * msgid) +{ +uschar buf[MESSAGE_ID_LENGTH + 2]; +int fd; + +DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid); + +buf[0] = NOTIFY_MSG_QRUN; +memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1); + +if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0) + { + struct sockaddr_un sun = {.sun_family = AF_UNIX}; + + snprintf(sun.sun_path, sizeof(sun.sun_path), "%s/%s", + spool_directory, NOTIFIER_SOCKET_NAME); + + if (sendto(fd, buf, sizeof(buf), 0, &sun, sizeof(sun)) < 0) + DEBUG(D_queue_run) + debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno)); + close(fd); + } +else DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno)); +} +#endif + #endif /*!COMPILE_UTILITY*/ /* End of queue.c */ diff --git a/src/src/readconf.c b/src/src/readconf.c index f16f51dab..c8a3dffba 100644 --- a/src/src/readconf.c +++ b/src/src/readconf.c @@ -259,6 +259,9 @@ static optionlist optionlist_config[] = { { "qualify_domain", opt_stringptr, {&qualify_domain_sender} }, { "qualify_recipient", opt_stringptr, {&qualify_domain_recipient} }, { "queue_domains", opt_stringptr, {&queue_domains} }, +#ifdef EXPERIMENTAL_QUEUE_RAMP + { "queue_fast_ramp", opt_bool, {&queue_fast_ramp} }, +#endif { "queue_list_requires_admin",opt_bool, {&queue_list_requires_admin} }, { "queue_only", opt_bool, {&queue_only} }, { "queue_only_file", opt_stringptr, {&queue_only_file} }, diff --git a/src/src/smtp_out.c b/src/src/smtp_out.c index 96ee15282..12ed5bc61 100644 --- a/src/src/smtp_out.c +++ b/src/src/smtp_out.c @@ -500,7 +500,7 @@ else rc = n; } else - + { rc = send(outblock->cctx->sock, outblock->buffer, n, #ifdef MSG_MORE more ? MSG_MORE : 0 @@ -508,6 +508,7 @@ else 0 #endif ); + } } if (rc <= 0) diff --git a/src/src/spool_in.c b/src/src/spool_in.c index 575c398a2..5f8a8226f 100644 --- a/src/src/spool_in.c +++ b/src/src/spool_in.c @@ -105,9 +105,9 @@ lock_data.l_len = SPOOL_DATA_START_OFFSET; if (fcntl(fd, F_SETLK, &lock_data) < 0) { - log_write(L_skip_delivery, - LOG_MAIN, - "Spool file is locked (another process is handling this message)"); + log_write(L_skip_delivery, LOG_MAIN, + "Spool file for %s is locked (another process is handling this message)", + id); (void)close(fd); errno = 0; return -1; diff --git a/src/src/transport.c b/src/src/transport.c index 02994d2ca..d9eba1621 100644 --- a/src/src/transport.c +++ b/src/src/transport.c @@ -1560,12 +1560,17 @@ for (host_item * host = hostlist; host; host = host->next) /* If this record is full, write it out with a new name constructed from the sequence number, increase the sequence number, and empty - the record. */ + the record. If we're doing a two-phase queue run initial phase, ping the + daemon to consider running a delivery on this host. */ if (host_record->count >= WAIT_NAME_MAX) { sprintf(CS buffer, "%.200s:%d", host->name, host_record->sequence); dbfn_write(dbm_file, buffer, host_record, sizeof(dbdata_wait) + host_length); +#ifdef EXPERIMENTAL_QUEUE_RAMP + if (f.queue_2stage && queue_fast_ramp && !queue_run_in_order) + queue_notify_daemon(message_id); +#endif host_record->sequence++; host_record->count = 0; host_length = 0; |