]> Sergey Matveev's repositories - tofuproxy.git/blobdiff - warc/compressed.go
Download link for 0.6.0 release
[tofuproxy.git] / warc / compressed.go
index baa683043f1e2560c7e3b51290cc7daab87131a4..84b30c35e5c0e4bf91ece7d498eee2f74578698e 100644 (file)
-/*
-tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
-Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
-
-This program is free software: you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation, version 3 of the License.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-GNU General Public License for more details.
-
-You should have received a copy of the GNU General Public License
-along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
+// tofuproxy -- flexible HTTP/HTTPS proxy, TLS terminator, X.509 TOFU
+//              manager, WARC/geminispace browser
+// Copyright (C) 2021-2024 Sergey Matveev <stargrave@stargrave.org>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, version 3 of the License.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 package warc
 
 import (
-       "compress/gzip"
-       "fmt"
+       "bufio"
+       "bytes"
        "io"
+       "log"
        "os"
-       "path"
-
-       "github.com/klauspost/compress/zstd"
+       "os/exec"
+       "strconv"
+       "strings"
+       "sync"
 )
 
-type Compressed struct {
-       r      io.ReadCloser
-       fd     *os.File
-       offset int64
-}
-
-func (c *Compressed) Read(p []byte) (int, error) {
-       n, err := c.r.Read(p)
-       c.offset += int64(n)
-       return n, err
-}
+type CompressedReader struct {
+       cmd     *exec.Cmd
+       fd      *os.File
+       stdout  io.ReadCloser
+       offsets []Offset
 
-func (c *Compressed) Close() error {
-       c.r.Close()
-       return c.fd.Close()
+       offW      *os.File
+       offReader sync.WaitGroup
 }
 
-func (c *Compressed) Seek(offset int64, whence int) (int64, error) {
-       if whence != io.SeekStart {
-               panic("can only seek from the start")
+func NewCompressedReader(
+       warcPath, unCmd string,
+       offsets []Offset,
+       uOffset int64,
+) (*CompressedReader, error) {
+       var offZ, offU int64
+       for _, off := range offsets {
+               if uOffset < offU+off.U {
+                       break
+               }
+               offU += off.U
+               offZ += off.Z
        }
-       if _, err := io.CopyN(io.Discard, c, offset-c.offset); err != nil {
-               return 0, err
+       fd, err := os.Open(warcPath)
+       if err != nil {
+               return nil, err
        }
-       c.offset = offset
-       return c.offset, nil
-}
-
-func Open(warcPath string) (io.ReadSeekCloser, error) {
-       ext := path.Ext(warcPath)
-       switch ext {
-       case ".warc":
-               return os.Open(warcPath)
-       case ".gz":
-               fd, err := os.Open(warcPath)
-               if err != nil {
+       var dict []byte
+       if len(offsets) > 0 && offsets[0].U == 0 {
+               dict = make([]byte, offsets[0].Z)
+               if _, err = io.ReadFull(fd, dict); err != nil {
+                       fd.Close()
                        return nil, err
                }
-               gzr, err := gzip.NewReader(fd)
+       }
+       if _, err = fd.Seek(offZ, io.SeekStart); err != nil {
+               fd.Close()
+               return nil, err
+       }
+       cmd := exec.Command(unCmd)
+       stdout, err := cmd.StdoutPipe()
+       if err != nil {
+               fd.Close()
+               return nil, err
+       }
+       if dict == nil {
+               cmd.Stdin = fd
+       } else {
+               cmd.Stdin = io.MultiReader(bytes.NewReader(dict), fd)
+       }
+       if offsets == nil {
+               offR, offW, err := os.Pipe()
                if err != nil {
+                       fd.Close()
                        return nil, err
                }
-               gzr.Multistream(true)
-               return &Compressed{r: gzr, fd: fd}, nil
-       case ".zst":
-               fd, err := os.Open(warcPath)
+               cmd.ExtraFiles = append(cmd.ExtraFiles, offW)
+               err = cmd.Start()
                if err != nil {
+                       fd.Close()
+                       offW.Close()
                        return nil, err
                }
-               zstdr, err := zstd.NewReader(fd)
+               r := CompressedReader{
+                       cmd:    cmd,
+                       fd:     fd,
+                       stdout: stdout,
+                       offW:   offW,
+               }
+               r.offReader.Add(1)
+               go r.offsetsReader(offR)
+               return &r, nil
+       }
+       err = cmd.Start()
+       if err != nil {
+               fd.Close()
+               return nil, err
+       }
+       _, err = io.CopyN(io.Discard, stdout, uOffset-offU)
+       if err != nil {
+               cmd.Process.Kill()
+               fd.Close()
+               return nil, err
+       }
+       return &CompressedReader{cmd: cmd, fd: fd, stdout: stdout}, nil
+}
+
+func (r *CompressedReader) offsetsReader(offsets *os.File) {
+       scanner := bufio.NewScanner(offsets)
+       for scanner.Scan() {
+               l := scanner.Text()
+               cols := strings.Split(l, "\t")
+               if len(cols) != 2 {
+                       log.Println("len(cols) != 2:", l)
+                       continue
+               }
+               z, err := strconv.ParseUint(cols[0], 10, 64)
                if err != nil {
-                       return nil, err
+                       log.Println(err)
+                       continue
+               }
+               u, err := strconv.ParseUint(cols[1], 10, 64)
+               if err != nil {
+                       log.Println(err)
+                       continue
                }
-               return &Compressed{r: zstdr.IOReadCloser(), fd: fd}, nil
+               r.offsets = append(r.offsets, Offset{int64(z), int64(u)})
+       }
+       err := scanner.Err()
+       if err != nil {
+               log.Println(err)
        }
-       return nil, fmt.Errorf("unknown extensions: %s", ext)
+       r.offReader.Done()
+}
+
+func (r *CompressedReader) Read(p []byte) (int, error) {
+       return r.stdout.Read(p)
+}
+
+func (r *CompressedReader) Close() error {
+       err := r.cmd.Process.Kill()
+       r.stdout.Close()
+       r.fd.Close()
+       r.offW.Close()
+       r.offReader.Wait()
+       return err
+}
+
+func (r *CompressedReader) Offsets() []Offset {
+       return r.offsets
 }