Sergey Matveev's repositories - btrtrc.git/commitdiff
Fix download rate, status output
author: Matt Joiner <anacrolix@gmail.com>
Sun, 9 May 2021 14:53:32 +0000 (00:53 +1000)
committer: Matt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:39 +0000 (13:01 +1000)
client.go
conn_stats.go
peerconn.go
request-strategy.go
torrent.go

index b0f537ca2ff66815429c9aba892f0dbae81e67e9..dc850a9ec204663cf170ce6b01b412030dc81cd3 100644 (file)
--- a/client.go
+++ b/client.go
@@ -968,6 +968,10 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
        return nil
 }
 
+// If peer requests are buffered on read, this instructs the amount of memory that might be used to
+// cache pending writes. Assuming 512KiB cached for sending, for 16KiB chunks.
+const localClientReqq = 1 << 5
+
 // See the order given in Transmission's tr_peerMsgsNew.
 func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
        if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() {
@@ -979,11 +983,8 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
                                        M: map[pp.ExtensionName]pp.ExtensionNumber{
                                                pp.ExtensionNameMetadata: metadataExtendedId,
                                        },
-                                       V: cl.config.ExtendedHandshakeClientVersion,
-                                       // If peer requests are buffered on read, this instructs the amount of memory
-                                       // that might be used to cache pending writes. Assuming 512KiB cached for
-                                       // sending, for 16KiB chunks.
-                                       Reqq:         1 << 5,
+                                       V:            cl.config.ExtendedHandshakeClientVersion,
+                                       Reqq:         localClientReqq,
                                        YourIp:       pp.CompactIp(conn.remoteIp()),
                                        Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
                                        Port:         cl.incomingPeerPort(),
index 3fbc00d867bbaaae1e4618fc1e94a9093506d7f0..d2d52003d552bfabfd39ce3e020314d58cf21b8c 100644 (file)
@@ -20,9 +20,10 @@ type ConnStats struct {
        BytesWritten     Count
        BytesWrittenData Count
 
-       BytesRead           Count
-       BytesReadData       Count
-       BytesReadUsefulData Count
+       BytesRead                   Count
+       BytesReadData               Count
+       BytesReadUsefulData         Count
+       BytesReadUsefulIntendedData Count
 
        ChunksWritten Count
 
index aa31eec80840e636620deb56ed83a599a17c2bcb..48b305a2b73d653e72808d7efdbac926baf36a0c 100644 (file)
@@ -167,7 +167,21 @@ func (cn *Peer) updateExpectingChunks() {
 }
 
 func (cn *Peer) expectingChunks() bool {
-       return len(cn.requests) != 0 && !cn.peerChoking
+       if len(cn.requests) == 0 {
+               return false
+       }
+       if !cn.interested {
+               return false
+       }
+       if !cn.peerChoking {
+               return true
+       }
+       for r := range cn.requests {
+               if cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) {
+                       return true
+               }
+       }
+       return false
 }
 
 // Returns true if the connection is over IPv6.
@@ -300,14 +314,20 @@ func (cn *Peer) statusFlags() (ret string) {
        return
 }
 
-// func (cn *connection) String() string {
-//     var buf bytes.Buffer
-//     cn.writeStatus(&buf, nil)
-//     return buf.String()
-// }
-
 func (cn *Peer) downloadRate() float64 {
-       return float64(cn._stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
+       num := cn._stats.BytesReadUsefulData.Int64()
+       if num == 0 {
+               return 0
+       }
+       return float64(num) / cn.totalExpectingTime().Seconds()
+}
+
+func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
+       ret = make(map[pieceIndex]int)
+       for r := range cn.requests {
+               ret[pieceIndex(r.Index)]++
+       }
+       return
 }
 
 func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
@@ -316,6 +336,12 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                fmt.Fprint(w, "CLOSED: ")
        }
        fmt.Fprintln(w, cn.connStatusString())
+       prio, err := cn.peerPriority()
+       prioStr := fmt.Sprintf("%08x", prio)
+       if err != nil {
+               prioStr += ": " + err.Error()
+       }
+       fmt.Fprintf(w, "    bep40-prio: %v\n", prioStr)
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
@@ -324,29 +350,25 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
-               "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
+               "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: %d/(%d/%d)-%d/%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                &cn._stats.ChunksReadUseful,
                &cn._stats.ChunksRead,
                &cn._stats.ChunksWritten,
-               cn.requestsLowWater,
                cn.numLocalRequests(),
                cn.nominalMaxRequests(),
+               cn.PeerMaxRequests,
                len(cn.peerRequests),
+               localClientReqq,
                cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
-       //fmt.Fprintf(w, "    next pieces: %v%s\n",
-       //      iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
-       //      func() string {
-       //              if cn == t.fastestPeer {
-       //                      return " (fastest)"
-       //              } else {
-       //                      return ""
-       //              }
-       //      }(),
-       //)
+       fmt.Fprintf(w, "    requested pieces:")
+       for piece, count := range cn.numRequestsByPiece() {
+               fmt.Fprintf(w, " %v (%v)", piece, count)
+       }
+       fmt.Fprintf(w, "\n")
 }
 
 func (cn *Peer) close() {
@@ -445,7 +467,7 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
 
 // The actual value to use as the maximum outbound requests.
 func (cn *Peer) nominalMaxRequests() (ret int) {
-       return cn.PeerMaxRequests
+       return int(clamp(1, int64(cn.PeerMaxRequests), 64))
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -1277,8 +1299,10 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                chunksReceived.Add("due to allowed fast", 1)
        }
 
-       // TODO: This needs to happen immediately, to prevent cancels occurring asynchronously when have
-       // actually already received the piece, while we have the Client unlocked to write the data out.
+       // The request needs to be deleted immediately to prevent cancels occurring asynchronously when
+       // have actually already received the piece, while we have the Client unlocked to write the data
+       // out.
+       deletedRequest := false
        {
                if _, ok := c.requests[req]; ok {
                        for _, f := range c.callbacks.ReceivedRequested {
@@ -1287,7 +1311,8 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                }
                // Request has been satisfied.
                if c.deleteRequest(req) {
-                       if c.expectingChunks() {
+                       deletedRequest = true
+                       if !c.peerChoking {
                                c._chunksReceivedWhileExpecting++
                        }
                } else {
@@ -1306,14 +1331,13 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 
        c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
+       if deletedRequest {
+               c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
+       }
        for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
                f(ReceivedUsefulDataEvent{c, msg})
        }
        c.lastUsefulChunkReceived = time.Now()
-       // if t.fastestPeer != c {
-       // log.Printf("setting fastest connection %p", c)
-       // }
-       t.fastestPeer = c
 
        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
@@ -1591,14 +1615,6 @@ func (l connectionTrust) Less(r connectionTrust) bool {
        return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
 }
 
-func (cn *Peer) chunksReceivedWhileExpecting() int64 {
-       return cn._chunksReceivedWhileExpecting
-}
-
-func (cn *Peer) fastest() bool {
-       return cn == cn.t.fastestPeer
-}
-
 func (cn *Peer) peerMaxRequests() int {
        return cn.PeerMaxRequests
 }
index d762d0f5cfc1d37dd7722f7f46996b23356a529a..e7349aef49cb50aaae75da7a2de5f63c198f8ad9 100644 (file)
@@ -3,6 +3,7 @@ package torrent
 import (
        "sort"
        "time"
+       "unsafe"
 
        "github.com/anacrolix/multiless"
        pp "github.com/anacrolix/torrent/peer_protocol"
@@ -56,11 +57,13 @@ func (me clientPieceRequestOrder) update() {
 func (me clientPieceRequestOrder) less(_i, _j int) bool {
        i := me.pieces[_i]
        j := me.pieces[_j]
-       ml := multiless.New()
-       ml.Int(int(j.prio), int(i.prio))
-       ml.Bool(j.partial, i.partial)
-       ml.Int(i.availability, j.availability)
-       return ml.Less()
+       return multiless.New().Int(
+               int(j.prio), int(i.prio),
+       ).Bool(
+               j.partial, i.partial,
+       ).Int(
+               i.availability, j.availability,
+       ).Less()
 }
 
 func (cl *Client) requester() {
@@ -81,45 +84,87 @@ func (cl *Client) requester() {
 func (cl *Client) doRequests() {
        requestOrder := clientPieceRequestOrder{}
        allPeers := make(map[*Torrent][]*Peer)
-       storageCapacity := make(map[*Torrent]*int64)
+       // Storage capacity left for this run, keyed by the storage capacity pointer on the storage
+       // TorrentImpl.
+       storageLeft := make(map[*func() *int64]*int64)
        for _, t := range cl.torrents {
                // TODO: We could do metainfo requests here.
                if t.haveInfo() {
-                       value := int64(t.usualPieceSize())
-                       storageCapacity[t] = &value
+                       if t.storage.Capacity != nil {
+                               if _, ok := storageLeft[t.storage.Capacity]; !ok {
+                                       storageLeft[t.storage.Capacity] = (*t.storage.Capacity)()
+                               }
+                       }
                        requestOrder.addPieces(t, t.numPieces())
                }
                var peers []*Peer
                t.iterPeers(func(p *Peer) {
-                       peers = append(peers, p)
+                       if !p.closed.IsSet() {
+                               peers = append(peers, p)
+                       }
+               })
+               // Sort in *desc* order, approximately the reverse of worseConn where appropriate.
+               sort.Slice(peers, func(i, j int) bool {
+                       return multiless.New().Float64(
+                               peers[j].downloadRate(), peers[i].downloadRate(),
+                       ).Uintptr(
+                               uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less()
                })
                allPeers[t] = peers
        }
        requestOrder.update()
        requestOrder.sort()
+       // For a given piece, the set of allPeers indices that absorbed requests for the piece.
+       contributed := make(map[int]struct{})
        for _, p := range requestOrder.pieces {
                if p.t.ignorePieceForRequests(p.index) {
                        continue
                }
                peers := allPeers[p.t]
                torrentPiece := p.t.piece(p.index)
-               if left := storageCapacity[p.t]; left != nil {
+               if left := storageLeft[p.t.storage.Capacity]; left != nil {
                        if *left < int64(torrentPiece.length()) {
                                continue
                        }
                        *left -= int64(torrentPiece.length())
                }
                p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool {
-                       for _, peer := range peers {
-                               req := Request{pp.Integer(p.index), chunk}
-                               _, err := peer.request(req)
-                               if err == nil {
-                                       //log.Printf("requested %v", req)
-                                       break
+                       req := Request{pp.Integer(p.index), chunk}
+                       const skipAlreadyRequested = false
+                       if skipAlreadyRequested {
+                               alreadyRequested := false
+                               p.t.iterPeers(func(p *Peer) {
+                                       if _, ok := p.requests[req]; ok {
+                                               alreadyRequested = true
+                                       }
+                               })
+                               if alreadyRequested {
+                                       return true
+                               }
+                       }
+                       alreadyRequested := false
+                       for peerIndex, peer := range peers {
+                               if alreadyRequested {
+                                       // Cancel all requests from "slower" peers after the one that requested it.
+                                       peer.cancel(req)
+                               } else {
+                                       err := peer.request(req)
+                                       if err == nil {
+                                               contributed[peerIndex] = struct{}{}
+                                               alreadyRequested = true
+                                               //log.Printf("requested %v", req)
+                                       }
                                }
                        }
                        return true
                })
+               // Move requestees for this piece to the back.
+               lastIndex := len(peers) - 1
+               for peerIndex := range contributed {
+                       peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex]
+                       delete(contributed, peerIndex)
+                       lastIndex--
+               }
        }
        for _, t := range cl.torrents {
                t.iterPeers(func(p *Peer) {
index b6a6f2b7e76af18dae8caa955cb9317b472830ca..6eab0d69e616686a5c60787f560adec6b0818d0a 100644 (file)
@@ -17,6 +17,7 @@ import (
        "time"
        "unsafe"
 
+       "github.com/anacrolix/multiless"
        "github.com/anacrolix/torrent/common"
        "github.com/anacrolix/torrent/segments"
        "github.com/anacrolix/torrent/webseed"
@@ -91,8 +92,7 @@ type Torrent struct {
        maxEstablishedConns int
        // Set of addrs to which we're attempting to connect. Connections are
        // half-open until all handshakes are completed.
-       halfOpen    map[string]PeerInfo
-       fastestPeer *Peer
+       halfOpen map[string]PeerInfo
 
        // Reserve of peers to connect to. A peer can be both here and in the
        // active connections if were told about the peer after connecting with
@@ -156,20 +156,6 @@ func (t *Torrent) pieceAvailability(i pieceIndex) (count int) {
        return
 }
 
-func (t *Torrent) sortPieceRequestOrder(sl []pieceIndex) {
-       if len(sl) != t.numPieces() {
-               panic(len(sl))
-       }
-       availability := make([]int, len(sl))
-       t.iterPeers(func(peer *Peer) {
-               for i := range availability {
-                       if peer.peerHasPiece(i) {
-                               availability[i]++
-                       }
-               }
-       })
-}
-
 func (t *Torrent) numConns() int {
        return len(t.conns)
 }
@@ -665,8 +651,15 @@ func (t *Torrent) writeStatus(w io.Writer) {
        spew.Fdump(w, t.statsLocked())
 
        peers := t.peersAsSlice()
-       sort.Slice(peers, func(i, j int) bool {
-               return worseConn(peers[i], peers[j])
+       sort.Slice(peers, func(_i, _j int) bool {
+               i := peers[_i]
+               j := peers[_j]
+               if less, ok := multiless.New().EagerSameLess(
+                       i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(),
+               ).LessOk(); ok {
+                       return less
+               }
+               return worseConn(i, j)
        })
        for i, c := range peers {
                fmt.Fprintf(w, "%2d. ", i+1)