]> Sergey Matveev's repositories - btrtrc.git/blob - storage/possum/possum-provider.go
Add possum storage
[btrtrc.git] / storage / possum / possum-provider.go
1 //go:build !android
2
3 package possumTorrentStorage
4
5 import (
6         "cmp"
7         "fmt"
8         "github.com/anacrolix/log"
9         possum "github.com/anacrolix/possum/go"
10         possumResource "github.com/anacrolix/possum/go/resource"
11         "github.com/anacrolix/torrent/storage"
12         "io"
13         "sort"
14         "strconv"
15 )
16
17 // Extends possum resource.Provider with an efficient implementation of torrent
18 // storage.ConsecutiveChunkReader.
19 type Provider struct {
20         possumResource.Provider
21         Logger log.Logger
22 }
23
24 var _ storage.ConsecutiveChunkReader = Provider{}
25
26 // Sorts by a precomputed key but swaps on another slice at the same time.
27 type keySorter[T any, K cmp.Ordered] struct {
28         orig []T
29         keys []K
30 }
31
32 func (o keySorter[T, K]) Len() int {
33         return len(o.keys)
34 }
35
36 func (o keySorter[T, K]) Less(i, j int) bool {
37         return o.keys[i] < o.keys[j]
38 }
39
40 func (o keySorter[T, K]) Swap(i, j int) {
41         o.keys[i], o.keys[j] = o.keys[j], o.keys[i]
42         o.orig[i], o.orig[j] = o.orig[j], o.orig[i]
43 }
44
45 // TODO: Should the parent ReadConsecutiveChunks method take the expected number of bytes to avoid
46 // trying to read discontinuous or incomplete sequences of chunks?
47 func (p Provider) ReadConsecutiveChunks(prefix string) (rc io.ReadCloser, err error) {
48         p.Logger.Levelf(log.Debug, "ReadConsecutiveChunks(%q)", prefix)
49         //debug.PrintStack()
50         pr, err := p.Handle.NewReader()
51         if err != nil {
52                 return
53         }
54         defer func() {
55                 if err != nil {
56                         pr.End()
57                 }
58         }()
59         items, err := pr.ListItems(prefix)
60         if err != nil {
61                 return
62         }
63         keys := make([]int64, 0, len(items))
64         for _, item := range items {
65                 var i int64
66                 offsetStr := item.Key
67                 i, err = strconv.ParseInt(offsetStr, 10, 64)
68                 if err != nil {
69                         err = fmt.Errorf("failed to parse offset %q: %w", offsetStr, err)
70                         return
71                 }
72                 keys = append(keys, i)
73         }
74         sort.Sort(keySorter[possum.Item, int64]{items, keys})
75         offset := int64(0)
76         consValues := make([]consecutiveValue, 0, len(items))
77         for i, item := range items {
78                 itemOffset := keys[i]
79                 if itemOffset > offset {
80                         // We can't provide a continuous read.
81                         break
82                 }
83                 if itemOffset+item.Stat.Size() <= offset {
84                         // This item isn't needed
85                         continue
86                 }
87                 var v possum.Value
88                 v, err = pr.Add(prefix + item.Key)
89                 if err != nil {
90                         return
91                 }
92                 consValues = append(consValues, consecutiveValue{
93                         pv:     v,
94                         offset: itemOffset,
95                         size:   item.Stat.Size(),
96                 })
97                 offset += item.Stat.Size() - (offset - itemOffset)
98         }
99         err = pr.Begin()
100         if err != nil {
101                 return
102         }
103         rc, pw := io.Pipe()
104         go func() {
105                 defer pr.End()
106                 err := p.writeConsecutiveValues(consValues, pw)
107                 err = pw.CloseWithError(err)
108                 if err != nil {
109                         panic(err)
110                 }
111         }()
112         return
113 }
114
115 type consecutiveValue struct {
116         pv     possum.Value
117         offset int64
118         size   int64
119 }
120
121 func (pp Provider) writeConsecutiveValues(
122         values []consecutiveValue, pw *io.PipeWriter,
123 ) (err error) {
124         off := int64(0)
125         for _, v := range values {
126                 var n int64
127                 valueOff := off - v.offset
128                 n, err = io.Copy(pw, io.NewSectionReader(v.pv, valueOff, v.size-valueOff))
129                 if err != nil {
130                         return
131                 }
132                 off += n
133         }
134         return nil
135 }