diff options
author | Luke Shumaker <lukeshu@sbcglobal.net> | 2016-04-12 00:01:27 -0400 |
---|---|---|
committer | Luke Shumaker <lukeshu@sbcglobal.net> | 2016-04-12 00:01:27 -0400 |
commit | 24de3887e5e3bba2b8a4f60d1224ac927b98b1ed (patch) | |
tree | 20a27018a7fae78205e0bfee7fac4db4c82c0dea | |
parent | c8f6735fce3d7ba5aa8019775d26b6999ed5a344 (diff) |
stuff
- fix memory leaks (except for those from libusb)
- fix graceful shutdown
- fix systemd weirdness
-rw-r--r-- | Makefile | 7 | ||||
-rw-r--r-- | src/freenect-server.c | 130 | ||||
-rw-r--r-- | src/multipart-replace-http-server.c | 53 | ||||
-rw-r--r-- | src/multipart-replace.c | 55 | ||||
-rw-r--r-- | src/multipart-replace.h | 4 | ||||
-rw-r--r-- | src/wg.c | 86 | ||||
-rw-r--r-- | src/wg.h | 13 | ||||
-rw-r--r-- | systemd/freenect-server-http.service.in | 1 | ||||
-rw-r--r-- | systemd/freenect-server.service.in | 8 | ||||
-rw-r--r-- | systemd/freenect-server@.socket.in | 25 |
10 files changed, 220 insertions, 162 deletions
@@ -40,7 +40,7 @@ all: build include config.mk progs = freenect-server multipart-replace-http-server -units = freenect-server-http.service freenect-server-http.socket freenect-server.service freenect-server@.socket +units = freenect-server-http.service freenect-server-http.socket freenect-server.service build: $(addprefix src/,$(progs)) $(addprefix systemd/,$(units)) .PHONY: build @@ -53,12 +53,11 @@ CPPFLAGS += -MD -MF $(@D)/$(@F:%.o=.%.mk) -MP -include src/.*.mk # Dependencies that don't get figured out automatically -src/freenect-server: src/util.o -lfreenect -lusb-1.0 -ljpeg -src/multipart-replace-http-server: src/util.o src/wg.o src/multipart-replace.o -lpthread +src/freenect-server: src/util.o -lsystemd -lfreenect -lusb-1.0 -ljpeg +src/multipart-replace-http-server: src/util.o src/wg.o src/multipart-replace.o -lsystemd -lpthread systemd/freenect-server-http.service: .var.user .var.group .var.bindir systemd/freenect-server-http.socket: .var.httpstream systemd/freenect-server.service: .var.user .var.group .var.bindir -systemd/freenect-server@.socket: .var.user .var.group # Rules to make things COPYING.GPL3: diff --git a/src/freenect-server.c b/src/freenect-server.c index a36701c..286455b 100644 --- a/src/freenect-server.c +++ b/src/freenect-server.c @@ -24,10 +24,13 @@ #include <errno.h> #include <error.h> #include <inttypes.h> +#include <stdbool.h> #include <stdio.h> -#include <stdlib.h> /* atexit */ +#include <stdlib.h> /* atexit, EXIT_FAILURE */ #include <unistd.h> /* dup2 */ +#include <systemd/sd-daemon.h> + #include <libfreenect/libfreenect.h> #include <libusb-1.0/libusb.h> @@ -35,10 +38,12 @@ #include "util.h" -FILE *depth_stream = NULL; -FILE *video_stream = NULL; -FILE *accel_stream = NULL; -freenect_context *ctx; +void stop(int sig); +#define threaderror(stat, errnum, ...) do { \ + error(0, errnum, __VA_ARGS__); \ + exitcode = stat; \ + stop(0); \ + } while(0) struct mpjpg_encoder { struct jpeg_error_mgr jpeg_encoder_err; @@ -46,10 +51,17 @@ struct mpjpg_encoder { unsigned char *jpeg_buf; unsigned long jpeg_len; }; - + +int exitcode = 0; +bool running = true; +FILE *depth_stream = NULL; +FILE *video_stream = NULL; +FILE *accel_stream = NULL; +freenect_context *ctx = NULL; +freenect_device *dev = NULL; struct mpjpg_encoder depth_encoder; struct mpjpg_encoder video_encoder; - + void mpjpg_init(struct mpjpg_encoder *e) { e->jpeg_encoder.err = jpeg_std_error(&e->jpeg_encoder_err); jpeg_create_compress(&e->jpeg_encoder); @@ -63,7 +75,12 @@ void mpjpg_init(struct mpjpg_encoder *e) { jpeg_mem_dest(&e->jpeg_encoder, &e->jpeg_buf, &e->jpeg_len); } -void write_imageframe(FILE *stream, struct mpjpg_encoder *e, JSAMPLE *data) { +void mpjpg_destroy(struct mpjpg_encoder *e) { + jpeg_destroy_compress(&e->jpeg_encoder); + free(e->jpeg_buf); +} + +void write_imageframe(const char *name, FILE *stream, struct mpjpg_encoder *e, JSAMPLE *data) { JSAMPLE *row_pointers[480]; for (size_t i = 0; i < 480; i++) row_pointers[i] = &data[i*640*3]; @@ -79,14 +96,14 @@ void write_imageframe(FILE *stream, struct mpjpg_encoder *e, JSAMPLE *data) { "Content-length: %lu\r\n" "\r\n", e->jpeg_len) < 1) - error(EXIT_FAILURE, ferror(stream), "writing header"); + threaderror(EXIT_FAILURE, ferror(stream), "%s: writing header", name); if (fwrite(e->jpeg_buf, e->jpeg_len, 1, stream) < 1) - error(EXIT_FAILURE, ferror(stream), "writing body"); + threaderror(EXIT_FAILURE, ferror(stream), "%s: writing body", name); if (fwrite("\r\n", 2, 1, stream) < 1) - error(EXIT_FAILURE, ferror(stream), "writing footer"); + threaderror(EXIT_FAILURE, ferror(stream), "%s: writing footer", name); fflush(stream); } - + void handle_accel(freenect_device *dev UNUSED, freenect_raw_tilt_state* data) { double x, y, z, angle; freenect_get_mks_accel(data, &x, &y, &z); @@ -116,14 +133,16 @@ void handle_accel(freenect_device *dev UNUSED, freenect_raw_tilt_state* data) { " \"motor\": \"%s\" }\n", x, y, z, angle, motor); if (len < 0) - error(EXIT_FAILURE, errno, "asprintf"); - fprintf(accel_stream, + threaderror(EXIT_FAILURE, errno, "accel.mjson: asprintf"); + if (fprintf(accel_stream, "--ffserver\r\n" "Content-type: application/json\r\n" "Content-length: %d\r\n" "\r\n" "%s\r\n", - len, json); + len, json) < 0) + threaderror(EXIT_FAILURE, ferror(accel_stream), "accel.mjson: write"); + free(json); fflush(accel_stream); } @@ -131,46 +150,28 @@ uint8_t depth_rgb[640*480*3]; void handle_depth(freenect_device *dev, void *depth, uint32_t timestamp UNUSED) { uint32_t size = freenect_get_current_depth_mode(dev).bytes; if (size != 640*480*2) - error(EXIT_FAILURE, 0, "handle_depth: expected 640*480*2 byte frame, but got %"PRId32, size); + threaderror(EXIT_FAILURE, 0, "handle_depth: expected 640*480*2 byte frame, but got %"PRId32, size); /* scale the 11-bit values into 8-bit values */ uint16_t *depth_grey = depth; for (size_t i = 0; i < 640*480; i++) depth_rgb[i*3+0] = depth_rgb[i*3+1] = depth_rgb[i*3+2] = (uint8_t)(depth_grey[i]*8.0/11.0); /* write the image */ - write_imageframe(depth_stream, &depth_encoder, depth_rgb); + write_imageframe("depth.mjpg", depth_stream, &depth_encoder, depth_rgb); } void handle_video(freenect_device *dev, void *rgb, uint32_t timestamp UNUSED) { uint32_t size = freenect_get_current_video_mode(dev).bytes; if (size != 640*480*3) - error(EXIT_FAILURE, 0, "handle_video: expected 640*480*3 byte frame, but got %"PRId32, size); + threaderror(EXIT_FAILURE, 0, "handle_video: expected 640*480*3 byte frame, but got %"PRId32, size); /* write the image */ - write_imageframe(video_stream, &video_encoder, rgb); -} - -void print_mode(const char *name, freenect_frame_mode mode) { - /* This is just a courtesy function to let the user know the mode - if it becomes a bother for maintainability just comment out the - code in its body. It will only break if struct entries go missing. - */ - printf("%s Mode: {%d, %d, {%d}, %d, %d, %d, %d, %d, %d, %d}\n", name, - mode.reserved, (int)mode.resolution, (int)mode.video_format, mode.bytes, mode.width, - mode.height, mode.data_bits_per_pixel, mode.padding_bits_per_pixel, - mode.framerate, mode.is_valid); + write_imageframe("video.mjpg", video_stream, &video_encoder, rgb); } - -void cleanup() { - log("STOPPING=1"); - if (ctx) - freenect_shutdown(ctx); - if (video_stream) - fclose(video_stream); - if (depth_stream) - fclose(depth_stream); - if (accel_stream) - fclose(accel_stream); - fflush(stderr); - sleep(5); /* work around systemd bug dropping log messages */ + +void stop(int sig) { + if (sig != 0) + log("Caught %d", sig); + sd_notify(0, "STOPPING=1"); + running = false; } FILE *xfopen(const char *path, const char *mode) { @@ -194,10 +195,13 @@ void usage() { printf("Usage: %s [-h] [-v video.mjpg|-V video_fd] [-d depth.mjpg|-D depth_fd] [-a accel.mjson|-A accel_fd]\n", program_invocation_name); } +void cleanup(void); + int main(int argc, char *argv[]) { int res = 0; - freenect_device *dev; + mpjpg_init(&video_encoder); + mpjpg_init(&depth_encoder); atexit(cleanup); for (int i = 1; i < argc; i++) { @@ -247,9 +251,6 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } - mpjpg_init(&depth_encoder); - mpjpg_init(&video_encoder); - res = freenect_init(&ctx, 0); if (res) { error(EXIT_FAILURE, 0, "freenect_init: %s", libusb_strerror(res)); @@ -267,20 +268,26 @@ int main(int argc, char *argv[]) { } if (video_stream) { - print_mode("Video", freenect_find_video_mode(FREENECT_RESOLUTION_MEDIUM, FREENECT_VIDEO_RGB)); freenect_set_video_mode(dev, freenect_find_video_mode(FREENECT_RESOLUTION_MEDIUM, FREENECT_VIDEO_RGB)); freenect_start_video(dev); freenect_set_video_callback(dev, handle_video); } if (depth_stream) { - print_mode("Depth", freenect_find_depth_mode(FREENECT_RESOLUTION_MEDIUM, FREENECT_DEPTH_11BIT)); freenect_set_depth_mode(dev, freenect_find_depth_mode(FREENECT_RESOLUTION_MEDIUM, FREENECT_DEPTH_11BIT)); freenect_start_depth(dev); freenect_set_depth_callback(dev, handle_depth); } - while (freenect_process_events(ctx) >= 0) { + signal(SIGPIPE, SIG_IGN); + signal(SIGTERM, stop); + signal(SIGQUIT, stop); + signal(SIGINT, stop); + while (running) { + if (freenect_process_events(ctx) < 0) { + stop(0); + continue; + } if (accel_stream) { freenect_raw_tilt_state* state; freenect_update_tilt_state(dev); @@ -288,8 +295,25 @@ int main(int argc, char *argv[]) { handle_accel(dev, state); } } +} - freenect_stop_depth(dev); - freenect_stop_video(dev); - freenect_close_device(dev); +void cleanup() { + if (dev) { + freenect_stop_depth(dev); + freenect_stop_video(dev); + freenect_close_device(dev); + } + if (ctx) + freenect_shutdown(ctx); + if (video_stream) + fclose(video_stream); + if (depth_stream) + fclose(depth_stream); + if (accel_stream) + fclose(accel_stream); + mpjpg_destroy(&video_encoder); + mpjpg_destroy(&depth_encoder); + fflush(stderr); + if (sd_booted() > 0) sleep(5); /* work around systemd bug dropping log messages */ + _exit(exitcode); } diff --git a/src/multipart-replace-http-server.c b/src/multipart-replace-http-server.c index 0541ff9..bc188b8 100644 --- a/src/multipart-replace-http-server.c +++ b/src/multipart-replace-http-server.c @@ -29,6 +29,8 @@ #include <sys/un.h> #include <unistd.h> +#include <systemd/sd-daemon.h> + #include "util.h" #include "wg.h" #include "multipart-replace.h" @@ -44,23 +46,28 @@ struct httpfile *filev = NULL; struct reader_thread_args { struct multipart_replace_stream *stream; int fd; - const char *filename; - const char *boundary; + char *filename; + char *boundary; }; - +int exitcode = 0; bool running = true; int sock; struct wg wg; void stop(int sig) { - log("Caught %d", sig); - log("STOPPING=1"); + if (sig != 0) + log("Caught %d", sig); + sd_notify(0, "STOPPING=1"); running = false; close(sock); } -#define threaderror(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); stop(0); } while(0) +#define threaderror(stat, errnum, ...) do { \ + error(0, errnum, __VA_ARGS__); \ + exitcode = stat; \ + stop(0); \ + } while(0) void *reader_thread(void *args_anon) { struct reader_thread_args *args = args_anon; @@ -71,10 +78,12 @@ void *reader_thread(void *args_anon) { pthread_setname_np(pthread_self(), name); debug("starting thread: %s", name); - multipart_replace_reader(args->stream, args->fd, args->boundary); - sleep(5); - threaderror(EXIT_FAILURE, 0, "multipart_replace stream ended: %s", args->filename); - wg_sub(&wg, 1); + int ret = multipart_replace_reader(args->stream, args->fd, args->boundary); + threaderror(ret, 0, "multipart_replace stream ended: %s", args->filename); + free(args->filename); + free(args->boundary); + free(args); + wg_sub(&wg); return NULL; } @@ -83,9 +92,9 @@ void start_multipart_replace_reader(struct multipart_replace_stream *s, int fd, args->stream = s; args->fd = fd; args->filename = strdup(filename); - args->boundary = boundary; + args->boundary = strdup(boundary); pthread_t thread; - wg_add(&wg, 1); + wg_add(&wg); pthread_create(&thread, NULL, reader_thread, args); } @@ -132,13 +141,13 @@ void connection_handler(int fd) { "HTTP/1.1 405 Method Not Allowed\r\n" "Allow: GET\r\n" "\r\n"); - close(fd); + fclose(netstream); return; } char *version = &line_buf[4]; char *path = strsep(&version, " "); if (strcmp(version, "HTTP/1.1\r\n") != 0 && strcmp(version, "HTTP/1.0\r\n")) { - close(fd); + fclose(netstream); return; } path = strdupa(path); @@ -146,6 +155,7 @@ void connection_handler(int fd) { /* run through the rest of the headers */ while (strcmp(line_buf, "\r\n") != 0 && line_len >= 0) line_len = getline(&line_buf, &line_cap, netstream); + free(line_buf); if (strcmp(path, "/") == 0) { log("200 %s", path); @@ -167,7 +177,7 @@ void connection_handler(int fd) { " </ul>\n" "</body>\n" "</html>\n"); - close(fd); + fclose(netstream); return; } struct multipart_replace_stream *vidstream = file_get(path); @@ -187,7 +197,7 @@ void connection_handler(int fd) { "\r\n", boundary); multipart_replace_writer(vidstream, fd, boundary); - close(fd); + fclose(netstream); } void *connection_thread(void *arg_anon) { @@ -196,7 +206,7 @@ void *connection_thread(void *arg_anon) { log("Connection %d opened", fd); connection_handler(fd); log("Connection %d closed", fd); - wg_sub(&wg, 1); + wg_sub(&wg); return NULL; } @@ -323,7 +333,8 @@ void usage() { void cleanup() { fflush(stderr); - sleep(5); /* work around systemd bug dropping log messages */ + if (sd_booted() > 0) sleep(5); /* work around systemd bug dropping log messages */ + _exit(exitcode); } pthread_t main_thread; @@ -372,12 +383,14 @@ int main(int argc, char *argv[]) { if (conn < 0) continue; pthread_t thread; - wg_add(&wg, 1); + wg_add(&wg); pthread_create(&thread, NULL, connection_thread, (void*)(intptr_t)conn); } wg_wait(&wg); - for (size_t i = 0; i < filec; i++) + for (size_t i = 0; i < filec; i++) { destroy_multipart_replace_stream(filev[i].stream); + free(filev[i].stream); + } free(filev); return 0; } diff --git a/src/multipart-replace.c b/src/multipart-replace.c index b1fe662..fa893f2 100644 --- a/src/multipart-replace.c +++ b/src/multipart-replace.c @@ -27,7 +27,25 @@ #include "multipart-replace.h" #include "util.h" -#define error(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); goto end; } while(0) +#define error(stat, errnum, ...) do { \ + error(0, errnum, __VA_ARGS__); \ + ret = stat; \ + goto end; \ + } while(0) +#define stdioerror(stream, name) do { \ + if (feof(stream)) { \ + error(0, 0, "%s: EOF", name); \ + } else { \ + int err = ferror(stream); \ + if (err != 0) { \ + error(0, err, "%s", name); \ + } else { \ + error(0, errno, "%s: umm... what? Not EOF, but also not error. Here's errno", name); \ + } \ + ret = EXIT_FAILURE; \ + } \ + goto end; \ + } while(0) static char *boundary_line(const char *old) { @@ -57,7 +75,8 @@ ssize_t safe_atoi(char *str) { return atoi(str); } -void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *_boundary) { +int multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *_boundary) { + int ret = 0; FILE *stream = fdopen(fd, "r"); char *boundary = boundary_line(_boundary); @@ -66,33 +85,23 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const ssize_t line_len = 0; ssize_t content_length = -1; - bool first = true; while (running) { content_length = -1; /* scan for the first non-empty line */ do { line_len = getline(&line_buf, &line_cap, stream); if (line_len < 0) - error(EXIT_FAILURE, ferror(stream), "source hung up"); + stdioerror(stream, "src"); } while (strcmp(line_buf, "\r\n") == 0); /* make sure it matches the boundary separator */ - if (first) { - while (strcmp(line_buf, boundary) != 0) { - line_len = getline(&line_buf, &line_cap, stream); - if (line_len < 0) - error(EXIT_FAILURE, ferror(stream), "source hung up"); - } - first = false; - } else { - if (strcmp(line_buf, boundary) != 0) - error(EXIT_FAILURE, 0, "line does not match boundary: \"%s\"", line_buf); - } + if (strcmp(line_buf, boundary) != 0) + error(EXIT_FAILURE, 0, "line does not match boundary: \"%s\"", line_buf); /* read the frame header (MIME headers) */ s->back->len = 0; do { line_len = getline(&line_buf, &line_cap, stream); if (line_len < 0) - error(EXIT_FAILURE, ferror(stream), "source hung up"); + stdioerror(stream, "src"); /* append the line to the frame contents */ if ((ssize_t)s->back->cap < s->back->len + line_len) s->back->buf = xrealloc(s->back->buf, s->back->cap = s->back->len + line_len); @@ -110,7 +119,7 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const if ((ssize_t)s->back->cap < s->back->len + content_length) s->back->buf = xrealloc(s->back->buf, s->back->cap = s->back->len + content_length); if (fread(&s->back->buf[s->back->len], content_length, 1, stream) != 1) - error(EXIT_FALURE, ferror(stream), "fread(%zd)", s->back->len); + stdioerror(stream, "src"); s->back->len += content_length; /* swap the frames */ @@ -123,10 +132,13 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const } end: free(boundary); + free(line_buf); fclose(stream); + return ret; } -void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *_boundary) { +int multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *_boundary) { + int ret = 0; FILE *stream = fdopen(fd, "w"); struct frame myframe = { 0 }; long lastframe = 0; @@ -145,12 +157,12 @@ void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const /* send the frame to the client */ if (fwrite(boundary, boundary_len, 1, stream) < 1) - error(EXIT_FAILURE, ferror(stream), "fwrite(boundary)"); + stdioerror(stream, "dst <- boundary"); if (fwrite(myframe.buf, myframe.len, 1, stream) < 1) - error(EXIT_FAILURE, ferror(stream), "fwrite(frame.buf)"); + stdioerror(stream, "dst <- frame"); /* send a blank line for pleasantness */ if (fwrite("\r\n", 2, 1, stream) < 1) - error(EXIT_FAILURE, ferror(stream), "fwrite(\"\\r\\n\")"); + stdioerror(stream, "dst <- nl"); fflush(stream); /* poll until there's a new frame */ @@ -166,6 +178,7 @@ void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const free(boundary); free(myframe.buf); fclose(stream); + return ret; } void init_multipart_replace_stream(struct multipart_replace_stream *s) { diff --git a/src/multipart-replace.h b/src/multipart-replace.h index 502cecc..e4cced0 100644 --- a/src/multipart-replace.h +++ b/src/multipart-replace.h @@ -34,8 +34,8 @@ struct multipart_replace_stream { long framecount; }; -void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *boundary); -void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *boundary); +int multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *boundary); +int multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *boundary); void init_multipart_replace_stream(struct multipart_replace_stream *s); void destroy_multipart_replace_stream(struct multipart_replace_stream *s); @@ -26,37 +26,75 @@ /* pthread_cond_t is overly complicated. Just use a self-pipe. */ +static +void *wg_gc(void *wg_anon) { + struct wg *wg = wg_anon; + + pthread_t thread; + while (1) { + ssize_t r = read(wg->fd_threads[0], &thread, sizeof(thread)); + if (r < 0) { + if (errno == EINTR) + continue; + error(EXIT_FAILURE, errno, "wg_gc: read"); + } else if ((size_t)r < sizeof(thread)) { + error(EXIT_FAILURE, 0, "wg_gc: read: only read %zd/%zu", r, sizeof(thread)); + } + + pthread_join(thread, NULL); + + int p; + if ((p = pthread_mutex_lock(&wg->lock)) != 0) + error(EXIT_FAILURE, p, "wg_gc: pthread_mutex_lock"); + wg->count--; + if (wg->count == 0) { + if ((p = pthread_mutex_unlock(&wg->lock)) != 0) + error(EXIT_FAILURE, p, "wg_gc: pthread_mutex_unlock"); + return NULL; + } + if ((p = pthread_mutex_unlock(&wg->lock)) != 0) + error(EXIT_FAILURE, p, "wg_gc: pthread_mutex_unlock"); + } +} + void wg_init(struct wg *wg) { wg->count = 0; - pthread_mutex_init(&wg->lock, NULL); - int fds[2]; - if (pipe(fds) != 0) - error(EXIT_FAILURE, errno, "pipe"); - wg->fd_wait = fds[0]; - wg->fd_signal = fds[1]; + int r; + if ((r = pthread_mutex_init(&wg->lock, NULL)) != 0) + error(EXIT_FAILURE, r, "wg_init: pthread_mutex_init"); + if (pipe(wg->fd_threads) != 0) + error(EXIT_FAILURE, errno, "wg_init: pipe"); + if ((r = pthread_create(&wg->gc, NULL, wg_gc, (void*)wg)) != 0) + error(EXIT_FAILURE, r, "wg_init: pthread_create"); } -void wg_add(struct wg *wg, unsigned int n) { - pthread_mutex_lock(&wg->lock); - wg->count += n; - pthread_mutex_unlock(&wg->lock); +void wg_add(struct wg *wg) { + int r; + if ((r = pthread_mutex_lock(&wg->lock)) != 0) + error(EXIT_FAILURE, r, "wg_add: pthread_mutex_lock"); + wg->count++; + if ((r = pthread_mutex_unlock(&wg->lock)) != 0) + error(EXIT_FAILURE, r, "wg_add: pthread_mutex_unlock"); } -void wg_sub(struct wg *wg, unsigned int n) { - pthread_mutex_lock(&wg->lock); - wg->count -= n; - if (wg->count == 0) - if (write(wg->fd_signal, " ", 1) < 1) - error(EXIT_FAILURE, errno, "write"); - pthread_mutex_unlock(&wg->lock); +void wg_sub(struct wg *wg) { + pthread_t thread = pthread_self(); + ssize_t r = write(wg->fd_threads[1], &thread, sizeof(thread)); + if (r < 0) { + error(EXIT_FAILURE, errno, "wg_sub: write"); + } else if ((size_t)r < sizeof(thread)) { + error(EXIT_FAILURE, 0, "wg_sub: only wrote %zd/%zu", r, sizeof(thread)); + } } void wg_wait(struct wg *wg) { - char b; - retry: - if (read(wg->fd_wait, &b, 1) == -1) { - if (errno == EINTR) - goto retry; - error(EXIT_FAILURE, errno, "wg_wait"); - } + int r; + if ((r = pthread_join(wg->gc, NULL)) != 0) + error(EXIT_FAILURE, r, "wg_wait: pthread_join"); + if (close(wg->fd_threads[1]) != 0) + error(EXIT_FAILURE, errno, "wg_wait: close(fd_threads[1])"); + if (close(wg->fd_threads[0]) != 0) + error(EXIT_FAILURE, errno, "wg_wait: close(fd_threads[0])"); + if ((r = pthread_mutex_destroy(&wg->lock)) != 0) + error(EXIT_FAILURE, r, "wg_wait: pthread_mutex_destroy"); } @@ -21,18 +21,17 @@ #include <pthread.h> -/* Thread management tools modeled on https://golang.org/pkg/sync/#WaitGroup */ - -/* pthread_cond_t is overly complicated. Just use a self-pipe. */ +/* When you call wg_wait, the waitgroup is destroyed. You must + * re-wg_init it if you want to reuse it. */ struct wg { int count; pthread_mutex_t lock; - int fd_wait; - int fd_signal; + int fd_threads[2]; + pthread_t gc; }; void wg_init(struct wg *); -void wg_add(struct wg *, unsigned int); -void wg_sub(struct wg *, unsigned int); +void wg_add(struct wg *); +void wg_sub(struct wg *); void wg_wait(struct wg*); diff --git a/systemd/freenect-server-http.service.in b/systemd/freenect-server-http.service.in index 8bb5b44..2aa97d7 100644 --- a/systemd/freenect-server-http.service.in +++ b/systemd/freenect-server-http.service.in @@ -17,7 +17,6 @@ Description=Kinect HTTP media streamer After=network.target Requires=freenect-server-http.socket freenect-server.service -PartOf=freenect-server.service After=freenect-server.service [Service] diff --git a/systemd/freenect-server.service.in b/systemd/freenect-server.service.in index c6a719e..c0417b6 100644 --- a/systemd/freenect-server.service.in +++ b/systemd/freenect-server.service.in @@ -15,13 +15,11 @@ [Unit] Description=Kinect media streamer backend -After=network.target -Requires=freenect-server@accel.mjson.socket freenect-server@depth.mjpg.socket freenect-server@video.mjpg.socket [Service] Type=simple User=@user@ Group=@group@ -ExecStart=@bindir@/freenect-server -V systemd:freenect-server@video.mjpg.socket -D systemd:freenect-server@depth.mjpg.socket -A systemd:freenect-server@accel.mjson.socket - -Restart=always +RuntimeDirectory=freenect-server +ExecStartPre=/usr/bin/mkfifo /run/freenect-server/video.mjpg /run/freenect-server/depth.mjpg /run/freenect-server/accel.mjson +ExecStart=@bindir@/freenect-server -v /run/freenect-server/video.mjpg -d /run/freenect-server/depth.mjpg -a /run/freenect-server/accel.mjson diff --git a/systemd/freenect-server@.socket.in b/systemd/freenect-server@.socket.in deleted file mode 100644 index 8be3205..0000000 --- a/systemd/freenect-server@.socket.in +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright (C) 2016 Luke Shumaker -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see <http://www.gnu.org/licenses/>. - -[Unit] -Description=Kinect media streamer backend %I stream -After=network.target - -[Socket] -SocketUser=@user@ -SocketGroup=@group@ -ListenFIFO=/run/freenect-server/%I -Service=freenect-server.service -RemoveOnStop=true
\ No newline at end of file |