]> Sergey Matveev's repositories - btrtrc.git/blob - storage/piece-resource.go
Fix TestSeedAfterDownloading when cgo is disabled
[btrtrc.git] / storage / piece-resource.go
1 package storage
2
3 import (
4         "bytes"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "path"
9         "sort"
10         "strconv"
11         "sync"
12
13         "github.com/anacrolix/missinggo/v2/resource"
14
15         "github.com/anacrolix/torrent/metainfo"
16 )
17
18 type piecePerResource struct {
19         rp   PieceProvider
20         opts ResourcePiecesOpts
21 }
22
23 type ResourcePiecesOpts struct {
24         // After marking a piece complete, don't bother deleting its incomplete blobs.
25         LeaveIncompleteChunks bool
26         // Sized puts require being able to stream from a statement executed on another connection.
27         // Without them, we buffer the entire read and then put that.
28         NoSizedPuts bool
29         Capacity    *int64
30 }
31
32 func NewResourcePieces(p PieceProvider) ClientImpl {
33         return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
34 }
35
36 func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
37         return &piecePerResource{
38                 rp:   p,
39                 opts: opts,
40         }
41 }
42
43 type piecePerResourceTorrentImpl struct {
44         piecePerResource
45         locks []sync.RWMutex
46 }
47
48 func (piecePerResourceTorrentImpl) Close() error {
49         return nil
50 }
51
52 func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
53         t := piecePerResourceTorrentImpl{
54                 s,
55                 make([]sync.RWMutex, info.NumPieces()),
56         }
57         return TorrentImpl{Piece: t.Piece, Close: t.Close}, nil
58 }
59
60 func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
61         return piecePerResourcePiece{
62                 mp:               p,
63                 piecePerResource: s.piecePerResource,
64                 mu:               &s.locks[p.Index()],
65         }
66 }
67
68 type PieceProvider interface {
69         resource.Provider
70 }
71
72 type ConsecutiveChunkReader interface {
73         ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
74 }
75
76 type piecePerResourcePiece struct {
77         mp metainfo.Piece
78         piecePerResource
79         // This protects operations that move complete/incomplete pieces around, which can trigger read
80         // errors that may cause callers to do more drastic things.
81         mu *sync.RWMutex
82 }
83
84 var _ io.WriterTo = piecePerResourcePiece{}
85
86 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
87         s.mu.RLock()
88         defer s.mu.RUnlock()
89         if s.mustIsComplete() {
90                 r, err := s.completed().Get()
91                 if err != nil {
92                         return 0, fmt.Errorf("getting complete instance: %w", err)
93                 }
94                 defer r.Close()
95                 return io.Copy(w, r)
96         }
97         if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
98                 return s.writeConsecutiveIncompleteChunks(ccr, w)
99         }
100         return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
101 }
102
103 func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
104         r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
105         if err != nil {
106                 return 0, err
107         }
108         defer r.Close()
109         return io.Copy(w, r)
110 }
111
112 // Returns if the piece is complete. Ok should be true, because we are the definitive source of
113 // truth here.
114 func (s piecePerResourcePiece) mustIsComplete() bool {
115         completion := s.Completion()
116         if !completion.Ok {
117                 panic("must know complete definitively")
118         }
119         return completion.Complete
120 }
121
122 func (s piecePerResourcePiece) Completion() Completion {
123         s.mu.RLock()
124         defer s.mu.RUnlock()
125         fi, err := s.completed().Stat()
126         return Completion{
127                 Complete: err == nil && fi.Size() == s.mp.Length(),
128                 Ok:       true,
129         }
130 }
131
132 type SizedPutter interface {
133         PutSized(io.Reader, int64) error
134 }
135
136 func (s piecePerResourcePiece) MarkComplete() error {
137         s.mu.Lock()
138         defer s.mu.Unlock()
139         incompleteChunks := s.getChunks()
140         r, err := func() (io.ReadCloser, error) {
141                 if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
142                         return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
143                 }
144                 return ioutil.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
145         }()
146         if err != nil {
147                 return fmt.Errorf("getting incomplete chunks reader: %w", err)
148         }
149         defer r.Close()
150         completedInstance := s.completed()
151         err = func() error {
152                 if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
153                         return sp.PutSized(r, s.mp.Length())
154                 } else {
155                         return completedInstance.Put(r)
156                 }
157         }()
158         if err == nil && !s.opts.LeaveIncompleteChunks {
159                 // I think we do this synchronously here since we don't want callers to act on the completed
160                 // piece if we're concurrently still deleting chunks. The caller may decide to start
161                 // downloading chunks again and won't expect us to delete them. It seems to be much faster
162                 // to let the resource provider do this if possible.
163                 var wg sync.WaitGroup
164                 for _, c := range incompleteChunks {
165                         wg.Add(1)
166                         go func(c chunk) {
167                                 defer wg.Done()
168                                 c.instance.Delete()
169                         }(c)
170                 }
171                 wg.Wait()
172         }
173         return err
174 }
175
176 func (s piecePerResourcePiece) MarkNotComplete() error {
177         s.mu.Lock()
178         defer s.mu.Unlock()
179         return s.completed().Delete()
180 }
181
182 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
183         s.mu.RLock()
184         defer s.mu.RUnlock()
185         if s.mustIsComplete() {
186                 return s.completed().ReadAt(b, off)
187         }
188         return s.getChunks().ReadAt(b, off)
189 }
190
191 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
192         s.mu.RLock()
193         defer s.mu.RUnlock()
194         i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
195         if err != nil {
196                 panic(err)
197         }
198         r := bytes.NewReader(b)
199         if sp, ok := i.(SizedPutter); ok {
200                 err = sp.PutSized(r, r.Size())
201         } else {
202                 err = i.Put(r)
203         }
204         n = len(b) - r.Len()
205         return
206 }
207
208 type chunk struct {
209         offset   int64
210         instance resource.Instance
211 }
212
213 type chunks []chunk
214
215 func (me chunks) ReadAt(b []byte, off int64) (int, error) {
216         for {
217                 if len(me) == 0 {
218                         return 0, io.EOF
219                 }
220                 if me[0].offset <= off {
221                         break
222                 }
223                 me = me[1:]
224         }
225         n, err := me[0].instance.ReadAt(b, off-me[0].offset)
226         if n == len(b) {
227                 return n, nil
228         }
229         if err == nil || err == io.EOF {
230                 n_, err := me[1:].ReadAt(b[n:], off+int64(n))
231                 return n + n_, err
232         }
233         return n, err
234 }
235
236 func (s piecePerResourcePiece) getChunks() (chunks chunks) {
237         names, err := s.incompleteDir().Readdirnames()
238         if err != nil {
239                 return
240         }
241         for _, n := range names {
242                 offset, err := strconv.ParseInt(n, 10, 64)
243                 if err != nil {
244                         panic(err)
245                 }
246                 i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
247                 if err != nil {
248                         panic(err)
249                 }
250                 chunks = append(chunks, chunk{offset, i})
251         }
252         sort.Slice(chunks, func(i, j int) bool {
253                 return chunks[i].offset < chunks[j].offset
254         })
255         return
256 }
257
258 func (s piecePerResourcePiece) completedInstancePath() string {
259         return path.Join("completed", s.mp.Hash().HexString())
260 }
261
262 func (s piecePerResourcePiece) completed() resource.Instance {
263         i, err := s.rp.NewInstance(s.completedInstancePath())
264         if err != nil {
265                 panic(err)
266         }
267         return i
268 }
269
270 func (s piecePerResourcePiece) incompleteDirPath() string {
271         return path.Join("incompleted", s.mp.Hash().HexString())
272 }
273
274 func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
275         i, err := s.rp.NewInstance(s.incompleteDirPath())
276         if err != nil {
277                 panic(err)
278         }
279         return i.(resource.DirInstance)
280 }