"strings"
"time"
- "github.com/davecgh/go-spew/spew"
- "github.com/dustin/go-humanize"
- gbtree "github.com/google/btree"
- "github.com/pion/datachannel"
- "golang.org/x/time/rate"
-
"github.com/anacrolix/chansync"
"github.com/anacrolix/chansync/events"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/dht/v2/krpc"
- "github.com/anacrolix/generics"
. "github.com/anacrolix/generics"
+ g "github.com/anacrolix/generics"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
+ "github.com/davecgh/go-spew/spew"
+ "github.com/dustin/go-humanize"
+ gbtree "github.com/google/btree"
+ "golang.org/x/time/rate"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/internal/limiter"
pp "github.com/anacrolix/torrent/peer_protocol"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
- "github.com/anacrolix/torrent/tracker"
- "github.com/anacrolix/torrent/webtorrent"
+ "github.com/anacrolix/torrent/types/infohash"
)
// Clients contain zero or more Torrents. A Client manages a blocklist, the
dialRateLimiter *rate.Limiter
numHalfOpen int
- websocketTrackers websocketTrackers
-
activeAnnounceLimiter limiter.Instance
httpClient *http.Client
}
w,
"%f%% of %d bytes (%s)",
100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())),
- *t.length,
- humanize.Bytes(uint64(*t.length)))
+ t.length(),
+ humanize.Bytes(uint64(t.length())))
} else {
w.WriteString("<missing metainfo>")
}
// Initializes a bare minimum Client. *Client and *ClientConfig must not be nil.
func (cl *Client) init(cfg *ClientConfig) {
cl.config = cfg
- generics.MakeMap(&cl.dopplegangerAddrs)
+ g.MakeMap(&cl.dopplegangerAddrs)
cl.torrents = make(map[metainfo.Hash]*Torrent)
cl.dialRateLimiter = rate.NewLimiter(10, 10)
cl.activeAnnounceLimiter.SlotsPerKey = 2
cl.ipBlockList = cfg.IPBlocklist
cl.httpClient = &http.Client{
Transport: &http.Transport{
- Proxy: cfg.HTTPProxy,
+ Proxy: cfg.HTTPProxy,
+ DialContext: cfg.HTTPDialContext,
// I think this value was observed from some webseeds. It seems reasonable to extend it
// to other uses of HTTP from the client.
MaxConnsPerHost: 10,
}
}
- cl.websocketTrackers = websocketTrackers{
- PeerId: cl.peerID,
- Logger: cl.logger,
- GetAnnounceRequest: func(event tracker.AnnounceEvent, infoHash [20]byte) (tracker.AnnounceRequest, error) {
- cl.lock()
- defer cl.unlock()
- t, ok := cl.torrents[infoHash]
- if !ok {
- return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
- }
- return t.announceRequest(event), nil
- },
- Proxy: cl.config.HTTPProxy,
- OnConn: func(dc datachannel.ReadWriteCloser, dcc webtorrent.DataChannelContext) {
- cl.lock()
- defer cl.unlock()
- t, ok := cl.torrents[dcc.InfoHash]
- if !ok {
- cl.logger.WithDefaultLevel(log.Warning).Printf(
- "got webrtc conn for unloaded torrent with infohash %x",
- dcc.InfoHash,
- )
- dc.Close()
- return
- }
- go t.onWebRtcConn(dc, dcc)
- },
- }
-
return
}
}
s, err = dht.NewServer(&cfg)
if err == nil {
- go func() {
- ts, err := s.Bootstrap()
- if err != nil {
- logger.Levelf(log.Warning, "error bootstrapping dht: %s", err)
- }
- logger.Levelf(log.Debug, "completed bootstrap: %+v", ts)
- }()
+ go s.TableMaintainer()
}
return
}
if tc, ok := nc.(*net.TCPConn); ok {
tc.SetLinger(0)
}
- c := cl.newConnection(nc, false, nc.RemoteAddr(), nc.RemoteAddr().Network(),
- regularNetConnPeerConnConnString(nc))
+ remoteAddr, _ := tryIpPortFromNetAddr(nc.RemoteAddr())
+ c := cl.newConnection(
+ nc,
+ newConnectionOpts{
+ outgoing: false,
+ remoteAddr: nc.RemoteAddr(),
+ localPublicAddr: cl.publicAddr(remoteAddr.IP),
+ network: nc.RemoteAddr().Network(),
+ connString: regularNetConnPeerConnConnString(nc),
+ })
defer func() {
cl.lock()
defer cl.unlock()
// Returns a handle to the given torrent, if it's present in the client.
func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
- cl.lock()
- defer cl.unlock()
+ cl.rLock()
+ defer cl.rUnlock()
t, ok = cl.torrents[ih]
return
}
ctx context.Context,
nc net.Conn,
t *Torrent,
- outgoing, encryptHeader bool,
- remoteAddr PeerRemoteAddr,
- network, connString string,
+ encryptHeader bool,
+ newConnOpts newConnectionOpts,
) (
c *PeerConn, err error,
) {
- c = cl.newConnection(nc, outgoing, remoteAddr, network, connString)
+ c = cl.newConnection(nc, newConnOpts)
c.headerEncrypted = encryptHeader
ctx, cancel := context.WithTimeout(ctx, cl.config.HandshakesTimeout)
defer cancel()
}
return nil, errors.New("dial failed")
}
- c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, true, obfuscatedHeader, addr, dr.Dialer.DialerNetwork(), regularNetConnPeerConnConnString(nc))
+ addrIpPort, _ := tryIpPortFromNetAddr(addr)
+ c, err := cl.initiateProtocolHandshakes(context.Background(), nc, t, obfuscatedHeader, newConnectionOpts{
+ outgoing: true,
+ remoteAddr: addr,
+ // It would be possible to retrieve a public IP from the dialer used here?
+ localPublicAddr: cl.publicAddr(addrIpPort.IP),
+ network: dr.Dialer.DialerNetwork(),
+ connString: regularNetConnPeerConnConnString(nc),
+ })
if err != nil {
nc.Close()
}
cl.noLongerHalfOpen(t, addr.String())
if err != nil {
if cl.config.Debug {
- cl.logger.Printf("error establishing outgoing connection to %v: %v", addr, err)
+ cl.logger.Levelf(log.Debug, "error establishing outgoing connection to %v: %v", addr, err)
}
return
}
return fmt.Errorf("adding connection: %w", err)
}
defer t.dropConnection(c)
- c.startWriter()
+ c.startMessageWriter()
cl.sendInitialMessages(c, t)
c.initUpdateRequestsTimer()
err := c.mainReadLoop()
// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
// instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
// (1<<19) cached for sending, for 16KiB (1<<14) chunks.
-const localClientReqq = 1 << 5
+const localClientReqq = 1024
// See the order given in Transmission's tr_peerMsgsNew.
func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
Port: cl.incomingPeerPort(),
MetadataSize: torrent.metadataSize(),
- // TODO: We can figured these out specific to the socket
- // used.
+ // TODO: We can figure these out specific to the socket used.
Ipv4: pp.CompactIp(cl.config.PublicIp4.To4()),
Ipv6: cl.config.PublicIp6.To16(),
}
return
}
-// Adds a torrent by InfoHash with a custom Storage implementation.
-// If the torrent already exists then this Storage is ignored and the
-// existing torrent returned with `new` set to `false`
+// Adds a torrent by InfoHash with a custom Storage implementation. If the torrent already exists
+// then this Storage is ignored and the existing torrent returned with `new` set to `false`.
func (cl *Client) AddTorrentOpt(opts AddTorrentOpts) (t *Torrent, new bool) {
infoHash := opts.InfoHash
cl.lock()
}
})
cl.torrents[infoHash] = t
+ t.setInfoBytesLocked(opts.InfoBytes)
cl.clearAcceptLimits()
t.updateWantPeersEvent()
// Tickle Client.waitAccept, new torrent may want conns.
}
type AddTorrentOpts struct {
- InfoHash InfoHash
+ InfoHash infohash.T
Storage storage.ClientImpl
ChunkSize pp.Integer
+ InfoBytes []byte
}
// Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
return
}
-type stringAddr string
-
-var _ net.Addr = stringAddr("")
-
-func (stringAddr) Network() string { return "" }
-func (me stringAddr) String() string { return string(me) }
-
// The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
// spec.DisallowDataDownload/Upload will be read and applied
// The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
}
for _, peerAddr := range spec.PeerAddrs {
t.addPeer(PeerInfo{
- Addr: stringAddr(peerAddr),
+ Addr: StringAddr(peerAddr),
Source: PeerSourceDirect,
Trusted: true,
})
// Returns handles to all the torrents loaded in the Client.
func (cl *Client) Torrents() []*Torrent {
- cl.lock()
- defer cl.unlock()
+ cl.rLock()
+ defer cl.rUnlock()
return cl.torrentsAsSlice()
}
if !ok {
panic(ip)
}
- generics.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{})
+ g.MakeMapIfNilAndSet(&cl.badPeerIPs, ipAddr, struct{}{})
for _, t := range cl.torrents {
t.iterPeers(func(p *Peer) {
if p.remoteIp().Equal(ip) {
}
}
-func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr PeerRemoteAddr, network, connString string) (c *PeerConn) {
- if network == "" {
- panic(remoteAddr)
+type newConnectionOpts struct {
+ outgoing bool
+ remoteAddr PeerRemoteAddr
+ localPublicAddr peerLocalPublicAddr
+ network string
+ connString string
+}
+
+func (cl *Client) newConnection(nc net.Conn, opts newConnectionOpts) (c *PeerConn) {
+ if opts.network == "" {
+ panic(opts.remoteAddr)
}
c = &PeerConn{
Peer: Peer{
- outgoing: outgoing,
+ outgoing: opts.outgoing,
choking: true,
peerChoking: true,
PeerMaxRequests: 250,
- RemoteAddr: remoteAddr,
- Network: network,
- callbacks: &cl.config.Callbacks,
+ RemoteAddr: opts.remoteAddr,
+ localPublicAddr: opts.localPublicAddr,
+ Network: opts.network,
+ callbacks: &cl.config.Callbacks,
},
- connString: connString,
+ connString: opts.connString,
conn: nc,
}
+ c.peerRequestDataAllocLimiter.Max = cl.config.MaxAllocPeerRequestDataPerConn
c.initRequestState()
// TODO: Need to be much more explicit about this, including allowing non-IP bannable addresses.
- if remoteAddr != nil {
- netipAddrPort, err := netip.ParseAddrPort(remoteAddr.String())
+ if opts.remoteAddr != nil {
+ netipAddrPort, err := netip.ParseAddrPort(opts.remoteAddr.String())
if err == nil {
c.bannableAddr = Some(netipAddrPort.Addr())
}
l: cl.config.DownloadRateLimiter,
r: c.r,
}
- c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", remoteAddr, network, outgoing)
+ c.logger.WithDefaultLevel(log.Debug).Printf("initialized with remote %v over network %v (outgoing=%t)", opts.remoteAddr, opts.network, opts.outgoing)
for _, f := range cl.config.Callbacks.NewPeer {
f(&c.Peer)
}
return
}
+func (cl *Client) PublicIPs() (ips []net.IP) {
+ if ip := cl.config.PublicIp4; ip != nil {
+ ips = append(ips, ip)
+ }
+ if ip := cl.config.PublicIp6; ip != nil {
+ ips = append(ips, ip)
+ }
+ return
+}
+
func (cl *Client) onBadAccept(addr PeerRemoteAddr) {
ipa, ok := tryIpPortFromNetAddr(addr)
if !ok {