X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=peerconn.go;h=62f632d5f7f364546d68aceff9b76dbb8aa8de85;hb=4fdfcf8da2bc5b515d421de56fd0c180d135e897;hp=6615d79c5a4573bab13a3561b5be4d95463fe170;hpb=361300cc93afca759ac8a420a8a9402370ec8ab3;p=btrtrc.git diff --git a/peerconn.go b/peerconn.go index 6615d79c..62f632d5 100644 --- a/peerconn.go +++ b/peerconn.go @@ -3,131 +3,35 @@ package torrent import ( "bufio" "bytes" + "context" "errors" "fmt" "io" "math/rand" "net" - "sort" + "net/netip" "strconv" "strings" "sync/atomic" "time" "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/generics" + . "github.com/anacrolix/generics" "github.com/anacrolix/log" - "github.com/anacrolix/missinggo/iter" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" + "golang.org/x/exp/maps" + "golang.org/x/time/rate" - "github.com/anacrolix/chansync" "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/internal/alloclim" "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" - request_strategy "github.com/anacrolix/torrent/request-strategy" + utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" ) -type PeerSource string - -const ( - PeerSourceTracker = "Tr" - PeerSourceIncoming = "I" - PeerSourceDhtGetPeers = "Hg" // Peers we found by searching a DHT. - PeerSourceDhtAnnouncePeer = "Ha" // Peers that were announced to us by a DHT. - PeerSourcePex = "X" - // The peer was given directly, such as through a magnet link. - PeerSourceDirect = "M" -) - -type peerRequestState struct { - data []byte -} - -type PeerRemoteAddr interface { - String() string -} - -// Since we have to store all the requests in memory, we can't reasonably exceed what would be -// indexable with the memory space available. -type ( - maxRequests = int - requestState = request_strategy.PeerRequestState -) - -type Peer struct { - // First to ensure 64-bit alignment for atomics. See #262. - _stats ConnStats - - t *Torrent - - peerImpl - callbacks *Callbacks - - outgoing bool - Network string - RemoteAddr PeerRemoteAddr - // True if the connection is operating over MSE obfuscation. - headerEncrypted bool - cryptoMethod mse.CryptoMethod - Discovery PeerSource - trusted bool - closed chansync.SetOnce - // Set true after we've added our ConnStats generated during handshake to - // other ConnStat instances as determined when the *Torrent became known. - reconciledHandshakeStats bool - - lastMessageReceived time.Time - completedHandshake time.Time - lastUsefulChunkReceived time.Time - lastChunkSent time.Time - - // Stuff controlled by the local peer. - needRequestUpdate string - requestState requestState - updateRequestsTimer *time.Timer - lastRequestUpdate time.Time - peakRequests maxRequests - lastBecameInterested time.Time - priorInterest time.Duration - - lastStartedExpectingToReceiveChunks time.Time - cumulativeExpectedToReceiveChunks time.Duration - _chunksReceivedWhileExpecting int64 - - choking bool - piecesReceivedSinceLastRequestUpdate maxRequests - maxPiecesReceivedBetweenRequestUpdates maxRequests - // Chunks that we might reasonably expect to receive from the peer. Due to latency, buffering, - // and implementation differences, we may receive chunks that are no longer in the set of - // requests actually want. This could use a roaring.BSI if the memory use becomes noticeable. - validReceiveChunks map[RequestIndex]int - // Indexed by metadata piece, set to true if posted and pending a - // response. - metadataRequests []bool - sentHaves bitmap.Bitmap - - // Stuff controlled by the remote peer. - peerInterested bool - peerChoking bool - peerRequests map[Request]*peerRequestState - PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake - PeerListenPort int - // The highest possible number of pieces the torrent could have based on - // communication with the peer. Generally only useful until we have the - // torrent info. - peerMinPieces pieceIndex - // Pieces we've accepted chunks for from the peer. - peerTouchedPieces map[pieceIndex]struct{} - peerAllowedFast roaring.Bitmap - - PeerMaxRequests maxRequests // Maximum pending requests the peer allows. - PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber - PeerClientName atomic.Value - - logger log.Logger -} - // Maintains the state of a BitTorrent-protocol based connection with a peer. type PeerConn struct { Peer @@ -139,6 +43,7 @@ type PeerConn struct { // See BEP 3 etc. PeerID PeerID PeerExtensionBytes pp.PeerExtensionBits + PeerListenPort int // The actual Conn, used for closing, and setting socket options. Do not use methods on this // while holding any mutexes. @@ -150,57 +55,60 @@ type PeerConn struct { messageWriter peerConnMsgWriter - uploadTimer *time.Timer - pex pexConnState + PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber + PeerClientName atomic.Value + uploadTimer *time.Timer + pex pexConnState // The pieces the peer has claimed to have. _peerPieces roaring.Bitmap // The peer has everything. This can occur due to a special message, when // we may not even know the number of pieces in the torrent yet. peerSentHaveAll bool -} -func (cn *PeerConn) connStatusString() string { - return fmt.Sprintf("%+-55q %s %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString) -} + peerRequestDataAllocLimiter alloclim.Limiter -func (cn *Peer) updateExpectingChunks() { - if cn.expectingChunks() { - if cn.lastStartedExpectingToReceiveChunks.IsZero() { - cn.lastStartedExpectingToReceiveChunks = time.Now() - } - } else { - if !cn.lastStartedExpectingToReceiveChunks.IsZero() { - cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks) - cn.lastStartedExpectingToReceiveChunks = time.Time{} - } - } + outstandingHolepunchingRendezvous map[netip.AddrPort]struct{} } -func (cn *Peer) expectingChunks() bool { - if cn.requestState.Requests.IsEmpty() { - return false +func (cn *PeerConn) pexStatus() string { + if !cn.bitExtensionEnabled(pp.ExtensionBitLtep) { + return "extended protocol disabled" } - if !cn.requestState.Interested { - return false + if cn.PeerExtensionIDs == nil { + return "pending extended handshake" } - if !cn.peerChoking { - return true + if !cn.supportsExtension(pp.ExtensionNamePex) { + return "unsupported" + } + if true { + return fmt.Sprintf( + "%v conns, %v unsent events", + len(cn.pex.remoteLiveConns), + cn.pex.numPending(), + ) + } else { + // This alternative branch prints out the remote live conn addresses. + return fmt.Sprintf( + "%v conns, %v unsent events", + strings.Join(generics.SliceMap( + maps.Keys(cn.pex.remoteLiveConns), + func(from netip.AddrPort) string { + return from.String() + }), ","), + cn.pex.numPending(), + ) } - haveAllowedFastRequests := false - cn.peerAllowedFast.Iterate(func(i uint32) bool { - haveAllowedFastRequests = roaringBitmapRangeCardinality( - &cn.requestState.Requests, - cn.t.pieceRequestIndexOffset(pieceIndex(i)), - cn.t.pieceRequestIndexOffset(pieceIndex(i+1)), - ) == 0 - return !haveAllowedFastRequests - }) - return haveAllowedFastRequests } -func (cn *Peer) remoteChokingPiece(piece pieceIndex) bool { - return cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(piece)) +func (cn *PeerConn) peerImplStatusLines() []string { + return []string{ + cn.connString, + fmt.Sprintf("peer id: %+q", cn.PeerID), + fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes), + fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs), + fmt.Sprintf("pex: %s", cn.pexStatus()), + } } // Returns true if the connection is over IPv6. @@ -212,10 +120,12 @@ func (cn *PeerConn) ipv6() bool { return len(ip) == net.IPv6len } -// Returns true the if the dialer/initiator has the lower client peer ID. TODO: Find the -// specification for this. +// Returns true the if the dialer/initiator has the higher client peer ID. See +// https://github.com/arvidn/libtorrent/blame/272828e1cc37b042dfbbafa539222d8533e99755/src/bt_peer_connection.cpp#L3536-L3557. +// As far as I can tell, Transmission just keeps the oldest connection. func (cn *PeerConn) isPreferredDirection() bool { - return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing + // True if our client peer ID is higher than the remote's peer ID. + return bytes.Compare(cn.PeerID[:], cn.t.cl.peerID[:]) < 0 == cn.outgoing } // Returns whether the left connection should be preferred over the right one, @@ -229,14 +139,6 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) bool { return ml.Less() } -func (cn *Peer) cumInterest() time.Duration { - ret := cn.priorInterest - if cn.requestState.Interested { - ret += time.Since(cn.lastBecameInterested) - } - return ret -} - func (cn *PeerConn) peerHasAllPieces() (all, known bool) { if cn.peerSentHaveAll { return true, true @@ -247,31 +149,6 @@ func (cn *PeerConn) peerHasAllPieces() (all, known bool) { return cn._peerPieces.GetCardinality() == uint64(cn.t.numPieces()), true } -func (cn *Peer) locker() *lockWithDeferreds { - return cn.t.cl.locker() -} - -func (cn *Peer) supportsExtension(ext pp.ExtensionName) bool { - _, ok := cn.PeerExtensionIDs[ext] - return ok -} - -// The best guess at number of pieces in the torrent for this peer. -func (cn *Peer) bestPeerNumPieces() pieceIndex { - if cn.t.haveInfo() { - return cn.t.numPieces() - } - return cn.peerMinPieces -} - -func (cn *Peer) completedString() string { - have := pieceIndex(cn.peerPieces().GetCardinality()) - if all, _ := cn.peerHasAllPieces(); all { - have = cn.bestPeerNumPieces() - } - return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces()) -} - func (cn *PeerConn) onGotInfo(info *metainfo.Info) { cn.setNumPieces(info.NumPieces()) } @@ -287,13 +164,6 @@ func (cn *PeerConn) peerPieces() *roaring.Bitmap { return &cn._peerPieces } -func eventAgeString(t time.Time) string { - if t.IsZero() { - return "never" - } - return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds()) -} - func (cn *PeerConn) connectionFlags() (ret string) { c := func(b byte) { ret += string([]byte{b}) @@ -314,115 +184,6 @@ func (cn *PeerConn) utp() bool { return parseNetworkString(cn.Network).Udp } -// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text. -func (cn *Peer) statusFlags() (ret string) { - c := func(b byte) { - ret += string([]byte{b}) - } - if cn.requestState.Interested { - c('i') - } - if cn.choking { - c('c') - } - c('-') - ret += cn.connectionFlags() - c('-') - if cn.peerInterested { - c('i') - } - if cn.peerChoking { - c('c') - } - return -} - -func (cn *Peer) downloadRate() float64 { - num := cn._stats.BytesReadUsefulData.Int64() - if num == 0 { - return 0 - } - return float64(num) / cn.totalExpectingTime().Seconds() -} - -func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) { - ret = make(map[pieceIndex]int) - cn.requestState.Requests.Iterate(func(x uint32) bool { - ret[pieceIndex(x/cn.t.chunksPerRegularPiece())]++ - return true - }) - return -} - -func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { - // \t isn't preserved in
blocks? - if cn.closed.IsSet() { - fmt.Fprint(w, "CLOSED: ") - } - fmt.Fprintln(w, cn.connStatusString()) - prio, err := cn.peerPriority() - prioStr := fmt.Sprintf("%08x", prio) - if err != nil { - prioStr += ": " + err.Error() - } - fmt.Fprintf(w, " bep40-prio: %v\n", prioStr) - fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n", - eventAgeString(cn.lastMessageReceived), - eventAgeString(cn.completedHandshake), - eventAgeString(cn.lastHelpful()), - cn.cumInterest(), - cn.totalExpectingTime(), - ) - fmt.Fprintf(w, - " %s completed, %d pieces touched, good chunks: %v/%v:%v reqq: %d+%v/(%d/%d):%d/%d, flags: %s, dr: %.1f KiB/s\n", - cn.completedString(), - len(cn.peerTouchedPieces), - &cn._stats.ChunksReadUseful, - &cn._stats.ChunksRead, - &cn._stats.ChunksWritten, - cn.requestState.Requests.GetCardinality(), - cn.requestState.Cancelled.GetCardinality(), - cn.nominalMaxRequests(), - cn.PeerMaxRequests, - len(cn.peerRequests), - localClientReqq, - cn.statusFlags(), - cn.downloadRate()/(1<<10), - ) - fmt.Fprintf(w, " requested pieces:") - type pieceNumRequestsType struct { - piece pieceIndex - numRequests int - } - var pieceNumRequests []pieceNumRequestsType - for piece, count := range cn.numRequestsByPiece() { - pieceNumRequests = append(pieceNumRequests, pieceNumRequestsType{piece, count}) - } - sort.Slice(pieceNumRequests, func(i, j int) bool { - return pieceNumRequests[i].piece < pieceNumRequests[j].piece - }) - for _, elem := range pieceNumRequests { - fmt.Fprintf(w, " %v(%v)", elem.piece, elem.numRequests) - } - fmt.Fprintf(w, "\n") -} - -func (p *Peer) close() { - if !p.closed.Set() { - return - } - if p.updateRequestsTimer != nil { - p.updateRequestsTimer.Stop() - } - p.peerImpl.onClose() - if p.t != nil { - p.t.decPeerPieceAvailability(p) - } - for _, f := range p.callbacks.PeerClosed { - f(p) - } -} - func (cn *PeerConn) onClose() { if cn.pex.IsEnabled() { cn.pex.Close() @@ -436,22 +197,6 @@ func (cn *PeerConn) onClose() { } } -// Peer definitely has a piece, for purposes of requesting. So it's not sufficient that we think -// they do (known=true). -func (cn *Peer) peerHasPiece(piece pieceIndex) bool { - if all, known := cn.peerHasAllPieces(); all && known { - return true - } - return cn.peerPieces().ContainsInt(piece) -} - -// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when -// https://github.com/pion/datachannel/issues/59 is fixed. -const ( - writeBufferHighWaterLen = 1 << 15 - writeBufferLowWaterLen = writeBufferHighWaterLen / 2 -) - // Writes a message into the write buffer. Returns whether it's okay to keep writing. Writing is // done asynchronously, so it may be that we're not able to honour backpressure from this method. func (cn *PeerConn) write(msg pp.Message) bool { @@ -486,27 +231,6 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool { return index < len(cn.metadataRequests) && cn.metadataRequests[index] } -var ( - interestedMsgLen = len(pp.Message{Type: pp.Interested}.MustMarshalBinary()) - requestMsgLen = len(pp.Message{Type: pp.Request}.MustMarshalBinary()) - // This is the maximum request count that could fit in the write buffer if it's at or below the - // low water mark when we run maybeUpdateActualRequestState. - maxLocalToRemoteRequests = (writeBufferHighWaterLen - writeBufferLowWaterLen - interestedMsgLen) / requestMsgLen -) - -// The actual value to use as the maximum outbound requests. -func (cn *Peer) nominalMaxRequests() maxRequests { - return maxRequests(maxInt(1, minInt(cn.PeerMaxRequests, cn.peakRequests*2, maxLocalToRemoteRequests))) -} - -func (cn *Peer) totalExpectingTime() (ret time.Duration) { - ret = cn.cumulativeExpectedToReceiveChunks - if !cn.lastStartedExpectingToReceiveChunks.IsZero() { - ret += time.Since(cn.lastStartedExpectingToReceiveChunks) - } - return -} - func (cn *PeerConn) onPeerSentCancel(r Request) { if _, ok := cn.peerRequests[r]; !ok { torrent.Add("unexpected cancels received", 1) @@ -528,11 +252,18 @@ func (cn *PeerConn) choke(msg messageWriter) (more bool) { Type: pp.Choke, }) if !cn.fastEnabled() { - cn.peerRequests = nil + cn.deleteAllPeerRequests() } return } +func (cn *PeerConn) deleteAllPeerRequests() { + for _, state := range cn.peerRequests { + state.allocReservation.Drop() + } + cn.peerRequests = nil +} + func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool { if !cn.choking { return true @@ -543,21 +274,6 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool { }) } -func (cn *Peer) setInterested(interested bool) bool { - if cn.requestState.Interested == interested { - return true - } - cn.requestState.Interested = interested - if interested { - cn.lastBecameInterested = time.Now() - } else if !cn.lastBecameInterested.IsZero() { - cn.priorInterest += time.Since(cn.lastBecameInterested) - } - cn.updateExpectingChunks() - // log.Printf("%p: setting interest: %v", cn, interested) - return cn.writeInterested(interested) -} - func (pc *PeerConn) writeInterested(interested bool) bool { return pc.write(pp.Message{ Type: func() pp.MessageType { @@ -570,75 +286,6 @@ func (pc *PeerConn) writeInterested(interested bool) bool { }) } -// The function takes a message to be sent, and returns true if more messages -// are okay. -type messageWriter func(pp.Message) bool - -// This function seems to only used by Peer.request. It's all logic checks, so maybe we can no-op it -// when we want to go fast. -func (cn *Peer) shouldRequest(r RequestIndex) error { - pi := pieceIndex(r / cn.t.chunksPerRegularPiece()) - if cn.requestState.Cancelled.Contains(r) { - return errors.New("request is cancelled and waiting acknowledgement") - } - if !cn.peerHasPiece(pi) { - return errors.New("requesting piece peer doesn't have") - } - if !cn.t.peerIsActive(cn) { - panic("requesting but not in active conns") - } - if cn.closed.IsSet() { - panic("requesting when connection is closed") - } - if cn.t.hashingPiece(pi) { - panic("piece is being hashed") - } - if cn.t.pieceQueuedForHash(pi) { - panic("piece is queued for hash") - } - if cn.peerChoking && !cn.peerAllowedFast.Contains(bitmap.BitIndex(pi)) { - // This could occur if we made a request with the fast extension, and then got choked and - // haven't had the request rejected yet. - if !cn.requestState.Requests.Contains(r) { - panic("peer choking and piece not allowed fast") - } - } - return nil -} - -func (cn *Peer) mustRequest(r RequestIndex) bool { - more, err := cn.request(r) - if err != nil { - panic(err) - } - return more -} - -func (cn *Peer) request(r RequestIndex) (more bool, err error) { - if err := cn.shouldRequest(r); err != nil { - panic(err) - } - if cn.requestState.Requests.Contains(r) { - return true, nil - } - if maxRequests(cn.requestState.Requests.GetCardinality()) >= cn.nominalMaxRequests() { - return true, errors.New("too many outstanding requests") - } - cn.requestState.Requests.Add(r) - if cn.validReceiveChunks == nil { - cn.validReceiveChunks = make(map[RequestIndex]int) - } - cn.validReceiveChunks[r]++ - cn.t.pendingRequests[r] = cn - cn.t.lastRequested[r] = time.Now() - cn.updateExpectingChunks() - ppReq := cn.t.requestIndexToRequest(r) - for _, f := range cn.callbacks.SentRequest { - f(PeerRequestEvent{cn, ppReq}) - } - return cn.peerImpl._request(ppReq), nil -} - func (me *PeerConn) _request(r Request) bool { return me.write(pp.Message{ Type: pp.Request, @@ -648,21 +295,6 @@ func (me *PeerConn) _request(r Request) bool { }) } -func (me *Peer) cancel(r RequestIndex) { - if !me.deleteRequest(r) { - panic("request not existing should have been guarded") - } - if me._cancel(r) { - if !me.requestState.Cancelled.CheckedAdd(r) { - panic("request already cancelled") - } - } - me.decPeakRequests() - if me.isLowOnRequests() { - me.updateRequests("Peer.cancel") - } -} - func (me *PeerConn) _cancel(r RequestIndex) bool { me.write(makeCancelMessage(me.t.requestIndexToRequest(r))) // Transmission does not send rejects for received cancels. See @@ -713,50 +345,11 @@ func (cn *PeerConn) postBitfield() { cn.sentHaves = bitmap.Bitmap{cn.t._completedPieces.Clone()} } -// Sets a reason to update requests, and if there wasn't already one, handle it. -func (cn *Peer) updateRequests(reason string) { - if cn.needRequestUpdate != "" { - return - } - if reason != peerUpdateRequestsTimerReason && !cn.isLowOnRequests() { - return - } - cn.needRequestUpdate = reason - cn.handleUpdateRequests() -} - func (cn *PeerConn) handleUpdateRequests() { // The writer determines the request state as needed when it can write. cn.tickleWriter() } -// Emits the indices in the Bitmaps bms in order, never repeating any index. -// skip is mutated during execution, and its initial values will never be -// emitted. -func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { - return func(cb iter.Callback) { - for _, bm := range bms { - if !iter.All( - func(_i interface{}) bool { - i := _i.(int) - if skip.Contains(bitmap.BitIndex(i)) { - return true - } - skip.Add(bitmap.BitIndex(i)) - return cb(i) - }, - bm.Iter, - ) { - return - } - } - } -} - -func (cn *Peer) peerPiecesChanged() { - cn.t.maybeDropMutuallyCompletePeer(cn) -} - func (cn *PeerConn) raisePeerMinPieces(newMin pieceIndex) { if newMin > cn.peerMinPieces { cn.peerMinPieces = newMin @@ -862,7 +455,7 @@ func (cn *PeerConn) onPeerSentHaveAll() error { } func (cn *PeerConn) peerSentHaveNone() error { - if cn.peerSentHaveAll { + if !cn.peerSentHaveAll { cn.t.decPeerPieceAvailability(&cn.Peer) } cn._peerPieces.Clear() @@ -905,59 +498,10 @@ func (cn *PeerConn) wroteMsg(msg *pp.Message) { cn.allStats(func(cs *ConnStats) { cs.wroteMsg(msg) }) } -// After handshake, we know what Torrent and Client stats to include for a -// connection. -func (cn *Peer) postHandshakeStats(f func(*ConnStats)) { - t := cn.t - f(&t.stats) - f(&t.cl.stats) -} - -// All ConnStats that include this connection. Some objects are not known -// until the handshake is complete, after which it's expected to reconcile the -// differences. -func (cn *Peer) allStats(f func(*ConnStats)) { - f(&cn._stats) - if cn.reconciledHandshakeStats { - cn.postHandshakeStats(f) - } -} - func (cn *PeerConn) wroteBytes(n int64) { cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesWritten })) } -func (cn *Peer) readBytes(n int64) { - cn.allStats(add(n, func(cs *ConnStats) *Count { return &cs.BytesRead })) -} - -// Returns whether the connection could be useful to us. We're seeding and -// they want data, we don't have metainfo and they can provide it, etc. -func (c *Peer) useful() bool { - t := c.t - if c.closed.IsSet() { - return false - } - if !t.haveInfo() { - return c.supportsExtension("ut_metadata") - } - if t.seeding() && c.peerInterested { - return true - } - if c.peerHasWantedPieces() { - return true - } - return false -} - -func (c *Peer) lastHelpful() (ret time.Time) { - ret = c.lastUsefulChunkReceived - if c.t.seeding() && c.lastChunkSent.After(ret) { - ret = c.lastChunkSent - } - return -} - func (c *PeerConn) fastEnabled() bool { return c.PeerExtensionBytes.SupportsFast() && c.t.cl.config.Extensions.SupportsFast() } @@ -967,13 +511,29 @@ func (c *PeerConn) reject(r Request) { panic("fast not enabled") } c.write(r.ToMsg(pp.Reject)) - delete(c.peerRequests, r) + // It is possible to reject a request before it is added to peer requests due to being invalid. + if state, ok := c.peerRequests[r]; ok { + state.allocReservation.Drop() + delete(c.peerRequests, r) + } +} + +func (c *PeerConn) maximumPeerRequestChunkLength() (_ Option[int]) { + uploadRateLimiter := c.t.cl.config.UploadRateLimiter + if uploadRateLimiter.Limit() == rate.Inf { + return + } + return Some(uploadRateLimiter.Burst()) } -func (c *PeerConn) onReadRequest(r Request) error { +// startFetch is for testing purposes currently. +func (c *PeerConn) onReadRequest(r Request, startFetch bool) error { requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1) if _, ok := c.peerRequests[r]; ok { torrent.Add("duplicate requests received", 1) + if c.fastEnabled() { + return errors.New("received duplicate request with fast enabled") + } return nil } if c.choking { @@ -993,29 +553,52 @@ func (c *PeerConn) onReadRequest(r Request) error { // BEP 6 says we may close here if we choose. return nil } + if opt := c.maximumPeerRequestChunkLength(); opt.Ok && int(r.Length) > opt.Value { + err := fmt.Errorf("peer requested chunk too long (%v)", r.Length) + c.logger.Levelf(log.Warning, err.Error()) + if c.fastEnabled() { + c.reject(r) + return nil + } else { + return err + } + } if !c.t.havePiece(pieceIndex(r.Index)) { - // This isn't necessarily them screwing up. We can drop pieces - // from our storage, and can't communicate this to peers - // except by reconnecting. + // TODO: Tell the peer we don't have the piece, and reject this request. requestsReceivedForMissingPieces.Add(1) return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int()) } + pieceLength := c.t.pieceLength(pieceIndex(r.Index)) // Check this after we know we have the piece, so that the piece length will be known. - if r.Begin+r.Length > c.t.pieceLength(pieceIndex(r.Index)) { + if chunkOverflowsPiece(r.ChunkSpec, pieceLength) { torrent.Add("bad requests received", 1) - return errors.New("bad Request") + return errors.New("chunk overflows piece") } if c.peerRequests == nil { c.peerRequests = make(map[Request]*peerRequestState, localClientReqq) } - value := &peerRequestState{} + value := &peerRequestState{ + allocReservation: c.peerRequestDataAllocLimiter.Reserve(int64(r.Length)), + } c.peerRequests[r] = value - go c.peerRequestDataReader(r, value) + if startFetch { + // TODO: Limit peer request data read concurrency. + go c.peerRequestDataReader(r, value) + } return nil } func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { - b, err := readPeerRequestData(r, c) + // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere, + // or fail to read and then cleanup. Also, we used to hang here if the reservation was never + // dropped, that was fixed. + ctx := context.Background() + err := prs.allocReservation.Wait(ctx) + if err != nil { + c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err) + return + } + b, err := c.readPeerRequestData(r) c.locker().Lock() defer c.locker().Unlock() if err != nil { @@ -1026,6 +609,7 @@ func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { } torrent.Add("peer request data read successes", 1) prs.data = b + // This might be required for the error case too (#752 and #753). c.tickleWriter() } } @@ -1040,7 +624,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) { // https://github.com/anacrolix/torrent/issues/702#issuecomment-1000953313. logLevel = log.Debug } - c.logger.WithDefaultLevel(logLevel).Printf("error reading chunk for peer Request %v: %v", r, err) + c.logger.Levelf(logLevel, "error reading chunk for peer Request %v: %v", r, err) if c.t.closed.IsSet() { return } @@ -1071,7 +655,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) { } } -func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) { +func (c *PeerConn) readPeerRequestData(r Request) ([]byte, error) { b := make([]byte, r.Length) p := c.t.info.Piece(int(r.Index)) n, err := c.t.readAt(b, p.Offset()+int64(r.Begin)) @@ -1087,14 +671,6 @@ func readPeerRequestData(r Request, c *PeerConn) ([]byte, error) { return b, err } -func runSafeExtraneous(f func()) { - if true { - go f() - } else { - f() - } -} - func (c *PeerConn) logProtocolBehaviour(level log.Level, format string, arg ...interface{}) { c.logger.WithContextText(fmt.Sprintf( "peer id %q, ext v %q", c.PeerID, c.PeerClientName.Load(), @@ -1116,7 +692,7 @@ func (c *PeerConn) mainReadLoop() (err error) { decoder := pp.Decoder{ R: bufio.NewReaderSize(c.r, 1<<17), - MaxLength: 256 * 1024, + MaxLength: 4 * pp.Integer(max(int64(t.chunkSize), defaultChunkSize)), Pool: &t.chunkPool, } for { @@ -1151,13 +727,7 @@ func (c *PeerConn) mainReadLoop() (err error) { break } if !c.fastEnabled() { - if !c.deleteAllRequests().IsEmpty() { - c.t.iterPeers(func(p *Peer) { - if p.isLowOnRequests() { - p.updateRequests("choked by non-fast PeerConn") - } - }) - } + c.deleteAllRequests("choked by non-fast PeerConn") } else { // We don't decrement pending requests here, let's wait for the peer to either // reject or satisfy the outstanding requests. Additionally, some peers may unchoke @@ -1177,8 +747,8 @@ func (c *PeerConn) mainReadLoop() (err error) { } c.peerChoking = false preservedCount := 0 - c.requestState.Requests.Iterate(func(x uint32) bool { - if !c.peerAllowedFast.Contains(x / c.t.chunksPerRegularPiece()) { + c.requestState.Requests.Iterate(func(x RequestIndex) bool { + if !c.peerAllowedFast.Contains(c.t.pieceIndexOfRequestIndex(x)) { preservedCount++ } return true @@ -1211,7 +781,10 @@ func (c *PeerConn) mainReadLoop() (err error) { err = c.peerSentBitfield(msg.Bitfield) case pp.Request: r := newRequestFromMessage(&msg) - err = c.onReadRequest(r) + err = c.onReadRequest(r, true) + if err != nil { + err = fmt.Errorf("on reading request %v: %w", r, err) + } case pp.Piece: c.doChunkReadStats(int64(len(msg.Piece))) err = c.receiveChunk(&msg) @@ -1250,7 +823,7 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.Reject: req := newRequestFromMessage(&msg) if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) { - log.Printf("received invalid reject [request=%v, peer=%v]", req, c) + c.logger.Printf("received invalid reject [request=%v, peer=%v]", req, c) err = fmt.Errorf("received invalid reject [request=%v]", req) } case pp.AllowedFast: @@ -1268,31 +841,6 @@ func (c *PeerConn) mainReadLoop() (err error) { } } -// Returns true if it was valid to reject the request. -func (c *Peer) remoteRejectedRequest(r RequestIndex) bool { - if c.deleteRequest(r) { - c.decPeakRequests() - } else if !c.requestState.Cancelled.CheckedRemove(r) { - return false - } - if c.isLowOnRequests() { - c.updateRequests("Peer.remoteRejectedRequest") - } - c.decExpectedChunkReceive(r) - return true -} - -func (c *Peer) decExpectedChunkReceive(r RequestIndex) { - count := c.validReceiveChunks[r] - if count == 1 { - delete(c.validReceiveChunks, r) - } else if count > 1 { - c.validReceiveChunks[r] = count - 1 - } else { - panic(r) - } -} - func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err error) { defer func() { // TODO: Should we still do this? @@ -1344,6 +892,7 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err c.requestPendingMetadata() if !t.cl.config.DisablePEX { t.pex.Add(c) // we learnt enough now + // This checks the extension is supported internally. c.pex.Init(c) } return nil @@ -1357,7 +906,20 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err if !c.pex.IsEnabled() { return nil // or hang-up maybe? } - return c.pex.Recv(payload) + err = c.pex.Recv(payload) + if err != nil { + err = fmt.Errorf("receiving pex message: %w", err) + } + return + case utHolepunchExtendedId: + var msg utHolepunch.Msg + err = msg.UnmarshalBinary(payload) + if err != nil { + err = fmt.Errorf("unmarshalling ut_holepunch message: %w", err) + return + } + err = c.t.handleReceivedUtHolepunchMsg(msg, c) + return default: return fmt.Errorf("unexpected extended message ID: %v", id) } @@ -1377,150 +939,6 @@ func (cn *PeerConn) rw() io.ReadWriter { }{cn.r, cn.w} } -func (c *Peer) doChunkReadStats(size int64) { - c.allStats(func(cs *ConnStats) { cs.receivedChunk(size) }) -} - -// Handle a received chunk from a peer. -func (c *Peer) receiveChunk(msg *pp.Message) error { - chunksReceived.Add("total", 1) - - ppReq := newRequestFromMessage(msg) - req := c.t.requestIndexFromRequest(ppReq) - - if c.peerChoking { - chunksReceived.Add("while choked", 1) - } - - if c.validReceiveChunks[req] <= 0 { - chunksReceived.Add("unexpected", 1) - return errors.New("received unexpected chunk") - } - c.decExpectedChunkReceive(req) - - if c.peerChoking && c.peerAllowedFast.Contains(bitmap.BitIndex(ppReq.Index)) { - chunksReceived.Add("due to allowed fast", 1) - } - - // The request needs to be deleted immediately to prevent cancels occurring asynchronously when - // have actually already received the piece, while we have the Client unlocked to write the data - // out. - intended := false - { - if c.requestState.Requests.Contains(req) { - for _, f := range c.callbacks.ReceivedRequested { - f(PeerMessageEvent{c, msg}) - } - } - // Request has been satisfied. - if c.deleteRequest(req) || c.requestState.Cancelled.CheckedRemove(req) { - intended = true - if !c.peerChoking { - c._chunksReceivedWhileExpecting++ - } - if c.isLowOnRequests() { - c.updateRequests("Peer.receiveChunk deleted request") - } - } else { - chunksReceived.Add("unintended", 1) - } - } - - t := c.t - cl := t.cl - - // Do we actually want this chunk? - if t.haveChunk(ppReq) { - // panic(fmt.Sprintf("%+v", ppReq)) - chunksReceived.Add("redundant", 1) - c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadWasted })) - return nil - } - - piece := &t.pieces[ppReq.Index] - - c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful })) - c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData })) - if intended { - c.piecesReceivedSinceLastRequestUpdate++ - c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulIntendedData })) - } - for _, f := range c.t.cl.config.Callbacks.ReceivedUsefulData { - f(ReceivedUsefulDataEvent{c, msg}) - } - c.lastUsefulChunkReceived = time.Now() - - // Need to record that it hasn't been written yet, before we attempt to do - // anything with it. - piece.incrementPendingWrites() - // Record that we have the chunk, so we aren't trying to download it while - // waiting for it to be written to storage. - piece.unpendChunkIndex(chunkIndexFromChunkSpec(ppReq.ChunkSpec, t.chunkSize)) - - // Cancel pending requests for this chunk from *other* peers. - if p := t.pendingRequests[req]; p != nil { - if p == c { - panic("should not be pending request from conn that just received it") - } - p.cancel(req) - } - - err := func() error { - cl.unlock() - defer cl.lock() - concurrentChunkWrites.Add(1) - defer concurrentChunkWrites.Add(-1) - // Write the chunk out. Note that the upper bound on chunk writing concurrency will be the - // number of connections. We write inline with receiving the chunk (with this lock dance), - // because we want to handle errors synchronously and I haven't thought of a nice way to - // defer any concurrency to the storage and have that notify the client of errors. TODO: Do - // that instead. - return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece) - }() - - piece.decrementPendingWrites() - - if err != nil { - c.logger.WithDefaultLevel(log.Error).Printf("writing received chunk %v: %v", req, err) - t.pendRequest(req) - // Necessary to pass TestReceiveChunkStorageFailureSeederFastExtensionDisabled. I think a - // request update runs while we're writing the chunk that just failed. Then we never do a - // fresh update after pending the failed request. - c.updateRequests("Peer.receiveChunk error writing chunk") - t.onWriteChunkErr(err) - return nil - } - - c.onDirtiedPiece(pieceIndex(ppReq.Index)) - - // We need to ensure the piece is only queued once, so only the last chunk writer gets this job. - if t.pieceAllDirty(pieceIndex(ppReq.Index)) && piece.pendingWrites == 0 { - t.queuePieceCheck(pieceIndex(ppReq.Index)) - // We don't pend all chunks here anymore because we don't want code dependent on the dirty - // chunk status (such as the haveChunk call above) to have to check all the various other - // piece states like queued for hash, hashing etc. This does mean that we need to be sure - // that chunk pieces are pended at an appropriate time later however. - } - - cl.event.Broadcast() - // We do this because we've written a chunk, and may change PieceState.Partial. - t.publishPieceChange(pieceIndex(ppReq.Index)) - - return nil -} - -func (c *Peer) onDirtiedPiece(piece pieceIndex) { - if c.peerTouchedPieces == nil { - c.peerTouchedPieces = make(map[pieceIndex]struct{}) - } - c.peerTouchedPieces[piece] = struct{}{} - ds := &c.t.pieces[piece].dirtiers - if *ds == nil { - *ds = make(map[*Peer]struct{}) - } - (*ds)[c] = struct{}{} -} - func (c *PeerConn) uploadAllowed() bool { if c.t.cl.config.NoUpload { return false @@ -1594,71 +1012,6 @@ func (cn *PeerConn) ban() { cn.t.cl.banPeerIP(cn.remoteIp()) } -func (cn *Peer) netGoodPiecesDirtied() int64 { - return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64() -} - -func (c *Peer) peerHasWantedPieces() bool { - if all, _ := c.peerHasAllPieces(); all { - return !c.t.haveAllPieces() && !c.t._pendingPieces.IsEmpty() - } - if !c.t.haveInfo() { - return !c.peerPieces().IsEmpty() - } - return c.peerPieces().Intersects(&c.t._pendingPieces) -} - -// Returns true if an outstanding request is removed. Cancelled requests should be handled -// separately. -func (c *Peer) deleteRequest(r RequestIndex) bool { - if !c.requestState.Requests.CheckedRemove(r) { - return false - } - for _, f := range c.callbacks.DeletedRequest { - f(PeerRequestEvent{c, c.t.requestIndexToRequest(r)}) - } - c.updateExpectingChunks() - if c.t.requestingPeer(r) != c { - panic("only one peer should have a given request at a time") - } - delete(c.t.pendingRequests, r) - delete(c.t.lastRequested, r) - // c.t.iterPeers(func(p *Peer) { - // if p.isLowOnRequests() { - // p.updateRequests("Peer.deleteRequest") - // } - // }) - return true -} - -func (c *Peer) deleteAllRequests() (deleted *roaring.Bitmap) { - deleted = c.requestState.Requests.Clone() - deleted.Iterate(func(x uint32) bool { - if !c.deleteRequest(x) { - panic("request should exist") - } - return true - }) - c.assertNoRequests() - return -} - -func (c *Peer) assertNoRequests() { - if !c.requestState.Requests.IsEmpty() { - panic(c.requestState.Requests.GetCardinality()) - } -} - -func (c *Peer) cancelAllRequests() (cancelled *roaring.Bitmap) { - cancelled = c.requestState.Requests.Clone() - cancelled.Iterate(func(x uint32) bool { - c.cancel(x) - return true - }) - c.assertNoRequests() - return -} - // This is called when something has changed that should wake the writer, such as putting stuff into // the writeBuffer, or changing some state that the writer can act on. func (c *PeerConn) tickleWriter() { @@ -1667,6 +1020,7 @@ func (c *PeerConn) tickleWriter() { func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) { c.lastChunkSent = time.Now() + state.allocReservation.Release() return msg(pp.Message{ Type: pp.Piece, Index: r.Index, @@ -1684,20 +1038,6 @@ func (c *PeerConn) setTorrent(t *Torrent) { t.reconcileHandshakeStats(c) } -func (c *Peer) peerPriority() (peerPriority, error) { - return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp())) -} - -func (c *Peer) remoteIp() net.IP { - host, _, _ := net.SplitHostPort(c.RemoteAddr.String()) - return net.ParseIP(host) -} - -func (c *Peer) remoteIpPort() IpPort { - ipa, _ := tryIpPortFromNetAddr(c.RemoteAddr) - return IpPort{ipa.IP, uint16(ipa.Port)} -} - func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags { f := pp.PexPeerFlags(0) if c.PeerPrefersEncryption { @@ -1715,42 +1055,35 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags { // This returns the address to use if we want to dial the peer again. It incorporates the peer's // advertised listen port. func (c *PeerConn) dialAddr() PeerRemoteAddr { - if !c.outgoing && c.PeerListenPort != 0 { - switch addr := c.RemoteAddr.(type) { - case *net.TCPAddr: - dialAddr := *addr - dialAddr.Port = c.PeerListenPort - return &dialAddr - case *net.UDPAddr: - dialAddr := *addr - dialAddr.Port = c.PeerListenPort - return &dialAddr - } + if c.outgoing || c.PeerListenPort == 0 { + return c.RemoteAddr + } + addrPort, err := addrPortFromPeerRemoteAddr(c.RemoteAddr) + if err != nil { + c.logger.Levelf( + log.Warning, + "error parsing %q for alternate dial port: %v", + c.RemoteAddr, + err, + ) + return c.RemoteAddr } - return c.RemoteAddr + return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort)) } -func (c *PeerConn) pexEvent(t pexEventType) pexEvent { +func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) { f := c.pexPeerFlags() - addr := c.dialAddr() - return pexEvent{t, addr, f, nil} + dialAddr := c.dialAddr() + addr, err := addrPortFromPeerRemoteAddr(dialAddr) + if err != nil || !addr.IsValid() { + err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err) + return + } + return pexEvent{t, addr, f, nil}, nil } func (c *PeerConn) String() string { - return fmt.Sprintf("%T %p [id=%q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load()) -} - -func (c *Peer) trust() connectionTrust { - return connectionTrust{c.trusted, c.netGoodPiecesDirtied()} -} - -type connectionTrust struct { - Implicit bool - NetGoodPiecesDirted int64 -} - -func (l connectionTrust) Less(r connectionTrust) bool { - return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less() + return fmt.Sprintf("%T %p [id=%+q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load()) } // Returns the pieces the peer could have based on their claims. If we don't know how many pieces @@ -1761,33 +1094,38 @@ func (cn *PeerConn) PeerPieces() *roaring.Bitmap { return cn.newPeerPieces() } -// Returns a new Bitmap that includes bits for all pieces the peer could have based on their claims. -func (cn *Peer) newPeerPieces() *roaring.Bitmap { - // TODO: Can we use copy on write? - ret := cn.peerPieces().Clone() - if all, _ := cn.peerHasAllPieces(); all { - if cn.t.haveInfo() { - ret.AddRange(0, bitmap.BitRange(cn.t.numPieces())) - } else { - ret.AddRange(0, bitmap.ToEnd) - } - } - return ret +func (pc *PeerConn) remoteIsTransmission() bool { + return bytes.HasPrefix(pc.PeerID[:], []byte("-TR")) && pc.PeerID[7] == '-' } -func (cn *Peer) stats() *ConnStats { - return &cn._stats +func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) { + dialAddr := pc.dialAddr() + return addrPortFromPeerRemoteAddr(dialAddr) } -func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { - pc, ok := p.peerImpl.(*PeerConn) - return pc, ok +func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool { + return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit) } -func (p *Peer) uncancelledRequests() uint64 { - return p.requestState.Requests.GetCardinality() +func (cn *PeerConn) peerPiecesChanged() { + cn.t.maybeDropMutuallyCompletePeer(cn) } -func (pc *PeerConn) remoteIsTransmission() bool { - return bytes.HasPrefix(pc.PeerID[:], []byte("-TR")) && pc.PeerID[7] == '-' +// Returns whether the connection could be useful to us. We're seeding and +// they want data, we don't have metainfo and they can provide it, etc. +func (c *PeerConn) useful() bool { + t := c.t + if c.closed.IsSet() { + return false + } + if !t.haveInfo() { + return c.supportsExtension("ut_metadata") + } + if t.seeding() && c.peerInterested { + return true + } + if c.peerHasWantedPieces() { + return true + } + return false }