From: Matt Joiner Date: Sun, 9 May 2021 14:53:32 +0000 (+1000) Subject: Fix download rate, status output X-Git-Tag: v1.29.0~31^2~53 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=56e2a8a3a6e5fbfd27a09e685c8524b3c43dc7cd;p=btrtrc.git Fix download rate, status output --- diff --git a/client.go b/client.go index b0f537ca..dc850a9e 100644 --- a/client.go +++ b/client.go @@ -968,6 +968,10 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error { return nil } +// If peer requests are buffered on read, this instructs the amount of memory that might be used to +// cache pending writes. Assuming 512KiB cached for sending, for 16KiB chunks. +const localClientReqq = 1 << 5 + // See the order given in Transmission's tr_peerMsgsNew. func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { if conn.PeerExtensionBytes.SupportsExtended() && cl.config.Extensions.SupportsExtended() { @@ -979,11 +983,8 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { M: map[pp.ExtensionName]pp.ExtensionNumber{ pp.ExtensionNameMetadata: metadataExtendedId, }, - V: cl.config.ExtendedHandshakeClientVersion, - // If peer requests are buffered on read, this instructs the amount of memory - // that might be used to cache pending writes. Assuming 512KiB cached for - // sending, for 16KiB chunks. - Reqq: 1 << 5, + V: cl.config.ExtendedHandshakeClientVersion, + Reqq: localClientReqq, YourIp: pp.CompactIp(conn.remoteIp()), Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred, Port: cl.incomingPeerPort(), diff --git a/conn_stats.go b/conn_stats.go index 3fbc00d8..d2d52003 100644 --- a/conn_stats.go +++ b/conn_stats.go @@ -20,9 +20,10 @@ type ConnStats struct { BytesWritten Count BytesWrittenData Count - BytesRead Count - BytesReadData Count - BytesReadUsefulData Count + BytesRead Count + BytesReadData Count + BytesReadUsefulData Count + BytesReadUsefulIntendedData Count ChunksWritten Count diff --git a/peerconn.go b/peerconn.go index aa31eec8..48b305a2 100644 --- a/peerconn.go +++ b/peerconn.go @@ -167,7 +167,21 @@ func (cn *Peer) updateExpectingChunks() { } func (cn *Peer) expectingChunks() bool { - return len(cn.requests) != 0 && !cn.peerChoking + if len(cn.requests) == 0 { + return false + } + if !cn.interested { + return false + } + if !cn.peerChoking { + return true + } + for r := range cn.requests { + if cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) { + return true + } + } + return false } // Returns true if the connection is over IPv6. @@ -300,14 +314,20 @@ func (cn *Peer) statusFlags() (ret string) { return } -// func (cn *connection) String() string { -// var buf bytes.Buffer -// cn.writeStatus(&buf, nil) -// return buf.String() -// } - func (cn *Peer) downloadRate() float64 { - return float64(cn._stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds() + num := cn._stats.BytesReadUsefulData.Int64() + if num == 0 { + return 0 + } + return float64(num) / cn.totalExpectingTime().Seconds() +} + +func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) { + ret = make(map[pieceIndex]int) + for r := range cn.requests { + ret[pieceIndex(r.Index)]++ + } + return } func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { @@ -316,6 +336,12 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { fmt.Fprint(w, "CLOSED: ") } fmt.Fprintln(w, cn.connStatusString()) + prio, err := cn.peerPriority() + prioStr := fmt.Sprintf("%08x", prio) + if err != nil { + prioStr += ": " + err.Error() + } + fmt.Fprintf(w, " bep40-prio: %v\n", prioStr) fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n", eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), @@ -324,29 +350,25 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { cn.totalExpectingTime(), ) fmt.Fprintf(w, - " %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n", + " %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: %d/(%d/%d)-%d/%d, flags: %s, dr: %.1f KiB/s\n", cn.completedString(), len(cn.peerTouchedPieces), &cn._stats.ChunksReadUseful, &cn._stats.ChunksRead, &cn._stats.ChunksWritten, - cn.requestsLowWater, cn.numLocalRequests(), cn.nominalMaxRequests(), + cn.PeerMaxRequests, len(cn.peerRequests), + localClientReqq, cn.statusFlags(), cn.downloadRate()/(1<<10), ) - //fmt.Fprintf(w, " next pieces: %v%s\n", - // iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)), - // func() string { - // if cn == t.fastestPeer { - // return " (fastest)" - // } else { - // return "" - // } - // }(), - //) + fmt.Fprintf(w, " requested pieces:") + for piece, count := range cn.numRequestsByPiece() { + fmt.Fprintf(w, " %v (%v)", piece, count) + } + fmt.Fprintf(w, "\n") } func (cn *Peer) close() { @@ -445,7 +467,7 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool { // The actual value to use as the maximum outbound requests. func (cn *Peer) nominalMaxRequests() (ret int) { - return cn.PeerMaxRequests + return int(clamp(1, int64(cn.PeerMaxRequests), 64)) } func (cn *Peer) totalExpectingTime() (ret time.Duration) { @@ -1277,8 +1299,10 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { chunksReceived.Add("due to allowed fast", 1) } - // TODO: This needs to happen immediately, to prevent cancels occurring asynchronously when have - // actually already received the piece, while we have the Client unlocked to write the data out. + // The request needs to be deleted immediately to prevent cancels occurring asynchronously when + // have actually already received the piece, while we have the Client unlocked to write the data + // out. + deletedRequest := false { if _, ok := c.requests[req]; ok { for _, f := range c.callbacks.ReceivedRequested { @@ -1287,7 +1311,8 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { } // Request has been satisfied. if c.deleteRequest(req) { - if c.expectingChunks() { + deletedRequest = true + if !c.peerChoking { c._chunksReceivedWhileExpecting++ } } else { @@ -1306,14 +1331,13 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) + if deletedRequest { + c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) + } for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData { f(ReceivedUsefulDataEvent{c, msg}) } c.lastUsefulChunkReceived = time.Now() - // if t.fastestPeer != c { - // log.Printf("setting fastest connection %p", c) - // } - t.fastestPeer = c // Need to record that it hasn't been written yet, before we attempt to do // anything with it. @@ -1591,14 +1615,6 @@ func (l connectionTrust) Less(r connectionTrust) bool { return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less() } -func (cn *Peer) chunksReceivedWhileExpecting() int64 { - return cn._chunksReceivedWhileExpecting -} - -func (cn *Peer) fastest() bool { - return cn == cn.t.fastestPeer -} - func (cn *Peer) peerMaxRequests() int { return cn.PeerMaxRequests } diff --git a/request-strategy.go b/request-strategy.go index d762d0f5..e7349aef 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -3,6 +3,7 @@ package torrent import ( "sort" "time" + "unsafe" "github.com/anacrolix/multiless" pp "github.com/anacrolix/torrent/peer_protocol" @@ -56,11 +57,13 @@ func (me clientPieceRequestOrder) update() { func (me clientPieceRequestOrder) less(_i, _j int) bool { i := me.pieces[_i] j := me.pieces[_j] - ml := multiless.New() - ml.Int(int(j.prio), int(i.prio)) - ml.Bool(j.partial, i.partial) - ml.Int(i.availability, j.availability) - return ml.Less() + return multiless.New().Int( + int(j.prio), int(i.prio), + ).Bool( + j.partial, i.partial, + ).Int( + i.availability, j.availability, + ).Less() } func (cl *Client) requester() { @@ -81,45 +84,87 @@ func (cl *Client) requester() { func (cl *Client) doRequests() { requestOrder := clientPieceRequestOrder{} allPeers := make(map[*Torrent][]*Peer) - storageCapacity := make(map[*Torrent]*int64) + // Storage capacity left for this run, keyed by the storage capacity pointer on the storage + // TorrentImpl. + storageLeft := make(map[*func() *int64]*int64) for _, t := range cl.torrents { // TODO: We could do metainfo requests here. if t.haveInfo() { - value := int64(t.usualPieceSize()) - storageCapacity[t] = &value + if t.storage.Capacity != nil { + if _, ok := storageLeft[t.storage.Capacity]; !ok { + storageLeft[t.storage.Capacity] = (*t.storage.Capacity)() + } + } requestOrder.addPieces(t, t.numPieces()) } var peers []*Peer t.iterPeers(func(p *Peer) { - peers = append(peers, p) + if !p.closed.IsSet() { + peers = append(peers, p) + } + }) + // Sort in *desc* order, approximately the reverse of worseConn where appropriate. + sort.Slice(peers, func(i, j int) bool { + return multiless.New().Float64( + peers[j].downloadRate(), peers[i].downloadRate(), + ).Uintptr( + uintptr(unsafe.Pointer(peers[j])), uintptr(unsafe.Pointer(peers[i]))).Less() }) allPeers[t] = peers } requestOrder.update() requestOrder.sort() + // For a given piece, the set of allPeers indices that absorbed requests for the piece. + contributed := make(map[int]struct{}) for _, p := range requestOrder.pieces { if p.t.ignorePieceForRequests(p.index) { continue } peers := allPeers[p.t] torrentPiece := p.t.piece(p.index) - if left := storageCapacity[p.t]; left != nil { + if left := storageLeft[p.t.storage.Capacity]; left != nil { if *left < int64(torrentPiece.length()) { continue } *left -= int64(torrentPiece.length()) } p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool { - for _, peer := range peers { - req := Request{pp.Integer(p.index), chunk} - _, err := peer.request(req) - if err == nil { - //log.Printf("requested %v", req) - break + req := Request{pp.Integer(p.index), chunk} + const skipAlreadyRequested = false + if skipAlreadyRequested { + alreadyRequested := false + p.t.iterPeers(func(p *Peer) { + if _, ok := p.requests[req]; ok { + alreadyRequested = true + } + }) + if alreadyRequested { + return true + } + } + alreadyRequested := false + for peerIndex, peer := range peers { + if alreadyRequested { + // Cancel all requests from "slower" peers after the one that requested it. + peer.cancel(req) + } else { + err := peer.request(req) + if err == nil { + contributed[peerIndex] = struct{}{} + alreadyRequested = true + //log.Printf("requested %v", req) + } } } return true }) + // Move requestees for this piece to the back. + lastIndex := len(peers) - 1 + for peerIndex := range contributed { + peers[peerIndex], peers[lastIndex] = peers[lastIndex], peers[peerIndex] + delete(contributed, peerIndex) + lastIndex-- + } } for _, t := range cl.torrents { t.iterPeers(func(p *Peer) { diff --git a/torrent.go b/torrent.go index b6a6f2b7..6eab0d69 100644 --- a/torrent.go +++ b/torrent.go @@ -17,6 +17,7 @@ import ( "time" "unsafe" + "github.com/anacrolix/multiless" "github.com/anacrolix/torrent/common" "github.com/anacrolix/torrent/segments" "github.com/anacrolix/torrent/webseed" @@ -91,8 +92,7 @@ type Torrent struct { maxEstablishedConns int // Set of addrs to which we're attempting to connect. Connections are // half-open until all handshakes are completed. - halfOpen map[string]PeerInfo - fastestPeer *Peer + halfOpen map[string]PeerInfo // Reserve of peers to connect to. A peer can be both here and in the // active connections if were told about the peer after connecting with @@ -156,20 +156,6 @@ func (t *Torrent) pieceAvailability(i pieceIndex) (count int) { return } -func (t *Torrent) sortPieceRequestOrder(sl []pieceIndex) { - if len(sl) != t.numPieces() { - panic(len(sl)) - } - availability := make([]int, len(sl)) - t.iterPeers(func(peer *Peer) { - for i := range availability { - if peer.peerHasPiece(i) { - availability[i]++ - } - } - }) -} - func (t *Torrent) numConns() int { return len(t.conns) } @@ -665,8 +651,15 @@ func (t *Torrent) writeStatus(w io.Writer) { spew.Fdump(w, t.statsLocked()) peers := t.peersAsSlice() - sort.Slice(peers, func(i, j int) bool { - return worseConn(peers[i], peers[j]) + sort.Slice(peers, func(_i, _j int) bool { + i := peers[_i] + j := peers[_j] + if less, ok := multiless.New().EagerSameLess( + i.downloadRate() == j.downloadRate(), i.downloadRate() < j.downloadRate(), + ).LessOk(); ok { + return less + } + return worseConn(i, j) }) for i, c := range peers { fmt.Fprintf(w, "%2d. ", i+1)