Sergey Matveev's repositories - tofuproxy.git/blob - warc/uris.go
Multistream WARCs and better Zstandard support
[tofuproxy.git] / warc / uris.go
1 /*
2 tofuproxy -- flexible HTTP/WARC proxy with TLS certificates management
3 Copyright (C) 2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package warc
19
20 import (
21         "encoding/gob"
22         "fmt"
23         "io"
24         "log"
25         "os"
26         "strconv"
27         "sync"
28         "time"
29 )
30
31 const IndexExt = ".idx.gob"
32
33 var (
34         WARCs        = map[string]map[string]*Record{}
35         WARCsOffsets = map[string][]Offset{}
36         WARCsM       sync.RWMutex
37
38         Incomplete = map[string]*Record{}
39 )
40
41 func Add(warcPath string) error {
42         fd, err := os.Open(warcPath + IndexExt)
43         if err == nil {
44                 defer fd.Close()
45                 var uris map[string]*Record
46                 var offsets []Offset
47                 dec := gob.NewDecoder(fd)
48                 if err := dec.Decode(&uris); err != nil {
49                         return err
50                 }
51                 if err := dec.Decode(&offsets); err != nil {
52                         return err
53                 }
54                 WARCsM.Lock()
55                 WARCs[warcPath] = uris
56                 WARCsOffsets[warcPath] = offsets
57                 WARCsM.Unlock()
58                 log.Println("loaded marshalled index:", warcPath+IndexExt)
59                 return nil
60         }
61         if err != nil && !os.IsNotExist(err) {
62                 return err
63         }
64         r, err := NewReader(warcPath)
65         if err != nil {
66                 return err
67         }
68         defer r.Close()
69         uris := map[string]*Record{}
70         for {
71                 rec, _, err := r.ReadRecord()
72                 if err != nil {
73                         if err == io.EOF {
74                                 break
75                         }
76                         return err
77                 }
78                 rec.HdrLines = nil
79                 segNum := rec.Hdr.Get("WARC-Segment-Number")
80                 switch rec.Hdr.Get("WARC-Type") {
81                 case "response":
82                         uri := rec.URI()
83                         if uri == "" {
84                                 continue
85                         }
86                         if segNum == "1" {
87                                 Incomplete[rec.Hdr.Get("WARC-Record-ID")] = rec
88                                 continue
89                         }
90                         uris[uri] = rec
91                 case "continuation":
92                         originID := rec.Hdr.Get("WARC-Segment-Origin-ID")
93                         incomplete := Incomplete[originID]
94                         if incomplete == nil {
95                                 return fmt.Errorf("can not find WARC-Segment-Origin-ID: %q", originID)
96                         }
97                         segNumExpected := strconv.Itoa(len(incomplete.Continuations) + 1 + 1)
98                         if segNum != segNumExpected {
99                                 return fmt.Errorf(
100                                         "unexpected WARC-Segment-Number %s != %s",
101                                         segNum, segNumExpected,
102                                 )
103                         }
104                         incomplete.Continuations = append(incomplete.Continuations, rec)
105                         if rec.Hdr.Get("WARC-Segment-Total-Length") != "" {
106                                 if incomplete.WARCPath == warcPath {
107                                         uris[incomplete.URI()] = incomplete
108                                 } else {
109                                         WARCsM.Lock()
110                                         WARCs[incomplete.WARCPath][incomplete.URI()] = incomplete
111                                         WARCsM.Unlock()
112                                 }
113                                 delete(Incomplete, originID)
114                         }
115                 }
116         }
117         r.Close()
118         WARCsM.Lock()
119         WARCs[warcPath] = uris
120         WARCsOffsets[warcPath] = r.offsets
121         WARCsM.Unlock()
122         return nil
123 }
124
125 func SaveIndexes() error {
126         WARCsM.RLock()
127         defer WARCsM.RUnlock()
128         for warcPath, uris := range WARCs {
129                 p := warcPath + IndexExt
130                 if _, err := os.Stat(p); err == nil {
131                         continue
132                 }
133                 tmpSuffix := strconv.FormatInt(time.Now().UnixNano()+int64(os.Getpid()), 16)
134                 fd, err := os.OpenFile(
135                         p+tmpSuffix,
136                         os.O_WRONLY|os.O_CREATE|os.O_EXCL,
137                         os.FileMode(0666),
138                 )
139                 if err != nil {
140                         return err
141                 }
142                 enc := gob.NewEncoder(fd)
143                 if err = enc.Encode(&uris); err != nil {
144                         fd.Close()
145                         return err
146                 }
147                 offsets := WARCsOffsets[warcPath]
148                 if err = enc.Encode(&offsets); err != nil {
149                         fd.Close()
150                         return err
151                 }
152                 if err = fd.Close(); err != nil {
153                         return err
154                 }
155                 if err = os.Rename(p+tmpSuffix, p); err != nil {
156                         return err
157                 }
158                 log.Println("saved:", p)
159         }
160         return nil
161 }