Doing the writes within the connection loop goroutine prevents memory use from blowing out when chunks aren't written out quickly enough.
"github.com/anacrolix/missinggo"
. "github.com/anacrolix/missinggo"
- "github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/sync"
"github.com/anacrolix/utp"
}
}())
case pp.Piece:
- err = me.downloadedChunk(t, c, &msg)
+ me.downloadedChunk(t, c, &msg)
case pp.Extended:
switch msg.ExtendedID {
case pp.HandshakeExtendedID:
if !t.haveInfo() {
return true
}
- if len(t.pendingPieces) != 0 {
- return true
+ for i := range t.pendingPieces {
+ if t.wantPiece(i) {
+ return true
+ }
}
- return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ return !t.forReaderOffsetPieces(func(begin, end int) (again bool) {
for i := begin; i < end; i++ {
if !t.pieceComplete(i) {
return false
}
// Handle a received chunk from a peer.
-func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
+func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) {
chunksReceived.Add(1)
req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
if !t.wantChunk(req) {
unwantedChunksReceived.Add(1)
c.UnwantedChunksReceived++
- return nil
+ return
}
c.UsefulChunksReceived++
me.upload(t, c)
+ // NOTE(review): `piece` and `index` are declared in context elided from
+ // this hunk — presumably t.Pieces[msg.Index] and int(msg.Index); confirm
+ // against the full file.
- piece.pendingWritesMutex.Lock()
- piece.pendingWrites++
- piece.pendingWritesMutex.Unlock()
- go func() {
- defer me.event.Broadcast()
- defer func() {
- piece.pendingWritesMutex.Lock()
- piece.pendingWrites--
- if piece.pendingWrites == 0 {
- piece.noPendingWrites.Broadcast()
- }
- piece.pendingWritesMutex.Unlock()
- }()
- // Write the chunk out.
- tr := perf.NewTimer()
- err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
- if err != nil {
- log.Printf("error writing chunk: %s", err)
- return
- }
- tr.Stop("write chunk")
- me.mu.Lock()
- if c.peerTouchedPieces == nil {
- c.peerTouchedPieces = make(map[int]struct{})
- }
- c.peerTouchedPieces[index] = struct{}{}
- me.mu.Unlock()
- }()
-
- // log.Println("got chunk", req)
- me.event.Broadcast()
- defer t.publishPieceChange(int(req.Index))
+ // Need to record that it hasn't been written yet, before we attempt to do
+ // anything with it.
+ piece.incrementPendingWrites()
// Record that we have the chunk.
piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
- // It's important that the piece is potentially queued before we check if
- // the piece is still wanted, because if it is queued, it won't be wanted.
- if t.pieceAllDirty(index) {
- me.queuePieceCheck(t, int(req.Index))
- }
// Cancel pending requests for this chunk.
for _, c := range t.Conns {
}
}
- return nil
+ // NOTE(review): the client lock is released for the duration of the
+ // blocking disk write below, then re-acquired; torrent/connection state
+ // may change while unlocked, which is why the pendingWrites count was
+ // bumped before unlocking.
+ me.mu.Unlock()
+ // Write the chunk out.
+ err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+ me.mu.Lock()
+
+ piece.decrementPendingWrites()
+
+ if err != nil {
+ log.Printf("error writing chunk: %s", err)
+ // The write failed: un-dirty the chunk again, presumably so it becomes
+ // eligible to be requested from a peer once more.
+ t.pendRequest(req)
+ return
+ }
+
+ // It's important that the piece is potentially queued before we check if
+ // the piece is still wanted, because if it is queued, it won't be wanted.
+ if t.pieceAllDirty(index) {
+ me.queuePieceCheck(t, int(req.Index))
+ }
+
+ if c.peerTouchedPieces == nil {
+ c.peerTouchedPieces = make(map[int]struct{})
+ }
+ c.peerTouchedPieces[index] = struct{}{}
+
+ me.event.Broadcast()
+ t.publishPieceChange(int(req.Index))
+ return
}
// Return the connections that touched a piece, and clear the entry while
}) {
return
}
- c.t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ c.t.forReaderOffsetPieces(func(begin, end int) (again bool) {
for i := begin + 1; i < end; i++ {
if !c.t.connRequestPiecePendingChunks(c, i) {
return false
return true
})
for i := range c.t.pendingPieces {
+ if !c.t.wantPiece(i) {
+ continue
+ }
if !c.t.connRequestPiecePendingChunks(c, i) {
return
}
p.DirtyChunks[i] = true
}
+func (p *piece) pendChunkIndex(i int) {
+ if i >= len(p.DirtyChunks) {
+ return
+ }
+ p.DirtyChunks[i] = false
+}
+
func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize}
if ret.Begin+ret.Length > pieceLength {
"time"
"github.com/anacrolix/missinggo"
+ "github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pubsub"
"github.com/bradfitz/iter"
}
fmt.Fprintln(w)
}
- fmt.Fprintf(w, "Urgent:")
- t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ fmt.Fprintf(w, "Reader Pieces:")
+ t.forReaderOffsetPieces(func(begin, end int) (again bool) {
fmt.Fprintf(w, " %d:%d", begin, end)
return true
})
}
func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
+ tr := perf.NewTimer()
n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
if err == nil && n != len(data) {
err = io.ErrShortWrite
}
+ if err == nil {
+ tr.Stop("write chunk")
+ }
return
}
if p.Hashing {
return false
}
+ if t.pieceComplete(index) {
+ return false
+ }
+ if _, ok := t.pendingPieces[index]; ok {
+ return true
+ }
+ return !t.forReaderOffsetPieces(func(begin, end int) bool {
+ return index < begin || index >= end
+ })
+}
- // Put piece complete check last, since it's the slowest as it can involve
- // calling out into external data stores.
- return !t.pieceComplete(index)
+func (t *torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
+ return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
+ for i := begin; begin < end; i++ {
+ if !f(i) {
+ return false
+ }
+ }
+ return true
+ })
}
func (t *torrent) connHasWantedPieces(c *connection) bool {
- return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ for i := range t.pendingPieces {
+ if c.PeerHasPiece(i) {
+ return true
+ }
+ }
+ return !t.forReaderOffsetPieces(func(begin, end int) (again bool) {
for i := begin; i < end; i++ {
if c.PeerHasPiece(i) {
return false
}
func (t *torrent) pieceNumPendingChunks(piece int) int {
+ if t.pieceComplete(piece) {
+ return 0
+ }
return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
}
}
func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
- return t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
if begin < end {
if !f(begin) {
return false
return
}
-func (t *torrent) forReaderWantedRegionPieces(f func(begin, end int) (more bool)) (all bool) {
+// Returns true if all iterations complete without breaking.
+func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
for r := range t.readers {
r.mu.Lock()
pos, readahead := r.pos, r.readahead
ret = prio
}
}
- t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ t.forReaderOffsetPieces(func(begin, end int) (again bool) {
if piece == begin {
raiseRet(PiecePriorityNow)
}
}
return true
}
+
+func (t *torrent) pendRequest(req request) {
+ ci := chunkIndex(req.chunkSpec, t.chunkSize)
+ t.Pieces[req.Index].pendChunkIndex(ci)
+}