]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Remove unused functions
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "context"
7         "crypto/rand"
8         "encoding/hex"
9         "errors"
10         "fmt"
11         "io"
12         "log"
13         "net"
14         "net/url"
15         "strconv"
16         "strings"
17         "time"
18
19         "github.com/anacrolix/dht"
20         "github.com/anacrolix/dht/krpc"
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/pproffd"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/missinggo/slices"
25         "github.com/anacrolix/sync"
26         "github.com/dustin/go-humanize"
27         "golang.org/x/time/rate"
28
29         "github.com/anacrolix/torrent/bencode"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/mse"
33         pp "github.com/anacrolix/torrent/peer_protocol"
34         "github.com/anacrolix/torrent/storage"
35 )
36
37 // Clients contain zero or more Torrents. A Client manages a blocklist, the
38 // TCP/UDP protocol ports, and DHT as desired.
39 type Client struct {
40         mu     sync.RWMutex
41         event  sync.Cond
42         closed missinggo.Event
43
44         config Config
45
46         halfOpenLimit  int
47         peerID         [20]byte
48         defaultStorage *storage.Client
49         onClose        []func()
50         tcpListener    net.Listener
51         utpSock        utpSocket
52         dHT            *dht.Server
53         ipBlockList    iplist.Ranger
54         // Our BitTorrent protocol extension bytes, sent in our BT handshakes.
55         extensionBytes peerExtensionBytes
56         // The net.Addr.String part that should be common to all active listeners.
57         listenAddr    string
58         uploadLimit   *rate.Limiter
59         downloadLimit *rate.Limiter
60
61         // Set of addresses that have our client ID. This intentionally will
62         // include ourselves if we end up trying to connect to our own address
63         // through legitimate channels.
64         dopplegangerAddrs map[string]struct{}
65         badPeerIPs        map[string]struct{}
66         torrents          map[metainfo.Hash]*Torrent
67 }
68
69 func (cl *Client) BadPeerIPs() []string {
70         cl.mu.RLock()
71         defer cl.mu.RUnlock()
72         return cl.badPeerIPsLocked()
73 }
74
75 func (cl *Client) badPeerIPsLocked() []string {
76         return slices.FromMapKeys(cl.badPeerIPs).([]string)
77 }
78
79 func (cl *Client) IPBlockList() iplist.Ranger {
80         cl.mu.Lock()
81         defer cl.mu.Unlock()
82         return cl.ipBlockList
83 }
84
85 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
86         cl.mu.Lock()
87         defer cl.mu.Unlock()
88         cl.ipBlockList = list
89         if cl.dHT != nil {
90                 cl.dHT.SetIPBlockList(list)
91         }
92 }
93
94 func (cl *Client) PeerID() string {
95         return string(cl.peerID[:])
96 }
97
98 type torrentAddr string
99
100 func (torrentAddr) Network() string { return "" }
101
102 func (me torrentAddr) String() string { return string(me) }
103
104 func (cl *Client) ListenAddr() net.Addr {
105         if cl.listenAddr == "" {
106                 return nil
107         }
108         return torrentAddr(cl.listenAddr)
109 }
110
111 // Writes out a human readable status of the client, such as for writing to a
112 // HTTP status page.
113 func (cl *Client) WriteStatus(_w io.Writer) {
114         cl.mu.Lock()
115         defer cl.mu.Unlock()
116         w := bufio.NewWriter(_w)
117         defer w.Flush()
118         if addr := cl.ListenAddr(); addr != nil {
119                 fmt.Fprintf(w, "Listening on %s\n", addr)
120         } else {
121                 fmt.Fprintln(w, "Not listening!")
122         }
123         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
124         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPsLocked()))
125         if dht := cl.DHT(); dht != nil {
126                 dhtStats := dht.Stats()
127                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
128                 fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
129                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
130                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
131                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
132         }
133         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrentsAsSlice()))
134         fmt.Fprintln(w)
135         for _, t := range slices.Sort(cl.torrentsAsSlice(), func(l, r *Torrent) bool {
136                 return l.InfoHash().AsString() < r.InfoHash().AsString()
137         }).([]*Torrent) {
138                 if t.name() == "" {
139                         fmt.Fprint(w, "<unknown name>")
140                 } else {
141                         fmt.Fprint(w, t.name())
142                 }
143                 fmt.Fprint(w, "\n")
144                 if t.Info() != nil {
145                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesMissingLocked())/float64(t.Info().TotalLength())), t.length, humanize.Bytes(uint64(t.Info().TotalLength())))
146                 } else {
147                         w.WriteString("<missing metainfo>")
148                 }
149                 fmt.Fprint(w, "\n")
150                 t.writeStatus(w)
151                 fmt.Fprintln(w)
152         }
153 }
154
155 func listenUTP(networkSuffix, addr string) (utpSocket, error) {
156         return NewUtpSocket("udp"+networkSuffix, addr)
157 }
158
159 func listenTCP(networkSuffix, addr string) (net.Listener, error) {
160         return net.Listen("tcp"+networkSuffix, addr)
161 }
162
163 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
164         for {
165                 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
166                 if err != nil {
167                         return
168                 }
169                 listenedAddr = tcpL.Addr().String()
170                 utpSock, err = listenUTP(networkSuffix, listenedAddr)
171                 if err == nil {
172                         return
173                 }
174                 tcpL.Close()
175                 if !strings.Contains(err.Error(), "address already in use") {
176                         return
177                 }
178         }
179 }
180
181 // Listen to enabled protocols, ensuring ports match.
182 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock utpSocket, listenedAddr string, err error) {
183         if addr == "" {
184                 addr = ":50007"
185         }
186         if tcp && utp {
187                 var host string
188                 var port int
189                 host, port, err = missinggo.ParseHostPort(addr)
190                 if err != nil {
191                         return
192                 }
193                 if port == 0 {
194                         // If both protocols are active, they need to have the same port.
195                         return listenBothSameDynamicPort(networkSuffix, host)
196                 }
197         }
198         defer func() {
199                 if err != nil {
200                         listenedAddr = ""
201                 }
202         }()
203         if tcp {
204                 tcpL, err = listenTCP(networkSuffix, addr)
205                 if err != nil {
206                         return
207                 }
208                 defer func() {
209                         if err != nil {
210                                 tcpL.Close()
211                         }
212                 }()
213                 listenedAddr = tcpL.Addr().String()
214         }
215         if utp {
216                 utpSock, err = listenUTP(networkSuffix, addr)
217                 if err != nil {
218                         return
219                 }
220                 listenedAddr = utpSock.Addr().String()
221         }
222         return
223 }
224
225 // Creates a new client.
226 func NewClient(cfg *Config) (cl *Client, err error) {
227         if cfg == nil {
228                 cfg = &Config{
229                         DHTConfig: dht.ServerConfig{
230                                 StartingNodes: dht.GlobalBootstrapAddrs,
231                         },
232                 }
233         }
234
235         defer func() {
236                 if err != nil {
237                         cl = nil
238                 }
239         }()
240         cl = &Client{
241                 halfOpenLimit:     defaultHalfOpenConnsPerTorrent,
242                 config:            *cfg,
243                 dopplegangerAddrs: make(map[string]struct{}),
244                 torrents:          make(map[metainfo.Hash]*Torrent),
245         }
246         defer func() {
247                 if err == nil {
248                         return
249                 }
250                 cl.Close()
251         }()
252         if cfg.UploadRateLimiter == nil {
253                 cl.uploadLimit = rate.NewLimiter(rate.Inf, 0)
254         } else {
255                 cl.uploadLimit = cfg.UploadRateLimiter
256         }
257         if cfg.DownloadRateLimiter == nil {
258                 cl.downloadLimit = rate.NewLimiter(rate.Inf, 0)
259         } else {
260                 cl.downloadLimit = cfg.DownloadRateLimiter
261         }
262         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
263         cl.event.L = &cl.mu
264         storageImpl := cfg.DefaultStorage
265         if storageImpl == nil {
266                 storageImpl = storage.NewFile(cfg.DataDir)
267                 cl.onClose = append(cl.onClose, func() {
268                         if err := storageImpl.Close(); err != nil {
269                                 log.Printf("error closing default storage: %s", err)
270                         }
271                 })
272         }
273         cl.defaultStorage = storage.NewClient(storageImpl)
274         if cfg.IPBlocklist != nil {
275                 cl.ipBlockList = cfg.IPBlocklist
276         }
277
278         if cfg.PeerID != "" {
279                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
280         } else {
281                 o := copy(cl.peerID[:], bep20)
282                 _, err = rand.Read(cl.peerID[o:])
283                 if err != nil {
284                         panic("error generating peer id")
285                 }
286         }
287
288         cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
289                 !cl.config.DisableTCP,
290                 !cl.config.DisableUTP,
291                 func() string {
292                         if cl.config.DisableIPv6 {
293                                 return "4"
294                         } else {
295                                 return ""
296                         }
297                 }(),
298                 cl.config.ListenAddr)
299         if err != nil {
300                 return
301         }
302         if cl.tcpListener != nil {
303                 go cl.acceptConnections(cl.tcpListener, false)
304         }
305         if cl.utpSock != nil {
306                 go cl.acceptConnections(cl.utpSock, true)
307         }
308         if !cfg.NoDHT {
309                 dhtCfg := cfg.DHTConfig
310                 if dhtCfg.IPBlocklist == nil {
311                         dhtCfg.IPBlocklist = cl.ipBlockList
312                 }
313                 if dhtCfg.Conn == nil {
314                         if cl.utpSock != nil {
315                                 dhtCfg.Conn = cl.utpSock
316                         } else {
317                                 dhtCfg.Conn, err = net.ListenPacket("udp", firstNonEmptyString(cl.listenAddr, cl.config.ListenAddr))
318                                 if err != nil {
319                                         return
320                                 }
321                         }
322                 }
323                 if dhtCfg.OnAnnouncePeer == nil {
324                         dhtCfg.OnAnnouncePeer = cl.onDHTAnnouncePeer
325                 }
326                 cl.dHT, err = dht.NewServer(&dhtCfg)
327                 if err != nil {
328                         return
329                 }
330                 go func() {
331                         if _, err := cl.dHT.Bootstrap(); err != nil {
332                                 log.Printf("error bootstrapping dht: %s", err)
333                         }
334                 }()
335         }
336
337         return
338 }
339
340 func firstNonEmptyString(ss ...string) string {
341         for _, s := range ss {
342                 if s != "" {
343                         return s
344                 }
345         }
346         return ""
347 }
348
349 // Stops the client. All connections to peers are closed and all activity will
350 // come to a halt.
351 func (cl *Client) Close() {
352         cl.mu.Lock()
353         defer cl.mu.Unlock()
354         cl.closed.Set()
355         if cl.dHT != nil {
356                 cl.dHT.Close()
357         }
358         if cl.utpSock != nil {
359                 cl.utpSock.Close()
360         }
361         if cl.tcpListener != nil {
362                 cl.tcpListener.Close()
363         }
364         for _, t := range cl.torrents {
365                 t.close()
366         }
367         for _, f := range cl.onClose {
368                 f()
369         }
370         cl.event.Broadcast()
371 }
372
373 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
374
375 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
376         if cl.ipBlockList == nil {
377                 return
378         }
379         ip4 := ip.To4()
380         // If blocklists are enabled, then block non-IPv4 addresses, because
381         // blocklists do not yet support IPv6.
382         if ip4 == nil {
383                 if missinggo.CryHeard() {
384                         log.Printf("blocking non-IPv4 address: %s", ip)
385                 }
386                 r = ipv6BlockRange
387                 blocked = true
388                 return
389         }
390         return cl.ipBlockList.Lookup(ip4)
391 }
392
393 func (cl *Client) waitAccept() {
394         for {
395                 for _, t := range cl.torrents {
396                         if t.wantConns() {
397                                 return
398                         }
399                 }
400                 if cl.closed.IsSet() {
401                         return
402                 }
403                 cl.event.Wait()
404         }
405 }
406
407 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
408         cl.mu.Lock()
409         defer cl.mu.Unlock()
410         for {
411                 cl.waitAccept()
412                 cl.mu.Unlock()
413                 conn, err := l.Accept()
414                 conn = pproffd.WrapNetConn(conn)
415                 cl.mu.Lock()
416                 if cl.closed.IsSet() {
417                         if conn != nil {
418                                 conn.Close()
419                         }
420                         return
421                 }
422                 if err != nil {
423                         log.Print(err)
424                         // I think something harsher should happen here? Our accept
425                         // routine just fucked off.
426                         return
427                 }
428                 if utp {
429                         acceptUTP.Add(1)
430                 } else {
431                         acceptTCP.Add(1)
432                 }
433                 if cl.config.Debug {
434                         log.Printf("accepted connection from %s", conn.RemoteAddr())
435                 }
436                 reject := cl.badPeerIPPort(
437                         missinggo.AddrIP(conn.RemoteAddr()),
438                         missinggo.AddrPort(conn.RemoteAddr()))
439                 if reject {
440                         if cl.config.Debug {
441                                 log.Printf("rejecting connection from %s", conn.RemoteAddr())
442                         }
443                         acceptReject.Add(1)
444                         conn.Close()
445                         continue
446                 }
447                 go cl.incomingConnection(conn, utp)
448         }
449 }
450
451 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
452         defer nc.Close()
453         if tc, ok := nc.(*net.TCPConn); ok {
454                 tc.SetLinger(0)
455         }
456         c := cl.newConnection(nc)
457         c.Discovery = peerSourceIncoming
458         c.uTP = utp
459         cl.runReceivedConn(c)
460 }
461
462 // Returns a handle to the given torrent, if it's present in the client.
463 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
464         cl.mu.Lock()
465         defer cl.mu.Unlock()
466         t, ok = cl.torrents[ih]
467         return
468 }
469
470 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
471         return cl.torrents[ih]
472 }
473
474 type dialResult struct {
475         Conn net.Conn
476         UTP  bool
477 }
478
479 func countDialResult(err error) {
480         if err == nil {
481                 successfulDials.Add(1)
482         } else {
483                 unsuccessfulDials.Add(1)
484         }
485 }
486
487 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
488         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
489         if ret < minDialTimeout {
490                 ret = minDialTimeout
491         }
492         return
493 }
494
495 // Returns whether an address is known to connect to a client with our own ID.
496 func (cl *Client) dopplegangerAddr(addr string) bool {
497         _, ok := cl.dopplegangerAddrs[addr]
498         return ok
499 }
500
501 // Start the process of connecting to the given peer for the given torrent if
502 // appropriate.
503 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
504         if peer.Id == cl.peerID {
505                 return
506         }
507         if cl.badPeerIPPort(peer.IP, peer.Port) {
508                 return
509         }
510         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
511         if t.addrActive(addr) {
512                 return
513         }
514         t.halfOpen[addr] = struct{}{}
515         go cl.outgoingConnection(t, addr, peer.Source)
516 }
517
518 func (cl *Client) dialTCP(ctx context.Context, addr string) (c net.Conn, err error) {
519         d := net.Dialer{
520         // LocalAddr: cl.tcpListener.Addr(),
521         }
522         c, err = d.DialContext(ctx, "tcp", addr)
523         countDialResult(err)
524         if err == nil {
525                 c.(*net.TCPConn).SetLinger(0)
526         }
527         c = pproffd.WrapNetConn(c)
528         return
529 }
530
531 func (cl *Client) dialUTP(ctx context.Context, addr string) (c net.Conn, err error) {
532         c, err = cl.utpSock.DialContext(ctx, addr)
533         countDialResult(err)
534         return
535 }
536
537 // Returns a connection over UTP or TCP, whichever is first to connect.
538 func (cl *Client) dialFirst(ctx context.Context, addr string) (conn net.Conn, utp bool) {
539         ctx, cancel := context.WithCancel(ctx)
540         // As soon as we return one connection, cancel the others.
541         defer cancel()
542         left := 0
543         resCh := make(chan dialResult, left)
544         if !cl.config.DisableUTP {
545                 left++
546                 go func() {
547                         c, _ := cl.dialUTP(ctx, addr)
548                         resCh <- dialResult{c, true}
549                 }()
550         }
551         if !cl.config.DisableTCP {
552                 left++
553                 go func() {
554                         c, _ := cl.dialTCP(ctx, addr)
555                         resCh <- dialResult{c, false}
556                 }()
557         }
558         var res dialResult
559         // Wait for a successful connection.
560         for ; left > 0 && res.Conn == nil; left-- {
561                 res = <-resCh
562         }
563         if left > 0 {
564                 // There are still incompleted dials.
565                 go func() {
566                         for ; left > 0; left-- {
567                                 conn := (<-resCh).Conn
568                                 if conn != nil {
569                                         conn.Close()
570                                 }
571                         }
572                 }()
573         }
574         conn = res.Conn
575         utp = res.UTP
576         return
577 }
578
579 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
580         if _, ok := t.halfOpen[addr]; !ok {
581                 panic("invariant broken")
582         }
583         delete(t.halfOpen, addr)
584         cl.openNewConns(t)
585 }
586
587 // Performs initiator handshakes and returns a connection. Returns nil
588 // *connection if no connection for valid reasons.
589 func (cl *Client) handshakesConnection(ctx context.Context, nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
590         c = cl.newConnection(nc)
591         c.encrypted = encrypted
592         c.uTP = utp
593         ctx, cancel := context.WithTimeout(ctx, handshakesTimeout)
594         defer cancel()
595         dl, ok := ctx.Deadline()
596         if !ok {
597                 panic(ctx)
598         }
599         err = nc.SetDeadline(dl)
600         if err != nil {
601                 panic(err)
602         }
603         ok, err = cl.initiateHandshakes(c, t)
604         if !ok {
605                 c = nil
606         }
607         return
608 }
609
610 // Returns nil connection and nil error if no connection could be established
611 // for valid reasons.
612 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
613         ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
614         defer cancel()
615         nc, utp := cl.dialFirst(ctx, addr)
616         if nc == nil {
617                 return
618         }
619         encryptFirst := !cl.config.DisableEncryption && !cl.config.PreferNoEncryption
620         c, err = cl.handshakesConnection(ctx, nc, t, encryptFirst, utp)
621         if err != nil {
622                 nc.Close()
623                 return
624         } else if c != nil {
625                 return
626         }
627         nc.Close()
628         if cl.config.DisableEncryption || cl.config.ForceEncryption {
629                 // There's no alternate encryption case to try.
630                 return
631         }
632         // Try again with encryption if we didn't earlier, or without if we did,
633         // using whichever protocol type worked last time.
634         if utp {
635                 nc, err = cl.dialUTP(ctx, addr)
636         } else {
637                 nc, err = cl.dialTCP(ctx, addr)
638         }
639         if err != nil {
640                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
641                 return
642         }
643         c, err = cl.handshakesConnection(ctx, nc, t, !encryptFirst, utp)
644         if err != nil || c == nil {
645                 nc.Close()
646         }
647         return
648 }
649
650 // Called to dial out and run a connection. The addr we're given is already
651 // considered half-open.
652 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
653         c, err := cl.establishOutgoingConn(t, addr)
654         cl.mu.Lock()
655         defer cl.mu.Unlock()
656         // Don't release lock between here and addConnection, unless it's for
657         // failure.
658         cl.noLongerHalfOpen(t, addr)
659         if err != nil {
660                 if cl.config.Debug {
661                         log.Printf("error establishing outgoing connection: %s", err)
662                 }
663                 return
664         }
665         if c == nil {
666                 return
667         }
668         defer c.Close()
669         c.Discovery = ps
670         cl.runInitiatedHandshookConn(c, t)
671 }
672
673 // The port number for incoming peer connections. 0 if the client isn't
674 // listening.
675 func (cl *Client) incomingPeerPort() int {
676         if cl.listenAddr == "" {
677                 return 0
678         }
679         _, port, err := missinggo.ParseHostPort(cl.listenAddr)
680         if err != nil {
681                 panic(err)
682         }
683         return port
684 }
685
686 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
687 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
688 func addrCompactIP(addr net.Addr) (string, error) {
689         host, _, err := net.SplitHostPort(addr.String())
690         if err != nil {
691                 return "", err
692         }
693         ip := net.ParseIP(host)
694         if v4 := ip.To4(); v4 != nil {
695                 if len(v4) != 4 {
696                         panic(v4)
697                 }
698                 return string(v4), nil
699         }
700         return string(ip.To16()), nil
701 }
702
703 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
704         var err error
705         for b := range bb {
706                 _, err = w.Write(b)
707                 if err != nil {
708                         break
709                 }
710         }
711         done <- err
712 }
713
714 type (
715         peerExtensionBytes [8]byte
716         peerID             [20]byte
717 )
718
719 func (pex *peerExtensionBytes) SupportsExtended() bool {
720         return pex[5]&0x10 != 0
721 }
722
723 func (pex *peerExtensionBytes) SupportsDHT() bool {
724         return pex[7]&0x01 != 0
725 }
726
727 func (pex *peerExtensionBytes) SupportsFast() bool {
728         return pex[7]&0x04 != 0
729 }
730
731 type handshakeResult struct {
732         peerExtensionBytes
733         peerID
734         metainfo.Hash
735 }
736
737 // ih is nil if we expect the peer to declare the InfoHash, such as when the
738 // peer initiated the connection. Returns ok if the handshake was successful,
739 // and err if there was an unexpected condition other than the peer simply
740 // abandoning the handshake.
741 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
742         // Bytes to be sent to the peer. Should never block the sender.
743         postCh := make(chan []byte, 4)
744         // A single error value sent when the writer completes.
745         writeDone := make(chan error, 1)
746         // Performs writes to the socket and ensures posts don't block.
747         go handshakeWriter(sock, postCh, writeDone)
748
749         defer func() {
750                 close(postCh) // Done writing.
751                 if !ok {
752                         return
753                 }
754                 if err != nil {
755                         panic(err)
756                 }
757                 // Wait until writes complete before returning from handshake.
758                 err = <-writeDone
759                 if err != nil {
760                         err = fmt.Errorf("error writing: %s", err)
761                 }
762         }()
763
764         post := func(bb []byte) {
765                 select {
766                 case postCh <- bb:
767                 default:
768                         panic("mustn't block while posting")
769                 }
770         }
771
772         post([]byte(pp.Protocol))
773         post(extensions[:])
774         if ih != nil { // We already know what we want.
775                 post(ih[:])
776                 post(peerID[:])
777         }
778         var b [68]byte
779         _, err = io.ReadFull(sock, b[:68])
780         if err != nil {
781                 err = nil
782                 return
783         }
784         if string(b[:20]) != pp.Protocol {
785                 return
786         }
787         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
788         missinggo.CopyExact(&res.Hash, b[28:48])
789         missinggo.CopyExact(&res.peerID, b[48:68])
790         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
791
792         // TODO: Maybe we can just drop peers here if we're not interested. This
793         // could prevent them trying to reconnect, falsely believing there was
794         // just a problem.
795         if ih == nil { // We were waiting for the peer to tell us what they wanted.
796                 post(res.Hash[:])
797                 post(peerID[:])
798         }
799
800         ok = true
801         return
802 }
803
804 // Wraps a raw connection and provides the interface we want for using the
805 // connection in the message loop.
806 type deadlineReader struct {
807         nc net.Conn
808         r  io.Reader
809 }
810
811 func (r deadlineReader) Read(b []byte) (n int, err error) {
812         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
813         err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
814         if err != nil {
815                 err = fmt.Errorf("error setting read deadline: %s", err)
816         }
817         n, err = r.r.Read(b)
818         // Convert common errors into io.EOF.
819         // if err != nil {
820         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
821         //              err = io.EOF
822         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
823         //              if n != 0 {
824         //                      panic(n)
825         //              }
826         //              err = io.EOF
827         //      }
828         // }
829         return
830 }
831
832 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys mse.SecretKeyIter) (ret io.ReadWriter, encrypted bool, err error) {
833         var protocol [len(pp.Protocol)]byte
834         _, err = io.ReadFull(rw, protocol[:])
835         if err != nil {
836                 return
837         }
838         ret = struct {
839                 io.Reader
840                 io.Writer
841         }{
842                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
843                 rw,
844         }
845         if string(protocol[:]) == pp.Protocol {
846                 return
847         }
848         encrypted = true
849         ret, err = mse.ReceiveHandshakeLazy(ret, skeys)
850         return
851 }
852
853 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
854         if c.encrypted {
855                 var rw io.ReadWriter
856                 rw, err = mse.InitiateHandshake(struct {
857                         io.Reader
858                         io.Writer
859                 }{c.r, c.w}, t.infoHash[:], nil)
860                 c.setRW(rw)
861                 if err != nil {
862                         return
863                 }
864         }
865         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
866         if ih != t.infoHash {
867                 ok = false
868         }
869         return
870 }
871
872 // Calls f with any secret keys.
873 func (cl *Client) forSkeys(f func([]byte) bool) {
874         cl.mu.Lock()
875         defer cl.mu.Unlock()
876         for ih := range cl.torrents {
877                 if !f(ih[:]) {
878                         break
879                 }
880         }
881 }
882
883 // Do encryption and bittorrent handshakes as receiver.
884 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
885         if !cl.config.DisableEncryption {
886                 var rw io.ReadWriter
887                 rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw(), cl.forSkeys)
888                 c.setRW(rw)
889                 if err != nil {
890                         if err == mse.ErrNoSecretKeyMatch {
891                                 err = nil
892                         }
893                         return
894                 }
895         }
896         if cl.config.ForceEncryption && !c.encrypted {
897                 err = errors.New("connection not encrypted")
898                 return
899         }
900         ih, ok, err := cl.connBTHandshake(c, nil)
901         if err != nil {
902                 err = fmt.Errorf("error during bt handshake: %s", err)
903                 return
904         }
905         if !ok {
906                 return
907         }
908         cl.mu.Lock()
909         t = cl.torrents[ih]
910         cl.mu.Unlock()
911         return
912 }
913
914 // Returns !ok if handshake failed for valid reasons.
915 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
916         res, ok, err := handshake(c.rw(), ih, cl.peerID, cl.extensionBytes)
917         if err != nil || !ok {
918                 return
919         }
920         ret = res.Hash
921         c.PeerExtensionBytes = res.peerExtensionBytes
922         c.PeerID = res.peerID
923         c.completedHandshake = time.Now()
924         return
925 }
926
927 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
928         if c.PeerID == cl.peerID {
929                 connsToSelf.Add(1)
930                 addr := c.conn.RemoteAddr().String()
931                 cl.dopplegangerAddrs[addr] = struct{}{}
932                 return
933         }
934         cl.runHandshookConn(c, t, true)
935 }
936
937 func (cl *Client) runReceivedConn(c *connection) {
938         err := c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
939         if err != nil {
940                 panic(err)
941         }
942         t, err := cl.receiveHandshakes(c)
943         if err != nil {
944                 if cl.config.Debug {
945                         log.Printf("error receiving handshakes: %s", err)
946                 }
947                 return
948         }
949         if t == nil {
950                 return
951         }
952         cl.mu.Lock()
953         defer cl.mu.Unlock()
954         if c.PeerID == cl.peerID {
955                 // Because the remote address is not necessarily the same as its
956                 // client's torrent listen address, we won't record the remote address
957                 // as a doppleganger. Instead, the initiator can record *us* as the
958                 // doppleganger.
959                 return
960         }
961         cl.runHandshookConn(c, t, false)
962 }
963
964 func (cl *Client) runHandshookConn(c *connection, t *Torrent, outgoing bool) {
965         c.conn.SetWriteDeadline(time.Time{})
966         c.r = deadlineReader{c.conn, c.r}
967         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
968         if !t.addConnection(c, outgoing) {
969                 return
970         }
971         defer t.dropConnection(c)
972         go c.writer(time.Minute)
973         cl.sendInitialMessages(c, t)
974         err := c.mainReadLoop()
975         if err != nil && cl.config.Debug {
976                 log.Printf("error during connection loop: %s", err)
977         }
978 }
979
980 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
981         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
982                 conn.Post(pp.Message{
983                         Type:       pp.Extended,
984                         ExtendedID: pp.HandshakeExtendedID,
985                         ExtendedPayload: func() []byte {
986                                 d := map[string]interface{}{
987                                         "m": func() (ret map[string]int) {
988                                                 ret = make(map[string]int, 2)
989                                                 ret["ut_metadata"] = metadataExtendedId
990                                                 if !cl.config.DisablePEX {
991                                                         ret["ut_pex"] = pexExtendedId
992                                                 }
993                                                 return
994                                         }(),
995                                         "v": extendedHandshakeClientVersion,
996                                         // No upload queue is implemented yet.
997                                         "reqq": 64,
998                                 }
999                                 if !cl.config.DisableEncryption {
1000                                         d["e"] = 1
1001                                 }
1002                                 if torrent.metadataSizeKnown() {
1003                                         d["metadata_size"] = torrent.metadataSize()
1004                                 }
1005                                 if p := cl.incomingPeerPort(); p != 0 {
1006                                         d["p"] = p
1007                                 }
1008                                 yourip, err := addrCompactIP(conn.remoteAddr())
1009                                 if err != nil {
1010                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
1011                                 } else {
1012                                         d["yourip"] = yourip
1013                                 }
1014                                 // log.Printf("sending %v", d)
1015                                 b, err := bencode.Marshal(d)
1016                                 if err != nil {
1017                                         panic(err)
1018                                 }
1019                                 return b
1020                         }(),
1021                 })
1022         }
1023         if torrent.haveAnyPieces() {
1024                 conn.Bitfield(torrent.bitfield())
1025         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
1026                 conn.Post(pp.Message{
1027                         Type: pp.HaveNone,
1028                 })
1029         }
1030         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
1031                 conn.Post(pp.Message{
1032                         Type: pp.Port,
1033                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
1034                 })
1035         }
1036 }
1037
1038 func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
1039         conn.updateRequests()
1040 }
1041
1042 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
1043         ok = cn.Cancel(r)
1044         if ok {
1045                 postedCancels.Add(1)
1046         }
1047         return
1048 }
1049
1050 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
1051         if !cn.RequestPending(r) {
1052                 return false
1053         }
1054         delete(cn.Requests, r)
1055         return true
1056 }
1057
1058 // Process incoming ut_metadata message.
1059 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
1060         var d map[string]int
1061         err := bencode.Unmarshal(payload, &d)
1062         if err != nil {
1063                 return fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1064         }
1065         msgType, ok := d["msg_type"]
1066         if !ok {
1067                 return errors.New("missing msg_type field")
1068         }
1069         piece := d["piece"]
1070         switch msgType {
1071         case pp.DataMetadataExtensionMsgType:
1072                 if !c.requestedMetadataPiece(piece) {
1073                         return fmt.Errorf("got unexpected piece %d", piece)
1074                 }
1075                 c.metadataRequests[piece] = false
1076                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1077                 if begin < 0 || begin >= len(payload) {
1078                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1079                 }
1080                 t.saveMetadataPiece(piece, payload[begin:])
1081                 c.UsefulChunksReceived++
1082                 c.lastUsefulChunkReceived = time.Now()
1083                 return t.maybeCompleteMetadata()
1084         case pp.RequestMetadataExtensionMsgType:
1085                 if !t.haveMetadataPiece(piece) {
1086                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1087                         return nil
1088                 }
1089                 start := (1 << 14) * piece
1090                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1091                 return nil
1092         case pp.RejectMetadataExtensionMsgType:
1093                 return nil
1094         default:
1095                 return errors.New("unknown msg_type value")
1096         }
1097 }
1098
1099 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
1100         // Count the chunk being sent, even if it isn't.
1101         b := make([]byte, r.Length)
1102         p := t.info.Piece(int(r.Index))
1103         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1104         if n != len(b) {
1105                 if err == nil {
1106                         panic("expected error")
1107                 }
1108                 return err
1109         }
1110         c.Post(pp.Message{
1111                 Type:  pp.Piece,
1112                 Index: r.Index,
1113                 Begin: r.Begin,
1114                 Piece: b,
1115         })
1116         c.chunksSent++
1117         uploadChunksPosted.Add(1)
1118         c.lastChunkSent = time.Now()
1119         return nil
1120 }
1121
1122 func (cl *Client) openNewConns(t *Torrent) {
1123         defer t.updateWantPeersEvent()
1124         for len(t.peers) != 0 {
1125                 if !t.wantConns() {
1126                         return
1127                 }
1128                 if len(t.halfOpen) >= cl.halfOpenLimit {
1129                         return
1130                 }
1131                 var (
1132                         k peersKey
1133                         p Peer
1134                 )
1135                 for k, p = range t.peers {
1136                         break
1137                 }
1138                 delete(t.peers, k)
1139                 cl.initiateConn(p, t)
1140         }
1141 }
1142
1143 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1144         if port == 0 {
1145                 return true
1146         }
1147         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1148                 return true
1149         }
1150         if _, ok := cl.ipBlockRange(ip); ok {
1151                 return true
1152         }
1153         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1154                 return true
1155         }
1156         return false
1157 }
1158
1159 // Return a Torrent ready for insertion into a Client.
1160 func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent) {
1161         // use provided storage, if provided
1162         storageClient := cl.defaultStorage
1163         if specStorage != nil {
1164                 storageClient = storage.NewClient(specStorage)
1165         }
1166
1167         t = &Torrent{
1168                 cl:       cl,
1169                 infoHash: ih,
1170                 peers:    make(map[peersKey]Peer),
1171                 conns:    make(map[*connection]struct{}, 2*defaultEstablishedConnsPerTorrent),
1172
1173                 halfOpen:          make(map[string]struct{}),
1174                 pieceStateChanges: pubsub.NewPubSub(),
1175
1176                 storageOpener:       storageClient,
1177                 maxEstablishedConns: defaultEstablishedConnsPerTorrent,
1178
1179                 networkingEnabled: true,
1180         }
1181         t.setChunkSize(defaultChunkSize)
1182         return
1183 }
1184
1185 // A file-like handle to some torrent data resource.
1186 type Handle interface {
1187         io.Reader
1188         io.Seeker
1189         io.Closer
1190         io.ReaderAt
1191 }
1192
1193 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1194         return cl.AddTorrentInfoHashWithStorage(infoHash, nil)
1195 }
1196
1197 // Adds a torrent by InfoHash with a custom Storage implementation.
1198 // If the torrent already exists then this Storage is ignored and the
1199 // existing torrent returned with `new` set to `false`
1200 func (cl *Client) AddTorrentInfoHashWithStorage(infoHash metainfo.Hash, specStorage storage.ClientImpl) (t *Torrent, new bool) {
1201         cl.mu.Lock()
1202         defer cl.mu.Unlock()
1203         t, ok := cl.torrents[infoHash]
1204         if ok {
1205                 return
1206         }
1207         new = true
1208         t = cl.newTorrent(infoHash, specStorage)
1209         if cl.dHT != nil {
1210                 go t.dhtAnnouncer()
1211         }
1212         cl.torrents[infoHash] = t
1213         t.updateWantPeersEvent()
1214         // Tickle Client.waitAccept, new torrent may want conns.
1215         cl.event.Broadcast()
1216         return
1217 }
1218
1219 // Add or merge a torrent spec. If the torrent is already present, the
1220 // trackers will be merged with the existing ones. If the Info isn't yet
1221 // known, it will be set. The display name is replaced if the new spec
1222 // provides one. Returns new if the torrent wasn't already in the client.
1223 // Note that any `Storage` defined on the spec will be ignored if the
1224 // torrent is already present (i.e. `new` return value is `true`)
1225 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1226         t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
1227         if spec.DisplayName != "" {
1228                 t.SetDisplayName(spec.DisplayName)
1229         }
1230         if spec.InfoBytes != nil {
1231                 err = t.SetInfoBytes(spec.InfoBytes)
1232                 if err != nil {
1233                         return
1234                 }
1235         }
1236         cl.mu.Lock()
1237         defer cl.mu.Unlock()
1238         if spec.ChunkSize != 0 {
1239                 t.setChunkSize(pp.Integer(spec.ChunkSize))
1240         }
1241         t.addTrackers(spec.Trackers)
1242         t.maybeNewConns()
1243         return
1244 }
1245
1246 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1247         t, ok := cl.torrents[infoHash]
1248         if !ok {
1249                 err = fmt.Errorf("no such torrent")
1250                 return
1251         }
1252         err = t.close()
1253         if err != nil {
1254                 panic(err)
1255         }
1256         delete(cl.torrents, infoHash)
1257         return
1258 }
1259
1260 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1261         _url, err := url.Parse(announceURL)
1262         if err != nil {
1263                 return
1264         }
1265         hmp := missinggo.SplitHostMaybePort(_url.Host)
1266         if hmp.Err != nil {
1267                 err = hmp.Err
1268                 return
1269         }
1270         addr, err := net.ResolveIPAddr("ip", hmp.Host)
1271         if err != nil {
1272                 return
1273         }
1274         cl.mu.RLock()
1275         _, blocked = cl.ipBlockRange(addr.IP)
1276         cl.mu.RUnlock()
1277         host = _url.Host
1278         hmp.Host = addr.String()
1279         _url.Host = hmp.String()
1280         urlToUse = _url.String()
1281         return
1282 }
1283
1284 func (cl *Client) allTorrentsCompleted() bool {
1285         for _, t := range cl.torrents {
1286                 if !t.haveInfo() {
1287                         return false
1288                 }
1289                 if t.numPiecesCompleted() != t.numPieces() {
1290                         return false
1291                 }
1292         }
1293         return true
1294 }
1295
1296 // Returns true when all torrents are completely downloaded and false if the
1297 // client is stopped before that.
1298 func (cl *Client) WaitAll() bool {
1299         cl.mu.Lock()
1300         defer cl.mu.Unlock()
1301         for !cl.allTorrentsCompleted() {
1302                 if cl.closed.IsSet() {
1303                         return false
1304                 }
1305                 cl.event.Wait()
1306         }
1307         return true
1308 }
1309
1310 // Returns handles to all the torrents loaded in the Client.
1311 func (cl *Client) Torrents() []*Torrent {
1312         cl.mu.Lock()
1313         defer cl.mu.Unlock()
1314         return cl.torrentsAsSlice()
1315 }
1316
1317 func (cl *Client) torrentsAsSlice() (ret []*Torrent) {
1318         for _, t := range cl.torrents {
1319                 ret = append(ret, t)
1320         }
1321         return
1322 }
1323
1324 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1325         spec, err := TorrentSpecFromMagnetURI(uri)
1326         if err != nil {
1327                 return
1328         }
1329         T, _, err = cl.AddTorrentSpec(spec)
1330         return
1331 }
1332
1333 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1334         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1335         var ss []string
1336         slices.MakeInto(&ss, mi.Nodes)
1337         cl.AddDHTNodes(ss)
1338         return
1339 }
1340
1341 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1342         mi, err := metainfo.LoadFromFile(filename)
1343         if err != nil {
1344                 return
1345         }
1346         return cl.AddTorrent(mi)
1347 }
1348
1349 func (cl *Client) DHT() *dht.Server {
1350         return cl.dHT
1351 }
1352
1353 func (cl *Client) AddDHTNodes(nodes []string) {
1354         if cl.DHT() == nil {
1355                 return
1356         }
1357         for _, n := range nodes {
1358                 hmp := missinggo.SplitHostMaybePort(n)
1359                 ip := net.ParseIP(hmp.Host)
1360                 if ip == nil {
1361                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1362                         continue
1363                 }
1364                 ni := krpc.NodeInfo{
1365                         Addr: &net.UDPAddr{
1366                                 IP:   ip,
1367                                 Port: hmp.Port,
1368                         },
1369                 }
1370                 cl.DHT().AddNode(ni)
1371         }
1372 }
1373
1374 func (cl *Client) banPeerIP(ip net.IP) {
1375         if cl.badPeerIPs == nil {
1376                 cl.badPeerIPs = make(map[string]struct{})
1377         }
1378         cl.badPeerIPs[ip.String()] = struct{}{}
1379 }
1380
1381 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
1382         c = &connection{
1383                 conn: nc,
1384
1385                 Choked:          true,
1386                 PeerChoked:      true,
1387                 PeerMaxRequests: 250,
1388         }
1389         c.setRW(connStatsReadWriter{nc, &cl.mu, c})
1390         c.r = rateLimitedReader{cl.downloadLimit, c.r}
1391         return
1392 }
1393
1394 func (cl *Client) onDHTAnnouncePeer(ih metainfo.Hash, p dht.Peer) {
1395         cl.mu.Lock()
1396         defer cl.mu.Unlock()
1397         t := cl.torrent(ih)
1398         if t == nil {
1399                 return
1400         }
1401         t.addPeers([]Peer{{
1402                 IP:     p.IP,
1403                 Port:   p.Port,
1404                 Source: peerSourceDHTAnnouncePeer,
1405         }})
1406 }