pp "github.com/anacrolix/torrent/peer_protocol"
)
+// Various connection-level metrics. At the Torrent level these are
+// aggregates. Chunks are messages with data payloads. Data is actual torrent
+// content without any overhead. Useful is something we needed locally.
+// Unwanted is something we didn't ask for (but may still be useful). Written
+// is things sent to the peer, and Read is stuff received from them.
type ConnStats struct {
- // Torrent "piece" messages, or data chunks.
- ChunksWritten int64 // Num piece messages sent.
- ChunksRead int64
 // Total bytes on the wire. Includes handshakes and encryption.
 BytesWritten int64
 BytesRead int64
+
+ // The rest of the stats only occur on connections after handshakes.
+
+ // Number of chunk ("piece") messages sent to the peer.
+ ChunksWritten int64
+
+ // Number of chunk messages received from the peer.
+ ChunksRead int64
+ // Chunks received for pieces we wanted at the time they arrived.
+ ChunksReadUseful int64
+ // Chunks received for pieces we didn't want when they arrived.
+ ChunksReadUnwanted int64
+
+ // Torrent content bytes sent, excluding protocol overhead.
+ DataBytesWritten int64
+ // Torrent content bytes received, excluding protocol overhead.
+ DataBytesRead int64
+ // Data-byte counterpart of ChunksReadUseful. NOTE(review): not updated
+ // anywhere in this hunk — confirm it is maintained elsewhere.
+ UsefulDataBytesRead int64
+
+ // Number of pieces data was written to, that subsequently passed verification.
+ GoodPiecesDirtied int64
+ // Number of pieces data was written to, that subsequently failed
+ // verification. Note that a connection may not have been the sole dirtier
+ // of a piece.
+ BadPiecesDirtied int64
}
func (cs *ConnStats) wroteMsg(msg *pp.Message) {
uTP bool
closed missinggo.Event
- stats ConnStats
- UnwantedChunksReceived int
- UsefulChunksReceived int
- chunksSent int
- goodPiecesDirtied int
- badPiecesDirtied int
+ stats ConnStats
lastMessageReceived time.Time
completedHandshake time.Time
" %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
cn.completedString(),
len(cn.peerTouchedPieces),
- cn.UsefulChunksReceived,
- cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
- cn.chunksSent,
+ cn.stats.ChunksReadUseful,
+ cn.stats.ChunksReadUnwanted+cn.stats.ChunksReadUseful,
+ cn.stats.ChunksWritten,
cn.numLocalRequests(),
len(cn.PeerRequests),
cn.statusFlags(),
// Do we actually want this chunk?
if !t.wantPiece(req) {
unwantedChunksReceived.Add(1)
- c.UnwantedChunksReceived++
+ c.stats.ChunksReadUnwanted++
return
}
index := int(req.Index)
piece := &t.pieces[index]
- c.UsefulChunksReceived++
+ c.stats.ChunksReadUseful++
c.lastUsefulChunkReceived = time.Now()
// if t.fastestConn != c {
// log.Printf("setting fastest connection %p", c)
t.publishPieceChange(int(req.Index))
}
-// Also handles choking and unchoking of the remote peer.
-func (c *connection) upload(msg func(pp.Message) bool) bool {
- t := c.t
- cl := t.cl
- if cl.config.NoUpload {
- return true
+// uploadAllowed reports whether we should send chunks to this peer right now:
+// never when uploads are disabled by config, always when seeding, otherwise
+// only while the peer wants pieces we have and we haven't already written
+// substantially more data than we've read from them.
+func (c *connection) uploadAllowed() bool {
+ if c.t.cl.config.NoUpload {
+ return false
 }
- if !c.PeerInterested {
+ if c.t.seeding() {
 return true
 }
- seeding := t.seeding()
- if !seeding && !c.peerHasWantedPieces() {
- // There's no reason to upload to this peer.
- return true
+ if !c.peerHasWantedPieces() {
+ return false
+ }
+ // Don't upload more than 100 KiB more than we download.
+ if c.stats.DataBytesWritten >= c.stats.DataBytesRead+100<<10 {
+ return false
 }
+ return true
+}
+
+// setRetryUploadTimer arranges for the connection's writer to be woken (via
+// writerCond.Broadcast) after delay, so a rate-limited upload is retried.
+// Reuses the existing timer when one has already been created.
+func (c *connection) setRetryUploadTimer(delay time.Duration) {
+ if c.uploadTimer == nil {
+ c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
+ } else {
+ c.uploadTimer.Reset(delay)
+ }
+}
+
+// Also handles choking and unchoking of the remote peer.
+func (c *connection) upload(msg func(pp.Message) bool) bool {
// Breaking or completing this loop means we don't want to upload to the
// peer anymore, and we choke them.
another:
- for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
+ for c.uploadAllowed() {
// We want to upload to the peer.
if !c.Unchoke(msg) {
return false
}
for r := range c.PeerRequests {
- res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
+ res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
if !res.OK() {
panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
}
delay := res.Delay()
if delay > 0 {
res.Cancel()
- if c.uploadTimer == nil {
- c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
- } else {
- c.uploadTimer.Reset(delay)
- }
+ c.setRetryUploadTimer(delay)
// Hard to say what to return here.
return true
}
more, err := c.sendChunk(r, msg)
if err != nil {
i := int(r.Index)
- if t.pieceComplete(i) {
- t.updatePieceCompletion(i)
- if !t.pieceComplete(i) {
+ if c.t.pieceComplete(i) {
+ c.t.updatePieceCompletion(i)
+ if !c.t.pieceComplete(i) {
// We had the piece, but not anymore.
break another
}
cn.t.dropConnection(cn)
}
-func (cn *connection) netGoodPiecesDirtied() int {
- return cn.goodPiecesDirtied - cn.badPiecesDirtied
+// netGoodPiecesDirtied returns verified-good minus verified-bad pieces this
+// connection contributed data to; used elsewhere when ranking connections by
+// trust after a piece-hash failure.
+func (cn *connection) netGoodPiecesDirtied() int64 {
+ return cn.stats.GoodPiecesDirtied - cn.stats.BadPiecesDirtied
}
func (c *connection) peerHasWantedPieces() bool {
Begin: r.Begin,
Piece: b,
})
- c.chunksSent++
uploadChunksPosted.Add(1)
c.lastChunkSent = time.Now()
return
heap.Init(&wcs)
for wcs.Len() != 0 {
c := heap.Pop(&wcs).(*connection)
- if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived {
+ if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful {
return c
}
if wcs.Len() >= (t.maxEstablishedConns+1)/2 {
p.everHashed = true
if correct {
for _, c := range touchers {
- c.goodPiecesDirtied++
+ c.stats.GoodPiecesDirtied++
}
err := p.Storage().MarkComplete()
if err != nil {
if len(touchers) != 0 {
for _, c := range touchers {
// Y u do dis peer?!
- c.badPiecesDirtied++
+ c.stats.BadPiecesDirtied++
}
slices.Sort(touchers, connLessTrusted)
if t.cl.config.Debug {
- log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) {
+ log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) {
for _, c := range touchers {
ret = append(ret, c.netGoodPiecesDirtied())
}