12 g "github.com/anacrolix/generics"
13 "github.com/anacrolix/missinggo/v2/resource"
14 "github.com/anacrolix/sync"
16 "github.com/anacrolix/torrent/metainfo"
type piecePerResource struct {
	rp   PieceProvider
	opts ResourcePiecesOpts
}

type ResourcePiecesOpts struct {
	// After marking a piece complete, don't bother deleting its incomplete blobs.
	LeaveIncompleteChunks bool
	// Sized puts require being able to stream from a statement executed on another connection.
	// Without them, we buffer the entire read and then put that.
	NoSizedPuts bool
}

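// Usage sketch, not from this package's tests: constructing storage with explicit options. The
// provider value is hypothetical; any PieceProvider implementation can be substituted.
//
//	var p storage.PieceProvider // e.g. backed by an object store or database
//	impl := storage.NewResourcePiecesOpts(p, storage.ResourcePiecesOpts{
//		// Keep incomplete chunk blobs around after a piece completes.
//		LeaveIncompleteChunks: true,
//		// Buffer whole pieces on put instead of streaming them with a known size.
//		NoSizedPuts: true,
//	})
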
func NewResourcePieces(p PieceProvider) ClientImpl {
	return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
}

func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
	return &piecePerResource{
		rp:   p,
		opts: opts,
	}
}

type piecePerResourceTorrentImpl struct {
	piecePerResource
	locks []sync.RWMutex
}

func (piecePerResourceTorrentImpl) Close() error {
	return nil
}

func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
	t := piecePerResourceTorrentImpl{
		s,
		make([]sync.RWMutex, info.NumPieces()),
	}
	return TorrentImpl{PieceWithHash: t.Piece, Close: t.Close}, nil
}

func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece, pieceHash g.Option[[]byte]) PieceImpl {
	return piecePerResourcePiece{
		mp:               p,
		pieceHash:        pieceHash,
		piecePerResource: s.piecePerResource,
		mu:               &s.locks[p.Index()],
	}
}

type PieceProvider interface {
	resource.Provider
}

type ConsecutiveChunkReader interface {
	ReadConsecutiveChunks(prefix string) (io.ReadCloser, error)
}

type PrefixDeleter interface {
	DeletePrefix(prefix string) error
}

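// ConsecutiveChunkReader and PrefixDeleter are optional fast paths: the methods below type-assert
// the provider for them and fall back to per-chunk reads and deletes otherwise. A sketch of a
// hypothetical provider that advertises both (the type and bodies are illustrative only):
//
//	type dbProvider struct{ /* ... */ }
//
//	func (p dbProvider) NewInstance(loc string) (resource.Instance, error) { /* ... */ }
//
//	// Streams every blob stored under prefix, in offset order, as one reader.
//	func (p dbProvider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) { /* ... */ }
//
//	// Removes every blob stored under prefix in a single operation.
//	func (p dbProvider) DeletePrefix(prefix string) error { /* ... */ }
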
type piecePerResourcePiece struct {
	mp metainfo.Piece
	// The piece hash if we have it. It could be 20 bytes (SHA-1, v1 info) or 32 bytes (SHA-256,
	// v2 info) depending on the info version.
	pieceHash g.Option[[]byte]
	piecePerResource
	// This protects operations that move complete/incomplete pieces around, which can trigger read
	// errors that may cause callers to do more drastic things.
	mu *sync.RWMutex
}

var _ io.WriterTo = piecePerResourcePiece{}

func (s piecePerResourcePiece) WriteTo(w io.Writer) (int64, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.mustIsComplete() {
		r, err := s.completed().Get()
		if err != nil {
			return 0, fmt.Errorf("getting complete instance: %w", err)
		}
		defer r.Close()
		return io.Copy(w, r)
	}
	if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
		return s.writeConsecutiveIncompleteChunks(ccr, w)
	}
	return io.Copy(w, io.NewSectionReader(s, 0, s.mp.Length()))
}

func (s piecePerResourcePiece) writeConsecutiveIncompleteChunks(ccw ConsecutiveChunkReader, w io.Writer) (int64, error) {
	r, err := ccw.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
	if err != nil {
		return 0, err
	}
	defer r.Close()
	return io.Copy(w, r)
}

// Returns whether the piece is complete. Ok should always be true, because we are the definitive
// source of truth here.
func (s piecePerResourcePiece) mustIsComplete() bool {
	completion := s.Completion()
	if !completion.Ok {
		panic("must know complete definitively")
	}
	return completion.Complete
}

func (s piecePerResourcePiece) Completion() (_ Completion) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	fi, err := s.completed().Stat()
	return Completion{
		Complete: err == nil && fi.Size() == s.mp.Length(),
		Ok:       true,
	}
}

type SizedPutter interface {
	PutSized(io.Reader, int64) error
}

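// A sketch of a hypothetical SizedPutter implementation (the type and its storage details are
// illustrative, not part of this package): knowing the size up front lets the instance reserve
// space and stream into it rather than buffering the whole piece first.
//
//	func (i dbInstance) PutSized(r io.Reader, size int64) error {
//		// Reserve size bytes (e.g. an SQL zeroblob), then stream r into the
//		// reservation, possibly from a statement on another connection.
//		/* ... */
//	}
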
func (s piecePerResourcePiece) MarkComplete() (err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	incompleteChunks := s.getChunks()
	r, err := func() (io.ReadCloser, error) {
		if ccr, ok := s.rp.(ConsecutiveChunkReader); ok {
			return ccr.ReadConsecutiveChunks(s.incompleteDirPath() + "/")
		}
		return io.NopCloser(io.NewSectionReader(incompleteChunks, 0, s.mp.Length())), nil
	}()
	if err != nil {
		return fmt.Errorf("getting incomplete chunks reader: %w", err)
	}
	defer r.Close()
	completedInstance := s.completed()
	err = func() error {
		if sp, ok := completedInstance.(SizedPutter); ok && !s.opts.NoSizedPuts {
			return sp.PutSized(r, s.mp.Length())
		}
		return completedInstance.Put(r)
	}()
	if err != nil || s.opts.LeaveIncompleteChunks {
		return
	}
	// We delete synchronously here because we don't want callers to act on the completed piece
	// while we're still deleting its chunks: a caller may decide to start downloading chunks again
	// and won't expect us to delete them. It's much faster to let the resource provider do the
	// delete if it can.
	if pd, ok := s.rp.(PrefixDeleter); ok {
		err = pd.DeletePrefix(s.incompleteDirPath() + "/")
		if err != nil {
			err = fmt.Errorf("deleting incomplete prefix: %w", err)
		}
		return
	}
	var wg sync.WaitGroup
	for _, c := range incompleteChunks {
		wg.Add(1)
		go func(c chunk) {
			defer wg.Done()
			c.instance.Delete()
		}(c)
	}
	wg.Wait()
	return
}

func (s piecePerResourcePiece) MarkNotComplete() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.completed().Delete()
}

func (s piecePerResourcePiece) ReadAt(b []byte, off int64) (int, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if s.mustIsComplete() {
		return s.completed().ReadAt(b, off)
	}
	return s.getChunks().ReadAt(b, off)
}

func (s piecePerResourcePiece) WriteAt(b []byte, off int64) (n int, err error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	// Each write becomes its own blob, named by its decimal byte offset within the piece.
	i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), strconv.FormatInt(off, 10)))
	if err != nil {
		panic(err)
	}
	r := bytes.NewReader(b)
	if sp, ok := i.(SizedPutter); ok {
		err = sp.PutSized(r, r.Size())
	} else {
		err = i.Put(r)
	}
	n = len(b) - r.Len()
	return
}

type chunk struct {
	offset   int64
	instance resource.Instance
}

type chunks []chunk

func (me chunks) ReadAt(b []byte, off int64) (int, error) {
	// Chunks are sorted by offset, so if the first remaining chunk starts at or before off it's
	// the one to read from; if no chunk starts at or before off, the read is in a hole.
	for {
		if len(me) == 0 {
			return 0, io.EOF
		}
		if me[0].offset <= off {
			break
		}
		me = me[1:]
	}
	n, err := me[0].instance.ReadAt(b, off-me[0].offset)
	if n == len(b) {
		return n, nil
	}
	if err == nil || err == io.EOF {
		// Continue the read in the remaining chunks.
		n_, err := me[1:].ReadAt(b[n:], off+int64(n))
		return n + n_, err
	}
	return n, err
}

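// Worked example with illustrative numbers, assuming an instance's ReadAt reports io.EOF past the
// end of its blob the way a file would: with 16 KiB chunks at offsets 0 and 16384, ReadAt at
// off=20000 first tries the chunk at 0, which returns (0, io.EOF) for relative offset 20000; the
// recursive call on me[1:] then reads the second chunk at relative offset 20000-16384 = 3616.
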
func (s piecePerResourcePiece) getChunks() (chunks chunks) {
	names, err := s.incompleteDir().Readdirnames()
	if err != nil {
		return
	}
	for _, n := range names {
		// Chunk blobs are named by their decimal byte offset within the piece (see WriteAt).
		offset, err := strconv.ParseInt(n, 10, 64)
		if err != nil {
			panic(err)
		}
		i, err := s.rp.NewInstance(path.Join(s.incompleteDirPath(), n))
		if err != nil {
			panic(err)
		}
		chunks = append(chunks, chunk{offset, i})
	}
	sort.Slice(chunks, func(i, j int) bool {
		return chunks[i].offset < chunks[j].offset
	})
	return
}

func (s piecePerResourcePiece) completedInstancePath() string {
	return path.Join("completed", s.hashHex())
}

func (s piecePerResourcePiece) completed() resource.Instance {
	i, err := s.rp.NewInstance(s.completedInstancePath())
	if err != nil {
		panic(err)
	}
	return i
}

func (s piecePerResourcePiece) incompleteDirPath() string {
	return path.Join("incompleted", s.hashHex())
}

func (s piecePerResourcePiece) incompleteDir() resource.DirInstance {
	i, err := s.rp.NewInstance(s.incompleteDirPath())
	if err != nil {
		panic(err)
	}
	return i.(resource.DirInstance)
}

func (me piecePerResourcePiece) hashHex() string {
	return hex.EncodeToString(me.pieceHash.Unwrap())
}