/*
-tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
+tofuproxy -- flexible HTTP/HTTPS proxy, TLS terminator, X.509 TOFU
+ manager, WARC/geminispace browser
Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
This program is free software: you can redistribute it and/or modify
package warc
import (
- "compress/gzip"
- "fmt"
+ "bufio"
+ "bytes"
"io"
+ "log"
"os"
- "path"
-
- "github.com/klauspost/compress/zstd"
+ "os/exec"
+ "strconv"
+ "strings"
+ "sync"
)
-type Compressed struct {
- r io.ReadCloser
- fd *os.File
- offset int64
-}
-
-func (c *Compressed) Read(p []byte) (int, error) {
- n, err := c.r.Read(p)
- c.offset += int64(n)
- return n, err
-}
+// CompressedReader reads the output of an external command that is fed
+// a WARC file on its standard input. It is created by
+// NewCompressedReader and must be released with Close.
+type CompressedReader struct {
+ // cmd is the external command started by NewCompressedReader.
+ cmd *exec.Cmd
+ // fd is the opened WARC file that is (part of) the command's stdin.
+ fd *os.File
+ // stdout is the command's stdout pipe; Read reads from it.
+ stdout io.ReadCloser
+ // offsets holds the Offset records parsed by offsetsReader.
+ offsets []Offset
-func (c *Compressed) Close() error {
- c.r.Close()
- return c.fd.Close()
+ // offW is the write end of the pipe handed to the command as an extra
+ // file. NewCompressedReader sets it only when called with nil offsets.
+ offW *os.File
+ // offReader lets Close wait for the offsetsReader goroutine.
+ offReader sync.WaitGroup
}
-func (c *Compressed) Seek(offset int64, whence int) (int64, error) {
- if whence != io.SeekStart {
- panic("can only seek from the start")
+// NewCompressedReader opens warcPath, starts unCmd (without arguments)
+// with that file as its standard input and returns a reader over the
+// command's standard output.
+//
+// offsets is walked in order: the Z values are summed into a position in
+// the file and the U values into a position in the command's output, for
+// as long as the summed U position does not go past uOffset. The file is
+// then sought to the summed Z position and, once the command runs,
+// uOffset minus the summed U bytes of its output are discarded, so the
+// first Read starts at uOffset.
+//
+// When the first entry has U == 0, the first offsets[0].Z bytes of the
+// file are read up front and fed to the command ahead of the data at the
+// sought position (presumably a dictionary the decompressor needs, going
+// by the variable name -- confirm).
+//
+// When offsets is nil, no output is skipped. Instead a pipe is created,
+// its write end is passed to the command as an extra file, and a
+// goroutine parses what the command writes there into the list returned
+// by Offsets. An empty but non-nil offsets slice does not trigger this
+// mode.
+func NewCompressedReader(
+ warcPath, unCmd string,
+ offsets []Offset,
+ uOffset int64,
+) (*CompressedReader, error) {
+ // Sum whole entries until the next one would pass uOffset.
+ var offZ, offU int64
+ for _, off := range offsets {
+ if uOffset < offU+off.U {
+ break
+ }
+ offU += off.U
+ offZ += off.Z
}
- if _, err := io.CopyN(io.Discard, c, offset-c.offset); err != nil {
- return 0, err
+ fd, err := os.Open(warcPath)
+ if err != nil {
+ return nil, err
}
- c.offset = offset
- return c.offset, nil
-}
-
-func Open(warcPath string) (io.ReadSeekCloser, error) {
- ext := path.Ext(warcPath)
- switch ext {
- case ".warc":
- return os.Open(warcPath)
- case ".gz":
- fd, err := os.Open(warcPath)
- if err != nil {
+ // A leading entry with U == 0 produces no output of its own; keep its
+ // bytes so they can be replayed in front of the sought data.
+ var dict []byte
+ if len(offsets) > 0 && offsets[0].U == 0 {
+ dict = make([]byte, offsets[0].Z)
+ if _, err = io.ReadFull(fd, dict); err != nil {
+ fd.Close()
return nil, err
}
- gzr, err := gzip.NewReader(fd)
+ }
+ if _, err = fd.Seek(offZ, io.SeekStart); err != nil {
+ fd.Close()
+ return nil, err
+ }
+ cmd := exec.Command(unCmd)
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ fd.Close()
+ return nil, err
+ }
+ if dict == nil {
+ cmd.Stdin = fd
+ } else {
+ cmd.Stdin = io.MultiReader(bytes.NewReader(dict), fd)
+ }
+ if offsets == nil {
+ // No offsets known yet: let the command report them over a pipe.
+ offR, offW, err := os.Pipe()
if err != nil {
+ fd.Close()
return nil, err
}
- gzr.Multistream(true)
- return &Compressed{r: gzr, fd: fd}, nil
- case ".zst":
- fd, err := os.Open(warcPath)
+ // cmd.ExtraFiles is empty here, so offW becomes descriptor 3 in the
+ // child (os/exec maps entry i to descriptor 3+i).
+ cmd.ExtraFiles = append(cmd.ExtraFiles, offW)
+ err = cmd.Start()
if err != nil {
+ // NOTE(review): offR is not closed on this path.
+ fd.Close()
+ offW.Close()
return nil, err
}
- zstdr, err := zstd.NewReader(fd)
+ r := CompressedReader{
+ cmd: cmd,
+ fd: fd,
+ stdout: stdout,
+ offW: offW,
+ }
+ r.offReader.Add(1)
+ go r.offsetsReader(offR)
+ return &r, nil
+ }
+ err = cmd.Start()
+ if err != nil {
+ fd.Close()
+ return nil, err
+ }
+ // Skip the output that precedes uOffset.
+ _, err = io.CopyN(io.Discard, stdout, uOffset-offU)
+ if err != nil {
+ // NOTE(review): the killed process is not waited for here.
+ cmd.Process.Kill()
+ fd.Close()
+ return nil, err
+ }
+ return &CompressedReader{cmd: cmd, fd: fd, stdout: stdout}, nil
+}
+
+// offsetsReader parses offset records from offsets until EOF and appends
+// them to r.offsets. Each line must hold two tab-separated unsigned
+// decimal numbers, stored as Offset{first, second}. Malformed lines are
+// logged and skipped. It closes offsets when done and signals
+// r.offReader on return, so it is meant to run as the goroutine started
+// by NewCompressedReader.
+func (r *CompressedReader) offsetsReader(offsets *os.File) {
+ // Nothing else keeps a reference to this end of the pipe, so release
+ // it here instead of leaving the descriptor to the garbage collector.
+ defer offsets.Close()
+ scanner := bufio.NewScanner(offsets)
+ for scanner.Scan() {
+ l := scanner.Text()
+ cols := strings.Split(l, "\t")
+ if len(cols) != 2 {
+ log.Println("len(cols) != 2:", l)
+ continue
+ }
+ z, err := strconv.ParseUint(cols[0], 10, 64)
if err != nil {
- return nil, err
+ log.Println(err)
+ continue
+ }
+ u, err := strconv.ParseUint(cols[1], 10, 64)
+ if err != nil {
+ log.Println(err)
+ continue
}
- return &Compressed{r: zstdr.IOReadCloser(), fd: fd}, nil
+ r.offsets = append(r.offsets, Offset{int64(z), int64(u)})
}
- return nil, fmt.Errorf("unknown extensions: %s", ext)
+ err := scanner.Err()
+ if err != nil {
+ log.Println(err)
+ }
+ r.offReader.Done()
+}
+
+// Read implements io.Reader by reading from the command's standard
+// output pipe.
+func (r *CompressedReader) Read(p []byte) (int, error) {
+ n, err := r.stdout.Read(p)
+ return n, err
+}
+
+// Close kills the command, closes the reader's descriptors, reaps the
+// child process and waits for the offsets goroutine (if one was started)
+// to finish. It returns the error from killing the process; errors from
+// the individual closes and from waiting are ignored.
+func (r *CompressedReader) Close() error {
+ err := r.cmd.Process.Kill()
+ r.stdout.Close()
+ r.fd.Close()
+ // offW is nil unless the reader was created with nil offsets; Close on
+ // a nil *os.File only returns an error. Closing our copy of the write
+ // end lets offsetsReader see EOF once the child is gone.
+ r.offW.Close()
+ // Reap the killed child so it does not linger as a zombie and so that
+ // os/exec releases what it holds for the command. The process was
+ // killed on purpose, so its exit status is of no interest.
+ r.cmd.Wait()
+ r.offReader.Wait()
+ return err
+}
+
+// Offsets returns the offset records collected by offsetsReader.
+// NOTE(review): the slice is appended to by that goroutine without
+// locking; presumably callers invoke Close (which waits for it) before
+// calling Offsets -- confirm.
+func (r *CompressedReader) Offsets() []Offset {
+ collected := r.offsets
+ return collected
}