Sergey Matveev's repositories - btrtrc.git/commitdiff
Move Client.downloadedChunk to connection.go
author Matt Joiner <anacrolix@gmail.com>
Tue, 22 Nov 2016 03:16:18 +0000 (14:16 +1100)
committer Matt Joiner <anacrolix@gmail.com>
Tue, 22 Nov 2016 03:16:18 +0000 (14:16 +1100)
client.go
connection.go

index d28f9b9db784e080eaeafb068081182f3d836108..7ce5686f04e32ad2479c9531189906f10e9e2baa 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1332,78 +1332,6 @@ func (cl *Client) WaitAll() bool {
        return true
 }
 
-// Handle a received chunk from a peer.
-func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
-       chunksReceived.Add(1)
-
-       req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
-
-       // Request has been satisfied.
-       if cl.connDeleteRequest(t, c, req) {
-               defer c.updateRequests()
-       } else {
-               unexpectedChunksReceived.Add(1)
-       }
-
-       index := int(req.Index)
-       piece := &t.pieces[index]
-
-       // Do we actually want this chunk?
-       if !t.wantPiece(req) {
-               unwantedChunksReceived.Add(1)
-               c.UnwantedChunksReceived++
-               return
-       }
-
-       c.UsefulChunksReceived++
-       c.lastUsefulChunkReceived = time.Now()
-
-       cl.upload(t, c)
-
-       // Need to record that it hasn't been written yet, before we attempt to do
-       // anything with it.
-       piece.incrementPendingWrites()
-       // Record that we have the chunk.
-       piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
-
-       // Cancel pending requests for this chunk.
-       for _, c := range t.conns {
-               if cl.connCancel(t, c, req) {
-                       c.updateRequests()
-               }
-       }
-
-       cl.mu.Unlock()
-       // Write the chunk out. Note that the upper bound on chunk writing
-       // concurrency will be the number of connections.
-       err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
-       cl.mu.Lock()
-
-       piece.decrementPendingWrites()
-
-       if err != nil {
-               log.Printf("%s: error writing chunk %v: %s", t, req, err)
-               t.pendRequest(req)
-               t.updatePieceCompletion(int(msg.Index))
-               return
-       }
-
-       // It's important that the piece is potentially queued before we check if
-       // the piece is still wanted, because if it is queued, it won't be wanted.
-       if t.pieceAllDirty(index) {
-               cl.queuePieceCheck(t, int(req.Index))
-       }
-
-       if c.peerTouchedPieces == nil {
-               c.peerTouchedPieces = make(map[int]struct{})
-       }
-       c.peerTouchedPieces[index] = struct{}{}
-
-       cl.event.Broadcast()
-       t.publishPieceChange(int(req.Index))
-       return
-}
-
 // Return the connections that touched a piece, and clear the entry while
 // doing it.
 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
index 7e88f837d53ebfa4267a14f2f104323a9992ae90..be44a29048b5e0791088a21c965a14d334e040a1 100644 (file)
--- a/connection.go
+++ b/connection.go
@@ -926,3 +926,75 @@ func (cn *connection) rw() io.ReadWriter {
                io.Writer
        }{cn.r, cn.w}
 }
+
+// Handle a received chunk from a peer.
+func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
+       chunksReceived.Add(1)
+
+       req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
+
+       // Request has been satisfied.
+       if cl.connDeleteRequest(t, c, req) {
+               defer c.updateRequests()
+       } else {
+               unexpectedChunksReceived.Add(1)
+       }
+
+       index := int(req.Index)
+       piece := &t.pieces[index]
+
+       // Do we actually want this chunk?
+       if !t.wantPiece(req) {
+               unwantedChunksReceived.Add(1)
+               c.UnwantedChunksReceived++
+               return
+       }
+
+       c.UsefulChunksReceived++
+       c.lastUsefulChunkReceived = time.Now()
+
+       cl.upload(t, c)
+
+       // Need to record that it hasn't been written yet, before we attempt to do
+       // anything with it.
+       piece.incrementPendingWrites()
+       // Record that we have the chunk.
+       piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
+
+       // Cancel pending requests for this chunk.
+       for _, c := range t.conns {
+               if cl.connCancel(t, c, req) {
+                       c.updateRequests()
+               }
+       }
+
+       cl.mu.Unlock()
+       // Write the chunk out. Note that the upper bound on chunk writing
+       // concurrency will be the number of connections.
+       err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+       cl.mu.Lock()
+
+       piece.decrementPendingWrites()
+
+       if err != nil {
+               log.Printf("%s: error writing chunk %v: %s", t, req, err)
+               t.pendRequest(req)
+               t.updatePieceCompletion(int(msg.Index))
+               return
+       }
+
+       // It's important that the piece is potentially queued before we check if
+       // the piece is still wanted, because if it is queued, it won't be wanted.
+       if t.pieceAllDirty(index) {
+               cl.queuePieceCheck(t, int(req.Index))
+       }
+
+       if c.peerTouchedPieces == nil {
+               c.peerTouchedPieces = make(map[int]struct{})
+       }
+       c.peerTouchedPieces[index] = struct{}{}
+
+       cl.event.Broadcast()
+       t.publishPieceChange(int(req.Index))
+       return
+}