diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/journal/compress.c | 221 | ||||
| -rw-r--r-- | src/journal/test-compress-benchmark.c | 36 | ||||
| -rw-r--r-- | src/journal/test-compress.c | 9 | 
3 files changed, 186 insertions, 80 deletions
| diff --git a/src/journal/compress.c b/src/journal/compress.c index c66043e503..8c92e26edd 100644 --- a/src/journal/compress.c +++ b/src/journal/compress.c @@ -22,6 +22,7 @@  #include <stdlib.h>  #include <string.h>  #include <unistd.h> +#include <sys/mman.h>  #ifdef HAVE_XZ  #  include <lzma.h> @@ -29,6 +30,7 @@  #ifdef HAVE_LZ4  #  include <lz4.h> +#  include <lz4frame.h>  #endif  #include "compress.h" @@ -37,6 +39,11 @@  #include "sparse-endian.h"  #include "journal-def.h" +#ifdef HAVE_LZ4 +DEFINE_TRIVIAL_CLEANUP_FUNC(LZ4F_compressionContext_t, LZ4F_freeCompressionContext); +DEFINE_TRIVIAL_CLEANUP_FUNC(LZ4F_decompressionContext_t, LZ4F_freeDecompressionContext); +#endif +  #define ALIGN_8(l) ALIGN_TO(l, sizeof(size_t))  static const char* const object_compressed_table[_OBJECT_COMPRESSED_MAX] = { @@ -416,81 +423,96 @@ int compress_stream_xz(int fdf, int fdt, uint64_t max_bytes) {  #endif  } -#define LZ4_BUFSIZE (512*1024) +#define LZ4_BUFSIZE (512*1024u)  int compress_stream_lz4(int fdf, int fdt, uint64_t max_bytes) {  #ifdef HAVE_LZ4 +        LZ4F_errorCode_t c; +        _cleanup_(LZ4F_freeCompressionContextp) LZ4F_compressionContext_t ctx = NULL; +        _cleanup_free_ char *buf = NULL; +        char *src = NULL; +        size_t size, n, total_in = 0, total_out = 0, offset = 0, frame_size; +        struct stat st; +        int r; +        static const LZ4F_compressOptions_t options = { +                .stableSrc = 1, +        }; +        static const LZ4F_preferences_t preferences = { +                .frameInfo.blockSizeID = 5, +        }; -        _cleanup_free_ char *buf1 = NULL, *buf2 = NULL, *out = NULL; -        char *buf; -        LZ4_stream_t lz4_data = {}; -        le32_t header; -        size_t total_in = 0, total_out = sizeof(header); -        ssize_t n; +        c = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); +        if (LZ4F_isError(c)) +                return -ENOMEM; -        assert(fdf >= 0); -        assert(fdt >= 0); +        if (fstat(fdf, &st) < 0) +                return log_debug_errno(errno, "fstat() failed: %m"); -        buf1 = malloc(LZ4_BUFSIZE); -        buf2 = malloc(LZ4_BUFSIZE); -        out = malloc(LZ4_COMPRESSBOUND(LZ4_BUFSIZE)); -        if (!buf1 || !buf2 || !out) -                return log_oom(); +        frame_size = LZ4F_compressBound(LZ4_BUFSIZE, &preferences); +        size =  frame_size + 64*1024; /* add some space for header and trailer */ +        buf = malloc(size); +        if (!buf) +                return -ENOMEM; -        buf = buf1; -        for (;;) { -                size_t m; -                int r; +        n = offset = LZ4F_compressBegin(ctx, buf, size, &preferences); +        if (LZ4F_isError(n)) +                return -EINVAL; -                m = LZ4_BUFSIZE; -                if (max_bytes != (uint64_t) -1 && (uint64_t) m > (max_bytes - total_in)) -                        m = (size_t) (max_bytes - total_in); +        src = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, fdf, 0); +        if (src == MAP_FAILED) +                return -errno; -                n = read(fdf, buf, m); -                if (n < 0) -                        return -errno; -                if (n == 0) -                        break; +        log_debug("Buffer size is %zu bytes, header size %zu bytes.", size, n); -                total_in += n; +        while (total_in < (size_t) st.st_size) { +                ssize_t k; -                r = LZ4_compress_continue(&lz4_data, buf, out, n); -                if (r == 0) { -                        log_error("LZ4 compression failed."); -                        return -EBADMSG; +                k = MIN(LZ4_BUFSIZE, st.st_size - total_in); +                n = LZ4F_compressUpdate(ctx, buf + offset, size - offset, +                                        src + total_in, k, &options); +                if (LZ4F_isError(n)) { +                        r = -ENOTRECOVERABLE; +                        goto cleanup;                  } -                header = htole32(r); -                errno = 0; +                total_in += k; +                offset += n; +                total_out += n; -                n = write(fdt, &header, sizeof(header)); -                if (n < 0) -                        return -errno; -                if (n != sizeof(header)) -                        return errno ? -errno : -EIO; - -                n = loop_write(fdt, out, r, false); -                if (n < 0) -                        return n; +                if (max_bytes != (uint64_t) -1 && total_out > (size_t) max_bytes) { +                        log_debug("Compressed stream longer than %zd bytes", max_bytes); +                        return -EFBIG; +                } -                total_out += sizeof(header) + r; +                if (size - offset < frame_size + 4) { +                        k = loop_write(fdt, buf, offset, false); +                        if (k < 0) { +                                r = k; +                                goto cleanup; +                        } +                        offset = 0; +                } +        } -                buf = buf == buf1 ? buf2 : buf1; +        n = LZ4F_compressEnd(ctx, buf + offset, size - offset, &options); +        if (LZ4F_isError(n)) { +                r = -ENOTRECOVERABLE; +                goto cleanup;          } -        header = htole32(0); -        n = write(fdt, &header, sizeof(header)); -        if (n < 0) -                return -errno; -        if (n != sizeof(header)) -                return errno ? -errno : -EIO; +        offset += n; +        total_out += n; +        r = loop_write(fdt, buf, offset, false); +        if (r < 0) +                goto cleanup;          log_debug("LZ4 compression finished (%zu -> %zu bytes, %.1f%%)",                    total_in, total_out,                    (double) total_out / total_in * 100); - -        return 0; + cleanup: +        munmap(src, st.st_size); +        return r;  #else          return -EPROTONOSUPPORT;  #endif @@ -510,7 +532,7 @@ int decompress_stream_xz(int fdf, int fdt, uint64_t max_bytes) {          ret = lzma_stream_decoder(&s, UINT64_MAX, 0);          if (ret != LZMA_OK) { -                log_error("Failed to initialize XZ decoder: code %u", ret); +                log_debug("Failed to initialize XZ decoder: code %u", ret);                  return -ENOMEM;          } @@ -536,7 +558,7 @@ int decompress_stream_xz(int fdf, int fdt, uint64_t max_bytes) {                  ret = lzma_code(&s, action);                  if (ret != LZMA_OK && ret != LZMA_STREAM_END) { -                        log_error("Decompression failed: code %u", ret); +                        log_debug("Decompression failed: code %u", ret);                          return -EBADMSG;                  } @@ -566,14 +588,14 @@ int decompress_stream_xz(int fdf, int fdt, uint64_t max_bytes) {                  }          }  #else -        log_error("Cannot decompress file. Compiled without XZ support."); +        log_debug("Cannot decompress file. Compiled without XZ support.");          return -EPROTONOSUPPORT;  #endif  } -int decompress_stream_lz4(int fdf, int fdt, uint64_t max_bytes) { -  #ifdef HAVE_LZ4 +static int decompress_stream_lz4_v1(int fdf, int fdt, uint64_t max_bytes) { +          _cleanup_free_ char *buf = NULL, *out = NULL;          size_t buf_size = 0;          LZ4_streamDecode_t lz4_data = {}; @@ -585,7 +607,7 @@ int decompress_stream_lz4(int fdf, int fdt, uint64_t max_bytes) {          out = malloc(4*LZ4_BUFSIZE);          if (!out) -                return log_oom(); +                return -ENOMEM;          for (;;) {                  ssize_t m; @@ -606,22 +628,24 @@ int decompress_stream_lz4(int fdf, int fdt, uint64_t max_bytes) {                   * not accept buffers compressed by newer binaries then.                   */                  if (m > LZ4_COMPRESSBOUND(LZ4_BUFSIZE * 4)) { -                        log_error("Compressed stream block too big: %zd bytes", m); -                        return -EBADMSG; +                        log_debug("Compressed stream block too big: %zd bytes", m); +                        return -ENOBUFS;                  }                  total_in += sizeof(header) + m;                  if (!GREEDY_REALLOC(buf, buf_size, m)) -                        return log_oom(); +                        return -ENOMEM;                  r = loop_read_exact(fdf, buf, m, false);                  if (r < 0)                          return r;                  r = LZ4_decompress_safe_continue(&lz4_data, buf, out, m, 4*LZ4_BUFSIZE); -                if (r <= 0) -                        log_error("LZ4 decompression failed."); +                if (r <= 0) { +                        log_debug("LZ4 decompression failed (legacy format)."); +                        return -EBADMSG; +                }                  total_out += r; @@ -635,13 +659,80 @@ int decompress_stream_lz4(int fdf, int fdt, uint64_t max_bytes) {                          return r;          } -        log_debug("LZ4 decompression finished (%zu -> %zu bytes, %.1f%%)", +        log_debug("LZ4 decompression finished (legacy format, %zu -> %zu bytes, %.1f%%)",                    total_in, total_out,                    (double) total_out / total_in * 100);          return 0; +} + +static int decompress_stream_lz4_v2(int in, int out, uint64_t max_bytes) { +        size_t c; +        _cleanup_(LZ4F_freeDecompressionContextp) LZ4F_decompressionContext_t ctx = NULL; +        _cleanup_free_ char *buf = NULL; +        char *src; +        struct stat st; +        int r = 0; +        size_t total_in = 0, total_out = 0; + +        c = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION); +        if (LZ4F_isError(c)) +                return -ENOMEM; + +        if (fstat(in, &st) < 0) +                return log_debug_errno(errno, "fstat() failed: %m"); + +        buf = malloc(LZ4_BUFSIZE); +        if (!buf) +                return -ENOMEM; + +        src = mmap(NULL, st.st_size, PROT_READ, MAP_PRIVATE, in, 0); +        if (src == MAP_FAILED) +                return -errno; + +        while (total_in < (size_t) st.st_size) { +                size_t produced = LZ4_BUFSIZE; +                size_t used = st.st_size - total_in; + +                c = LZ4F_decompress(ctx, buf, &produced, src + total_in, &used, NULL); +                if (LZ4F_isError(c)) { +                        r = -EBADMSG; +                        goto cleanup; +                } + +                total_in += used; +                total_out += produced; + +                if (max_bytes != (uint64_t) -1 && total_out > (size_t) max_bytes) { +                        log_debug("Decompressed stream longer than %zd bytes", max_bytes); +                        r = -EFBIG; +                        goto cleanup; +                } + +                r = loop_write(out, buf, produced, false); +                if (r < 0) +                        goto cleanup; +        } + +        log_debug("LZ4 decompression finished (%zu -> %zu bytes, %.1f%%)", +                  total_in, total_out, +                  (double) total_out / total_in * 100); + cleanup: +        munmap(src, st.st_size); +        return r; +} +#endif + +int decompress_stream_lz4(int fdf, int fdt, uint64_t max_bytes) { +#ifdef HAVE_LZ4 +        int r; + +        r = decompress_stream_lz4_v2(fdf, fdt, max_bytes); +        if (r == -EBADMSG) +                r = decompress_stream_lz4_v1(fdf, fdt, max_bytes); +        return r;  #else -        log_error("Cannot decompress file. Compiled without LZ4 support."); +        log_debug("Cannot decompress file. Compiled without LZ4 support.");          return -EPROTONOSUPPORT;  #endif  } diff --git a/src/journal/test-compress-benchmark.c b/src/journal/test-compress-benchmark.c index c8e5b76c6c..0be6820a14 100644 --- a/src/journal/test-compress-benchmark.c +++ b/src/journal/test-compress-benchmark.c @@ -20,6 +20,7 @@  #include "compress.h"  #include "util.h"  #include "macro.h" +#include "random-util.h"  typedef int (compress_t)(const void *src, uint64_t src_size, void *dst, size_t *dst_size);  typedef int (decompress_t)(const void *src, uint64_t src_size, @@ -27,20 +28,31 @@ typedef int (decompress_t)(const void *src, uint64_t src_size,  #define MAX_SIZE (1024*1024LU) -static char* make_buf(size_t count) { +static char* make_buf(size_t count, const char *type) {          char *buf;          size_t i;          buf = malloc(count);          assert_se(buf); -        for (i = 0; i < count; i++) -                buf[i] = 'a' + i % ('z' - 'a' + 1); +        if (streq(type, "zeros")) +                memzero(buf, count); +        else if (streq(type, "simple")) +                for (i = 0; i < count; i++) +                        buf[i] = 'a' + i % ('z' - 'a' + 1); +        else if (streq(type, "random")) { +                random_bytes(buf, count/10); +                random_bytes(buf + 2*count/10, count/10); +                random_bytes(buf + 4*count/10, count/20); +                random_bytes(buf + 6*count/10, count/20); +                random_bytes(buf + 8*count/10, count/20); +        } else +                assert_not_reached("here");          return buf;  } -static void test_compress_decompress(const char* label, +static void test_compress_decompress(const char* label, const char* type,                                       compress_t compress, decompress_t decompress) {          usec_t n, n2 = 0;          float dt; @@ -50,7 +62,7 @@ static void test_compress_decompress(const char* label,          size_t buf2_allocated = 0;          size_t skipped = 0, compressed = 0, total = 0; -        text = make_buf(MAX_SIZE); +        text = make_buf(MAX_SIZE, type);          buf = calloc(MAX_SIZE + 1, 1);          assert_se(text && buf); @@ -62,7 +74,8 @@ static void test_compress_decompress(const char* label,                  r = compress(text, i, buf, &j);                  /* assume compression must be successful except for small inputs */ -                assert_se(r == 0 || (i < 2048 && r == -ENOBUFS)); +                assert_se(r == 0 || (i < 2048 && r == -ENOBUFS) || streq(type, "random")); +                  /* check for overwrites */                  assert_se(buf[i] == 0);                  if (r != 0) { @@ -91,23 +104,26 @@ static void test_compress_decompress(const char* label,          dt = (n2-n) / 1e6; -        log_info("%s: compressed & decompressed %zu bytes in %.2fs (%.2fMiB/s), " +        log_info("%s/%s: compressed & decompressed %zu bytes in %.2fs (%.2fMiB/s), "                   "mean compresion %.2f%%, skipped %zu bytes", -                 label, total, dt, +                 label, type, total, dt,                   total / 1024. / 1024 / dt,                   100 - compressed * 100. / total,                   skipped);  }  int main(int argc, char *argv[]) { +        const char *i;          log_set_max_level(LOG_DEBUG); +        NULSTR_FOREACH(i, "zeros\0simple\0random\0") {  #ifdef HAVE_XZ -        test_compress_decompress("XZ", compress_blob_xz, decompress_blob_xz); +                test_compress_decompress("XZ", i, compress_blob_xz, decompress_blob_xz);  #endif  #ifdef HAVE_LZ4 -        test_compress_decompress("LZ4", compress_blob_lz4, decompress_blob_lz4); +                test_compress_decompress("LZ4", i, compress_blob_lz4, decompress_blob_lz4);  #endif +        }          return 0;  } diff --git a/src/journal/test-compress.c b/src/journal/test-compress.c index f17c00e60d..e51b12407f 100644 --- a/src/journal/test-compress.c +++ b/src/journal/test-compress.c @@ -144,8 +144,8 @@ static void test_compress_stream(int compression,                                   const char *srcfile) {          _cleanup_close_ int src = -1, dst = -1, dst2 = -1; -        char pattern[] = "/tmp/systemd-test.xz.XXXXXX", -             pattern2[] = "/tmp/systemd-test.xz.XXXXXX"; +        char pattern[] = "/tmp/systemd-test.compressed.XXXXXX", +             pattern2[] = "/tmp/systemd-test.compressed.XXXXXX";          int r;          _cleanup_free_ char *cmd = NULL, *cmd2;          struct stat st = {}; @@ -185,7 +185,7 @@ static void test_compress_stream(int compression,          assert_se(lseek(dst, 1, SEEK_SET) == 1);          r = decompress(dst, dst2, st.st_size); -        assert_se(r == -EBADMSG); +        assert_se(r == -EBADMSG || r == 0);          assert_se(lseek(dst, 0, SEEK_SET) == 0);          assert_se(lseek(dst2, 0, SEEK_SET) == 0); @@ -236,8 +236,7 @@ int main(int argc, char *argv[]) {                                     compress_blob_lz4, decompress_startswith_lz4,                                     data, sizeof(data), true); -        /* Produced stream is not compatible with lz4 binary, skip lz4cat check. */ -        test_compress_stream(OBJECT_COMPRESSED_LZ4, NULL, +        test_compress_stream(OBJECT_COMPRESSED_LZ4, "lz4cat",                               compress_stream_lz4, decompress_stream_lz4, argv[0]);  #else          log_info("/* LZ4 test skipped */"); | 
