X-Git-Url: http://www.git.stargrave.org/?a=blobdiff_plain;f=pex.go;h=a0a5f49f5babfb54a2ca11b532d7a017babf4bb8;hb=HEAD;hp=76d4cb42eb1eddba0321e9ea9963ac23e9103164;hpb=bdb218a6983be41e735a991900234dba88254977;p=btrtrc.git diff --git a/pex.go b/pex.go index 76d4cb42..a0a5f49f 100644 --- a/pex.go +++ b/pex.go @@ -2,10 +2,10 @@ package torrent import ( "net" + "net/netip" "sync" "time" - "github.com/anacrolix/dht/v2/krpc" pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -26,7 +26,7 @@ const ( // represents a single connection (t=pexAdd) or disconnection (t=pexDrop) event type pexEvent struct { t pexEventType - addr PeerRemoteAddr + addr netip.AddrPort f pp.PexPeerFlags next *pexEvent // event feed list } @@ -34,8 +34,8 @@ type pexEvent struct { // facilitates efficient de-duplication while generating PEX messages type pexMsgFactory struct { msg pp.PexMsg - added map[addrKey]struct{} - dropped map[addrKey]struct{} + added map[netip.AddrPort]struct{} + dropped map[netip.AddrPort]struct{} } func (me *pexMsgFactory) DeltaLen() int { @@ -44,11 +44,11 @@ func (me *pexMsgFactory) DeltaLen() int { int64(len(me.dropped)))) } -type addrKey string +type addrKey = netip.AddrPort // Returns the key to use to identify a given addr in the factory. -func (me *pexMsgFactory) addrKey(addr PeerRemoteAddr) addrKey { - return addrKey(addr.String()) +func (me *pexMsgFactory) addrKey(addr netip.AddrPort) addrKey { + return addr } // Returns whether the entry was added (we can check if we're cancelling out another entry and so @@ -61,10 +61,7 @@ func (me *pexMsgFactory) add(e pexEvent) { if me.added == nil { me.added = make(map[addrKey]struct{}, pexMaxDelta) } - addr, ok := nodeAddr(e.addr) - if !ok { - return - } + addr := krpcNodeAddrFromAddrPort(e.addr) m := &me.msg switch { case addr.IP.To4() != nil: @@ -96,10 +93,7 @@ func (me *pexMsgFactory) add(e pexEvent) { // Returns whether the entry was added (we can check if we're cancelling out another entry and so // won't hit the limit consuming this event). func (me *pexMsgFactory) drop(e pexEvent) { - addr, ok := nodeAddr(e.addr) - if !ok { - return - } + addr := krpcNodeAddrFromAddrPort(e.addr) key := me.addrKey(e.addr) if me.dropped == nil { me.dropped = make(map[addrKey]struct{}, pexMaxDelta) @@ -144,32 +138,18 @@ func (me *pexMsgFactory) append(event pexEvent) { } } -func (me *pexMsgFactory) PexMsg() pp.PexMsg { - return me.msg -} - -// Convert an arbitrary torrent peer Addr into one that can be represented by the compact addr -// format. -func nodeAddr(addr PeerRemoteAddr) (krpc.NodeAddr, bool) { - ipport, _ := tryIpPortFromNetAddr(addr) - ok := ipport.IP != nil - return krpc.NodeAddr{IP: shortestIP(ipport.IP), Port: ipport.Port}, ok -} - -// mainly for the krpc marshallers -func shortestIP(ip net.IP) net.IP { - if ip4 := ip.To4(); ip4 != nil { - return ip4 - } - return ip +func (me *pexMsgFactory) PexMsg() *pp.PexMsg { + return &me.msg } // Per-torrent PEX state type pexState struct { sync.RWMutex - tail *pexEvent // event feed list - hold []pexEvent // delayed drops - rest time.Time // cooldown deadline on inbound + tail *pexEvent // event feed list + hold []pexEvent // delayed drops + // Torrent-wide cooldown deadline on inbound. This exists to prevent PEX from drowning out other + // peer address sources, until that is fixed. + rest time.Time nc int // net number of alive conns msg0 pexMsgFactory // initial message } @@ -194,6 +174,10 @@ func (s *pexState) append(e *pexEvent) { } func (s *pexState) Add(c *PeerConn) { + e, err := c.pexEvent(pexAdd) + if err != nil { + return + } s.Lock() defer s.Unlock() s.nc++ @@ -204,7 +188,6 @@ func (s *pexState) Add(c *PeerConn) { } s.hold = s.hold[:0] } - e := c.pexEvent(pexAdd) c.pex.Listed = true s.append(&e) } @@ -214,9 +197,12 @@ func (s *pexState) Drop(c *PeerConn) { // skip connections which were not previously Added return } + e, err := c.pexEvent(pexDrop) + if err != nil { + return + } s.Lock() defer s.Unlock() - e := c.pexEvent(pexDrop) s.nc-- if s.nc < pexTargAdded && len(s.hold) < pexMaxHold { s.hold = append(s.hold, e) @@ -232,7 +218,7 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) { s.RLock() defer s.RUnlock() if start == nil { - return s.msg0.PexMsg(), s.tail + return *s.msg0.PexMsg(), s.tail } var msg pexMsgFactory last := start @@ -243,5 +229,18 @@ func (s *pexState) Genmsg(start *pexEvent) (pp.PexMsg, *pexEvent) { msg.append(*e) last = e } - return msg.PexMsg(), last + return *msg.PexMsg(), last +} + +// The same as Genmsg but just counts up the distinct events that haven't been sent. +func (s *pexState) numPending(start *pexEvent) (num int) { + s.RLock() + defer s.RUnlock() + if start == nil { + return s.msg0.PexMsg().Len() + } + for e := start.next; e != nil; e = e.next { + num++ + } + return }