]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Move a bunch of Client methods onto more appropriate types
authorMatt Joiner <anacrolix@gmail.com>
Tue, 22 Nov 2016 10:12:53 +0000 (21:12 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 22 Nov 2016 10:12:53 +0000 (21:12 +1100)
client.go
connection.go
torrent.go

index 7ce5686f04e32ad2479c9531189906f10e9e2baa..8c356eea324b2dc1981eb442f30d4d079d739511 100644 (file)
--- a/client.go
+++ b/client.go
@@ -42,7 +42,7 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
        }
        piece.QueuedForHash = true
        t.publishPieceChange(pieceIndex)
-       go cl.verifyPiece(t, pieceIndex)
+       go t.verifyPiece(pieceIndex)
 }
 
 // Queue a piece check if one isn't already queued, and the piece has never
@@ -1081,64 +1081,6 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect
        }
 }
 
-// Also handles choking and unchoking of the remote peer.
-func (cl *Client) upload(t *Torrent, c *connection) {
-       if cl.config.NoUpload {
-               return
-       }
-       if !c.PeerInterested {
-               return
-       }
-       seeding := t.seeding()
-       if !seeding && !t.connHasWantedPieces(c) {
-               // There's no reason to upload to this peer.
-               return
-       }
-       // Breaking or completing this loop means we don't want to upload to the
-       // peer anymore, and we choke them.
-another:
-       for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
-               // We want to upload to the peer.
-               c.Unchoke()
-               for r := range c.PeerRequests {
-                       res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
-                       delay := res.Delay()
-                       if delay > 0 {
-                               res.Cancel()
-                               go func() {
-                                       time.Sleep(delay)
-                                       cl.mu.Lock()
-                                       defer cl.mu.Unlock()
-                                       cl.upload(t, c)
-                               }()
-                               return
-                       }
-                       err := cl.sendChunk(t, c, r)
-                       if err != nil {
-                               i := int(r.Index)
-                               if t.pieceComplete(i) {
-                                       t.updatePieceCompletion(i)
-                                       if !t.pieceComplete(i) {
-                                               // We had the piece, but not anymore.
-                                               break another
-                                       }
-                               }
-                               log.Printf("error sending chunk %+v to peer: %s", r, err)
-                               // If we failed to send a chunk, choke the peer to ensure they
-                               // flush all their requests. We've probably dropped a piece,
-                               // but there's no way to communicate this to the peer. If they
-                               // ask for it again, we'll kick them to allow us to send them
-                               // an updated bitfield.
-                               break another
-                       }
-                       delete(c.PeerRequests, r)
-                       goto another
-               }
-               return
-       }
-       c.Choke()
-}
-
 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
        // Count the chunk being sent, even if it isn't.
        b := make([]byte, r.Length)
@@ -1332,118 +1274,6 @@ func (cl *Client) WaitAll() bool {
        return true
 }
 
-// Return the connections that touched a piece, and clear the entry while
-// doing it.
-func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
-       for _, c := range t.conns {
-               if _, ok := c.peerTouchedPieces[piece]; ok {
-                       ret = append(ret, c)
-                       delete(c.peerTouchedPieces, piece)
-               }
-       }
-       return
-}
-
-func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
-       if t.closed.IsSet() {
-               return
-       }
-       p := &t.pieces[piece]
-       if p.EverHashed {
-               // Don't score the first time a piece is hashed, it could be an
-               // initial check.
-               if correct {
-                       pieceHashedCorrect.Add(1)
-               } else {
-                       log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
-                       pieceHashedNotCorrect.Add(1)
-               }
-       }
-       p.EverHashed = true
-       touchers := cl.reapPieceTouches(t, piece)
-       if correct {
-               for _, c := range touchers {
-                       c.goodPiecesDirtied++
-               }
-               err := p.Storage().MarkComplete()
-               if err != nil {
-                       log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
-               }
-               t.updatePieceCompletion(piece)
-       } else if len(touchers) != 0 {
-               log.Printf("dropping and banning %d conns that touched piece", len(touchers))
-               for _, c := range touchers {
-                       c.badPiecesDirtied++
-                       t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
-                       t.dropConnection(c)
-               }
-       }
-       cl.pieceChanged(t, piece)
-}
-
-func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
-       t.pendingPieces.Remove(piece)
-       t.pendAllChunkSpecs(piece)
-       for _, conn := range t.conns {
-               conn.Have(piece)
-               for r := range conn.Requests {
-                       if int(r.Index) == piece {
-                               conn.Cancel(r)
-                       }
-               }
-               // Could check here if peer doesn't have piece, but due to caching
-               // some peers may have said they have a piece but they don't.
-               cl.upload(t, conn)
-       }
-}
-
-func (cl *Client) onFailedPiece(t *Torrent, piece int) {
-       if t.pieceAllDirty(piece) {
-               t.pendAllChunkSpecs(piece)
-       }
-       if !t.wantPieceIndex(piece) {
-               return
-       }
-       cl.openNewConns(t)
-       for _, conn := range t.conns {
-               if conn.PeerHasPiece(piece) {
-                       conn.updateRequests()
-               }
-       }
-}
-
-func (cl *Client) pieceChanged(t *Torrent, piece int) {
-       correct := t.pieceComplete(piece)
-       defer cl.event.Broadcast()
-       if correct {
-               cl.onCompletedPiece(t, piece)
-       } else {
-               cl.onFailedPiece(t, piece)
-       }
-       t.updatePiecePriority(piece)
-}
-
-func (cl *Client) verifyPiece(t *Torrent, piece int) {
-       cl.mu.Lock()
-       defer cl.mu.Unlock()
-       p := &t.pieces[piece]
-       for p.Hashing || t.storage == nil {
-               cl.event.Wait()
-       }
-       p.QueuedForHash = false
-       if t.closed.IsSet() || t.pieceComplete(piece) {
-               t.updatePiecePriority(piece)
-               return
-       }
-       p.Hashing = true
-       t.publishPieceChange(piece)
-       cl.mu.Unlock()
-       sum := t.hashPiece(piece)
-       cl.mu.Lock()
-       p.Hashing = false
-       cl.pieceHashed(t, piece, sum == p.Hash)
-}
-
 // Returns handles to all the torrents loaded in the Client.
 func (cl *Client) Torrents() (ret []*Torrent) {
        cl.mu.Lock()
index fbc888c3fbedf766783d3ee50f61c5d40743a151..06cb7ca8fb3ba312fc1ddc828ab0603daead393a 100644 (file)
@@ -741,7 +741,7 @@ func (c *connection) mainReadLoop() error {
                        cl.peerUnchoked(t, c)
                case pp.Interested:
                        c.PeerInterested = true
-                       cl.upload(t, c)
+                       c.upload()
                case pp.NotInterested:
                        c.PeerInterested = false
                        c.Choke()
@@ -767,7 +767,7 @@ func (c *connection) mainReadLoop() error {
                                c.PeerRequests = make(map[request]struct{}, maxRequests)
                        }
                        c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
-                       cl.upload(t, c)
+                       c.upload()
                case pp.Cancel:
                        req := newRequest(msg.Index, msg.Begin, msg.Length)
                        if !c.PeerCancel(req) {
@@ -955,7 +955,7 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        c.UsefulChunksReceived++
        c.lastUsefulChunkReceived = time.Now()
 
-       cl.upload(t, c)
+       c.upload()
 
        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
@@ -1000,3 +1000,63 @@ func (c *connection) receiveChunk(msg *pp.Message) {
        t.publishPieceChange(int(req.Index))
        return
 }
+
+// Also handles choking and unchoking of the remote peer.
+func (c *connection) upload() {
+       t := c.t
+       cl := t.cl
+       if cl.config.NoUpload {
+               return
+       }
+       if !c.PeerInterested {
+               return
+       }
+       seeding := t.seeding()
+       if !seeding && !t.connHasWantedPieces(c) {
+               // There's no reason to upload to this peer.
+               return
+       }
+       // Breaking or completing this loop means we don't want to upload to the
+       // peer anymore, and we choke them.
+another:
+       for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
+               // We want to upload to the peer.
+               c.Unchoke()
+               for r := range c.PeerRequests {
+                       res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+                       delay := res.Delay()
+                       if delay > 0 {
+                               res.Cancel()
+                               go func() {
+                                       time.Sleep(delay)
+                                       cl.mu.Lock()
+                                       defer cl.mu.Unlock()
+                                       c.upload()
+                               }()
+                               return
+                       }
+                       err := cl.sendChunk(t, c, r)
+                       if err != nil {
+                               i := int(r.Index)
+                               if t.pieceComplete(i) {
+                                       t.updatePieceCompletion(i)
+                                       if !t.pieceComplete(i) {
+                                               // We had the piece, but not anymore.
+                                               break another
+                                       }
+                               }
+                               log.Printf("error sending chunk %+v to peer: %s", r, err)
+                               // If we failed to send a chunk, choke the peer to ensure they
+                               // flush all their requests. We've probably dropped a piece,
+                               // but there's no way to communicate this to the peer. If they
+                               // ask for it again, we'll kick them to allow us to send them
+                               // an updated bitfield.
+                               break another
+                       }
+                       delete(c.PeerRequests, r)
+                       goto another
+               }
+               return
+       }
+       c.Choke()
+}
index 2577960ca3eea6bf6cd5c78778c4c2e7b35a12b7..c2baf9c10dd5a0eeaf2421d8488776f53122592c 100644 (file)
@@ -296,10 +296,6 @@ func (t *Torrent) setInfoBytes(b []byte) error {
        return nil
 }
 
-func (t *Torrent) verifyPiece(piece int) {
-       t.cl.verifyPiece(t, piece)
-}
-
 func (t *Torrent) haveAllMetadataPieces() bool {
        if t.haveInfo() {
                return true
@@ -1009,7 +1005,14 @@ func (t *Torrent) pendRequest(req request) {
 }
 
 func (t *Torrent) pieceChanged(piece int) {
-       t.cl.pieceChanged(t, piece)
+       correct := t.pieceComplete(piece)
+       defer t.cl.event.Broadcast()
+       if correct {
+               t.onCompletedPiece(piece)
+       } else {
+               t.onFailedPiece(piece)
+       }
+       t.updatePiecePriority(piece)
 }
 
 func (t *Torrent) openNewConns() {
@@ -1415,3 +1418,106 @@ func (t *Torrent) SetMaxEstablishedConns(max int) (oldMax int) {
 func (t *Torrent) mu() missinggo.RWLocker {
        return &t.cl.mu
 }
+
+func (t *Torrent) pieceHashed(piece int, correct bool) {
+       if t.closed.IsSet() {
+               return
+       }
+       p := &t.pieces[piece]
+       if p.EverHashed {
+               // Don't score the first time a piece is hashed, it could be an
+               // initial check.
+               if correct {
+                       pieceHashedCorrect.Add(1)
+               } else {
+                       log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
+                       pieceHashedNotCorrect.Add(1)
+               }
+       }
+       p.EverHashed = true
+       touchers := t.reapPieceTouches(piece)
+       if correct {
+               for _, c := range touchers {
+                       c.goodPiecesDirtied++
+               }
+               err := p.Storage().MarkComplete()
+               if err != nil {
+                       log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
+               }
+               t.updatePieceCompletion(piece)
+       } else if len(touchers) != 0 {
+               log.Printf("dropping and banning %d conns that touched piece", len(touchers))
+               for _, c := range touchers {
+                       c.badPiecesDirtied++
+                       t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
+                       t.dropConnection(c)
+               }
+       }
+       t.pieceChanged(piece)
+}
+
+func (t *Torrent) onCompletedPiece(piece int) {
+       t.pendingPieces.Remove(piece)
+       t.pendAllChunkSpecs(piece)
+       for _, conn := range t.conns {
+               conn.Have(piece)
+               for r := range conn.Requests {
+                       if int(r.Index) == piece {
+                               conn.Cancel(r)
+                       }
+               }
+               // Could check here if peer doesn't have piece, but due to caching
+               // some peers may have said they have a piece but they don't.
+               conn.upload()
+       }
+}
+
+func (t *Torrent) onFailedPiece(piece int) {
+       cl := t.cl
+       if t.pieceAllDirty(piece) {
+               t.pendAllChunkSpecs(piece)
+       }
+       if !t.wantPieceIndex(piece) {
+               return
+       }
+       cl.openNewConns(t)
+       for _, conn := range t.conns {
+               if conn.PeerHasPiece(piece) {
+                       conn.updateRequests()
+               }
+       }
+}
+
+func (t *Torrent) verifyPiece(piece int) {
+       cl := t.cl
+       cl.mu.Lock()
+       defer cl.mu.Unlock()
+       p := &t.pieces[piece]
+       for p.Hashing || t.storage == nil {
+               cl.event.Wait()
+       }
+       p.QueuedForHash = false
+       if t.closed.IsSet() || t.pieceComplete(piece) {
+               t.updatePiecePriority(piece)
+               return
+       }
+       p.Hashing = true
+       t.publishPieceChange(piece)
+       cl.mu.Unlock()
+       sum := t.hashPiece(piece)
+       cl.mu.Lock()
+       p.Hashing = false
+       t.pieceHashed(piece, sum == p.Hash)
+}
+
+// Return the connections that touched a piece, and clear the entry while
+// doing it.
+func (t *Torrent) reapPieceTouches(piece int) (ret []*connection) {
+       for _, c := range t.conns {
+               if _, ok := c.peerTouchedPieces[piece]; ok {
+                       ret = append(ret, c)
+                       delete(c.peerTouchedPieces, piece)
+               }
+       }
+       return
+}