]> Sergey Matveev's repositories - btrtrc.git/blob - storage/piece-resource.go
Add CustomDirect BenchmarkMarkComplete sub-test
[btrtrc.git] / storage / piece-resource.go
1 package storage
2
3 import (
4         "bytes"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "path"
9         "sort"
10         "strconv"
11         "sync"
12
13         "github.com/anacrolix/missinggo/v2/resource"
14
15         "github.com/anacrolix/torrent/metainfo"
16 )
17
18 type piecePerResource struct {
19         rp   PieceProvider
20         opts ResourcePiecesOpts
21 }
22
23 type ResourcePiecesOpts struct {
24         // After marking a piece complete, don't bother deleting its incomplete blobs.
25         LeaveIncompleteChunks bool
26         // Sized puts require being able to stream from a statement executed on another connection.
27         // Without them, we buffer the entire read and then put that.
28         NoSizedPuts bool
29 }
30
31 func NewResourcePieces(p PieceProvider) ClientImpl {
32         return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
33 }
34
35 func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
36         return &piecePerResource{
37                 rp:   p,
38                 opts: opts,
39         }
40 }
41
42 type piecePerResourceTorrentImpl struct {
43         piecePerResource
44         locks []sync.RWMutex
45 }
46
47 func (piecePerResourceTorrentImpl) Close() error {
48         return nil
49 }
50
51 func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
52         return piecePerResourceTorrentImpl{
53                 s,
54                 make([]sync.RWMutex, info.NumPieces()),
55         }, nil
56 }
57
58 func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
59         return piecePerResourcePiece{
60                 mp:               p,
61                 piecePerResource: s.piecePerResource,
62                 mu:               &s.locks[p.Index()],
63         }
64 }
65
66 type PieceProvider interface {
67         resource.Provider
68 }
69
70 type ConsecutiveChunkReader interface {
71         ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
72 }
73
74 type piecePerResourcePiece struct {
75         mp metainfo.Piece
76         piecePerResource
77         // This protects operations that move complete/incomplete pieces around, which can trigger read
78         // errors that may cause callers to do more drastic things.
79         mu *sync.RWMutex
80 }
81
82 var _ io.WriterTo = piecePerResourcePiece{}
83
84 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
85         s.mu.RLock()
86         defer s.mu.RUnlock()
87         if s.mustIsComplete() {
88                 r, err := s.completed().Get()
89                 if err != nil {
90                         return 0, fmt.Errorf("getting complete instance: %w", err)
91                 }
92                 defer r.Close()
93                 return io.Copy(w, r)
94         }
95         if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
96                 return s.writeConsecutiveIncompleteChunks(ccr, w)
97         }
98         return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
99 }
100
101 func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
102         r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
103         if err != nil {
104                 return 0, err
105         }
106         defer r.Close()
107         return io.Copy(w, r)
108 }
109
110 // Returns if the piece is complete. Ok should be true, because we are the definitive source of
111 // truth here.
112 func (s piecePerResourcePiece) mustIsComplete() bool {
113         completion := s.Completion()
114         if !completion.Ok {
115                 panic("must know complete definitively")
116         }
117         return completion.Complete
118 }
119
120 func (s piecePerResourcePiece) Completion() Completion {
121         s.mu.RLock()
122         defer s.mu.RUnlock()
123         fi, err := s.completed().Stat()
124         return Completion{
125                 Complete: err == nil && fi.Size() == s.mp.Length(),
126                 Ok:       true,
127         }
128 }
129
130 type SizedPutter interface {
131         PutSized(io.Reader, int64) error
132 }
133
134 func (s piecePerResourcePiece) MarkComplete() error {
135         s.mu.Lock()
136         defer s.mu.Unlock()
137         incompleteChunks := s.getChunks()
138         r, err := func() (io.ReadCloser, error) {
139                 if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
140                         return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
141                 }
142                 return ioutil.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
143         }()
144         if err != nil {
145                 return fmt.Errorf("getting incomplete chunks reader: %w", err)
146         }
147         defer r.Close()
148         completedInstance := s.completed()
149         err = func() error {
150                 if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
151                         return sp.PutSized(r, s.mp.Length())
152                 } else {
153                         return completedInstance.Put(r)
154                 }
155         }()
156         if err == nil && !s.opts.LeaveIncompleteChunks {
157                 // I think we do this synchronously here since we don't want callers to act on the completed
158                 // piece if we're concurrently still deleting chunks. The caller may decide to start
159                 // downloading chunks again and won't expect us to delete them. It seems to be much faster
160                 // to let the resource provider do this if possible.
161                 var wg sync.WaitGroup
162                 for _, c := range incompleteChunks {
163                         wg.Add(1)
164                         go func(c chunk) {
165                                 defer wg.Done()
166                                 c.instance.Delete()
167                         }(c)
168                 }
169                 wg.Wait()
170         }
171         return err
172 }
173
174 func (s piecePerResourcePiece) MarkNotComplete() error {
175         s.mu.Lock()
176         defer s.mu.Unlock()
177         return s.completed().Delete()
178 }
179
180 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
181         s.mu.RLock()
182         defer s.mu.RUnlock()
183         if s.mustIsComplete() {
184                 return s.completed().ReadAt(b, off)
185         }
186         return s.getChunks().ReadAt(b, off)
187 }
188
189 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
190         s.mu.RLock()
191         defer s.mu.RUnlock()
192         i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
193         if err != nil {
194                 panic(err)
195         }
196         r := bytes.NewReader(b)
197         if sp, ok := i.(SizedPutter); ok {
198                 err = sp.PutSized(r, r.Size())
199         } else {
200                 err = i.Put(r)
201         }
202         n = len(b) - r.Len()
203         return
204 }
205
206 type chunk struct {
207         offset   int64
208         instance resource.Instance
209 }
210
211 type chunks []chunk
212
213 func (me chunks) ReadAt(b []byte, off int64) (int, error) {
214         for {
215                 if len(me) == 0 {
216                         return 0, io.EOF
217                 }
218                 if me[0].offset <= off {
219                         break
220                 }
221                 me = me[1:]
222         }
223         n, err := me[0].instance.ReadAt(b, off-me[0].offset)
224         if n == len(b) {
225                 return n, nil
226         }
227         if err == nil || err == io.EOF {
228                 n_, err := me[1:].ReadAt(b[n:], off+int64(n))
229                 return n + n_, err
230         }
231         return n, err
232 }
233
234 func (s piecePerResourcePiece) getChunks() (chunks chunks) {
235         names, err := s.incompleteDir().Readdirnames()
236         if err != nil {
237                 return
238         }
239         for _, n := range names {
240                 offset, err := strconv.ParseInt(n, 10, 64)
241                 if err != nil {
242                         panic(err)
243                 }
244                 i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
245                 if err != nil {
246                         panic(err)
247                 }
248                 chunks = append(chunks, chunk{offset, i})
249         }
250         sort.Slice(chunks, func(i, j int) bool {
251                 return chunks[i].offset < chunks[j].offset
252         })
253         return
254 }
255
256 func (s piecePerResourcePiece) completedInstancePath() string {
257         return path.Join("completed", s.mp.Hash().HexString())
258 }
259
260 func (s piecePerResourcePiece) completed() resource.Instance {
261         i, err := s.rp.NewInstance(s.completedInstancePath())
262         if err != nil {
263                 panic(err)
264         }
265         return i
266 }
267
268 func (s piecePerResourcePiece) incompleteDirPath() string {
269         return path.Join("incompleted", s.mp.Hash().HexString())
270 }
271
272 func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
273         i, err := s.rp.NewInstance(s.incompleteDirPath())
274         if err != nil {
275                 panic(err)
276         }
277         return i.(resource.DirInstance)
278 }