]> Sergey Matveev's repositories - btrtrc.git/blob - storage/piece-resource.go
Add PrefixDeleter support to ResourcePieces storage
[btrtrc.git] / storage / piece-resource.go
1 package storage
2
3 import (
4         "bytes"
5         "encoding/hex"
6         "fmt"
7         "io"
8         "path"
9         "sort"
10         "strconv"
11
12         g "github.com/anacrolix/generics"
13         "github.com/anacrolix/missinggo/v2/resource"
14         "github.com/anacrolix/sync"
15
16         "github.com/anacrolix/torrent/metainfo"
17 )
18
19 type piecePerResource struct {
20         rp   PieceProvider
21         opts ResourcePiecesOpts
22 }
23
24 type ResourcePiecesOpts struct {
25         // After marking a piece complete, don't bother deleting its incomplete blobs.
26         LeaveIncompleteChunks bool
27         // Sized puts require being able to stream from a statement executed on another connection.
28         // Without them, we buffer the entire read and then put that.
29         NoSizedPuts bool
30         Capacity    *int64
31 }
32
33 func NewResourcePieces(p PieceProvider) ClientImpl {
34         return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
35 }
36
37 func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
38         return &piecePerResource{
39                 rp:   p,
40                 opts: opts,
41         }
42 }
43
44 type piecePerResourceTorrentImpl struct {
45         piecePerResource
46         locks []sync.RWMutex
47 }
48
49 func (piecePerResourceTorrentImpl) Close() error {
50         return nil
51 }
52
53 func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
54         t := piecePerResourceTorrentImpl{
55                 s,
56                 make([]sync.RWMutex, info.NumPieces()),
57         }
58         return TorrentImpl{PieceWithHash: t.Piece, Close: t.Close}, nil
59 }
60
61 func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl {
62         return piecePerResourcePiece{
63                 mp:               p,
64                 pieceHash:        pieceHash,
65                 piecePerResource: s.piecePerResource,
66                 mu:               &s.locks[p.Index()],
67         }
68 }
69
70 type PieceProvider interface {
71         resource.Provider
72 }
73
74 type ConsecutiveChunkReader interface {
75         ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
76 }
77
78 type PrefixDeleter interface {
79         DeletePrefix(prefix string) error
80 }
81
82 type piecePerResourcePiece struct {
83         mp metainfo.Piece
84         // The piece hash if we have it. It could be 20 or 32 bytes depending on the info version.
85         pieceHash g.Option[[]byte]
86         piecePerResource
87         // This protects operations that move complete/incomplete pieces around, which can trigger read
88         // errors that may cause callers to do more drastic things.
89         mu *sync.RWMutex
90 }
91
92 var _ io.WriterTo = piecePerResourcePiece{}
93
94 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
95         s.mu.RLock()
96         defer s.mu.RUnlock()
97         if s.mustIsComplete() {
98                 r, err := s.completed().Get()
99                 if err != nil {
100                         return 0, fmt.Errorf("getting complete instance: %w", err)
101                 }
102                 defer r.Close()
103                 return io.Copy(w, r)
104         }
105         if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
106                 return s.writeConsecutiveIncompleteChunks(ccr, w)
107         }
108         return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
109 }
110
111 func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
112         r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
113         if err != nil {
114                 return 0, err
115         }
116         defer r.Close()
117         return io.Copy(w, r)
118 }
119
120 // Returns if the piece is complete. Ok should be true, because we are the definitive source of
121 // truth here.
122 func (s piecePerResourcePiece) mustIsComplete() bool {
123         completion := s.Completion()
124         if !completion.Ok {
125                 panic("must know complete definitively")
126         }
127         return completion.Complete
128 }
129
130 func (s piecePerResourcePiece) Completion() (_ Completion) {
131         if !s.pieceHash.Ok {
132                 return
133         }
134         s.mu.RLock()
135         defer s.mu.RUnlock()
136         fi, err := s.completed().Stat()
137         return Completion{
138                 Complete: err == nil && fi.Size() == s.mp.Length(),
139                 Ok:       true,
140         }
141 }
142
143 type SizedPutter interface {
144         PutSized(io.Reader, int64) error
145 }
146
147 func (s piecePerResourcePiece) MarkComplete() (err error) {
148         s.mu.Lock()
149         defer s.mu.Unlock()
150         incompleteChunks := s.getChunks()
151         r, err := func() (io.ReadCloser, error) {
152                 if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
153                         return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
154                 }
155                 return io.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
156         }()
157         if err != nil {
158                 return fmt.Errorf("getting incomplete chunks reader: %w", err)
159         }
160         defer r.Close()
161         completedInstance := s.completed()
162         err = func() error {
163                 if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
164                         return sp.PutSized(r, s.mp.Length())
165                 } else {
166                         return completedInstance.Put(r)
167                 }
168         }()
169         if err != nil || s.opts.LeaveIncompleteChunks {
170                 return
171         }
172
173         // I think we do this synchronously here since we don't want callers to act on the completed
174         // piece if we're concurrently still deleting chunks. The caller may decide to start
175         // downloading chunks again and won't expect us to delete them. It seems to be much faster
176         // to let the resource provider do this if possible.
177         if pd, ok := s.rp.(PrefixDeleter); ok {
178                 err = pd.DeletePrefix(s.incompleteDirPath() + "/")
179                 if err != nil {
180                         err = fmt.Errorf("deleting incomplete prefix: %w", err)
181                 }
182         } else {
183                 var wg sync.WaitGroup
184                 for _, c := range incompleteChunks {
185                         wg.Add(1)
186                         go func(c chunk) {
187                                 defer wg.Done()
188                                 c.instance.Delete()
189                         }(c)
190                 }
191                 wg.Wait()
192         }
193         return err
194 }
195
196 func (s piecePerResourcePiece) MarkNotComplete() error {
197         s.mu.Lock()
198         defer s.mu.Unlock()
199         return s.completed().Delete()
200 }
201
202 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
203         s.mu.RLock()
204         defer s.mu.RUnlock()
205         if s.mustIsComplete() {
206                 return s.completed().ReadAt(b, off)
207         }
208         return s.getChunks().ReadAt(b, off)
209 }
210
211 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
212         s.mu.RLock()
213         defer s.mu.RUnlock()
214         i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
215         if err != nil {
216                 panic(err)
217         }
218         r := bytes.NewReader(b)
219         if sp, ok := i.(SizedPutter); ok {
220                 err = sp.PutSized(r, r.Size())
221         } else {
222                 err = i.Put(r)
223         }
224         n = len(b) - r.Len()
225         return
226 }
227
228 type chunk struct {
229         offset   int64
230         instance resource.Instance
231 }
232
233 type chunks []chunk
234
235 func (me chunks) ReadAt(b []byte, off int64) (int, error) {
236         for {
237                 if len(me) == 0 {
238                         return 0, io.EOF
239                 }
240                 if me[0].offset <= off {
241                         break
242                 }
243                 me = me[1:]
244         }
245         n, err := me[0].instance.ReadAt(b, off-me[0].offset)
246         if n == len(b) {
247                 return n, nil
248         }
249         if err == nil || err == io.EOF {
250                 n_, err := me[1:].ReadAt(b[n:], off+int64(n))
251                 return n + n_, err
252         }
253         return n, err
254 }
255
256 func (s piecePerResourcePiece) getChunks() (chunks chunks) {
257         names, err := s.incompleteDir().Readdirnames()
258         if err != nil {
259                 return
260         }
261         for _, n := range names {
262                 offset, err := strconv.ParseInt(n, 10, 64)
263                 if err != nil {
264                         panic(err)
265                 }
266                 i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
267                 if err != nil {
268                         panic(err)
269                 }
270                 chunks = append(chunks, chunk{offset, i})
271         }
272         sort.Slice(chunks, func(i, j int) bool {
273                 return chunks[i].offset < chunks[j].offset
274         })
275         return
276 }
277
278 func (s piecePerResourcePiece) completedInstancePath() string {
279         return path.Join("completed", s.hashHex())
280 }
281
282 func (s piecePerResourcePiece) completed() resource.Instance {
283         i, err := s.rp.NewInstance(s.completedInstancePath())
284         if err != nil {
285                 panic(err)
286         }
287         return i
288 }
289
290 func (s piecePerResourcePiece) incompleteDirPath() string {
291         return path.Join("incompleted", s.hashHex())
292 }
293
294 func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
295         i, err := s.rp.NewInstance(s.incompleteDirPath())
296         if err != nil {
297                 panic(err)
298         }
299         return i.(resource.DirInstance)
300 }
301
302 func (me piecePerResourcePiece) hashHex() string {
303         return hex.EncodeToString(me.pieceHash.Unwrap())
304 }