]> Sergey Matveev's repositories - btrtrc.git/blob - storage/piece-resource.go
94325027b047a27f52f25e9d5871b3a3e04d4aa1
[btrtrc.git] / storage / piece-resource.go
1 package storage
2
3 import (
4         "bytes"
5         "encoding/hex"
6         "fmt"
7         "io"
8         "path"
9         "sort"
10         "strconv"
11
12         g "github.com/anacrolix/generics"
13         "github.com/anacrolix/missinggo/v2/resource"
14         "github.com/anacrolix/sync"
15
16         "github.com/anacrolix/torrent/metainfo"
17 )
18
19 type piecePerResource struct {
20         rp   PieceProvider
21         opts ResourcePiecesOpts
22 }
23
24 type ResourcePiecesOpts struct {
25         // After marking a piece complete, don't bother deleting its incomplete blobs.
26         LeaveIncompleteChunks bool
27         // Sized puts require being able to stream from a statement executed on another connection.
28         // Without them, we buffer the entire read and then put that.
29         NoSizedPuts bool
30         Capacity    *int64
31 }
32
33 func NewResourcePieces(p PieceProvider) ClientImpl {
34         return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
35 }
36
37 func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
38         return &piecePerResource{
39                 rp:   p,
40                 opts: opts,
41         }
42 }
43
44 type piecePerResourceTorrentImpl struct {
45         piecePerResource
46         locks []sync.RWMutex
47 }
48
49 func (piecePerResourceTorrentImpl) Close() error {
50         return nil
51 }
52
53 func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
54         t := piecePerResourceTorrentImpl{
55                 s,
56                 make([]sync.RWMutex, info.NumPieces()),
57         }
58         return TorrentImpl{PieceWithHash: t.Piece, Close: t.Close}, nil
59 }
60
61 func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl {
62         return piecePerResourcePiece{
63                 mp:               p,
64                 pieceHash:        pieceHash,
65                 piecePerResource: s.piecePerResource,
66                 mu:               &s.locks[p.Index()],
67         }
68 }
69
70 type PieceProvider interface {
71         resource.Provider
72 }
73
74 type ConsecutiveChunkReader interface {
75         ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
76 }
77
78 type piecePerResourcePiece struct {
79         mp metainfo.Piece
80         // The piece hash if we have it. It could be 20 or 32 bytes depending on the info version.
81         pieceHash g.Option[[]byte]
82         piecePerResource
83         // This protects operations that move complete/incomplete pieces around, which can trigger read
84         // errors that may cause callers to do more drastic things.
85         mu *sync.RWMutex
86 }
87
88 var _ io.WriterTo = piecePerResourcePiece{}
89
90 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
91         s.mu.RLock()
92         defer s.mu.RUnlock()
93         if s.mustIsComplete() {
94                 r, err := s.completed().Get()
95                 if err != nil {
96                         return 0, fmt.Errorf("getting complete instance: %w", err)
97                 }
98                 defer r.Close()
99                 return io.Copy(w, r)
100         }
101         if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
102                 return s.writeConsecutiveIncompleteChunks(ccr, w)
103         }
104         return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
105 }
106
107 func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
108         r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
109         if err != nil {
110                 return 0, err
111         }
112         defer r.Close()
113         return io.Copy(w, r)
114 }
115
116 // Returns if the piece is complete. Ok should be true, because we are the definitive source of
117 // truth here.
118 func (s piecePerResourcePiece) mustIsComplete() bool {
119         completion := s.Completion()
120         if !completion.Ok {
121                 panic("must know complete definitively")
122         }
123         return completion.Complete
124 }
125
126 func (s piecePerResourcePiece) Completion() (_ Completion) {
127         if !s.pieceHash.Ok {
128                 return
129         }
130         s.mu.RLock()
131         defer s.mu.RUnlock()
132         fi, err := s.completed().Stat()
133         return Completion{
134                 Complete: err == nil && fi.Size() == s.mp.Length(),
135                 Ok:       true,
136         }
137 }
138
139 type SizedPutter interface {
140         PutSized(io.Reader, int64) error
141 }
142
143 func (s piecePerResourcePiece) MarkComplete() error {
144         s.mu.Lock()
145         defer s.mu.Unlock()
146         incompleteChunks := s.getChunks()
147         r, err := func() (io.ReadCloser, error) {
148                 if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
149                         return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
150                 }
151                 return io.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
152         }()
153         if err != nil {
154                 return fmt.Errorf("getting incomplete chunks reader: %w", err)
155         }
156         defer r.Close()
157         completedInstance := s.completed()
158         err = func() error {
159                 if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
160                         return sp.PutSized(r, s.mp.Length())
161                 } else {
162                         return completedInstance.Put(r)
163                 }
164         }()
165         if err == nil && !s.opts.LeaveIncompleteChunks {
166                 // I think we do this synchronously here since we don't want callers to act on the completed
167                 // piece if we're concurrently still deleting chunks. The caller may decide to start
168                 // downloading chunks again and won't expect us to delete them. It seems to be much faster
169                 // to let the resource provider do this if possible.
170                 var wg sync.WaitGroup
171                 for _, c := range incompleteChunks {
172                         wg.Add(1)
173                         go func(c chunk) {
174                                 defer wg.Done()
175                                 c.instance.Delete()
176                         }(c)
177                 }
178                 wg.Wait()
179         }
180         return err
181 }
182
183 func (s piecePerResourcePiece) MarkNotComplete() error {
184         s.mu.Lock()
185         defer s.mu.Unlock()
186         return s.completed().Delete()
187 }
188
189 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
190         s.mu.RLock()
191         defer s.mu.RUnlock()
192         if s.mustIsComplete() {
193                 return s.completed().ReadAt(b, off)
194         }
195         return s.getChunks().ReadAt(b, off)
196 }
197
198 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
199         s.mu.RLock()
200         defer s.mu.RUnlock()
201         i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
202         if err != nil {
203                 panic(err)
204         }
205         r := bytes.NewReader(b)
206         if sp, ok := i.(SizedPutter); ok {
207                 err = sp.PutSized(r, r.Size())
208         } else {
209                 err = i.Put(r)
210         }
211         n = len(b) - r.Len()
212         return
213 }
214
215 type chunk struct {
216         offset   int64
217         instance resource.Instance
218 }
219
220 type chunks []chunk
221
222 func (me chunks) ReadAt(b []byte, off int64) (int, error) {
223         for {
224                 if len(me) == 0 {
225                         return 0, io.EOF
226                 }
227                 if me[0].offset <= off {
228                         break
229                 }
230                 me = me[1:]
231         }
232         n, err := me[0].instance.ReadAt(b, off-me[0].offset)
233         if n == len(b) {
234                 return n, nil
235         }
236         if err == nil || err == io.EOF {
237                 n_, err := me[1:].ReadAt(b[n:], off+int64(n))
238                 return n + n_, err
239         }
240         return n, err
241 }
242
243 func (s piecePerResourcePiece) getChunks() (chunks chunks) {
244         names, err := s.incompleteDir().Readdirnames()
245         if err != nil {
246                 return
247         }
248         for _, n := range names {
249                 offset, err := strconv.ParseInt(n, 10, 64)
250                 if err != nil {
251                         panic(err)
252                 }
253                 i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
254                 if err != nil {
255                         panic(err)
256                 }
257                 chunks = append(chunks, chunk{offset, i})
258         }
259         sort.Slice(chunks, func(i, j int) bool {
260                 return chunks[i].offset < chunks[j].offset
261         })
262         return
263 }
264
265 func (s piecePerResourcePiece) completedInstancePath() string {
266         return path.Join("completed", s.hashHex())
267 }
268
269 func (s piecePerResourcePiece) completed() resource.Instance {
270         i, err := s.rp.NewInstance(s.completedInstancePath())
271         if err != nil {
272                 panic(err)
273         }
274         return i
275 }
276
277 func (s piecePerResourcePiece) incompleteDirPath() string {
278         return path.Join("incompleted", s.hashHex())
279 }
280
281 func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
282         i, err := s.rp.NewInstance(s.incompleteDirPath())
283         if err != nil {
284                 panic(err)
285         }
286         return i.(resource.DirInstance)
287 }
288
289 func (me piecePerResourcePiece) hashHex() string {
290         return hex.EncodeToString(me.pieceHash.Unwrap())
291 }