]> Sergey Matveev's repositories - btrtrc.git/blobdiff - torrent.go
Use relative availabilities to determine piece request order
[btrtrc.git] / torrent.go
index ce4d7f598140786b886ec4b0fceac4246f02001e..f82032be768a0fefda299fb902719ba18a5c6491 100644 (file)
@@ -85,7 +85,6 @@ type Torrent struct {
        files     *[]*File
 
        webSeeds map[string]*Peer
-
        // Active peer connections, running message stream loops. TODO: Make this
        // open (not-closed) connections only.
        conns               map[*PeerConn]struct{}
@@ -137,6 +136,7 @@ type Torrent struct {
        activePieceHashes         int
        initialPieceCheckDisabled bool
 
+       connsWithAllPieces map[*Peer]struct{}
        // Count of each request across active connections.
        pendingRequests map[RequestIndex]*Peer
        lastRequested   map[RequestIndex]time.Time
@@ -149,8 +149,12 @@ type Torrent struct {
        Complete chansync.Flag
 }
 
-func (t *Torrent) pieceAvailabilityFromPeers(i pieceIndex) (count int) {
+func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
+       // This could be done with roaring.BitSliceIndexing.
        t.iterPeers(func(peer *Peer) {
+               if _, ok := t.connsWithAllPieces[peer]; ok {
+                       return
+               }
                if peer.peerHasPiece(i) {
                        count++
                }
@@ -163,10 +167,10 @@ func (t *Torrent) decPieceAvailability(i pieceIndex) {
                return
        }
        p := t.piece(i)
-       if p.availability <= 0 {
-               panic(p.availability)
+       if p.relativeAvailability <= 0 {
+               panic(p.relativeAvailability)
        }
-       p.availability--
+       p.relativeAvailability--
        t.updatePieceRequestOrder(i)
 }
 
@@ -174,7 +178,7 @@ func (t *Torrent) incPieceAvailability(i pieceIndex) {
        // If we don't the info, this should be reconciled when we do.
        if t.haveInfo() {
                p := t.piece(i)
-               p.availability++
+               p.relativeAvailability++
                t.updatePieceRequestOrder(i)
        }
 }
@@ -443,12 +447,12 @@ func (t *Torrent) onSetInfo() {
        t.initPieceRequestOrder()
        for i := range t.pieces {
                p := &t.pieces[i]
-               // Need to add availability before updating piece completion, as that may result in conns
+               // Need to add relativeAvailability before updating piece completion, as that may result in conns
                // being dropped.
-               if p.availability != 0 {
-                       panic(p.availability)
+               if p.relativeAvailability != 0 {
+                       panic(p.relativeAvailability)
                }
-               p.availability = int64(t.pieceAvailabilityFromPeers(i))
+               p.relativeAvailability = t.selectivePieceAvailabilityFromPeers(i)
                t.addRequestOrderPiece(i)
                t.updatePieceCompletion(pieceIndex(i))
                if !t.initialPieceCheckDisabled && !p.storageCompletionOk {
@@ -571,7 +575,7 @@ func (t *Torrent) newMetadataExtensionMessage(c *PeerConn, msgType pp.ExtendedMe
 
 type pieceAvailabilityRun struct {
        Count        pieceIndex
-       Availability int64
+       Availability int
 }
 
 func (me pieceAvailabilityRun) String() string {
@@ -580,10 +584,10 @@ func (me pieceAvailabilityRun) String() string {
 
 func (t *Torrent) pieceAvailabilityRuns() (ret []pieceAvailabilityRun) {
        rle := missinggo.NewRunLengthEncoder(func(el interface{}, count uint64) {
-               ret = append(ret, pieceAvailabilityRun{Availability: el.(int64), Count: int(count)})
+               ret = append(ret, pieceAvailabilityRun{Availability: el.(int), Count: int(count)})
        })
        for i := range t.pieces {
-               rle.Append(t.pieces[i].availability, 1)
+               rle.Append(t.pieces[i].availability(), 1)
        }
        rle.Flush()
        return
@@ -836,6 +840,12 @@ func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
        if t.storage != nil {
                t.deletePieceRequestOrder()
        }
+       for i := range t.pieces {
+               p := t.piece(i)
+               if p.relativeAvailability != 0 {
+                       panic(fmt.Sprintf("piece %v has relative availability %v", i, p.relativeAvailability))
+               }
+       }
        t.pex.Reset()
        t.cl.event.Broadcast()
        t.pieceStateChanges.Close()
@@ -1407,14 +1417,20 @@ func (t *Torrent) deletePeerConn(c *PeerConn) (ret bool) {
                })
        }
        t.assertPendingRequests()
+       if t.numActivePeers() == 0 && len(t.connsWithAllPieces) != 0 {
+               panic(t.connsWithAllPieces)
+       }
        return
 }
 
 func (t *Torrent) decPeerPieceAvailability(p *Peer) {
+       if t.deleteConnWithAllPieces(p) {
+               return
+       }
        if !t.haveInfo() {
                return
        }
-       p.newPeerPieces().Iterate(func(i uint32) bool {
+       p.peerPieces().Iterate(func(i uint32) bool {
                p.t.decPieceAvailability(pieceIndex(i))
                return true
        })
@@ -2319,3 +2335,20 @@ func (t *Torrent) cancelRequest(r RequestIndex) *Peer {
 func (t *Torrent) requestingPeer(r RequestIndex) *Peer {
        return t.pendingRequests[r]
 }
+
+func (t *Torrent) addConnWithAllPieces(p *Peer) {
+       if t.connsWithAllPieces == nil {
+               t.connsWithAllPieces = make(map[*Peer]struct{}, t.maxEstablishedConns)
+       }
+       t.connsWithAllPieces[p] = struct{}{}
+}
+
+func (t *Torrent) deleteConnWithAllPieces(p *Peer) bool {
+       _, ok := t.connsWithAllPieces[p]
+       delete(t.connsWithAllPieces, p)
+       return ok
+}
+
+func (t *Torrent) numActivePeers() int {
+       return len(t.conns) + len(t.webSeeds)
+}