Sergey Matveev's repositories - tofuproxy.git/blobdiff - warc/uris.go
Multistream WARCs and better Zstandard support
[tofuproxy.git] / warc / uris.go
index a971ff0edce5f35660c9ec6c4c22dbc4a1c2fed3..4d8c1a34b20fef8aa77d5ba64f89d9835535b9dc 100644 (file)
@@ -25,13 +25,15 @@ import (
        "os"
        "strconv"
        "sync"
+       "time"
 )
 
 const IndexExt = ".idx.gob"
 
 var (
-       WARCs  = map[string]map[string]*Record{}
-       WARCsM sync.RWMutex
+       WARCs        = map[string]map[string]*Record{}
+       WARCsOffsets = map[string][]Offset{}
+       WARCsM       sync.RWMutex
 
        Incomplete = map[string]*Record{}
 )
@@ -41,12 +43,19 @@ func Add(warcPath string) error {
        if err == nil {
                defer fd.Close()
                var uris map[string]*Record
-               if err := gob.NewDecoder(fd).Decode(&uris); err != nil {
+               var offsets []Offset
+               dec := gob.NewDecoder(fd)
+               if err := dec.Decode(&uris); err != nil {
+                       return err
+               }
+               if err := dec.Decode(&offsets); err != nil {
                        return err
                }
                WARCsM.Lock()
                WARCs[warcPath] = uris
+               WARCsOffsets[warcPath] = offsets
                WARCsM.Unlock()
+               log.Println("loaded marshalled index:", warcPath+IndexExt)
                return nil
        }
        if err != nil && !os.IsNotExist(err) {
@@ -59,13 +68,14 @@ func Add(warcPath string) error {
        defer r.Close()
        uris := map[string]*Record{}
        for {
-               rec, err := r.ReadRecord()
+               rec, _, err := r.ReadRecord()
                if err != nil {
                        if err == io.EOF {
                                break
                        }
                        return err
                }
+               rec.HdrLines = nil
                segNum := rec.Hdr.Get("WARC-Segment-Number")
                switch rec.Hdr.Get("WARC-Type") {
                case "response":
@@ -93,15 +103,21 @@ func Add(warcPath string) error {
                        }
                        incomplete.Continuations = append(incomplete.Continuations, rec)
                        if rec.Hdr.Get("WARC-Segment-Total-Length") != "" {
-                               WARCsM.Lock()
-                               WARCs[incomplete.WARCPath][incomplete.URI()] = incomplete
-                               WARCsM.Unlock()
+                               if incomplete.WARCPath == warcPath {
+                                       uris[incomplete.URI()] = incomplete
+                               } else {
+                                       WARCsM.Lock()
+                                       WARCs[incomplete.WARCPath][incomplete.URI()] = incomplete
+                                       WARCsM.Unlock()
+                               }
                                delete(Incomplete, originID)
                        }
                }
        }
+       r.Close()
        WARCsM.Lock()
        WARCs[warcPath] = uris
+       WARCsOffsets[warcPath] = r.offsets
        WARCsM.Unlock()
        return nil
 }
@@ -114,20 +130,29 @@ func SaveIndexes() error {
                if _, err := os.Stat(p); err == nil {
                        continue
                }
+               tmpSuffix := strconv.FormatInt(time.Now().UnixNano()+int64(os.Getpid()), 16)
                fd, err := os.OpenFile(
-                       p+".tmp",
-                       os.O_CREATE|os.O_WRONLY|os.O_EXCL,
+                       p+tmpSuffix,
+                       os.O_WRONLY|os.O_CREATE|os.O_EXCL,
                        os.FileMode(0666),
                )
                if err != nil {
                        return err
                }
-               if err = gob.NewEncoder(fd).Encode(&uris); err != nil {
+               enc := gob.NewEncoder(fd)
+               if err = enc.Encode(&uris); err != nil {
                        fd.Close()
                        return err
                }
-               fd.Close()
-               if err = os.Rename(p+".tmp", p); err != nil {
+               offsets := WARCsOffsets[warcPath]
+               if err = enc.Encode(&offsets); err != nil {
+                       fd.Close()
+                       return err
+               }
+               if err = fd.Close(); err != nil {
+                       return err
+               }
+               if err = os.Rename(p+tmpSuffix, p); err != nil {
                        return err
                }
                log.Println("saved:", p)