client.go | 28 ++-------------------------- client_test.go | 6 +++++- torrent.go | 64 ++++++++++++++++++++++++++++++----------------------- diff --git a/client.go b/client.go index 5dfa4b05ad592cca27643d5f9e15e81ef960cf13..7b9797e7069d9991f2595cc8f7356a776d10ef54 100644 --- a/client.go +++ b/client.go @@ -1783,34 +1783,9 @@ } return nil } -func (cl *Client) startTorrent(t *torrent) { - if t.Info == nil || t.data == nil { - panic("nope") - } - // If the client intends to upload, it needs to know what state pieces are - // in. - if !cl.config.NoUpload { - // Queue all pieces for hashing. This is done sequentially to avoid - // spamming goroutines. - for i := range t.Pieces { - t.Pieces[i].QueuedForHash = true - } - go func() { - for i := range t.Pieces { - cl.verifyPiece(t, i) - } - }() - } -} - -// Storage cannot be changed once it's set. func (cl *Client) setStorage(t *torrent, td Data) (err error) { - err = t.setStorage(td) + t.setStorage(td) cl.event.Broadcast() - if err != nil { - return - } - cl.startTorrent(t) return } @@ -2450,6 +2425,7 @@ if err != nil { log.Printf("%T: error completing piece %d: %s", t.data, piece, err) correct = false } + t.updatePieceCompletion(piece) } else if len(touchers) != 0 { log.Printf("dropping %d conns that touched piece", len(touchers)) for _, c := range touchers { diff --git a/client_test.go b/client_test.go index 6e1c9b4058be0edf8da27cf15df59581daa15812..9e8f1c5042b1b57d6001e75665eeb45243468b63 100644 --- a/client_test.go +++ b/client_test.go @@ -434,7 +434,11 @@ return 0, nil } func (me badData) WriteSectionTo(w io.Writer, off, n int64) (int64, error) { - return 0, nil + written, err := w.Write([]byte("hello")) + if err == nil { + err = io.ErrUnexpectedEOF + } + return int64(written), err } func (me badData) PieceComplete(piece int) bool { diff --git a/torrent.go b/torrent.go index c8e5e5ba3d351256d0e54432333940b0a264183b..53b5788f930c44b1b7966dde52ca9728afa17e60 100644 --- a/torrent.go +++ b/torrent.go @@ -17,7 +17,6 @@ "github.com/anacrolix/missinggo/bitmap" "github.com/anacrolix/missinggo/itertools" "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/pubsub" - "github.com/bradfitz/iter" "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" @@ -104,7 +103,8 @@ gotMetainfo chan struct{} readers map[*Reader]struct{} - pendingPieces bitmap.Bitmap + pendingPieces bitmap.Bitmap + completedPieces bitmap.Bitmap connPieceInclinationPool sync.Pool } @@ -120,6 +120,10 @@ t.displayName = dn } func (t *torrent) pieceComplete(piece int) bool { + return t.completedPieces.Get(piece) +} + +func (t *torrent) pieceCompleteUncached(piece int) bool { // TODO: This is called when setting metadata, and before storage is // assigned, which doesn't seem right. return t.data != nil && t.data.PieceComplete(piece) @@ -267,12 +271,24 @@ } return } -func (t *torrent) setStorage(td Data) (err error) { +func (t *torrent) setStorage(td Data) { if t.data != nil { t.data.Close() } t.data = td - return + t.completedPieces.Clear() + for i := range t.Pieces { + t.Pieces[i].QueuedForHash = true + } + go func() { + for i := range t.Pieces { + t.verifyPiece(i) + } + }() +} + +func (t *torrent) verifyPiece(piece int) { + t.cl.verifyPiece(t, piece) } func (t *torrent) haveAllMetadataPieces() bool { @@ -532,12 +548,7 @@ return t.Info.NumPieces() } func (t *torrent) numPiecesCompleted() (num int) { - for i := range iter.N(t.Info.NumPieces()) { - if t.pieceComplete(i) { - num++ - } - } - return + return t.completedPieces.Len() } func (t *torrent) isClosed() bool { @@ -588,9 +599,11 @@ return } func (t *torrent) bitfield() (bf []bool) { - for i := range t.Pieces { - bf = append(bf, t.havePiece(i)) - } + bf = make([]bool, t.numPieces()) + t.completedPieces.IterTyped(func(piece int) (again bool) { + bf[piece] = true + return true + }) return } @@ -678,12 +691,7 @@ func (t *torrent) haveAllPieces() bool { if !t.haveInfo() { return false } - for i := range t.Pieces { - if !t.pieceComplete(i) { - return false - } - } - return true + return t.completedPieces.Len() == t.numPieces() } func (me *torrent) haveAnyPieces() bool { @@ -877,10 +885,11 @@ newPrios[i].Raise(PiecePriorityReadahead) } return true }) + t.completedPieces.IterTyped(func(piece int) (more bool) { + newPrios[piece] = PiecePriorityNone + return true + }) for i, prio := range newPrios { - if t.pieceComplete(i) { - prio = PiecePriorityNone - } if prio != t.Pieces[i].priority { t.Pieces[i].priority = prio t.piecePriorityChanged(i) @@ -970,12 +979,7 @@ t.piecePriorityChanged(piece) } func (t *torrent) getCompletedPieces() (ret bitmap.Bitmap) { - for i := range iter.N(t.numPieces()) { - if t.pieceComplete(i) { - ret.Add(i) - } - } - return + return t.completedPieces.Copy() } func (t *torrent) unpendPieces(unpend *bitmap.Bitmap) { @@ -1033,3 +1037,7 @@ func (t *torrent) putPieceInclination(pi []int) { t.connPieceInclinationPool.Put(pi) pieceInclinationsPut.Add(1) } + +func (t *torrent) updatePieceCompletion(piece int) { + t.completedPieces.Set(piece, t.pieceCompleteUncached(piece)) +}