From: Matt Joiner Date: Fri, 18 Jun 2021 01:05:23 +0000 (+1000) Subject: Remove conntrack, expose Torrent.AnnounceToDht, ClientConfig.PeriodicallyAnnounceTorr... X-Git-Tag: v1.29.0~30 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=af1ca91e04f41ea98a844a78841fd8581143535b;p=btrtrc.git Remove conntrack, expose Torrent.AnnounceToDht, ClientConfig.PeriodicallyAnnounceTorrentsToDht --- diff --git a/client.go b/client.go index 2920c263..a8d93319 100644 --- a/client.go +++ b/client.go @@ -22,7 +22,6 @@ import ( "github.com/anacrolix/missinggo/slices" "github.com/anacrolix/missinggo/v2" "github.com/anacrolix/missinggo/v2/bitmap" - "github.com/anacrolix/missinggo/v2/conntrack" "github.com/anacrolix/missinggo/v2/pproffd" "github.com/anacrolix/sync" "github.com/davecgh/go-spew/spew" @@ -257,7 +256,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { if !cfg.NoDHT { for _, s := range sockets { if pc, ok := s.(net.PacketConn); ok { - ds, err := cl.newAnacrolixDhtServer(pc) + ds, err := cl.NewAnacrolixDhtServer(pc) if err != nil { panic(err) } @@ -363,7 +362,8 @@ func (cl *Client) listenNetworks() (ns []network) { return } -func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) { +// Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn. +func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) { cfg := dht.ServerConfig{ IPBlocklist: cl.ipBlockList, Conn: conn, @@ -374,10 +374,9 @@ func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err } return cl.config.PublicIp4 }(), - StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()), - ConnectionTracking: cl.config.ConnTracker, - OnQuery: cl.config.DHTOnQuery, - Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())), + StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()), + OnQuery: cl.config.DHTOnQuery, + Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())), } if f := cl.config.ConfigureAnacrolixDhtServer; f != nil { f(&cfg) @@ -639,21 +638,6 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) { } func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn { - network := s.LocalAddr().Network() - cte := cl.config.ConnTracker.Wait( - ctx, - conntrack.Entry{network, s.LocalAddr().String(), addr}, - "dial torrent client", - 0, - ) - // Try to avoid committing to a dial if the context is complete as it's difficult to determine - // which dial errors allow us to forget the connection tracking entry handle. - if ctx.Err() != nil { - if cte != nil { - cte.Forget() - } - return nil - } c, err := s.Dial(ctx, addr) // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set // it now in case we close the connection forthwith. @@ -661,19 +645,7 @@ func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net tc.SetLinger(0) } countDialResult(err) - if c == nil { - if err != nil && forgettableDialError(err) { - cte.Forget() - } else { - cte.Done() - } - return nil - } - return closeWrapper{c, func() error { - err := c.Close() - cte.Done() - return err - }} + return c } func forgettableDialError(err error) bool { @@ -1180,7 +1152,9 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor t = cl.newTorrent(infoHash, specStorage) cl.eachDhtServer(func(s DhtServer) { - go t.dhtAnnouncer(s) + if cl.config.PeriodicallyAnnounceTorrentsToDht { + go t.dhtAnnouncer(s) + } }) cl.torrents[infoHash] = t cl.clearAcceptLimits() diff --git a/config.go b/config.go index 37b2b714..b687f05d 100644 --- a/config.go +++ b/config.go @@ -11,7 +11,6 @@ import ( "github.com/anacrolix/log" "github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo/expect" - "github.com/anacrolix/missinggo/v2/conntrack" "golang.org/x/time/rate" "github.com/anacrolix/torrent/iplist" @@ -38,7 +37,8 @@ type ClientConfig struct { NoDHT bool `long:"disable-dht"` DhtStartingNodes func(network string) dht.StartingNodesGetter // Called for each anacrolix/dht Server created for the Client. - ConfigureAnacrolixDhtServer func(*dht.ServerConfig) + ConfigureAnacrolixDhtServer func(*dht.ServerConfig) + PeriodicallyAnnounceTorrentsToDht bool // Never send chunks to peers. NoUpload bool `long:"no-upload"` @@ -134,8 +134,6 @@ type ClientConfig struct { // we don't intend to obtain all of a torrent's data. DropMutuallyCompletePeers bool - ConnTracker *conntrack.Instance - // OnQuery hook func DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool) @@ -172,12 +170,12 @@ func NewDefaultClientConfig() *ClientConfig { DhtStartingNodes: func(network string) dht.StartingNodesGetter { return func() ([]dht.Addr, error) { return dht.GlobalBootstrapAddrs(network) } }, - ListenHost: func(string) string { return "" }, - UploadRateLimiter: unlimited, - DownloadRateLimiter: unlimited, - ConnTracker: conntrack.NewInstance(), - DisableAcceptRateLimiting: true, - DropMutuallyCompletePeers: true, + PeriodicallyAnnounceTorrentsToDht: true, + ListenHost: func(string) string { return "" }, + UploadRateLimiter: unlimited, + DownloadRateLimiter: unlimited, + DisableAcceptRateLimiting: true, + DropMutuallyCompletePeers: true, HeaderObfuscationPolicy: HeaderObfuscationPolicy{ Preferred: true, RequirePreferred: false, diff --git a/torrent.go b/torrent.go index b4dda85d..c4aae8fe 100644 --- a/torrent.go +++ b/torrent.go @@ -1643,17 +1643,33 @@ func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) { } } -func (t *Torrent) announceToDht(impliedPort bool, s DhtServer) error { - ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort) +// Announce using the provided DHT server. Peers are consumed automatically. done is closed when the +// announce ends. stop will force the announce to end. +func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) { + ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true) + if err != nil { + return + } + _done := make(chan struct{}) + done = _done + stop = ps.Close + go func() { + t.consumeDhtAnnouncePeers(ps.Peers()) + close(_done) + }() + return +} + +func (t *Torrent) announceToDht(s DhtServer) error { + _, stop, err := t.AnnounceToDht(s) if err != nil { return err } - go t.consumeDhtAnnouncePeers(ps.Peers()) select { case <-t.closed.LockedChan(t.cl.locker()): case <-time.After(5 * time.Minute): } - ps.Close() + stop() return nil } @@ -1681,7 +1697,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) { t.numDHTAnnounces++ cl.unlock() defer cl.lock() - err := t.announceToDht(true, s) + err := t.announceToDht(s) if err != nil { t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err) }