}
piece.QueuedForHash = true
t.publishPieceChange(pieceIndex)
- go cl.verifyPiece(t, pieceIndex)
+ go t.verifyPiece(pieceIndex)
}
// Queue a piece check if one isn't already queued, and the piece has never
}
}
-// Also handles choking and unchoking of the remote peer.
-func (cl *Client) upload(t *Torrent, c *connection) {
- if cl.config.NoUpload {
- return
- }
- if !c.PeerInterested {
- return
- }
- seeding := t.seeding()
- if !seeding && !t.connHasWantedPieces(c) {
- // There's no reason to upload to this peer.
- return
- }
- // Breaking or completing this loop means we don't want to upload to the
- // peer anymore, and we choke them.
-another:
- for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
- // We want to upload to the peer.
- c.Unchoke()
- for r := range c.PeerRequests {
- res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
- delay := res.Delay()
- if delay > 0 {
- res.Cancel()
- go func() {
- time.Sleep(delay)
- cl.mu.Lock()
- defer cl.mu.Unlock()
- cl.upload(t, c)
- }()
- return
- }
- err := cl.sendChunk(t, c, r)
- if err != nil {
- i := int(r.Index)
- if t.pieceComplete(i) {
- t.updatePieceCompletion(i)
- if !t.pieceComplete(i) {
- // We had the piece, but not anymore.
- break another
- }
- }
- log.Printf("error sending chunk %+v to peer: %s", r, err)
- // If we failed to send a chunk, choke the peer to ensure they
- // flush all their requests. We've probably dropped a piece,
- // but there's no way to communicate this to the peer. If they
- // ask for it again, we'll kick them to allow us to send them
- // an updated bitfield.
- break another
- }
- delete(c.PeerRequests, r)
- goto another
- }
- return
- }
- c.Choke()
-}
-
func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
// Count the chunk being sent, even if it isn't.
b := make([]byte, r.Length)
return true
}
-// Return the connections that touched a piece, and clear the entry while
-// doing it.
-func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
- for _, c := range t.conns {
- if _, ok := c.peerTouchedPieces[piece]; ok {
- ret = append(ret, c)
- delete(c.peerTouchedPieces, piece)
- }
- }
- return
-}
-
-func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
- if t.closed.IsSet() {
- return
- }
- p := &t.pieces[piece]
- if p.EverHashed {
- // Don't score the first time a piece is hashed, it could be an
- // initial check.
- if correct {
- pieceHashedCorrect.Add(1)
- } else {
- log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
- pieceHashedNotCorrect.Add(1)
- }
- }
- p.EverHashed = true
- touchers := cl.reapPieceTouches(t, piece)
- if correct {
- for _, c := range touchers {
- c.goodPiecesDirtied++
- }
- err := p.Storage().MarkComplete()
- if err != nil {
- log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
- }
- t.updatePieceCompletion(piece)
- } else if len(touchers) != 0 {
- log.Printf("dropping and banning %d conns that touched piece", len(touchers))
- for _, c := range touchers {
- c.badPiecesDirtied++
- t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
- t.dropConnection(c)
- }
- }
- cl.pieceChanged(t, piece)
-}
-
-func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
- t.pendingPieces.Remove(piece)
- t.pendAllChunkSpecs(piece)
- for _, conn := range t.conns {
- conn.Have(piece)
- for r := range conn.Requests {
- if int(r.Index) == piece {
- conn.Cancel(r)
- }
- }
- // Could check here if peer doesn't have piece, but due to caching
- // some peers may have said they have a piece but they don't.
- cl.upload(t, conn)
- }
-}
-
-func (cl *Client) onFailedPiece(t *Torrent, piece int) {
- if t.pieceAllDirty(piece) {
- t.pendAllChunkSpecs(piece)
- }
- if !t.wantPieceIndex(piece) {
- return
- }
- cl.openNewConns(t)
- for _, conn := range t.conns {
- if conn.PeerHasPiece(piece) {
- conn.updateRequests()
- }
- }
-}
-
-func (cl *Client) pieceChanged(t *Torrent, piece int) {
- correct := t.pieceComplete(piece)
- defer cl.event.Broadcast()
- if correct {
- cl.onCompletedPiece(t, piece)
- } else {
- cl.onFailedPiece(t, piece)
- }
- t.updatePiecePriority(piece)
-}
-
-func (cl *Client) verifyPiece(t *Torrent, piece int) {
- cl.mu.Lock()
- defer cl.mu.Unlock()
- p := &t.pieces[piece]
- for p.Hashing || t.storage == nil {
- cl.event.Wait()
- }
- p.QueuedForHash = false
- if t.closed.IsSet() || t.pieceComplete(piece) {
- t.updatePiecePriority(piece)
- return
- }
- p.Hashing = true
- t.publishPieceChange(piece)
- cl.mu.Unlock()
- sum := t.hashPiece(piece)
- cl.mu.Lock()
- p.Hashing = false
- cl.pieceHashed(t, piece, sum == p.Hash)
-}
-
// Returns handles to all the torrents loaded in the Client.
func (cl *Client) Torrents() (ret []*Torrent) {
cl.mu.Lock()
cl.peerUnchoked(t, c)
case pp.Interested:
c.PeerInterested = true
- cl.upload(t, c)
+ c.upload()
case pp.NotInterested:
c.PeerInterested = false
c.Choke()
c.PeerRequests = make(map[request]struct{}, maxRequests)
}
c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
- cl.upload(t, c)
+ c.upload()
case pp.Cancel:
req := newRequest(msg.Index, msg.Begin, msg.Length)
if !c.PeerCancel(req) {
c.UsefulChunksReceived++
c.lastUsefulChunkReceived = time.Now()
- cl.upload(t, c)
+ c.upload()
// Need to record that it hasn't been written yet, before we attempt to do
// anything with it.
t.publishPieceChange(int(req.Index))
return
}
+
+// Also handles choking and unchoking of the remote peer.
+func (c *connection) upload() {
+ t := c.t
+ cl := t.cl
+ if cl.config.NoUpload {
+ return
+ }
+ if !c.PeerInterested {
+ return
+ }
+ seeding := t.seeding()
+ if !seeding && !t.connHasWantedPieces(c) {
+ // There's no reason to upload to this peer.
+ return
+ }
+ // Breaking or completing this loop means we don't want to upload to the
+ // peer anymore, and we choke them.
+another:
+ for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
+ // We want to upload to the peer.
+ c.Unchoke()
+ for r := range c.PeerRequests {
+ res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+ delay := res.Delay()
+ if delay > 0 {
+ res.Cancel()
+ go func() {
+ time.Sleep(delay)
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ c.upload()
+ }()
+ return
+ }
+ err := cl.sendChunk(t, c, r)
+ if err != nil {
+ i := int(r.Index)
+ if t.pieceComplete(i) {
+ t.updatePieceCompletion(i)
+ if !t.pieceComplete(i) {
+ // We had the piece, but not anymore.
+ break another
+ }
+ }
+ log.Printf("error sending chunk %+v to peer: %s", r, err)
+ // If we failed to send a chunk, choke the peer to ensure they
+ // flush all their requests. We've probably dropped a piece,
+ // but there's no way to communicate this to the peer. If they
+ // ask for it again, we'll kick them to allow us to send them
+ // an updated bitfield.
+ break another
+ }
+ delete(c.PeerRequests, r)
+ goto another
+ }
+ return
+ }
+ c.Choke()
+}
return nil
}
-func (t *Torrent) verifyPiece(piece int) {
- t.cl.verifyPiece(t, piece)
-}
-
func (t *Torrent) haveAllMetadataPieces() bool {
if t.haveInfo() {
return true
}
func (t *Torrent) pieceChanged(piece int) {
- t.cl.pieceChanged(t, piece)
+ correct := t.pieceComplete(piece)
+ defer t.cl.event.Broadcast()
+ if correct {
+ t.onCompletedPiece(piece)
+ } else {
+ t.onFailedPiece(piece)
+ }
+ t.updatePiecePriority(piece)
}
func (t *Torrent) openNewConns() {
func (t *Torrent) mu() missinggo.RWLocker {
return &t.cl.mu
}
+
+func (t *Torrent) pieceHashed(piece int, correct bool) {
+ if t.closed.IsSet() {
+ return
+ }
+ p := &t.pieces[piece]
+ if p.EverHashed {
+ // Don't score the first time a piece is hashed, it could be an
+ // initial check.
+ if correct {
+ pieceHashedCorrect.Add(1)
+ } else {
+ log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
+ pieceHashedNotCorrect.Add(1)
+ }
+ }
+ p.EverHashed = true
+ touchers := t.reapPieceTouches(piece)
+ if correct {
+ for _, c := range touchers {
+ c.goodPiecesDirtied++
+ }
+ err := p.Storage().MarkComplete()
+ if err != nil {
+ log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
+ }
+ t.updatePieceCompletion(piece)
+ } else if len(touchers) != 0 {
+ log.Printf("dropping and banning %d conns that touched piece", len(touchers))
+ for _, c := range touchers {
+ c.badPiecesDirtied++
+ t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
+ t.dropConnection(c)
+ }
+ }
+ t.pieceChanged(piece)
+}
+
+func (t *Torrent) onCompletedPiece(piece int) {
+ t.pendingPieces.Remove(piece)
+ t.pendAllChunkSpecs(piece)
+ for _, conn := range t.conns {
+ conn.Have(piece)
+ for r := range conn.Requests {
+ if int(r.Index) == piece {
+ conn.Cancel(r)
+ }
+ }
+ // Could check here if peer doesn't have piece, but due to caching
+ // some peers may have said they have a piece but they don't.
+ conn.upload()
+ }
+}
+
+func (t *Torrent) onFailedPiece(piece int) {
+ cl := t.cl
+ if t.pieceAllDirty(piece) {
+ t.pendAllChunkSpecs(piece)
+ }
+ if !t.wantPieceIndex(piece) {
+ return
+ }
+ cl.openNewConns(t)
+ for _, conn := range t.conns {
+ if conn.PeerHasPiece(piece) {
+ conn.updateRequests()
+ }
+ }
+}
+
+func (t *Torrent) verifyPiece(piece int) {
+ cl := t.cl
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ p := &t.pieces[piece]
+ for p.Hashing || t.storage == nil {
+ cl.event.Wait()
+ }
+ p.QueuedForHash = false
+ if t.closed.IsSet() || t.pieceComplete(piece) {
+ t.updatePiecePriority(piece)
+ return
+ }
+ p.Hashing = true
+ t.publishPieceChange(piece)
+ cl.mu.Unlock()
+ sum := t.hashPiece(piece)
+ cl.mu.Lock()
+ p.Hashing = false
+ t.pieceHashed(piece, sum == p.Hash)
+}
+
+// Return the connections that touched a piece, and clear the entry while
+// doing it.
+func (t *Torrent) reapPieceTouches(piece int) (ret []*connection) {
+ for _, c := range t.conns {
+ if _, ok := c.peerTouchedPieces[piece]; ok {
+ ret = append(ret, c)
+ delete(c.peerTouchedPieces, piece)
+ }
+ }
+ return
+}