+/*.cmd
/ccerts
/cert.pem
-/certgen.cmd
/certs
/prv.pem
-/tofuproxy.cmd
-/warc-extract.cmd
--- /dev/null
+/compile_flags.txt
+/enzstd
--- /dev/null
+printf "%s" "$CFLAGS $LDFLAGS" | tr " " "\n" | grep -v "^$" | sort | uniq
--- /dev/null
+/*
+tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
+Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, version 3 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <inttypes.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/endian.h>
+
+#include <zstd.h>
+
+int
+main(int argc, char **argv)
+{
+ ZSTD_CCtx *ctx = ZSTD_createCCtx();
+ if (ctx == NULL) {
+ fputs("can not initialize ZSTD_createCCtx\n", stderr);
+ return EXIT_FAILURE;
+ };
+ size_t zCode = ZSTD_CCtx_setParameter(ctx, ZSTD_c_checksumFlag, 1);
+ if (ZSTD_isError(zCode)) {
+ fprintf(stderr, "can not setParameter: %s\n", ZSTD_getErrorName(zCode));
+ return EXIT_FAILURE;
+ };
+ zCode = ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, ZSTD_maxCLevel());
+ if (ZSTD_isError(zCode)) {
+ fprintf(stderr, "can not setParameter: %s\n", ZSTD_getErrorName(zCode));
+ return EXIT_FAILURE;
+ };
+
+ size_t srcSize = 8;
+ uint8_t *src = malloc(srcSize);
+ if (src == NULL) {
+ fprintf(stderr, "can not allocate memory: %zu\n", srcSize);
+ return EXIT_FAILURE;
+ };
+ size_t dstSize = 0;
+ uint8_t *dst = NULL;
+ size_t srcWantSize = 0;
+ size_t dstWantSize = 0;
+ size_t n = 0;
+ for (;;) {
+ n = fread(src, 1, 8, stdin);
+ if (n < 8) {
+ if (feof(stdin)) {
+ break;
+ };
+ perror("can not fread(stdin)");
+ return EXIT_FAILURE;
+ };
+ srcWantSize = (size_t)be64dec(src);
+ if (srcWantSize > srcSize) {
+ free(src);
+ srcSize = srcWantSize;
+ src = malloc(srcSize);
+ if (src == NULL) {
+ fprintf(stderr, "can not allocate memory: %zu\n", srcSize);
+ return EXIT_FAILURE;
+ };
+ };
+ n = fread(src, 1, srcWantSize, stdin);
+ if (n < srcWantSize) {
+ fprintf(stderr, "IS: %zu %zu\n", n, srcWantSize);
+ fputs("insufficient data fed\n", stderr);
+ return EXIT_FAILURE;
+ };
+ dstWantSize = ZSTD_compressBound(srcWantSize);
+ if (dstWantSize > dstSize) {
+ free(dst);
+ dstSize = dstWantSize;
+ dst = malloc(dstSize);
+ if (dst == NULL) {
+ fprintf(stderr, "can not allocate memory: %zu\n", dstSize);
+ return EXIT_FAILURE;
+ };
+ };
+ zCode = ZSTD_compress2(ctx, dst, dstSize, src, srcWantSize);
+ if (ZSTD_isError(zCode)) {
+ fprintf(stderr, "can not compress: %s\n", ZSTD_getErrorName(zCode));
+ return EXIT_FAILURE;
+ };
+ n = fwrite(dst, 1, zCode, stdout);
+ if (n < zCode) {
+ perror("can not fwrite(stdout)");
+ return EXIT_FAILURE;
+ };
+ };
+ free(dst);
+ free(src);
+ ZSTD_freeCCtx(ctx);
+ return EXIT_SUCCESS;
+};
--- /dev/null
+src=enzstd.c
+redo-ifchange $src
+${CC:-cc} ${CFLAGS} -o $3 $src $LDFLAGS -lzstd
--- /dev/null
+/*
+tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
+Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, version 3 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package main
+
+import (
+ "bufio"
+ "compress/gzip"
+ "fmt"
+ "io"
+ "log"
+ "os"
+)
+
+type CountableReader struct {
+ *bufio.Reader
+ n int64
+}
+
+func (r *CountableReader) ReadByte() (b byte, err error) {
+ b, err = r.Reader.ReadByte()
+ if err == nil {
+ r.n++
+ }
+ return b, err
+}
+
+func (r *CountableReader) Read(p []byte) (n int, err error) {
+ n, err = r.Reader.Read(p)
+ r.n += int64(n)
+ return
+}
+
+func main() {
+ log.SetFlags(log.Lshortfile)
+ fdOff := os.NewFile(3, "fdOff")
+ br := bufio.NewReader(os.Stdin)
+ cr := &CountableReader{Reader: br}
+ bw := bufio.NewWriter(os.Stdout)
+ z, err := gzip.NewReader(cr)
+ if err != nil {
+ log.Fatalln(err)
+ }
+ z.Multistream(false)
+ var offset, offsetPrev int64
+ for {
+ written, err := io.Copy(bw, z)
+ if err != nil {
+ log.Fatalln(err)
+ }
+ offset = cr.n
+ if fdOff != nil {
+ fmt.Fprintf(fdOff, "%d\t%d\n", offset-offsetPrev, written)
+ }
+ offsetPrev = offset
+ err = z.Reset(cr)
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ log.Fatalln(err)
+ }
+ z.Multistream(false)
+ }
+}
--- /dev/null
+/compile_flags.txt
+/unzstd
--- /dev/null
+printf "%s" "$CFLAGS $LDFLAGS" | tr " " "\n" | grep -v "^$" | sort | uniq
--- /dev/null
+/*
+tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
+Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, version 3 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+// https://iipc.github.io/warc-specifications/specifications/warc-zstd/
+
+#include <errno.h>
+#include <inttypes.h>
+#include <stdbool.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/endian.h>
+
+#include <zstd.h>
+
+int
+main(int argc, char **argv)
+{
+ ZSTD_DCtx *ctx = ZSTD_createDCtx();
+ if (ctx == NULL) {
+ fputs("can not initialize ZSTD_DCtx\n", stderr);
+ return 1;
+ };
+ FILE *fdOff = fdopen(3, "wb");
+ int rc = EXIT_FAILURE;
+ uint8_t *bufIn = NULL;
+ uint8_t *bufOut = NULL;
+ const size_t bufInSize = ZSTD_DStreamInSize();
+ bufIn = malloc(bufInSize);
+ if (bufIn == NULL) {
+ fputs("no memory\n", stderr);
+ goto Exit;
+ };
+ const size_t bufOutSize = ZSTD_DStreamOutSize();
+ bufOut = malloc(bufOutSize);
+ if (bufOut == NULL) {
+ fputs("no memory\n", stderr);
+ goto Exit;
+ };
+
+ unsigned long long bufSize = 0;
+
+ ZSTD_inBuffer bIn = {bufIn, 0, 0};
+ ZSTD_outBuffer bOut = {bufOut, 0, 0};
+
+ bool isEmpty = true;
+ bool lastBlock = false;
+ size_t n = 0;
+ size_t written = 0;
+ size_t offset = 0;
+ size_t offsetPrev = 0;
+ size_t zCode = 0;
+ReadAgain:
+ for (;;) {
+ n = fread(bufIn, 1, bufInSize, stdin);
+ if (n != bufInSize) {
+ if (feof(stdin)) {
+ lastBlock = true;
+ } else {
+ perror("can not fread(FILE)");
+ goto Exit;
+ };
+ };
+ if (n >= 8 && le32dec(bufIn) == 0x184D2A5D) {
+ // dictionary frame
+ size_t dictSize = (size_t)le32dec(bufIn + 4);
+ uint8_t *dict = malloc(dictSize);
+ if (dict == NULL) {
+ fprintf(stderr, "insufficient memory for dictionary: %zu\n", dictSize);
+ goto Exit;
+ };
+ const size_t alreadyRead = n - 8;
+ memcpy(dict, bufIn + 8, alreadyRead);
+ errno = 0;
+ n = fread(dict + alreadyRead, 1, dictSize - alreadyRead, stdin);
+ if (n != dictSize - alreadyRead) {
+ perror("can not read dictionary data");
+ free(dict);
+ goto Exit;
+ };
+ offset = dictSize + 8;
+ offsetPrev = offset;
+ if (fdOff != NULL) {
+ fprintf(fdOff, "%zu\t0\n", offset);
+ };
+ uint32_t hdr = le32dec(dict);
+ switch (hdr) {
+ case ZSTD_MAGIC_DICTIONARY:
+ zCode = ZSTD_DCtx_loadDictionary(ctx, dict, dictSize);
+ free(dict);
+ if ((zCode != 0) && (ZSTD_isError(zCode))) {
+ fprintf(
+ stderr,
+ "can not load dictionary: %s\n",
+ ZSTD_getErrorName(zCode));
+ goto Exit;
+ };
+ goto ReadAgain;
+ break;
+ case ZSTD_MAGICNUMBER:
+ bufSize = ZSTD_getFrameContentSize(dict, dictSize);
+ switch (bufSize) {
+ case ZSTD_CONTENTSIZE_UNKNOWN:
+ case ZSTD_CONTENTSIZE_ERROR:
+ fprintf(stderr, "can not determine dictionary's size\n");
+ free(dict);
+ goto Exit;
+ };
+ uint8_t *buf = malloc(bufSize);
+ if (buf == NULL) {
+ fprintf(
+ stderr, "insufficient memory for dictionary: %llu\n", bufSize);
+ free(dict);
+ goto Exit;
+ };
+ zCode = ZSTD_decompress(buf, bufSize, dict, dictSize);
+ free(dict);
+ if (ZSTD_isError(zCode)) {
+ fprintf(
+ stderr,
+ "can not decompress dictionary: %s\n",
+ ZSTD_getErrorName(zCode));
+ free(buf);
+ goto Exit;
+ };
+ zCode = ZSTD_DCtx_loadDictionary(ctx, buf, zCode);
+ free(buf);
+ if ((zCode != 0) && (ZSTD_isError(zCode))) {
+ fprintf(
+ stderr,
+ "can not load dictionary: %s\n",
+ ZSTD_getErrorName(zCode));
+ goto Exit;
+ };
+ goto ReadAgain;
+ break;
+ default:
+ fprintf(stderr, "unknown dictionary header\n");
+ free(dict);
+ goto Exit;
+ };
+ };
+ isEmpty = false;
+ bIn.size = n;
+ bIn.pos = 0;
+ while (bIn.pos < bIn.size) {
+ bOut.size = bufOutSize;
+ bOut.pos = 0;
+ zCode = ZSTD_decompressStream(ctx, &bOut, &bIn);
+ if ((zCode != 0) && (ZSTD_isError(zCode))) {
+ fprintf(stderr, "can not decompress: %s\n", ZSTD_getErrorName(zCode));
+ goto Exit;
+ };
+ n = fwrite(bufOut, 1, bOut.pos, stdout);
+ if (n != bOut.pos) {
+ perror("can not fwrite(stdout)");
+ goto Exit;
+ };
+ written += n;
+ if (zCode == 0) {
+ offset += bIn.pos;
+ if (fdOff != NULL) {
+ fprintf(fdOff, "%zu\t%zu\n", offset - offsetPrev, written);
+ };
+ offsetPrev = offset + bIn.pos;
+ written = 0;
+ };
+ };
+ if (lastBlock) {
+ break;
+ };
+ offset += bIn.pos;
+ };
+
+ if (isEmpty) {
+ fputs("empty input\n", stderr);
+ goto Exit;
+ };
+ if (zCode != 0) {
+ fprintf(stderr, "unfinished decompression: %s\n", ZSTD_getErrorName(zCode));
+ goto Exit;
+ };
+ rc = EXIT_SUCCESS;
+
+Exit:
+ if (bufOut != NULL) {
+ free(bufOut);
+ };
+ if (bufIn != NULL) {
+ free(bufIn);
+ };
+ ZSTD_freeDCtx(ctx);
+ if ((fdOff != NULL) && (fclose(fdOff) != 0)) {
+ perror("can not fclose(4)");
+ return EXIT_FAILURE;
+ };
+ return rc;
+};
--- /dev/null
+src=unzstd.c
+redo-ifchange $src
+${CC:-cc} ${CFLAGS} -o $3 $src $LDFLAGS -lzstd
package main
import (
+ "bufio"
+ "bytes"
+ "encoding/binary"
"flag"
"fmt"
"io"
uri := flag.String("uri", "", "URI to extract, if specified")
hdr := flag.Bool("hdr", false, "Also extract WARC's header")
idx := flag.Bool("idx", false, "Save WARC indexes")
+ recompress := flag.Bool("for-enzstd", false, "Output for enzstd utility")
flag.Parse()
log.SetFlags(log.Lshortfile)
+ if *recompress {
+ var hdr bytes.Buffer
+ size := make([]byte, 8)
+ bw := bufio.NewWriter(os.Stdout)
+ for _, p := range flag.Args() {
+ r, err := warc.NewReader(p)
+ if err != nil {
+ log.Fatalln(err)
+ }
+ for {
+ rec, rr, err := r.ReadRecord()
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ log.Fatalln(err)
+ }
+ for _, line := range rec.HdrLines {
+ hdr.WriteString(line)
+ }
+ hdr.WriteString("\r\n")
+ binary.BigEndian.PutUint64(size, uint64(hdr.Len())+uint64(rec.Size)+4)
+ if _, err = bw.Write(size); err != nil {
+ log.Fatalln(err)
+ }
+ if _, err = io.Copy(bw, &hdr); err != nil {
+ log.Fatalln(err)
+ }
+ if _, err = io.Copy(bw, rr); err != nil {
+ log.Fatalln(err)
+ }
+ r.RecordWasRead()
+ if _, err = bw.Write([]byte("\r\n\r\n")); err != nil {
+ log.Fatalln(err)
+ }
+ }
+ }
+ if err := bw.Flush(); err != nil {
+ log.Fatalln(err)
+ }
+ return
+ }
+
for _, p := range flag.Args() {
log.Println("adding", p)
if err := warc.Add(p); err != nil {
if rec == nil {
continue
}
- r, err := rec.Reader(!*hdr)
+ r, err := rec.Reader(!*hdr, warc.WARCsOffsets)
if err != nil {
log.Fatalln(err)
}
There is no strict validation or checking of WARCs correctness at all!
But built-in WARC support seems to be good enough for various sources.
-Uncompressed, @command{gzip} (multiple streams and single stream are
-supported) and @command{zstd} compressed ones are supported.
+Following formats are supported:
-Searching in compressed files is @strong{slow} -- every request will
-lead to decompression of the file from the very beginning, so keeping
-uncompressed WARCs on compressed ZFS dataset is much more preferable.
-@command{tofuproxy} does not take advantage of multistream gzip files.
+@table @asis
+
+@item @file{.warc}
+Ordinary uncompressed WARC. Useful to be stored on transparently
+compressed ZFS dataset.
+
+@item @command{.warc.gz}
+GZIP compressed WARC. Multi-stream (multi-segment) formats are also
+supported and properly indexed.
+
+@item @command{.warc.zst}
+Zstandard compressed WARC, as in
+@url{https://iipc.github.io/warc-specifications/specifications/warc-zstd/, specification}.
+Multi-frame format is properly indexed. Dictionary at the beginning
+is also supported.
+
+It is processed with with @command{unzstd} (@command{redo
+cmd/unzstd/unzstd}) utility. It eats compressed stream from
+@code{stdin}, outputs decompressed data to @code{stdout}, and prints
+each frame size with corresponding decompressed data size to 3rd file
+descriptor (if it is opened). You can adjust path to it with @code{-X
+go.stargrave.org/tofuproxy/warc.UnZSTDPath} command line option during
+building.
+
+@end table
@itemize
Loading of WARC involves its whole reading and remembering where is each
URI response is located. You can @code{echo SAVE > fifos/add-warcs} to
-save in-memory index to the disk as @file{....warc.idx.gob} file. During
+save in-memory index to the disk as @file{....idx.gob} file. During
the next load, if that file exists, it is used as index immediately,
-without expensive WARC reading.
+without expensive WARC parsing.
@code{redo warc-extract.cmd} builds @command{warc-extract.cmd} utility,
that uses exactly the same code for parsing WARCs. It can be used to
smth.warc-00002.warc.gz
@end example
+Following example can be used to create multi-frame @file{.warc.zst}
+from any kind of already existing WARCs. It has better compression ratio
+and much higher decompression speed.
+
+@example
+$ redo cmd/enzstd/enzstd
+$ ./warc-extract.cmd -for-enzstd /path/to.warc.gz |
+ cmd/enzstd/enzstd > /path/to.warc.zst
+@end example
+
@url{https://www.gnu.org/software/wget/, GNU Wget} can be easily used to
create WARCs:
go addWARC(filepath.Join(fifos, "add-warcs"))
go del(
- &warc.WARCsM, func(warcPath string) { delete(warc.WARCs, warcPath) },
+ &warc.WARCsM, func(warcPath string) {
+ delete(warc.WARCs, warcPath)
+ delete(warc.WARCsOffsets, warcPath)
+ },
filepath.Join(fifos, "del-warcs"),
)
}
}
warc.WARCsM.RLock()
for warcPath, uris := range warc.WARCs {
- fmt.Fprintf(fd, "%s\t%d\n", warcPath, len(uris))
+ fmt.Fprintf(
+ fd, "%s\t%d\t%d\n",
+ warcPath, len(uris), len(warc.WARCs[warcPath]),
+ )
}
warc.WARCsM.RUnlock()
fd.Close()
log.Printf("%s: can not open %s: %+v\n", p, warcPath, err)
break
}
- log.Printf("%s: %s: added %d URIs\n", p, warcPath, len(warc.WARCs[warcPath]))
+ log.Printf(
+ "%s: %s: added %d URIs %d segments\n",
+ p, warcPath,
+ len(warc.WARCs[warcPath]),
+ len(warc.WARCsOffsets[warcPath]),
+ )
}
}
}
require (
github.com/dustin/go-humanize v1.0.0
- github.com/klauspost/compress v1.13.6
github.com/miekg/dns v1.1.29
go.cypherpunks.ru/tai64n/v2 v2.0.0
go.cypherpunks.ru/ucspi v0.0.0-20210908140534-cfdc20a8225f
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
-github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
-github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/miekg/dns v1.1.29 h1:xHBEhR+t5RzcFJjBLJlax2daXOrTYtr9z4WdKEfWFzg=
github.com/miekg/dns v1.1.29/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM=
go.cypherpunks.ru/tai64n/v2 v2.0.0 h1:AlohA1/zRqInhIGK7CVnn7tC5/vt1TaOAEyBgeu5Ruo=
return true, nil
}
- wr, err := rec.Reader(true)
+ wr, err := rec.Reader(true, warc.WARCsOffsets)
if err != nil {
log.Printf("WARC: error during %s: %+v\n", req.URL, err)
return false, err
}
+ defer wr.Close()
hj, ok := w.(http.Hijacker)
if !ok {
http.Error(w, "can not hijack", http.StatusInternalServerError)
package warc
import (
- "compress/gzip"
- "fmt"
+ "bufio"
+ "bytes"
"io"
+ "log"
"os"
- "path"
-
- "github.com/klauspost/compress/zstd"
+ "os/exec"
+ "strconv"
+ "strings"
+ "sync"
)
-type Compressed struct {
- r io.ReadCloser
- fd *os.File
- offset int64
-}
-
-func (c *Compressed) Read(p []byte) (int, error) {
- n, err := c.r.Read(p)
- c.offset += int64(n)
- return n, err
-}
+type CompressedReader struct {
+ cmd *exec.Cmd
+ fd *os.File
+ stdout io.ReadCloser
+ offsets []Offset
-func (c *Compressed) Close() error {
- c.r.Close()
- return c.fd.Close()
+ offW *os.File
+ offReader sync.WaitGroup
}
-func (c *Compressed) Seek(offset int64, whence int) (int64, error) {
- if whence != io.SeekStart {
- panic("can only seek from the start")
+func NewCompressedReader(
+ warcPath, unCmd string,
+ offsets []Offset,
+ uOffset int64,
+) (*CompressedReader, error) {
+ var offZ, offU int64
+ for _, off := range offsets {
+ if uOffset < offU+off.U {
+ break
+ }
+ offU += off.U
+ offZ += off.Z
}
- if _, err := io.CopyN(io.Discard, c, offset-c.offset); err != nil {
- return 0, err
+ fd, err := os.Open(warcPath)
+ if err != nil {
+ return nil, err
}
- c.offset = offset
- return c.offset, nil
-}
-
-func Open(warcPath string) (io.ReadSeekCloser, error) {
- ext := path.Ext(warcPath)
- switch ext {
- case ".warc":
- return os.Open(warcPath)
- case ".gz":
- fd, err := os.Open(warcPath)
- if err != nil {
+ var dict []byte
+ if len(offsets) > 0 && offsets[0].U == 0 {
+ dict = make([]byte, offsets[0].Z)
+ if _, err = io.ReadFull(fd, dict); err != nil {
+ fd.Close()
return nil, err
}
- gzr, err := gzip.NewReader(fd)
+ }
+ if _, err = fd.Seek(offZ, io.SeekStart); err != nil {
+ fd.Close()
+ return nil, err
+ }
+ cmd := exec.Command(unCmd)
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ fd.Close()
+ return nil, err
+ }
+ if dict == nil {
+ cmd.Stdin = fd
+ } else {
+ cmd.Stdin = io.MultiReader(bytes.NewReader(dict), fd)
+ }
+ if offsets == nil {
+ offR, offW, err := os.Pipe()
if err != nil {
+ fd.Close()
return nil, err
}
- gzr.Multistream(true)
- return &Compressed{r: gzr, fd: fd}, nil
- case ".zst":
- fd, err := os.Open(warcPath)
+ cmd.ExtraFiles = append(cmd.ExtraFiles, offW)
+ err = cmd.Start()
if err != nil {
+ fd.Close()
+ offW.Close()
return nil, err
}
- zstdr, err := zstd.NewReader(fd)
+ r := CompressedReader{
+ cmd: cmd,
+ fd: fd,
+ stdout: stdout,
+ offW: offW,
+ }
+ r.offReader.Add(1)
+ go r.offsetsReader(offR)
+ return &r, nil
+ }
+ err = cmd.Start()
+ if err != nil {
+ fd.Close()
+ return nil, err
+ }
+ _, err = io.CopyN(io.Discard, stdout, uOffset-offU)
+ if err != nil {
+ cmd.Process.Kill()
+ fd.Close()
+ return nil, err
+ }
+ return &CompressedReader{cmd: cmd, fd: fd, stdout: stdout}, nil
+}
+
+func (r *CompressedReader) offsetsReader(offsets *os.File) {
+ scanner := bufio.NewScanner(offsets)
+ for scanner.Scan() {
+ l := scanner.Text()
+ cols := strings.Split(l, "\t")
+ if len(cols) != 2 {
+ log.Println("len(cols) != 2:", l)
+ continue
+ }
+ z, err := strconv.ParseUint(cols[0], 10, 64)
if err != nil {
- return nil, err
+ log.Println(err)
+ continue
+ }
+ u, err := strconv.ParseUint(cols[1], 10, 64)
+ if err != nil {
+ log.Println(err)
+ continue
}
- return &Compressed{r: zstdr.IOReadCloser(), fd: fd}, nil
+ r.offsets = append(r.offsets, Offset{int64(z), int64(u)})
}
- return nil, fmt.Errorf("unknown extensions: %s", ext)
+ err := scanner.Err()
+ if err != nil {
+ log.Println(err)
+ }
+ r.offReader.Done()
+}
+
+func (r *CompressedReader) Read(p []byte) (int, error) {
+ return r.stdout.Read(p)
+}
+
+func (r *CompressedReader) Close() error {
+ err := r.cmd.Process.Kill()
+ r.stdout.Close()
+ r.fd.Close()
+ r.offW.Close()
+ r.offReader.Wait()
+ return err
+}
+
+func (r *CompressedReader) Offsets() []Offset {
+ return r.offsets
}
--- /dev/null
+/*
+tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
+Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, version 3 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package warc
+
+import (
+ "bufio"
+ "compress/gzip"
+ "io"
+ "os"
+)
+
+type CountableReader struct {
+ *bufio.Reader
+ n int64
+}
+
+func (r *CountableReader) ReadByte() (b byte, err error) {
+ b, err = r.Reader.ReadByte()
+ if err == nil {
+ r.n++
+ }
+ return b, err
+}
+
+func (r *CountableReader) Read(p []byte) (n int, err error) {
+ n, err = r.Reader.Read(p)
+ r.n += int64(n)
+ return
+}
+
+type GZIPReader struct {
+ fd *os.File
+ r io.Reader
+ offsets []Offset
+}
+
+func (r *GZIPReader) Read(p []byte) (n int, err error) {
+ return r.r.Read(p)
+}
+
+func (r *GZIPReader) Close() error {
+ return r.fd.Close()
+}
+
+func (r *GZIPReader) Offsets() []Offset {
+ return r.offsets
+}
+
+func NewGZIPReader(
+ warcPath string,
+ offsets []Offset,
+ uOffset int64,
+) (*GZIPReader, error) {
+ var offZ, offU int64
+ for _, off := range offsets {
+ if uOffset < offU+off.U {
+ break
+ }
+ offU += off.U
+ offZ += off.Z
+ }
+ fd, err := os.Open(warcPath)
+ if err != nil {
+ return nil, err
+ }
+ if _, err = fd.Seek(offZ, io.SeekStart); err != nil {
+ fd.Close()
+ return nil, err
+ }
+ cr := &CountableReader{Reader: bufio.NewReader(fd)}
+ z, err := gzip.NewReader(cr)
+ if err != nil {
+ fd.Close()
+ return nil, err
+ }
+ r, w := io.Pipe()
+ gr := GZIPReader{r: r}
+ go func() {
+ z.Multistream(false)
+ var offset, offsetPrev int64
+ for {
+ written, err := io.Copy(w, z)
+ if err != nil {
+ w.CloseWithError(err)
+ return
+ }
+ offset = cr.n
+ gr.offsets = append(gr.offsets, Offset{offset - offsetPrev, written})
+ offsetPrev = offset
+ err = z.Reset(cr)
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ w.CloseWithError(err)
+ return
+ }
+ z.Multistream(false)
+ }
+ w.CloseWithError(io.EOF)
+ }()
+ _, err = io.CopyN(io.Discard, r, uOffset-offU)
+ if err != nil {
+ fd.Close()
+ return nil, err
+ }
+ return &gr, nil
+}
--- /dev/null
+/*
+tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
+Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, version 3 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package warc
+
+import (
+ "fmt"
+ "io"
+ "path"
+)
+
+var UnZSTDPath = "/home/stargrave/work/tofuproxy/cmd/unzstd/unzstd"
+
+type Offset struct {
+ Z int64 // Compressed frame size
+ U int64 // Its uncompressed size
+}
+
+type RawRecordReader interface {
+ io.ReadCloser
+ Offsets() []Offset
+}
+
+func Open(warcPath string, offsets []Offset, offset int64) (RawRecordReader, error) {
+ ext := path.Ext(warcPath)
+ switch ext {
+ case ".warc":
+ return NewUncompressedReader(warcPath, offset)
+ case ".gz":
+ return NewGZIPReader(warcPath, offsets, offset)
+ case ".zst":
+ return NewCompressedReader(warcPath, UnZSTDPath, offsets, offset)
+ }
+ return nil, fmt.Errorf("unknown extension: %s", ext)
+}
type Reader struct {
Path string
- r *bufio.Reader
- rsc io.ReadSeekCloser
+ rrr RawRecordReader
+ br *bufio.Reader
offset int64
prevRec *Record
+ offsets []Offset
}
func NewReader(warcPath string) (*Reader, error) {
- rsc, err := Open(warcPath)
+ rrr, err := Open(warcPath, nil, 0)
if err != nil {
return nil, err
}
return &Reader{
Path: warcPath,
- rsc: rsc,
- r: bufio.NewReader(rsc),
+ rrr: rrr,
+ br: bufio.NewReader(rrr),
}, nil
}
if r.prevRec == nil {
return nil
}
- if _, err := r.r.Discard(int(r.prevRec.Size)); err != nil {
+ if _, err := r.br.Discard(int(r.prevRec.Size)); err != nil {
return err
}
r.offset += int64(r.prevRec.HdrLen) + r.prevRec.Size
for i := 0; i < 2; i++ {
- line, err := r.r.ReadString('\n')
+ line, err := r.br.ReadString('\n')
if err != nil {
return err
}
return nil
}
-func (r *Reader) ReadRecord() (*Record, error) {
+func (r *Reader) ReadRecord() (*Record, io.Reader, error) {
r.next()
- line, err := r.r.ReadString('\n')
+ line, err := r.br.ReadString('\n')
if err != nil {
- return nil, err
+ return nil, nil, err
}
if !strings.HasPrefix(line, "WARC/") {
- return nil, fmt.Errorf("non-WARC header: %q", line)
+ return nil, nil, fmt.Errorf("non-WARC header: %q", line)
}
+ hdrLines := []string{line}
hdrLen := len(line)
hdr := NewHeader()
for {
- line, err := r.r.ReadString('\n')
+ line, err := r.br.ReadString('\n')
if err != nil {
- return nil, err
+ return nil, nil, err
}
hdrLen += len(line)
if line == CRLF {
break
}
+ hdrLines = append(hdrLines, line)
hdr.AddLine(line)
}
size, err := strconv.ParseUint(hdr.Get("Content-Length"), 10, 64)
if err != nil {
- return nil, err
+ return nil, nil, err
}
rec := &Record{
WARCPath: r.Path,
Offset: r.offset,
+ Size: int64(size),
Hdr: hdr,
HdrLen: hdrLen,
- Size: int64(size),
+ HdrLines: hdrLines,
}
r.prevRec = rec
- return rec, nil
+ return rec, &io.LimitedReader{R: r.br, N: int64(size)}, nil
+}
+
+func (r *Reader) RecordWasRead() {
+ r.prevRec.HdrLen = 0
+ r.prevRec.Size = 0
}
func (r *Reader) Close() error {
- return r.rsc.Close()
+ err := r.rrr.Close()
+ r.offsets = r.rrr.Offsets()
+ return err
}
type Record struct {
WARCPath string
Offset int64
+ Size int64
+
Hdr Header
HdrLen int
- Size int64
+ HdrLines []string
Continuations []*Record
}
}
type SelfRecordReader struct {
- r *io.LimitedReader
- rsc io.ReadSeekCloser
+ lr *io.LimitedReader
+ rrr io.ReadCloser
}
func (srr *SelfRecordReader) Read(p []byte) (n int, err error) {
- n, err = srr.r.Read(p)
+ n, err = srr.rrr.Read(p)
if err != nil {
srr.Close()
}
}
func (srr *SelfRecordReader) Close() error {
- return srr.rsc.Close()
+ return srr.rrr.Close()
}
-func (rec *Record) selfReader(noHdr bool) (*SelfRecordReader, error) {
- rsc, err := Open(rec.WARCPath)
- if err != nil {
- return nil, err
- }
+func (rec *Record) selfReader(noHdr bool, offsets []Offset) (*SelfRecordReader, error) {
offset := rec.Offset
if noHdr {
offset += int64(rec.HdrLen)
}
- if _, err = rsc.Seek(offset, io.SeekStart); err != nil {
- rsc.Close()
+ rrr, err := Open(rec.WARCPath, offsets, offset)
+ if err != nil {
return nil, err
}
- return &SelfRecordReader{r: &io.LimitedReader{R: rsc, N: rec.Size}, rsc: rsc}, nil
+ return &SelfRecordReader{lr: &io.LimitedReader{R: rrr, N: rec.Size}, rrr: rrr}, nil
}
type RecordReader struct {
srrs []*SelfRecordReader
}
-func (rec *Record) Reader(noHdr bool) (*RecordReader, error) {
+func (rec *Record) Reader(
+ noHdr bool,
+ warcOffsets map[string][]Offset,
+) (*RecordReader, error) {
srrs := make([]*SelfRecordReader, 0, 1+len(rec.Continuations))
rs := make([]io.Reader, 0, 1+len(rec.Continuations))
for i, r := range append([]*Record{rec}, rec.Continuations...) {
if i > 0 {
noHdr = true
}
- srr, err := r.selfReader(noHdr)
+ srr, err := r.selfReader(noHdr, warcOffsets[rec.WARCPath])
if err != nil {
for _, srr := range srrs {
srr.Close()
--- /dev/null
+/*
+tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
+Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, version 3 of the License.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+package warc
+
+import (
+ "io"
+ "os"
+)
+
+type UncompressedReader struct {
+ *os.File
+}
+
+func NewUncompressedReader(warcPath string, offset int64) (*UncompressedReader, error) {
+ fd, err := os.Open(warcPath)
+ if err != nil {
+ return nil, err
+ }
+ if _, err = fd.Seek(offset, io.SeekStart); err != nil {
+ fd.Close()
+ return nil, err
+ }
+ return &UncompressedReader{File: fd}, nil
+}
+
+func (r *UncompressedReader) Read(p []byte) (int, error) {
+ return r.File.Read(p)
+}
+
+func (r *UncompressedReader) Close() error {
+ return r.File.Close()
+}
+
+func (r *UncompressedReader) Offsets() []Offset {
+ return nil
+}
"os"
"strconv"
"sync"
+ "time"
)
const IndexExt = ".idx.gob"
var (
- WARCs = map[string]map[string]*Record{}
- WARCsM sync.RWMutex
+ WARCs = map[string]map[string]*Record{}
+ WARCsOffsets = map[string][]Offset{}
+ WARCsM sync.RWMutex
Incomplete = map[string]*Record{}
)
if err == nil {
defer fd.Close()
var uris map[string]*Record
- if err := gob.NewDecoder(fd).Decode(&uris); err != nil {
+ var offsets []Offset
+ dec := gob.NewDecoder(fd)
+ if err := dec.Decode(&uris); err != nil {
+ return err
+ }
+ if err := dec.Decode(&offsets); err != nil {
return err
}
WARCsM.Lock()
WARCs[warcPath] = uris
+ WARCsOffsets[warcPath] = offsets
WARCsM.Unlock()
+ log.Println("loaded marshalled index:", warcPath+IndexExt)
return nil
}
if err != nil && !os.IsNotExist(err) {
defer r.Close()
uris := map[string]*Record{}
for {
- rec, err := r.ReadRecord()
+ rec, _, err := r.ReadRecord()
if err != nil {
if err == io.EOF {
break
}
return err
}
+ rec.HdrLines = nil
segNum := rec.Hdr.Get("WARC-Segment-Number")
switch rec.Hdr.Get("WARC-Type") {
case "response":
}
incomplete.Continuations = append(incomplete.Continuations, rec)
if rec.Hdr.Get("WARC-Segment-Total-Length") != "" {
- WARCsM.Lock()
- WARCs[incomplete.WARCPath][incomplete.URI()] = incomplete
- WARCsM.Unlock()
+ if incomplete.WARCPath == warcPath {
+ uris[incomplete.URI()] = incomplete
+ } else {
+ WARCsM.Lock()
+ WARCs[incomplete.WARCPath][incomplete.URI()] = incomplete
+ WARCsM.Unlock()
+ }
delete(Incomplete, originID)
}
}
}
+ r.Close()
WARCsM.Lock()
WARCs[warcPath] = uris
+ WARCsOffsets[warcPath] = r.offsets
WARCsM.Unlock()
return nil
}
if _, err := os.Stat(p); err == nil {
continue
}
+ tmpSuffix := strconv.FormatInt(time.Now().UnixNano()+int64(os.Getpid()), 16)
fd, err := os.OpenFile(
- p+".tmp",
- os.O_CREATE|os.O_WRONLY|os.O_EXCL,
+ p+tmpSuffix,
+ os.O_WRONLY|os.O_CREATE|os.O_EXCL,
os.FileMode(0666),
)
if err != nil {
return err
}
- if err = gob.NewEncoder(fd).Encode(&uris); err != nil {
+ enc := gob.NewEncoder(fd)
+ if err = enc.Encode(&uris); err != nil {
fd.Close()
return err
}
- fd.Close()
- if err = os.Rename(p+".tmp", p); err != nil {
+ offsets := WARCsOffsets[warcPath]
+ if err = enc.Encode(&offsets); err != nil {
+ fd.Close()
+ return err
+ }
+ if err = fd.Close(); err != nil {
+ return err
+ }
+ if err = os.Rename(p+tmpSuffix, p); err != nil {
return err
}
log.Println("saved:", p)