From: Matt Joiner Date: Fri, 2 Feb 2018 08:04:56 +0000 (+1100) Subject: Merge a bunch of stuff into ConnStats and refactor connection.upload X-Git-Tag: v1.0.0~229 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=b0c1f9950956ee34cf92c1aaa767d1fea8975ff8;p=btrtrc.git Merge a bunch of stuff into ConnStats and refactor connection.upload --- diff --git a/client.go b/client.go index 54a8b6ef..efe5f736 100644 --- a/client.go +++ b/client.go @@ -950,7 +950,7 @@ func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connect return fmt.Errorf("data has bad offset in payload: %d", begin) } t.saveMetadataPiece(piece, payload[begin:]) - c.UsefulChunksReceived++ + c.stats.ChunksReadUseful++ c.lastUsefulChunkReceived = time.Now() return t.maybeCompleteMetadata() case pp.RequestMetadataExtensionMsgType: diff --git a/conn_stats.go b/conn_stats.go index 541e2c5f..89650c9a 100644 --- a/conn_stats.go +++ b/conn_stats.go @@ -7,16 +7,34 @@ import ( pp "github.com/anacrolix/torrent/peer_protocol" ) +// Various connection-level metrics. At the Torrent level these are +// aggregates. Chunks are messages with data payloads. Data is actual torrent +// content without any overhead. Useful is something we needed locally. +// Unwanted is something we didn't ask for (but may still be useful). Written +// is things sent to the peer, and Read is stuff received from them. type ConnStats struct { - // Torrent "piece" messages, or data chunks. - ChunksWritten int64 // Num piece messages sent. - ChunksRead int64 // Total bytes on the wire. Includes handshakes and encryption. BytesWritten int64 BytesRead int64 - // Data bytes, actual torrent data. - DataBytesWritten int64 - DataBytesRead int64 + + // The rest of the stats only occur on connections after handshakes. + + ChunksWritten int64 + + ChunksRead int64 + ChunksReadUseful int64 + ChunksReadUnwanted int64 + + DataBytesWritten int64 + DataBytesRead int64 + UsefulDataBytesRead int64 + + // Number of pieces data was written to, that subsequently passed verification. + GoodPiecesDirtied int64 + // Number of pieces data was written to, that subsequently failed + // verification. Note that a connection may not have been the sole dirtier + // of a piece. + BadPiecesDirtied int64 } func (cs *ConnStats) wroteMsg(msg *pp.Message) { diff --git a/connection.go b/connection.go index fbf86e56..b04c9c14 100644 --- a/connection.go +++ b/connection.go @@ -50,12 +50,7 @@ type connection struct { uTP bool closed missinggo.Event - stats ConnStats - UnwantedChunksReceived int - UsefulChunksReceived int - chunksSent int - goodPiecesDirtied int - badPiecesDirtied int + stats ConnStats lastMessageReceived time.Time completedHandshake time.Time @@ -213,9 +208,9 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { " %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n", cn.completedString(), len(cn.peerTouchedPieces), - cn.UsefulChunksReceived, - cn.UnwantedChunksReceived+cn.UsefulChunksReceived, - cn.chunksSent, + cn.stats.ChunksReadUseful, + cn.stats.ChunksReadUnwanted+cn.stats.ChunksReadUseful, + cn.stats.ChunksWritten, cn.numLocalRequests(), len(cn.PeerRequests), cn.statusFlags(), @@ -1079,14 +1074,14 @@ func (c *connection) receiveChunk(msg *pp.Message) { // Do we actually want this chunk? if !t.wantPiece(req) { unwantedChunksReceived.Add(1) - c.UnwantedChunksReceived++ + c.stats.ChunksReadUnwanted++ return } index := int(req.Index) piece := &t.pieces[index] - c.UsefulChunksReceived++ + c.stats.ChunksReadUseful++ c.lastUsefulChunkReceived = time.Now() // if t.fastestConn != c { // log.Printf("setting fastest connection %p", c) @@ -1142,51 +1137,59 @@ func (c *connection) receiveChunk(msg *pp.Message) { t.publishPieceChange(int(req.Index)) } -// Also handles choking and unchoking of the remote peer. -func (c *connection) upload(msg func(pp.Message) bool) bool { - t := c.t - cl := t.cl - if cl.config.NoUpload { - return true +func (c *connection) uploadAllowed() bool { + if c.t.cl.config.NoUpload { + return false } - if !c.PeerInterested { + if c.t.seeding() { return true } - seeding := t.seeding() - if !seeding && !c.peerHasWantedPieces() { - // There's no reason to upload to this peer. - return true + if !c.peerHasWantedPieces() { + return false + } + // Don't upload more than 100 KiB more than we download. + if c.stats.DataBytesWritten >= c.stats.DataBytesRead+100<<10 { + return false } + return true +} + +func (c *connection) setRetryUploadTimer(delay time.Duration) { + if c.uploadTimer == nil { + c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast) + } else { + c.uploadTimer.Reset(delay) + } +} + +// Also handles choking and unchoking of the remote peer. +func (c *connection) upload(msg func(pp.Message) bool) bool { // Breaking or completing this loop means we don't want to upload to the // peer anymore, and we choke them. another: - for seeding || c.chunksSent < c.UsefulChunksReceived+6 { + for c.uploadAllowed() { // We want to upload to the peer. if !c.Unchoke(msg) { return false } for r := range c.PeerRequests { - res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length)) + res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length)) if !res.OK() { panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length)) } delay := res.Delay() if delay > 0 { res.Cancel() - if c.uploadTimer == nil { - c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast) - } else { - c.uploadTimer.Reset(delay) - } + c.setRetryUploadTimer(delay) // Hard to say what to return here. return true } more, err := c.sendChunk(r, msg) if err != nil { i := int(r.Index) - if t.pieceComplete(i) { - t.updatePieceCompletion(i) - if !t.pieceComplete(i) { + if c.t.pieceComplete(i) { + c.t.updatePieceCompletion(i) + if !c.t.pieceComplete(i) { // We had the piece, but not anymore. break another } @@ -1214,8 +1217,8 @@ func (cn *connection) Drop() { cn.t.dropConnection(cn) } -func (cn *connection) netGoodPiecesDirtied() int { - return cn.goodPiecesDirtied - cn.badPiecesDirtied +func (cn *connection) netGoodPiecesDirtied() int64 { + return cn.stats.GoodPiecesDirtied - cn.stats.BadPiecesDirtied } func (c *connection) peerHasWantedPieces() bool { @@ -1275,7 +1278,6 @@ func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, Begin: r.Begin, Piece: b, }) - c.chunksSent++ uploadChunksPosted.Add(1) c.lastChunkSent = time.Now() return diff --git a/connection_test.go b/connection_test.go index d3c299b0..5eb19d43 100644 --- a/connection_test.go +++ b/connection_test.go @@ -130,7 +130,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) { } w.Close() require.NoError(b, <-mrlErr) - require.EqualValues(b, b.N, cn.UsefulChunksReceived) + require.EqualValues(b, b.N, cn.stats.ChunksReadUseful) } func TestConnectionReceiveBadChunkIndex(t *testing.T) { diff --git a/torrent.go b/torrent.go index 5bf85913..d5a45f4c 100644 --- a/torrent.go +++ b/torrent.go @@ -839,7 +839,7 @@ func (t *Torrent) worstBadConn() *connection { heap.Init(&wcs) for wcs.Len() != 0 { c := heap.Pop(&wcs).(*connection) - if c.UnwantedChunksReceived >= 6 && c.UnwantedChunksReceived > c.UsefulChunksReceived { + if c.stats.ChunksReadUnwanted >= 6 && c.stats.ChunksReadUnwanted > c.stats.ChunksReadUseful { return c } if wcs.Len() >= (t.maxEstablishedConns+1)/2 { @@ -1516,7 +1516,7 @@ func (t *Torrent) pieceHashed(piece int, correct bool) { p.everHashed = true if correct { for _, c := range touchers { - c.goodPiecesDirtied++ + c.stats.GoodPiecesDirtied++ } err := p.Storage().MarkComplete() if err != nil { @@ -1526,11 +1526,11 @@ func (t *Torrent) pieceHashed(piece int, correct bool) { if len(touchers) != 0 { for _, c := range touchers { // Y u do dis peer?! - c.badPiecesDirtied++ + c.stats.BadPiecesDirtied++ } slices.Sort(touchers, connLessTrusted) if t.cl.config.Debug { - log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int) { + log.Printf("dropping first corresponding conn from trust: %v", func() (ret []int64) { for _, c := range touchers { ret = append(ret, c.netGoodPiecesDirtied()) } diff --git a/torrent_stats.go b/torrent_stats.go index f155a0d8..edc4ac8e 100644 --- a/torrent_stats.go +++ b/torrent_stats.go @@ -1,7 +1,9 @@ package torrent type TorrentStats struct { - ConnStats // Aggregates stats over all connections past and present. + // Aggregates stats over all connections past and present. Some values may + // not have much meaning in the aggregate context. + ConnStats // Ordered by expected descending quantities (if all is well). TotalPeers int