]> Sergey Matveev's repositories - btrtrc.git/blob - storage/piece-resource.go
Rename new.go
[btrtrc.git] / storage / piece-resource.go
1 package storage
2
3 import (
4         "bytes"
5         "fmt"
6         "io"
7         "io/ioutil"
8         "path"
9         "sort"
10         "strconv"
11         "sync"
12
13         "github.com/anacrolix/missinggo/v2/resource"
14
15         "github.com/anacrolix/torrent/metainfo"
16 )
17
18 type piecePerResource struct {
19         rp   PieceProvider
20         opts ResourcePiecesOpts
21 }
22
23 type ResourcePiecesOpts struct {
24         LeaveIncompleteChunks bool
25         NoSizedPuts           bool
26 }
27
28 func NewResourcePieces(p PieceProvider) ClientImpl {
29         return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
30 }
31
32 func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
33         return &piecePerResource{
34                 rp:   p,
35                 opts: opts,
36         }
37 }
38
39 type piecePerResourceTorrentImpl struct {
40         piecePerResource
41         locks []sync.RWMutex
42 }
43
44 func (piecePerResourceTorrentImpl) Close() error {
45         return nil
46 }
47
48 func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
49         return piecePerResourceTorrentImpl{
50                 s,
51                 make([]sync.RWMutex, info.NumPieces()),
52         }, nil
53 }
54
55 func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
56         return piecePerResourcePiece{
57                 mp:               p,
58                 piecePerResource: s.piecePerResource,
59                 mu:               &s.locks[p.Index()],
60         }
61 }
62
63 type PieceProvider interface {
64         resource.Provider
65 }
66
67 type ConsecutiveChunkReader interface {
68         ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
69 }
70
71 type piecePerResourcePiece struct {
72         mp metainfo.Piece
73         piecePerResource
74         // This protects operations that move complete/incomplete pieces around, which can trigger read
75         // errors that may cause callers to do more drastic things.
76         mu *sync.RWMutex
77 }
78
79 var _ io.WriterTo = piecePerResourcePiece{}
80
81 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
82         s.mu.RLock()
83         defer s.mu.RUnlock()
84         if s.mustIsComplete() {
85                 r, err := s.completed().Get()
86                 if err != nil {
87                         return 0, fmt.Errorf("getting complete instance: %w", err)
88                 }
89                 defer r.Close()
90                 return io.Copy(w, r)
91         }
92         if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
93                 return s.writeConsecutiveIncompleteChunks(ccr, w)
94         }
95         return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
96 }
97
98 func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
99         r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
100         if err != nil {
101                 return 0, err
102         }
103         defer r.Close()
104         return io.Copy(w, r)
105 }
106
107 // Returns if the piece is complete. Ok should be true, because we are the definitive source of
108 // truth here.
109 func (s piecePerResourcePiece) mustIsComplete() bool {
110         completion := s.Completion()
111         if !completion.Ok {
112                 panic("must know complete definitively")
113         }
114         return completion.Complete
115 }
116
117 func (s piecePerResourcePiece) Completion() Completion {
118         s.mu.RLock()
119         defer s.mu.RUnlock()
120         fi, err := s.completed().Stat()
121         return Completion{
122                 Complete: err == nil && fi.Size() == s.mp.Length(),
123                 Ok:       true,
124         }
125 }
126
127 type SizedPutter interface {
128         PutSized(io.Reader, int64) error
129 }
130
131 func (s piecePerResourcePiece) MarkComplete() error {
132         s.mu.Lock()
133         defer s.mu.Unlock()
134         incompleteChunks := s.getChunks()
135         r, err := func() (io.ReadCloser, error) {
136                 if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
137                         return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
138                 }
139                 return ioutil.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
140         }()
141         if err != nil {
142                 return fmt.Errorf("getting incomplete chunks reader: %w", err)
143         }
144         defer r.Close()
145         completedInstance := s.completed()
146         err = func() error {
147                 if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
148                         return sp.PutSized(r, s.mp.Length())
149                 } else {
150                         return completedInstance.Put(r)
151                 }
152         }()
153         if err == nil && !s.opts.LeaveIncompleteChunks {
154                 // I think we do this synchronously here since we don't want callers to act on the completed
155                 // piece if we're concurrently still deleting chunks. The caller may decide to start
156                 // downloading chunks again and won't expect us to delete them. It seems to be much faster
157                 // to let the resource provider do this if possible.
158                 var wg sync.WaitGroup
159                 for _, c := range incompleteChunks {
160                         wg.Add(1)
161                         go func(c chunk) {
162                                 defer wg.Done()
163                                 c.instance.Delete()
164                         }(c)
165                 }
166                 wg.Wait()
167         }
168         return err
169 }
170
171 func (s piecePerResourcePiece) MarkNotComplete() error {
172         s.mu.Lock()
173         defer s.mu.Unlock()
174         return s.completed().Delete()
175 }
176
177 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
178         s.mu.RLock()
179         defer s.mu.RUnlock()
180         if s.mustIsComplete() {
181                 return s.completed().ReadAt(b, off)
182         }
183         return s.getChunks().ReadAt(b, off)
184 }
185
186 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
187         s.mu.RLock()
188         defer s.mu.RUnlock()
189         i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
190         if err != nil {
191                 panic(err)
192         }
193         r := bytes.NewReader(b)
194         if sp, ok := i.(SizedPutter); ok {
195                 err = sp.PutSized(r, r.Size())
196         } else {
197                 err = i.Put(r)
198         }
199         n = len(b) - r.Len()
200         return
201 }
202
203 type chunk struct {
204         offset   int64
205         instance resource.Instance
206 }
207
208 type chunks []chunk
209
210 func (me chunks) ReadAt(b []byte, off int64) (int, error) {
211         for {
212                 if len(me) == 0 {
213                         return 0, io.EOF
214                 }
215                 if me[0].offset <= off {
216                         break
217                 }
218                 me = me[1:]
219         }
220         n, err := me[0].instance.ReadAt(b, off-me[0].offset)
221         if n == len(b) {
222                 return n, nil
223         }
224         if err == nil || err == io.EOF {
225                 n_, err := me[1:].ReadAt(b[n:], off+int64(n))
226                 return n + n_, err
227         }
228         return n, err
229 }
230
231 func (s piecePerResourcePiece) getChunks() (chunks chunks) {
232         names, err := s.incompleteDir().Readdirnames()
233         if err != nil {
234                 return
235         }
236         for _, n := range names {
237                 offset, err := strconv.ParseInt(n, 10, 64)
238                 if err != nil {
239                         panic(err)
240                 }
241                 i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
242                 if err != nil {
243                         panic(err)
244                 }
245                 chunks = append(chunks, chunk{offset, i})
246         }
247         sort.Slice(chunks, func(i, j int) bool {
248                 return chunks[i].offset < chunks[j].offset
249         })
250         return
251 }
252
253 func (s piecePerResourcePiece) completedInstancePath() string {
254         return path.Join("completed", s.mp.Hash().HexString())
255 }
256
257 func (s piecePerResourcePiece) completed() resource.Instance {
258         i, err := s.rp.NewInstance(s.completedInstancePath())
259         if err != nil {
260                 panic(err)
261         }
262         return i
263 }
264
265 func (s piecePerResourcePiece) incompleteDirPath() string {
266         return path.Join("incompleted", s.mp.Hash().HexString())
267 }
268
269 func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
270         i, err := s.rp.NewInstance(s.incompleteDirPath())
271         if err != nil {
272                 panic(err)
273         }
274         return i.(resource.DirInstance)
275 }