]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Fix downloading of unwanted chunks, and write out downloaded chunks using the connect...
authorMatt Joiner <anacrolix@gmail.com>
Wed, 27 Jan 2016 18:54:48 +0000 (05:54 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Wed, 27 Jan 2016 18:54:48 +0000 (05:54 +1100)
Doing the writes with the connection loop goroutine prevents memory use blowing out when chunks aren't written out quickly enough.

client.go
connection.go
piece.go
torrent.go

index 422180deed3345ef38f618a7afcd6fd0bb2d5fa5..912bc5912ef4a25746dd5721bacad627387628bf 100644 (file)
--- a/client.go
+++ b/client.go
@@ -24,7 +24,6 @@ import (
 
        "github.com/anacrolix/missinggo"
        . "github.com/anacrolix/missinggo"
-       "github.com/anacrolix/missinggo/perf"
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/sync"
        "github.com/anacrolix/utp"
@@ -1482,7 +1481,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                                }
                        }())
                case pp.Piece:
-                       err = me.downloadedChunk(t, c, &msg)
+                       me.downloadedChunk(t, c, &msg)
                case pp.Extended:
                        switch msg.ExtendedID {
                        case pp.HandshakeExtendedID:
@@ -1675,10 +1674,12 @@ func (t *torrent) needData() bool {
        if !t.haveInfo() {
                return true
        }
-       if len(t.pendingPieces) != 0 {
-               return true
+       for i := range t.pendingPieces {
+               if t.wantPiece(i) {
+                       return true
+               }
        }
-       return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+       return !t.forReaderOffsetPieces(func(begin, end int) (again bool) {
                for i := begin; i < end; i++ {
                        if !t.pieceComplete(i) {
                                return false
@@ -2363,7 +2364,7 @@ func (me *Client) WaitAll() bool {
 }
 
 // Handle a received chunk from a peer.
-func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
+func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) {
        chunksReceived.Add(1)
 
        req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
@@ -2382,7 +2383,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        if !t.wantChunk(req) {
                unwantedChunksReceived.Add(1)
                c.UnwantedChunksReceived++
-               return nil
+               return
        }
 
        c.UsefulChunksReceived++
@@ -2390,45 +2391,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
 
        me.upload(t, c)
 
-       piece.pendingWritesMutex.Lock()
-       piece.pendingWrites++
-       piece.pendingWritesMutex.Unlock()
-       go func() {
-               defer me.event.Broadcast()
-               defer func() {
-                       piece.pendingWritesMutex.Lock()
-                       piece.pendingWrites--
-                       if piece.pendingWrites == 0 {
-                               piece.noPendingWrites.Broadcast()
-                       }
-                       piece.pendingWritesMutex.Unlock()
-               }()
-               // Write the chunk out.
-               tr := perf.NewTimer()
-               err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
-               if err != nil {
-                       log.Printf("error writing chunk: %s", err)
-                       return
-               }
-               tr.Stop("write chunk")
-               me.mu.Lock()
-               if c.peerTouchedPieces == nil {
-                       c.peerTouchedPieces = make(map[int]struct{})
-               }
-               c.peerTouchedPieces[index] = struct{}{}
-               me.mu.Unlock()
-       }()
-
-       // log.Println("got chunk", req)
-       me.event.Broadcast()
-       defer t.publishPieceChange(int(req.Index))
+       // Need to record that it hasn't been written yet, before we attempt to do
+       // anything with it.
+       piece.incrementPendingWrites()
        // Record that we have the chunk.
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
-       // It's important that the piece is potentially queued before we check if
-       // the piece is still wanted, because if it is queued, it won't be wanted.
-       if t.pieceAllDirty(index) {
-               me.queuePieceCheck(t, int(req.Index))
-       }
 
        // Cancel pending requests for this chunk.
        for _, c := range t.Conns {
@@ -2437,7 +2404,33 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
                }
        }
 
-       return nil
+       me.mu.Unlock()
+       // Write the chunk out.
+       err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+       me.mu.Lock()
+
+       piece.decrementPendingWrites()
+
+       if err != nil {
+               log.Printf("error writing chunk: %s", err)
+               t.pendRequest(req)
+               return
+       }
+
+       // It's important that the piece is potentially queued before we check if
+       // the piece is still wanted, because if it is queued, it won't be wanted.
+       if t.pieceAllDirty(index) {
+               me.queuePieceCheck(t, int(req.Index))
+       }
+
+       if c.peerTouchedPieces == nil {
+               c.peerTouchedPieces = make(map[int]struct{})
+       }
+       c.peerTouchedPieces[index] = struct{}{}
+
+       me.event.Broadcast()
+       t.publishPieceChange(int(req.Index))
+       return
 }
 
 // Return the connections that touched a piece, and clear the entry while
index 75c26d46ce653bfe7b93f2e96236595a3ab8f7fd..cfa0fe04c34c6ccacd8364b79d7ff89e9f91299e 100644 (file)
@@ -564,7 +564,7 @@ func (c *connection) fillRequests() {
        }) {
                return
        }
-       c.t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+       c.t.forReaderOffsetPieces(func(begin, end int) (again bool) {
                for i := begin + 1; i < end; i++ {
                        if !c.t.connRequestPiecePendingChunks(c, i) {
                                return false
@@ -573,6 +573,9 @@ func (c *connection) fillRequests() {
                return true
        })
        for i := range c.t.pendingPieces {
+               if !c.t.wantPiece(i) {
+                       continue
+               }
                if !c.t.connRequestPiecePendingChunks(c, i) {
                        return
                }
index 1c86b7eae305899cc0056a4fdc5d02e6e1b20b02..0643878c2e73fe29f50eee208133337208b8776c 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -61,6 +61,13 @@ func (p *piece) unpendChunkIndex(i int) {
        p.DirtyChunks[i] = true
 }
 
+func (p *piece) pendChunkIndex(i int) {
+       if i >= len(p.DirtyChunks) {
+               return
+       }
+       p.DirtyChunks[i] = false
+}
+
 func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
        ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize}
        if ret.Begin+ret.Length > pieceLength {
index 38990144eef6b587a4200c37c5bd02385b9ceeac..bb2aa78d27a1cdda4affa14f79ea44e592bfe2e6 100644 (file)
@@ -12,6 +12,7 @@ import (
        "time"
 
        "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/perf"
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/bradfitz/iter"
 
@@ -422,8 +423,8 @@ func (t *torrent) writeStatus(w io.Writer, cl *Client) {
                }
                fmt.Fprintln(w)
        }
-       fmt.Fprintf(w, "Urgent:")
-       t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+       fmt.Fprintf(w, "Reader Pieces:")
+       t.forReaderOffsetPieces(func(begin, end int) (again bool) {
                fmt.Fprintf(w, " %d:%d", begin, end)
                return true
        })
@@ -567,10 +568,14 @@ func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
 }
 
 func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
+       tr := perf.NewTimer()
        n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
        if err == nil && n != len(data) {
                err = io.ErrShortWrite
        }
+       if err == nil {
+               tr.Stop("write chunk")
+       }
        return
 }
 
@@ -729,14 +734,35 @@ func (t *torrent) wantPiece(index int) bool {
        if p.Hashing {
                return false
        }
+       if t.pieceComplete(index) {
+               return false
+       }
+       if _, ok := t.pendingPieces[index]; ok {
+               return true
+       }
+       return !t.forReaderOffsetPieces(func(begin, end int) bool {
+               return index < begin || index >= end
+       })
+}
 
-       // Put piece complete check last, since it's the slowest as it can involve
-       // calling out into external data stores.
-       return !t.pieceComplete(index)
+func (t *torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
+       return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
+               for i := begin; begin < end; i++ {
+                       if !f(i) {
+                               return false
+                       }
+               }
+               return true
+       })
 }
 
 func (t *torrent) connHasWantedPieces(c *connection) bool {
-       return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+       for i := range t.pendingPieces {
+               if c.PeerHasPiece(i) {
+                       return true
+               }
+       }
+       return !t.forReaderOffsetPieces(func(begin, end int) (again bool) {
                for i := begin; i < end; i++ {
                        if c.PeerHasPiece(i) {
                                return false
@@ -781,6 +807,9 @@ func (t *torrent) publishPieceChange(piece int) {
 }
 
 func (t *torrent) pieceNumPendingChunks(piece int) int {
+       if t.pieceComplete(piece) {
+               return 0
+       }
        return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
 }
 
@@ -798,7 +827,7 @@ func (t *torrent) pieceAllDirty(piece int) bool {
 }
 
 func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
-       return t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+       return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
                if begin < end {
                        if !f(begin) {
                                return false
@@ -836,7 +865,8 @@ func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
        return
 }
 
-func (t *torrent) forReaderWantedRegionPieces(f func(begin, end int) (more bool)) (all bool) {
+// Returns true if all iterations complete without breaking.
+func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
        for r := range t.readers {
                r.mu.Lock()
                pos, readahead := r.pos, r.readahead
@@ -868,7 +898,7 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) {
                        ret = prio
                }
        }
-       t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+       t.forReaderOffsetPieces(func(begin, end int) (again bool) {
                if piece == begin {
                        raiseRet(PiecePriorityNow)
                }
@@ -913,3 +943,8 @@ func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more
        }
        return true
 }
+
+func (t *torrent) pendRequest(req request) {
+       ci := chunkIndex(req.chunkSpec, t.chunkSize)
+       t.Pieces[req.Index].pendChunkIndex(ci)
+}