From: Matt Joiner Date: Wed, 27 Jan 2016 18:54:48 +0000 (+1100) Subject: Fix downloading of unwanted chunks, and write out downloaded chunks using the connect... X-Git-Tag: v1.0.0~939^2~3 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=ae9bba2611a78cded94c70a8d4c4f105dd28fd41;p=btrtrc.git Fix downloading of unwanted chunks, and write out downloaded chunks using the connection loop goroutine Doing the writes with the connection loop goroutine prevents memory use blowing out when chunks aren't written out quickly enough. --- diff --git a/client.go b/client.go index 422180de..912bc591 100644 --- a/client.go +++ b/client.go @@ -24,7 +24,6 @@ import ( "github.com/anacrolix/missinggo" . "github.com/anacrolix/missinggo" - "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/sync" "github.com/anacrolix/utp" @@ -1482,7 +1481,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { } }()) case pp.Piece: - err = me.downloadedChunk(t, c, &msg) + me.downloadedChunk(t, c, &msg) case pp.Extended: switch msg.ExtendedID { case pp.HandshakeExtendedID: @@ -1675,10 +1674,12 @@ func (t *torrent) needData() bool { if !t.haveInfo() { return true } - if len(t.pendingPieces) != 0 { - return true + for i := range t.pendingPieces { + if t.wantPiece(i) { + return true + } } - return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + return !t.forReaderOffsetPieces(func(begin, end int) (again bool) { for i := begin; i < end; i++ { if !t.pieceComplete(i) { return false @@ -2363,7 +2364,7 @@ func (me *Client) WaitAll() bool { } // Handle a received chunk from a peer. -func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error { +func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) { chunksReceived.Add(1) req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece))) @@ -2382,7 +2383,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er if !t.wantChunk(req) { unwantedChunksReceived.Add(1) c.UnwantedChunksReceived++ - return nil + return } c.UsefulChunksReceived++ @@ -2390,45 +2391,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er me.upload(t, c) - piece.pendingWritesMutex.Lock() - piece.pendingWrites++ - piece.pendingWritesMutex.Unlock() - go func() { - defer me.event.Broadcast() - defer func() { - piece.pendingWritesMutex.Lock() - piece.pendingWrites-- - if piece.pendingWrites == 0 { - piece.noPendingWrites.Broadcast() - } - piece.pendingWritesMutex.Unlock() - }() - // Write the chunk out. - tr := perf.NewTimer() - err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) - if err != nil { - log.Printf("error writing chunk: %s", err) - return - } - tr.Stop("write chunk") - me.mu.Lock() - if c.peerTouchedPieces == nil { - c.peerTouchedPieces = make(map[int]struct{}) - } - c.peerTouchedPieces[index] = struct{}{} - me.mu.Unlock() - }() - - // log.Println("got chunk", req) - me.event.Broadcast() - defer t.publishPieceChange(int(req.Index)) + // Need to record that it hasn't been written yet, before we attempt to do + // anything with it. + piece.incrementPendingWrites() // Record that we have the chunk. piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize)) - // It's important that the piece is potentially queued before we check if - // the piece is still wanted, because if it is queued, it won't be wanted. - if t.pieceAllDirty(index) { - me.queuePieceCheck(t, int(req.Index)) - } // Cancel pending requests for this chunk. for _, c := range t.Conns { @@ -2437,7 +2404,33 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er } } - return nil + me.mu.Unlock() + // Write the chunk out. + err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) + me.mu.Lock() + + piece.decrementPendingWrites() + + if err != nil { + log.Printf("error writing chunk: %s", err) + t.pendRequest(req) + return + } + + // It's important that the piece is potentially queued before we check if + // the piece is still wanted, because if it is queued, it won't be wanted. + if t.pieceAllDirty(index) { + me.queuePieceCheck(t, int(req.Index)) + } + + if c.peerTouchedPieces == nil { + c.peerTouchedPieces = make(map[int]struct{}) + } + c.peerTouchedPieces[index] = struct{}{} + + me.event.Broadcast() + t.publishPieceChange(int(req.Index)) + return } // Return the connections that touched a piece, and clear the entry while diff --git a/connection.go b/connection.go index 75c26d46..cfa0fe04 100644 --- a/connection.go +++ b/connection.go @@ -564,7 +564,7 @@ func (c *connection) fillRequests() { }) { return } - c.t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + c.t.forReaderOffsetPieces(func(begin, end int) (again bool) { for i := begin + 1; i < end; i++ { if !c.t.connRequestPiecePendingChunks(c, i) { return false @@ -573,6 +573,9 @@ func (c *connection) fillRequests() { return true }) for i := range c.t.pendingPieces { + if !c.t.wantPiece(i) { + continue + } if !c.t.connRequestPiecePendingChunks(c, i) { return } diff --git a/piece.go b/piece.go index 1c86b7ea..0643878c 100644 --- a/piece.go +++ b/piece.go @@ -61,6 +61,13 @@ func (p *piece) unpendChunkIndex(i int) { p.DirtyChunks[i] = true } +func (p *piece) pendChunkIndex(i int) { + if i >= len(p.DirtyChunks) { + return + } + p.DirtyChunks[i] = false +} + func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec { ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize} if ret.Begin+ret.Length > pieceLength { diff --git a/torrent.go b/torrent.go index 38990144..bb2aa78d 100644 --- a/torrent.go +++ b/torrent.go @@ -12,6 +12,7 @@ import ( "time" "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/perf" "github.com/anacrolix/missinggo/pubsub" "github.com/bradfitz/iter" @@ -422,8 +423,8 @@ func (t *torrent) writeStatus(w io.Writer, cl *Client) { } fmt.Fprintln(w) } - fmt.Fprintf(w, "Urgent:") - t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + fmt.Fprintf(w, "Reader Pieces:") + t.forReaderOffsetPieces(func(begin, end int) (again bool) { fmt.Fprintf(w, " %d:%d", begin, end) return true }) @@ -567,10 +568,14 @@ func (t *torrent) offsetRequest(off int64) (req request, ok bool) { } func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) { + tr := perf.NewTimer() n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin) if err == nil && n != len(data) { err = io.ErrShortWrite } + if err == nil { + tr.Stop("write chunk") + } return } @@ -729,14 +734,35 @@ func (t *torrent) wantPiece(index int) bool { if p.Hashing { return false } + if t.pieceComplete(index) { + return false + } + if _, ok := t.pendingPieces[index]; ok { + return true + } + return !t.forReaderOffsetPieces(func(begin, end int) bool { + return index < begin || index >= end + }) +} - // Put piece complete check last, since it's the slowest as it can involve - // calling out into external data stores. - return !t.pieceComplete(index) +func (t *torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) { + return t.forReaderOffsetPieces(func(begin, end int) (more bool) { + for i := begin; begin < end; i++ { + if !f(i) { + return false + } + } + return true + }) } func (t *torrent) connHasWantedPieces(c *connection) bool { - return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + for i := range t.pendingPieces { + if c.PeerHasPiece(i) { + return true + } + } + return !t.forReaderOffsetPieces(func(begin, end int) (again bool) { for i := begin; i < end; i++ { if c.PeerHasPiece(i) { return false @@ -781,6 +807,9 @@ func (t *torrent) publishPieceChange(piece int) { } func (t *torrent) pieceNumPendingChunks(piece int) int { + if t.pieceComplete(piece) { + return 0 + } return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks() } @@ -798,7 +827,7 @@ func (t *torrent) pieceAllDirty(piece int) bool { } func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) { - return t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + return t.forReaderOffsetPieces(func(begin, end int) (again bool) { if begin < end { if !f(begin) { return false @@ -836,7 +865,8 @@ func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) { return } -func (t *torrent) forReaderWantedRegionPieces(f func(begin, end int) (more bool)) (all bool) { +// Returns true if all iterations complete without breaking. +func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) { for r := range t.readers { r.mu.Lock() pos, readahead := r.pos, r.readahead @@ -868,7 +898,7 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) { ret = prio } } - t.forReaderWantedRegionPieces(func(begin, end int) (again bool) { + t.forReaderOffsetPieces(func(begin, end int) (again bool) { if piece == begin { raiseRet(PiecePriorityNow) } @@ -913,3 +943,8 @@ func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more } return true } + +func (t *torrent) pendRequest(req request) { + ci := chunkIndex(req.chunkSpec, t.chunkSize) + t.Pieces[req.Index].pendChunkIndex(ci) +}