"net/netip"
"strconv"
"strings"
+ "sync/atomic"
"time"
- utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
-
"github.com/RoaringBitmap/roaring"
+ "github.com/anacrolix/generics"
. "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
+ "golang.org/x/exp/maps"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
+ utHolepunch "github.com/anacrolix/torrent/peer_protocol/ut-holepunch"
)
// Maintains the state of a BitTorrent-protocol based connection with a peer.
// See BEP 3 etc.
PeerID PeerID
PeerExtensionBytes pp.PeerExtensionBits
+ PeerListenPort int
// The actual Conn, used for closing, and setting socket options. Do not use methods on this
// while holding any mutexes.
messageWriter peerConnMsgWriter
- uploadTimer *time.Timer
- pex pexConnState
+ PeerExtensionIDs map[pp.ExtensionName]pp.ExtensionNumber
+ PeerClientName atomic.Value
+ uploadTimer *time.Timer
+ pex pexConnState
// The pieces the peer has claimed to have.
_peerPieces roaring.Bitmap
outstandingHolepunchingRendezvous map[netip.AddrPort]struct{}
}
+func (cn *PeerConn) pexStatus() string {
+ if !cn.bitExtensionEnabled(pp.ExtensionBitLtep) {
+ return "extended protocol disabled"
+ }
+ if cn.PeerExtensionIDs == nil {
+ return "pending extended handshake"
+ }
+ if !cn.supportsExtension(pp.ExtensionNamePex) {
+ return "unsupported"
+ }
+ if true {
+ return fmt.Sprintf(
+ "%v conns, %v unsent events",
+ len(cn.pex.remoteLiveConns),
+ cn.pex.numPending(),
+ )
+ } else {
+ // This alternative branch prints out the remote live conn addresses.
+ return fmt.Sprintf(
+ "%v conns, %v unsent events",
+ strings.Join(generics.SliceMap(
+ maps.Keys(cn.pex.remoteLiveConns),
+ func(from netip.AddrPort) string {
+ return from.String()
+ }), ","),
+ cn.pex.numPending(),
+ )
+ }
+}
+
func (cn *PeerConn) peerImplStatusLines() []string {
- lines := make([]string, 0, 2)
- lines = append(
- lines,
- fmt.Sprintf("%+-55q %v %s", cn.PeerID, cn.PeerExtensionBytes, cn.connString))
- if cn.supportsExtension(pp.ExtensionNamePex) {
- lines = append(
- lines,
- fmt.Sprintf(
- "pex: %v conns, %v unsent events",
- cn.pex.remoteLiveConns,
- cn.pex.numPending()))
+ return []string{
+ cn.connString,
+ fmt.Sprintf("peer id: %+q", cn.PeerID),
+ fmt.Sprintf("extensions: %v", cn.PeerExtensionBytes),
+ fmt.Sprintf("ltep extensions: %v", cn.PeerExtensionIDs),
+ fmt.Sprintf("pex: %s", cn.pexStatus()),
}
- return lines
}
// Returns true if the connection is over IPv6.
return len(ip) == net.IPv6len
}
-// Returns true the if the dialer/initiator has the lower client peer ID. TODO: Find the
-// specification for this.
+// Returns true the if the dialer/initiator has the higher client peer ID. See
+// https://github.com/arvidn/libtorrent/blame/272828e1cc37b042dfbbafa539222d8533e99755/src/bt_peer_connection.cpp#L3536-L3557.
+// As far as I can tell, Transmission just keeps the oldest connection.
func (cn *PeerConn) isPreferredDirection() bool {
- return bytes.Compare(cn.t.cl.peerID[:], cn.PeerID[:]) < 0 == cn.outgoing
+ // True if our client peer ID is higher than the remote's peer ID.
+ return bytes.Compare(cn.PeerID[:], cn.t.cl.peerID[:]) < 0 == cn.outgoing
}
// Returns whether the left connection should be preferred over the right one,
err = fmt.Errorf("unmarshalling ut_holepunch message: %w", err)
return
}
+ err = c.t.handleReceivedUtHolepunchMsg(msg, c)
+ return
default:
return fmt.Errorf("unexpected extended message ID: %v", id)
}
// This returns the address to use if we want to dial the peer again. It incorporates the peer's
// advertised listen port.
func (c *PeerConn) dialAddr() PeerRemoteAddr {
- if !c.outgoing && c.PeerListenPort != 0 {
- switch addr := c.RemoteAddr.(type) {
- case *net.TCPAddr:
- dialAddr := *addr
- dialAddr.Port = c.PeerListenPort
- return &dialAddr
- case *net.UDPAddr:
- dialAddr := *addr
- dialAddr.Port = c.PeerListenPort
- return &dialAddr
- }
+ if c.outgoing || c.PeerListenPort == 0 {
+ return c.RemoteAddr
}
- return c.RemoteAddr
+ addrPort, err := addrPortFromPeerRemoteAddr(c.RemoteAddr)
+ if err != nil {
+ c.logger.Levelf(
+ log.Warning,
+ "error parsing %q for alternate dial port: %v",
+ c.RemoteAddr,
+ err,
+ )
+ return c.RemoteAddr
+ }
+ return netip.AddrPortFrom(addrPort.Addr(), uint16(c.PeerListenPort))
}
-func (c *PeerConn) pexEvent(t pexEventType) pexEvent {
+func (c *PeerConn) pexEvent(t pexEventType) (_ pexEvent, err error) {
f := c.pexPeerFlags()
- addr := c.dialAddr()
- return pexEvent{t, addr, f, nil}
+ dialAddr := c.dialAddr()
+ addr, err := addrPortFromPeerRemoteAddr(dialAddr)
+ if err != nil || !addr.IsValid() {
+ err = fmt.Errorf("parsing dial addr %q: %w", dialAddr, err)
+ return
+ }
+ return pexEvent{t, addr, f, nil}, nil
}
func (c *PeerConn) String() string {
- return fmt.Sprintf("%T %p [id=%q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load())
+ return fmt.Sprintf("%T %p [id=%+q, exts=%v, v=%q]", c, c, c.PeerID, c.PeerExtensionBytes, c.PeerClientName.Load())
}
// Returns the pieces the peer could have based on their claims. If we don't know how many pieces
return bytes.HasPrefix(pc.PeerID[:], []byte("-TR")) && pc.PeerID[7] == '-'
}
-func (pc *PeerConn) remoteAddrPort() Option[netip.AddrPort] {
- return Some(pc.conn.RemoteAddr().(interface {
- AddrPort() netip.AddrPort
- }).AddrPort())
+func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) {
+ dialAddr := pc.dialAddr()
+ return addrPortFromPeerRemoteAddr(dialAddr)
+}
+
+func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool {
+ return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit)
+}
+
+func (cn *PeerConn) peerPiecesChanged() {
+ cn.t.maybeDropMutuallyCompletePeer(cn)
+}
+
+// Returns whether the connection could be useful to us. We're seeding and
+// they want data, we don't have metainfo and they can provide it, etc.
+func (c *PeerConn) useful() bool {
+ t := c.t
+ if c.closed.IsSet() {
+ return false
+ }
+ if !t.haveInfo() {
+ return c.supportsExtension("ut_metadata")
+ }
+ if t.seeding() && c.peerInterested {
+ return true
+ }
+ if c.peerHasWantedPieces() {
+ return true
+ }
+ return false
}