summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke Shumaker <lukeshu@sbcglobal.net>2016-04-12 00:01:27 -0400
committerLuke Shumaker <lukeshu@sbcglobal.net>2016-04-12 00:01:27 -0400
commit24de3887e5e3bba2b8a4f60d1224ac927b98b1ed (patch)
tree20a27018a7fae78205e0bfee7fac4db4c82c0dea
parentc8f6735fce3d7ba5aa8019775d26b6999ed5a344 (diff)
stuff
- fix memory leaks (except for those from libusb) - fix graceful shutdown - fix systemd weirdness
-rw-r--r--Makefile7
-rw-r--r--src/freenect-server.c130
-rw-r--r--src/multipart-replace-http-server.c53
-rw-r--r--src/multipart-replace.c55
-rw-r--r--src/multipart-replace.h4
-rw-r--r--src/wg.c86
-rw-r--r--src/wg.h13
-rw-r--r--systemd/freenect-server-http.service.in1
-rw-r--r--systemd/freenect-server.service.in8
-rw-r--r--systemd/freenect-server@.socket.in25
10 files changed, 220 insertions, 162 deletions
diff --git a/Makefile b/Makefile
index b17dec3..1e2e3bd 100644
--- a/Makefile
+++ b/Makefile
@@ -40,7 +40,7 @@ all: build
include config.mk
progs = freenect-server multipart-replace-http-server
-units = freenect-server-http.service freenect-server-http.socket freenect-server.service freenect-server@.socket
+units = freenect-server-http.service freenect-server-http.socket freenect-server.service
build: $(addprefix src/,$(progs)) $(addprefix systemd/,$(units))
.PHONY: build
@@ -53,12 +53,11 @@ CPPFLAGS += -MD -MF $(@D)/$(@F:%.o=.%.mk) -MP
-include src/.*.mk
# Dependencies that don't get figured out automatically
-src/freenect-server: src/util.o -lfreenect -lusb-1.0 -ljpeg
-src/multipart-replace-http-server: src/util.o src/wg.o src/multipart-replace.o -lpthread
+src/freenect-server: src/util.o -lsystemd -lfreenect -lusb-1.0 -ljpeg
+src/multipart-replace-http-server: src/util.o src/wg.o src/multipart-replace.o -lsystemd -lpthread
systemd/freenect-server-http.service: .var.user .var.group .var.bindir
systemd/freenect-server-http.socket: .var.httpstream
systemd/freenect-server.service: .var.user .var.group .var.bindir
-systemd/freenect-server@.socket: .var.user .var.group
# Rules to make things
COPYING.GPL3:
diff --git a/src/freenect-server.c b/src/freenect-server.c
index a36701c..286455b 100644
--- a/src/freenect-server.c
+++ b/src/freenect-server.c
@@ -24,10 +24,13 @@
#include <errno.h>
#include <error.h>
#include <inttypes.h>
+#include <stdbool.h>
#include <stdio.h>
-#include <stdlib.h> /* atexit */
+#include <stdlib.h> /* atexit, EXIT_FAILURE */
#include <unistd.h> /* dup2 */
+#include <systemd/sd-daemon.h>
+
#include <libfreenect/libfreenect.h>
#include <libusb-1.0/libusb.h>
@@ -35,10 +38,12 @@
#include "util.h"
-FILE *depth_stream = NULL;
-FILE *video_stream = NULL;
-FILE *accel_stream = NULL;
-freenect_context *ctx;
+void stop(int sig);
+#define threaderror(stat, errnum, ...) do { \
+ error(0, errnum, __VA_ARGS__); \
+ exitcode = stat; \
+ stop(0); \
+ } while(0)
struct mpjpg_encoder {
struct jpeg_error_mgr jpeg_encoder_err;
@@ -46,10 +51,17 @@ struct mpjpg_encoder {
unsigned char *jpeg_buf;
unsigned long jpeg_len;
};
-
+
+int exitcode = 0;
+bool running = true;
+FILE *depth_stream = NULL;
+FILE *video_stream = NULL;
+FILE *accel_stream = NULL;
+freenect_context *ctx = NULL;
+freenect_device *dev = NULL;
struct mpjpg_encoder depth_encoder;
struct mpjpg_encoder video_encoder;
-
+
void mpjpg_init(struct mpjpg_encoder *e) {
e->jpeg_encoder.err = jpeg_std_error(&e->jpeg_encoder_err);
jpeg_create_compress(&e->jpeg_encoder);
@@ -63,7 +75,12 @@ void mpjpg_init(struct mpjpg_encoder *e) {
jpeg_mem_dest(&e->jpeg_encoder, &e->jpeg_buf, &e->jpeg_len);
}
-void write_imageframe(FILE *stream, struct mpjpg_encoder *e, JSAMPLE *data) {
+void mpjpg_destroy(struct mpjpg_encoder *e) {
+ jpeg_destroy_compress(&e->jpeg_encoder);
+ free(e->jpeg_buf);
+}
+
+void write_imageframe(const char *name, FILE *stream, struct mpjpg_encoder *e, JSAMPLE *data) {
JSAMPLE *row_pointers[480];
for (size_t i = 0; i < 480; i++)
row_pointers[i] = &data[i*640*3];
@@ -79,14 +96,14 @@ void write_imageframe(FILE *stream, struct mpjpg_encoder *e, JSAMPLE *data) {
"Content-length: %lu\r\n"
"\r\n",
e->jpeg_len) < 1)
- error(EXIT_FAILURE, ferror(stream), "writing header");
+ threaderror(EXIT_FAILURE, ferror(stream), "%s: writing header", name);
if (fwrite(e->jpeg_buf, e->jpeg_len, 1, stream) < 1)
- error(EXIT_FAILURE, ferror(stream), "writing body");
+ threaderror(EXIT_FAILURE, ferror(stream), "%s: writing body", name);
if (fwrite("\r\n", 2, 1, stream) < 1)
- error(EXIT_FAILURE, ferror(stream), "writing footer");
+ threaderror(EXIT_FAILURE, ferror(stream), "%s: writing footer", name);
fflush(stream);
}
-
+
void handle_accel(freenect_device *dev UNUSED, freenect_raw_tilt_state* data) {
double x, y, z, angle;
freenect_get_mks_accel(data, &x, &y, &z);
@@ -116,14 +133,16 @@ void handle_accel(freenect_device *dev UNUSED, freenect_raw_tilt_state* data) {
" \"motor\": \"%s\" }\n",
x, y, z, angle, motor);
if (len < 0)
- error(EXIT_FAILURE, errno, "asprintf");
- fprintf(accel_stream,
+ threaderror(EXIT_FAILURE, errno, "accel.mjson: asprintf");
+ if (fprintf(accel_stream,
"--ffserver\r\n"
"Content-type: application/json\r\n"
"Content-length: %d\r\n"
"\r\n"
"%s\r\n",
- len, json);
+ len, json) < 0)
+ threaderror(EXIT_FAILURE, ferror(accel_stream), "accel.mjson: write");
+ free(json);
fflush(accel_stream);
}
@@ -131,46 +150,28 @@ uint8_t depth_rgb[640*480*3];
void handle_depth(freenect_device *dev, void *depth, uint32_t timestamp UNUSED) {
uint32_t size = freenect_get_current_depth_mode(dev).bytes;
if (size != 640*480*2)
- error(EXIT_FAILURE, 0, "handle_depth: expected 640*480*2 byte frame, but got %"PRId32, size);
+ threaderror(EXIT_FAILURE, 0, "handle_depth: expected 640*480*2 byte frame, but got %"PRId32, size);
/* scale the 11-bit values into 8-bit values */
uint16_t *depth_grey = depth;
for (size_t i = 0; i < 640*480; i++)
depth_rgb[i*3+0] = depth_rgb[i*3+1] = depth_rgb[i*3+2] = (uint8_t)(depth_grey[i]*8.0/11.0);
/* write the image */
- write_imageframe(depth_stream, &depth_encoder, depth_rgb);
+ write_imageframe("depth.mjpg", depth_stream, &depth_encoder, depth_rgb);
}
void handle_video(freenect_device *dev, void *rgb, uint32_t timestamp UNUSED) {
uint32_t size = freenect_get_current_video_mode(dev).bytes;
if (size != 640*480*3)
- error(EXIT_FAILURE, 0, "handle_video: expected 640*480*3 byte frame, but got %"PRId32, size);
+ threaderror(EXIT_FAILURE, 0, "handle_video: expected 640*480*3 byte frame, but got %"PRId32, size);
/* write the image */
- write_imageframe(video_stream, &video_encoder, rgb);
-}
-
-void print_mode(const char *name, freenect_frame_mode mode) {
- /* This is just a courtesy function to let the user know the mode
- if it becomes a bother for maintainability just comment out the
- code in its body. It will only break if struct entries go missing.
- */
- printf("%s Mode: {%d, %d, {%d}, %d, %d, %d, %d, %d, %d, %d}\n", name,
- mode.reserved, (int)mode.resolution, (int)mode.video_format, mode.bytes, mode.width,
- mode.height, mode.data_bits_per_pixel, mode.padding_bits_per_pixel,
- mode.framerate, mode.is_valid);
+ write_imageframe("video.mjpg", video_stream, &video_encoder, rgb);
}
-
-void cleanup() {
- log("STOPPING=1");
- if (ctx)
- freenect_shutdown(ctx);
- if (video_stream)
- fclose(video_stream);
- if (depth_stream)
- fclose(depth_stream);
- if (accel_stream)
- fclose(accel_stream);
- fflush(stderr);
- sleep(5); /* work around systemd bug dropping log messages */
+
+void stop(int sig) {
+ if (sig != 0)
+ log("Caught %d", sig);
+ sd_notify(0, "STOPPING=1");
+ running = false;
}
FILE *xfopen(const char *path, const char *mode) {
@@ -194,10 +195,13 @@ void usage() {
printf("Usage: %s [-h] [-v video.mjpg|-V video_fd] [-d depth.mjpg|-D depth_fd] [-a accel.mjson|-A accel_fd]\n", program_invocation_name);
}
+void cleanup(void);
+
int main(int argc, char *argv[]) {
int res = 0;
- freenect_device *dev;
+ mpjpg_init(&video_encoder);
+ mpjpg_init(&depth_encoder);
atexit(cleanup);
for (int i = 1; i < argc; i++) {
@@ -247,9 +251,6 @@ int main(int argc, char *argv[]) {
return EXIT_FAILURE;
}
- mpjpg_init(&depth_encoder);
- mpjpg_init(&video_encoder);
-
res = freenect_init(&ctx, 0);
if (res) {
error(EXIT_FAILURE, 0, "freenect_init: %s", libusb_strerror(res));
@@ -267,20 +268,26 @@ int main(int argc, char *argv[]) {
}
if (video_stream) {
- print_mode("Video", freenect_find_video_mode(FREENECT_RESOLUTION_MEDIUM, FREENECT_VIDEO_RGB));
freenect_set_video_mode(dev, freenect_find_video_mode(FREENECT_RESOLUTION_MEDIUM, FREENECT_VIDEO_RGB));
freenect_start_video(dev);
freenect_set_video_callback(dev, handle_video);
}
if (depth_stream) {
- print_mode("Depth", freenect_find_depth_mode(FREENECT_RESOLUTION_MEDIUM, FREENECT_DEPTH_11BIT));
freenect_set_depth_mode(dev, freenect_find_depth_mode(FREENECT_RESOLUTION_MEDIUM, FREENECT_DEPTH_11BIT));
freenect_start_depth(dev);
freenect_set_depth_callback(dev, handle_depth);
}
- while (freenect_process_events(ctx) >= 0) {
+ signal(SIGPIPE, SIG_IGN);
+ signal(SIGTERM, stop);
+ signal(SIGQUIT, stop);
+ signal(SIGINT, stop);
+ while (running) {
+ if (freenect_process_events(ctx) < 0) {
+ stop(0);
+ continue;
+ }
if (accel_stream) {
freenect_raw_tilt_state* state;
freenect_update_tilt_state(dev);
@@ -288,8 +295,25 @@ int main(int argc, char *argv[]) {
handle_accel(dev, state);
}
}
+}
- freenect_stop_depth(dev);
- freenect_stop_video(dev);
- freenect_close_device(dev);
+void cleanup() {
+ if (dev) {
+ freenect_stop_depth(dev);
+ freenect_stop_video(dev);
+ freenect_close_device(dev);
+ }
+ if (ctx)
+ freenect_shutdown(ctx);
+ if (video_stream)
+ fclose(video_stream);
+ if (depth_stream)
+ fclose(depth_stream);
+ if (accel_stream)
+ fclose(accel_stream);
+ mpjpg_destroy(&video_encoder);
+ mpjpg_destroy(&depth_encoder);
+ fflush(stderr);
+ if (sd_booted() > 0) sleep(5); /* work around systemd bug dropping log messages */
+ _exit(exitcode);
}
diff --git a/src/multipart-replace-http-server.c b/src/multipart-replace-http-server.c
index 0541ff9..bc188b8 100644
--- a/src/multipart-replace-http-server.c
+++ b/src/multipart-replace-http-server.c
@@ -29,6 +29,8 @@
#include <sys/un.h>
#include <unistd.h>
+#include <systemd/sd-daemon.h>
+
#include "util.h"
#include "wg.h"
#include "multipart-replace.h"
@@ -44,23 +46,28 @@ struct httpfile *filev = NULL;
struct reader_thread_args {
struct multipart_replace_stream *stream;
int fd;
- const char *filename;
- const char *boundary;
+ char *filename;
+ char *boundary;
};
-
+int exitcode = 0;
bool running = true;
int sock;
struct wg wg;
void stop(int sig) {
- log("Caught %d", sig);
- log("STOPPING=1");
+ if (sig != 0)
+ log("Caught %d", sig);
+ sd_notify(0, "STOPPING=1");
running = false;
close(sock);
}
-#define threaderror(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); stop(0); } while(0)
+#define threaderror(stat, errnum, ...) do { \
+ error(0, errnum, __VA_ARGS__); \
+ exitcode = stat; \
+ stop(0); \
+ } while(0)
void *reader_thread(void *args_anon) {
struct reader_thread_args *args = args_anon;
@@ -71,10 +78,12 @@ void *reader_thread(void *args_anon) {
pthread_setname_np(pthread_self(), name);
debug("starting thread: %s", name);
- multipart_replace_reader(args->stream, args->fd, args->boundary);
- sleep(5);
- threaderror(EXIT_FAILURE, 0, "multipart_replace stream ended: %s", args->filename);
- wg_sub(&wg, 1);
+ int ret = multipart_replace_reader(args->stream, args->fd, args->boundary);
+ threaderror(ret, 0, "multipart_replace stream ended: %s", args->filename);
+ free(args->filename);
+ free(args->boundary);
+ free(args);
+ wg_sub(&wg);
return NULL;
}
@@ -83,9 +92,9 @@ void start_multipart_replace_reader(struct multipart_replace_stream *s, int fd,
args->stream = s;
args->fd = fd;
args->filename = strdup(filename);
- args->boundary = boundary;
+ args->boundary = strdup(boundary);
pthread_t thread;
- wg_add(&wg, 1);
+ wg_add(&wg);
pthread_create(&thread, NULL, reader_thread, args);
}
@@ -132,13 +141,13 @@ void connection_handler(int fd) {
"HTTP/1.1 405 Method Not Allowed\r\n"
"Allow: GET\r\n"
"\r\n");
- close(fd);
+ fclose(netstream);
return;
}
char *version = &line_buf[4];
char *path = strsep(&version, " ");
if (strcmp(version, "HTTP/1.1\r\n") != 0 && strcmp(version, "HTTP/1.0\r\n")) {
- close(fd);
+ fclose(netstream);
return;
}
path = strdupa(path);
@@ -146,6 +155,7 @@ void connection_handler(int fd) {
/* run through the rest of the headers */
while (strcmp(line_buf, "\r\n") != 0 && line_len >= 0)
line_len = getline(&line_buf, &line_cap, netstream);
+ free(line_buf);
if (strcmp(path, "/") == 0) {
log("200 %s", path);
@@ -167,7 +177,7 @@ void connection_handler(int fd) {
" </ul>\n"
"</body>\n"
"</html>\n");
- close(fd);
+ fclose(netstream);
return;
}
struct multipart_replace_stream *vidstream = file_get(path);
@@ -187,7 +197,7 @@ void connection_handler(int fd) {
"\r\n",
boundary);
multipart_replace_writer(vidstream, fd, boundary);
- close(fd);
+ fclose(netstream);
}
void *connection_thread(void *arg_anon) {
@@ -196,7 +206,7 @@ void *connection_thread(void *arg_anon) {
log("Connection %d opened", fd);
connection_handler(fd);
log("Connection %d closed", fd);
- wg_sub(&wg, 1);
+ wg_sub(&wg);
return NULL;
}
@@ -323,7 +333,8 @@ void usage() {
void cleanup() {
fflush(stderr);
- sleep(5); /* work around systemd bug dropping log messages */
+ if (sd_booted() > 0) sleep(5); /* work around systemd bug dropping log messages */
+ _exit(exitcode);
}
pthread_t main_thread;
@@ -372,12 +383,14 @@ int main(int argc, char *argv[]) {
if (conn < 0)
continue;
pthread_t thread;
- wg_add(&wg, 1);
+ wg_add(&wg);
pthread_create(&thread, NULL, connection_thread, (void*)(intptr_t)conn);
}
wg_wait(&wg);
- for (size_t i = 0; i < filec; i++)
+ for (size_t i = 0; i < filec; i++) {
destroy_multipart_replace_stream(filev[i].stream);
+ free(filev[i].stream);
+ }
free(filev);
return 0;
}
diff --git a/src/multipart-replace.c b/src/multipart-replace.c
index b1fe662..fa893f2 100644
--- a/src/multipart-replace.c
+++ b/src/multipart-replace.c
@@ -27,7 +27,25 @@
#include "multipart-replace.h"
#include "util.h"
-#define error(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); goto end; } while(0)
+#define error(stat, errnum, ...) do { \
+ error(0, errnum, __VA_ARGS__); \
+ ret = stat; \
+ goto end; \
+ } while(0)
+#define stdioerror(stream, name) do { \
+ if (feof(stream)) { \
+ error(0, 0, "%s: EOF", name); \
+ } else { \
+ int err = ferror(stream); \
+ if (err != 0) { \
+ error(0, err, "%s", name); \
+ } else { \
+ error(0, errno, "%s: umm... what? Not EOF, but also not error. Here's errno", name); \
+ } \
+ ret = EXIT_FAILURE; \
+ } \
+ goto end; \
+ } while(0)
static
char *boundary_line(const char *old) {
@@ -57,7 +75,8 @@ ssize_t safe_atoi(char *str) {
return atoi(str);
}
-void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *_boundary) {
+int multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *_boundary) {
+ int ret = 0;
FILE *stream = fdopen(fd, "r");
char *boundary = boundary_line(_boundary);
@@ -66,33 +85,23 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const
ssize_t line_len = 0;
ssize_t content_length = -1;
- bool first = true;
while (running) {
content_length = -1;
/* scan for the first non-empty line */
do {
line_len = getline(&line_buf, &line_cap, stream);
if (line_len < 0)
- error(EXIT_FAILURE, ferror(stream), "source hung up");
+ stdioerror(stream, "src");
} while (strcmp(line_buf, "\r\n") == 0);
/* make sure it matches the boundary separator */
- if (first) {
- while (strcmp(line_buf, boundary) != 0) {
- line_len = getline(&line_buf, &line_cap, stream);
- if (line_len < 0)
- error(EXIT_FAILURE, ferror(stream), "source hung up");
- }
- first = false;
- } else {
- if (strcmp(line_buf, boundary) != 0)
- error(EXIT_FAILURE, 0, "line does not match boundary: \"%s\"", line_buf);
- }
+ if (strcmp(line_buf, boundary) != 0)
+ error(EXIT_FAILURE, 0, "line does not match boundary: \"%s\"", line_buf);
/* read the frame header (MIME headers) */
s->back->len = 0;
do {
line_len = getline(&line_buf, &line_cap, stream);
if (line_len < 0)
- error(EXIT_FAILURE, ferror(stream), "source hung up");
+ stdioerror(stream, "src");
/* append the line to the frame contents */
if ((ssize_t)s->back->cap < s->back->len + line_len)
s->back->buf = xrealloc(s->back->buf, s->back->cap = s->back->len + line_len);
@@ -110,7 +119,7 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const
if ((ssize_t)s->back->cap < s->back->len + content_length)
s->back->buf = xrealloc(s->back->buf, s->back->cap = s->back->len + content_length);
if (fread(&s->back->buf[s->back->len], content_length, 1, stream) != 1)
- error(EXIT_FALURE, ferror(stream), "fread(%zd)", s->back->len);
+ stdioerror(stream, "src");
s->back->len += content_length;
/* swap the frames */
@@ -123,10 +132,13 @@ void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const
}
end:
free(boundary);
+ free(line_buf);
fclose(stream);
+ return ret;
}
-void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *_boundary) {
+int multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *_boundary) {
+ int ret = 0;
FILE *stream = fdopen(fd, "w");
struct frame myframe = { 0 };
long lastframe = 0;
@@ -145,12 +157,12 @@ void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const
/* send the frame to the client */
if (fwrite(boundary, boundary_len, 1, stream) < 1)
- error(EXIT_FAILURE, ferror(stream), "fwrite(boundary)");
+ stdioerror(stream, "dst <- boundary");
if (fwrite(myframe.buf, myframe.len, 1, stream) < 1)
- error(EXIT_FAILURE, ferror(stream), "fwrite(frame.buf)");
+ stdioerror(stream, "dst <- frame");
/* send a blank line for pleasantness */
if (fwrite("\r\n", 2, 1, stream) < 1)
- error(EXIT_FAILURE, ferror(stream), "fwrite(\"\\r\\n\")");
+ stdioerror(stream, "dst <- nl");
fflush(stream);
/* poll until there's a new frame */
@@ -166,6 +178,7 @@ void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const
free(boundary);
free(myframe.buf);
fclose(stream);
+ return ret;
}
void init_multipart_replace_stream(struct multipart_replace_stream *s) {
diff --git a/src/multipart-replace.h b/src/multipart-replace.h
index 502cecc..e4cced0 100644
--- a/src/multipart-replace.h
+++ b/src/multipart-replace.h
@@ -34,8 +34,8 @@ struct multipart_replace_stream {
long framecount;
};
-void multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *boundary);
-void multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *boundary);
+int multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *boundary);
+int multipart_replace_writer(struct multipart_replace_stream *s, int fd, const char *boundary);
void init_multipart_replace_stream(struct multipart_replace_stream *s);
void destroy_multipart_replace_stream(struct multipart_replace_stream *s);
diff --git a/src/wg.c b/src/wg.c
index c326f53..4e673b4 100644
--- a/src/wg.c
+++ b/src/wg.c
@@ -26,37 +26,75 @@
/* pthread_cond_t is overly complicated. Just use a self-pipe. */
+static
+void *wg_gc(void *wg_anon) {
+ struct wg *wg = wg_anon;
+
+ pthread_t thread;
+ while (1) {
+ ssize_t r = read(wg->fd_threads[0], &thread, sizeof(thread));
+ if (r < 0) {
+ if (errno == EINTR)
+ continue;
+ error(EXIT_FAILURE, errno, "wg_gc: read");
+ } else if ((size_t)r < sizeof(thread)) {
+ error(EXIT_FAILURE, 0, "wg_gc: read: only read %zd/%zu", r, sizeof(thread));
+ }
+
+ pthread_join(thread, NULL);
+
+ int p;
+ if ((p = pthread_mutex_lock(&wg->lock)) != 0)
+ error(EXIT_FAILURE, p, "wg_gc: pthread_mutex_lock");
+ wg->count--;
+ if (wg->count == 0) {
+ if ((p = pthread_mutex_unlock(&wg->lock)) != 0)
+ error(EXIT_FAILURE, p, "wg_gc: pthread_mutex_unlock");
+ return NULL;
+ }
+ if ((p = pthread_mutex_unlock(&wg->lock)) != 0)
+ error(EXIT_FAILURE, p, "wg_gc: pthread_mutex_unlock");
+ }
+}
+
void wg_init(struct wg *wg) {
wg->count = 0;
- pthread_mutex_init(&wg->lock, NULL);
- int fds[2];
- if (pipe(fds) != 0)
- error(EXIT_FAILURE, errno, "pipe");
- wg->fd_wait = fds[0];
- wg->fd_signal = fds[1];
+ int r;
+ if ((r = pthread_mutex_init(&wg->lock, NULL)) != 0)
+ error(EXIT_FAILURE, r, "wg_init: pthread_mutex_init");
+ if (pipe(wg->fd_threads) != 0)
+ error(EXIT_FAILURE, errno, "wg_init: pipe");
+ if ((r = pthread_create(&wg->gc, NULL, wg_gc, (void*)wg)) != 0)
+ error(EXIT_FAILURE, r, "wg_init: pthread_create");
}
-void wg_add(struct wg *wg, unsigned int n) {
- pthread_mutex_lock(&wg->lock);
- wg->count += n;
- pthread_mutex_unlock(&wg->lock);
+void wg_add(struct wg *wg) {
+ int r;
+ if ((r = pthread_mutex_lock(&wg->lock)) != 0)
+ error(EXIT_FAILURE, r, "wg_add: pthread_mutex_lock");
+ wg->count++;
+ if ((r = pthread_mutex_unlock(&wg->lock)) != 0)
+ error(EXIT_FAILURE, r, "wg_add: pthread_mutex_unlock");
}
-void wg_sub(struct wg *wg, unsigned int n) {
- pthread_mutex_lock(&wg->lock);
- wg->count -= n;
- if (wg->count == 0)
- if (write(wg->fd_signal, " ", 1) < 1)
- error(EXIT_FAILURE, errno, "write");
- pthread_mutex_unlock(&wg->lock);
+void wg_sub(struct wg *wg) {
+ pthread_t thread = pthread_self();
+ ssize_t r = write(wg->fd_threads[1], &thread, sizeof(thread));
+ if (r < 0) {
+ error(EXIT_FAILURE, errno, "wg_sub: write");
+ } else if ((size_t)r < sizeof(thread)) {
+ error(EXIT_FAILURE, 0, "wg_sub: only wrote %zd/%zu", r, sizeof(thread));
+ }
}
void wg_wait(struct wg *wg) {
- char b;
- retry:
- if (read(wg->fd_wait, &b, 1) == -1) {
- if (errno == EINTR)
- goto retry;
- error(EXIT_FAILURE, errno, "wg_wait");
- }
+ int r;
+ if ((r = pthread_join(wg->gc, NULL)) != 0)
+ error(EXIT_FAILURE, r, "wg_wait: pthread_join");
+ if (close(wg->fd_threads[1]) != 0)
+ error(EXIT_FAILURE, errno, "wg_wait: close(fd_threads[1])");
+ if (close(wg->fd_threads[0]) != 0)
+ error(EXIT_FAILURE, errno, "wg_wait: close(fd_threads[0])");
+ if ((r = pthread_mutex_destroy(&wg->lock)) != 0)
+ error(EXIT_FAILURE, r, "wg_wait: pthread_mutex_destroy");
}
diff --git a/src/wg.h b/src/wg.h
index c7efcb2..b2eb4c4 100644
--- a/src/wg.h
+++ b/src/wg.h
@@ -21,18 +21,17 @@
#include <pthread.h>
-/* Thread management tools modeled on https://golang.org/pkg/sync/#WaitGroup */
-
-/* pthread_cond_t is overly complicated. Just use a self-pipe. */
+/* When you call wg_wait, the waitgroup is destroyed. You must
+ * re-wg_init it if you want to reuse it. */
struct wg {
int count;
pthread_mutex_t lock;
- int fd_wait;
- int fd_signal;
+ int fd_threads[2];
+ pthread_t gc;
};
void wg_init(struct wg *);
-void wg_add(struct wg *, unsigned int);
-void wg_sub(struct wg *, unsigned int);
+void wg_add(struct wg *);
+void wg_sub(struct wg *);
void wg_wait(struct wg*);
diff --git a/systemd/freenect-server-http.service.in b/systemd/freenect-server-http.service.in
index 8bb5b44..2aa97d7 100644
--- a/systemd/freenect-server-http.service.in
+++ b/systemd/freenect-server-http.service.in
@@ -17,7 +17,6 @@
Description=Kinect HTTP media streamer
After=network.target
Requires=freenect-server-http.socket freenect-server.service
-PartOf=freenect-server.service
After=freenect-server.service
[Service]
diff --git a/systemd/freenect-server.service.in b/systemd/freenect-server.service.in
index c6a719e..c0417b6 100644
--- a/systemd/freenect-server.service.in
+++ b/systemd/freenect-server.service.in
@@ -15,13 +15,11 @@
[Unit]
Description=Kinect media streamer backend
-After=network.target
-Requires=freenect-server@accel.mjson.socket freenect-server@depth.mjpg.socket freenect-server@video.mjpg.socket
[Service]
Type=simple
User=@user@
Group=@group@
-ExecStart=@bindir@/freenect-server -V systemd:freenect-server@video.mjpg.socket -D systemd:freenect-server@depth.mjpg.socket -A systemd:freenect-server@accel.mjson.socket
-
-Restart=always
+RuntimeDirectory=freenect-server
+ExecStartPre=/usr/bin/mkfifo /run/freenect-server/video.mjpg /run/freenect-server/depth.mjpg /run/freenect-server/accel.mjson
+ExecStart=@bindir@/freenect-server -v /run/freenect-server/video.mjpg -d /run/freenect-server/depth.mjpg -a /run/freenect-server/accel.mjson
diff --git a/systemd/freenect-server@.socket.in b/systemd/freenect-server@.socket.in
deleted file mode 100644
index 8be3205..0000000
--- a/systemd/freenect-server@.socket.in
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright (C) 2016 Luke Shumaker
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-[Unit]
-Description=Kinect media streamer backend %I stream
-After=network.target
-
-[Socket]
-SocketUser=@user@
-SocketGroup=@group@
-ListenFIFO=/run/freenect-server/%I
-Service=freenect-server.service
-RemoveOnStop=true \ No newline at end of file