]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Remove conntrack, expose Torrent.AnnounceToDht, ClientConfig.PeriodicallyAnnounceTorr...
authorMatt Joiner <anacrolix@gmail.com>
Fri, 18 Jun 2021 01:05:23 +0000 (11:05 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 21 Jun 2021 02:13:53 +0000 (12:13 +1000)
client.go
config.go
torrent.go

index 2920c2630da4784c8cd7e8841201524882d2a548..a8d93319def3ff36a07efd1ac6db926a02cbba9f 100644 (file)
--- a/client.go
+++ b/client.go
@@ -22,7 +22,6 @@ import (
        "github.com/anacrolix/missinggo/slices"
        "github.com/anacrolix/missinggo/v2"
        "github.com/anacrolix/missinggo/v2/bitmap"
-       "github.com/anacrolix/missinggo/v2/conntrack"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/anacrolix/sync"
        "github.com/davecgh/go-spew/spew"
@@ -257,7 +256,7 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) {
        if !cfg.NoDHT {
                for _, s := range sockets {
                        if pc, ok := s.(net.PacketConn); ok {
-                               ds, err := cl.newAnacrolixDhtServer(pc)
+                               ds, err := cl.NewAnacrolixDhtServer(pc)
                                if err != nil {
                                        panic(err)
                                }
@@ -363,7 +362,8 @@ func (cl *Client) listenNetworks() (ns []network) {
        return
 }
 
-func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
+// Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
+func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
        cfg := dht.ServerConfig{
                IPBlocklist:    cl.ipBlockList,
                Conn:           conn,
@@ -374,10 +374,9 @@ func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err
                        }
                        return cl.config.PublicIp4
                }(),
-               StartingNodes:      cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
-               ConnectionTracking: cl.config.ConnTracker,
-               OnQuery:            cl.config.DHTOnQuery,
-               Logger:             cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
+               StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
+               OnQuery:       cl.config.DHTOnQuery,
+               Logger:        cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
        }
        if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
                f(&cfg)
@@ -639,21 +638,6 @@ func (cl *Client) dialFirst(ctx context.Context, addr string) (res dialResult) {
 }
 
 func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
-       network := s.LocalAddr().Network()
-       cte := cl.config.ConnTracker.Wait(
-               ctx,
-               conntrack.Entry{network, s.LocalAddr().String(), addr},
-               "dial torrent client",
-               0,
-       )
-       // Try to avoid committing to a dial if the context is complete as it's difficult to determine
-       // which dial errors allow us to forget the connection tracking entry handle.
-       if ctx.Err() != nil {
-               if cte != nil {
-                       cte.Forget()
-               }
-               return nil
-       }
        c, err := s.Dial(ctx, addr)
        // This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
        // it now in case we close the connection forthwith.
@@ -661,19 +645,7 @@ func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net
                tc.SetLinger(0)
        }
        countDialResult(err)
-       if c == nil {
-               if err != nil && forgettableDialError(err) {
-                       cte.Forget()
-               } else {
-                       cte.Done()
-               }
-               return nil
-       }
-       return closeWrapper{c, func() error {
-               err := c.Close()
-               cte.Done()
-               return err
-       }}
+       return c
 }
 
 func forgettableDialError(err error) bool {
@@ -1180,7 +1152,9 @@ func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStor
 
        t = cl.newTorrent(infoHash, specStorage)
        cl.eachDhtServer(func(s DhtServer) {
-               go t.dhtAnnouncer(s)
+               if cl.config.PeriodicallyAnnounceTorrentsToDht {
+                       go t.dhtAnnouncer(s)
+               }
        })
        cl.torrents[infoHash] = t
        cl.clearAcceptLimits()
index 37b2b7145041bc6dfb25466ecde54eb3c6fbcde2..b687f05de5426d064167fd528cc04a6093ec3214 100644 (file)
--- a/config.go
+++ b/config.go
@@ -11,7 +11,6 @@ import (
        "github.com/anacrolix/log"
        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/expect"
-       "github.com/anacrolix/missinggo/v2/conntrack"
        "golang.org/x/time/rate"
 
        "github.com/anacrolix/torrent/iplist"
@@ -38,7 +37,8 @@ type ClientConfig struct {
        NoDHT            bool `long:"disable-dht"`
        DhtStartingNodes func(network string) dht.StartingNodesGetter
        // Called for each anacrolix/dht Server created for the Client.
-       ConfigureAnacrolixDhtServer func(*dht.ServerConfig)
+       ConfigureAnacrolixDhtServer       func(*dht.ServerConfig)
+       PeriodicallyAnnounceTorrentsToDht bool
 
        // Never send chunks to peers.
        NoUpload bool `long:"no-upload"`
@@ -134,8 +134,6 @@ type ClientConfig struct {
        // we don't intend to obtain all of a torrent's data.
        DropMutuallyCompletePeers bool
 
-       ConnTracker *conntrack.Instance
-
        // OnQuery hook func
        DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
 
@@ -172,12 +170,12 @@ func NewDefaultClientConfig() *ClientConfig {
                DhtStartingNodes: func(network string) dht.StartingNodesGetter {
                        return func() ([]dht.Addr, error) { return dht.GlobalBootstrapAddrs(network) }
                },
-               ListenHost:                func(string) string { return "" },
-               UploadRateLimiter:         unlimited,
-               DownloadRateLimiter:       unlimited,
-               ConnTracker:               conntrack.NewInstance(),
-               DisableAcceptRateLimiting: true,
-               DropMutuallyCompletePeers: true,
+               PeriodicallyAnnounceTorrentsToDht: true,
+               ListenHost:                        func(string) string { return "" },
+               UploadRateLimiter:                 unlimited,
+               DownloadRateLimiter:               unlimited,
+               DisableAcceptRateLimiting:         true,
+               DropMutuallyCompletePeers:         true,
                HeaderObfuscationPolicy: HeaderObfuscationPolicy{
                        Preferred:        true,
                        RequirePreferred: false,
index b4dda85d740acbb6e275e6f116187d762f6f4241..c4aae8fe4b269547f98d149338f34c55808ae7f3 100644 (file)
@@ -1643,17 +1643,33 @@ func (t *Torrent) consumeDhtAnnouncePeers(pvs <-chan dht.PeersValues) {
        }
 }
 
-func (t *Torrent) announceToDht(impliedPort bool, s DhtServer) error {
-       ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort)
+// Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
+// announce ends. stop will force the announce to end.
+func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
+       ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
+       if err != nil {
+               return
+       }
+       _done := make(chan struct{})
+       done = _done
+       stop = ps.Close
+       go func() {
+               t.consumeDhtAnnouncePeers(ps.Peers())
+               close(_done)
+       }()
+       return
+}
+
+func (t *Torrent) announceToDht(s DhtServer) error {
+       _, stop, err := t.AnnounceToDht(s)
        if err != nil {
                return err
        }
-       go t.consumeDhtAnnouncePeers(ps.Peers())
        select {
        case <-t.closed.LockedChan(t.cl.locker()):
        case <-time.After(5 * time.Minute):
        }
-       ps.Close()
+       stop()
        return nil
 }
 
@@ -1681,7 +1697,7 @@ func (t *Torrent) dhtAnnouncer(s DhtServer) {
                        t.numDHTAnnounces++
                        cl.unlock()
                        defer cl.lock()
-                       err := t.announceToDht(true, s)
+                       err := t.announceToDht(s)
                        if err != nil {
                                t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
                        }