"fmt"
"io"
"log"
- "math/rand"
"net"
"sort"
"sync"
// Values are the piece indices that changed.
pieceStateChanges *pubsub.PubSub
chunkSize pp.Integer
- // Chunks that are wanted before all others. This is for
- // responsive/streaming readers that want to unblock ASAP.
- urgent map[request]struct{}
// Total length of the torrent in bytes. Stored because it's not O(1) to
// get this from the info dict.
length int64
// Closed when .Info is set.
gotMetainfo chan struct{}
- connPiecePriorites sync.Pool
+ readers map[*Reader]struct{}
}
var (
t.displayName = dn
}
-func (t *torrent) newConnPiecePriorities() []int {
- _ret := t.connPiecePriorites.Get()
- if _ret != nil {
- piecePrioritiesReused.Add(1)
- return _ret.([]int)
- }
- piecePrioritiesNew.Add(1)
- return rand.Perm(t.numPieces())
-}
-
func (t *torrent) pieceComplete(piece int) bool {
// TODO: This is called when setting metadata, and before storage is
// assigned, which doesn't seem right.
missinggo.CopyExact(piece.Hash[:], hash)
}
for _, conn := range t.Conns {
- t.initRequestOrdering(conn)
if err := conn.setNumPieces(t.numPieces()); err != nil {
log.Printf("closing connection: %s", err)
conn.Close()
func (t *torrent) pieceState(index int) (ret PieceState) {
p := &t.Pieces[index]
- ret.Priority = p.Priority
+ ret.Priority = t.piecePriority(index)
if t.pieceComplete(index) {
ret.Complete = true
}
fmt.Fprintln(w)
}
fmt.Fprintf(w, "Urgent:")
- for req := range t.urgent {
- fmt.Fprintf(w, " %v", req)
- }
+ t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ fmt.Fprintf(w, " %d:%d", begin, end)
+ return true
+ })
fmt.Fprintln(w)
fmt.Fprintf(w, "Trackers: ")
for _, tier := range t.Trackers {
if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
return true
}
- _, ok := t.urgent[r]
- return ok
-}
-
-func (t *torrent) urgentChunkInPiece(piece int) bool {
- p := pp.Integer(piece)
- for req := range t.urgent {
- if req.Index == p {
- return true
- }
- }
+ // TODO: What about pieces that were wanted, but aren't now, and aren't
+ // completed either? That used to be done here.
return false
}
if p.Hashing {
return false
}
- if p.Priority == PiecePriorityNone {
- if !t.urgentChunkInPiece(index) {
- return false
- }
- }
+
// Put piece complete check last, since it's the slowest as it can involve
// calling out into external data stores.
return !t.pieceComplete(index)
}
func (t *torrent) connHasWantedPieces(c *connection) bool {
- return c.pieceRequestOrder != nil && !c.pieceRequestOrder.Empty()
+ return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ for i := begin; i < end; i++ {
+ if c.PeerHasPiece(i) {
+ return false
+ }
+ }
+ return true
+ })
}
func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
}
return true
}
+
+func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
+ return t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ if begin < end {
+ if !f(begin) {
+ return false
+ }
+ }
+ return true
+ })
+}
+
+func (t *torrent) readersChanged(cl *Client) {
+ for _, c := range t.Conns {
+ cl.replenishConnRequests(t, c)
+ }
+ cl.openNewConns(t)
+}
+
+func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
+ if off >= t.length {
+ return
+ }
+ if off < 0 {
+ size += off
+ off = 0
+ }
+ if size <= 0 {
+ return
+ }
+ begin = int(off / t.Info.PieceLength)
+ end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
+ if end > t.Info.NumPieces() {
+ end = t.Info.NumPieces()
+ }
+ return
+}
+
+func (t *torrent) forReaderWantedRegionPieces(f func(begin, end int) (more bool)) (all bool) {
+ for r := range t.readers {
+ r.mu.Lock()
+ pos, readahead := r.pos, r.readahead
+ r.mu.Unlock()
+ if readahead < 1 {
+ readahead = 1
+ }
+ begin, end := t.byteRegionPieces(pos, readahead)
+ if begin >= end {
+ continue
+ }
+ if !f(begin, end) {
+ return false
+ }
+ }
+ return true
+}
+
+func (t *torrent) piecePriority(piece int) (ret piecePriority) {
+ ret = PiecePriorityNone
+ if t.pieceComplete(piece) {
+ return
+ }
+ raiseRet := func(prio piecePriority) {
+ if prio > ret {
+ ret = prio
+ }
+ }
+ t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+ if piece == begin {
+ raiseRet(PiecePriorityNow)
+ }
+ if begin <= piece && piece < end {
+ raiseRet(PiecePriorityReadahead)
+ }
+ return true
+ })
+ return
+}