X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=warc%2Fcompressed.go;fp=warc%2Fcompressed.go;h=b1de1ce31fa8ce0f7d45eb76f2c7292d031ac189;hb=bae1cfe5ce46a1b758ccc4dddda2751b6ac47f3e;hp=baa683043f1e2560c7e3b51290cc7daab87131a4;hpb=0c0a261a6ef4fddfc34a9150005f7964cc69c420;p=tofuproxy.git diff --git a/warc/compressed.go b/warc/compressed.go index baa6830..b1de1ce 100644 --- a/warc/compressed.go +++ b/warc/compressed.go @@ -18,69 +18,145 @@ along with this program. If not, see . package warc import ( - "compress/gzip" - "fmt" + "bufio" + "bytes" "io" + "log" "os" - "path" - - "github.com/klauspost/compress/zstd" + "os/exec" + "strconv" + "strings" + "sync" ) -type Compressed struct { - r io.ReadCloser - fd *os.File - offset int64 -} - -func (c *Compressed) Read(p []byte) (int, error) { - n, err := c.r.Read(p) - c.offset += int64(n) - return n, err -} +type CompressedReader struct { + cmd *exec.Cmd + fd *os.File + stdout io.ReadCloser + offsets []Offset -func (c *Compressed) Close() error { - c.r.Close() - return c.fd.Close() + offW *os.File + offReader sync.WaitGroup } -func (c *Compressed) Seek(offset int64, whence int) (int64, error) { - if whence != io.SeekStart { - panic("can only seek from the start") +func NewCompressedReader( + warcPath, unCmd string, + offsets []Offset, + uOffset int64, +) (*CompressedReader, error) { + var offZ, offU int64 + for _, off := range offsets { + if uOffset < offU+off.U { + break + } + offU += off.U + offZ += off.Z } - if _, err := io.CopyN(io.Discard, c, offset-c.offset); err != nil { - return 0, err + fd, err := os.Open(warcPath) + if err != nil { + return nil, err } - c.offset = offset - return c.offset, nil -} - -func Open(warcPath string) (io.ReadSeekCloser, error) { - ext := path.Ext(warcPath) - switch ext { - case ".warc": - return os.Open(warcPath) - case ".gz": - fd, err := os.Open(warcPath) - if err != nil { + var dict []byte + if len(offsets) > 0 && offsets[0].U == 0 { + dict = make([]byte, offsets[0].Z) + if _, err = io.ReadFull(fd, dict); err != nil { + fd.Close() return nil, err } - gzr, err := gzip.NewReader(fd) + } + if _, err = fd.Seek(offZ, io.SeekStart); err != nil { + fd.Close() + return nil, err + } + cmd := exec.Command(unCmd) + stdout, err := cmd.StdoutPipe() + if err != nil { + fd.Close() + return nil, err + } + if dict == nil { + cmd.Stdin = fd + } else { + cmd.Stdin = io.MultiReader(bytes.NewReader(dict), fd) + } + if offsets == nil { + offR, offW, err := os.Pipe() if err != nil { + fd.Close() return nil, err } - gzr.Multistream(true) - return &Compressed{r: gzr, fd: fd}, nil - case ".zst": - fd, err := os.Open(warcPath) + cmd.ExtraFiles = append(cmd.ExtraFiles, offW) + err = cmd.Start() if err != nil { + fd.Close() + offW.Close() return nil, err } - zstdr, err := zstd.NewReader(fd) + r := CompressedReader{ + cmd: cmd, + fd: fd, + stdout: stdout, + offW: offW, + } + r.offReader.Add(1) + go r.offsetsReader(offR) + return &r, nil + } + err = cmd.Start() + if err != nil { + fd.Close() + return nil, err + } + _, err = io.CopyN(io.Discard, stdout, uOffset-offU) + if err != nil { + cmd.Process.Kill() + fd.Close() + return nil, err + } + return &CompressedReader{cmd: cmd, fd: fd, stdout: stdout}, nil +} + +func (r *CompressedReader) offsetsReader(offsets *os.File) { + scanner := bufio.NewScanner(offsets) + for scanner.Scan() { + l := scanner.Text() + cols := strings.Split(l, "\t") + if len(cols) != 2 { + log.Println("len(cols) != 2:", l) + continue + } + z, err := strconv.ParseUint(cols[0], 10, 64) if err != nil { - return nil, err + log.Println(err) + continue + } + u, err := strconv.ParseUint(cols[1], 10, 64) + if err != nil { + log.Println(err) + continue } - return &Compressed{r: zstdr.IOReadCloser(), fd: fd}, nil + r.offsets = append(r.offsets, Offset{int64(z), int64(u)}) } - return nil, fmt.Errorf("unknown extensions: %s", ext) + err := scanner.Err() + if err != nil { + log.Println(err) + } + r.offReader.Done() +} + +func (r *CompressedReader) Read(p []byte) (int, error) { + return r.stdout.Read(p) +} + +func (r *CompressedReader) Close() error { + err := r.cmd.Process.Kill() + r.stdout.Close() + r.fd.Close() + r.offW.Close() + r.offReader.Wait() + return err +} + +func (r *CompressedReader) Offsets() []Offset { + return r.offsets }