"github.com/anacrolix/missinggo/slices"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
- "github.com/anacrolix/missinggo/v2/conntrack"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
"github.com/davecgh/go-spew/spew"
if !cfg.NoDHT {
for _, s := range sockets {
if pc, ok := s.(net.PacketConn); ok {
- ds, err := cl.newAnacrolixDhtServer(pc)
+ ds, err := cl.NewAnacrolixDhtServer(pc)
if err != nil {
panic(err)
}
return
}
-func (cl *Client) newAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
+// Creates an anacrolix/dht Server, as would be done internally in NewClient, for the given conn.
+func (cl *Client) NewAnacrolixDhtServer(conn net.PacketConn) (s *dht.Server, err error) {
cfg := dht.ServerConfig{
IPBlocklist: cl.ipBlockList,
Conn: conn,
}
return cl.config.PublicIp4
}(),
- StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
- ConnectionTracking: cl.config.ConnTracker,
- OnQuery: cl.config.DHTOnQuery,
- Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
+ StartingNodes: cl.config.DhtStartingNodes(conn.LocalAddr().Network()),
+ OnQuery: cl.config.DHTOnQuery,
+ Logger: cl.logger.WithContextText(fmt.Sprintf("dht server on %v", conn.LocalAddr().String())),
}
if f := cl.config.ConfigureAnacrolixDhtServer; f != nil {
f(&cfg)
}
func (cl *Client) dialFromSocket(ctx context.Context, s Dialer, addr string) net.Conn {
- network := s.LocalAddr().Network()
- cte := cl.config.ConnTracker.Wait(
- ctx,
- conntrack.Entry{network, s.LocalAddr().String(), addr},
- "dial torrent client",
- 0,
- )
- // Try to avoid committing to a dial if the context is complete as it's difficult to determine
- // which dial errors allow us to forget the connection tracking entry handle.
- if ctx.Err() != nil {
- if cte != nil {
- cte.Forget()
- }
- return nil
- }
c, err := s.Dial(ctx, addr)
// This is a bit optimistic, but it looks non-trivial to thread this through the proxy code. Set
// it now in case we close the connection forthwith.
tc.SetLinger(0)
}
countDialResult(err)
- if c == nil {
- if err != nil && forgettableDialError(err) {
- cte.Forget()
- } else {
- cte.Done()
- }
- return nil
- }
- return closeWrapper{c, func() error {
- err := c.Close()
- cte.Done()
- return err
- }}
+ return c
}
func forgettableDialError(err error) bool {
t = cl.newTorrent(infoHash, specStorage)
cl.eachDhtServer(func(s DhtServer) {
- go t.dhtAnnouncer(s)
+ if cl.config.PeriodicallyAnnounceTorrentsToDht {
+ go t.dhtAnnouncer(s)
+ }
})
cl.torrents[infoHash] = t
cl.clearAcceptLimits()
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/expect"
- "github.com/anacrolix/missinggo/v2/conntrack"
"golang.org/x/time/rate"
"github.com/anacrolix/torrent/iplist"
NoDHT bool `long:"disable-dht"`
DhtStartingNodes func(network string) dht.StartingNodesGetter
// Called for each anacrolix/dht Server created for the Client.
- ConfigureAnacrolixDhtServer func(*dht.ServerConfig)
+ ConfigureAnacrolixDhtServer func(*dht.ServerConfig)
+ PeriodicallyAnnounceTorrentsToDht bool
// Never send chunks to peers.
NoUpload bool `long:"no-upload"`
// we don't intend to obtain all of a torrent's data.
DropMutuallyCompletePeers bool
- ConnTracker *conntrack.Instance
-
// OnQuery hook func
DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
DhtStartingNodes: func(network string) dht.StartingNodesGetter {
return func() ([]dht.Addr, error) { return dht.GlobalBootstrapAddrs(network) }
},
- ListenHost: func(string) string { return "" },
- UploadRateLimiter: unlimited,
- DownloadRateLimiter: unlimited,
- ConnTracker: conntrack.NewInstance(),
- DisableAcceptRateLimiting: true,
- DropMutuallyCompletePeers: true,
+ PeriodicallyAnnounceTorrentsToDht: true,
+ ListenHost: func(string) string { return "" },
+ UploadRateLimiter: unlimited,
+ DownloadRateLimiter: unlimited,
+ DisableAcceptRateLimiting: true,
+ DropMutuallyCompletePeers: true,
HeaderObfuscationPolicy: HeaderObfuscationPolicy{
Preferred: true,
RequirePreferred: false,
}
}
-func (t *Torrent) announceToDht(impliedPort bool, s DhtServer) error {
- ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), impliedPort)
+// Announce using the provided DHT server. Peers are consumed automatically. done is closed when the
+// announce ends. stop will force the announce to end.
+func (t *Torrent) AnnounceToDht(s DhtServer) (done <-chan struct{}, stop func(), err error) {
+ ps, err := s.Announce(t.infoHash, t.cl.incomingPeerPort(), true)
+ if err != nil {
+ return
+ }
+ _done := make(chan struct{})
+ done = _done
+ stop = ps.Close
+ go func() {
+ t.consumeDhtAnnouncePeers(ps.Peers())
+ close(_done)
+ }()
+ return
+}
+
+func (t *Torrent) announceToDht(s DhtServer) error {
+ _, stop, err := t.AnnounceToDht(s)
if err != nil {
return err
}
- go t.consumeDhtAnnouncePeers(ps.Peers())
select {
case <-t.closed.LockedChan(t.cl.locker()):
case <-time.After(5 * time.Minute):
}
- ps.Close()
+ stop()
return nil
}
t.numDHTAnnounces++
cl.unlock()
defer cl.lock()
- err := t.announceToDht(true, s)
+ err := t.announceToDht(s)
if err != nil {
t.logger.WithDefaultLevel(log.Warning).Printf("error announcing %q to DHT: %s", t, err)
}