]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
It's working and the tests are usually passing
[btrtrc.git] / torrent.go
index c74476eb8d7612350f422f47db283586a3801fc4..2933c809aec794ac13639b25d401ae7a62e91e74 100644 (file)
@@ -6,7 +6,6 @@ import (
        "fmt"
        "io"
        "log"
-       "math/rand"
        "net"
        "sort"
        "sync"
@@ -62,9 +61,6 @@ type torrent struct {
        // Values are the piece indices that changed.
        pieceStateChanges *pubsub.PubSub
        chunkSize         pp.Integer
-       // Chunks that are wanted before all others. This is for
-       // responsive/streaming readers that want to unblock ASAP.
-       urgent map[request]struct{}
        // Total length of the torrent in bytes. Stored because it's not O(1) to
        // get this from the info dict.
        length int64
@@ -99,7 +95,7 @@ type torrent struct {
        // Closed when .Info is set.
        gotMetainfo chan struct{}
 
-       connPiecePriorites sync.Pool
+       readers map[*Reader]struct{}
 }
 
 var (
@@ -111,16 +107,6 @@ func (t *torrent) setDisplayName(dn string) {
        t.displayName = dn
 }
 
-func (t *torrent) newConnPiecePriorities() []int {
-       _ret := t.connPiecePriorites.Get()
-       if _ret != nil {
-               piecePrioritiesReused.Add(1)
-               return _ret.([]int)
-       }
-       piecePrioritiesNew.Add(1)
-       return rand.Perm(t.numPieces())
-}
-
 func (t *torrent) pieceComplete(piece int) bool {
        // TODO: This is called when setting metadata, and before storage is
        // assigned, which doesn't seem right.
@@ -261,7 +247,6 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
                missinggo.CopyExact(piece.Hash[:], hash)
        }
        for _, conn := range t.Conns {
-               t.initRequestOrdering(conn)
                if err := conn.setNumPieces(t.numPieces()); err != nil {
                        log.Printf("closing connection: %s", err)
                        conn.Close()
@@ -324,7 +309,7 @@ func (t *torrent) Name() string {
 
 func (t *torrent) pieceState(index int) (ret PieceState) {
        p := &t.Pieces[index]
-       ret.Priority = p.Priority
+       ret.Priority = t.piecePriority(index)
        if t.pieceComplete(index) {
                ret.Complete = true
        }
@@ -436,9 +421,10 @@ func (t *torrent) writeStatus(w io.Writer, cl *Client) {
                fmt.Fprintln(w)
        }
        fmt.Fprintf(w, "Urgent:")
-       for req := range t.urgent {
-               fmt.Fprintf(w, " %v", req)
-       }
+       t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               fmt.Fprintf(w, " %d:%d", begin, end)
+               return true
+       })
        fmt.Fprintln(w)
        fmt.Fprintf(w, "Trackers: ")
        for _, tier := range t.Trackers {
@@ -728,17 +714,8 @@ func (t *torrent) wantChunk(r request) bool {
        if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
                return true
        }
-       _, ok := t.urgent[r]
-       return ok
-}
-
-func (t *torrent) urgentChunkInPiece(piece int) bool {
-       p := pp.Integer(piece)
-       for req := range t.urgent {
-               if req.Index == p {
-                       return true
-               }
-       }
+       // TODO: What about pieces that were wanted, but aren't now, and aren't
+       // completed either? That used to be done here.
        return false
 }
 
@@ -754,18 +731,21 @@ func (t *torrent) wantPiece(index int) bool {
        if p.Hashing {
                return false
        }
-       if p.Priority == PiecePriorityNone {
-               if !t.urgentChunkInPiece(index) {
-                       return false
-               }
-       }
+
        // Put piece complete check last, since it's the slowest as it can involve
        // calling out into external data stores.
        return !t.pieceComplete(index)
 }
 
 func (t *torrent) connHasWantedPieces(c *connection) bool {
-       return c.pieceRequestOrder != nil && !c.pieceRequestOrder.Empty()
+       return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               for i := begin; i < end; i++ {
+                       if c.PeerHasPiece(i) {
+                               return false
+                       }
+               }
+               return true
+       })
 }
 
 func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
@@ -818,3 +798,81 @@ func (t *torrent) pieceAllDirty(piece int) bool {
        }
        return true
 }
+
+func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
+       return t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               if begin < end {
+                       if !f(begin) {
+                               return false
+                       }
+               }
+               return true
+       })
+}
+
+func (t *torrent) readersChanged(cl *Client) {
+       for _, c := range t.Conns {
+               cl.replenishConnRequests(t, c)
+       }
+       cl.openNewConns(t)
+}
+
+func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
+       if off >= t.length {
+               return
+       }
+       if off < 0 {
+               size += off
+               off = 0
+       }
+       if size <= 0 {
+               return
+       }
+       begin = int(off / t.Info.PieceLength)
+       end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
+       if end > t.Info.NumPieces() {
+               end = t.Info.NumPieces()
+       }
+       return
+}
+
+func (t *torrent) forReaderWantedRegionPieces(f func(begin, end int) (more bool)) (all bool) {
+       for r := range t.readers {
+               r.mu.Lock()
+               pos, readahead := r.pos, r.readahead
+               r.mu.Unlock()
+               if readahead < 1 {
+                       readahead = 1
+               }
+               begin, end := t.byteRegionPieces(pos, readahead)
+               if begin >= end {
+                       continue
+               }
+               if !f(begin, end) {
+                       return false
+               }
+       }
+       return true
+}
+
+func (t *torrent) piecePriority(piece int) (ret piecePriority) {
+       ret = PiecePriorityNone
+       if t.pieceComplete(piece) {
+               return
+       }
+       raiseRet := func(prio piecePriority) {
+               if prio > ret {
+                       ret = prio
+               }
+       }
+       t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
+               if piece == begin {
+                       raiseRet(PiecePriorityNow)
+               }
+               if begin <= piece && piece < end {
+                       raiseRet(PiecePriorityReadahead)
+               }
+               return true
+       })
+       return
+}