From 13762fce1926efb6b553bf20df256ccf6586f518 Mon Sep 17 00:00:00 2001 From: Bjørn Mork Date: Fri, 15 May 2015 10:25:29 +0200 Subject: ripe-atlas-fw: imported version 4670 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Bjørn Mork --- eperd/eooqd.c | 147 +++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 109 insertions(+), 38 deletions(-) (limited to 'eperd/eooqd.c') diff --git a/eperd/eooqd.c b/eperd/eooqd.c index 4f572b7..8ce460f 100644 --- a/eperd/eooqd.c +++ b/eperd/eooqd.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013 RIPE NCC + * Copyright (c) 2013-2014 RIPE NCC * Licensed under GPLv2 or later, see file LICENSE in this tarball for details. * eooqd.c Libevent-based One-off queue daemon */ @@ -16,7 +16,7 @@ #define SUFFIX ".curr" #define OOQD_NEW_PREFIX "/home/atlas/data/new/ooq" -#define OOQD_OUT "/home/atlas/data/ooq.out/ooq" +#define OOQD_OUT_PREFIX "/home/atlas/data/out/ooq" #define ATLAS_SESSION_FILE "/home/atlas/status/con_session_id.txt" #define ATLAS_NARGS 64 /* Max arguments to a built-in command */ @@ -26,6 +26,9 @@ #define DBQ(str) "\"" #str "\"" +#define BARRIER_CMD "barrier" +#define POST_CMD "post" + struct slot { void *cmdstate; @@ -42,6 +45,9 @@ static struct int curr_busy; int curr_index; struct slot *slots; + + int barrier; + char *barrier_file; } *state; static struct builtin @@ -60,16 +66,16 @@ static struct builtin }; static const char *atlas_id; -static const char *out_filename; +static const char *queue_id; static void report(const char *fmt, ...); static void report_err(const char *fmt, ...); static void checkQueue(evutil_socket_t fd, short what, void *arg); -static void add_line(void); +static int add_line(void); static void cmddone(void *cmdstate); static void re_post(evutil_socket_t fd, short what, void *arg); -static void post_results(void); +static void post_results(int force_post); static void skip_space(char *cp, char **ncpp); static void skip_nonspace(char *cp, char **ncpp); static void find_eos(char *cp, char **ncpp); @@ -89,9 +95,10 @@ int eooqd_main(int argc, char *argv[]) atlas_id= NULL; pid_file_name= NULL; + queue_id= ""; - (void)getopt32(argv, "A:P:O:", &atlas_id, &pid_file_name, - &out_filename); + (void)getopt32(argv, "A:P:q:", &atlas_id, &pid_file_name, + &queue_id); if (argc != optind+1) { @@ -125,7 +132,6 @@ int eooqd_main(int argc, char *argv[]) strlcat(state->curr_qfile, SUFFIX, sizeof(state->curr_qfile)); signal(SIGQUIT, SIG_DFL); - chdir("/home/atlas/data"); limit.rlim_cur= RLIM_INFINITY; limit.rlim_max= RLIM_INFINITY; setrlimit(RLIMIT_CORE, &limit); @@ -168,9 +174,12 @@ int eooqd_main(int argc, char *argv[]) static void checkQueue(evutil_socket_t fd UNUSED_PARAM, short what UNUSED_PARAM, void *arg UNUSED_PARAM) { + int r; + char filename[80]; + if (!state->curr_file) { - /* Try to move queue_file to curr_qfile. This provide at most + /* Try to move queue_file to curr_qfile. This provides at most * once behavior and allows producers to create a new * queue_file while we process the old one. */ @@ -194,16 +203,20 @@ static void checkQueue(evutil_socket_t fd UNUSED_PARAM, while (state->curr_file && state->curr_busy < state->max_busy) { - add_line(); + r= add_line(); + if (r == -1) + break; /* Wait for barrier to complete */ } - check_resolv_conf2(out_filename, atlas_id); + snprintf(filename, sizeof(filename), + OOQD_OUT_PREFIX "%s/ooq.out", queue_id); + check_resolv_conf2(filename, atlas_id); } -static void add_line(void) +static int add_line(void) { char c; - int i, argc, skip, slot; + int i, argc, fd, skip, slot; size_t len; char *cp, *ncp; struct builtin *bp; @@ -215,15 +228,33 @@ static void add_line(void) char args[ATLAS_ARGSIZE]; char cmdline[256]; char filename[80]; + char filename2[80]; struct stat sb; + if (state->barrier) + { + if (state->curr_busy > 0) + return -1; + fd= open(state->barrier_file, O_CREAT, 0); + if (fd != -1) + close(fd); + else + { + report_err("unable to create barrier file '%s'", + state->barrier_file); + } + free(state->barrier_file); + state->barrier_file= NULL; + state->barrier= 0; + } + if (fgets(cmdline, sizeof(cmdline), state->curr_file) == NULL) { if (ferror(state->curr_file)) report_err("error reading queue file"); fclose(state->curr_file); state->curr_file= NULL; - return; + return 0; } cp= strchr(cmdline, '\n'); @@ -232,6 +263,33 @@ static void add_line(void) crondlog(LVL7 "atlas_run: looking for '%s'", cmdline); + /* Check for post command */ + if (strcmp(cmdline, POST_CMD) == 0) + { + /* Trigger a post */ + post_results(1 /* force_post */); + return 0; /* Done */ + } + + /* Check for barrier command */ + len= strlen(BARRIER_CMD); + if (strlen(cmdline) >= len && + strncmp(cmdline, BARRIER_CMD, len) == 0 && + cmdline[len] == ' ') + { + p= &cmdline[len]; + while (*p != '\0' && *p == ' ') + p++; + if (!validate_filename(p, SAFE_PREFIX)) + { + crondlog(LVL8 "insecure file '%s'. allowed path '%s'", + p, SAFE_PREFIX); + } + state->barrier= 1; + state->barrier_file= strdup(p); + return 0; + } + cmdstate= NULL; reason= NULL; for (bp= builtin_cmds; bp->cmd != NULL; bp++) @@ -337,7 +395,8 @@ static void add_line(void) if (state->slots[slot].cmdstate != NULL) crondlog(DIE9 "no empty slot?"); argv[argc++]= "-O"; - snprintf(filename, sizeof(filename), OOQD_NEW_PREFIX ".%d", slot); + snprintf(filename, sizeof(filename), OOQD_NEW_PREFIX "%s.%d", + queue_id, slot); argv[argc++]= filename; argv[argc]= NULL; @@ -361,11 +420,12 @@ static void add_line(void) error: if (cmdstate == NULL) { - fn= fopen(OOQD_NEW_PREFIX, "a"); + snprintf(filename, sizeof(filename), OOQD_NEW_PREFIX "%s", + queue_id); + fn= fopen(filename, "a"); if (!fn) { - crondlog(DIE9 "unable to append to '%s'", - OOQD_NEW_PREFIX); + crondlog(DIE9 "unable to append to '%s'", filename); } fprintf(fn, "RESULT { "); if (state->atlas_id) @@ -389,17 +449,21 @@ error: fprintf(fn, " }\n"); fclose(fn); - if (stat(OOQD_OUT, &sb) == -1 && - stat(OOQD_NEW_PREFIX, &sb) == 0) + snprintf(filename2, sizeof(filename2), + OOQD_OUT_PREFIX "%s/ooq", queue_id); + if (stat(filename2, &sb) == -1 && + stat(filename, &sb) == 0) { - if (rename(OOQD_NEW_PREFIX, OOQD_OUT) == -1) + if (rename(filename, filename2) == -1) { report_err("move '%s' to '%s' failed", - OOQD_NEW_PREFIX, OOQD_OUT); + filename, filename2); } } - post_results(); + post_results(0 /* !force_post */); } + + return 0; } static void cmddone(void *cmdstate) @@ -432,9 +496,9 @@ static void cmddone(void *cmdstate) report("cmddone: strange, cmd %p is busy", cmdstate); snprintf(from_filename, sizeof(from_filename), - "/home/atlas/data/new/ooq.%d", i); + OOQD_NEW_PREFIX "%s.%d", queue_id, i); snprintf(to_filename, sizeof(to_filename), - "/home/atlas/data/ooq.out/%d", i); + OOQD_OUT_PREFIX "%s/%d", queue_id, i); if (stat(to_filename, &sb) == 0) { report("output file '%s' is busy", to_filename); @@ -449,7 +513,7 @@ static void cmddone(void *cmdstate) if (state->curr_busy == 0) { - post_results(); + post_results(0 /* !force_post */); } } @@ -477,7 +541,7 @@ static void check_resolv_conf2(const char *out_file, const char *atlasid) RESOLV_CONF); evdns_base_resume(DnsBase); - if ((r != 0 || last_time != -1) && out_filename != NULL) + if ((r != 0 || last_time != -1) && out_file != NULL) { fn= fopen(out_file, "a"); if (!fn) @@ -503,10 +567,10 @@ static void re_post(evutil_socket_t fd UNUSED_PARAM, short what UNUSED_PARAM, /* Just call post_results every once in awhile in case some results * were left behind. */ - post_results(); + post_results(0 /* !force_post */); } -static void post_results(void) +static void post_results(int force_post) { int i, j, r, need_post, probe_id; const char *session_id; @@ -519,28 +583,33 @@ static void post_results(void) for (j= 0; j<5; j++) { /* Grab results and see if something need to be done. */ - need_post= 0; - - if (stat(OOQD_OUT, &sb) == 0) + need_post= force_post; + force_post= 0; /* Only one time */ + + snprintf(from_filename, sizeof(from_filename), + OOQD_NEW_PREFIX "%s", queue_id); + snprintf(to_filename, sizeof(to_filename), + OOQD_OUT_PREFIX "%s/ooq", queue_id); + if (stat(to_filename, &sb) == 0) { /* There is more to post */ need_post= 1; - } else if (stat(OOQD_NEW_PREFIX, &sb) == 0) + } else if (stat(from_filename, &sb) == 0) { - if (rename(OOQD_NEW_PREFIX, OOQD_OUT) == 0) + if (rename(from_filename, to_filename) == 0) need_post= 1; else { report_err("move '%s' to '%s' failed", - OOQD_NEW_PREFIX, OOQD_OUT); + from_filename, to_filename); } } for (i= 0; imax_busy; i++) { snprintf(from_filename, sizeof(from_filename), - "/home/atlas/data/new/ooq.%d", i); + OOQD_NEW_PREFIX "%s.%d", queue_id, i); snprintf(to_filename, sizeof(to_filename), - "/home/atlas/data/ooq.out/%d", i); + OOQD_OUT_PREFIX "%s/%d", queue_id, i); if (stat(to_filename, &sb) == 0) { /* There is more to post */ @@ -573,6 +642,8 @@ static void post_results(void) snprintf(url, sizeof(url), "http://127.0.0.1:8080/?PROBE_ID=%d&SESSION_ID=%s&SRC=oneoff", probe_id, session_id); + snprintf(from_filename, sizeof(from_filename), + OOQD_OUT_PREFIX "%s", queue_id); i= 0; argv[i++]= "httppost"; @@ -582,7 +653,7 @@ static void post_results(void) argv[i++]= "--post-header"; argv[i++]= "/home/atlas/status/p_to_c_report_header"; argv[i++]= "--post-dir"; - argv[i++]= "/home/atlas/data/ooq.out"; + argv[i++]= from_filename; argv[i++]= "--post-footer"; argv[i++]= "/home/atlas/status/con_session_id.txt"; argv[i++]= "-O"; -- cgit v1.2.3