From: Matt Joiner Date: Thu, 8 Feb 2024 13:11:33 +0000 (+1100) Subject: Misc tidying X-Git-Tag: v1.54.0~2 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=8c3338f37851c634172b6ed1a02d7c06e09f69d9;p=btrtrc.git Misc tidying --- diff --git a/peer.go b/peer.go index 608ccb12..23bf5468 100644 --- a/peer.go +++ b/peer.go @@ -719,7 +719,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error { cl.event.Broadcast() // We do this because we've written a chunk, and may change PieceState.Partial. - t.publishPieceChange(pieceIndex(ppReq.Index)) + t.publishPieceStateChange(pieceIndex(ppReq.Index)) return nil } diff --git a/peerconn.go b/peerconn.go index ee47dd1b..00fffc93 100644 --- a/peerconn.go +++ b/peerconn.go @@ -447,7 +447,7 @@ func (cn *PeerConn) peerSentBitfield(bf []bool) error { return nil } -func (cn *PeerConn) onPeerHasAllPieces() { +func (cn *PeerConn) onPeerHasAllPiecesNoTriggers() { t := cn.t if t.haveInfo() { cn._peerPieces.Iterate(func(x uint32) bool { @@ -458,6 +458,14 @@ func (cn *PeerConn) onPeerHasAllPieces() { t.addConnWithAllPieces(&cn.Peer) cn.peerSentHaveAll = true cn._peerPieces.Clear() +} + +func (cn *PeerConn) onPeerHasAllPieces() { + cn.onPeerHasAllPiecesNoTriggers() + cn.peerHasAllPiecesTriggers() +} + +func (cn *PeerConn) peerHasAllPiecesTriggers() { if !cn.t._pendingPieces.IsEmpty() { cn.updateRequests("Peer.onPeerHasAllPieces") } @@ -1044,7 +1052,7 @@ func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRe }) } -func (c *PeerConn) setTorrent(t *Torrent) { +func (c *Peer) setTorrent(t *Torrent) { if c.t != nil { panic("connection already associated with a torrent") } diff --git a/peerconn_test.go b/peerconn_test.go index e294b6b6..3fdcbff1 100644 --- a/peerconn_test.go +++ b/peerconn_test.go @@ -25,11 +25,11 @@ func TestSendBitfieldThenHave(t *testing.T) { var cl Client cl.init(TestingConfig(t)) cl.initLogger() + qtc := qt.New(t) c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"}) c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil)) - if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil { - t.Log(err) - } + err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}) + qtc.Assert(err, qt.IsNil) r, w := io.Pipe() // c.r = r c.w = w diff --git a/request-strategy-impls.go b/request-strategy-impls.go index 5d91e64a..ae505450 100644 --- a/request-strategy-impls.go +++ b/request-strategy-impls.go @@ -1,6 +1,7 @@ package torrent import ( + g "github.com/anacrolix/generics" "github.com/anacrolix/torrent/metainfo" request_strategy "github.com/anacrolix/torrent/request-strategy" "github.com/anacrolix/torrent/storage" @@ -12,7 +13,7 @@ type requestStrategyInput struct { } func (r requestStrategyInput) Torrent(ih metainfo.Hash) request_strategy.Torrent { - return requestStrategyTorrent{r.cl.torrents[ih]} + return requestStrategyTorrent{g.MapMustGet(r.cl.torrents, ih)} } func (r requestStrategyInput) Capacity() (int64, bool) { diff --git a/request-strategy/order.go b/request-strategy/order.go index 92793f58..e5c4792f 100644 --- a/request-strategy/order.go +++ b/request-strategy/order.go @@ -2,7 +2,6 @@ package requestStrategy import ( "bytes" - "expvar" g "github.com/anacrolix/generics" "github.com/anacrolix/multiless" @@ -41,8 +40,6 @@ func pieceOrderLess(i, j *pieceRequestOrderItem) multiless.Computation { }) } -var packageExpvarMap = expvar.NewMap("request-strategy") - // Calls f with requestable pieces in order. func GetRequestablePieces( input Input, pro *PieceRequestOrder, diff --git a/torrent.go b/torrent.go index 0b1baba5..8ea2acde 100644 --- a/torrent.go +++ b/torrent.go @@ -1166,7 +1166,7 @@ type PieceStateChange struct { PieceState } -func (t *Torrent) publishPieceChange(piece pieceIndex) { +func (t *Torrent) publishPieceStateChange(piece pieceIndex) { t.cl._mu.Defer(func() { cur := t.pieceState(piece) p := &t.pieces[piece] @@ -1230,7 +1230,7 @@ func (t *Torrent) maybeNewConns() { t.openNewConns() } -func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { +func (t *Torrent) onPiecePendingTriggers(piece pieceIndex, reason string) { if t._pendingPieces.Contains(uint32(piece)) { t.iterPeers(func(c *Peer) { // if c.requestState.Interested { @@ -1249,10 +1249,10 @@ func (t *Torrent) piecePriorityChanged(piece pieceIndex, reason string) { }) } t.maybeNewConns() - t.publishPieceChange(piece) + t.publishPieceStateChange(piece) } -func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) { +func (t *Torrent) updatePiecePriorityNoTriggers(piece pieceIndex) (pendingChanged bool) { if !t.closed.IsSet() { // It would be possible to filter on pure-priority changes here to avoid churning the piece // request order. @@ -1262,15 +1262,16 @@ func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) { newPrio := p.uncachedPriority() // t.logger.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) if newPrio == PiecePriorityNone { - if !t._pendingPieces.CheckedRemove(uint32(piece)) { - return - } + return t._pendingPieces.CheckedRemove(uint32(piece)) } else { - if !t._pendingPieces.CheckedAdd(uint32(piece)) { - return - } + return t._pendingPieces.CheckedAdd(uint32(piece)) + } +} + +func (t *Torrent) updatePiecePriority(piece pieceIndex, reason string) { + if t.updatePiecePriorityNoTriggers(piece) { + t.onPiecePendingTriggers(piece, reason) } - t.piecePriorityChanged(piece, reason) } func (t *Torrent) updateAllPiecePriorities(reason string) { @@ -1420,7 +1421,7 @@ func (t *Torrent) updatePieceCompletion(piece pieceIndex) bool { t.logger.Printf("marked piece %v complete but still has dirtiers", piece) } if changed { - log.Fstr("piece %d completion changed: %+v -> %+v", piece, cached, uncached).LogLevel(log.Debug, t.logger) + t.logger.Levelf(log.Debug, "piece %d completion changed: %+v -> %+v", piece, cached, uncached) t.pieceCompletionChanged(piece, "Torrent.updatePieceCompletion") } return changed @@ -1957,7 +1958,7 @@ func (t *Torrent) numTotalPeers() int { // Reconcile bytes transferred before connection was associated with a // torrent. -func (t *Torrent) reconcileHandshakeStats(c *PeerConn) { +func (t *Torrent) reconcileHandshakeStats(c *Peer) { if c._stats != (ConnStats{ // Handshakes should only increment these fields: BytesWritten: c._stats.BytesWritten, @@ -2123,10 +2124,10 @@ func (t *Torrent) pieceHashed(piece pieceIndex, passed bool, hashIoErr error) { } p.marking = true - t.publishPieceChange(piece) + t.publishPieceStateChange(piece) defer func() { p.marking = false - t.publishPieceChange(piece) + t.publishPieceStateChange(piece) }() if passed { @@ -2268,7 +2269,7 @@ func (t *Torrent) tryCreatePieceHasher() bool { p := t.piece(pi) t.piecesQueuedForHash.Remove(bitmap.BitIndex(pi)) p.hashing = true - t.publishPieceChange(pi) + t.publishPieceStateChange(pi) t.updatePiecePriority(pi, "Torrent.tryCreatePieceHasher") t.storageLock.RLock() t.activePieceHashes++ @@ -2362,7 +2363,7 @@ func (t *Torrent) queuePieceCheck(pieceIndex pieceIndex) { return } t.piecesQueuedForHash.Add(bitmap.BitIndex(pieceIndex)) - t.publishPieceChange(pieceIndex) + t.publishPieceStateChange(pieceIndex) t.updatePiecePriority(pieceIndex, "Torrent.queuePieceCheck") t.tryCreateMorePieceHashers() }