]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Move TorrentSpec stuff into its own file
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "encoding/hex"
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         mathRand "math/rand"
13         "net"
14         "net/url"
15         "strconv"
16         "strings"
17         "time"
18
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/pproffd"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/anacrolix/missinggo/slices"
23         "github.com/anacrolix/sync"
24         "github.com/anacrolix/utp"
25         "github.com/dustin/go-humanize"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/dht/krpc"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/mse"
33         pp "github.com/anacrolix/torrent/peer_protocol"
34         "github.com/anacrolix/torrent/storage"
35 )
36
37 // Currently doesn't really queue, but should in the future.
38 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
39         piece := &t.pieces[pieceIndex]
40         if piece.QueuedForHash {
41                 return
42         }
43         piece.QueuedForHash = true
44         t.publishPieceChange(pieceIndex)
45         go cl.verifyPiece(t, pieceIndex)
46 }
47
48 // Queue a piece check if one isn't already queued, and the piece has never
49 // been checked before.
50 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
51         p := &t.pieces[piece]
52         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
53                 return
54         }
55         cl.queuePieceCheck(t, piece)
56 }
57
58 // Clients contain zero or more Torrents. A Client manages a blocklist, the
59 // TCP/UDP protocol ports, and DHT as desired.
60 type Client struct {
61         halfOpenLimit int
62         peerID        [20]byte
63         // The net.Addr.String part that should be common to all active listeners.
64         listenAddr     string
65         tcpListener    net.Listener
66         utpSock        *utp.Socket
67         dHT            *dht.Server
68         ipBlockList    iplist.Ranger
69         config         Config
70         extensionBytes peerExtensionBytes
71         // Set of addresses that have our client ID. This intentionally will
72         // include ourselves if we end up trying to connect to our own address
73         // through legitimate channels.
74         dopplegangerAddrs map[string]struct{}
75         badPeerIPs        map[string]struct{}
76
77         defaultStorage *storage.Client
78
79         mu     sync.RWMutex
80         event  sync.Cond
81         closed missinggo.Event
82
83         torrents map[metainfo.Hash]*Torrent
84 }
85
86 func (cl *Client) BadPeerIPs() []string {
87         cl.mu.RLock()
88         defer cl.mu.RUnlock()
89         return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 }
91
92 func (cl *Client) IPBlockList() iplist.Ranger {
93         cl.mu.Lock()
94         defer cl.mu.Unlock()
95         return cl.ipBlockList
96 }
97
98 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
99         cl.mu.Lock()
100         defer cl.mu.Unlock()
101         cl.ipBlockList = list
102         if cl.dHT != nil {
103                 cl.dHT.SetIPBlockList(list)
104         }
105 }
106
107 func (cl *Client) PeerID() string {
108         return string(cl.peerID[:])
109 }
110
111 type torrentAddr string
112
113 func (me torrentAddr) Network() string { return "" }
114
115 func (me torrentAddr) String() string { return string(me) }
116
117 func (cl *Client) ListenAddr() net.Addr {
118         if cl.listenAddr == "" {
119                 return nil
120         }
121         return torrentAddr(cl.listenAddr)
122 }
123
124 func (cl *Client) sortedTorrents() (ret []*Torrent) {
125         return slices.Sort(slices.FromMapElems(cl.torrents), func(l, r metainfo.Hash) bool {
126                 return l.AsString() < r.AsString()
127         }).([]*Torrent)
128 }
129
130 // Writes out a human readable status of the client, such as for writing to a
131 // HTTP status page.
132 func (cl *Client) WriteStatus(_w io.Writer) {
133         w := bufio.NewWriter(_w)
134         defer w.Flush()
135         if addr := cl.ListenAddr(); addr != nil {
136                 fmt.Fprintf(w, "Listening on %s\n", addr)
137         } else {
138                 fmt.Fprintln(w, "Not listening!")
139         }
140         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
141         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.BadPeerIPs()))
142         if dht := cl.DHT(); dht != nil {
143                 dhtStats := dht.Stats()
144                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
145                 fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
146                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
147                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
148                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
149         }
150         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.Torrents()))
151         fmt.Fprintln(w)
152         for _, t := range slices.Sort(append([]*Torrent(nil), cl.Torrents()...), func(l, r *Torrent) bool {
153                 return l.InfoHash().AsString() < r.InfoHash().AsString()
154         }).([]*Torrent) {
155                 if t.Name() == "" {
156                         fmt.Fprint(w, "<unknown name>")
157                 } else {
158                         fmt.Fprint(w, t.Name())
159                 }
160                 fmt.Fprint(w, "\n")
161                 if t.Info() != nil {
162                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.BytesMissing())/float64(t.Info().TotalLength())), t.length, humanize.Bytes(uint64(t.Info().TotalLength())))
163                 } else {
164                         w.WriteString("<missing metainfo>")
165                 }
166                 fmt.Fprint(w, "\n")
167                 t.writeStatus(w)
168                 fmt.Fprintln(w)
169         }
170 }
171
172 func listenUTP(networkSuffix, addr string) (*utp.Socket, error) {
173         return utp.NewSocket("udp"+networkSuffix, addr)
174 }
175
176 func listenTCP(networkSuffix, addr string) (net.Listener, error) {
177         return net.Listen("tcp"+networkSuffix, addr)
178 }
179
180 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock *utp.Socket, listenedAddr string, err error) {
181         for {
182                 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
183                 if err != nil {
184                         return
185                 }
186                 listenedAddr = tcpL.Addr().String()
187                 utpSock, err = listenUTP(networkSuffix, listenedAddr)
188                 if err == nil {
189                         return
190                 }
191                 tcpL.Close()
192                 if !strings.Contains(err.Error(), "address already in use") {
193                         return
194                 }
195         }
196 }
197
198 // Listen to enabled protocols, ensuring ports match.
199 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock *utp.Socket, listenedAddr string, err error) {
200         if addr == "" {
201                 addr = ":50007"
202         }
203         if tcp && utp {
204                 var host string
205                 var port int
206                 host, port, err = missinggo.ParseHostPort(addr)
207                 if err != nil {
208                         return
209                 }
210                 if port == 0 {
211                         // If both protocols are active, they need to have the same port.
212                         return listenBothSameDynamicPort(networkSuffix, host)
213                 }
214         }
215         defer func() {
216                 if err != nil {
217                         listenedAddr = ""
218                 }
219         }()
220         if tcp {
221                 tcpL, err = listenTCP(networkSuffix, addr)
222                 if err != nil {
223                         return
224                 }
225                 defer func() {
226                         if err != nil {
227                                 tcpL.Close()
228                         }
229                 }()
230                 listenedAddr = tcpL.Addr().String()
231         }
232         if utp {
233                 utpSock, err = listenUTP(networkSuffix, addr)
234                 if err != nil {
235                         return
236                 }
237                 listenedAddr = utpSock.Addr().String()
238         }
239         return
240 }
241
242 // Creates a new client.
243 func NewClient(cfg *Config) (cl *Client, err error) {
244         if cfg == nil {
245                 cfg = &Config{}
246         }
247
248         defer func() {
249                 if err != nil {
250                         cl = nil
251                 }
252         }()
253         cl = &Client{
254                 halfOpenLimit:     defaultHalfOpenConnsPerTorrent,
255                 config:            *cfg,
256                 dopplegangerAddrs: make(map[string]struct{}),
257                 torrents:          make(map[metainfo.Hash]*Torrent),
258         }
259         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
260         cl.event.L = &cl.mu
261         storageImpl := cfg.DefaultStorage
262         if storageImpl == nil {
263                 storageImpl = storage.NewFile(cfg.DataDir)
264         }
265         cl.defaultStorage = storage.NewClient(storageImpl)
266         if cfg.IPBlocklist != nil {
267                 cl.ipBlockList = cfg.IPBlocklist
268         }
269
270         if cfg.PeerID != "" {
271                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
272         } else {
273                 o := copy(cl.peerID[:], bep20)
274                 _, err = rand.Read(cl.peerID[o:])
275                 if err != nil {
276                         panic("error generating peer id")
277                 }
278         }
279
280         cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
281                 !cl.config.DisableTCP,
282                 !cl.config.DisableUTP,
283                 func() string {
284                         if cl.config.DisableIPv6 {
285                                 return "4"
286                         } else {
287                                 return ""
288                         }
289                 }(),
290                 cl.config.ListenAddr)
291         if err != nil {
292                 return
293         }
294         if cl.tcpListener != nil {
295                 go cl.acceptConnections(cl.tcpListener, false)
296         }
297         if cl.utpSock != nil {
298                 go cl.acceptConnections(cl.utpSock, true)
299         }
300         if !cfg.NoDHT {
301                 dhtCfg := cfg.DHTConfig
302                 if dhtCfg.IPBlocklist == nil {
303                         dhtCfg.IPBlocklist = cl.ipBlockList
304                 }
305                 dhtCfg.Addr = firstNonEmptyString(dhtCfg.Addr, cl.listenAddr, cl.config.ListenAddr)
306                 if dhtCfg.Conn == nil && cl.utpSock != nil {
307                         dhtCfg.Conn = cl.utpSock
308                 }
309                 cl.dHT, err = dht.NewServer(&dhtCfg)
310                 if err != nil {
311                         return
312                 }
313         }
314
315         return
316 }
317
318 func firstNonEmptyString(ss ...string) string {
319         for _, s := range ss {
320                 if s != "" {
321                         return s
322                 }
323         }
324         return ""
325 }
326
327 // Stops the client. All connections to peers are closed and all activity will
328 // come to a halt.
329 func (cl *Client) Close() {
330         cl.mu.Lock()
331         defer cl.mu.Unlock()
332         cl.closed.Set()
333         if cl.dHT != nil {
334                 cl.dHT.Close()
335         }
336         if cl.utpSock != nil {
337                 cl.utpSock.CloseNow()
338         }
339         if cl.tcpListener != nil {
340                 cl.tcpListener.Close()
341         }
342         for _, t := range cl.torrents {
343                 t.close()
344         }
345         cl.event.Broadcast()
346 }
347
348 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
349
350 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
351         if cl.ipBlockList == nil {
352                 return
353         }
354         ip4 := ip.To4()
355         // If blocklists are enabled, then block non-IPv4 addresses, because
356         // blocklists do not yet support IPv6.
357         if ip4 == nil {
358                 if missinggo.CryHeard() {
359                         log.Printf("blocking non-IPv4 address: %s", ip)
360                 }
361                 r = ipv6BlockRange
362                 blocked = true
363                 return
364         }
365         return cl.ipBlockList.Lookup(ip4)
366 }
367
368 func (cl *Client) waitAccept() {
369         for {
370                 for _, t := range cl.torrents {
371                         if t.wantConns() {
372                                 return
373                         }
374                 }
375                 if cl.closed.IsSet() {
376                         return
377                 }
378                 cl.event.Wait()
379         }
380 }
381
382 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
383         cl.mu.Lock()
384         defer cl.mu.Unlock()
385         for {
386                 cl.waitAccept()
387                 cl.mu.Unlock()
388                 conn, err := l.Accept()
389                 conn = pproffd.WrapNetConn(conn)
390                 cl.mu.Lock()
391                 if cl.closed.IsSet() {
392                         if conn != nil {
393                                 conn.Close()
394                         }
395                         return
396                 }
397                 if err != nil {
398                         log.Print(err)
399                         // I think something harsher should happen here? Our accept
400                         // routine just fucked off.
401                         return
402                 }
403                 if utp {
404                         acceptUTP.Add(1)
405                 } else {
406                         acceptTCP.Add(1)
407                 }
408                 reject := cl.badPeerIPPort(
409                         missinggo.AddrIP(conn.RemoteAddr()),
410                         missinggo.AddrPort(conn.RemoteAddr()))
411                 if reject {
412                         acceptReject.Add(1)
413                         conn.Close()
414                         continue
415                 }
416                 go cl.incomingConnection(conn, utp)
417         }
418 }
419
420 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
421         defer nc.Close()
422         if tc, ok := nc.(*net.TCPConn); ok {
423                 tc.SetLinger(0)
424         }
425         c := newConnection(nc, &cl.mu)
426         c.Discovery = peerSourceIncoming
427         c.uTP = utp
428         cl.runReceivedConn(c)
429 }
430
431 // Returns a handle to the given torrent, if it's present in the client.
432 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
433         cl.mu.Lock()
434         defer cl.mu.Unlock()
435         t, ok = cl.torrents[ih]
436         return
437 }
438
439 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
440         return cl.torrents[ih]
441 }
442
443 type dialResult struct {
444         Conn net.Conn
445         UTP  bool
446 }
447
448 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
449         conn, err := dial(addr, t)
450         if err != nil {
451                 if conn != nil {
452                         conn.Close()
453                 }
454                 conn = nil // Pedantic
455         }
456         ch <- dialResult{conn, utp}
457         if err == nil {
458                 successfulDials.Add(1)
459                 return
460         }
461         unsuccessfulDials.Add(1)
462 }
463
464 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
465         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
466         if ret < minDialTimeout {
467                 ret = minDialTimeout
468         }
469         return
470 }
471
472 // Returns whether an address is known to connect to a client with our own ID.
473 func (cl *Client) dopplegangerAddr(addr string) bool {
474         _, ok := cl.dopplegangerAddrs[addr]
475         return ok
476 }
477
478 // Start the process of connecting to the given peer for the given torrent if
479 // appropriate.
480 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
481         if peer.Id == cl.peerID {
482                 return
483         }
484         if cl.badPeerIPPort(peer.IP, peer.Port) {
485                 return
486         }
487         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
488         if t.addrActive(addr) {
489                 return
490         }
491         t.halfOpen[addr] = struct{}{}
492         go cl.outgoingConnection(t, addr, peer.Source)
493 }
494
495 func (cl *Client) dialTimeout(t *Torrent) time.Duration {
496         cl.mu.Lock()
497         pendingPeers := len(t.peers)
498         cl.mu.Unlock()
499         return reducedDialTimeout(nominalDialTimeout, cl.halfOpenLimit, pendingPeers)
500 }
501
502 func (cl *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
503         c, err = net.DialTimeout("tcp", addr, cl.dialTimeout(t))
504         if err == nil {
505                 c.(*net.TCPConn).SetLinger(0)
506         }
507         c = pproffd.WrapNetConn(c)
508         return
509 }
510
511 func (cl *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
512         return cl.utpSock.DialTimeout(addr, cl.dialTimeout(t))
513 }
514
515 // Returns a connection over UTP or TCP, whichever is first to connect.
516 func (cl *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
517         // Initiate connections via TCP and UTP simultaneously. Use the first one
518         // that succeeds.
519         left := 0
520         if !cl.config.DisableUTP {
521                 left++
522         }
523         if !cl.config.DisableTCP {
524                 left++
525         }
526         resCh := make(chan dialResult, left)
527         if !cl.config.DisableUTP {
528                 go doDial(cl.dialUTP, resCh, true, addr, t)
529         }
530         if !cl.config.DisableTCP {
531                 go doDial(cl.dialTCP, resCh, false, addr, t)
532         }
533         var res dialResult
534         // Wait for a successful connection.
535         for ; left > 0 && res.Conn == nil; left-- {
536                 res = <-resCh
537         }
538         if left > 0 {
539                 // There are still incompleted dials.
540                 go func() {
541                         for ; left > 0; left-- {
542                                 conn := (<-resCh).Conn
543                                 if conn != nil {
544                                         conn.Close()
545                                 }
546                         }
547                 }()
548         }
549         conn = res.Conn
550         utp = res.UTP
551         return
552 }
553
554 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
555         if _, ok := t.halfOpen[addr]; !ok {
556                 panic("invariant broken")
557         }
558         delete(t.halfOpen, addr)
559         cl.openNewConns(t)
560 }
561
562 // Performs initiator handshakes and returns a connection. Returns nil
563 // *connection if no connection for valid reasons.
564 func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
565         c = newConnection(nc, &cl.mu)
566         c.encrypted = encrypted
567         c.uTP = utp
568         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
569         if err != nil {
570                 return
571         }
572         ok, err := cl.initiateHandshakes(c, t)
573         if !ok {
574                 c = nil
575         }
576         return
577 }
578
579 // Returns nil connection and nil error if no connection could be established
580 // for valid reasons.
581 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
582         nc, utp := cl.dialFirst(addr, t)
583         if nc == nil {
584                 return
585         }
586         c, err = cl.handshakesConnection(nc, t, !cl.config.DisableEncryption, utp)
587         if err != nil {
588                 nc.Close()
589                 return
590         } else if c != nil {
591                 return
592         }
593         nc.Close()
594         if cl.config.DisableEncryption {
595                 // We already tried without encryption.
596                 return
597         }
598         // Try again without encryption, using whichever protocol type worked last
599         // time.
600         if utp {
601                 nc, err = cl.dialUTP(addr, t)
602         } else {
603                 nc, err = cl.dialTCP(addr, t)
604         }
605         if err != nil {
606                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
607                 return
608         }
609         c, err = cl.handshakesConnection(nc, t, false, utp)
610         if err != nil || c == nil {
611                 nc.Close()
612         }
613         return
614 }
615
616 // Called to dial out and run a connection. The addr we're given is already
617 // considered half-open.
618 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
619         c, err := cl.establishOutgoingConn(t, addr)
620         cl.mu.Lock()
621         defer cl.mu.Unlock()
622         // Don't release lock between here and addConnection, unless it's for
623         // failure.
624         cl.noLongerHalfOpen(t, addr)
625         if err != nil {
626                 if cl.config.Debug {
627                         log.Printf("error establishing outgoing connection: %s", err)
628                 }
629                 return
630         }
631         if c == nil {
632                 return
633         }
634         defer c.Close()
635         c.Discovery = ps
636         cl.runInitiatedHandshookConn(c, t)
637 }
638
639 // The port number for incoming peer connections. 0 if the client isn't
640 // listening.
641 func (cl *Client) incomingPeerPort() int {
642         if cl.listenAddr == "" {
643                 return 0
644         }
645         _, port, err := missinggo.ParseHostPort(cl.listenAddr)
646         if err != nil {
647                 panic(err)
648         }
649         return port
650 }
651
652 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
653 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
654 func addrCompactIP(addr net.Addr) (string, error) {
655         host, _, err := net.SplitHostPort(addr.String())
656         if err != nil {
657                 return "", err
658         }
659         ip := net.ParseIP(host)
660         if v4 := ip.To4(); v4 != nil {
661                 if len(v4) != 4 {
662                         panic(v4)
663                 }
664                 return string(v4), nil
665         }
666         return string(ip.To16()), nil
667 }
668
669 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
670         var err error
671         for b := range bb {
672                 _, err = w.Write(b)
673                 if err != nil {
674                         break
675                 }
676         }
677         done <- err
678 }
679
680 type (
681         peerExtensionBytes [8]byte
682         peerID             [20]byte
683 )
684
685 func (pex *peerExtensionBytes) SupportsExtended() bool {
686         return pex[5]&0x10 != 0
687 }
688
689 func (pex *peerExtensionBytes) SupportsDHT() bool {
690         return pex[7]&0x01 != 0
691 }
692
693 func (pex *peerExtensionBytes) SupportsFast() bool {
694         return pex[7]&0x04 != 0
695 }
696
697 type handshakeResult struct {
698         peerExtensionBytes
699         peerID
700         metainfo.Hash
701 }
702
703 // ih is nil if we expect the peer to declare the InfoHash, such as when the
704 // peer initiated the connection. Returns ok if the handshake was successful,
705 // and err if there was an unexpected condition other than the peer simply
706 // abandoning the handshake.
707 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
708         // Bytes to be sent to the peer. Should never block the sender.
709         postCh := make(chan []byte, 4)
710         // A single error value sent when the writer completes.
711         writeDone := make(chan error, 1)
712         // Performs writes to the socket and ensures posts don't block.
713         go handshakeWriter(sock, postCh, writeDone)
714
715         defer func() {
716                 close(postCh) // Done writing.
717                 if !ok {
718                         return
719                 }
720                 if err != nil {
721                         panic(err)
722                 }
723                 // Wait until writes complete before returning from handshake.
724                 err = <-writeDone
725                 if err != nil {
726                         err = fmt.Errorf("error writing: %s", err)
727                 }
728         }()
729
730         post := func(bb []byte) {
731                 select {
732                 case postCh <- bb:
733                 default:
734                         panic("mustn't block while posting")
735                 }
736         }
737
738         post([]byte(pp.Protocol))
739         post(extensions[:])
740         if ih != nil { // We already know what we want.
741                 post(ih[:])
742                 post(peerID[:])
743         }
744         var b [68]byte
745         _, err = io.ReadFull(sock, b[:68])
746         if err != nil {
747                 err = nil
748                 return
749         }
750         if string(b[:20]) != pp.Protocol {
751                 return
752         }
753         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
754         missinggo.CopyExact(&res.Hash, b[28:48])
755         missinggo.CopyExact(&res.peerID, b[48:68])
756         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
757
758         // TODO: Maybe we can just drop peers here if we're not interested. This
759         // could prevent them trying to reconnect, falsely believing there was
760         // just a problem.
761         if ih == nil { // We were waiting for the peer to tell us what they wanted.
762                 post(res.Hash[:])
763                 post(peerID[:])
764         }
765
766         ok = true
767         return
768 }
769
770 // Wraps a raw connection and provides the interface we want for using the
771 // connection in the message loop.
772 type deadlineReader struct {
773         nc net.Conn
774         r  io.Reader
775 }
776
777 func (r deadlineReader) Read(b []byte) (n int, err error) {
778         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
779         err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
780         if err != nil {
781                 err = fmt.Errorf("error setting read deadline: %s", err)
782         }
783         n, err = r.r.Read(b)
784         // Convert common errors into io.EOF.
785         // if err != nil {
786         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
787         //              err = io.EOF
788         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
789         //              if n != 0 {
790         //                      panic(n)
791         //              }
792         //              err = io.EOF
793         //      }
794         // }
795         return
796 }
797
798 type readWriter struct {
799         io.Reader
800         io.Writer
801 }
802
803 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
804         var protocol [len(pp.Protocol)]byte
805         _, err = io.ReadFull(rw, protocol[:])
806         if err != nil {
807                 return
808         }
809         ret = readWriter{
810                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
811                 rw,
812         }
813         if string(protocol[:]) == pp.Protocol {
814                 return
815         }
816         encrypted = true
817         ret, err = mse.ReceiveHandshake(ret, skeys)
818         return
819 }
820
821 func (cl *Client) receiveSkeys() (ret [][]byte) {
822         for ih := range cl.torrents {
823                 ret = append(ret, ih[:])
824         }
825         return
826 }
827
828 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
829         if c.encrypted {
830                 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
831                 if err != nil {
832                         return
833                 }
834         }
835         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
836         if ih != t.infoHash {
837                 ok = false
838         }
839         return
840 }
841
842 // Do encryption and bittorrent handshakes as receiver.
843 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
844         cl.mu.Lock()
845         skeys := cl.receiveSkeys()
846         cl.mu.Unlock()
847         if !cl.config.DisableEncryption {
848                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
849                 if err != nil {
850                         if err == mse.ErrNoSecretKeyMatch {
851                                 err = nil
852                         }
853                         return
854                 }
855         }
856         ih, ok, err := cl.connBTHandshake(c, nil)
857         if err != nil {
858                 err = fmt.Errorf("error during bt handshake: %s", err)
859                 return
860         }
861         if !ok {
862                 return
863         }
864         cl.mu.Lock()
865         t = cl.torrents[ih]
866         cl.mu.Unlock()
867         return
868 }
869
870 // Returns !ok if handshake failed for valid reasons.
871 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
872         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
873         if err != nil || !ok {
874                 return
875         }
876         ret = res.Hash
877         c.PeerExtensionBytes = res.peerExtensionBytes
878         c.PeerID = res.peerID
879         c.completedHandshake = time.Now()
880         return
881 }
882
883 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
884         if c.PeerID == cl.peerID {
885                 connsToSelf.Add(1)
886                 addr := c.conn.RemoteAddr().String()
887                 cl.dopplegangerAddrs[addr] = struct{}{}
888                 return
889         }
890         cl.runHandshookConn(c, t)
891 }
892
893 func (cl *Client) runReceivedConn(c *connection) {
894         err := c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
895         if err != nil {
896                 panic(err)
897         }
898         t, err := cl.receiveHandshakes(c)
899         if err != nil {
900                 if cl.config.Debug {
901                         log.Printf("error receiving handshakes: %s", err)
902                 }
903                 return
904         }
905         if t == nil {
906                 return
907         }
908         cl.mu.Lock()
909         defer cl.mu.Unlock()
910         if c.PeerID == cl.peerID {
911                 // Because the remote address is not necessarily the same as its
912                 // client's torrent listen address, we won't record the remote address
913                 // as a doppleganger. Instead, the initiator can record *us* as the
914                 // doppleganger.
915                 return
916         }
917         cl.runHandshookConn(c, t)
918 }
919
920 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
921         c.conn.SetWriteDeadline(time.Time{})
922         c.rw = readWriter{
923                 deadlineReader{c.conn, c.rw},
924                 c.rw,
925         }
926         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
927         if !t.addConnection(c) {
928                 return
929         }
930         defer t.dropConnection(c)
931         go c.writer(time.Minute)
932         cl.sendInitialMessages(c, t)
933         err := cl.connectionLoop(t, c)
934         if err != nil && cl.config.Debug {
935                 log.Printf("error during connection loop: %s", err)
936         }
937 }
938
939 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
940         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
941                 conn.Post(pp.Message{
942                         Type:       pp.Extended,
943                         ExtendedID: pp.HandshakeExtendedID,
944                         ExtendedPayload: func() []byte {
945                                 d := map[string]interface{}{
946                                         "m": func() (ret map[string]int) {
947                                                 ret = make(map[string]int, 2)
948                                                 ret["ut_metadata"] = metadataExtendedId
949                                                 if !cl.config.DisablePEX {
950                                                         ret["ut_pex"] = pexExtendedId
951                                                 }
952                                                 return
953                                         }(),
954                                         "v": extendedHandshakeClientVersion,
955                                         // No upload queue is implemented yet.
956                                         "reqq": 64,
957                                 }
958                                 if !cl.config.DisableEncryption {
959                                         d["e"] = 1
960                                 }
961                                 if torrent.metadataSizeKnown() {
962                                         d["metadata_size"] = torrent.metadataSize()
963                                 }
964                                 if p := cl.incomingPeerPort(); p != 0 {
965                                         d["p"] = p
966                                 }
967                                 yourip, err := addrCompactIP(conn.remoteAddr())
968                                 if err != nil {
969                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
970                                 } else {
971                                         d["yourip"] = yourip
972                                 }
973                                 // log.Printf("sending %v", d)
974                                 b, err := bencode.Marshal(d)
975                                 if err != nil {
976                                         panic(err)
977                                 }
978                                 return b
979                         }(),
980                 })
981         }
982         if torrent.haveAnyPieces() {
983                 conn.Bitfield(torrent.bitfield())
984         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
985                 conn.Post(pp.Message{
986                         Type: pp.HaveNone,
987                 })
988         }
989         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
990                 conn.Post(pp.Message{
991                         Type: pp.Port,
992                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
993                 })
994         }
995 }
996
997 func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
998         conn.updateRequests()
999 }
1000
1001 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
1002         ok = cn.Cancel(r)
1003         if ok {
1004                 postedCancels.Add(1)
1005         }
1006         return
1007 }
1008
1009 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
1010         if !cn.RequestPending(r) {
1011                 return false
1012         }
1013         delete(cn.Requests, r)
1014         return true
1015 }
1016
1017 // Process incoming ut_metadata message.
1018 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
1019         var d map[string]int
1020         err := bencode.Unmarshal(payload, &d)
1021         if err != nil {
1022                 return fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1023         }
1024         msgType, ok := d["msg_type"]
1025         if !ok {
1026                 return errors.New("missing msg_type field")
1027         }
1028         piece := d["piece"]
1029         switch msgType {
1030         case pp.DataMetadataExtensionMsgType:
1031                 if !c.requestedMetadataPiece(piece) {
1032                         return fmt.Errorf("got unexpected piece %d", piece)
1033                 }
1034                 c.metadataRequests[piece] = false
1035                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1036                 if begin < 0 || begin >= len(payload) {
1037                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1038                 }
1039                 t.saveMetadataPiece(piece, payload[begin:])
1040                 c.UsefulChunksReceived++
1041                 c.lastUsefulChunkReceived = time.Now()
1042                 return t.maybeCompleteMetadata()
1043         case pp.RequestMetadataExtensionMsgType:
1044                 if !t.haveMetadataPiece(piece) {
1045                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1046                         return nil
1047                 }
1048                 start := (1 << 14) * piece
1049                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1050                 return nil
1051         case pp.RejectMetadataExtensionMsgType:
1052                 return nil
1053         default:
1054                 return errors.New("unknown msg_type value")
1055         }
1056 }
1057
1058 func (cl *Client) upload(t *Torrent, c *connection) {
1059         if cl.config.NoUpload {
1060                 return
1061         }
1062         if !c.PeerInterested {
1063                 return
1064         }
1065         seeding := t.seeding()
1066         if !seeding && !t.connHasWantedPieces(c) {
1067                 return
1068         }
1069 another:
1070         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1071                 c.Unchoke()
1072                 for r := range c.PeerRequests {
1073                         err := cl.sendChunk(t, c, r)
1074                         if err != nil {
1075                                 i := int(r.Index)
1076                                 if t.pieceComplete(i) {
1077                                         t.updatePieceCompletion(i)
1078                                         if !t.pieceComplete(i) {
1079                                                 // We had the piece, but not anymore.
1080                                                 break another
1081                                         }
1082                                 }
1083                                 log.Printf("error sending chunk %+v to peer: %s", r, err)
1084                                 // If we failed to send a chunk, choke the peer to ensure they
1085                                 // flush all their requests. We've probably dropped a piece,
1086                                 // but there's no way to communicate this to the peer. If they
1087                                 // ask for it again, we'll kick them to allow us to send them
1088                                 // an updated bitfield.
1089                                 break another
1090                         }
1091                         delete(c.PeerRequests, r)
1092                         goto another
1093                 }
1094                 return
1095         }
1096         c.Choke()
1097 }
1098
1099 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
1100         // Count the chunk being sent, even if it isn't.
1101         b := make([]byte, r.Length)
1102         p := t.info.Piece(int(r.Index))
1103         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1104         if n != len(b) {
1105                 if err == nil {
1106                         panic("expected error")
1107                 }
1108                 return err
1109         }
1110         c.Post(pp.Message{
1111                 Type:  pp.Piece,
1112                 Index: r.Index,
1113                 Begin: r.Begin,
1114                 Piece: b,
1115         })
1116         c.chunksSent++
1117         uploadChunksPosted.Add(1)
1118         c.lastChunkSent = time.Now()
1119         return nil
1120 }
1121
1122 // Processes incoming bittorrent messages. The client lock is held upon entry
1123 // and exit. Returning will end the connection.
1124 func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
1125         decoder := pp.Decoder{
1126                 R:         bufio.NewReader(c.rw),
1127                 MaxLength: 256 * 1024,
1128         }
1129         for {
1130                 cl.mu.Unlock()
1131                 var msg pp.Message
1132                 err := decoder.Decode(&msg)
1133                 cl.mu.Lock()
1134                 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1135                         return nil
1136                 }
1137                 if err != nil {
1138                         return err
1139                 }
1140                 c.readMsg(&msg)
1141                 c.lastMessageReceived = time.Now()
1142                 if msg.Keepalive {
1143                         receivedKeepalives.Add(1)
1144                         continue
1145                 }
1146                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1147                 switch msg.Type {
1148                 case pp.Choke:
1149                         c.PeerChoked = true
1150                         c.Requests = nil
1151                         // We can then reset our interest.
1152                         c.updateRequests()
1153                 case pp.Reject:
1154                         cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1155                         c.updateRequests()
1156                 case pp.Unchoke:
1157                         c.PeerChoked = false
1158                         cl.peerUnchoked(t, c)
1159                 case pp.Interested:
1160                         c.PeerInterested = true
1161                         cl.upload(t, c)
1162                 case pp.NotInterested:
1163                         c.PeerInterested = false
1164                         c.Choke()
1165                 case pp.Have:
1166                         err = c.peerSentHave(int(msg.Index))
1167                 case pp.Request:
1168                         if c.Choked {
1169                                 break
1170                         }
1171                         if !c.PeerInterested {
1172                                 err = errors.New("peer sent request but isn't interested")
1173                                 break
1174                         }
1175                         if !t.havePiece(msg.Index.Int()) {
1176                                 // This isn't necessarily them screwing up. We can drop pieces
1177                                 // from our storage, and can't communicate this to peers
1178                                 // except by reconnecting.
1179                                 requestsReceivedForMissingPieces.Add(1)
1180                                 err = errors.New("peer requested piece we don't have")
1181                                 break
1182                         }
1183                         if c.PeerRequests == nil {
1184                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1185                         }
1186                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1187                         cl.upload(t, c)
1188                 case pp.Cancel:
1189                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1190                         if !c.PeerCancel(req) {
1191                                 unexpectedCancels.Add(1)
1192                         }
1193                 case pp.Bitfield:
1194                         err = c.peerSentBitfield(msg.Bitfield)
1195                 case pp.HaveAll:
1196                         err = c.peerSentHaveAll()
1197                 case pp.HaveNone:
1198                         err = c.peerSentHaveNone()
1199                 case pp.Piece:
1200                         cl.downloadedChunk(t, c, &msg)
1201                 case pp.Extended:
1202                         switch msg.ExtendedID {
1203                         case pp.HandshakeExtendedID:
1204                                 // TODO: Create a bencode struct for this.
1205                                 var d map[string]interface{}
1206                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1207                                 if err != nil {
1208                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1209                                         break
1210                                 }
1211                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1212                                 if reqq, ok := d["reqq"]; ok {
1213                                         if i, ok := reqq.(int64); ok {
1214                                                 c.PeerMaxRequests = int(i)
1215                                         }
1216                                 }
1217                                 if v, ok := d["v"]; ok {
1218                                         c.PeerClientName = v.(string)
1219                                 }
1220                                 m, ok := d["m"]
1221                                 if !ok {
1222                                         err = errors.New("handshake missing m item")
1223                                         break
1224                                 }
1225                                 mTyped, ok := m.(map[string]interface{})
1226                                 if !ok {
1227                                         err = errors.New("handshake m value is not dict")
1228                                         break
1229                                 }
1230                                 if c.PeerExtensionIDs == nil {
1231                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1232                                 }
1233                                 for name, v := range mTyped {
1234                                         id, ok := v.(int64)
1235                                         if !ok {
1236                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1237                                                 continue
1238                                         }
1239                                         if id == 0 {
1240                                                 delete(c.PeerExtensionIDs, name)
1241                                         } else {
1242                                                 if c.PeerExtensionIDs[name] == 0 {
1243                                                         supportedExtensionMessages.Add(name, 1)
1244                                                 }
1245                                                 c.PeerExtensionIDs[name] = byte(id)
1246                                         }
1247                                 }
1248                                 metadata_sizeUntyped, ok := d["metadata_size"]
1249                                 if ok {
1250                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1251                                         if !ok {
1252                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1253                                         } else {
1254                                                 err = t.setMetadataSize(metadata_size)
1255                                                 if err != nil {
1256                                                         err = fmt.Errorf("error setting metadata size to %d", metadata_size)
1257                                                         break
1258                                                 }
1259                                         }
1260                                 }
1261                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1262                                         c.requestPendingMetadata()
1263                                 }
1264                         case metadataExtendedId:
1265                                 err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1266                                 if err != nil {
1267                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1268                                 }
1269                         case pexExtendedId:
1270                                 if cl.config.DisablePEX {
1271                                         break
1272                                 }
1273                                 var pexMsg peerExchangeMessage
1274                                 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1275                                 if err != nil {
1276                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1277                                         break
1278                                 }
1279                                 go func() {
1280                                         cl.mu.Lock()
1281                                         t.addPeers(func() (ret []Peer) {
1282                                                 for i, cp := range pexMsg.Added {
1283                                                         p := Peer{
1284                                                                 IP:     make([]byte, 4),
1285                                                                 Port:   cp.Port,
1286                                                                 Source: peerSourcePEX,
1287                                                         }
1288                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1289                                                                 p.SupportsEncryption = true
1290                                                         }
1291                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1292                                                         ret = append(ret, p)
1293                                                 }
1294                                                 return
1295                                         }())
1296                                         cl.mu.Unlock()
1297                                 }()
1298                         default:
1299                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1300                         }
1301                         if err != nil {
1302                                 // That client uses its own extension IDs for outgoing message
1303                                 // types, which is incorrect.
1304                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1305                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1306                                         return nil
1307                                 }
1308                         }
1309                 case pp.Port:
1310                         if cl.dHT == nil {
1311                                 break
1312                         }
1313                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1314                         if err != nil {
1315                                 panic(err)
1316                         }
1317                         if msg.Port != 0 {
1318                                 pingAddr.Port = int(msg.Port)
1319                         }
1320                         cl.dHT.Ping(pingAddr)
1321                 default:
1322                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1323                 }
1324                 if err != nil {
1325                         return err
1326                 }
1327         }
1328 }
1329
1330 func (cl *Client) openNewConns(t *Torrent) {
1331         defer t.updateWantPeersEvent()
1332         for len(t.peers) != 0 {
1333                 if !t.wantConns() {
1334                         return
1335                 }
1336                 if len(t.halfOpen) >= cl.halfOpenLimit {
1337                         return
1338                 }
1339                 var (
1340                         k peersKey
1341                         p Peer
1342                 )
1343                 for k, p = range t.peers {
1344                         break
1345                 }
1346                 delete(t.peers, k)
1347                 cl.initiateConn(p, t)
1348         }
1349 }
1350
1351 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1352         if port == 0 {
1353                 return true
1354         }
1355         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1356                 return true
1357         }
1358         if _, ok := cl.ipBlockRange(ip); ok {
1359                 return true
1360         }
1361         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1362                 return true
1363         }
1364         return false
1365 }
1366
1367 // Return a Torrent ready for insertion into a Client.
1368 func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
1369         t = &Torrent{
1370                 cl:        cl,
1371                 infoHash:  ih,
1372                 chunkSize: defaultChunkSize,
1373                 peers:     make(map[peersKey]Peer),
1374
1375                 halfOpen:          make(map[string]struct{}),
1376                 pieceStateChanges: pubsub.NewPubSub(),
1377
1378                 storageOpener:       cl.defaultStorage,
1379                 maxEstablishedConns: defaultEstablishedConnsPerTorrent,
1380         }
1381         return
1382 }
1383
1384 func init() {
1385         // For shuffling the tracker tiers.
1386         mathRand.Seed(time.Now().Unix())
1387 }
1388
1389 type trackerTier []string
1390
1391 // The trackers within each tier must be shuffled before use.
1392 // http://stackoverflow.com/a/12267471/149482
1393 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1394 func shuffleTier(tier trackerTier) {
1395         for i := range tier {
1396                 j := mathRand.Intn(i + 1)
1397                 tier[i], tier[j] = tier[j], tier[i]
1398         }
1399 }
1400
1401 // A file-like handle to some torrent data resource.
1402 type Handle interface {
1403         io.Reader
1404         io.Seeker
1405         io.Closer
1406         io.ReaderAt
1407 }
1408
1409 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1410         cl.mu.Lock()
1411         defer cl.mu.Unlock()
1412         t, ok := cl.torrents[infoHash]
1413         if ok {
1414                 return
1415         }
1416         new = true
1417         t = cl.newTorrent(infoHash)
1418         if cl.dHT != nil {
1419                 go t.dhtAnnouncer()
1420         }
1421         cl.torrents[infoHash] = t
1422         t.updateWantPeersEvent()
1423         // Tickle Client.waitAccept, new torrent may want conns.
1424         cl.event.Broadcast()
1425         return
1426 }
1427
1428 // Add or merge a torrent spec. If the torrent is already present, the
1429 // trackers will be merged with the existing ones. If the Info isn't yet
1430 // known, it will be set. The display name is replaced if the new spec
1431 // provides one. Returns new if the torrent wasn't already in the client.
1432 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1433         t, new = cl.AddTorrentInfoHash(spec.InfoHash)
1434         if spec.DisplayName != "" {
1435                 t.SetDisplayName(spec.DisplayName)
1436         }
1437         if spec.InfoBytes != nil {
1438                 err = t.SetInfoBytes(spec.InfoBytes)
1439                 if err != nil {
1440                         return
1441                 }
1442         }
1443         cl.mu.Lock()
1444         defer cl.mu.Unlock()
1445         if spec.ChunkSize != 0 {
1446                 t.chunkSize = pp.Integer(spec.ChunkSize)
1447         }
1448         t.addTrackers(spec.Trackers)
1449         t.maybeNewConns()
1450         return
1451 }
1452
1453 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1454         t, ok := cl.torrents[infoHash]
1455         if !ok {
1456                 err = fmt.Errorf("no such torrent")
1457                 return
1458         }
1459         err = t.close()
1460         if err != nil {
1461                 panic(err)
1462         }
1463         delete(cl.torrents, infoHash)
1464         return
1465 }
1466
1467 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1468         _url, err := url.Parse(announceURL)
1469         if err != nil {
1470                 return
1471         }
1472         hmp := missinggo.SplitHostMaybePort(_url.Host)
1473         if hmp.Err != nil {
1474                 err = hmp.Err
1475                 return
1476         }
1477         addr, err := net.ResolveIPAddr("ip", hmp.Host)
1478         if err != nil {
1479                 return
1480         }
1481         cl.mu.RLock()
1482         _, blocked = cl.ipBlockRange(addr.IP)
1483         cl.mu.RUnlock()
1484         host = _url.Host
1485         hmp.Host = addr.String()
1486         _url.Host = hmp.String()
1487         urlToUse = _url.String()
1488         return
1489 }
1490
1491 func (cl *Client) allTorrentsCompleted() bool {
1492         for _, t := range cl.torrents {
1493                 if !t.haveInfo() {
1494                         return false
1495                 }
1496                 if t.numPiecesCompleted() != t.numPieces() {
1497                         return false
1498                 }
1499         }
1500         return true
1501 }
1502
1503 // Returns true when all torrents are completely downloaded and false if the
1504 // client is stopped before that.
1505 func (cl *Client) WaitAll() bool {
1506         cl.mu.Lock()
1507         defer cl.mu.Unlock()
1508         for !cl.allTorrentsCompleted() {
1509                 if cl.closed.IsSet() {
1510                         return false
1511                 }
1512                 cl.event.Wait()
1513         }
1514         return true
1515 }
1516
1517 // Handle a received chunk from a peer.
1518 func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
1519         chunksReceived.Add(1)
1520
1521         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1522
1523         // Request has been satisfied.
1524         if cl.connDeleteRequest(t, c, req) {
1525                 defer c.updateRequests()
1526         } else {
1527                 unexpectedChunksReceived.Add(1)
1528         }
1529
1530         index := int(req.Index)
1531         piece := &t.pieces[index]
1532
1533         // Do we actually want this chunk?
1534         if !t.wantPiece(req) {
1535                 unwantedChunksReceived.Add(1)
1536                 c.UnwantedChunksReceived++
1537                 return
1538         }
1539
1540         c.UsefulChunksReceived++
1541         c.lastUsefulChunkReceived = time.Now()
1542
1543         cl.upload(t, c)
1544
1545         // Need to record that it hasn't been written yet, before we attempt to do
1546         // anything with it.
1547         piece.incrementPendingWrites()
1548         // Record that we have the chunk.
1549         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1550
1551         // Cancel pending requests for this chunk.
1552         for _, c := range t.conns {
1553                 if cl.connCancel(t, c, req) {
1554                         c.updateRequests()
1555                 }
1556         }
1557
1558         cl.mu.Unlock()
1559         // Write the chunk out. Note that the upper bound on chunk writing
1560         // concurrency will be the number of connections.
1561         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1562         cl.mu.Lock()
1563
1564         piece.decrementPendingWrites()
1565
1566         if err != nil {
1567                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
1568                 t.pendRequest(req)
1569                 t.updatePieceCompletion(int(msg.Index))
1570                 return
1571         }
1572
1573         // It's important that the piece is potentially queued before we check if
1574         // the piece is still wanted, because if it is queued, it won't be wanted.
1575         if t.pieceAllDirty(index) {
1576                 cl.queuePieceCheck(t, int(req.Index))
1577         }
1578
1579         if c.peerTouchedPieces == nil {
1580                 c.peerTouchedPieces = make(map[int]struct{})
1581         }
1582         c.peerTouchedPieces[index] = struct{}{}
1583
1584         cl.event.Broadcast()
1585         t.publishPieceChange(int(req.Index))
1586         return
1587 }
1588
1589 // Return the connections that touched a piece, and clear the entry while
1590 // doing it.
1591 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
1592         for _, c := range t.conns {
1593                 if _, ok := c.peerTouchedPieces[piece]; ok {
1594                         ret = append(ret, c)
1595                         delete(c.peerTouchedPieces, piece)
1596                 }
1597         }
1598         return
1599 }
1600
1601 func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
1602         if t.closed.IsSet() {
1603                 return
1604         }
1605         p := &t.pieces[piece]
1606         if p.EverHashed {
1607                 // Don't score the first time a piece is hashed, it could be an
1608                 // initial check.
1609                 if correct {
1610                         pieceHashedCorrect.Add(1)
1611                 } else {
1612                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
1613                         pieceHashedNotCorrect.Add(1)
1614                 }
1615         }
1616         p.EverHashed = true
1617         touchers := cl.reapPieceTouches(t, piece)
1618         if correct {
1619                 for _, c := range touchers {
1620                         c.goodPiecesDirtied++
1621                 }
1622                 err := p.Storage().MarkComplete()
1623                 if err != nil {
1624                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1625                 }
1626                 t.updatePieceCompletion(piece)
1627         } else if len(touchers) != 0 {
1628                 log.Printf("dropping and banning %d conns that touched piece", len(touchers))
1629                 for _, c := range touchers {
1630                         c.badPiecesDirtied++
1631                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1632                         t.dropConnection(c)
1633                 }
1634         }
1635         cl.pieceChanged(t, piece)
1636 }
1637
1638 func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
1639         t.pendingPieces.Remove(piece)
1640         t.pendAllChunkSpecs(piece)
1641         for _, conn := range t.conns {
1642                 conn.Have(piece)
1643                 for r := range conn.Requests {
1644                         if int(r.Index) == piece {
1645                                 conn.Cancel(r)
1646                         }
1647                 }
1648                 // Could check here if peer doesn't have piece, but due to caching
1649                 // some peers may have said they have a piece but they don't.
1650                 cl.upload(t, conn)
1651         }
1652 }
1653
1654 func (cl *Client) onFailedPiece(t *Torrent, piece int) {
1655         if t.pieceAllDirty(piece) {
1656                 t.pendAllChunkSpecs(piece)
1657         }
1658         if !t.wantPieceIndex(piece) {
1659                 return
1660         }
1661         cl.openNewConns(t)
1662         for _, conn := range t.conns {
1663                 if conn.PeerHasPiece(piece) {
1664                         conn.updateRequests()
1665                 }
1666         }
1667 }
1668
1669 func (cl *Client) pieceChanged(t *Torrent, piece int) {
1670         correct := t.pieceComplete(piece)
1671         defer cl.event.Broadcast()
1672         if correct {
1673                 cl.onCompletedPiece(t, piece)
1674         } else {
1675                 cl.onFailedPiece(t, piece)
1676         }
1677         t.updatePiecePriority(piece)
1678 }
1679
1680 func (cl *Client) verifyPiece(t *Torrent, piece int) {
1681         cl.mu.Lock()
1682         defer cl.mu.Unlock()
1683         p := &t.pieces[piece]
1684         for p.Hashing || t.storage == nil {
1685                 cl.event.Wait()
1686         }
1687         p.QueuedForHash = false
1688         if t.closed.IsSet() || t.pieceComplete(piece) {
1689                 t.updatePiecePriority(piece)
1690                 return
1691         }
1692         p.Hashing = true
1693         t.publishPieceChange(piece)
1694         cl.mu.Unlock()
1695         sum := t.hashPiece(piece)
1696         cl.mu.Lock()
1697         p.Hashing = false
1698         cl.pieceHashed(t, piece, sum == p.Hash)
1699 }
1700
1701 // Returns handles to all the torrents loaded in the Client.
1702 func (cl *Client) Torrents() (ret []*Torrent) {
1703         cl.mu.Lock()
1704         for _, t := range cl.torrents {
1705                 ret = append(ret, t)
1706         }
1707         cl.mu.Unlock()
1708         return
1709 }
1710
1711 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1712         spec, err := TorrentSpecFromMagnetURI(uri)
1713         if err != nil {
1714                 return
1715         }
1716         T, _, err = cl.AddTorrentSpec(spec)
1717         return
1718 }
1719
1720 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1721         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1722         var ss []string
1723         slices.MakeInto(&ss, mi.Nodes)
1724         cl.AddDHTNodes(ss)
1725         return
1726 }
1727
1728 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1729         mi, err := metainfo.LoadFromFile(filename)
1730         if err != nil {
1731                 return
1732         }
1733         return cl.AddTorrent(mi)
1734 }
1735
1736 func (cl *Client) DHT() *dht.Server {
1737         return cl.dHT
1738 }
1739
1740 func (cl *Client) AddDHTNodes(nodes []string) {
1741         for _, n := range nodes {
1742                 hmp := missinggo.SplitHostMaybePort(n)
1743                 ip := net.ParseIP(hmp.Host)
1744                 if ip == nil {
1745                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1746                         continue
1747                 }
1748                 ni := krpc.NodeInfo{
1749                         Addr: &net.UDPAddr{
1750                                 IP:   ip,
1751                                 Port: hmp.Port,
1752                         },
1753                 }
1754                 cl.DHT().AddNode(ni)
1755         }
1756 }
1757
1758 func (cl *Client) banPeerIP(ip net.IP) {
1759         if cl.badPeerIPs == nil {
1760                 cl.badPeerIPs = make(map[string]struct{})
1761         }
1762         cl.badPeerIPs[ip.String()] = struct{}{}
1763 }