]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
cae1ee7a80209804335cde47c7e608fc25370dd7
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "encoding/hex"
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         mathRand "math/rand"
13         "net"
14         "net/url"
15         "strconv"
16         "strings"
17         "time"
18
19         "github.com/anacrolix/missinggo"
20         "github.com/anacrolix/missinggo/pproffd"
21         "github.com/anacrolix/missinggo/pubsub"
22         "github.com/anacrolix/missinggo/slices"
23         "github.com/anacrolix/sync"
24         "github.com/anacrolix/utp"
25         "github.com/dustin/go-humanize"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/dht/krpc"
30         "github.com/anacrolix/torrent/iplist"
31         "github.com/anacrolix/torrent/metainfo"
32         "github.com/anacrolix/torrent/mse"
33         pp "github.com/anacrolix/torrent/peer_protocol"
34         "github.com/anacrolix/torrent/storage"
35 )
36
37 // Currently doesn't really queue, but should in the future.
38 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
39         piece := &t.pieces[pieceIndex]
40         if piece.QueuedForHash {
41                 return
42         }
43         piece.QueuedForHash = true
44         t.publishPieceChange(pieceIndex)
45         go cl.verifyPiece(t, pieceIndex)
46 }
47
48 // Queue a piece check if one isn't already queued, and the piece has never
49 // been checked before.
50 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
51         p := &t.pieces[piece]
52         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
53                 return
54         }
55         cl.queuePieceCheck(t, piece)
56 }
57
58 // Clients contain zero or more Torrents. A Client manages a blocklist, the
59 // TCP/UDP protocol ports, and DHT as desired.
60 type Client struct {
61         halfOpenLimit int
62         peerID        [20]byte
63         // The net.Addr.String part that should be common to all active listeners.
64         listenAddr     string
65         tcpListener    net.Listener
66         utpSock        *utp.Socket
67         dHT            *dht.Server
68         ipBlockList    iplist.Ranger
69         config         Config
70         extensionBytes peerExtensionBytes
71         // Set of addresses that have our client ID. This intentionally will
72         // include ourselves if we end up trying to connect to our own address
73         // through legitimate channels.
74         dopplegangerAddrs map[string]struct{}
75         badPeerIPs        map[string]struct{}
76
77         defaultStorage storage.Client
78
79         mu     sync.RWMutex
80         event  sync.Cond
81         closed missinggo.Event
82
83         torrents map[metainfo.Hash]*Torrent
84 }
85
86 func (cl *Client) BadPeerIPs() []string {
87         cl.mu.RLock()
88         defer cl.mu.RUnlock()
89         return slices.FromMapKeys(cl.badPeerIPs).([]string)
90 }
91
92 func (cl *Client) IPBlockList() iplist.Ranger {
93         cl.mu.Lock()
94         defer cl.mu.Unlock()
95         return cl.ipBlockList
96 }
97
98 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
99         cl.mu.Lock()
100         defer cl.mu.Unlock()
101         cl.ipBlockList = list
102         if cl.dHT != nil {
103                 cl.dHT.SetIPBlockList(list)
104         }
105 }
106
107 func (cl *Client) PeerID() string {
108         return string(cl.peerID[:])
109 }
110
111 type torrentAddr string
112
113 func (me torrentAddr) Network() string { return "" }
114
115 func (me torrentAddr) String() string { return string(me) }
116
117 func (cl *Client) ListenAddr() net.Addr {
118         if cl.listenAddr == "" {
119                 return nil
120         }
121         return torrentAddr(cl.listenAddr)
122 }
123
124 func (cl *Client) sortedTorrents() (ret []*Torrent) {
125         return slices.Sort(slices.FromMapElems(cl.torrents), func(l, r metainfo.Hash) bool {
126                 return l.AsString() < r.AsString()
127         }).([]*Torrent)
128 }
129
130 // Writes out a human readable status of the client, such as for writing to a
131 // HTTP status page.
132 func (cl *Client) WriteStatus(_w io.Writer) {
133         w := bufio.NewWriter(_w)
134         defer w.Flush()
135         if addr := cl.ListenAddr(); addr != nil {
136                 fmt.Fprintf(w, "Listening on %s\n", addr)
137         } else {
138                 fmt.Fprintln(w, "Not listening!")
139         }
140         fmt.Fprintf(w, "Peer ID: %+q\n", cl.PeerID())
141         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.BadPeerIPs()))
142         if dht := cl.DHT(); dht != nil {
143                 dhtStats := dht.Stats()
144                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
145                 fmt.Fprintf(w, "DHT Server ID: %x\n", dht.ID())
146                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(dht.Addr()))
147                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
148                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
149         }
150         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.Torrents()))
151         fmt.Fprintln(w)
152         for _, t := range slices.Sort(append([]*Torrent(nil), cl.Torrents()...), func(l, r *Torrent) bool {
153                 return l.InfoHash().AsString() < r.InfoHash().AsString()
154         }).([]*Torrent) {
155                 if t.Name() == "" {
156                         fmt.Fprint(w, "<unknown name>")
157                 } else {
158                         fmt.Fprint(w, t.Name())
159                 }
160                 fmt.Fprint(w, "\n")
161                 if t.Info() != nil {
162                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.BytesMissing())/float64(t.Info().TotalLength())), t.length, humanize.Bytes(uint64(t.Info().TotalLength())))
163                 } else {
164                         w.WriteString("<missing metainfo>")
165                 }
166                 fmt.Fprint(w, "\n")
167                 t.writeStatus(w)
168                 fmt.Fprintln(w)
169         }
170 }
171
172 func listenUTP(networkSuffix, addr string) (*utp.Socket, error) {
173         return utp.NewSocket("udp"+networkSuffix, addr)
174 }
175
176 func listenTCP(networkSuffix, addr string) (net.Listener, error) {
177         return net.Listen("tcp"+networkSuffix, addr)
178 }
179
180 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock *utp.Socket, listenedAddr string, err error) {
181         for {
182                 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
183                 if err != nil {
184                         return
185                 }
186                 listenedAddr = tcpL.Addr().String()
187                 utpSock, err = listenUTP(networkSuffix, listenedAddr)
188                 if err == nil {
189                         return
190                 }
191                 tcpL.Close()
192                 if !strings.Contains(err.Error(), "address already in use") {
193                         return
194                 }
195         }
196 }
197
198 // Listen to enabled protocols, ensuring ports match.
199 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock *utp.Socket, listenedAddr string, err error) {
200         if addr == "" {
201                 addr = ":50007"
202         }
203         if tcp && utp {
204                 var host string
205                 var port int
206                 host, port, err = missinggo.ParseHostPort(addr)
207                 if err != nil {
208                         return
209                 }
210                 if port == 0 {
211                         // If both protocols are active, they need to have the same port.
212                         return listenBothSameDynamicPort(networkSuffix, host)
213                 }
214         }
215         defer func() {
216                 if err != nil {
217                         listenedAddr = ""
218                 }
219         }()
220         if tcp {
221                 tcpL, err = listenTCP(networkSuffix, addr)
222                 if err != nil {
223                         return
224                 }
225                 defer func() {
226                         if err != nil {
227                                 tcpL.Close()
228                         }
229                 }()
230                 listenedAddr = tcpL.Addr().String()
231         }
232         if utp {
233                 utpSock, err = listenUTP(networkSuffix, addr)
234                 if err != nil {
235                         return
236                 }
237                 listenedAddr = utpSock.Addr().String()
238         }
239         return
240 }
241
242 // Creates a new client.
243 func NewClient(cfg *Config) (cl *Client, err error) {
244         if cfg == nil {
245                 cfg = &Config{}
246         }
247
248         defer func() {
249                 if err != nil {
250                         cl = nil
251                 }
252         }()
253         cl = &Client{
254                 halfOpenLimit:     defaultHalfOpenConnsPerTorrent,
255                 config:            *cfg,
256                 defaultStorage:    cfg.DefaultStorage,
257                 dopplegangerAddrs: make(map[string]struct{}),
258                 torrents:          make(map[metainfo.Hash]*Torrent),
259         }
260         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
261         cl.event.L = &cl.mu
262         if cl.defaultStorage == nil {
263                 cl.defaultStorage = storage.NewFile(cfg.DataDir)
264         }
265         if cfg.IPBlocklist != nil {
266                 cl.ipBlockList = cfg.IPBlocklist
267         }
268
269         if cfg.PeerID != "" {
270                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
271         } else {
272                 o := copy(cl.peerID[:], bep20)
273                 _, err = rand.Read(cl.peerID[o:])
274                 if err != nil {
275                         panic("error generating peer id")
276                 }
277         }
278
279         cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
280                 !cl.config.DisableTCP,
281                 !cl.config.DisableUTP,
282                 func() string {
283                         if cl.config.DisableIPv6 {
284                                 return "4"
285                         } else {
286                                 return ""
287                         }
288                 }(),
289                 cl.config.ListenAddr)
290         if err != nil {
291                 return
292         }
293         if cl.tcpListener != nil {
294                 go cl.acceptConnections(cl.tcpListener, false)
295         }
296         if cl.utpSock != nil {
297                 go cl.acceptConnections(cl.utpSock, true)
298         }
299         if !cfg.NoDHT {
300                 dhtCfg := cfg.DHTConfig
301                 if dhtCfg.IPBlocklist == nil {
302                         dhtCfg.IPBlocklist = cl.ipBlockList
303                 }
304                 dhtCfg.Addr = firstNonEmptyString(dhtCfg.Addr, cl.listenAddr, cl.config.ListenAddr)
305                 if dhtCfg.Conn == nil && cl.utpSock != nil {
306                         dhtCfg.Conn = cl.utpSock
307                 }
308                 cl.dHT, err = dht.NewServer(&dhtCfg)
309                 if err != nil {
310                         return
311                 }
312         }
313
314         return
315 }
316
317 func firstNonEmptyString(ss ...string) string {
318         for _, s := range ss {
319                 if s != "" {
320                         return s
321                 }
322         }
323         return ""
324 }
325
326 // Stops the client. All connections to peers are closed and all activity will
327 // come to a halt.
328 func (cl *Client) Close() {
329         cl.mu.Lock()
330         defer cl.mu.Unlock()
331         cl.closed.Set()
332         if cl.dHT != nil {
333                 cl.dHT.Close()
334         }
335         if cl.utpSock != nil {
336                 cl.utpSock.CloseNow()
337         }
338         if cl.tcpListener != nil {
339                 cl.tcpListener.Close()
340         }
341         for _, t := range cl.torrents {
342                 t.close()
343         }
344         cl.event.Broadcast()
345 }
346
347 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
348
349 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
350         if cl.ipBlockList == nil {
351                 return
352         }
353         ip4 := ip.To4()
354         // If blocklists are enabled, then block non-IPv4 addresses, because
355         // blocklists do not yet support IPv6.
356         if ip4 == nil {
357                 if missinggo.CryHeard() {
358                         log.Printf("blocking non-IPv4 address: %s", ip)
359                 }
360                 r = ipv6BlockRange
361                 blocked = true
362                 return
363         }
364         return cl.ipBlockList.Lookup(ip4)
365 }
366
367 func (cl *Client) waitAccept() {
368         for {
369                 for _, t := range cl.torrents {
370                         if t.wantConns() {
371                                 return
372                         }
373                 }
374                 if cl.closed.IsSet() {
375                         return
376                 }
377                 cl.event.Wait()
378         }
379 }
380
381 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
382         cl.mu.Lock()
383         defer cl.mu.Unlock()
384         for {
385                 cl.waitAccept()
386                 cl.mu.Unlock()
387                 conn, err := l.Accept()
388                 conn = pproffd.WrapNetConn(conn)
389                 cl.mu.Lock()
390                 if cl.closed.IsSet() {
391                         if conn != nil {
392                                 conn.Close()
393                         }
394                         return
395                 }
396                 if err != nil {
397                         log.Print(err)
398                         // I think something harsher should happen here? Our accept
399                         // routine just fucked off.
400                         return
401                 }
402                 if utp {
403                         acceptUTP.Add(1)
404                 } else {
405                         acceptTCP.Add(1)
406                 }
407                 reject := cl.badPeerIPPort(
408                         missinggo.AddrIP(conn.RemoteAddr()),
409                         missinggo.AddrPort(conn.RemoteAddr()))
410                 if reject {
411                         acceptReject.Add(1)
412                         conn.Close()
413                         continue
414                 }
415                 go cl.incomingConnection(conn, utp)
416         }
417 }
418
419 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
420         defer nc.Close()
421         if tc, ok := nc.(*net.TCPConn); ok {
422                 tc.SetLinger(0)
423         }
424         c := newConnection(nc, &cl.mu)
425         c.Discovery = peerSourceIncoming
426         c.uTP = utp
427         cl.runReceivedConn(c)
428 }
429
430 // Returns a handle to the given torrent, if it's present in the client.
431 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
432         cl.mu.Lock()
433         defer cl.mu.Unlock()
434         t, ok = cl.torrents[ih]
435         return
436 }
437
438 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
439         return cl.torrents[ih]
440 }
441
442 type dialResult struct {
443         Conn net.Conn
444         UTP  bool
445 }
446
447 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
448         conn, err := dial(addr, t)
449         if err != nil {
450                 if conn != nil {
451                         conn.Close()
452                 }
453                 conn = nil // Pedantic
454         }
455         ch <- dialResult{conn, utp}
456         if err == nil {
457                 successfulDials.Add(1)
458                 return
459         }
460         unsuccessfulDials.Add(1)
461 }
462
463 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
464         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
465         if ret < minDialTimeout {
466                 ret = minDialTimeout
467         }
468         return
469 }
470
471 // Returns whether an address is known to connect to a client with our own ID.
472 func (cl *Client) dopplegangerAddr(addr string) bool {
473         _, ok := cl.dopplegangerAddrs[addr]
474         return ok
475 }
476
477 // Start the process of connecting to the given peer for the given torrent if
478 // appropriate.
479 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
480         if peer.Id == cl.peerID {
481                 return
482         }
483         if cl.badPeerIPPort(peer.IP, peer.Port) {
484                 return
485         }
486         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
487         if t.addrActive(addr) {
488                 return
489         }
490         t.halfOpen[addr] = struct{}{}
491         go cl.outgoingConnection(t, addr, peer.Source)
492 }
493
494 func (cl *Client) dialTimeout(t *Torrent) time.Duration {
495         cl.mu.Lock()
496         pendingPeers := len(t.peers)
497         cl.mu.Unlock()
498         return reducedDialTimeout(nominalDialTimeout, cl.halfOpenLimit, pendingPeers)
499 }
500
501 func (cl *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
502         c, err = net.DialTimeout("tcp", addr, cl.dialTimeout(t))
503         if err == nil {
504                 c.(*net.TCPConn).SetLinger(0)
505         }
506         c = pproffd.WrapNetConn(c)
507         return
508 }
509
510 func (cl *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
511         return cl.utpSock.DialTimeout(addr, cl.dialTimeout(t))
512 }
513
514 // Returns a connection over UTP or TCP, whichever is first to connect.
515 func (cl *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
516         // Initiate connections via TCP and UTP simultaneously. Use the first one
517         // that succeeds.
518         left := 0
519         if !cl.config.DisableUTP {
520                 left++
521         }
522         if !cl.config.DisableTCP {
523                 left++
524         }
525         resCh := make(chan dialResult, left)
526         if !cl.config.DisableUTP {
527                 go doDial(cl.dialUTP, resCh, true, addr, t)
528         }
529         if !cl.config.DisableTCP {
530                 go doDial(cl.dialTCP, resCh, false, addr, t)
531         }
532         var res dialResult
533         // Wait for a successful connection.
534         for ; left > 0 && res.Conn == nil; left-- {
535                 res = <-resCh
536         }
537         if left > 0 {
538                 // There are still incompleted dials.
539                 go func() {
540                         for ; left > 0; left-- {
541                                 conn := (<-resCh).Conn
542                                 if conn != nil {
543                                         conn.Close()
544                                 }
545                         }
546                 }()
547         }
548         conn = res.Conn
549         utp = res.UTP
550         return
551 }
552
553 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
554         if _, ok := t.halfOpen[addr]; !ok {
555                 panic("invariant broken")
556         }
557         delete(t.halfOpen, addr)
558         cl.openNewConns(t)
559 }
560
561 // Performs initiator handshakes and returns a connection. Returns nil
562 // *connection if no connection for valid reasons.
563 func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
564         c = newConnection(nc, &cl.mu)
565         c.encrypted = encrypted
566         c.uTP = utp
567         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
568         if err != nil {
569                 return
570         }
571         ok, err := cl.initiateHandshakes(c, t)
572         if !ok {
573                 c = nil
574         }
575         return
576 }
577
578 // Returns nil connection and nil error if no connection could be established
579 // for valid reasons.
580 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
581         nc, utp := cl.dialFirst(addr, t)
582         if nc == nil {
583                 return
584         }
585         c, err = cl.handshakesConnection(nc, t, !cl.config.DisableEncryption, utp)
586         if err != nil {
587                 nc.Close()
588                 return
589         } else if c != nil {
590                 return
591         }
592         nc.Close()
593         if cl.config.DisableEncryption {
594                 // We already tried without encryption.
595                 return
596         }
597         // Try again without encryption, using whichever protocol type worked last
598         // time.
599         if utp {
600                 nc, err = cl.dialUTP(addr, t)
601         } else {
602                 nc, err = cl.dialTCP(addr, t)
603         }
604         if err != nil {
605                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
606                 return
607         }
608         c, err = cl.handshakesConnection(nc, t, false, utp)
609         if err != nil || c == nil {
610                 nc.Close()
611         }
612         return
613 }
614
615 // Called to dial out and run a connection. The addr we're given is already
616 // considered half-open.
617 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
618         c, err := cl.establishOutgoingConn(t, addr)
619         cl.mu.Lock()
620         defer cl.mu.Unlock()
621         // Don't release lock between here and addConnection, unless it's for
622         // failure.
623         cl.noLongerHalfOpen(t, addr)
624         if err != nil {
625                 if cl.config.Debug {
626                         log.Printf("error establishing outgoing connection: %s", err)
627                 }
628                 return
629         }
630         if c == nil {
631                 return
632         }
633         defer c.Close()
634         c.Discovery = ps
635         cl.runInitiatedHandshookConn(c, t)
636 }
637
638 // The port number for incoming peer connections. 0 if the client isn't
639 // listening.
640 func (cl *Client) incomingPeerPort() int {
641         if cl.listenAddr == "" {
642                 return 0
643         }
644         _, port, err := missinggo.ParseHostPort(cl.listenAddr)
645         if err != nil {
646                 panic(err)
647         }
648         return port
649 }
650
651 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
652 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
653 func addrCompactIP(addr net.Addr) (string, error) {
654         host, _, err := net.SplitHostPort(addr.String())
655         if err != nil {
656                 return "", err
657         }
658         ip := net.ParseIP(host)
659         if v4 := ip.To4(); v4 != nil {
660                 if len(v4) != 4 {
661                         panic(v4)
662                 }
663                 return string(v4), nil
664         }
665         return string(ip.To16()), nil
666 }
667
668 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
669         var err error
670         for b := range bb {
671                 _, err = w.Write(b)
672                 if err != nil {
673                         break
674                 }
675         }
676         done <- err
677 }
678
679 type (
680         peerExtensionBytes [8]byte
681         peerID             [20]byte
682 )
683
684 func (pex *peerExtensionBytes) SupportsExtended() bool {
685         return pex[5]&0x10 != 0
686 }
687
688 func (pex *peerExtensionBytes) SupportsDHT() bool {
689         return pex[7]&0x01 != 0
690 }
691
692 func (pex *peerExtensionBytes) SupportsFast() bool {
693         return pex[7]&0x04 != 0
694 }
695
696 type handshakeResult struct {
697         peerExtensionBytes
698         peerID
699         metainfo.Hash
700 }
701
702 // ih is nil if we expect the peer to declare the InfoHash, such as when the
703 // peer initiated the connection. Returns ok if the handshake was successful,
704 // and err if there was an unexpected condition other than the peer simply
705 // abandoning the handshake.
706 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
707         // Bytes to be sent to the peer. Should never block the sender.
708         postCh := make(chan []byte, 4)
709         // A single error value sent when the writer completes.
710         writeDone := make(chan error, 1)
711         // Performs writes to the socket and ensures posts don't block.
712         go handshakeWriter(sock, postCh, writeDone)
713
714         defer func() {
715                 close(postCh) // Done writing.
716                 if !ok {
717                         return
718                 }
719                 if err != nil {
720                         panic(err)
721                 }
722                 // Wait until writes complete before returning from handshake.
723                 err = <-writeDone
724                 if err != nil {
725                         err = fmt.Errorf("error writing: %s", err)
726                 }
727         }()
728
729         post := func(bb []byte) {
730                 select {
731                 case postCh <- bb:
732                 default:
733                         panic("mustn't block while posting")
734                 }
735         }
736
737         post([]byte(pp.Protocol))
738         post(extensions[:])
739         if ih != nil { // We already know what we want.
740                 post(ih[:])
741                 post(peerID[:])
742         }
743         var b [68]byte
744         _, err = io.ReadFull(sock, b[:68])
745         if err != nil {
746                 err = nil
747                 return
748         }
749         if string(b[:20]) != pp.Protocol {
750                 return
751         }
752         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
753         missinggo.CopyExact(&res.Hash, b[28:48])
754         missinggo.CopyExact(&res.peerID, b[48:68])
755         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
756
757         // TODO: Maybe we can just drop peers here if we're not interested. This
758         // could prevent them trying to reconnect, falsely believing there was
759         // just a problem.
760         if ih == nil { // We were waiting for the peer to tell us what they wanted.
761                 post(res.Hash[:])
762                 post(peerID[:])
763         }
764
765         ok = true
766         return
767 }
768
769 // Wraps a raw connection and provides the interface we want for using the
770 // connection in the message loop.
771 type deadlineReader struct {
772         nc net.Conn
773         r  io.Reader
774 }
775
776 func (r deadlineReader) Read(b []byte) (n int, err error) {
777         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
778         err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
779         if err != nil {
780                 err = fmt.Errorf("error setting read deadline: %s", err)
781         }
782         n, err = r.r.Read(b)
783         // Convert common errors into io.EOF.
784         // if err != nil {
785         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
786         //              err = io.EOF
787         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
788         //              if n != 0 {
789         //                      panic(n)
790         //              }
791         //              err = io.EOF
792         //      }
793         // }
794         return
795 }
796
797 type readWriter struct {
798         io.Reader
799         io.Writer
800 }
801
802 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
803         var protocol [len(pp.Protocol)]byte
804         _, err = io.ReadFull(rw, protocol[:])
805         if err != nil {
806                 return
807         }
808         ret = readWriter{
809                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
810                 rw,
811         }
812         if string(protocol[:]) == pp.Protocol {
813                 return
814         }
815         encrypted = true
816         ret, err = mse.ReceiveHandshake(ret, skeys)
817         return
818 }
819
820 func (cl *Client) receiveSkeys() (ret [][]byte) {
821         for ih := range cl.torrents {
822                 ret = append(ret, ih[:])
823         }
824         return
825 }
826
827 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
828         if c.encrypted {
829                 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
830                 if err != nil {
831                         return
832                 }
833         }
834         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
835         if ih != t.infoHash {
836                 ok = false
837         }
838         return
839 }
840
841 // Do encryption and bittorrent handshakes as receiver.
842 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
843         cl.mu.Lock()
844         skeys := cl.receiveSkeys()
845         cl.mu.Unlock()
846         if !cl.config.DisableEncryption {
847                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
848                 if err != nil {
849                         if err == mse.ErrNoSecretKeyMatch {
850                                 err = nil
851                         }
852                         return
853                 }
854         }
855         ih, ok, err := cl.connBTHandshake(c, nil)
856         if err != nil {
857                 err = fmt.Errorf("error during bt handshake: %s", err)
858                 return
859         }
860         if !ok {
861                 return
862         }
863         cl.mu.Lock()
864         t = cl.torrents[ih]
865         cl.mu.Unlock()
866         return
867 }
868
869 // Returns !ok if handshake failed for valid reasons.
870 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
871         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
872         if err != nil || !ok {
873                 return
874         }
875         ret = res.Hash
876         c.PeerExtensionBytes = res.peerExtensionBytes
877         c.PeerID = res.peerID
878         c.completedHandshake = time.Now()
879         return
880 }
881
882 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
883         if c.PeerID == cl.peerID {
884                 connsToSelf.Add(1)
885                 addr := c.conn.RemoteAddr().String()
886                 cl.dopplegangerAddrs[addr] = struct{}{}
887                 return
888         }
889         cl.runHandshookConn(c, t)
890 }
891
892 func (cl *Client) runReceivedConn(c *connection) {
893         err := c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
894         if err != nil {
895                 panic(err)
896         }
897         t, err := cl.receiveHandshakes(c)
898         if err != nil {
899                 if cl.config.Debug {
900                         log.Printf("error receiving handshakes: %s", err)
901                 }
902                 return
903         }
904         if t == nil {
905                 return
906         }
907         cl.mu.Lock()
908         defer cl.mu.Unlock()
909         if c.PeerID == cl.peerID {
910                 // Because the remote address is not necessarily the same as its
911                 // client's torrent listen address, we won't record the remote address
912                 // as a doppleganger. Instead, the initiator can record *us* as the
913                 // doppleganger.
914                 return
915         }
916         cl.runHandshookConn(c, t)
917 }
918
919 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
920         c.conn.SetWriteDeadline(time.Time{})
921         c.rw = readWriter{
922                 deadlineReader{c.conn, c.rw},
923                 c.rw,
924         }
925         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
926         if !t.addConnection(c) {
927                 return
928         }
929         defer t.dropConnection(c)
930         go c.writer(time.Minute)
931         cl.sendInitialMessages(c, t)
932         err := cl.connectionLoop(t, c)
933         if err != nil && cl.config.Debug {
934                 log.Printf("error during connection loop: %s", err)
935         }
936 }
937
938 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
939         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
940                 conn.Post(pp.Message{
941                         Type:       pp.Extended,
942                         ExtendedID: pp.HandshakeExtendedID,
943                         ExtendedPayload: func() []byte {
944                                 d := map[string]interface{}{
945                                         "m": func() (ret map[string]int) {
946                                                 ret = make(map[string]int, 2)
947                                                 ret["ut_metadata"] = metadataExtendedId
948                                                 if !cl.config.DisablePEX {
949                                                         ret["ut_pex"] = pexExtendedId
950                                                 }
951                                                 return
952                                         }(),
953                                         "v": extendedHandshakeClientVersion,
954                                         // No upload queue is implemented yet.
955                                         "reqq": 64,
956                                 }
957                                 if !cl.config.DisableEncryption {
958                                         d["e"] = 1
959                                 }
960                                 if torrent.metadataSizeKnown() {
961                                         d["metadata_size"] = torrent.metadataSize()
962                                 }
963                                 if p := cl.incomingPeerPort(); p != 0 {
964                                         d["p"] = p
965                                 }
966                                 yourip, err := addrCompactIP(conn.remoteAddr())
967                                 if err != nil {
968                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
969                                 } else {
970                                         d["yourip"] = yourip
971                                 }
972                                 // log.Printf("sending %v", d)
973                                 b, err := bencode.Marshal(d)
974                                 if err != nil {
975                                         panic(err)
976                                 }
977                                 return b
978                         }(),
979                 })
980         }
981         if torrent.haveAnyPieces() {
982                 conn.Bitfield(torrent.bitfield())
983         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
984                 conn.Post(pp.Message{
985                         Type: pp.HaveNone,
986                 })
987         }
988         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
989                 conn.Post(pp.Message{
990                         Type: pp.Port,
991                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
992                 })
993         }
994 }
995
996 func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
997         conn.updateRequests()
998 }
999
1000 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
1001         ok = cn.Cancel(r)
1002         if ok {
1003                 postedCancels.Add(1)
1004         }
1005         return
1006 }
1007
1008 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
1009         if !cn.RequestPending(r) {
1010                 return false
1011         }
1012         delete(cn.Requests, r)
1013         return true
1014 }
1015
1016 // Process incoming ut_metadata message.
1017 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) error {
1018         var d map[string]int
1019         err := bencode.Unmarshal(payload, &d)
1020         if err != nil {
1021                 return fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1022         }
1023         msgType, ok := d["msg_type"]
1024         if !ok {
1025                 return errors.New("missing msg_type field")
1026         }
1027         piece := d["piece"]
1028         switch msgType {
1029         case pp.DataMetadataExtensionMsgType:
1030                 if !c.requestedMetadataPiece(piece) {
1031                         return fmt.Errorf("got unexpected piece %d", piece)
1032                 }
1033                 c.metadataRequests[piece] = false
1034                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1035                 if begin < 0 || begin >= len(payload) {
1036                         return fmt.Errorf("data has bad offset in payload: %d", begin)
1037                 }
1038                 t.saveMetadataPiece(piece, payload[begin:])
1039                 c.UsefulChunksReceived++
1040                 c.lastUsefulChunkReceived = time.Now()
1041                 return t.maybeCompleteMetadata()
1042         case pp.RequestMetadataExtensionMsgType:
1043                 if !t.haveMetadataPiece(piece) {
1044                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1045                         return nil
1046                 }
1047                 start := (1 << 14) * piece
1048                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1049                 return nil
1050         case pp.RejectMetadataExtensionMsgType:
1051                 return nil
1052         default:
1053                 return errors.New("unknown msg_type value")
1054         }
1055 }
1056
1057 func (cl *Client) upload(t *Torrent, c *connection) {
1058         if cl.config.NoUpload {
1059                 return
1060         }
1061         if !c.PeerInterested {
1062                 return
1063         }
1064         seeding := t.seeding()
1065         if !seeding && !t.connHasWantedPieces(c) {
1066                 return
1067         }
1068 another:
1069         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1070                 c.Unchoke()
1071                 for r := range c.PeerRequests {
1072                         err := cl.sendChunk(t, c, r)
1073                         if err != nil {
1074                                 i := int(r.Index)
1075                                 if t.pieceComplete(i) {
1076                                         t.updatePieceCompletion(i)
1077                                         if !t.pieceComplete(i) {
1078                                                 // We had the piece, but not anymore.
1079                                                 break another
1080                                         }
1081                                 }
1082                                 log.Printf("error sending chunk %+v to peer: %s", r, err)
1083                                 // If we failed to send a chunk, choke the peer to ensure they
1084                                 // flush all their requests. We've probably dropped a piece,
1085                                 // but there's no way to communicate this to the peer. If they
1086                                 // ask for it again, we'll kick them to allow us to send them
1087                                 // an updated bitfield.
1088                                 break another
1089                         }
1090                         delete(c.PeerRequests, r)
1091                         goto another
1092                 }
1093                 return
1094         }
1095         c.Choke()
1096 }
1097
1098 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
1099         // Count the chunk being sent, even if it isn't.
1100         b := make([]byte, r.Length)
1101         p := t.info.Piece(int(r.Index))
1102         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1103         if n != len(b) {
1104                 if err == nil {
1105                         panic("expected error")
1106                 }
1107                 return err
1108         }
1109         c.Post(pp.Message{
1110                 Type:  pp.Piece,
1111                 Index: r.Index,
1112                 Begin: r.Begin,
1113                 Piece: b,
1114         })
1115         c.chunksSent++
1116         uploadChunksPosted.Add(1)
1117         c.lastChunkSent = time.Now()
1118         return nil
1119 }
1120
1121 // Processes incoming bittorrent messages. The client lock is held upon entry
1122 // and exit. Returning will end the connection.
1123 func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
1124         decoder := pp.Decoder{
1125                 R:         bufio.NewReader(c.rw),
1126                 MaxLength: 256 * 1024,
1127         }
1128         for {
1129                 cl.mu.Unlock()
1130                 var msg pp.Message
1131                 err := decoder.Decode(&msg)
1132                 cl.mu.Lock()
1133                 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1134                         return nil
1135                 }
1136                 if err != nil {
1137                         return err
1138                 }
1139                 c.readMsg(&msg)
1140                 c.lastMessageReceived = time.Now()
1141                 if msg.Keepalive {
1142                         receivedKeepalives.Add(1)
1143                         continue
1144                 }
1145                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1146                 switch msg.Type {
1147                 case pp.Choke:
1148                         c.PeerChoked = true
1149                         c.Requests = nil
1150                         // We can then reset our interest.
1151                         c.updateRequests()
1152                 case pp.Reject:
1153                         cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1154                         c.updateRequests()
1155                 case pp.Unchoke:
1156                         c.PeerChoked = false
1157                         cl.peerUnchoked(t, c)
1158                 case pp.Interested:
1159                         c.PeerInterested = true
1160                         cl.upload(t, c)
1161                 case pp.NotInterested:
1162                         c.PeerInterested = false
1163                         c.Choke()
1164                 case pp.Have:
1165                         err = c.peerSentHave(int(msg.Index))
1166                 case pp.Request:
1167                         if c.Choked {
1168                                 break
1169                         }
1170                         if !c.PeerInterested {
1171                                 err = errors.New("peer sent request but isn't interested")
1172                                 break
1173                         }
1174                         if !t.havePiece(msg.Index.Int()) {
1175                                 // This isn't necessarily them screwing up. We can drop pieces
1176                                 // from our storage, and can't communicate this to peers
1177                                 // except by reconnecting.
1178                                 requestsReceivedForMissingPieces.Add(1)
1179                                 err = errors.New("peer requested piece we don't have")
1180                                 break
1181                         }
1182                         if c.PeerRequests == nil {
1183                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1184                         }
1185                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1186                         cl.upload(t, c)
1187                 case pp.Cancel:
1188                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1189                         if !c.PeerCancel(req) {
1190                                 unexpectedCancels.Add(1)
1191                         }
1192                 case pp.Bitfield:
1193                         err = c.peerSentBitfield(msg.Bitfield)
1194                 case pp.HaveAll:
1195                         err = c.peerSentHaveAll()
1196                 case pp.HaveNone:
1197                         err = c.peerSentHaveNone()
1198                 case pp.Piece:
1199                         cl.downloadedChunk(t, c, &msg)
1200                 case pp.Extended:
1201                         switch msg.ExtendedID {
1202                         case pp.HandshakeExtendedID:
1203                                 // TODO: Create a bencode struct for this.
1204                                 var d map[string]interface{}
1205                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1206                                 if err != nil {
1207                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1208                                         break
1209                                 }
1210                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1211                                 if reqq, ok := d["reqq"]; ok {
1212                                         if i, ok := reqq.(int64); ok {
1213                                                 c.PeerMaxRequests = int(i)
1214                                         }
1215                                 }
1216                                 if v, ok := d["v"]; ok {
1217                                         c.PeerClientName = v.(string)
1218                                 }
1219                                 m, ok := d["m"]
1220                                 if !ok {
1221                                         err = errors.New("handshake missing m item")
1222                                         break
1223                                 }
1224                                 mTyped, ok := m.(map[string]interface{})
1225                                 if !ok {
1226                                         err = errors.New("handshake m value is not dict")
1227                                         break
1228                                 }
1229                                 if c.PeerExtensionIDs == nil {
1230                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1231                                 }
1232                                 for name, v := range mTyped {
1233                                         id, ok := v.(int64)
1234                                         if !ok {
1235                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1236                                                 continue
1237                                         }
1238                                         if id == 0 {
1239                                                 delete(c.PeerExtensionIDs, name)
1240                                         } else {
1241                                                 if c.PeerExtensionIDs[name] == 0 {
1242                                                         supportedExtensionMessages.Add(name, 1)
1243                                                 }
1244                                                 c.PeerExtensionIDs[name] = byte(id)
1245                                         }
1246                                 }
1247                                 metadata_sizeUntyped, ok := d["metadata_size"]
1248                                 if ok {
1249                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1250                                         if !ok {
1251                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1252                                         } else {
1253                                                 err = t.setMetadataSize(metadata_size)
1254                                                 if err != nil {
1255                                                         err = fmt.Errorf("error setting metadata size to %d", metadata_size)
1256                                                         break
1257                                                 }
1258                                         }
1259                                 }
1260                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1261                                         c.requestPendingMetadata()
1262                                 }
1263                         case metadataExtendedId:
1264                                 err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1265                                 if err != nil {
1266                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1267                                 }
1268                         case pexExtendedId:
1269                                 if cl.config.DisablePEX {
1270                                         break
1271                                 }
1272                                 var pexMsg peerExchangeMessage
1273                                 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1274                                 if err != nil {
1275                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1276                                         break
1277                                 }
1278                                 go func() {
1279                                         cl.mu.Lock()
1280                                         t.addPeers(func() (ret []Peer) {
1281                                                 for i, cp := range pexMsg.Added {
1282                                                         p := Peer{
1283                                                                 IP:     make([]byte, 4),
1284                                                                 Port:   cp.Port,
1285                                                                 Source: peerSourcePEX,
1286                                                         }
1287                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1288                                                                 p.SupportsEncryption = true
1289                                                         }
1290                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1291                                                         ret = append(ret, p)
1292                                                 }
1293                                                 return
1294                                         }())
1295                                         cl.mu.Unlock()
1296                                 }()
1297                         default:
1298                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1299                         }
1300                         if err != nil {
1301                                 // That client uses its own extension IDs for outgoing message
1302                                 // types, which is incorrect.
1303                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1304                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1305                                         return nil
1306                                 }
1307                         }
1308                 case pp.Port:
1309                         if cl.dHT == nil {
1310                                 break
1311                         }
1312                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1313                         if err != nil {
1314                                 panic(err)
1315                         }
1316                         if msg.Port != 0 {
1317                                 pingAddr.Port = int(msg.Port)
1318                         }
1319                         cl.dHT.Ping(pingAddr)
1320                 default:
1321                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1322                 }
1323                 if err != nil {
1324                         return err
1325                 }
1326         }
1327 }
1328
1329 func (cl *Client) openNewConns(t *Torrent) {
1330         defer t.updateWantPeersEvent()
1331         for len(t.peers) != 0 {
1332                 if !t.wantConns() {
1333                         return
1334                 }
1335                 if len(t.halfOpen) >= cl.halfOpenLimit {
1336                         return
1337                 }
1338                 var (
1339                         k peersKey
1340                         p Peer
1341                 )
1342                 for k, p = range t.peers {
1343                         break
1344                 }
1345                 delete(t.peers, k)
1346                 cl.initiateConn(p, t)
1347         }
1348 }
1349
1350 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1351         if port == 0 {
1352                 return true
1353         }
1354         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1355                 return true
1356         }
1357         if _, ok := cl.ipBlockRange(ip); ok {
1358                 return true
1359         }
1360         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1361                 return true
1362         }
1363         return false
1364 }
1365
1366 // Return a Torrent ready for insertion into a Client.
1367 func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
1368         t = &Torrent{
1369                 cl:        cl,
1370                 infoHash:  ih,
1371                 chunkSize: defaultChunkSize,
1372                 peers:     make(map[peersKey]Peer),
1373
1374                 halfOpen:          make(map[string]struct{}),
1375                 pieceStateChanges: pubsub.NewPubSub(),
1376
1377                 storageOpener:       cl.defaultStorage,
1378                 maxEstablishedConns: defaultEstablishedConnsPerTorrent,
1379         }
1380         return
1381 }
1382
1383 func init() {
1384         // For shuffling the tracker tiers.
1385         mathRand.Seed(time.Now().Unix())
1386 }
1387
1388 type trackerTier []string
1389
1390 // The trackers within each tier must be shuffled before use.
1391 // http://stackoverflow.com/a/12267471/149482
1392 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1393 func shuffleTier(tier trackerTier) {
1394         for i := range tier {
1395                 j := mathRand.Intn(i + 1)
1396                 tier[i], tier[j] = tier[j], tier[i]
1397         }
1398 }
1399
1400 // A file-like handle to some torrent data resource.
1401 type Handle interface {
1402         io.Reader
1403         io.Seeker
1404         io.Closer
1405         io.ReaderAt
1406 }
1407
1408 // Specifies a new torrent for adding to a client. There are helpers for
1409 // magnet URIs and torrent metainfo files.
1410 type TorrentSpec struct {
1411         // The tiered tracker URIs.
1412         Trackers  [][]string
1413         InfoHash  metainfo.Hash
1414         InfoBytes []byte
1415         // The name to use if the Name field from the Info isn't available.
1416         DisplayName string
1417         // The chunk size to use for outbound requests. Defaults to 16KiB if not
1418         // set.
1419         ChunkSize int
1420         Storage   storage.Client
1421 }
1422
1423 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1424         m, err := metainfo.ParseMagnetURI(uri)
1425         if err != nil {
1426                 return
1427         }
1428         spec = &TorrentSpec{
1429                 Trackers:    [][]string{m.Trackers},
1430                 DisplayName: m.DisplayName,
1431                 InfoHash:    m.InfoHash,
1432         }
1433         return
1434 }
1435
1436 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1437         info := mi.UnmarshalInfo()
1438         spec = &TorrentSpec{
1439                 Trackers:    mi.AnnounceList,
1440                 InfoBytes:   mi.InfoBytes,
1441                 DisplayName: info.Name,
1442                 InfoHash:    mi.HashInfoBytes(),
1443         }
1444         if spec.Trackers == nil && mi.Announce != "" {
1445                 spec.Trackers = [][]string{{mi.Announce}}
1446         }
1447         return
1448 }
1449
1450 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1451         cl.mu.Lock()
1452         defer cl.mu.Unlock()
1453         t, ok := cl.torrents[infoHash]
1454         if ok {
1455                 return
1456         }
1457         new = true
1458         t = cl.newTorrent(infoHash)
1459         if cl.dHT != nil {
1460                 go t.dhtAnnouncer()
1461         }
1462         cl.torrents[infoHash] = t
1463         t.updateWantPeersEvent()
1464         // Tickle Client.waitAccept, new torrent may want conns.
1465         cl.event.Broadcast()
1466         return
1467 }
1468
1469 // Add or merge a torrent spec. If the torrent is already present, the
1470 // trackers will be merged with the existing ones. If the Info isn't yet
1471 // known, it will be set. The display name is replaced if the new spec
1472 // provides one. Returns new if the torrent wasn't already in the client.
1473 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1474         t, new = cl.AddTorrentInfoHash(spec.InfoHash)
1475         if spec.DisplayName != "" {
1476                 t.SetDisplayName(spec.DisplayName)
1477         }
1478         if spec.InfoBytes != nil {
1479                 err = t.SetInfoBytes(spec.InfoBytes)
1480                 if err != nil {
1481                         return
1482                 }
1483         }
1484         cl.mu.Lock()
1485         defer cl.mu.Unlock()
1486         if spec.ChunkSize != 0 {
1487                 t.chunkSize = pp.Integer(spec.ChunkSize)
1488         }
1489         t.addTrackers(spec.Trackers)
1490         t.maybeNewConns()
1491         return
1492 }
1493
1494 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1495         t, ok := cl.torrents[infoHash]
1496         if !ok {
1497                 err = fmt.Errorf("no such torrent")
1498                 return
1499         }
1500         err = t.close()
1501         if err != nil {
1502                 panic(err)
1503         }
1504         delete(cl.torrents, infoHash)
1505         return
1506 }
1507
1508 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1509         _url, err := url.Parse(announceURL)
1510         if err != nil {
1511                 return
1512         }
1513         hmp := missinggo.SplitHostMaybePort(_url.Host)
1514         if hmp.Err != nil {
1515                 err = hmp.Err
1516                 return
1517         }
1518         addr, err := net.ResolveIPAddr("ip", hmp.Host)
1519         if err != nil {
1520                 return
1521         }
1522         cl.mu.RLock()
1523         _, blocked = cl.ipBlockRange(addr.IP)
1524         cl.mu.RUnlock()
1525         host = _url.Host
1526         hmp.Host = addr.String()
1527         _url.Host = hmp.String()
1528         urlToUse = _url.String()
1529         return
1530 }
1531
1532 func (cl *Client) allTorrentsCompleted() bool {
1533         for _, t := range cl.torrents {
1534                 if !t.haveInfo() {
1535                         return false
1536                 }
1537                 if t.numPiecesCompleted() != t.numPieces() {
1538                         return false
1539                 }
1540         }
1541         return true
1542 }
1543
1544 // Returns true when all torrents are completely downloaded and false if the
1545 // client is stopped before that.
1546 func (cl *Client) WaitAll() bool {
1547         cl.mu.Lock()
1548         defer cl.mu.Unlock()
1549         for !cl.allTorrentsCompleted() {
1550                 if cl.closed.IsSet() {
1551                         return false
1552                 }
1553                 cl.event.Wait()
1554         }
1555         return true
1556 }
1557
1558 // Handle a received chunk from a peer.
1559 func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
1560         chunksReceived.Add(1)
1561
1562         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1563
1564         // Request has been satisfied.
1565         if cl.connDeleteRequest(t, c, req) {
1566                 defer c.updateRequests()
1567         } else {
1568                 unexpectedChunksReceived.Add(1)
1569         }
1570
1571         index := int(req.Index)
1572         piece := &t.pieces[index]
1573
1574         // Do we actually want this chunk?
1575         if !t.wantPiece(req) {
1576                 unwantedChunksReceived.Add(1)
1577                 c.UnwantedChunksReceived++
1578                 return
1579         }
1580
1581         c.UsefulChunksReceived++
1582         c.lastUsefulChunkReceived = time.Now()
1583
1584         cl.upload(t, c)
1585
1586         // Need to record that it hasn't been written yet, before we attempt to do
1587         // anything with it.
1588         piece.incrementPendingWrites()
1589         // Record that we have the chunk.
1590         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1591
1592         // Cancel pending requests for this chunk.
1593         for _, c := range t.conns {
1594                 if cl.connCancel(t, c, req) {
1595                         c.updateRequests()
1596                 }
1597         }
1598
1599         cl.mu.Unlock()
1600         // Write the chunk out. Note that the upper bound on chunk writing
1601         // concurrency will be the number of connections.
1602         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1603         cl.mu.Lock()
1604
1605         piece.decrementPendingWrites()
1606
1607         if err != nil {
1608                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
1609                 t.pendRequest(req)
1610                 t.updatePieceCompletion(int(msg.Index))
1611                 return
1612         }
1613
1614         // It's important that the piece is potentially queued before we check if
1615         // the piece is still wanted, because if it is queued, it won't be wanted.
1616         if t.pieceAllDirty(index) {
1617                 cl.queuePieceCheck(t, int(req.Index))
1618         }
1619
1620         if c.peerTouchedPieces == nil {
1621                 c.peerTouchedPieces = make(map[int]struct{})
1622         }
1623         c.peerTouchedPieces[index] = struct{}{}
1624
1625         cl.event.Broadcast()
1626         t.publishPieceChange(int(req.Index))
1627         return
1628 }
1629
1630 // Return the connections that touched a piece, and clear the entry while
1631 // doing it.
1632 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
1633         for _, c := range t.conns {
1634                 if _, ok := c.peerTouchedPieces[piece]; ok {
1635                         ret = append(ret, c)
1636                         delete(c.peerTouchedPieces, piece)
1637                 }
1638         }
1639         return
1640 }
1641
1642 func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
1643         if t.closed.IsSet() {
1644                 return
1645         }
1646         p := &t.pieces[piece]
1647         if p.EverHashed {
1648                 // Don't score the first time a piece is hashed, it could be an
1649                 // initial check.
1650                 if correct {
1651                         pieceHashedCorrect.Add(1)
1652                 } else {
1653                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
1654                         pieceHashedNotCorrect.Add(1)
1655                 }
1656         }
1657         p.EverHashed = true
1658         touchers := cl.reapPieceTouches(t, piece)
1659         if correct {
1660                 for _, c := range touchers {
1661                         c.goodPiecesDirtied++
1662                 }
1663                 err := p.Storage().MarkComplete()
1664                 if err != nil {
1665                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1666                 }
1667                 t.updatePieceCompletion(piece)
1668         } else if len(touchers) != 0 {
1669                 log.Printf("dropping and banning %d conns that touched piece", len(touchers))
1670                 for _, c := range touchers {
1671                         c.badPiecesDirtied++
1672                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1673                         t.dropConnection(c)
1674                 }
1675         }
1676         cl.pieceChanged(t, piece)
1677 }
1678
1679 func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
1680         t.pendingPieces.Remove(piece)
1681         t.pendAllChunkSpecs(piece)
1682         for _, conn := range t.conns {
1683                 conn.Have(piece)
1684                 for r := range conn.Requests {
1685                         if int(r.Index) == piece {
1686                                 conn.Cancel(r)
1687                         }
1688                 }
1689                 // Could check here if peer doesn't have piece, but due to caching
1690                 // some peers may have said they have a piece but they don't.
1691                 cl.upload(t, conn)
1692         }
1693 }
1694
1695 func (cl *Client) onFailedPiece(t *Torrent, piece int) {
1696         if t.pieceAllDirty(piece) {
1697                 t.pendAllChunkSpecs(piece)
1698         }
1699         if !t.wantPieceIndex(piece) {
1700                 return
1701         }
1702         cl.openNewConns(t)
1703         for _, conn := range t.conns {
1704                 if conn.PeerHasPiece(piece) {
1705                         conn.updateRequests()
1706                 }
1707         }
1708 }
1709
1710 func (cl *Client) pieceChanged(t *Torrent, piece int) {
1711         correct := t.pieceComplete(piece)
1712         defer cl.event.Broadcast()
1713         if correct {
1714                 cl.onCompletedPiece(t, piece)
1715         } else {
1716                 cl.onFailedPiece(t, piece)
1717         }
1718         if t.updatePiecePriority(piece) {
1719                 t.piecePriorityChanged(piece)
1720         }
1721         t.publishPieceChange(piece)
1722 }
1723
1724 func (cl *Client) verifyPiece(t *Torrent, piece int) {
1725         cl.mu.Lock()
1726         defer cl.mu.Unlock()
1727         p := &t.pieces[piece]
1728         for p.Hashing || t.storage == nil {
1729                 cl.event.Wait()
1730         }
1731         p.QueuedForHash = false
1732         if t.closed.IsSet() || t.pieceComplete(piece) {
1733                 t.updatePiecePriority(piece)
1734                 t.publishPieceChange(piece)
1735                 return
1736         }
1737         p.Hashing = true
1738         t.publishPieceChange(piece)
1739         cl.mu.Unlock()
1740         sum := t.hashPiece(piece)
1741         cl.mu.Lock()
1742         p.Hashing = false
1743         cl.pieceHashed(t, piece, sum == p.Hash)
1744 }
1745
1746 // Returns handles to all the torrents loaded in the Client.
1747 func (cl *Client) Torrents() (ret []*Torrent) {
1748         cl.mu.Lock()
1749         for _, t := range cl.torrents {
1750                 ret = append(ret, t)
1751         }
1752         cl.mu.Unlock()
1753         return
1754 }
1755
1756 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1757         spec, err := TorrentSpecFromMagnetURI(uri)
1758         if err != nil {
1759                 return
1760         }
1761         T, _, err = cl.AddTorrentSpec(spec)
1762         return
1763 }
1764
1765 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1766         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1767         var ss []string
1768         slices.MakeInto(&ss, mi.Nodes)
1769         cl.AddDHTNodes(ss)
1770         return
1771 }
1772
1773 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1774         mi, err := metainfo.LoadFromFile(filename)
1775         if err != nil {
1776                 return
1777         }
1778         return cl.AddTorrent(mi)
1779 }
1780
1781 func (cl *Client) DHT() *dht.Server {
1782         return cl.dHT
1783 }
1784
1785 func (cl *Client) AddDHTNodes(nodes []string) {
1786         for _, n := range nodes {
1787                 hmp := missinggo.SplitHostMaybePort(n)
1788                 ip := net.ParseIP(hmp.Host)
1789                 if ip == nil {
1790                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1791                         continue
1792                 }
1793                 ni := krpc.NodeInfo{
1794                         Addr: &net.UDPAddr{
1795                                 IP:   ip,
1796                                 Port: hmp.Port,
1797                         },
1798                 }
1799                 cl.DHT().AddNode(ni)
1800         }
1801 }
1802
1803 func (cl *Client) banPeerIP(ip net.IP) {
1804         if cl.badPeerIPs == nil {
1805                 cl.badPeerIPs = make(map[string]struct{})
1806         }
1807         cl.badPeerIPs[ip.String()] = struct{}{}
1808 }