]> Sergey Matveev's repositories - btrtrc.git/blob - storage/piece-resource.go
Remove torrentfs-macos job from CI
[btrtrc.git] / storage / piece-resource.go
1 package storage
2
3 import (
4         "bytes"
5         "fmt"
6         "io"
7         "path"
8         "sort"
9         "strconv"
10         "sync"
11
12         "github.com/anacrolix/missinggo/v2/resource"
13
14         "github.com/anacrolix/torrent/metainfo"
15 )
16
17 type piecePerResource struct {
18         rp   PieceProvider
19         opts ResourcePiecesOpts
20 }
21
22 type ResourcePiecesOpts struct {
23         // After marking a piece complete, don't bother deleting its incomplete blobs.
24         LeaveIncompleteChunks bool
25         // Sized puts require being able to stream from a statement executed on another connection.
26         // Without them, we buffer the entire read and then put that.
27         NoSizedPuts bool
28         Capacity    *int64
29 }
30
31 func NewResourcePieces(p PieceProvider) ClientImpl {
32         return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
33 }
34
35 func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
36         return &piecePerResource{
37                 rp:   p,
38                 opts: opts,
39         }
40 }
41
42 type piecePerResourceTorrentImpl struct {
43         piecePerResource
44         locks []sync.RWMutex
45 }
46
47 func (piecePerResourceTorrentImpl) Close() error {
48         return nil
49 }
50
51 func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
52         t := piecePerResourceTorrentImpl{
53                 s,
54                 make([]sync.RWMutex, info.NumPieces()),
55         }
56         return TorrentImpl{Piece: t.Piece, Close: t.Close}, nil
57 }
58
59 func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
60         return piecePerResourcePiece{
61                 mp:               p,
62                 piecePerResource: s.piecePerResource,
63                 mu:               &s.locks[p.Index()],
64         }
65 }
66
67 type PieceProvider interface {
68         resource.Provider
69 }
70
71 type ConsecutiveChunkReader interface {
72         ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
73 }
74
75 type piecePerResourcePiece struct {
76         mp metainfo.Piece
77         piecePerResource
78         // This protects operations that move complete/incomplete pieces around, which can trigger read
79         // errors that may cause callers to do more drastic things.
80         mu *sync.RWMutex
81 }
82
83 var _ io.WriterTo = piecePerResourcePiece{}
84
85 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
86         s.mu.RLock()
87         defer s.mu.RUnlock()
88         if s.mustIsComplete() {
89                 r, err := s.completed().Get()
90                 if err != nil {
91                         return 0, fmt.Errorf("getting complete instance: %w", err)
92                 }
93                 defer r.Close()
94                 return io.Copy(w, r)
95         }
96         if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
97                 return s.writeConsecutiveIncompleteChunks(ccr, w)
98         }
99         return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
100 }
101
102 func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
103         r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
104         if err != nil {
105                 return 0, err
106         }
107         defer r.Close()
108         return io.Copy(w, r)
109 }
110
111 // Returns if the piece is complete. Ok should be true, because we are the definitive source of
112 // truth here.
113 func (s piecePerResourcePiece) mustIsComplete() bool {
114         completion := s.Completion()
115         if !completion.Ok {
116                 panic("must know complete definitively")
117         }
118         return completion.Complete
119 }
120
121 func (s piecePerResourcePiece) Completion() Completion {
122         s.mu.RLock()
123         defer s.mu.RUnlock()
124         fi, err := s.completed().Stat()
125         return Completion{
126                 Complete: err == nil && fi.Size() == s.mp.Length(),
127                 Ok:       true,
128         }
129 }
130
131 type SizedPutter interface {
132         PutSized(io.Reader, int64) error
133 }
134
135 func (s piecePerResourcePiece) MarkComplete() error {
136         s.mu.Lock()
137         defer s.mu.Unlock()
138         incompleteChunks := s.getChunks()
139         r, err := func() (io.ReadCloser, error) {
140                 if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
141                         return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
142                 }
143                 return io.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
144         }()
145         if err != nil {
146                 return fmt.Errorf("getting incomplete chunks reader: %w", err)
147         }
148         defer r.Close()
149         completedInstance := s.completed()
150         err = func() error {
151                 if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
152                         return sp.PutSized(r, s.mp.Length())
153                 } else {
154                         return completedInstance.Put(r)
155                 }
156         }()
157         if err == nil && !s.opts.LeaveIncompleteChunks {
158                 // I think we do this synchronously here since we don't want callers to act on the completed
159                 // piece if we're concurrently still deleting chunks. The caller may decide to start
160                 // downloading chunks again and won't expect us to delete them. It seems to be much faster
161                 // to let the resource provider do this if possible.
162                 var wg sync.WaitGroup
163                 for _, c := range incompleteChunks {
164                         wg.Add(1)
165                         go func(c chunk) {
166                                 defer wg.Done()
167                                 c.instance.Delete()
168                         }(c)
169                 }
170                 wg.Wait()
171         }
172         return err
173 }
174
175 func (s piecePerResourcePiece) MarkNotComplete() error {
176         s.mu.Lock()
177         defer s.mu.Unlock()
178         return s.completed().Delete()
179 }
180
181 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
182         s.mu.RLock()
183         defer s.mu.RUnlock()
184         if s.mustIsComplete() {
185                 return s.completed().ReadAt(b, off)
186         }
187         return s.getChunks().ReadAt(b, off)
188 }
189
190 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
191         s.mu.RLock()
192         defer s.mu.RUnlock()
193         i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
194         if err != nil {
195                 panic(err)
196         }
197         r := bytes.NewReader(b)
198         if sp, ok := i.(SizedPutter); ok {
199                 err = sp.PutSized(r, r.Size())
200         } else {
201                 err = i.Put(r)
202         }
203         n = len(b) - r.Len()
204         return
205 }
206
207 type chunk struct {
208         offset   int64
209         instance resource.Instance
210 }
211
212 type chunks []chunk
213
214 func (me chunks) ReadAt(b []byte, off int64) (int, error) {
215         for {
216                 if len(me) == 0 {
217                         return 0, io.EOF
218                 }
219                 if me[0].offset <= off {
220                         break
221                 }
222                 me = me[1:]
223         }
224         n, err := me[0].instance.ReadAt(b, off-me[0].offset)
225         if n == len(b) {
226                 return n, nil
227         }
228         if err == nil || err == io.EOF {
229                 n_, err := me[1:].ReadAt(b[n:], off+int64(n))
230                 return n + n_, err
231         }
232         return n, err
233 }
234
235 func (s piecePerResourcePiece) getChunks() (chunks chunks) {
236         names, err := s.incompleteDir().Readdirnames()
237         if err != nil {
238                 return
239         }
240         for _, n := range names {
241                 offset, err := strconv.ParseInt(n, 10, 64)
242                 if err != nil {
243                         panic(err)
244                 }
245                 i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
246                 if err != nil {
247                         panic(err)
248                 }
249                 chunks = append(chunks, chunk{offset, i})
250         }
251         sort.Slice(chunks, func(i, j int) bool {
252                 return chunks[i].offset < chunks[j].offset
253         })
254         return
255 }
256
257 func (s piecePerResourcePiece) completedInstancePath() string {
258         return path.Join("completed", s.mp.Hash().HexString())
259 }
260
261 func (s piecePerResourcePiece) completed() resource.Instance {
262         i, err := s.rp.NewInstance(s.completedInstancePath())
263         if err != nil {
264                 panic(err)
265         }
266         return i
267 }
268
269 func (s piecePerResourcePiece) incompleteDirPath() string {
270         return path.Join("incompleted", s.mp.Hash().HexString())
271 }
272
273 func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
274         i, err := s.rp.NewInstance(s.incompleteDirPath())
275         if err != nil {
276                 panic(err)
277         }
278         return i.(resource.DirInstance)
279 }