aboutsummaryrefslogtreecommitdiff
path: root/eperd/eooqd.c
diff options
context:
space:
mode:
authorBjørn Mork <bjorn@mork.no>2015-05-15 10:25:29 +0200
committerBjørn Mork <bjorn@mork.no>2015-05-15 10:25:29 +0200
commit13762fce1926efb6b553bf20df256ccf6586f518 (patch)
tree7ac288e17edd643380c1e18dda5b49d02839fa85 /eperd/eooqd.c
parent73e699faf130d0fc0f2f076d95db9dbd7f42a8b6 (diff)
ripe-atlas-fw: imported version 46704670
Signed-off-by: Bjørn Mork <bjorn@mork.no>
Diffstat (limited to 'eperd/eooqd.c')
-rw-r--r--eperd/eooqd.c147
1 files changed, 109 insertions, 38 deletions
diff --git a/eperd/eooqd.c b/eperd/eooqd.c
index 4f572b7..8ce460f 100644
--- a/eperd/eooqd.c
+++ b/eperd/eooqd.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2013 RIPE NCC <atlas@ripe.net>
+ * Copyright (c) 2013-2014 RIPE NCC <atlas@ripe.net>
* Licensed under GPLv2 or later, see file LICENSE in this tarball for details.
* eooqd.c Libevent-based One-off queue daemon
*/
@@ -16,7 +16,7 @@
#define SUFFIX ".curr"
#define OOQD_NEW_PREFIX "/home/atlas/data/new/ooq"
-#define OOQD_OUT "/home/atlas/data/ooq.out/ooq"
+#define OOQD_OUT_PREFIX "/home/atlas/data/out/ooq"
#define ATLAS_SESSION_FILE "/home/atlas/status/con_session_id.txt"
#define ATLAS_NARGS 64 /* Max arguments to a built-in command */
@@ -26,6 +26,9 @@
#define DBQ(str) "\"" #str "\""
+#define BARRIER_CMD "barrier"
+#define POST_CMD "post"
+
struct slot
{
void *cmdstate;
@@ -42,6 +45,9 @@ static struct
int curr_busy;
int curr_index;
struct slot *slots;
+
+ int barrier;
+ char *barrier_file;
} *state;
static struct builtin
@@ -60,16 +66,16 @@ static struct builtin
};
static const char *atlas_id;
-static const char *out_filename;
+static const char *queue_id;
static void report(const char *fmt, ...);
static void report_err(const char *fmt, ...);
static void checkQueue(evutil_socket_t fd, short what, void *arg);
-static void add_line(void);
+static int add_line(void);
static void cmddone(void *cmdstate);
static void re_post(evutil_socket_t fd, short what, void *arg);
-static void post_results(void);
+static void post_results(int force_post);
static void skip_space(char *cp, char **ncpp);
static void skip_nonspace(char *cp, char **ncpp);
static void find_eos(char *cp, char **ncpp);
@@ -89,9 +95,10 @@ int eooqd_main(int argc, char *argv[])
atlas_id= NULL;
pid_file_name= NULL;
+ queue_id= "";
- (void)getopt32(argv, "A:P:O:", &atlas_id, &pid_file_name,
- &out_filename);
+ (void)getopt32(argv, "A:P:q:", &atlas_id, &pid_file_name,
+ &queue_id);
if (argc != optind+1)
{
@@ -125,7 +132,6 @@ int eooqd_main(int argc, char *argv[])
strlcat(state->curr_qfile, SUFFIX, sizeof(state->curr_qfile));
signal(SIGQUIT, SIG_DFL);
- chdir("/home/atlas/data");
limit.rlim_cur= RLIM_INFINITY;
limit.rlim_max= RLIM_INFINITY;
setrlimit(RLIMIT_CORE, &limit);
@@ -168,9 +174,12 @@ int eooqd_main(int argc, char *argv[])
static void checkQueue(evutil_socket_t fd UNUSED_PARAM,
short what UNUSED_PARAM, void *arg UNUSED_PARAM)
{
+ int r;
+ char filename[80];
+
if (!state->curr_file)
{
- /* Try to move queue_file to curr_qfile. This provide at most
+ /* Try to move queue_file to curr_qfile. This provides at most
* once behavior and allows producers to create a new
* queue_file while we process the old one.
*/
@@ -194,16 +203,20 @@ static void checkQueue(evutil_socket_t fd UNUSED_PARAM,
while (state->curr_file && state->curr_busy < state->max_busy)
{
- add_line();
+ r= add_line();
+ if (r == -1)
+ break; /* Wait for barrier to complete */
}
- check_resolv_conf2(out_filename, atlas_id);
+ snprintf(filename, sizeof(filename),
+ OOQD_OUT_PREFIX "%s/ooq.out", queue_id);
+ check_resolv_conf2(filename, atlas_id);
}
-static void add_line(void)
+static int add_line(void)
{
char c;
- int i, argc, skip, slot;
+ int i, argc, fd, skip, slot;
size_t len;
char *cp, *ncp;
struct builtin *bp;
@@ -215,15 +228,33 @@ static void add_line(void)
char args[ATLAS_ARGSIZE];
char cmdline[256];
char filename[80];
+ char filename2[80];
struct stat sb;
+ if (state->barrier)
+ {
+ if (state->curr_busy > 0)
+ return -1;
+ fd= open(state->barrier_file, O_CREAT, 0);
+ if (fd != -1)
+ close(fd);
+ else
+ {
+ report_err("unable to create barrier file '%s'",
+ state->barrier_file);
+ }
+ free(state->barrier_file);
+ state->barrier_file= NULL;
+ state->barrier= 0;
+ }
+
if (fgets(cmdline, sizeof(cmdline), state->curr_file) == NULL)
{
if (ferror(state->curr_file))
report_err("error reading queue file");
fclose(state->curr_file);
state->curr_file= NULL;
- return;
+ return 0;
}
cp= strchr(cmdline, '\n');
@@ -232,6 +263,33 @@ static void add_line(void)
crondlog(LVL7 "atlas_run: looking for '%s'", cmdline);
+ /* Check for post command */
+ if (strcmp(cmdline, POST_CMD) == 0)
+ {
+ /* Trigger a post */
+ post_results(1 /* force_post */);
+ return 0; /* Done */
+ }
+
+ /* Check for barrier command */
+ len= strlen(BARRIER_CMD);
+ if (strlen(cmdline) >= len &&
+ strncmp(cmdline, BARRIER_CMD, len) == 0 &&
+ cmdline[len] == ' ')
+ {
+ p= &cmdline[len];
+ while (*p != '\0' && *p == ' ')
+ p++;
+ if (!validate_filename(p, SAFE_PREFIX))
+ {
+ crondlog(LVL8 "insecure file '%s'. allowed path '%s'",
+ p, SAFE_PREFIX);
+ }
+ state->barrier= 1;
+ state->barrier_file= strdup(p);
+ return 0;
+ }
+
cmdstate= NULL;
reason= NULL;
for (bp= builtin_cmds; bp->cmd != NULL; bp++)
@@ -337,7 +395,8 @@ static void add_line(void)
if (state->slots[slot].cmdstate != NULL)
crondlog(DIE9 "no empty slot?");
argv[argc++]= "-O";
- snprintf(filename, sizeof(filename), OOQD_NEW_PREFIX ".%d", slot);
+ snprintf(filename, sizeof(filename), OOQD_NEW_PREFIX "%s.%d",
+ queue_id, slot);
argv[argc++]= filename;
argv[argc]= NULL;
@@ -361,11 +420,12 @@ static void add_line(void)
error:
if (cmdstate == NULL)
{
- fn= fopen(OOQD_NEW_PREFIX, "a");
+ snprintf(filename, sizeof(filename), OOQD_NEW_PREFIX "%s",
+ queue_id);
+ fn= fopen(filename, "a");
if (!fn)
{
- crondlog(DIE9 "unable to append to '%s'",
- OOQD_NEW_PREFIX);
+ crondlog(DIE9 "unable to append to '%s'", filename);
}
fprintf(fn, "RESULT { ");
if (state->atlas_id)
@@ -389,17 +449,21 @@ error:
fprintf(fn, " }\n");
fclose(fn);
- if (stat(OOQD_OUT, &sb) == -1 &&
- stat(OOQD_NEW_PREFIX, &sb) == 0)
+ snprintf(filename2, sizeof(filename2),
+ OOQD_OUT_PREFIX "%s/ooq", queue_id);
+ if (stat(filename2, &sb) == -1 &&
+ stat(filename, &sb) == 0)
{
- if (rename(OOQD_NEW_PREFIX, OOQD_OUT) == -1)
+ if (rename(filename, filename2) == -1)
{
report_err("move '%s' to '%s' failed",
- OOQD_NEW_PREFIX, OOQD_OUT);
+ filename, filename2);
}
}
- post_results();
+ post_results(0 /* !force_post */);
}
+
+ return 0;
}
static void cmddone(void *cmdstate)
@@ -432,9 +496,9 @@ static void cmddone(void *cmdstate)
report("cmddone: strange, cmd %p is busy", cmdstate);
snprintf(from_filename, sizeof(from_filename),
- "/home/atlas/data/new/ooq.%d", i);
+ OOQD_NEW_PREFIX "%s.%d", queue_id, i);
snprintf(to_filename, sizeof(to_filename),
- "/home/atlas/data/ooq.out/%d", i);
+ OOQD_OUT_PREFIX "%s/%d", queue_id, i);
if (stat(to_filename, &sb) == 0)
{
report("output file '%s' is busy", to_filename);
@@ -449,7 +513,7 @@ static void cmddone(void *cmdstate)
if (state->curr_busy == 0)
{
- post_results();
+ post_results(0 /* !force_post */);
}
}
@@ -477,7 +541,7 @@ static void check_resolv_conf2(const char *out_file, const char *atlasid)
RESOLV_CONF);
evdns_base_resume(DnsBase);
- if ((r != 0 || last_time != -1) && out_filename != NULL)
+ if ((r != 0 || last_time != -1) && out_file != NULL)
{
fn= fopen(out_file, "a");
if (!fn)
@@ -503,10 +567,10 @@ static void re_post(evutil_socket_t fd UNUSED_PARAM, short what UNUSED_PARAM,
/* Just call post_results every once in awhile in case some results
* were left behind.
*/
- post_results();
+ post_results(0 /* !force_post */);
}
-static void post_results(void)
+static void post_results(int force_post)
{
int i, j, r, need_post, probe_id;
const char *session_id;
@@ -519,28 +583,33 @@ static void post_results(void)
for (j= 0; j<5; j++)
{
/* Grab results and see if something need to be done. */
- need_post= 0;
-
- if (stat(OOQD_OUT, &sb) == 0)
+ need_post= force_post;
+ force_post= 0; /* Only one time */
+
+ snprintf(from_filename, sizeof(from_filename),
+ OOQD_NEW_PREFIX "%s", queue_id);
+ snprintf(to_filename, sizeof(to_filename),
+ OOQD_OUT_PREFIX "%s/ooq", queue_id);
+ if (stat(to_filename, &sb) == 0)
{
/* There is more to post */
need_post= 1;
- } else if (stat(OOQD_NEW_PREFIX, &sb) == 0)
+ } else if (stat(from_filename, &sb) == 0)
{
- if (rename(OOQD_NEW_PREFIX, OOQD_OUT) == 0)
+ if (rename(from_filename, to_filename) == 0)
need_post= 1;
else
{
report_err("move '%s' to '%s' failed",
- OOQD_NEW_PREFIX, OOQD_OUT);
+ from_filename, to_filename);
}
}
for (i= 0; i<state->max_busy; i++)
{
snprintf(from_filename, sizeof(from_filename),
- "/home/atlas/data/new/ooq.%d", i);
+ OOQD_NEW_PREFIX "%s.%d", queue_id, i);
snprintf(to_filename, sizeof(to_filename),
- "/home/atlas/data/ooq.out/%d", i);
+ OOQD_OUT_PREFIX "%s/%d", queue_id, i);
if (stat(to_filename, &sb) == 0)
{
/* There is more to post */
@@ -573,6 +642,8 @@ static void post_results(void)
snprintf(url, sizeof(url),
"http://127.0.0.1:8080/?PROBE_ID=%d&SESSION_ID=%s&SRC=oneoff",
probe_id, session_id);
+ snprintf(from_filename, sizeof(from_filename),
+ OOQD_OUT_PREFIX "%s", queue_id);
i= 0;
argv[i++]= "httppost";
@@ -582,7 +653,7 @@ static void post_results(void)
argv[i++]= "--post-header";
argv[i++]= "/home/atlas/status/p_to_c_report_header";
argv[i++]= "--post-dir";
- argv[i++]= "/home/atlas/data/ooq.out";
+ argv[i++]= from_filename;
argv[i++]= "--post-footer";
argv[i++]= "/home/atlas/status/con_session_id.txt";
argv[i++]= "-O";