diff options
author | Luke Shumaker <lukeshu@sbcglobal.net> | 2016-04-08 16:39:03 -0400 |
---|---|---|
committer | Luke Shumaker <lukeshu@sbcglobal.net> | 2016-04-08 16:39:03 -0400 |
commit | 608f4f254ccf95a01b5480e2dbe7a1b6e54c8609 (patch) | |
tree | b6a2dfe147ced25a86e1954c992435afd6b20678 | |
parent | d3a6398406f1a3f222c7ca00d4e4608267148456 (diff) |
sitting here
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | multipart-replace-http-server.c | 47 | ||||
-rw-r--r-- | multipart-replace.c | 49 | ||||
-rw-r--r-- | multipart-replace.h | 6 | ||||
-rw-r--r-- | wg.c | 45 | ||||
-rw-r--r-- | wg.h | 19 |
6 files changed, 139 insertions, 29 deletions
@@ -24,7 +24,7 @@ all: freenect-server multipart-replace-http-server .PHONY: all freenect-server: util.o -lfreenect -lusb-1.0 -ljpeg -multipart-replace-http-server: util.o multipart-replace.o -lpthread +multipart-replace-http-server: util.o wg.o multipart-replace.o -lpthread clean: rm -f -- *.o .*.mk freenect-server freenect-server--kinect multipart-replace-http-server diff --git a/multipart-replace-http-server.c b/multipart-replace-http-server.c index 29b0ff5..726714f 100644 --- a/multipart-replace-http-server.c +++ b/multipart-replace-http-server.c @@ -14,6 +14,7 @@ #include <unistd.h> #include "util.h" +#include "wg.h" #include "multipart-replace.h" struct httpfile { @@ -31,6 +32,20 @@ struct reader_thread_args { const char *boundary; }; + +bool running = true; +int sock; +struct wg wg; + +void stop(int sig) { + log("Caught %d", sig); + log("STOPPING=1"); + running = false; + close(sock); +} + +#define threaderror(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); stop(0); } while(0) + void *reader_thread(void *args_anon) { struct reader_thread_args *args = args_anon; @@ -42,7 +57,8 @@ void *reader_thread(void *args_anon) { multipart_replace_reader(args->stream, args->fd, args->boundary); sleep(5); - error(EXIT_FAILURE, 0, "multipart_replace stream ended: %s", args->filename); + threaderror(EXIT_FAILURE, 0, "multipart_replace stream ended: %s", args->filename); + wg_sub(&wg, 1); return NULL; } @@ -50,9 +66,10 @@ void start_multipart_replace_reader(struct multipart_replace_stream *s, int fd, struct reader_thread_args *args = xrealloc(NULL, sizeof(struct reader_thread_args)); args->stream = s; args->fd = fd; - args->filename = filename; + args->filename = strdup(filename); args->boundary = boundary; pthread_t thread; + wg_add(&wg, 1); pthread_create(&thread, NULL, reader_thread, args); } @@ -60,8 +77,10 @@ void file_add(const char *filename) { struct multipart_replace_stream *stream = xrealloc(NULL, sizeof(struct multipart_replace_stream)); init_multipart_replace_stream(stream); int fd = open(filename, O_RDONLY); - if (fd < 0) - error(EXIT_FAILURE, errno, "opening file: %s", filename); + if (fd < 0) { + threaderror(EXIT_FAILURE, errno, "opening file: %s", filename); + return; + } filev = xrealloc(filev, (++filec)*sizeof(*filev)); char *shortname = strrchr(filename, '/'); if (shortname == NULL) { @@ -161,6 +180,7 @@ void *connection_thread(void *arg_anon) { log("Connection %d opened", fd); connection_handler(fd); log("Connection %d closed", fd); + wg_sub(&wg, 1); return NULL; } @@ -286,7 +306,6 @@ void usage() { } void cleanup() { - log("STOPPING=1"); fflush(stderr); sleep(5); /* work around systemd bug dropping log messages */ } @@ -316,25 +335,33 @@ int main(int argc, char *argv[]) { error_print_progname = progname; atexit(cleanup); - int sock = sockstream_listen(argv[1], argv[2]); + sock = sockstream_listen(argv[1], argv[2]); if (sock < 0) { if (sock == -EAI_SYSTEM) - error(1, errno, "Opening socket %s %s", argv[1], argv[2]); + error(EXIT_FAILURE, errno, "Opening socket %s %s", argv[1], argv[2]); else - error(1, 0, "Opening socket %s %s: %s", argv[1], argv[2], gai_strerror(-sock)); + error(EXIT_FAILURE, 0, "Opening socket %s %s: %s", argv[1], argv[2], gai_strerror(-sock)); } + wg_init(&wg); for (int i = 3; i < argc; i++) file_add(argv[i]); signal(SIGPIPE, SIG_IGN); - while (1) { + signal(SIGTERM, stop); + signal(SIGQUIT, stop); + signal(SIGINT, stop); + while (running) { int conn = accept(sock, NULL, NULL); if (conn < 0) continue; pthread_t thread; + wg_add(&wg, 1); pthread_create(&thread, NULL, connection_thread, (void*)(intptr_t)conn); } - + wg_wait(&wg); + for (size_t i = 0; i < filec; i++) + destroy_multipart_replace_stream(filev[i].stream); + free(filev); return 0; } diff --git a/multipart-replace.c b/multipart-replace.c index a10ba4d..9a5e268 100644 --- a/multipart-replace.c +++ b/multipart-replace.c @@ -4,13 +4,14 @@ #include <error.h> #include <stdio.h> #include <stdlib.h> +#include <stdlib.h> #include <string.h> #include <unistd.h> #include "multipart-replace.h" #include "util.h" -#define error(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); return; } while(0) +#define error(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); goto end; } while(0) static char *boundary_line(const char *old) { @@ -40,9 +41,9 @@ ssize_t safe_atoi(char *str) { return atoi(str); } -void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *boundary) { +void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *_boundary) { FILE *stream = fdopen(fd, "r"); - boundary = boundary_line(boundary); + char *boundary = boundary_line(_boundary); char *line_buf = NULL; size_t line_cap = 0; @@ -50,7 +51,7 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const ssize_t content_length = -1; bool first = true; - while (1) { + while (running) { content_length = -1; /* scan for the first non-empty line */ do { @@ -103,27 +104,21 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const s->back = tmp; s->framecount++; pthread_rwlock_unlock(&s->frontlock); - - //debug("got frame on %d", fd); } + end: + free(boundary); + fclose(stream); } -void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *boundary) { +void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *_boundary) { FILE *stream = fdopen(fd, "w"); struct frame myframe = { 0 }; long lastframe = 0; - boundary = boundary_line(boundary); + char *boundary = boundary_line(_boundary); size_t boundary_len = strlen(boundary); - while(1) { - /* poll until there's a new frame */ - pthread_rwlock_rdlock(&s->frontlock); - while (s->framecount == lastframe) { - pthread_rwlock_unlock(&s->frontlock); - usleep(30000); /* a bit over 30 FPS */ - pthread_rwlock_rdlock(&s->frontlock); - } - + pthread_rwlock_rdlock(&s->frontlock); + while (running) { /* get the most recent frame (copy `s->front` to `myframe`) */ if (myframe.cap < (size_t)s->front->len) myframe.buf = xrealloc(myframe.buf, myframe.cap = s->front->len); @@ -141,7 +136,20 @@ void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const if (fwrite("\r\n", 2, 1, stream) < 1) error(EXIT_FAILURE, ferror(stream), "fwrite(\"\\r\\n\")"); fflush(stream); + + /* poll until there's a new frame */ + pthread_rwlock_rdlock(&s->frontlock); + while (s->framecount == lastframe && running) { + pthread_rwlock_unlock(&s->frontlock); + usleep(30000); /* a bit over 30 FPS */ + pthread_rwlock_rdlock(&s->frontlock); + } } + pthread_rwlock_unlock(&s->frontlock); + end: + free(boundary); + free(myframe.buf); + fclose(stream); } void init_multipart_replace_stream(struct multipart_replace_stream *s) { @@ -150,4 +158,11 @@ void init_multipart_replace_stream(struct multipart_replace_stream *s) { s->front = &s->a; s->back = &s->b; pthread_rwlock_init(&s->frontlock, NULL); + s->framecount = 0; +} + +void destroy_multipart_replace_stream(struct multipart_replace_stream *s) { + free(s->a.buf); + free(s->b.buf); + pthread_rwlock_destroy(&s->frontlock); } diff --git a/multipart-replace.h b/multipart-replace.h index 126f6fa..2aa5e61 100644 --- a/multipart-replace.h +++ b/multipart-replace.h @@ -3,6 +3,7 @@ #pragma once #include <pthread.h> +#include <stdbool.h> struct frame { ssize_t len; @@ -19,4 +20,7 @@ struct multipart_replace_stream { void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *boundary); void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *boundary); -void init_multipart_replace_stream(); +void init_multipart_replace_stream(struct multipart_replace_stream *s); +void destroy_multipart_replace_stream(struct multipart_replace_stream *s); + +extern bool running; @@ -0,0 +1,45 @@ +#include <errno.h> +#include <error.h> +#include <stdlib.h> /* for EXIT_FAILURE */ +#include <unistd.h> + +#include "wg.h" + +/* Thread management tools modeled on https://golang.org/pkg/sync/#WaitGroup */ + +/* pthread_cond_t is overly complicated. Just use a self-pipe. */ + +void wg_init(struct wg *wg) { + wg->count = 0; + pthread_mutex_init(&wg->lock, NULL); + int fds[2]; + if (pipe(fds) != 0) + error(EXIT_FAILURE, errno, "pipe"); + wg->fd_wait = fds[0]; + wg->fd_signal = fds[1]; +} + +void wg_add(struct wg *wg, unsigned int n) { + pthread_mutex_lock(&wg->lock); + wg->count += n; + pthread_mutex_unlock(&wg->lock); +} + +void wg_sub(struct wg *wg, unsigned int n) { + pthread_mutex_lock(&wg->lock); + wg->count -= n; + if (wg->count == 0) + if (write(wg->fd_signal, " ", 1) < 1) + error(EXIT_FAILURE, errno, "write"); + pthread_mutex_unlock(&wg->lock); +} + +void wg_wait(struct wg *wg) { + char b; + retry: + if (read(wg->fd_wait, &b, 1) == -1) { + if (errno == EINTR) + goto retry; + error(EXIT_FAILURE, errno, "wg_wait"); + } +} @@ -0,0 +1,19 @@ +#pragma once + +#include <pthread.h> + +/* Thread management tools modeled on https://golang.org/pkg/sync/#WaitGroup */ + +/* pthread_cond_t is overly complicated. Just use a self-pipe. */ + +struct wg { + int count; + pthread_mutex_t lock; + int fd_wait; + int fd_signal; +}; + +void wg_init(struct wg *); +void wg_add(struct wg *, unsigned int); +void wg_sub(struct wg *, unsigned int); +void wg_wait(struct wg*); |