From bae1cfe5ce46a1b758ccc4dddda2751b6ac47f3e Mon Sep 17 00:00:00 2001 From: Sergey Matveev Date: Tue, 19 Oct 2021 14:05:32 +0300 Subject: [PATCH] Multistream WARCs and better Zstandard support --- .gitignore | 4 +- cmd/enzstd/.gitignore | 2 + cmd/enzstd/compile_flags.txt.do | 1 + cmd/enzstd/enzstd.c | 105 ++++++++++++++++ cmd/enzstd/enzstd.do | 3 + cmd/ungzip/main.go | 79 ++++++++++++ cmd/unzstd/.gitignore | 2 + cmd/unzstd/compile_flags.txt.do | 1 + cmd/unzstd/unzstd.c | 212 ++++++++++++++++++++++++++++++++ cmd/unzstd/unzstd.do | 3 + cmd/warc-extract/main.go | 49 +++++++- doc/warcs.texi | 46 +++++-- fifos/start.go | 5 +- fifos/warcs.go | 12 +- go.mod | 1 - go.sum | 2 - rounds/warc.go | 3 +- warc/compressed.go | 166 ++++++++++++++++++------- warc/gzip.go | 123 ++++++++++++++++++ warc/open.go | 49 ++++++++ warc/reader.go | 45 ++++--- warc/record.go | 31 ++--- warc/uncompressed.go | 51 ++++++++ warc/uris.go | 49 ++++++-- 24 files changed, 936 insertions(+), 108 deletions(-) create mode 100644 cmd/enzstd/.gitignore create mode 100644 cmd/enzstd/compile_flags.txt.do create mode 100644 cmd/enzstd/enzstd.c create mode 100644 cmd/enzstd/enzstd.do create mode 100644 cmd/ungzip/main.go create mode 100644 cmd/unzstd/.gitignore create mode 100644 cmd/unzstd/compile_flags.txt.do create mode 100644 cmd/unzstd/unzstd.c create mode 100644 cmd/unzstd/unzstd.do create mode 100644 warc/gzip.go create mode 100644 warc/open.go create mode 100644 warc/uncompressed.go diff --git a/.gitignore b/.gitignore index 087a229..7b40b4d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,5 @@ +/*.cmd /ccerts /cert.pem -/certgen.cmd /certs /prv.pem -/tofuproxy.cmd -/warc-extract.cmd diff --git a/cmd/enzstd/.gitignore b/cmd/enzstd/.gitignore new file mode 100644 index 0000000..aaa4938 --- /dev/null +++ b/cmd/enzstd/.gitignore @@ -0,0 +1,2 @@ +/compile_flags.txt +/enzstd diff --git a/cmd/enzstd/compile_flags.txt.do b/cmd/enzstd/compile_flags.txt.do new file mode 100644 index 0000000..13b13f1 --- /dev/null +++ b/cmd/enzstd/compile_flags.txt.do @@ -0,0 +1 @@ +printf "%s" "$CFLAGS $LDFLAGS" | tr " " "\n" | grep -v "^$" | sort | uniq diff --git a/cmd/enzstd/enzstd.c b/cmd/enzstd/enzstd.c new file mode 100644 index 0000000..cedb084 --- /dev/null +++ b/cmd/enzstd/enzstd.c @@ -0,0 +1,105 @@ +/* +tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management +Copyright (C) 2021 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +#include +#include +#include +#include + +#include + +int +main(int argc, char **argv) +{ + ZSTD_CCtx *ctx = ZSTD_createCCtx(); + if (ctx == NULL) { + fputs("can not initialize ZSTD_createCCtx\n", stderr); + return EXIT_FAILURE; + }; + size_t zCode = ZSTD_CCtx_setParameter(ctx, ZSTD_c_checksumFlag, 1); + if (ZSTD_isError(zCode)) { + fprintf(stderr, "can not setParameter: %s\n", ZSTD_getErrorName(zCode)); + return EXIT_FAILURE; + }; + zCode = ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, ZSTD_maxCLevel()); + if (ZSTD_isError(zCode)) { + fprintf(stderr, "can not setParameter: %s\n", ZSTD_getErrorName(zCode)); + return EXIT_FAILURE; + }; + + size_t srcSize = 8; + uint8_t *src = malloc(srcSize); + if (src == NULL) { + fprintf(stderr, "can not allocate memory: %zu\n", srcSize); + return EXIT_FAILURE; + }; + size_t dstSize = 0; + uint8_t *dst = NULL; + size_t srcWantSize = 0; + size_t dstWantSize = 0; + size_t n = 0; + for (;;) { + n = fread(src, 1, 8, stdin); + if (n < 8) { + if (feof(stdin)) { + break; + }; + perror("can not fread(stdin)"); + return EXIT_FAILURE; + }; + srcWantSize = (size_t)be64dec(src); + if (srcWantSize > srcSize) { + free(src); + srcSize = srcWantSize; + src = malloc(srcSize); + if (src == NULL) { + fprintf(stderr, "can not allocate memory: %zu\n", srcSize); + return EXIT_FAILURE; + }; + }; + n = fread(src, 1, srcWantSize, stdin); + if (n < srcWantSize) { + fprintf(stderr, "IS: %zu %zu\n", n, srcWantSize); + fputs("insufficient data fed\n", stderr); + return EXIT_FAILURE; + }; + dstWantSize = ZSTD_compressBound(srcWantSize); + if (dstWantSize > dstSize) { + free(dst); + dstSize = dstWantSize; + dst = malloc(dstSize); + if (dst == NULL) { + fprintf(stderr, "can not allocate memory: %zu\n", dstSize); + return EXIT_FAILURE; + }; + }; + zCode = ZSTD_compress2(ctx, dst, dstSize, src, srcWantSize); + if (ZSTD_isError(zCode)) { + fprintf(stderr, "can not compress: %s\n", ZSTD_getErrorName(zCode)); + return EXIT_FAILURE; + }; + n = fwrite(dst, 1, zCode, stdout); + if (n < zCode) { + perror("can not fwrite(stdout)"); + return EXIT_FAILURE; + }; + }; + free(dst); + free(src); + ZSTD_freeCCtx(ctx); + return EXIT_SUCCESS; +}; diff --git a/cmd/enzstd/enzstd.do b/cmd/enzstd/enzstd.do new file mode 100644 index 0000000..c381e4e --- /dev/null +++ b/cmd/enzstd/enzstd.do @@ -0,0 +1,3 @@ +src=enzstd.c +redo-ifchange $src +${CC:-cc} ${CFLAGS} -o $3 $src $LDFLAGS -lzstd diff --git a/cmd/ungzip/main.go b/cmd/ungzip/main.go new file mode 100644 index 0000000..0895aa5 --- /dev/null +++ b/cmd/ungzip/main.go @@ -0,0 +1,79 @@ +/* +tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management +Copyright (C) 2021 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +package main + +import ( + "bufio" + "compress/gzip" + "fmt" + "io" + "log" + "os" +) + +type CountableReader struct { + *bufio.Reader + n int64 +} + +func (r *CountableReader) ReadByte() (b byte, err error) { + b, err = r.Reader.ReadByte() + if err == nil { + r.n++ + } + return b, err +} + +func (r *CountableReader) Read(p []byte) (n int, err error) { + n, err = r.Reader.Read(p) + r.n += int64(n) + return +} + +func main() { + log.SetFlags(log.Lshortfile) + fdOff := os.NewFile(3, "fdOff") + br := bufio.NewReader(os.Stdin) + cr := &CountableReader{Reader: br} + bw := bufio.NewWriter(os.Stdout) + z, err := gzip.NewReader(cr) + if err != nil { + log.Fatalln(err) + } + z.Multistream(false) + var offset, offsetPrev int64 + for { + written, err := io.Copy(bw, z) + if err != nil { + log.Fatalln(err) + } + offset = cr.n + if fdOff != nil { + fmt.Fprintf(fdOff, "%d\t%d\n", offset-offsetPrev, written) + } + offsetPrev = offset + err = z.Reset(cr) + if err != nil { + if err == io.EOF { + break + } + log.Fatalln(err) + } + z.Multistream(false) + } +} diff --git a/cmd/unzstd/.gitignore b/cmd/unzstd/.gitignore new file mode 100644 index 0000000..d34d410 --- /dev/null +++ b/cmd/unzstd/.gitignore @@ -0,0 +1,2 @@ +/compile_flags.txt +/unzstd diff --git a/cmd/unzstd/compile_flags.txt.do b/cmd/unzstd/compile_flags.txt.do new file mode 100644 index 0000000..13b13f1 --- /dev/null +++ b/cmd/unzstd/compile_flags.txt.do @@ -0,0 +1 @@ +printf "%s" "$CFLAGS $LDFLAGS" | tr " " "\n" | grep -v "^$" | sort | uniq diff --git a/cmd/unzstd/unzstd.c b/cmd/unzstd/unzstd.c new file mode 100644 index 0000000..4864db6 --- /dev/null +++ b/cmd/unzstd/unzstd.c @@ -0,0 +1,212 @@ +/* +tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management +Copyright (C) 2021 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +// https://iipc.github.io/warc-specifications/specifications/warc-zstd/ + +#include +#include +#include +#include +#include +#include +#include + +#include + +int +main(int argc, char **argv) +{ + ZSTD_DCtx *ctx = ZSTD_createDCtx(); + if (ctx == NULL) { + fputs("can not initialize ZSTD_DCtx\n", stderr); + return 1; + }; + FILE *fdOff = fdopen(3, "wb"); + int rc = EXIT_FAILURE; + uint8_t *bufIn = NULL; + uint8_t *bufOut = NULL; + const size_t bufInSize = ZSTD_DStreamInSize(); + bufIn = malloc(bufInSize); + if (bufIn == NULL) { + fputs("no memory\n", stderr); + goto Exit; + }; + const size_t bufOutSize = ZSTD_DStreamOutSize(); + bufOut = malloc(bufOutSize); + if (bufOut == NULL) { + fputs("no memory\n", stderr); + goto Exit; + }; + + unsigned long long bufSize = 0; + + ZSTD_inBuffer bIn = {bufIn, 0, 0}; + ZSTD_outBuffer bOut = {bufOut, 0, 0}; + + bool isEmpty = true; + bool lastBlock = false; + size_t n = 0; + size_t written = 0; + size_t offset = 0; + size_t offsetPrev = 0; + size_t zCode = 0; +ReadAgain: + for (;;) { + n = fread(bufIn, 1, bufInSize, stdin); + if (n != bufInSize) { + if (feof(stdin)) { + lastBlock = true; + } else { + perror("can not fread(FILE)"); + goto Exit; + }; + }; + if (n >= 8 && le32dec(bufIn) == 0x184D2A5D) { + // dictionary frame + size_t dictSize = (size_t)le32dec(bufIn + 4); + uint8_t *dict = malloc(dictSize); + if (dict == NULL) { + fprintf(stderr, "insufficient memory for dictionary: %zu\n", dictSize); + goto Exit; + }; + const size_t alreadyRead = n - 8; + memcpy(dict, bufIn + 8, alreadyRead); + errno = 0; + n = fread(dict + alreadyRead, 1, dictSize - alreadyRead, stdin); + if (n != dictSize - alreadyRead) { + perror("can not read dictionary data"); + free(dict); + goto Exit; + }; + offset = dictSize + 8; + offsetPrev = offset; + if (fdOff != NULL) { + fprintf(fdOff, "%zu\t0\n", offset); + }; + uint32_t hdr = le32dec(dict); + switch (hdr) { + case ZSTD_MAGIC_DICTIONARY: + zCode = ZSTD_DCtx_loadDictionary(ctx, dict, dictSize); + free(dict); + if ((zCode != 0) && (ZSTD_isError(zCode))) { + fprintf( + stderr, + "can not load dictionary: %s\n", + ZSTD_getErrorName(zCode)); + goto Exit; + }; + goto ReadAgain; + break; + case ZSTD_MAGICNUMBER: + bufSize = ZSTD_getFrameContentSize(dict, dictSize); + switch (bufSize) { + case ZSTD_CONTENTSIZE_UNKNOWN: + case ZSTD_CONTENTSIZE_ERROR: + fprintf(stderr, "can not determine dictionary's size\n"); + free(dict); + goto Exit; + }; + uint8_t *buf = malloc(bufSize); + if (buf == NULL) { + fprintf( + stderr, "insufficient memory for dictionary: %llu\n", bufSize); + free(dict); + goto Exit; + }; + zCode = ZSTD_decompress(buf, bufSize, dict, dictSize); + free(dict); + if (ZSTD_isError(zCode)) { + fprintf( + stderr, + "can not decompress dictionary: %s\n", + ZSTD_getErrorName(zCode)); + free(buf); + goto Exit; + }; + zCode = ZSTD_DCtx_loadDictionary(ctx, buf, zCode); + free(buf); + if ((zCode != 0) && (ZSTD_isError(zCode))) { + fprintf( + stderr, + "can not load dictionary: %s\n", + ZSTD_getErrorName(zCode)); + goto Exit; + }; + goto ReadAgain; + break; + default: + fprintf(stderr, "unknown dictionary header\n"); + free(dict); + goto Exit; + }; + }; + isEmpty = false; + bIn.size = n; + bIn.pos = 0; + while (bIn.pos < bIn.size) { + bOut.size = bufOutSize; + bOut.pos = 0; + zCode = ZSTD_decompressStream(ctx, &bOut, &bIn); + if ((zCode != 0) && (ZSTD_isError(zCode))) { + fprintf(stderr, "can not decompress: %s\n", ZSTD_getErrorName(zCode)); + goto Exit; + }; + n = fwrite(bufOut, 1, bOut.pos, stdout); + if (n != bOut.pos) { + perror("can not fwrite(stdout)"); + goto Exit; + }; + written += n; + if (zCode == 0) { + offset += bIn.pos; + if (fdOff != NULL) { + fprintf(fdOff, "%zu\t%zu\n", offset - offsetPrev, written); + }; + offsetPrev = offset + bIn.pos; + written = 0; + }; + }; + if (lastBlock) { + break; + }; + offset += bIn.pos; + }; + + if (isEmpty) { + fputs("empty input\n", stderr); + goto Exit; + }; + if (zCode != 0) { + fprintf(stderr, "unfinished decompression: %s\n", ZSTD_getErrorName(zCode)); + goto Exit; + }; + rc = EXIT_SUCCESS; + +Exit: + if (bufOut != NULL) { + free(bufOut); + }; + if (bufIn != NULL) { + free(bufIn); + }; + ZSTD_freeDCtx(ctx); + if ((fdOff != NULL) && (fclose(fdOff) != 0)) { + perror("can not fclose(4)"); + return EXIT_FAILURE; + }; + return rc; +}; diff --git a/cmd/unzstd/unzstd.do b/cmd/unzstd/unzstd.do new file mode 100644 index 0000000..8b50f9f --- /dev/null +++ b/cmd/unzstd/unzstd.do @@ -0,0 +1,3 @@ +src=unzstd.c +redo-ifchange $src +${CC:-cc} ${CFLAGS} -o $3 $src $LDFLAGS -lzstd diff --git a/cmd/warc-extract/main.go b/cmd/warc-extract/main.go index 8b1aa2a..177d251 100644 --- a/cmd/warc-extract/main.go +++ b/cmd/warc-extract/main.go @@ -18,6 +18,9 @@ along with this program. If not, see . package main import ( + "bufio" + "bytes" + "encoding/binary" "flag" "fmt" "io" @@ -32,9 +35,53 @@ func main() { uri := flag.String("uri", "", "URI to extract, if specified") hdr := flag.Bool("hdr", false, "Also extract WARC's header") idx := flag.Bool("idx", false, "Save WARC indexes") + recompress := flag.Bool("for-enzstd", false, "Output for enzstd utility") flag.Parse() log.SetFlags(log.Lshortfile) + if *recompress { + var hdr bytes.Buffer + size := make([]byte, 8) + bw := bufio.NewWriter(os.Stdout) + for _, p := range flag.Args() { + r, err := warc.NewReader(p) + if err != nil { + log.Fatalln(err) + } + for { + rec, rr, err := r.ReadRecord() + if err != nil { + if err == io.EOF { + break + } + log.Fatalln(err) + } + for _, line := range rec.HdrLines { + hdr.WriteString(line) + } + hdr.WriteString("\r\n") + binary.BigEndian.PutUint64(size, uint64(hdr.Len())+uint64(rec.Size)+4) + if _, err = bw.Write(size); err != nil { + log.Fatalln(err) + } + if _, err = io.Copy(bw, &hdr); err != nil { + log.Fatalln(err) + } + if _, err = io.Copy(bw, rr); err != nil { + log.Fatalln(err) + } + r.RecordWasRead() + if _, err = bw.Write([]byte("\r\n\r\n")); err != nil { + log.Fatalln(err) + } + } + } + if err := bw.Flush(); err != nil { + log.Fatalln(err) + } + return + } + for _, p := range flag.Args() { log.Println("adding", p) if err := warc.Add(p); err != nil { @@ -63,7 +110,7 @@ func main() { if rec == nil { continue } - r, err := rec.Reader(!*hdr) + r, err := rec.Reader(!*hdr, warc.WARCsOffsets) if err != nil { log.Fatalln(err) } diff --git a/doc/warcs.texi b/doc/warcs.texi index 9170553..aba62ae 100644 --- a/doc/warcs.texi +++ b/doc/warcs.texi @@ -6,13 +6,33 @@ transparently replaced from those WARCs for corresponding URIs. There is no strict validation or checking of WARCs correctness at all! But built-in WARC support seems to be good enough for various sources. -Uncompressed, @command{gzip} (multiple streams and single stream are -supported) and @command{zstd} compressed ones are supported. +Following formats are supported: -Searching in compressed files is @strong{slow} -- every request will -lead to decompression of the file from the very beginning, so keeping -uncompressed WARCs on compressed ZFS dataset is much more preferable. -@command{tofuproxy} does not take advantage of multistream gzip files. +@table @asis + +@item @file{.warc} +Ordinary uncompressed WARC. Useful to be stored on transparently +compressed ZFS dataset. + +@item @command{.warc.gz} +GZIP compressed WARC. Multi-stream (multi-segment) formats are also +supported and properly indexed. + +@item @command{.warc.zst} +Zstandard compressed WARC, as in +@url{https://iipc.github.io/warc-specifications/specifications/warc-zstd/, specification}. +Multi-frame format is properly indexed. Dictionary at the beginning +is also supported. + +It is processed with with @command{unzstd} (@command{redo +cmd/unzstd/unzstd}) utility. It eats compressed stream from +@code{stdin}, outputs decompressed data to @code{stdout}, and prints +each frame size with corresponding decompressed data size to 3rd file +descriptor (if it is opened). You can adjust path to it with @code{-X +go.stargrave.org/tofuproxy/warc.UnZSTDPath} command line option during +building. + +@end table @itemize @@ -56,9 +76,9 @@ it contains continuation segmented records. Loading of WARC involves its whole reading and remembering where is each URI response is located. You can @code{echo SAVE > fifos/add-warcs} to -save in-memory index to the disk as @file{....warc.idx.gob} file. During +save in-memory index to the disk as @file{....idx.gob} file. During the next load, if that file exists, it is used as index immediately, -without expensive WARC reading. +without expensive WARC parsing. @code{redo warc-extract.cmd} builds @command{warc-extract.cmd} utility, that uses exactly the same code for parsing WARCs. It can be used to @@ -76,6 +96,16 @@ $ warc-extract.cmd -uri http://some/uri \ smth.warc-00002.warc.gz @end example +Following example can be used to create multi-frame @file{.warc.zst} +from any kind of already existing WARCs. It has better compression ratio +and much higher decompression speed. + +@example +$ redo cmd/enzstd/enzstd +$ ./warc-extract.cmd -for-enzstd /path/to.warc.gz | + cmd/enzstd/enzstd > /path/to.warc.zst +@end example + @url{https://www.gnu.org/software/wget/, GNU Wget} can be easily used to create WARCs: diff --git a/fifos/start.go b/fifos/start.go index 528b2a9..2f535e2 100644 --- a/fifos/start.go +++ b/fifos/start.go @@ -78,7 +78,10 @@ func Start(fifos string) { go addWARC(filepath.Join(fifos, "add-warcs")) go del( - &warc.WARCsM, func(warcPath string) { delete(warc.WARCs, warcPath) }, + &warc.WARCsM, func(warcPath string) { + delete(warc.WARCs, warcPath) + delete(warc.WARCsOffsets, warcPath) + }, filepath.Join(fifos, "del-warcs"), ) } diff --git a/fifos/warcs.go b/fifos/warcs.go index 6d38700..90e17a0 100644 --- a/fifos/warcs.go +++ b/fifos/warcs.go @@ -34,7 +34,10 @@ func listWARCs(p string) { } warc.WARCsM.RLock() for warcPath, uris := range warc.WARCs { - fmt.Fprintf(fd, "%s\t%d\n", warcPath, len(uris)) + fmt.Fprintf( + fd, "%s\t%d\t%d\n", + warcPath, len(uris), len(warc.WARCs[warcPath]), + ) } warc.WARCsM.RUnlock() fd.Close() @@ -72,7 +75,12 @@ func addWARC(p string) { log.Printf("%s: can not open %s: %+v\n", p, warcPath, err) break } - log.Printf("%s: %s: added %d URIs\n", p, warcPath, len(warc.WARCs[warcPath])) + log.Printf( + "%s: %s: added %d URIs %d segments\n", + p, warcPath, + len(warc.WARCs[warcPath]), + len(warc.WARCsOffsets[warcPath]), + ) } } } diff --git a/go.mod b/go.mod index 0dd2f43..779bef0 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.17 require ( github.com/dustin/go-humanize v1.0.0 - github.com/klauspost/compress v1.13.6 github.com/miekg/dns v1.1.29 go.cypherpunks.ru/tai64n/v2 v2.0.0 go.cypherpunks.ru/ucspi v0.0.0-20210908140534-cfdc20a8225f diff --git a/go.sum b/go.sum index 5b0005c..a3c1ed0 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= -github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/miekg/dns v1.1.29 h1:xHBEhR+t5RzcFJjBLJlax2daXOrTYtr9z4WdKEfWFzg= github.com/miekg/dns v1.1.29/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= go.cypherpunks.ru/tai64n/v2 v2.0.0 h1:AlohA1/zRqInhIGK7CVnn7tC5/vt1TaOAEyBgeu5Ruo= diff --git a/rounds/warc.go b/rounds/warc.go index d9ab78e..ddb566c 100644 --- a/rounds/warc.go +++ b/rounds/warc.go @@ -129,11 +129,12 @@ func RoundWARC( return true, nil } - wr, err := rec.Reader(true) + wr, err := rec.Reader(true, warc.WARCsOffsets) if err != nil { log.Printf("WARC: error during %s: %+v\n", req.URL, err) return false, err } + defer wr.Close() hj, ok := w.(http.Hijacker) if !ok { http.Error(w, "can not hijack", http.StatusInternalServerError) diff --git a/warc/compressed.go b/warc/compressed.go index baa6830..b1de1ce 100644 --- a/warc/compressed.go +++ b/warc/compressed.go @@ -18,69 +18,145 @@ along with this program. If not, see . package warc import ( - "compress/gzip" - "fmt" + "bufio" + "bytes" "io" + "log" "os" - "path" - - "github.com/klauspost/compress/zstd" + "os/exec" + "strconv" + "strings" + "sync" ) -type Compressed struct { - r io.ReadCloser - fd *os.File - offset int64 -} - -func (c *Compressed) Read(p []byte) (int, error) { - n, err := c.r.Read(p) - c.offset += int64(n) - return n, err -} +type CompressedReader struct { + cmd *exec.Cmd + fd *os.File + stdout io.ReadCloser + offsets []Offset -func (c *Compressed) Close() error { - c.r.Close() - return c.fd.Close() + offW *os.File + offReader sync.WaitGroup } -func (c *Compressed) Seek(offset int64, whence int) (int64, error) { - if whence != io.SeekStart { - panic("can only seek from the start") +func NewCompressedReader( + warcPath, unCmd string, + offsets []Offset, + uOffset int64, +) (*CompressedReader, error) { + var offZ, offU int64 + for _, off := range offsets { + if uOffset < offU+off.U { + break + } + offU += off.U + offZ += off.Z } - if _, err := io.CopyN(io.Discard, c, offset-c.offset); err != nil { - return 0, err + fd, err := os.Open(warcPath) + if err != nil { + return nil, err } - c.offset = offset - return c.offset, nil -} - -func Open(warcPath string) (io.ReadSeekCloser, error) { - ext := path.Ext(warcPath) - switch ext { - case ".warc": - return os.Open(warcPath) - case ".gz": - fd, err := os.Open(warcPath) - if err != nil { + var dict []byte + if len(offsets) > 0 && offsets[0].U == 0 { + dict = make([]byte, offsets[0].Z) + if _, err = io.ReadFull(fd, dict); err != nil { + fd.Close() return nil, err } - gzr, err := gzip.NewReader(fd) + } + if _, err = fd.Seek(offZ, io.SeekStart); err != nil { + fd.Close() + return nil, err + } + cmd := exec.Command(unCmd) + stdout, err := cmd.StdoutPipe() + if err != nil { + fd.Close() + return nil, err + } + if dict == nil { + cmd.Stdin = fd + } else { + cmd.Stdin = io.MultiReader(bytes.NewReader(dict), fd) + } + if offsets == nil { + offR, offW, err := os.Pipe() if err != nil { + fd.Close() return nil, err } - gzr.Multistream(true) - return &Compressed{r: gzr, fd: fd}, nil - case ".zst": - fd, err := os.Open(warcPath) + cmd.ExtraFiles = append(cmd.ExtraFiles, offW) + err = cmd.Start() if err != nil { + fd.Close() + offW.Close() return nil, err } - zstdr, err := zstd.NewReader(fd) + r := CompressedReader{ + cmd: cmd, + fd: fd, + stdout: stdout, + offW: offW, + } + r.offReader.Add(1) + go r.offsetsReader(offR) + return &r, nil + } + err = cmd.Start() + if err != nil { + fd.Close() + return nil, err + } + _, err = io.CopyN(io.Discard, stdout, uOffset-offU) + if err != nil { + cmd.Process.Kill() + fd.Close() + return nil, err + } + return &CompressedReader{cmd: cmd, fd: fd, stdout: stdout}, nil +} + +func (r *CompressedReader) offsetsReader(offsets *os.File) { + scanner := bufio.NewScanner(offsets) + for scanner.Scan() { + l := scanner.Text() + cols := strings.Split(l, "\t") + if len(cols) != 2 { + log.Println("len(cols) != 2:", l) + continue + } + z, err := strconv.ParseUint(cols[0], 10, 64) if err != nil { - return nil, err + log.Println(err) + continue + } + u, err := strconv.ParseUint(cols[1], 10, 64) + if err != nil { + log.Println(err) + continue } - return &Compressed{r: zstdr.IOReadCloser(), fd: fd}, nil + r.offsets = append(r.offsets, Offset{int64(z), int64(u)}) } - return nil, fmt.Errorf("unknown extensions: %s", ext) + err := scanner.Err() + if err != nil { + log.Println(err) + } + r.offReader.Done() +} + +func (r *CompressedReader) Read(p []byte) (int, error) { + return r.stdout.Read(p) +} + +func (r *CompressedReader) Close() error { + err := r.cmd.Process.Kill() + r.stdout.Close() + r.fd.Close() + r.offW.Close() + r.offReader.Wait() + return err +} + +func (r *CompressedReader) Offsets() []Offset { + return r.offsets } diff --git a/warc/gzip.go b/warc/gzip.go new file mode 100644 index 0000000..335a26e --- /dev/null +++ b/warc/gzip.go @@ -0,0 +1,123 @@ +/* +tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management +Copyright (C) 2021 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +package warc + +import ( + "bufio" + "compress/gzip" + "io" + "os" +) + +type CountableReader struct { + *bufio.Reader + n int64 +} + +func (r *CountableReader) ReadByte() (b byte, err error) { + b, err = r.Reader.ReadByte() + if err == nil { + r.n++ + } + return b, err +} + +func (r *CountableReader) Read(p []byte) (n int, err error) { + n, err = r.Reader.Read(p) + r.n += int64(n) + return +} + +type GZIPReader struct { + fd *os.File + r io.Reader + offsets []Offset +} + +func (r *GZIPReader) Read(p []byte) (n int, err error) { + return r.r.Read(p) +} + +func (r *GZIPReader) Close() error { + return r.fd.Close() +} + +func (r *GZIPReader) Offsets() []Offset { + return r.offsets +} + +func NewGZIPReader( + warcPath string, + offsets []Offset, + uOffset int64, +) (*GZIPReader, error) { + var offZ, offU int64 + for _, off := range offsets { + if uOffset < offU+off.U { + break + } + offU += off.U + offZ += off.Z + } + fd, err := os.Open(warcPath) + if err != nil { + return nil, err + } + if _, err = fd.Seek(offZ, io.SeekStart); err != nil { + fd.Close() + return nil, err + } + cr := &CountableReader{Reader: bufio.NewReader(fd)} + z, err := gzip.NewReader(cr) + if err != nil { + fd.Close() + return nil, err + } + r, w := io.Pipe() + gr := GZIPReader{r: r} + go func() { + z.Multistream(false) + var offset, offsetPrev int64 + for { + written, err := io.Copy(w, z) + if err != nil { + w.CloseWithError(err) + return + } + offset = cr.n + gr.offsets = append(gr.offsets, Offset{offset - offsetPrev, written}) + offsetPrev = offset + err = z.Reset(cr) + if err != nil { + if err == io.EOF { + break + } + w.CloseWithError(err) + return + } + z.Multistream(false) + } + w.CloseWithError(io.EOF) + }() + _, err = io.CopyN(io.Discard, r, uOffset-offU) + if err != nil { + fd.Close() + return nil, err + } + return &gr, nil +} diff --git a/warc/open.go b/warc/open.go new file mode 100644 index 0000000..22792d5 --- /dev/null +++ b/warc/open.go @@ -0,0 +1,49 @@ +/* +tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management +Copyright (C) 2021 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +package warc + +import ( + "fmt" + "io" + "path" +) + +var UnZSTDPath = "/home/stargrave/work/tofuproxy/cmd/unzstd/unzstd" + +type Offset struct { + Z int64 // Compressed frame size + U int64 // Its uncompressed size +} + +type RawRecordReader interface { + io.ReadCloser + Offsets() []Offset +} + +func Open(warcPath string, offsets []Offset, offset int64) (RawRecordReader, error) { + ext := path.Ext(warcPath) + switch ext { + case ".warc": + return NewUncompressedReader(warcPath, offset) + case ".gz": + return NewGZIPReader(warcPath, offsets, offset) + case ".zst": + return NewCompressedReader(warcPath, UnZSTDPath, offsets, offset) + } + return nil, fmt.Errorf("unknown extension: %s", ext) +} diff --git a/warc/reader.go b/warc/reader.go index e76bd51..06a123b 100644 --- a/warc/reader.go +++ b/warc/reader.go @@ -29,21 +29,22 @@ const CRLF = "\r\n" type Reader struct { Path string - r *bufio.Reader - rsc io.ReadSeekCloser + rrr RawRecordReader + br *bufio.Reader offset int64 prevRec *Record + offsets []Offset } func NewReader(warcPath string) (*Reader, error) { - rsc, err := Open(warcPath) + rrr, err := Open(warcPath, nil, 0) if err != nil { return nil, err } return &Reader{ Path: warcPath, - rsc: rsc, - r: bufio.NewReader(rsc), + rrr: rrr, + br: bufio.NewReader(rrr), }, nil } @@ -51,12 +52,12 @@ func (r *Reader) next() error { if r.prevRec == nil { return nil } - if _, err := r.r.Discard(int(r.prevRec.Size)); err != nil { + if _, err := r.br.Discard(int(r.prevRec.Size)); err != nil { return err } r.offset += int64(r.prevRec.HdrLen) + r.prevRec.Size for i := 0; i < 2; i++ { - line, err := r.r.ReadString('\n') + line, err := r.br.ReadString('\n') if err != nil { return err } @@ -68,43 +69,53 @@ func (r *Reader) next() error { return nil } -func (r *Reader) ReadRecord() (*Record, error) { +func (r *Reader) ReadRecord() (*Record, io.Reader, error) { r.next() - line, err := r.r.ReadString('\n') + line, err := r.br.ReadString('\n') if err != nil { - return nil, err + return nil, nil, err } if !strings.HasPrefix(line, "WARC/") { - return nil, fmt.Errorf("non-WARC header: %q", line) + return nil, nil, fmt.Errorf("non-WARC header: %q", line) } + hdrLines := []string{line} hdrLen := len(line) hdr := NewHeader() for { - line, err := r.r.ReadString('\n') + line, err := r.br.ReadString('\n') if err != nil { - return nil, err + return nil, nil, err } hdrLen += len(line) if line == CRLF { break } + hdrLines = append(hdrLines, line) hdr.AddLine(line) } size, err := strconv.ParseUint(hdr.Get("Content-Length"), 10, 64) if err != nil { - return nil, err + return nil, nil, err } rec := &Record{ WARCPath: r.Path, Offset: r.offset, + Size: int64(size), Hdr: hdr, HdrLen: hdrLen, - Size: int64(size), + HdrLines: hdrLines, } r.prevRec = rec - return rec, nil + return rec, &io.LimitedReader{R: r.br, N: int64(size)}, nil +} + +func (r *Reader) RecordWasRead() { + r.prevRec.HdrLen = 0 + r.prevRec.Size = 0 } func (r *Reader) Close() error { - return r.rsc.Close() + err := r.rrr.Close() + r.offsets = r.rrr.Offsets() + return err } diff --git a/warc/record.go b/warc/record.go index 2dd123d..a2fa6bc 100644 --- a/warc/record.go +++ b/warc/record.go @@ -25,9 +25,11 @@ import ( type Record struct { WARCPath string Offset int64 + Size int64 + Hdr Header HdrLen int - Size int64 + HdrLines []string Continuations []*Record } @@ -45,12 +47,12 @@ func (rec *Record) TotalSize() int64 { } type SelfRecordReader struct { - r *io.LimitedReader - rsc io.ReadSeekCloser + lr *io.LimitedReader + rrr io.ReadCloser } func (srr *SelfRecordReader) Read(p []byte) (n int, err error) { - n, err = srr.r.Read(p) + n, err = srr.rrr.Read(p) if err != nil { srr.Close() } @@ -58,23 +60,19 @@ func (srr *SelfRecordReader) Read(p []byte) (n int, err error) { } func (srr *SelfRecordReader) Close() error { - return srr.rsc.Close() + return srr.rrr.Close() } -func (rec *Record) selfReader(noHdr bool) (*SelfRecordReader, error) { - rsc, err := Open(rec.WARCPath) - if err != nil { - return nil, err - } +func (rec *Record) selfReader(noHdr bool, offsets []Offset) (*SelfRecordReader, error) { offset := rec.Offset if noHdr { offset += int64(rec.HdrLen) } - if _, err = rsc.Seek(offset, io.SeekStart); err != nil { - rsc.Close() + rrr, err := Open(rec.WARCPath, offsets, offset) + if err != nil { return nil, err } - return &SelfRecordReader{r: &io.LimitedReader{R: rsc, N: rec.Size}, rsc: rsc}, nil + return &SelfRecordReader{lr: &io.LimitedReader{R: rrr, N: rec.Size}, rrr: rrr}, nil } type RecordReader struct { @@ -82,14 +80,17 @@ type RecordReader struct { srrs []*SelfRecordReader } -func (rec *Record) Reader(noHdr bool) (*RecordReader, error) { +func (rec *Record) Reader( + noHdr bool, + warcOffsets map[string][]Offset, +) (*RecordReader, error) { srrs := make([]*SelfRecordReader, 0, 1+len(rec.Continuations)) rs := make([]io.Reader, 0, 1+len(rec.Continuations)) for i, r := range append([]*Record{rec}, rec.Continuations...) { if i > 0 { noHdr = true } - srr, err := r.selfReader(noHdr) + srr, err := r.selfReader(noHdr, warcOffsets[rec.WARCPath]) if err != nil { for _, srr := range srrs { srr.Close() diff --git a/warc/uncompressed.go b/warc/uncompressed.go new file mode 100644 index 0000000..5dd60c5 --- /dev/null +++ b/warc/uncompressed.go @@ -0,0 +1,51 @@ +/* +tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management +Copyright (C) 2021 Sergey Matveev + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU General Public License as published by +the Free Software Foundation, version 3 of the License. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. + +You should have received a copy of the GNU General Public License +along with this program. If not, see . +*/ + +package warc + +import ( + "io" + "os" +) + +type UncompressedReader struct { + *os.File +} + +func NewUncompressedReader(warcPath string, offset int64) (*UncompressedReader, error) { + fd, err := os.Open(warcPath) + if err != nil { + return nil, err + } + if _, err = fd.Seek(offset, io.SeekStart); err != nil { + fd.Close() + return nil, err + } + return &UncompressedReader{File: fd}, nil +} + +func (r *UncompressedReader) Read(p []byte) (int, error) { + return r.File.Read(p) +} + +func (r *UncompressedReader) Close() error { + return r.File.Close() +} + +func (r *UncompressedReader) Offsets() []Offset { + return nil +} diff --git a/warc/uris.go b/warc/uris.go index a971ff0..4d8c1a3 100644 --- a/warc/uris.go +++ b/warc/uris.go @@ -25,13 +25,15 @@ import ( "os" "strconv" "sync" + "time" ) const IndexExt = ".idx.gob" var ( - WARCs = map[string]map[string]*Record{} - WARCsM sync.RWMutex + WARCs = map[string]map[string]*Record{} + WARCsOffsets = map[string][]Offset{} + WARCsM sync.RWMutex Incomplete = map[string]*Record{} ) @@ -41,12 +43,19 @@ func Add(warcPath string) error { if err == nil { defer fd.Close() var uris map[string]*Record - if err := gob.NewDecoder(fd).Decode(&uris); err != nil { + var offsets []Offset + dec := gob.NewDecoder(fd) + if err := dec.Decode(&uris); err != nil { + return err + } + if err := dec.Decode(&offsets); err != nil { return err } WARCsM.Lock() WARCs[warcPath] = uris + WARCsOffsets[warcPath] = offsets WARCsM.Unlock() + log.Println("loaded marshalled index:", warcPath+IndexExt) return nil } if err != nil && !os.IsNotExist(err) { @@ -59,13 +68,14 @@ func Add(warcPath string) error { defer r.Close() uris := map[string]*Record{} for { - rec, err := r.ReadRecord() + rec, _, err := r.ReadRecord() if err != nil { if err == io.EOF { break } return err } + rec.HdrLines = nil segNum := rec.Hdr.Get("WARC-Segment-Number") switch rec.Hdr.Get("WARC-Type") { case "response": @@ -93,15 +103,21 @@ func Add(warcPath string) error { } incomplete.Continuations = append(incomplete.Continuations, rec) if rec.Hdr.Get("WARC-Segment-Total-Length") != "" { - WARCsM.Lock() - WARCs[incomplete.WARCPath][incomplete.URI()] = incomplete - WARCsM.Unlock() + if incomplete.WARCPath == warcPath { + uris[incomplete.URI()] = incomplete + } else { + WARCsM.Lock() + WARCs[incomplete.WARCPath][incomplete.URI()] = incomplete + WARCsM.Unlock() + } delete(Incomplete, originID) } } } + r.Close() WARCsM.Lock() WARCs[warcPath] = uris + WARCsOffsets[warcPath] = r.offsets WARCsM.Unlock() return nil } @@ -114,20 +130,29 @@ func SaveIndexes() error { if _, err := os.Stat(p); err == nil { continue } + tmpSuffix := strconv.FormatInt(time.Now().UnixNano()+int64(os.Getpid()), 16) fd, err := os.OpenFile( - p+".tmp", - os.O_CREATE|os.O_WRONLY|os.O_EXCL, + p+tmpSuffix, + os.O_WRONLY|os.O_CREATE|os.O_EXCL, os.FileMode(0666), ) if err != nil { return err } - if err = gob.NewEncoder(fd).Encode(&uris); err != nil { + enc := gob.NewEncoder(fd) + if err = enc.Encode(&uris); err != nil { fd.Close() return err } - fd.Close() - if err = os.Rename(p+".tmp", p); err != nil { + offsets := WARCsOffsets[warcPath] + if err = enc.Encode(&offsets); err != nil { + fd.Close() + return err + } + if err = fd.Close(); err != nil { + return err + } + if err = os.Rename(p+tmpSuffix, p); err != nil { return err } log.Println("saved:", p) -- 2.44.0