]> Sergey Matveev's repositories - tofuproxy.git/blob - warc/compressed.go
Multistream WARCs and better Zstandard support
[tofuproxy.git] / warc / compressed.go
1 /*
2 tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
3 Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package warc
19
20 import (
21         "bufio"
22         "bytes"
23         "io"
24         "log"
25         "os"
26         "os/exec"
27         "strconv"
28         "strings"
29         "sync"
30 )
31
32 type CompressedReader struct {
33         cmd     *exec.Cmd
34         fd      *os.File
35         stdout  io.ReadCloser
36         offsets []Offset
37
38         offW      *os.File
39         offReader sync.WaitGroup
40 }
41
42 func NewCompressedReader(
43         warcPath, unCmd string,
44         offsets []Offset,
45         uOffset int64,
46 ) (*CompressedReader, error) {
47         var offZ, offU int64
48         for _, off := range offsets {
49                 if uOffset < offU+off.U {
50                         break
51                 }
52                 offU += off.U
53                 offZ += off.Z
54         }
55         fd, err := os.Open(warcPath)
56         if err != nil {
57                 return nil, err
58         }
59         var dict []byte
60         if len(offsets) > 0 && offsets[0].U == 0 {
61                 dict = make([]byte, offsets[0].Z)
62                 if _, err = io.ReadFull(fd, dict); err != nil {
63                         fd.Close()
64                         return nil, err
65                 }
66         }
67         if _, err = fd.Seek(offZ, io.SeekStart); err != nil {
68                 fd.Close()
69                 return nil, err
70         }
71         cmd := exec.Command(unCmd)
72         stdout, err := cmd.StdoutPipe()
73         if err != nil {
74                 fd.Close()
75                 return nil, err
76         }
77         if dict == nil {
78                 cmd.Stdin = fd
79         } else {
80                 cmd.Stdin = io.MultiReader(bytes.NewReader(dict), fd)
81         }
82         if offsets == nil {
83                 offR, offW, err := os.Pipe()
84                 if err != nil {
85                         fd.Close()
86                         return nil, err
87                 }
88                 cmd.ExtraFiles = append(cmd.ExtraFiles, offW)
89                 err = cmd.Start()
90                 if err != nil {
91                         fd.Close()
92                         offW.Close()
93                         return nil, err
94                 }
95                 r := CompressedReader{
96                         cmd:    cmd,
97                         fd:     fd,
98                         stdout: stdout,
99                         offW:   offW,
100                 }
101                 r.offReader.Add(1)
102                 go r.offsetsReader(offR)
103                 return &r, nil
104         }
105         err = cmd.Start()
106         if err != nil {
107                 fd.Close()
108                 return nil, err
109         }
110         _, err = io.CopyN(io.Discard, stdout, uOffset-offU)
111         if err != nil {
112                 cmd.Process.Kill()
113                 fd.Close()
114                 return nil, err
115         }
116         return &CompressedReader{cmd: cmd, fd: fd, stdout: stdout}, nil
117 }
118
119 func (r *CompressedReader) offsetsReader(offsets *os.File) {
120         scanner := bufio.NewScanner(offsets)
121         for scanner.Scan() {
122                 l := scanner.Text()
123                 cols := strings.Split(l, "\t")
124                 if len(cols) != 2 {
125                         log.Println("len(cols) != 2:", l)
126                         continue
127                 }
128                 z, err := strconv.ParseUint(cols[0], 10, 64)
129                 if err != nil {
130                         log.Println(err)
131                         continue
132                 }
133                 u, err := strconv.ParseUint(cols[1], 10, 64)
134                 if err != nil {
135                         log.Println(err)
136                         continue
137                 }
138                 r.offsets = append(r.offsets, Offset{int64(z), int64(u)})
139         }
140         err := scanner.Err()
141         if err != nil {
142                 log.Println(err)
143         }
144         r.offReader.Done()
145 }
146
147 func (r *CompressedReader) Read(p []byte) (int, error) {
148         return r.stdout.Read(p)
149 }
150
151 func (r *CompressedReader) Close() error {
152         err := r.cmd.Process.Kill()
153         r.stdout.Close()
154         r.fd.Close()
155         r.offW.Close()
156         r.offReader.Wait()
157         return err
158 }
159
160 func (r *CompressedReader) Offsets() []Offset {
161         return r.offsets
162 }