/* multipart-replace-http-server - A server to multiplex * mixed/x-multipart-replace streams over HTTP * * Copyright (C) 2016 Luke Shumaker * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #include #include #include /* for open */ #include /* for {get,free}addrinfo() */ #include #include /* atexit */ #include /* for EXIT_FAILURE */ #include #include #include /* for open */ #include #include #include "util.h" #include "wg.h" #include "multipart-replace.h" struct httpfile { char name[256]; struct multipart_replace_stream *stream; }; size_t filec = 0; struct httpfile *filev = NULL; struct reader_thread_args { struct multipart_replace_stream *stream; int fd; const char *filename; const char *boundary; }; bool running = true; int sock; struct wg wg; void stop(int sig) { log("Caught %d", sig); log("STOPPING=1"); running = false; close(sock); } #define threaderror(stat, errnum, ...) do { error(0, errnum, __VA_ARGS__); stop(0); } while(0) void *reader_thread(void *args_anon) { struct reader_thread_args *args = args_anon; char *name = alloca(strlen("read ")+strlen(args->filename)+1); strcpy(name, "read "); strcpy(&name[strlen("read ")], args->filename); pthread_setname_np(pthread_self(), name); debug("starting thread: %s", name); multipart_replace_reader(args->stream, args->fd, args->boundary); sleep(5); threaderror(EXIT_FAILURE, 0, "multipart_replace stream ended: %s", args->filename); wg_sub(&wg, 1); return NULL; } void start_multipart_replace_reader(struct multipart_replace_stream *s, int fd, const char *filename, const char *boundary) { struct reader_thread_args *args = xrealloc(NULL, sizeof(struct reader_thread_args)); args->stream = s; args->fd = fd; args->filename = strdup(filename); args->boundary = boundary; pthread_t thread; wg_add(&wg, 1); pthread_create(&thread, NULL, reader_thread, args); } void file_add(const char *filename) { struct multipart_replace_stream *stream = xrealloc(NULL, sizeof(struct multipart_replace_stream)); init_multipart_replace_stream(stream); int fd = open(filename, O_RDONLY); if (fd < 0) { threaderror(EXIT_FAILURE, errno, "opening file: %s", filename); return; } filev = xrealloc(filev, (++filec)*sizeof(*filev)); char *shortname = strrchr(filename, '/'); if (shortname == NULL) { shortname = alloca(strlen(filename)+2); shortname[0] = '/'; strcpy(&shortname[1], filename); } strncpy(filev[filec-1].name, shortname, sizeof(filev[filec-1].name)); log("added file #%zd: %s", filec-1, filev[filec-1].name); filev[filec-1].stream = stream; start_multipart_replace_reader(stream, fd, filev[filec-1].name, "ffserver" /* FIXME */); } struct multipart_replace_stream *file_get(const char *filename) { for (size_t i = 0; i < filec; i++) { if (strncmp(filename, filev[i].name, sizeof(filev[i].name)) == 0) { return filev[i].stream; } } return NULL; } void connection_handler(int fd) { FILE *netstream = fdopen(fd, "r"); char *line_buf = NULL; size_t line_cap = 0; ssize_t line_len = 0; line_len = getline(&line_buf, &line_cap, netstream); /* expect line to be "GET /path.mjpg HTTP/1.1" */ if (strncmp(line_buf, "GET ", 4) != 0) { dprintf(fd, "HTTP/1.1 405 Method Not Allowed\r\n" "Allow: GET\r\n" "\r\n"); close(fd); return; } char *version = &line_buf[4]; char *path = strsep(&version, " "); if (strcmp(version, "HTTP/1.1\r\n") != 0 && strcmp(version, "HTTP/1.0\r\n")) { close(fd); return; } path = strdupa(path); /* run through the rest of the headers */ while (strcmp(line_buf, "\r\n") != 0 && line_len >= 0) line_len = getline(&line_buf, &line_cap, netstream); if (strcmp(path, "/") == 0) { log("200 %s", path); dprintf(fd, "HTTP/1.1 200 OK\r\n" "Content-Type: text/html; charset=utf-8\r\n" "\r\n" "\n" "\n" "\n" " multipart/x-mixed-replace HTTP server\n" "\n" "\n" "
    \n" ); for (size_t i = 0; i < filec; i++) dprintf(fd, "
  • %s
  • \n", filev[i].name, filev[i].name); dprintf(fd, "
\n" "\n" "\n"); close(fd); return; } struct multipart_replace_stream *vidstream = file_get(path); if (vidstream == NULL) { log("404 %s", path); dprintf(fd, "HTTP/1.1 404 Not Found\r\n" "\r\n"); return; } const char *boundary = "boundary" /* FIXME */; log("200 %s", path); dprintf(fd, "HTTP/1.1 200 OK\r\n" "Content-Type: multipart/x-mixed-replace;boundary=%s\r\n" "\r\n", boundary); multipart_replace_writer(vidstream, fd, boundary); close(fd); } void *connection_thread(void *arg_anon) { pthread_setname_np(pthread_self(), "connection"); int fd = (int)(intptr_t)arg_anon; log("Connection %d opened", fd); connection_handler(fd); log("Connection %d closed", fd); wg_sub(&wg, 1); return NULL; } /* same error codes values as -getaddrinfo(); */ int sockstream_listen(const char *type, const char *addr) { int sock; if (strcmp(type, "fd") == 0) { sock = get_fd(addr); if (sock < 0) { errno = -sock; return -EAI_SYSTEM; } /* make sure it's a socket */ struct stat st; if (fstat(sock, &st) < 0) return -EAI_SYSTEM; if (!S_ISSOCK(st.st_mode)) { errno = ENOTSOCK; return -EAI_SYSTEM; } /* make sure the socket is a stream */ int fd_socktype = 0; socklen_t l = sizeof(fd_socktype); if (getsockopt(sock, SOL_SOCKET, SO_TYPE, &fd_socktype, &l) < 0) return -EAI_SYSTEM; if (l != sizeof(fd_socktype)) { errno = EINVAL; return -EAI_SYSTEM; } if (fd_socktype != SOCK_STREAM) { errno = ENOSTR; return -EAI_SYSTEM; } /* make sure the socket is listening */ int fd_listening = 0; l = sizeof(fd_listening); if (getsockopt(sock, SOL_SOCKET, SO_ACCEPTCONN, &fd_listening, &l) < 0) return -EAI_SYSTEM; if (l != sizeof(fd_listening)) { errno = EINVAL; return -EAI_SYSTEM; } if (!fd_listening) listen(sock, SOMAXCONN); /* return the socket */ return sock; } else if (strcmp(type, "unix") == 0) { struct sockaddr_un un_addr = { 0 }; if (strlen(addr)+1 > sizeof(un_addr.sun_path)) { errno = ENAMETOOLONG; return -EAI_SYSTEM; } un_addr.sun_family = AF_UNIX; strcpy(un_addr.sun_path, addr); int sock = socket(AF_UNIX, SOCK_STREAM, 0); if (sock < 0) return -EAI_SYSTEM; unlink(addr); if (bind(sock, (struct sockaddr *)&un_addr, sizeof(un_addr)) < 0) return -EAI_SYSTEM; if (listen(sock, SOMAXCONN) < 0) return -EAI_SYSTEM; return sock; } else if (strcmp(type, "tcp4") == 0 || strcmp(type, "tcp6") == 0) { struct addrinfo *ai_addr = NULL; struct addrinfo ai_hints = { 0 }; char *host = strdupa(addr); char *col = strrchr(host, ':'); if (col == NULL) { errno = EINVAL; return -EAI_SYSTEM; } char *port = &col[1]; *col = '\0'; if (host[0] == '\0') host = NULL; ai_hints.ai_family = (strcmp(type, "tcp6") == 0) ? AF_INET6 : AF_INET; ai_hints.ai_socktype = SOCK_STREAM; ai_hints.ai_flags = AI_PASSIVE; int r = getaddrinfo(host, port, &ai_hints, &ai_addr); if (r < 0) return r; if (r > 0) return -r; int sock = socket(ai_addr->ai_family, ai_addr->ai_socktype, ai_addr->ai_protocol); if (sock < 0) return -EAI_SYSTEM; int yes = 1; setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); if (bind(sock, ai_addr->ai_addr, ai_addr->ai_addrlen) < 0) return -EAI_SYSTEM; if (listen(sock, SOMAXCONN) < 0) return -EAI_SYSTEM; freeaddrinfo(ai_addr); return sock; } else { errno = EINVAL; return -EAI_SYSTEM; } } void usage() { printf("Usage: %s [-h] ADDRTYPE ADDR [FILENAME...]\n", program_invocation_name); printf("Multiplex several multipart/x-mixed-replace streams over HTTP.\n" "\n" "ADDRTYPE is \"tcp4\", \"tcp6\", \"fd\", or \"unix\". For tcp4/6, ADDR\n" "is a socket address in standard notation. For fd, ADDR is \"stdin\",\n" "\"stdout\", \"stder\", \"systemd\", or an unsigned integer. For unix,\n" "ADDR is a file path.\n" "\n" "The leading directories of the FILENAMEs are stripped; they are served\n" "at just the base filename. As common sense should eventually suggest,\n" "the FILENAMEs should be pipes or sockets.\n"); } void cleanup() { fflush(stderr); sleep(5); /* work around systemd bug dropping log messages */ } pthread_t main_thread; void progname() { char threadname[16]; if (pthread_self() != main_thread && pthread_getname_np(pthread_self(), threadname, sizeof(threadname)) == 0) fprintf(stderr, "%s:%s: ", program_invocation_name, threadname); else fprintf(stderr, "%s: ", program_invocation_name); } int main(int argc, char *argv[]) { if (argc >=2 && strcmp(argv[1], "-h") == 0) { usage(); return EXIT_SUCCESS; } if (argc < 3) { dup2(2, 1); usage(); return EXIT_FAILURE; } main_thread = pthread_self(); error_print_progname = progname; atexit(cleanup); sock = sockstream_listen(argv[1], argv[2]); if (sock < 0) { if (sock == -EAI_SYSTEM) error(EXIT_FAILURE, errno, "Opening socket %s %s", argv[1], argv[2]); else error(EXIT_FAILURE, 0, "Opening socket %s %s: %s", argv[1], argv[2], gai_strerror(-sock)); } wg_init(&wg); for (int i = 3; i < argc; i++) file_add(argv[i]); signal(SIGPIPE, SIG_IGN); signal(SIGTERM, stop); signal(SIGQUIT, stop); signal(SIGINT, stop); while (running) { int conn = accept(sock, NULL, NULL); if (conn < 0) continue; pthread_t thread; wg_add(&wg, 1); pthread_create(&thread, NULL, connection_thread, (void*)(intptr_t)conn); } wg_wait(&wg); for (size_t i = 0; i < filec; i++) destroy_multipart_replace_stream(filev[i].stream); free(filev); return 0; }