]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn.go
Fix download rate, status output
[btrtrc.git] / peerconn.go
index aa31eec80840e636620deb56ed83a599a17c2bcb..48b305a2b73d653e72808d7efdbac926baf36a0c 100644 (file)
@@ -167,7 +167,21 @@ func (cn *Peer) updateExpectingChunks() {
 }
 
 func (cn *Peer) expectingChunks() bool {
-       return len(cn.requests) != 0 && !cn.peerChoking
+       if len(cn.requests) == 0 {
+               return false
+       }
+       if !cn.interested {
+               return false
+       }
+       if !cn.peerChoking {
+               return true
+       }
+       for r := range cn.requests {
+               if cn.peerAllowedFast.Contains(bitmap.BitIndex(r.Index)) {
+                       return true
+               }
+       }
+       return false
 }
 
 // Returns true if the connection is over IPv6.
@@ -300,14 +314,20 @@ func (cn *Peer) statusFlags() (ret string) {
        return
 }
 
-// func (cn *connection) String() string {
-//     var buf bytes.Buffer
-//     cn.writeStatus(&buf, nil)
-//     return buf.String()
-// }
-
 func (cn *Peer) downloadRate() float64 {
-       return float64(cn._stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
+       num := cn._stats.BytesReadUsefulData.Int64()
+       if num == 0 {
+               return 0
+       }
+       return float64(num) / cn.totalExpectingTime().Seconds()
+}
+
+func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
+       ret = make(map[pieceIndex]int)
+       for r := range cn.requests {
+               ret[pieceIndex(r.Index)]++
+       }
+       return
 }
 
 func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
@@ -316,6 +336,12 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                fmt.Fprint(w, "CLOSED: ")
        }
        fmt.Fprintln(w, cn.connStatusString())
+       prio, err := cn.peerPriority()
+       prioStr := fmt.Sprintf("%08x", prio)
+       if err != nil {
+               prioStr += ": " + err.Error()
+       }
+       fmt.Fprintf(w, "    bep40-prio: %v\n", prioStr)
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
@@ -324,29 +350,25 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) {
                cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
-               "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
+               "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: %d/(%d/%d)-%d/%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                &cn._stats.ChunksReadUseful,
                &cn._stats.ChunksRead,
                &cn._stats.ChunksWritten,
-               cn.requestsLowWater,
                cn.numLocalRequests(),
                cn.nominalMaxRequests(),
+               cn.PeerMaxRequests,
                len(cn.peerRequests),
+               localClientReqq,
                cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
-       //fmt.Fprintf(w, "    next pieces: %v%s\n",
-       //      iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
-       //      func() string {
-       //              if cn == t.fastestPeer {
-       //                      return " (fastest)"
-       //              } else {
-       //                      return ""
-       //              }
-       //      }(),
-       //)
+       fmt.Fprintf(w, "    requested pieces:")
+       for piece, count := range cn.numRequestsByPiece() {
+               fmt.Fprintf(w, " %v (%v)", piece, count)
+       }
+       fmt.Fprintf(w, "\n")
 }
 
 func (cn *Peer) close() {
@@ -445,7 +467,7 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool {
 
 // The actual value to use as the maximum outbound requests.
 func (cn *Peer) nominalMaxRequests() (ret int) {
-       return cn.PeerMaxRequests
+       return int(clamp(1, int64(cn.PeerMaxRequests), 64))
 }
 
 func (cn *Peer) totalExpectingTime() (ret time.Duration) {
@@ -1277,8 +1299,10 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                chunksReceived.Add("due to allowed fast", 1)
        }
 
-       // TODO: This needs to happen immediately, to prevent cancels occurring asynchronously when have
-       // actually already received the piece, while we have the Client unlocked to write the data out.
+       // The request needs to be deleted immediately to prevent cancels occurring asynchronously when
+       // have actually already received the piece, while we have the Client unlocked to write the data
+       // out.
+       deletedRequest := false
        {
                if _, ok := c.requests[req]; ok {
                        for _, f := range c.callbacks.ReceivedRequested {
@@ -1287,7 +1311,8 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
                }
                // Request has been satisfied.
                if c.deleteRequest(req) {
-                       if c.expectingChunks() {
+                       deletedRequest = true
+                       if !c.peerChoking {
                                c._chunksReceivedWhileExpecting++
                        }
                } else {
@@ -1306,14 +1331,13 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
 
        c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
        c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
+       if deletedRequest {
+               c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData }))
+       }
        for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData {
                f(ReceivedUsefulDataEvent{c, msg})
        }
        c.lastUsefulChunkReceived = time.Now()
-       // if t.fastestPeer != c {
-       // log.Printf("setting fastest connection %p", c)
-       // }
-       t.fastestPeer = c
 
        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
@@ -1591,14 +1615,6 @@ func (l connectionTrust) Less(r connectionTrust) bool {
        return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
 }
 
-func (cn *Peer) chunksReceivedWhileExpecting() int64 {
-       return cn._chunksReceivedWhileExpecting
-}
-
-func (cn *Peer) fastest() bool {
-       return cn == cn.t.fastestPeer
-}
-
 func (cn *Peer) peerMaxRequests() int {
        return cn.PeerMaxRequests
 }