]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
5020e07b41ec0451007529c934ec147c7bad2ed8
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "encoding/hex"
9         "errors"
10         "expvar"
11         "fmt"
12         "io"
13         "log"
14         "net"
15         "net/url"
16         "strconv"
17         "strings"
18         "time"
19
20         "github.com/anacrolix/dht"
21         "github.com/anacrolix/dht/krpc"
22         "github.com/anacrolix/missinggo"
23         "github.com/anacrolix/missinggo/pproffd"
24         "github.com/anacrolix/missinggo/pubsub"
25         "github.com/anacrolix/missinggo/slices"
26         "github.com/anacrolix/sync"
27         "github.com/dustin/go-humanize"
28         "golang.org/x/time/rate"
29
30         "github.com/anacrolix/torrent/bencode"
31         "github.com/anacrolix/torrent/iplist"
32         "github.com/anacrolix/torrent/metainfo"
33         "github.com/anacrolix/torrent/mse"
34         pp "github.com/anacrolix/torrent/peer_protocol"
35         "github.com/anacrolix/torrent/storage"
36 )
37
38 // Clients contain zero or more Torrents. A Client manages a blocklist, the
39 // TCP/UDP protocol ports, and DHT as desired.
40 type Client struct {
41         mu     sync.RWMutex
42         event  sync.Cond
43         closed missinggo.Event
44
45         config Config
46
47         halfOpenLimit  int
48         peerID         [20]byte
49         defaultStorage *storage.Client
50         onClose        []func()
51         tcpListener    net.Listener
52         utpSock        utpSocket
53         dHT            *dht.Server
54         ipBlockList    iplist.Ranger
55         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
56         extensionBytes peerExtensionBytes
57         // The net.Addr.String part that should be common to all active listeners.
58         listenAddr    string
59         uploadLimit   *rate.Limiter
60         downloadLimit *rate.Limiter
61
62         // Set of addresses that have our client ID. This intentionally will
63         // include ourselves if we end up trying to connect to our own address
64         // through legitimate channels.
65         dopplegangerAddrs map[string]struct{}
66         badPeerIPs        map[string]struct{}
67         torrents          map[metainfo.Hash]*Torrent
68 }
69
70 func (cl *Client) BadPeerIPs() []string {
71         cl.mu.RLock()
72         defer cl.mu.RUnlock()
73         return cl.badPeerIPsLocked()
74 }
75
76 func (cl *Client) badPeerIPsLocked() []string {
77         return slices.FromMapKeys(cl.badPeerIPs).([]string)
78 }
79
80 func (cl *Client) IPBlockList() iplist.Ranger {
81         cl.mu.Lock()
82         defer cl.mu.Unlock()
83         return cl.ipBlockList
84 }
85
86 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
87         cl.mu.Lock()
88         defer cl.mu.Unlock()
89         cl.ipBlockList = list
90         if cl.dHT != nil {
91                 cl.dHT.SetIPBlockList(list)
92         }
93 }
94
95 func (cl *Client) PeerID() string {
96         return string(cl.peerID[:])
97 }
98
99 type torrentAddr string
100
101 func (torrentAddr) Network() string { return "" }
102
103 func (me torrentAddr) String() string { return string(me) }
104
105 func (cl *Client) ListenAddr() net.Addr {
106         if cl.listenAddr == "" {
107                 return nil
108         }
109         return torrentAddr(cl.listenAddr)
110 }
111
112 // Writes out a human readable status of the client, such as for writing to a
113 // HTTP status page.
114 func (cl *Client) WriteStatus(_w io.Writer) {
115         cl.mu.Lock()
116         defer cl.mu.Unlock()
117         w := bufio.NewWriter(_w)
118         defer w.Flush()
119         if addr := cl.ListenAddr(); addr != nil {
120                 fmt.Fprintf(w, "Listening on %s\n", addr)
121         } else {
122                 fmt.Fprintln(w, "Not listening!")
123         }
124         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
125         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
126         if dht := cl.DHT(); dht != nil {
127                 dhtStats := dht.Stats()
128                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
129                 fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
130                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
131                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
132                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
133         }
134         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
135         fmt.Fprintln(w)
136         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
137                 return l.InfoHash().AsString() < r.InfoHash().AsString()
138         }).([]*Torrent) {
139                 if t.name() == "" {
140                         fmt.Fprint(w, "<unknown name>")
141                 } else {
142                         fmt.Fprint(w, t.name())
143                 }
144                 fmt.Fprint(w, "\n")
145                 if t.info != nil {
146                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.info.TotalLength())), t.length, humanize.Bytes(uint64(t.info.TotalLength())))
147                 } else {
148                         w.WriteString("<missing metainfo>")
149                 }
150                 fmt.Fprint(w, "\n")
151                 t.writeStatus(w)
152                 fmt.Fprintln(w)
153         }
154 }
155
156 func listenUTP(networkSuffix, addr string) (utpSocket, error) {
157         return NewUtpSocket("udp"+networkSuffix, addr)
158 }
159
160 func listenTCP(networkSuffix, addr string) (net.Listener, error) {
161         return net.Listen("tcp"+networkSuffix, addr)
162 }
163
164 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
165         for {
166                 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
167                 if err != nil {
168                         return
169                 }
170                 listenedAddr = tcpL.Addr().String()
171                 utpSock, err = listenUTP(networkSuffix, listenedAddr)
172                 if err == nil {
173                         return
174                 }
175                 tcpL.Close()
176                 if !strings.Contains(err.Error(), "address already in use") {
177                         return
178                 }
179         }
180 }
181
182 // Listen to enabled protocols, ensuring ports match.
183 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
184         if addr == "" {
185                 addr = ":50007"
186         }
187         if tcp && utp {
188                 var host string
189                 var port int
190                 host, port, err = missinggo.ParseHostPort(addr)
191                 if err != nil {
192                         return
193                 }
194                 if port == 0 {
195                         // If both protocols are active, they need to have the same port.
196                         return listenBothSameDynamicPort(networkSuffix, host)
197                 }
198         }
199         defer func() {
200                 if err != nil {
201                         listenedAddr = ""
202                 }
203         }()
204         if tcp {
205                 tcpL, err = listenTCP(networkSuffix, addr)
206                 if err != nil {
207                         return
208                 }
209                 defer func() {
210                         if err != nil {
211                                 tcpL.Close()
212                         }
213                 }()
214                 listenedAddr = tcpL.Addr().String()
215         }
216         if utp {
217                 utpSock, err = listenUTP(networkSuffix, addr)
218                 if err != nil {
219                         return
220                 }
221                 listenedAddr = utpSock.Addr().String()
222         }
223         return
224 }
225
226 // Creates a new client.
227 func NewClient(cfg *Config) (cl *Client, err error) {
228         if cfg == nil {
229                 cfg = &Config{
230                         DHTConfig: dht.ServerConfig{
231                                 StartingNodes: dht.GlobalBootstrapAddrs,
232                         },
233                 }
234         }
235
236         defer func() {
237                 if err != nil {
238                         cl = nil
239                 }
240         }()
241         cl = &Client{
242                 halfOpenLimit:     defaultHalfOpenConnsPerTorrent,
243                 config:            *cfg,
244                 dopplegangerAddrs: make(map[string]struct{}),
245                 torrents:          make(map[metainfo.Hash]*Torrent),
246         }
247         defer func() {
248                 if err == nil {
249                         return
250                 }
251                 cl.Close()
252         }()
253         if cfg.UploadRateLimiter == nil {
254                 cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
255         } else {
256                 cl.uploadLimit = cfg.UploadRateLimiter
257         }
258         if cfg.DownloadRateLimiter == nil {
259                 cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
260         } else {
261                 cl.downloadLimit = cfg.DownloadRateLimiter
262         }
263         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
264         cl.event.L = &cl.mu
265         storageImpl := cfg.DefaultStorage
266         if storageImpl == nil {
267                 // We'd use mmap but HFS+ doesn't support sparse files.
268                 storageImpl = storage.NewFile(cfg.DataDir)
269                 cl.onClose = append(cl.onClose, func() {
270                         if err := storageImpl.Close(); err != nil {
271                                 log.Printf("error closing default storage: %s", err)
272                         }
273                 })
274         }
275         cl.defaultStorage = storage.NewClient(storageImpl)
276         if cfg.IPBlocklist != nil {
277                 cl.ipBlockList = cfg.IPBlocklist
278         }
279
280         if cfg.PeerID != "" {
281                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
282         } else {
283                 o := copy(cl.peerID[:], bep20)
284                 _, err = rand.Read(cl.peerID[o:])
285                 if err != nil {
286                         panic("error generating peer id")
287                 }
288         }
289
290         cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
291                 !cl.config.DisableTCP,
292                 !cl.config.DisableUTP,
293                 func() string {
294                         if cl.config.DisableIPv6 {
295                                 return "4"
296                         } else {
297                                 return ""
298                         }
299                 }(),
300                 cl.config.ListenAddr)
301         if err != nil {
302                 return
303         }
304         if cl.tcpListener != nil {
305                 go cl.acceptConnections(cl.tcpListener, false)
306         }
307         if cl.utpSock != nil {
308                 go cl.acceptConnections(cl.utpSock, true)
309         }
310         if !cfg.NoDHT {
311                 dhtCfg := cfg.DHTConfig
312                 if dhtCfg.IPBlocklist == nil {
313                         dhtCfg.IPBlocklist = cl.ipBlockList
314                 }
315                 if dhtCfg.Conn == nil {
316                         if cl.utpSock != nil {
317                                 dhtCfg.Conn = cl.utpSock
318                         } else {
319                                 dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
320                                 if err != nil {
321                                         return
322                                 }
323                         }
324                 }
325                 if dhtCfg.OnAnnouncePeer == nil {
326                         dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
327                 }
328                 cl.dHT, err = dht.NewServer(&dhtCfg)
329                 if err != nil {
330                         return
331                 }
332                 go func() {
333                         if _, err := cl.dHT.Bootstrap(); err != nil {
334                                 log.Printf("error bootstrapping dht: %s", err)
335                         }
336                 }()
337         }
338
339         return
340 }
341
342 func firstNonEmptyString(ss ...string) string {
343         for _, s := range ss {
344                 if s != "" {
345                         return s
346                 }
347         }
348         return ""
349 }
350
351 // Stops the client. All connections to peers are closed and all activity will
352 // come to a halt.
353 func (cl *Client) Close() {
354         cl.mu.Lock()
355         defer cl.mu.Unlock()
356         cl.closed.Set()
357         if cl.dHT != nil {
358                 cl.dHT.Close()
359         }
360         if cl.utpSock != nil {
361                 cl.utpSock.Close()
362         }
363         if cl.tcpListener != nil {
364                 cl.tcpListener.Close()
365         }
366         for _, t := range cl.torrents {
367                 t.close()
368         }
369         for _, f := range cl.onClose {
370                 f()
371         }
372         cl.event.Broadcast()
373 }
374
375 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
376
377 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
378         if cl.ipBlockList == nil {
379                 return
380         }
381         ip4 := ip.To4()
382         // If blocklists are enabled, then block non-IPv4 addresses, because
383         // blocklists do not yet support IPv6.
384         if ip4 == nil {
385                 if missinggo.CryHeard() {
386                         log.Printf("blocking non-IPv4 address: %s", ip)
387                 }
388                 r = ipv6BlockRange
389                 blocked = true
390                 return
391         }
392         return cl.ipBlockList.Lookup(ip4)
393 }
394
395 func (cl *Client) waitAccept() {
396         for {
397                 for _, t := range cl.torrents {
398                         if t.wantConns() {
399                                 return
400                         }
401                 }
402                 if cl.closed.IsSet() {
403                         return
404                 }
405                 cl.event.Wait()
406         }
407 }
408
409 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
410         cl.mu.Lock()
411         defer cl.mu.Unlock()
412         for {
413                 cl.waitAccept()
414                 cl.mu.Unlock()
415                 conn, err := l.Accept()
416                 conn = pproffd.WrapNetConn(conn)
417                 cl.mu.Lock()
418                 if cl.closed.IsSet() {
419                         if conn != nil {
420                                 conn.Close()
421                         }
422                         return
423                 }
424                 if err != nil {
425                         log.Print(err)
426                         // I think something harsher should happen here? Our accept
427                         // routine just fucked off.
428                         return
429                 }
430                 if utp {
431                         acceptUTP.Add(1)
432                 } else {
433                         acceptTCP.Add(1)
434                 }
435                 if cl.config.Debug {
436                         log.Printf("accepted connection from %s", conn.RemoteAddr())
437                 }
438                 reject := cl.badPeerIPPort(
439                         missinggo.AddrIP(conn.RemoteAddr()),
440                         missinggo.AddrPort(conn.RemoteAddr()))
441                 if reject {
442                         if cl.config.Debug {
443                                 log.Printf("rejecting connection from %s", conn.RemoteAddr())
444                         }
445                         acceptReject.Add(1)
446                         conn.Close()
447                         continue
448                 }
449                 go cl.incomingConnection(conn, utp)
450         }
451 }
452
453 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
454         defer nc.Close()
455         if tc, ok := nc.(*net.TCPConn); ok {
456                 tc.SetLinger(0)
457         }
458         c := cl.newConnection(nc)
459         c.Discovery = peerSourceIncoming
460         c.uTP = utp
461         cl.runReceivedConn(c)
462 }
463
464 // Returns a handle to the given torrent, if it's present in the client.
465 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
466         cl.mu.Lock()
467         defer cl.mu.Unlock()
468         t, ok = cl.torrents[ih]
469         return
470 }
471
472 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
473         return cl.torrents[ih]
474 }
475
476 type dialResult struct {
477         Conn net.Conn
478         UTP  bool
479 }
480
481 func countDialResult(err error) {
482         if err == nil {
483                 successfulDials.Add(1)
484         } else {
485                 unsuccessfulDials.Add(1)
486         }
487 }
488
489 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
490         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
491         if ret < minDialTimeout {
492                 ret = minDialTimeout
493         }
494         return
495 }
496
497 // Returns whether an address is known to connect to a client with our own ID.
498 func (cl *Client) dopplegangerAddr(addr string) bool {
499         _, ok := cl.dopplegangerAddrs[addr]
500         return ok
501 }
502
503 // Start the process of connecting to the given peer for the given torrent if
504 // appropriate.
505 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
506         if peer.Id == cl.peerID {
507                 return
508         }
509         if cl.badPeerIPPort(peer.IP, peer.Port) {
510                 return
511         }
512         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
513         if t.addrActive(addr) {
514                 return
515         }
516         t.halfOpen[addr] = peer
517         go cl.outgoingConnection(t, addr, peer.Source)
518 }
519
520 func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
521         d := net.Dialer{
522         // LocalAddr: cl.tcpListener.Addr(),
523         }
524         c, err = d.DialContext(ctx, "tcp", addr)
525         countDialResult(err)
526         if err == nil {
527                 c.(*net.TCPConn).SetLinger(0)
528         }
529         c = pproffd.WrapNetConn(c)
530         return
531 }
532
533 func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
534         c, err = cl.utpSock.DialContext(ctx, addr)
535         countDialResult(err)
536         return
537 }
538
539 var (
540         dialledFirstUtp    = expvar.NewInt("dialledFirstUtp")
541         dialledFirstNotUtp = expvar.NewInt("dialledFirstNotUtp")
542 )
543
544 // Returns a connection over UTP or TCP, whichever is first to connect.
545 func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
546         ctx, cancel := context.WithCancel(ctx)
547         // As soon as we return one connection, cancel the others.
548         defer cancel()
549         left := 0
550         resCh := make(chan dialResult, left)
551         if !cl.config.DisableUTP {
552                 left++
553                 go func() {
554                         c, _ := cl.dialUTP(ctx, addr)
555                         resCh <- dialResult{c, true}
556                 }()
557         }
558         if !cl.config.DisableTCP {
559                 left++
560                 go func() {
561                         c, _ := cl.dialTCP(ctx, addr)
562                         resCh <- dialResult{c, false}
563                 }()
564         }
565         var res dialResult
566         // Wait for a successful connection.
567         for ; left > 0 && res.Conn == nil; left-- {
568                 res = <-resCh
569         }
570         if left > 0 {
571                 // There are still incompleted dials.
572                 go func() {
573                         for ; left > 0; left-- {
574                                 conn := (<-resCh).Conn
575                                 if conn != nil {
576                                         conn.Close()
577                                 }
578                         }
579                 }()
580         }
581         conn = res.Conn
582         utp = res.UTP
583         if conn != nil {
584                 if utp {
585                         dialledFirstUtp.Add(1)
586                 } else {
587                         dialledFirstNotUtp.Add(1)
588                 }
589         }
590         return
591 }
592
593 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
594         if _, ok := t.halfOpen[addr]; !ok {
595                 panic("invariant broken")
596         }
597         delete(t.halfOpen, addr)
598         cl.openNewConns(t)
599 }
600
601 // Performs initiator handshakes and returns a connection. Returns nil
602 // *connection if no connection for valid reasons.
603 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encryptHeader, utp bool) (c *connection, err error) {
604         c = cl.newConnection(nc)
605         c.headerEncrypted = encryptHeader
606         c.uTP = utp
607         ctx, cancel := context.WithTimeout(ctx, handshakesTimeout)
608         defer cancel()
609         dl, ok := ctx.Deadline()
610         if !ok {
611                 panic(ctx)
612         }
613         err = nc.SetDeadline(dl)
614         if err != nil {
615                 panic(err)
616         }
617         ok, err = cl.initiateHandshakes(c, t)
618         if !ok {
619                 c = nil
620         }
621         return
622 }
623
624 var (
625         initiatedConnWithPreferredHeaderEncryption = expvar.NewInt("initiatedConnWithPreferredHeaderEncryption")
626         initiatedConnWithFallbackHeaderEncryption  = expvar.NewInt("initiatedConnWithFallbackHeaderEncryption")
627 )
628
629 // Returns nil connection and nil error if no connection could be established
630 // for valid reasons.
631 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
632         ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
633         defer cancel()
634         nc, utp := cl.dialFirst(ctx, addr)
635         if nc == nil {
636                 return
637         }
638         obfuscatedHeaderFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
639         c, err = cl.handshakesConnection(ctx, nc, t, obfuscatedHeaderFirst, utp)
640         if err != nil {
641                 // log.Printf("error initiating connection handshakes: %s", err)
642                 nc.Close()
643                 return
644         } else if c != nil {
645                 initiatedConnWithPreferredHeaderEncryption.Add(1)
646                 return
647         }
648         nc.Close()
649         if cl.config.ForceEncryption {
650                 // We should have just tried with an obfuscated header. A plaintext
651                 // header can't result in an encrypted connection, so we're done.
652                 if !obfuscatedHeaderFirst {
653                         panic(cl.config.EncryptionPolicy)
654                 }
655                 return
656         }
657         // Try again with encryption if we didn't earlier, or without if we did,
658         // using whichever protocol type worked last time.
659         if utp {
660                 nc, err = cl.dialUTP(ctx, addr)
661         } else {
662                 nc, err = cl.dialTCP(ctx, addr)
663         }
664         if err != nil {
665                 err = fmt.Errorf("error dialing for header encryption fallback: %s", err)
666                 return
667         }
668         c, err = cl.handshakesConnection(ctx, nc, t, !obfuscatedHeaderFirst, utp)
669         if err != nil || c == nil {
670                 nc.Close()
671         }
672         if err == nil && c != nil {
673                 initiatedConnWithFallbackHeaderEncryption.Add(1)
674         }
675         return
676 }
677
678 // Called to dial out and run a connection. The addr we're given is already
679 // considered half-open.
680 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
681         c, err := cl.establishOutgoingConn(t, addr)
682         cl.mu.Lock()
683         defer cl.mu.Unlock()
684         // Don't release lock between here and addConnection, unless it's for
685         // failure.
686         cl.noLongerHalfOpen(t, addr)
687         if err != nil {
688                 if cl.config.Debug {
689                         log.Printf("error establishing outgoing connection: %s", err)
690                 }
691                 return
692         }
693         if c == nil {
694                 return
695         }
696         defer c.Close()
697         c.Discovery = ps
698         cl.runInitiatedHandshookConn(c, t)
699 }
700
701 // The port number for incoming peer connections. 0 if the client isn't
702 // listening.
703 func (cl *Client) incomingPeerPort() int {
704         if cl.listenAddr == "" {
705                 return 0
706         }
707         _, port, err := missinggo.ParseHostPort(cl.listenAddr)
708         if err != nil {
709                 panic(err)
710         }
711         return port
712 }
713
714 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
715 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
716 func addrCompactIP(addr net.Addr) (string, error) {
717         host, _, err := net.SplitHostPort(addr.String())
718         if err != nil {
719                 return "", err
720         }
721         ip := net.ParseIP(host)
722         if v4 := ip.To4(); v4 != nil {
723                 if len(v4) != 4 {
724                         panic(v4)
725                 }
726                 return string(v4), nil
727         }
728         return string(ip.To16()), nil
729 }
730
731 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
732         var err error
733         for b := range bb {
734                 _, err = w.Write(b)
735                 if err != nil {
736                         break
737                 }
738         }
739         done <- err
740 }
741
742 type (
743         peerExtensionBytes [8]byte
744         peerID             [20]byte
745 )
746
747 func (pex *peerExtensionBytes) SupportsExtended() bool {
748         return pex[5]&0x10 != 0
749 }
750
751 func (pex *peerExtensionBytes) SupportsDHT() bool {
752         return pex[7]&0x01 != 0
753 }
754
755 func (pex *peerExtensionBytes) SupportsFast() bool {
756         return pex[7]&0x04 != 0
757 }
758
759 type handshakeResult struct {
760         peerExtensionBytes
761         peerID
762         metainfo.Hash
763 }
764
765 // ih is nil if we expect the peer to declare the InfoHash, such as when the
766 // peer initiated the connection. Returns ok if the handshake was successful,
767 // and err if there was an unexpected condition other than the peer simply
768 // abandoning the handshake.
769 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
770         // Bytes to be sent to the peer. Should never block the sender.
771         postCh := make(chan []byte, 4)
772         // A single error value sent when the writer completes.
773         writeDone := make(chan error, 1)
774         // Performs writes to the socket and ensures posts don't block.
775         go handshakeWriter(sock, postCh, writeDone)
776
777         defer func() {
778                 close(postCh) // Done writing.
779                 if !ok {
780                         return
781                 }
782                 if err != nil {
783                         panic(err)
784                 }
785                 // Wait until writes complete before returning from handshake.
786                 err = <-writeDone
787                 if err != nil {
788                         err = fmt.Errorf("error writing: %s", err)
789                 }
790         }()
791
792         post := func(bb []byte) {
793                 select {
794                 case postCh <- bb:
795                 default:
796                         panic("mustn't block while posting")
797                 }
798         }
799
800         post([]byte(pp.Protocol))
801         post(extensions[:])
802         if ih != nil { // We already know what we want.
803                 post(ih[:])
804                 post(peerID[:])
805         }
806         var b [68]byte
807         _, err = io.ReadFull(sock, b[:68])
808         if err != nil {
809                 err = nil
810                 return
811         }
812         if string(b[:20]) != pp.Protocol {
813                 return
814         }
815         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
816         missinggo.CopyExact(&res.Hash, b[28:48])
817         missinggo.CopyExact(&res.peerID, b[48:68])
818         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
819
820         // TODO: Maybe we can just drop peers here if we're not interested. This
821         // could prevent them trying to reconnect, falsely believing there was
822         // just a problem.
823         if ih == nil { // We were waiting for the peer to tell us what they wanted.
824                 post(res.Hash[:])
825                 post(peerID[:])
826         }
827
828         ok = true
829         return
830 }
831
832 // Wraps a raw connection and provides the interface we want for using the
833 // connection in the message loop.
834 type deadlineReader struct {
835         nc net.Conn
836         r  io.Reader
837 }
838
839 func (r deadlineReader) Read(b []byte) (n int, err error) {
840         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
841         err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
842         if err != nil {
843                 err = fmt.Errorf("error setting read deadline: %s", err)
844         }
845         n, err = r.r.Read(b)
846         // Convert common errors into io.EOF.
847         // if err != nil {
848         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
849         //              err = io.EOF
850         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
851         //              if n != 0 {
852         //                      panic(n)
853         //              }
854         //              err = io.EOF
855         //      }
856         // }
857         return
858 }
859
860 func handleEncryption(
861         rw io.ReadWriter,
862         skeys mse.SecretKeyIter,
863         policy EncryptionPolicy,
864 ) (
865         ret io.ReadWriter,
866         headerEncrypted bool,
867         cryptoMethod uint32,
868         err error,
869 ) {
870         if !policy.ForceEncryption {
871                 var protocol [len(pp.Protocol)]byte
872                 _, err = io.ReadFull(rw, protocol[:])
873                 if err != nil {
874                         return
875                 }
876                 rw = struct {
877                         io.Reader
878                         io.Writer
879                 }{
880                         io.MultiReader(bytes.NewReader(protocol[:]), rw),
881                         rw,
882                 }
883                 if string(protocol[:]) == pp.Protocol {
884                         ret = rw
885                         return
886                 }
887         }
888         headerEncrypted = true
889         ret, err = mse.ReceiveHandshake(rw, skeys, func(provides uint32) uint32 {
890                 cryptoMethod = func() uint32 {
891                         switch {
892                         case policy.ForceEncryption:
893                                 return mse.CryptoMethodRC4
894                         case policy.DisableEncryption:
895                                 return mse.CryptoMethodPlaintext
896                         case policy.PreferNoEncryption && provides&mse.CryptoMethodPlaintext != 0:
897                                 return mse.CryptoMethodPlaintext
898                         default:
899                                 return mse.DefaultCryptoSelector(provides)
900                         }
901                 }()
902                 return cryptoMethod
903         })
904         return
905 }
906
907 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
908         if c.headerEncrypted {
909                 var rw io.ReadWriter
910                 rw, err = mse.InitiateHandshake(
911                         struct {
912                                 io.Reader
913                                 io.Writer
914                         }{c.r, c.w},
915                         t.infoHash[:],
916                         nil,
917                         func() uint32 {
918                                 switch {
919                                 case cl.config.ForceEncryption:
920                                         return mse.CryptoMethodRC4
921                                 case cl.config.DisableEncryption:
922                                         return mse.CryptoMethodPlaintext
923                                 default:
924                                         return mse.AllSupportedCrypto
925                                 }
926                         }(),
927                 )
928                 c.setRW(rw)
929                 if err != nil {
930                         return
931                 }
932         }
933         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
934         if ih != t.infoHash {
935                 ok = false
936         }
937         return
938 }
939
940 // Calls f with any secret keys.
941 func (cl *Client) forSkeys(f func([]byte) bool) {
942         cl.mu.Lock()
943         defer cl.mu.Unlock()
944         for ih := range cl.torrents {
945                 if !f(ih[:]) {
946                         break
947                 }
948         }
949 }
950
951 // Do encryption and bittorrent handshakes as receiver.
952 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
953         var rw io.ReadWriter
954         rw, c.headerEncrypted, c.cryptoMethod, err = handleEncryption(c.rw(), cl.forSkeys, cl.config.EncryptionPolicy)
955         c.setRW(rw)
956         if err != nil {
957                 if err == mse.ErrNoSecretKeyMatch {
958                         err = nil
959                 }
960                 return
961         }
962         if cl.config.ForceEncryption && !c.headerEncrypted {
963                 err = errors.New("connection not encrypted")
964                 return
965         }
966         ih, ok, err := cl.connBTHandshake(c, nil)
967         if err != nil {
968                 err = fmt.Errorf("error during bt handshake: %s", err)
969                 return
970         }
971         if !ok {
972                 return
973         }
974         cl.mu.Lock()
975         t = cl.torrents[ih]
976         cl.mu.Unlock()
977         return
978 }
979
980 // Returns !ok if handshake failed for valid reasons.
981 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
982         res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
983         if err != nil || !ok {
984                 return
985         }
986         ret = res.Hash
987         c.PeerExtensionBytes = res.peerExtensionBytes
988         c.PeerID = res.peerID
989         c.completedHandshake = time.Now()
990         return
991 }
992
993 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
994         if c.PeerID == cl.peerID {
995                 connsToSelf.Add(1)
996                 addr := c.conn.RemoteAddr().String()
997                 cl.dopplegangerAddrs[addr] = struct{}{}
998                 return
999         }
1000         cl.runHandshookConn(c, t, true)
1001 }
1002
1003 func (cl *Client) runReceivedConn(c *connection) {
1004         err := c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
1005         if err != nil {
1006                 panic(err)
1007         }
1008         t, err := cl.receiveHandshakes(c)
1009         if err != nil {
1010                 if cl.config.Debug {
1011                         log.Printf("error receiving handshakes: %s", err)
1012                 }
1013                 return
1014         }
1015         if t == nil {
1016                 return
1017         }
1018         cl.mu.Lock()
1019         defer cl.mu.Unlock()
1020         if c.PeerID == cl.peerID {
1021                 // Because the remote address is not necessarily the same as its
1022                 // client's torrent listen address, we won't record the remote address
1023                 // as a doppleganger. Instead, the initiator can record *us* as the
1024                 // doppleganger.
1025                 return
1026         }
1027         cl.runHandshookConn(c, t, false)
1028 }
1029
1030 func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
1031         c.conn.SetWriteDeadline(time.Time{})
1032         c.r = deadlineReader{c.conn, c.r}
1033         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
1034         if !t.addConnection(c, outgoing) {
1035                 return
1036         }
1037         defer t.dropConnection(c)
1038         go c.writer(time.Minute)
1039         cl.sendInitialMessages(c, t)
1040         err := c.mainReadLoop()
1041         if err != nil && cl.config.Debug {
1042                 log.Printf("error during connection loop: %s", err)
1043         }
1044 }
1045
1046 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
1047         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
1048                 conn.Post(pp.Message{
1049                         Type:       pp.Extended,
1050                         ExtendedID: pp.HandshakeExtendedID,
1051                         ExtendedPayload: func() []byte {
1052                                 d := map[string]interface{}{
1053                                         "m": func() (ret map[string]int) {
1054                                                 ret = make(map[string]int, 2)
1055                                                 ret["ut_metadata"] = metadataExtendedId
1056                                                 if !cl.config.DisablePEX {
1057                                                         ret["ut_pex"] = pexExtendedId
1058                                                 }
1059                                                 return
1060                                         }(),
1061                                         "v": extendedHandshakeClientVersion,
1062                                         // No upload queue is implemented yet.
1063                                         "reqq": 64,
1064                                 }
1065                                 if !cl.config.DisableEncryption {
1066                                         d["e"] = 1
1067                                 }
1068                                 if torrent.metadataSizeKnown() {
1069                                         d["metadata_size"] = torrent.metadataSize()
1070                                 }
1071                                 if p := cl.incomingPeerPort(); p != 0 {
1072                                         d["p"] = p
1073                                 }
1074                                 yourip, err := addrCompactIP(conn.remoteAddr())
1075                                 if err != nil {
1076                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
1077                                 } else {
1078                                         d["yourip"] = yourip
1079                                 }
1080                                 // log.Printf("sending %v", d)
1081                                 b, err := bencode.Marshal(d)
1082                                 if err != nil {
1083                                         panic(err)
1084                                 }
1085                                 return b
1086                         }(),
1087                 })
1088         }
1089         if torrent.haveAnyPieces() {
1090                 conn.Bitfield(torrent.bitfield())
1091         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
1092                 conn.Post(pp.Message{
1093                         Type: pp.HaveNone,
1094                 })
1095         }
1096         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
1097                 conn.Post(pp.Message{
1098                         Type: pp.Port,
1099                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
1100                 })
1101         }
1102 }
1103
1104 // Process incoming ut_metadata message.
1105 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
1106         var d map[string]int
1107         err := bencode.Unmarshal(payload, &d)
1108         if err != nil {
1109                 return fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1110         }
1111         msgType, ok := d["msg_type"]
1112         if !ok {
1113                 return errors.New("missing msg_type field")
1114         }
1115         piece := d["piece"]
1116         switch msgType {
1117         case pp.DataMetadataExtensionMsgType:
1118                 if !c.requestedMetadataPiece(piece) {
1119                         return fmt.Errorf("got unexpected piece %d", piece)
1120                 }
1121                 c.metadataRequests[piece] = false
1122                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1123                 if begin < 0 || begin >= len(payload) {
1124                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1125                 }
1126                 t.saveMetadataPiece(piece, payload[begin:])
1127                 c.UsefulChunksReceived++
1128                 c.lastUsefulChunkReceived = time.Now()
1129                 return t.maybeCompleteMetadata()
1130         case pp.RequestMetadataExtensionMsgType:
1131                 if !t.haveMetadataPiece(piece) {
1132                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1133                         return nil
1134                 }
1135                 start := (1 << 14) * piece
1136                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1137                 return nil
1138         case pp.RejectMetadataExtensionMsgType:
1139                 return nil
1140         default:
1141                 return errors.New("unknown msg_type value")
1142         }
1143 }
1144
1145 func (cl *Client) sendChunk(t *Torrent, c *connection, r request, msg func(pp.Message) bool) (more bool, err error) {
1146         // Count the chunk being sent, even if it isn't.
1147         b := make([]byte, r.Length)
1148         p := t.info.Piece(int(r.Index))
1149         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1150         if n != len(b) {
1151                 if err == nil {
1152                         panic("expected error")
1153                 }
1154                 return
1155         } else if err == io.EOF {
1156                 err = nil
1157         }
1158         more = msg(pp.Message{
1159                 Type:  pp.Piece,
1160                 Index: r.Index,
1161                 Begin: r.Begin,
1162                 Piece: b,
1163         })
1164         c.chunksSent++
1165         uploadChunksPosted.Add(1)
1166         c.lastChunkSent = time.Now()
1167         return
1168 }
1169
1170 func (cl *Client) openNewConns(t *Torrent) {
1171         defer t.updateWantPeersEvent()
1172         for len(t.peers) != 0 {
1173                 if !t.wantConns() {
1174                         return
1175                 }
1176                 if len(t.halfOpen) >= cl.halfOpenLimit {
1177                         return
1178                 }
1179                 var (
1180                         k peersKey
1181                         p Peer
1182                 )
1183                 for k, p = range t.peers {
1184                         break
1185                 }
1186                 delete(t.peers, k)
1187                 cl.initiateConn(p, t)
1188         }
1189 }
1190
1191 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1192         if port == 0 {
1193                 return true
1194         }
1195         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1196                 return true
1197         }
1198         if _, ok := cl.ipBlockRange(ip); ok {
1199                 return true
1200         }
1201         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1202                 return true
1203         }
1204         return false
1205 }
1206
1207 // Return a Torrent ready for insertion into a Client.
1208 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1209         // use provided storage, if provided
1210         storageClient := cl.defaultStorage
1211         if specStorage != nil {
1212                 storageClient = storage.NewClient(specStorage)
1213         }
1214
1215         t = &Torrent{
1216                 cl:       cl,
1217                 infoHash: ih,
1218                 peers:    make(map[peersKey]Peer),
1219                 conns:    make(map[*connection]struct{}, 2*defaultEstablishedConnsPerTorrent),
1220
1221                 halfOpen:          make(map[string]Peer),
1222                 pieceStateChanges: pubsub.NewPubSub(),
1223
1224                 storageOpener:       storageClient,
1225                 maxEstablishedConns: defaultEstablishedConnsPerTorrent,
1226
1227                 networkingEnabled: true,
1228                 requestStrategy:   2,
1229         }
1230         t.setChunkSize(defaultChunkSize)
1231         return
1232 }
1233
1234 // A file-like handle to some torrent data resource.
1235 type Handle interface {
1236         io.Reader
1237         io.Seeker
1238         io.Closer
1239         io.ReaderAt
1240 }
1241
1242 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1243         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1244 }
1245
1246 // Adds a torrent by InfoHash with a custom Storage implementation.
1247 // If the torrent already exists then this Storage is ignored and the
1248 // existing torrent returned with `new` set to `false`
1249 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1250         cl.mu.Lock()
1251         defer cl.mu.Unlock()
1252         t, ok := cl.torrents[infoHash]
1253         if ok {
1254                 return
1255         }
1256         new = true
1257         t = cl.newTorrent(infoHash, specStorage)
1258         if cl.dHT != nil {
1259                 go t.dhtAnnouncer()
1260         }
1261         cl.torrents[infoHash] = t
1262         t.updateWantPeersEvent()
1263         // Tickle Client.waitAccept, new torrent may want conns.
1264         cl.event.Broadcast()
1265         return
1266 }
1267
1268 // Add or merge a torrent spec. If the torrent is already present, the
1269 // trackers will be merged with the existing ones. If the Info isn't yet
1270 // known, it will be set. The display name is replaced if the new spec
1271 // provides one. Returns new if the torrent wasn't already in the client.
1272 // Note that any `Storage` defined on the spec will be ignored if the
1273 // torrent is already present (i.e. `new` return value is `true`)
1274 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1275         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1276         if spec.DisplayName != "" {
1277                 t.SetDisplayName(spec.DisplayName)
1278         }
1279         if spec.InfoBytes != nil {
1280                 err = t.SetInfoBytes(spec.InfoBytes)
1281                 if err != nil {
1282                         return
1283                 }
1284         }
1285         cl.mu.Lock()
1286         defer cl.mu.Unlock()
1287         if spec.ChunkSize != 0 {
1288                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1289         }
1290         t.addTrackers(spec.Trackers)
1291         t.maybeNewConns()
1292         return
1293 }
1294
1295 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1296         t, ok := cl.torrents[infoHash]
1297         if !ok {
1298                 err = fmt.Errorf("no such torrent")
1299                 return
1300         }
1301         err = t.close()
1302         if err != nil {
1303                 panic(err)
1304         }
1305         delete(cl.torrents, infoHash)
1306         return
1307 }
1308
1309 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1310         _url, err := url.Parse(announceURL)
1311         if err != nil {
1312                 return
1313         }
1314         hmp := missinggo.SplitHostMaybePort(_url.Host)
1315         if hmp.Err != nil {
1316                 err = hmp.Err
1317                 return
1318         }
1319         addr, err := net.ResolveIPAddr("ip", hmp.Host)
1320         if err != nil {
1321                 return
1322         }
1323         cl.mu.RLock()
1324         _, blocked = cl.ipBlockRange(addr.IP)
1325         cl.mu.RUnlock()
1326         host = _url.Host
1327         hmp.Host = addr.String()
1328         _url.Host = hmp.String()
1329         urlToUse = _url.String()
1330         return
1331 }
1332
1333 func (cl *Client) allTorrentsCompleted() bool {
1334         for _, t := range cl.torrents {
1335                 if !t.haveInfo() {
1336                         return false
1337                 }
1338                 if t.numPiecesCompleted() != t.numPieces() {
1339                         return false
1340                 }
1341         }
1342         return true
1343 }
1344
1345 // Returns true when all torrents are completely downloaded and false if the
1346 // client is stopped before that.
1347 func (cl *Client) WaitAll() bool {
1348         cl.mu.Lock()
1349         defer cl.mu.Unlock()
1350         for !cl.allTorrentsCompleted() {
1351                 if cl.closed.IsSet() {
1352                         return false
1353                 }
1354                 cl.event.Wait()
1355         }
1356         return true
1357 }
1358
1359 // Returns handles to all the torrents loaded in the Client.
1360 func (cl *Client) Torrents() []*Torrent {
1361         cl.mu.Lock()
1362         defer cl.mu.Unlock()
1363         return cl.torrentsAsSlice()
1364 }
1365
1366 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1367         for _, t := range cl.torrents {
1368                 ret = append(ret, t)
1369         }
1370         return
1371 }
1372
1373 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1374         spec, err := TorrentSpecFromMagnetURI(uri)
1375         if err != nil {
1376                 return
1377         }
1378         T, _, err = cl.AddTorrentSpec(spec)
1379         return
1380 }
1381
1382 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1383         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1384         var ss []string
1385         slices.MakeInto(&ss, mi.Nodes)
1386         cl.AddDHTNodes(ss)
1387         return
1388 }
1389
1390 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1391         mi, err := metainfo.LoadFromFile(filename)
1392         if err != nil {
1393                 return
1394         }
1395         return cl.AddTorrent(mi)
1396 }
1397
1398 func (cl *Client) DHT() *dht.Server {
1399         return cl.dHT
1400 }
1401
1402 func (cl *Client) AddDHTNodes(nodes []string) {
1403         if cl.DHT() == nil {
1404                 return
1405         }
1406         for _, n := range nodes {
1407                 hmp := missinggo.SplitHostMaybePort(n)
1408                 ip := net.ParseIP(hmp.Host)
1409                 if ip == nil {
1410                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1411                         continue
1412                 }
1413                 ni := krpc.NodeInfo{
1414                         Addr: &net.UDPAddr{
1415                                 IP:   ip,
1416                                 Port: hmp.Port,
1417                         },
1418                 }
1419                 cl.DHT().AddNode(ni)
1420         }
1421 }
1422
1423 func (cl *Client) banPeerIP(ip net.IP) {
1424         if cl.badPeerIPs == nil {
1425                 cl.badPeerIPs = make(map[string]struct{})
1426         }
1427         cl.badPeerIPs[ip.String()] = struct{}{}
1428 }
1429
1430 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
1431         c = &connection{
1432                 conn: nc,
1433
1434                 Choked:          true,
1435                 PeerChoked:      true,
1436                 PeerMaxRequests: 250,
1437         }
1438         c.writerCond.L = &cl.mu
1439         c.setRW(connStatsReadWriter{nc, &cl.mu, c})
1440         c.r = rateLimitedReader{cl.downloadLimit, c.r}
1441         return
1442 }
1443
1444 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1445         cl.mu.Lock()
1446         defer cl.mu.Unlock()
1447         t := cl.torrent(ih)
1448         if t == nil {
1449                 return
1450         }
1451         t.addPeers([]Peer{{
1452                 IP:     p.IP,
1453                 Port:   p.Port,
1454                 Source: peerSourceDHTAnnouncePeer,
1455         }})
1456 }