summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Shumaker <lukeshu@sbcglobal.net>2016-04-08 16:39:03 -0400
committerLuke Shumaker <lukeshu@sbcglobal.net>2016-04-08 16:39:03 -0400
commit608f4f254ccf95a01b5480e2dbe7a1b6e54c8609 (patch)
treeb6a2dfe147ced25a86e1954c992435afd6b20678
parentd3a6398406f1a3f222c7ca00d4e4608267148456 (diff)
sitting here
-rw-r--r--Makefile2
-rw-r--r--multipart-replace-http-server.c47
-rw-r--r--multipart-replace.c49
-rw-r--r--multipart-replace.h6
-rw-r--r--wg.c45
-rw-r--r--wg.h19
6 files changed, 139 insertions, 29 deletions
diff --git a/Makefile b/Makefile
index 05f71c9..9b0bf86 100644
--- a/Makefile
+++ b/Makefile
@@ -24,7 +24,7 @@ all: freenect-server multipart-replace-http-server
.PHONY: all
freenect-server: util.o -lfreenect -lusb-1.0 -ljpeg
-multipart-replace-http-server: util.o multipart-replace.o -lpthread
+multipart-replace-http-server: util.o wg.o multipart-replace.o -lpthread
clean:
rm -f -- *.o .*.mk freenect-server freenect-server--kinect multipart-replace-http-server
diff --git a/multipart-replace-http-server.c b/multipart-replace-http-server.c
index 29b0ff5..726714f 100644
--- a/multipart-replace-http-server.c
+++ b/multipart-replace-http-server.c
@@ -14,6 +14,7 @@
#include <unistd.h>
#include "util.h"
+#include "wg.h"
#include "multipart-replace.h"
struct httpfile {
@@ -31,6 +32,20 @@ struct reader_thread_args {
const char *boundary;
};
+
+bool running = true;
+int sock;
+struct wg wg;
+
+void stop(int sig) {
+ log("Caught %d", sig);
+ log("STOPPING=1");
+ running = false;
+ close(sock);
+}
+
+#define threaderror(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); stop(0); } while(0)
+
void *reader_thread(void *args_anon) {
struct reader_thread_args *args = args_anon;
@@ -42,7 +57,8 @@ void *reader_thread(void *args_anon) {
multipart_replace_reader(args->stream, args->fd, args->boundary);
sleep(5);
- error(EXIT_FAILURE, 0, "multipart_replace stream ended: %s", args->filename);
+ threaderror(EXIT_FAILURE, 0, "multipart_replace stream ended: %s", args->filename);
+ wg_sub(&wg, 1);
return NULL;
}
@@ -50,9 +66,10 @@ void start_multipart_replace_reader(struct multipart_replace_stream *s, int fd,
struct reader_thread_args *args = xrealloc(NULL, sizeof(struct reader_thread_args));
args->stream = s;
args->fd = fd;
- args->filename = filename;
+ args->filename = strdup(filename);
args->boundary = boundary;
pthread_t thread;
+ wg_add(&wg, 1);
pthread_create(&thread, NULL, reader_thread, args);
}
@@ -60,8 +77,10 @@ void file_add(const char *filename) {
struct multipart_replace_stream *stream = xrealloc(NULL, sizeof(struct multipart_replace_stream));
init_multipart_replace_stream(stream);
int fd = open(filename, O_RDONLY);
- if (fd < 0)
- error(EXIT_FAILURE, errno, "opening file: %s", filename);
+ if (fd < 0) {
+ threaderror(EXIT_FAILURE, errno, "opening file: %s", filename);
+ return;
+ }
filev = xrealloc(filev, (++filec)*sizeof(*filev));
char *shortname = strrchr(filename, '/');
if (shortname == NULL) {
@@ -161,6 +180,7 @@ void *connection_thread(void *arg_anon) {
log("Connection %d opened", fd);
connection_handler(fd);
log("Connection %d closed", fd);
+ wg_sub(&wg, 1);
return NULL;
}
@@ -286,7 +306,6 @@ void usage() {
}
void cleanup() {
- log("STOPPING=1");
fflush(stderr);
sleep(5); /* work around systemd bug dropping log messages */
}
@@ -316,25 +335,33 @@ int main(int argc, char *argv[]) {
error_print_progname = progname;
atexit(cleanup);
- int sock = sockstream_listen(argv[1], argv[2]);
+ sock = sockstream_listen(argv[1], argv[2]);
if (sock < 0) {
if (sock == -EAI_SYSTEM)
- error(1, errno, "Opening socket %s %s", argv[1], argv[2]);
+ error(EXIT_FAILURE, errno, "Opening socket %s %s", argv[1], argv[2]);
else
- error(1, 0, "Opening socket %s %s: %s", argv[1], argv[2], gai_strerror(-sock));
+ error(EXIT_FAILURE, 0, "Opening socket %s %s: %s", argv[1], argv[2], gai_strerror(-sock));
}
+ wg_init(&wg);
for (int i = 3; i < argc; i++)
file_add(argv[i]);
signal(SIGPIPE, SIG_IGN);
- while (1) {
+ signal(SIGTERM, stop);
+ signal(SIGQUIT, stop);
+ signal(SIGINT, stop);
+ while (running) {
int conn = accept(sock, NULL, NULL);
if (conn < 0)
continue;
pthread_t thread;
+ wg_add(&wg, 1);
pthread_create(&thread, NULL, connection_thread, (void*)(intptr_t)conn);
}
-
+ wg_wait(&wg);
+ for (size_t i = 0; i < filec; i++)
+ destroy_multipart_replace_stream(filev[i].stream);
+ free(filev);
return 0;
}
diff --git a/multipart-replace.c b/multipart-replace.c
index a10ba4d..9a5e268 100644
--- a/multipart-replace.c
+++ b/multipart-replace.c
@@ -4,13 +4,14 @@
#include <error.h>
#include <stdio.h>
#include <stdlib.h>
+#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "multipart-replace.h"
#include "util.h"
-#define error(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); return; } while(0)
+#define error(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); goto end; } while(0)
static
char *boundary_line(const char *old) {
@@ -40,9 +41,9 @@ ssize_t safe_atoi(char *str) {
return atoi(str);
}
-void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *boundary) {
+void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *_boundary) {
FILE *stream = fdopen(fd, "r");
- boundary = boundary_line(boundary);
+ char *boundary = boundary_line(_boundary);
char *line_buf = NULL;
size_t line_cap = 0;
@@ -50,7 +51,7 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const
ssize_t content_length = -1;
bool first = true;
- while (1) {
+ while (running) {
content_length = -1;
/* scan for the first non-empty line */
do {
@@ -103,27 +104,21 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const
s->back = tmp;
s->framecount++;
pthread_rwlock_unlock(&s->frontlock);
-
- //debug("got frame on %d", fd);
}
+ end:
+ free(boundary);
+ fclose(stream);
}
-void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *boundary) {
+void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *_boundary) {
FILE *stream = fdopen(fd, "w");
struct frame myframe = { 0 };
long lastframe = 0;
- boundary = boundary_line(boundary);
+ char *boundary = boundary_line(_boundary);
size_t boundary_len = strlen(boundary);
- while(1) {
- /* poll until there's a new frame */
- pthread_rwlock_rdlock(&s->frontlock);
- while (s->framecount == lastframe) {
- pthread_rwlock_unlock(&s->frontlock);
- usleep(30000); /* a bit over 30 FPS */
- pthread_rwlock_rdlock(&s->frontlock);
- }
-
+ pthread_rwlock_rdlock(&s->frontlock);
+ while (running) {
/* get the most recent frame (copy `s->front` to `myframe`) */
if (myframe.cap < (size_t)s->front->len)
myframe.buf = xrealloc(myframe.buf, myframe.cap = s->front->len);
@@ -141,7 +136,20 @@ void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const
if (fwrite("\r\n", 2, 1, stream) < 1)
error(EXIT_FAILURE, ferror(stream), "fwrite(\"\\r\\n\")");
fflush(stream);
+
+ /* poll until there's a new frame */
+ pthread_rwlock_rdlock(&s->frontlock);
+ while (s->framecount == lastframe && running) {
+ pthread_rwlock_unlock(&s->frontlock);
+ usleep(30000); /* a bit over 30 FPS */
+ pthread_rwlock_rdlock(&s->frontlock);
+ }
}
+ pthread_rwlock_unlock(&s->frontlock);
+ end:
+ free(boundary);
+ free(myframe.buf);
+ fclose(stream);
}
void init_multipart_replace_stream(struct multipart_replace_stream *s) {
@@ -150,4 +158,11 @@ void init_multipart_replace_stream(struct multipart_replace_stream *s) {
s->front = &s->a;
s->back = &s->b;
pthread_rwlock_init(&s->frontlock, NULL);
+ s->framecount = 0;
+}
+
+void destroy_multipart_replace_stream(struct multipart_replace_stream *s) {
+ free(s->a.buf);
+ free(s->b.buf);
+ pthread_rwlock_destroy(&s->frontlock);
}
diff --git a/multipart-replace.h b/multipart-replace.h
index 126f6fa..2aa5e61 100644
--- a/multipart-replace.h
+++ b/multipart-replace.h
@@ -3,6 +3,7 @@
#pragma once
#include <pthread.h>
+#include <stdbool.h>
struct frame {
ssize_t len;
@@ -19,4 +20,7 @@ struct multipart_replace_stream {
void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *boundary);
void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *boundary);
-void init_multipart_replace_stream();
+void init_multipart_replace_stream(struct multipart_replace_stream *s);
+void destroy_multipart_replace_stream(struct multipart_replace_stream *s);
+
+extern bool running;
diff --git a/wg.c b/wg.c
new file mode 100644
index 0000000..4f60765
--- /dev/null
+++ b/wg.c
@@ -0,0 +1,45 @@
+#include <errno.h>
+#include <error.h>
+#include <stdlib.h> /* for EXIT_FAILURE */
+#include <unistd.h>
+
+#include "wg.h"
+
+/* Thread management tools modeled on https://golang.org/pkg/sync/#WaitGroup */
+
+/* pthread_cond_t is overly complicated. Just use a self-pipe. */
+
+void wg_init(struct wg *wg) {
+ wg->count = 0;
+ pthread_mutex_init(&wg->lock, NULL);
+ int fds[2];
+ if (pipe(fds) != 0)
+ error(EXIT_FAILURE, errno, "pipe");
+ wg->fd_wait = fds[0];
+ wg->fd_signal = fds[1];
+}
+
+void wg_add(struct wg *wg, unsigned int n) {
+ pthread_mutex_lock(&wg->lock);
+ wg->count += n;
+ pthread_mutex_unlock(&wg->lock);
+}
+
+void wg_sub(struct wg *wg, unsigned int n) {
+ pthread_mutex_lock(&wg->lock);
+ wg->count -= n;
+ if (wg->count == 0)
+ if (write(wg->fd_signal, " ", 1) < 1)
+ error(EXIT_FAILURE, errno, "write");
+ pthread_mutex_unlock(&wg->lock);
+}
+
+void wg_wait(struct wg *wg) {
+ char b;
+ retry:
+ if (read(wg->fd_wait, &b, 1) == -1) {
+ if (errno == EINTR)
+ goto retry;
+ error(EXIT_FAILURE, errno, "wg_wait");
+ }
+}
diff --git a/wg.h b/wg.h
new file mode 100644
index 0000000..777e8a3
--- /dev/null
+++ b/wg.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <pthread.h>
+
+/* Thread management tools modeled on https://golang.org/pkg/sync/#WaitGroup */
+
+/* pthread_cond_t is overly complicated. Just use a self-pipe. */
+
+struct wg {
+ int count;
+ pthread_mutex_t lock;
+ int fd_wait;
+ int fd_signal;
+};
+
+void wg_init(struct wg *);
+void wg_add(struct wg *, unsigned int);
+void wg_sub(struct wg *, unsigned int);
+void wg_wait(struct wg*);