X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=peerconn.go;h=e2d944ff269bb94426772d60f267bb359582d156;hb=178b60668cfff8c0e74dbe896c8496fd911c595a;hp=c45d87ff6d06e527f7f82a7c006014352c497d30;hpb=e86e624415db323121e8b4c0824622cdc03bc874;p=btrtrc.git diff --git a/peerconn.go b/peerconn.go index c45d87ff..e2d944ff 100644 --- a/peerconn.go +++ b/peerconn.go @@ -12,15 +12,16 @@ import ( "net/netip" "strconv" "strings" + "sync/atomic" "time" - utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" - "github.com/RoaringBitmap/roaring" + "github.com/anacrolix/generics" . "github.com/anacrolix/generics" "github.com/anacrolix/log" "github.com/anacrolix/missinggo/v2/bitmap" "github.com/anacrolix/multiless" + "golang.org/x/exp/maps" "golang.org/x/time/rate" "github.com/anacrolix/torrent/bencode" @@ -28,6 +29,7 @@ import ( "github.com/anacrolix/torrent/metainfo" "github.com/anacrolix/torrent/mse" pp "github.com/anacrolix/torrent/peer_protocol" + utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch" ) // Maintains the state of a BitTorrent-protocol based connection with a peer. @@ -41,6 +43,7 @@ type PeerConn struct { // See BEP 3 etc. PeerID PeerID PeerExtensionBytes pp.PeerExtensionBits + PeerListenPort int // The actual Conn, used for closing, and setting socket options. Do not use methods on this // while holding any mutexes. @@ -52,8 +55,10 @@ type PeerConn struct { messageWriter peerConnMsgWriter - uploadTimer *time.Timer - pex pexConnState + PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber + PeerClientName atomic.Value + uploadTimer *time.Timer + pex pexConnState // The pieces the peer has claimed to have. _peerPieces roaring.Bitmap @@ -66,20 +71,44 @@ type PeerConn struct { outstandingHolepunchingRendezvous map[netip.AddrPort]struct{} } +func (cn *PeerConn) pexStatus() string { + if !cn.bitExtensionEnabled(pp.ExtensionBitLtep) { + return "extended protocol disabled" + } + if cn.PeerExtensionIDs == nil { + return "pending extended handshake" + } + if !cn.supportsExtension(pp.ExtensionNamePex) { + return "unsupported" + } + if true { + return fmt.Sprintf( + "%v conns, %v unsent events", + len(cn.pex.remoteLiveConns), + cn.pex.numPending(), + ) + } else { + // This alternative branch prints out the remote live conn addresses. + return fmt.Sprintf( + "%v conns, %v unsent events", + strings.Join(generics.SliceMap( + maps.Keys(cn.pex.remoteLiveConns), + func(from netip.AddrPort) string { + return from.String() + }), ","), + cn.pex.numPending(), + ) + } +} + func (cn *PeerConn) peerImplStatusLines() []string { - lines := make([]string, 0, 2) - lines = append( - lines, - fmt.Sprintf("%+-55q %v %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString)) - if cn.supportsExtension(pp.ExtensionNamePex) { - lines = append( - lines, - fmt.Sprintf( - "pex: %v conns, %v unsent events", - cn.pex.remoteLiveConns, - cn.pex.numPending())) + return []string{ + cn.connString, + fmt.Sprintf("peer id: %+q", cn.PeerID), + fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes), + fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs), + fmt.Sprintf("pex: %s", cn.pexStatus()), } - return lines } // Returns true if the connection is over IPv6. @@ -91,10 +120,12 @@ func (cn *PeerConn) ipv6() bool { return len(ip) == net.IPv6len } -// Returns true the if the dialer/initiator has the lower client peer ID. TODO: Find the -// specification for this. +// Returns true the if the dialer/initiator has the higher client peer ID. See +// https://github.com/arvidn/libtorrent/blame/272828e1cc37b042dfbbafa539222d8533e99755/src/bt_peer_connection.cpp#L3536-L3557. +// As far as I can tell, Transmission just keeps the oldest connection. func (cn *PeerConn) isPreferredDirection() bool { - return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing + // True if our client peer ID is higher than the remote's peer ID. + return bytes.Compare(cn.PeerID[:], cn.t.cl.peerID[:]) < 0 == cn.outgoing } // Returns whether the left connection should be preferred over the right one, @@ -558,7 +589,16 @@ func (c *PeerConn) onReadRequest(r Request, startFetch bool) error { } func (c *PeerConn) peerRequestDataReader(r Request, prs *peerRequestState) { - b, err := c.readPeerRequestData(r, prs) + // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere, + // or fail to read and then cleanup. Also, we used to hang here if the reservation was never + // dropped, that was fixed. + ctx := context.Background() + err := prs.allocReservation.Wait(ctx) + if err != nil { + c.logger.WithDefaultLevel(log.Debug).Levelf(log.ErrorLevel(err), "waiting for alloc limit reservation: %v", err) + return + } + b, err := c.readPeerRequestData(r) c.locker().Lock() defer c.locker().Unlock() if err != nil { @@ -584,7 +624,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) { // https://github.com/anacrolix/torrent/issues/702#issuecomment-1000953313. logLevel = log.Debug } - c.logger.WithDefaultLevel(logLevel).Printf("error reading chunk for peer Request %v: %v", r, err) + c.logger.Levelf(logLevel, "error reading chunk for peer Request %v: %v", r, err) if c.t.closed.IsSet() { return } @@ -615,20 +655,7 @@ func (c *PeerConn) peerRequestDataReadFailed(err error, r Request) { } } -func (c *PeerConn) readPeerRequestData(r Request, prs *peerRequestState) ([]byte, error) { - // Should we depend on Torrent closure here? I think it's okay to get cancelled from elsewhere, - // or fail to read and then cleanup. - ctx := context.Background() - err := prs.allocReservation.Wait(ctx) - if err != nil { - if ctx.Err() == nil { - // The error is from the reservation itself. Something is very broken, or we're not - // guarding against excessively large requests. - err = log.WithLevel(log.Critical, err) - } - err = fmt.Errorf("waiting for alloc limit reservation: %w", err) - return nil, err - } +func (c *PeerConn) readPeerRequestData(r Request) ([]byte, error) { b := make([]byte, r.Length) p := c.t.info.Piece(int(r.Index)) n, err := c.t.readAt(b, p.Offset()+int64(r.Begin)) @@ -796,8 +823,8 @@ func (c *PeerConn) mainReadLoop() (err error) { case pp.Reject: req := newRequestFromMessage(&msg) if !c.remoteRejectedRequest(c.t.requestIndexFromRequest(req)) { - c.logger.Printf("received invalid reject [request=%v, peer=%v]", req, c) - err = fmt.Errorf("received invalid reject [request=%v]", req) + err = fmt.Errorf("received invalid reject for request %v", req) + c.logger.Levelf(log.Debug, "%v", err) } case pp.AllowedFast: torrent.Add("allowed fasts received", 1) @@ -891,6 +918,8 @@ func (c *PeerConn) onReadExtendedMsg(id pp.ExtensionNumber, payload []byte) (err err = fmt.Errorf("unmarshalling ut_holepunch message: %w", err) return } + err = c.t.handleReceivedUtHolepunchMsg(msg, c) + return default: return fmt.Errorf("unexpected extended message ID: %v", id) } @@ -1026,29 +1055,35 @@ func (c *PeerConn) pexPeerFlags() pp.PexPeerFlags { // This returns the address to use if we want to dial the peer again. It incorporates the peer's // advertised listen port. func (c *PeerConn) dialAddr() PeerRemoteAddr { - if !c.outgoing && c.PeerListenPort != 0 { - switch addr := c.RemoteAddr.(type) { - case *net.TCPAddr: - dialAddr := *addr - dialAddr.Port = c.PeerListenPort - return &dialAddr - case *net.UDPAddr: - dialAddr := *addr - dialAddr.Port = c.PeerListenPort - return &dialAddr - } + if c.outgoing || c.PeerListenPort == 0 { + return c.RemoteAddr } - return c.RemoteAddr + addrPort, err := addrPortFromPeerRemoteAddr(c.RemoteAddr) + if err != nil { + c.logger.Levelf( + log.Warning, + "error parsing %q for alternate dial port: %v", + c.RemoteAddr, + err, + ) + return c.RemoteAddr + } + return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort)) } -func (c *PeerConn) pexEvent(t pexEventType) pexEvent { +func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) { f := c.pexPeerFlags() - addr := c.dialAddr() - return pexEvent{t, addr, f, nil} + dialAddr := c.dialAddr() + addr, err := addrPortFromPeerRemoteAddr(dialAddr) + if err != nil || !addr.IsValid() { + err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err) + return + } + return pexEvent{t, addr, f, nil}, nil } func (c *PeerConn) String() string { - return fmt.Sprintf("%T %p [id=%q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load()) + return fmt.Sprintf("%T %p [id=%+q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load()) } // Returns the pieces the peer could have based on their claims. If we don't know how many pieces @@ -1063,8 +1098,34 @@ func (pc *PeerConn) remoteIsTransmission() bool { return bytes.HasPrefix(pc.PeerID[:], []byte("-TR")) && pc.PeerID[7] == '-' } -func (pc *PeerConn) remoteAddrPort() Option[netip.AddrPort] { - return Some(pc.conn.RemoteAddr().(interface { - AddrPort() netip.AddrPort - }).AddrPort()) +func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) { + dialAddr := pc.dialAddr() + return addrPortFromPeerRemoteAddr(dialAddr) +} + +func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool { + return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit) +} + +func (cn *PeerConn) peerPiecesChanged() { + cn.t.maybeDropMutuallyCompletePeer(cn) +} + +// Returns whether the connection could be useful to us. We're seeding and +// they want data, we don't have metainfo and they can provide it, etc. +func (c *PeerConn) useful() bool { + t := c.t + if c.closed.IsSet() { + return false + } + if !t.haveInfo() { + return c.supportsExtension("ut_metadata") + } + if t.seeding() && c.peerInterested { + return true + } + if c.peerHasWantedPieces() { + return true + } + return false }