peerChoking bool
peerRequests map[Request]*peerRequestState
PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
- PeerListenPort int
// The highest possible number of pieces the torrent could have based on
// communication with the peer. Generally only useful until we have the
// torrent info.
"time"
"github.com/RoaringBitmap/roaring"
+ "github.com/anacrolix/generics"
. "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/multiless"
+ "golang.org/x/exp/maps"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
// See BEP 3 etc.
PeerID PeerID
PeerExtensionBytes pp.PeerExtensionBits
+ PeerListenPort int
// The actual Conn, used for closing, and setting socket options. Do not use methods on this
// while holding any mutexes.
if !cn.supportsExtension(pp.ExtensionNamePex) {
return "unsupported"
}
- return fmt.Sprintf(
- "%v conns, %v unsent events",
- len(cn.pex.remoteLiveConns),
- cn.pex.numPending(),
- )
+ if true {
+ return fmt.Sprintf(
+ "%v conns, %v unsent events",
+ len(cn.pex.remoteLiveConns),
+ cn.pex.numPending(),
+ )
+ } else {
+ // This alternative branch prints out the remote live conn addresses.
+ return fmt.Sprintf(
+ "%v conns, %v unsent events",
+ strings.Join(generics.SliceMap(
+ maps.Keys(cn.pex.remoteLiveConns),
+ func(from netip.AddrPort) string {
+ return from.String()
+ }), ","),
+ cn.pex.numPending(),
+ )
+
+ }
}
func (cn *PeerConn) peerImplStatusLines() []string {
dialAddr := *addr
dialAddr.Port = c.PeerListenPort
return &dialAddr
+ default:
+ panic(addr)
}
}
return c.RemoteAddr
}).AddrPort())
}
+func (pc *PeerConn) remoteDialAddrPort() (netip.AddrPort, error) {
+ dialAddr := pc.dialAddr()
+ return addrPortFromPeerRemoteAddr(dialAddr)
+}
+
func (pc *PeerConn) bitExtensionEnabled(bit pp.ExtensionBit) bool {
return pc.t.cl.config.Extensions.GetBit(bit) && pc.PeerExtensionBytes.GetBit(bit)
}
},
{
pexDrop,
- &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
+ &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}, PeerListenPort: dialTcpAddr.Port},
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
},
{
pexAdd,
- &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
+ &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}, PeerListenPort: dialTcpAddr.Port},
pexEvent{pexAdd, dialTcpAddr, 0, nil},
},
{
pexDrop,
- &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
+ &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}, PeerListenPort: dialUdpAddr.Port},
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
},
}
}
t.conns[c] = struct{}{}
t.cl.event.Broadcast()
+ // We'll never receive the "p" extended handshake parameter.
if !t.cl.config.DisablePEX && !c.PeerExtensionBytes.SupportsExtended() {
- t.pex.Add(c) // as no further extended handshake expected
+ t.pex.Add(c)
}
return nil
}
return nil
}
-func (t *Torrent) peerConnsWithRemoteAddrPort(addrPort netip.AddrPort) (ret []*PeerConn) {
+func (t *Torrent) peerConnsWithDialAddrPort(target netip.AddrPort) (ret []*PeerConn) {
for pc := range t.conns {
- addr := pc.remoteAddrPort()
- if !(addr.Ok && addr.Value == addrPort) {
+ dialAddr, err := pc.remoteDialAddrPort()
+ if err != nil {
+ continue
+ }
+ if dialAddr != target {
continue
}
ret = append(ret, pc)
case utHolepunch.Rendezvous:
t.logger.Printf("got holepunch rendezvous request for %v from %p", msg.AddrPort, sender)
sendMsg := sendUtHolepunchMsg
- targets := t.peerConnsWithRemoteAddrPort(msg.AddrPort)
+ senderAddrPort, err := sender.remoteDialAddrPort()
+ if err != nil {
+ sender.logger.Levelf(
+ log.Warning,
+ "error getting ut_holepunch rendezvous sender's dial address: %v",
+ err,
+ )
+ // There's no better error code. The sender's address itself is invalid. I don't see
+ // this error message being appropriate anywhere else anyway.
+ sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NoSuchPeer)
+ }
+ targets := t.peerConnsWithDialAddrPort(msg.AddrPort)
if len(targets) == 0 {
sendMsg(sender, utHolepunch.Error, msg.AddrPort, utHolepunch.NotConnected)
return nil
continue
}
sendMsg(sender, utHolepunch.Connect, msg.AddrPort, 0)
- sendMsg(pc, utHolepunch.Connect, sender.remoteAddrPort().Unwrap(), 0)
+ sendMsg(pc, utHolepunch.Connect, senderAddrPort, 0)
}
return nil
case utHolepunch.Connect:
"testing"
"testing/iotest"
+ "github.com/anacrolix/log"
+
"github.com/anacrolix/torrent/internal/testutil"
qt "github.com/frankban/quicktest"
"github.com/stretchr/testify/assert"
cfg.MaxAllocPeerRequestDataPerConn = 4
cfg.DataDir = greetingTempDir
cfg.DisablePEX = true
- //cfg.Debug = true
+ cfg.Debug = true
cfg.AcceptPeerConnections = false
+ //cfg.DisableUTP = true
seeder, err := NewClient(cfg)
require.NoError(t, err)
defer seeder.Close()
cfg.Seed = false
cfg.DataDir = t.TempDir()
cfg.MaxAllocPeerRequestDataPerConn = 4
- cfg.Debug = true
+ //cfg.Debug = true
//cfg.DisableUTP = true
leecherLeecher, _ := NewClient(cfg)
require.NoError(t, err)
waitForConns(seederTorrent)
go llg.AddClientPeer(leecher)
waitForConns(llg)
+ //time.Sleep(time.Second)
llg.cl.lock()
+ targetAddr := seeder.ListenAddrs()[1]
+ log.Printf("trying to initiate to %v", targetAddr)
llg.initiateConn(PeerInfo{
- Addr: seeder.ListenAddrs()[0],
+ Addr: targetAddr,
}, true, false)
llg.cl.unlock()
wg.Wait()