13 "github.com/anacrolix/missinggo/v2/resource"
15 "github.com/anacrolix/torrent/metainfo"
// piecePerResource is a storage ClientImpl that keeps each torrent piece as
// its own resource.Instance obtained from a PieceProvider.
18 type piecePerResource struct {
// opts tunes put sizing and incomplete-chunk cleanup behaviour.
20 opts ResourcePiecesOpts
// ResourcePiecesOpts configures a resource-backed piece store.
23 type ResourcePiecesOpts struct {
// LeaveIncompleteChunks, when true, stops MarkComplete from deleting the
// piece's incomplete chunks after the completed copy has been written.
24 LeaveIncompleteChunks bool
// NewResourcePieces returns piece storage backed by the given PieceProvider,
// using default options.
28 func NewResourcePieces(p PieceProvider) ClientImpl {
29 return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
// NewResourcePiecesOpts is like NewResourcePieces but with explicit options.
32 func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
33 return &piecePerResource{
// piecePerResourceTorrentImpl is the per-torrent view over a piecePerResource
// store. It carries one lock per piece (allocated in OpenTorrent) so that
// operations on the same piece can be serialized.
39 type piecePerResourceTorrentImpl struct {
// Close implements TorrentImpl. NOTE(review): body not fully visible in this
// excerpt; presumably there is no per-torrent state to release — confirm.
44 func (piecePerResourceTorrentImpl) Close() error {
// OpenTorrent implements ClientImpl. It allocates one RWMutex per piece in
// the torrent; these guard moves between the incomplete and completed
// locations for each piece.
48 func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
49 return piecePerResourceTorrentImpl{
// One lock per piece index.
51 make([]sync.RWMutex, info.NumPieces()),
// Piece returns the PieceImpl for p. The returned value shares the torrent's
// lock for that piece index, so concurrent piece operations are serialized
// per piece rather than per call.
55 func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
56 return piecePerResourcePiece{
58 piecePerResource: s.piecePerResource,
// Share the per-piece lock owned by the torrent.
59 mu: &s.locks[p.Index()],
// PieceProvider supplies the resource instances that back piece data.
63 type PieceProvider interface {
// ConsecutiveChunkReader is an optional fast path a PieceProvider can
// implement: stream every chunk stored under a path prefix as one contiguous
// reader, instead of the caller reading chunks one by one.
67 type ConsecutiveChunkReader interface {
68 ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
// piecePerResourcePiece is the PieceImpl for a single piece of a torrent
// stored via a PieceProvider.
71 type piecePerResourcePiece struct {
74 // This protects operations that move complete/incomplete pieces around, which can trigger read
75 // errors that may cause callers to do more drastic things.
// Compile-time assertion: pieces offer the io.WriterTo fast path so io.Copy
// can stream a whole piece without an intermediate buffer.
79 var _ io.WriterTo = piecePerResourcePiece{}
// WriteTo streams the piece's bytes to w. A complete piece is read from its
// completed instance; an incomplete piece is streamed from its chunks, using
// the provider's ConsecutiveChunkReader fast path when available.
81 func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
84 if s.mustIsComplete() {
85 r, err := s.completed().Get()
87 return 0, fmt.Errorf("getting complete instance: %w", err)
92 if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
93 return s.writeConsecutiveIncompleteChunks(ccr, w)
// Fallback: read the incomplete chunks through this piece's own ReadAt.
95 return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
// writeConsecutiveIncompleteChunks copies this piece's incomplete chunks to w
// via the provider's contiguous-read fast path. The trailing "/" scopes the
// read to entries inside this piece's incomplete directory.
98 func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
99 r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
107 // Returns if the piece is complete. Ok should be true, because we are the definitive source of
// truth for completion state here: if Completion cannot answer definitively,
// that is a programming error, so we panic rather than guess.
109 func (s piecePerResourcePiece) mustIsComplete() bool {
110 completion := s.Completion()
112 panic("must know complete definitively")
114 return completion.Complete
// Completion reports whether the piece is complete by statting the completed
// instance and comparing its size against the expected piece length.
117 func (s piecePerResourcePiece) Completion() Completion {
120 fi, err := s.completed().Stat()
// Complete only if the completed instance exists and is exactly piece-sized;
// a partial or missing file reads as incomplete.
122 Complete: err == nil && fi.Size() == s.mp.Length(),
// SizedPutter is an optional interface for resource instances: store a
// reader whose total length is known up front, letting the provider
// preallocate or validate the size.
127 type SizedPutter interface {
128 PutSized(io.Reader, int64) error
// MarkComplete implements PieceImpl: it copies the piece's assembled
// incomplete chunks into the completed instance, then — unless
// opts.LeaveIncompleteChunks is set — deletes the now-redundant chunks.
131 func (s piecePerResourcePiece) MarkComplete() error {
134 incompleteChunks := s.getChunks()
// Build a single reader over the piece's bytes, preferring the provider's
// contiguous-read fast path over stitching chunks through chunks.ReadAt.
135 r, err := func() (io.ReadCloser, error) {
136 if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
137 return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
139 return ioutil.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
142 return fmt.Errorf("getting incomplete chunks reader: %w", err)
145 completedInstance := s.completed()
// Use the sized put when the destination supports it (and it isn't disabled
// by options), so the provider knows the final length up front.
147 if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
148 return sp.PutSized(r, s.mp.Length())
150 return completedInstance.Put(r)
153 if err == nil && !s.opts.LeaveIncompleteChunks {
154 // I think we do this synchronously here since we don't want callers to act on the completed
155 // piece if we're concurrently still deleting chunks. The caller may decide to start
156 // downloading chunks again and won't expect us to delete them. It seems to be much faster
157 // to let the resource provider do this if possible.
158 var wg sync.WaitGroup
159 for _, c := range incompleteChunks {
// MarkNotComplete deletes the completed instance so the piece reads as
// incomplete again (Completion stats that instance).
171 func (s piecePerResourcePiece) MarkNotComplete() error {
174 return s.completed().Delete()
// ReadAt reads piece bytes at off: from the completed instance when the piece
// is complete, otherwise from the sorted list of incomplete chunks.
177 func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
180 if s.mustIsComplete() {
181 return s.completed().ReadAt(b, off)
183 return s.getChunks().ReadAt(b, off)
// WriteAt stores b as an incomplete chunk inside this piece's incomplete
// directory, named by the chunk's decimal byte offset within the piece.
186 func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
// The resource name is the decimal offset; getChunks parses it back to int64.
189 i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
193 r := bytes.NewReader(b)
// Prefer the sized put when the instance supports it.
194 if sp, ok := i.(SizedPutter); ok {
195 err = sp.PutSized(r, r.Size())
// instance holds the chunk's backing resource.
205 instance resource.Instance
// ReadAt reads from a list of chunks sorted by ascending offset (the order
// getChunks produces), recursing into the tail to serve reads that span
// multiple chunks.
210 func (me chunks) ReadAt(b []byte, off int64) (int, error) {
// Because the list is sorted, only the head chunk can contain off.
215 if me[0].offset <= off {
220 n, err := me[0].instance.ReadAt(b, off-me[0].offset)
// EOF inside one chunk may be satisfied by the next; continue into the tail
// at the advanced offset.
224 if err == nil || err == io.EOF {
225 n_, err := me[1:].ReadAt(b[n:], off+int64(n))
// getChunks lists this piece's incomplete directory, parses each entry name
// as the chunk's decimal byte offset (the naming scheme WriteAt uses), and
// returns the chunks sorted by offset — the order chunks.ReadAt relies on.
231 func (s piecePerResourcePiece) getChunks() (chunks chunks) {
232 names, err := s.incompleteDir().Readdirnames()
236 for _, n := range names {
// Entry names are the chunk offsets written by WriteAt.
237 offset, err := strconv.ParseInt(n, 10, 64)
241 i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
245 chunks = append(chunks, chunk{offset, i})
// Sort ascending by offset so reads can walk the list front to back.
247 sort.Slice(chunks, func(i, j int) bool {
248 return chunks[i].offset < chunks[j].offset
// completedInstancePath is the resource path of this piece's completed copy,
// keyed by the piece hash under the "completed" prefix.
253 func (s piecePerResourcePiece) completedInstancePath() string {
254 return path.Join("completed", s.mp.Hash().HexString())
// completed returns the resource.Instance that holds (or will hold) this
// piece's completed copy.
257 func (s piecePerResourcePiece) completed() resource.Instance {
258 i, err := s.rp.NewInstance(s.completedInstancePath())
// incompleteDirPath is the resource directory holding this piece's
// in-progress chunks, keyed by the piece hash under the "incompleted" prefix.
265 func (s piecePerResourcePiece) incompleteDirPath() string {
266 return path.Join("incompleted", s.mp.Hash().HexString())
// incompleteDir returns the incomplete-chunk directory as a DirInstance so
// its entries can be listed (see getChunks).
269 func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
270 i, err := s.rp.NewInstance(s.incompleteDirPath())
// NOTE(review): this type assertion panics if the provider's instances do
// not implement resource.DirInstance — confirm providers guarantee this.
274 return i.(resource.DirInstance)