]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
2c4655abd01592144e8db570d3faa781af04fe05
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "encoding/hex"
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         "math/big"
13         mathRand "math/rand"
14         "net"
15         "net/url"
16         "os"
17         "path/filepath"
18         "sort"
19         "strconv"
20         "strings"
21         "time"
22
23         "github.com/anacrolix/missinggo"
24         "github.com/anacrolix/missinggo/pproffd"
25         "github.com/anacrolix/missinggo/pubsub"
26         "github.com/anacrolix/sync"
27         "github.com/anacrolix/utp"
28
29         "github.com/anacrolix/torrent/bencode"
30         "github.com/anacrolix/torrent/dht"
31         "github.com/anacrolix/torrent/iplist"
32         "github.com/anacrolix/torrent/metainfo"
33         "github.com/anacrolix/torrent/mse"
34         pp "github.com/anacrolix/torrent/peer_protocol"
35         "github.com/anacrolix/torrent/storage"
36         "github.com/anacrolix/torrent/tracker"
37 )
38
39 // Currently doesn't really queue, but should in the future.
40 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
41         piece := &t.pieces[pieceIndex]
42         if piece.QueuedForHash {
43                 return
44         }
45         piece.QueuedForHash = true
46         t.publishPieceChange(pieceIndex)
47         go cl.verifyPiece(t, pieceIndex)
48 }
49
50 // Queue a piece check if one isn't already queued, and the piece has never
51 // been checked before.
52 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
53         p := &t.pieces[piece]
54         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
55                 return
56         }
57         cl.queuePieceCheck(t, piece)
58 }
59
60 // Clients contain zero or more Torrents. A Client manages a blocklist, the
61 // TCP/UDP protocol ports, and DHT as desired.
62 type Client struct {
63         halfOpenLimit  int
64         peerID         [20]byte
65         listeners      []net.Listener
66         utpSock        *utp.Socket
67         dHT            *dht.Server
68         ipBlockList    iplist.Ranger
69         bannedTorrents map[metainfo.Hash]struct{}
70         config         Config
71         pruneTimer     *time.Timer
72         extensionBytes peerExtensionBytes
73         // Set of addresses that have our client ID. This intentionally will
74         // include ourselves if we end up trying to connect to our own address
75         // through legitimate channels.
76         dopplegangerAddrs map[string]struct{}
77
78         defaultStorage storage.I
79
80         mu     sync.RWMutex
81         event  sync.Cond
82         closed missinggo.Event
83
84         torrents map[metainfo.Hash]*Torrent
85 }
86
87 func (cl *Client) IPBlockList() iplist.Ranger {
88         cl.mu.Lock()
89         defer cl.mu.Unlock()
90         return cl.ipBlockList
91 }
92
93 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
94         cl.mu.Lock()
95         defer cl.mu.Unlock()
96         cl.ipBlockList = list
97         if cl.dHT != nil {
98                 cl.dHT.SetIPBlockList(list)
99         }
100 }
101
102 func (cl *Client) PeerID() string {
103         return string(cl.peerID[:])
104 }
105
106 func (cl *Client) ListenAddr() (addr net.Addr) {
107         for _, l := range cl.listeners {
108                 addr = l.Addr()
109                 break
110         }
111         return
112 }
113
114 type hashSorter struct {
115         Hashes []metainfo.Hash
116 }
117
118 func (hs hashSorter) Len() int {
119         return len(hs.Hashes)
120 }
121
122 func (hs hashSorter) Less(a, b int) bool {
123         return (&big.Int{}).SetBytes(hs.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(hs.Hashes[b][:])) < 0
124 }
125
126 func (hs hashSorter) Swap(a, b int) {
127         hs.Hashes[a], hs.Hashes[b] = hs.Hashes[b], hs.Hashes[a]
128 }
129
130 func (cl *Client) sortedTorrents() (ret []*Torrent) {
131         var hs hashSorter
132         for ih := range cl.torrents {
133                 hs.Hashes = append(hs.Hashes, ih)
134         }
135         sort.Sort(hs)
136         for _, ih := range hs.Hashes {
137                 ret = append(ret, cl.torrent(ih))
138         }
139         return
140 }
141
142 // Writes out a human readable status of the client, such as for writing to a
143 // HTTP status page.
144 func (cl *Client) WriteStatus(_w io.Writer) {
145         cl.mu.RLock()
146         defer cl.mu.RUnlock()
147         w := bufio.NewWriter(_w)
148         defer w.Flush()
149         if addr := cl.ListenAddr(); addr != nil {
150                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
151         } else {
152                 fmt.Fprintln(w, "Not listening!")
153         }
154         fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
155         if cl.dHT != nil {
156                 dhtStats := cl.dHT.Stats()
157                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
158                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
159                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
160                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
161                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
162         }
163         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
164         fmt.Fprintln(w)
165         for _, t := range cl.sortedTorrents() {
166                 if t.name() == "" {
167                         fmt.Fprint(w, "<unknown name>")
168                 } else {
169                         fmt.Fprint(w, t.name())
170                 }
171                 fmt.Fprint(w, "\n")
172                 if t.haveInfo() {
173                         fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length)
174                 } else {
175                         w.WriteString("<missing metainfo>")
176                 }
177                 fmt.Fprint(w, "\n")
178                 t.writeStatus(w, cl)
179                 fmt.Fprintln(w)
180         }
181 }
182
183 func (cl *Client) configDir() string {
184         if cl.config.ConfigDir == "" {
185                 return filepath.Join(os.Getenv("HOME"), ".config/torrent")
186         }
187         return cl.config.ConfigDir
188 }
189
190 // The directory where the Client expects to find and store configuration
191 // data. Defaults to $HOME/.config/torrent.
192 func (cl *Client) ConfigDir() string {
193         return cl.configDir()
194 }
195
196 // Creates a new client.
197 func NewClient(cfg *Config) (cl *Client, err error) {
198         if cfg == nil {
199                 cfg = &Config{}
200         }
201
202         defer func() {
203                 if err != nil {
204                         cl = nil
205                 }
206         }()
207         cl = &Client{
208                 halfOpenLimit:     socketsPerTorrent,
209                 config:            *cfg,
210                 defaultStorage:    cfg.DefaultStorage,
211                 dopplegangerAddrs: make(map[string]struct{}),
212                 torrents:          make(map[metainfo.Hash]*Torrent),
213         }
214         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
215         cl.event.L = &cl.mu
216         if cl.defaultStorage == nil {
217                 cl.defaultStorage = storage.NewFile(cfg.DataDir)
218         }
219         if cfg.IPBlocklist != nil {
220                 cl.ipBlockList = cfg.IPBlocklist
221         }
222
223         if cfg.PeerID != "" {
224                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
225         } else {
226                 o := copy(cl.peerID[:], bep20)
227                 _, err = rand.Read(cl.peerID[o:])
228                 if err != nil {
229                         panic("error generating peer id")
230                 }
231         }
232
233         // Returns the laddr string to listen on for the next Listen call.
234         listenAddr := func() string {
235                 if addr := cl.ListenAddr(); addr != nil {
236                         return addr.String()
237                 }
238                 if cfg.ListenAddr == "" {
239                         return ":50007"
240                 }
241                 return cfg.ListenAddr
242         }
243         if !cl.config.DisableTCP {
244                 var l net.Listener
245                 l, err = net.Listen(func() string {
246                         if cl.config.DisableIPv6 {
247                                 return "tcp4"
248                         } else {
249                                 return "tcp"
250                         }
251                 }(), listenAddr())
252                 if err != nil {
253                         return
254                 }
255                 cl.listeners = append(cl.listeners, l)
256                 go cl.acceptConnections(l, false)
257         }
258         if !cl.config.DisableUTP {
259                 cl.utpSock, err = utp.NewSocket(func() string {
260                         if cl.config.DisableIPv6 {
261                                 return "udp4"
262                         } else {
263                                 return "udp"
264                         }
265                 }(), listenAddr())
266                 if err != nil {
267                         return
268                 }
269                 cl.listeners = append(cl.listeners, cl.utpSock)
270                 go cl.acceptConnections(cl.utpSock, true)
271         }
272         if !cfg.NoDHT {
273                 dhtCfg := cfg.DHTConfig
274                 if dhtCfg.IPBlocklist == nil {
275                         dhtCfg.IPBlocklist = cl.ipBlockList
276                 }
277                 if dhtCfg.Addr == "" {
278                         dhtCfg.Addr = listenAddr()
279                 }
280                 if dhtCfg.Conn == nil && cl.utpSock != nil {
281                         dhtCfg.Conn = cl.utpSock
282                 }
283                 cl.dHT, err = dht.NewServer(&dhtCfg)
284                 if err != nil {
285                         return
286                 }
287         }
288
289         return
290 }
291
292 // Stops the client. All connections to peers are closed and all activity will
293 // come to a halt.
294 func (cl *Client) Close() {
295         cl.mu.Lock()
296         defer cl.mu.Unlock()
297         cl.closed.Set()
298         if cl.dHT != nil {
299                 cl.dHT.Close()
300         }
301         for _, l := range cl.listeners {
302                 l.Close()
303         }
304         for _, t := range cl.torrents {
305                 t.close()
306         }
307         cl.event.Broadcast()
308 }
309
310 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
311
312 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
313         if cl.ipBlockList == nil {
314                 return
315         }
316         ip4 := ip.To4()
317         // If blocklists are enabled, then block non-IPv4 addresses, because
318         // blocklists do not yet support IPv6.
319         if ip4 == nil {
320                 if missinggo.CryHeard() {
321                         log.Printf("blocking non-IPv4 address: %s", ip)
322                 }
323                 r = ipv6BlockRange
324                 blocked = true
325                 return
326         }
327         return cl.ipBlockList.Lookup(ip4)
328 }
329
330 func (cl *Client) waitAccept() {
331         cl.mu.Lock()
332         defer cl.mu.Unlock()
333         for {
334                 for _, t := range cl.torrents {
335                         if cl.wantConns(t) {
336                                 return
337                         }
338                 }
339                 if cl.closed.IsSet() {
340                         return
341                 }
342                 cl.event.Wait()
343         }
344 }
345
346 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
347         for {
348                 cl.waitAccept()
349                 conn, err := l.Accept()
350                 conn = pproffd.WrapNetConn(conn)
351                 if cl.closed.IsSet() {
352                         if conn != nil {
353                                 conn.Close()
354                         }
355                         return
356                 }
357                 if err != nil {
358                         log.Print(err)
359                         // I think something harsher should happen here? Our accept
360                         // routine just fucked off.
361                         return
362                 }
363                 if utp {
364                         acceptUTP.Add(1)
365                 } else {
366                         acceptTCP.Add(1)
367                 }
368                 cl.mu.RLock()
369                 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
370                 _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
371                 cl.mu.RUnlock()
372                 if blocked || doppleganger {
373                         acceptReject.Add(1)
374                         // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
375                         conn.Close()
376                         continue
377                 }
378                 go cl.incomingConnection(conn, utp)
379         }
380 }
381
382 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
383         defer nc.Close()
384         if tc, ok := nc.(*net.TCPConn); ok {
385                 tc.SetLinger(0)
386         }
387         c := newConnection()
388         c.conn = nc
389         c.rw = nc
390         c.Discovery = peerSourceIncoming
391         c.uTP = utp
392         err := cl.runReceivedConn(c)
393         if err != nil {
394                 // log.Print(err)
395         }
396 }
397
398 // Returns a handle to the given torrent, if it's present in the client.
399 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
400         cl.mu.Lock()
401         defer cl.mu.Unlock()
402         t, ok = cl.torrents[ih]
403         return
404 }
405
406 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
407         return cl.torrents[ih]
408 }
409
// dialResult is what a dial attempt reports back to dialFirst.
type dialResult struct {
	// The established connection; nil when the attempt failed.
	Conn net.Conn
	// Whether the attempt was made over uTP rather than TCP.
	UTP bool
}
414
415 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
416         conn, err := dial(addr, t)
417         if err != nil {
418                 if conn != nil {
419                         conn.Close()
420                 }
421                 conn = nil // Pedantic
422         }
423         ch <- dialResult{conn, utp}
424         if err == nil {
425                 successfulDials.Add(1)
426                 return
427         }
428         unsuccessfulDials.Add(1)
429 }
430
431 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
432         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
433         if ret < minDialTimeout {
434                 ret = minDialTimeout
435         }
436         return
437 }
438
439 // Returns whether an address is known to connect to a client with our own ID.
440 func (cl *Client) dopplegangerAddr(addr string) bool {
441         _, ok := cl.dopplegangerAddrs[addr]
442         return ok
443 }
444
445 // Start the process of connecting to the given peer for the given torrent if
446 // appropriate.
447 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
448         if peer.Id == cl.peerID {
449                 return
450         }
451         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
452         if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
453                 duplicateConnsAvoided.Add(1)
454                 return
455         }
456         if r, ok := cl.ipBlockRange(peer.IP); ok {
457                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
458                 return
459         }
460         t.halfOpen[addr] = struct{}{}
461         go cl.outgoingConnection(t, addr, peer.Source)
462 }
463
464 func (cl *Client) dialTimeout(t *Torrent) time.Duration {
465         cl.mu.Lock()
466         pendingPeers := len(t.peers)
467         cl.mu.Unlock()
468         return reducedDialTimeout(nominalDialTimeout, cl.halfOpenLimit, pendingPeers)
469 }
470
471 func (cl *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
472         c, err = net.DialTimeout("tcp", addr, cl.dialTimeout(t))
473         if err == nil {
474                 c.(*net.TCPConn).SetLinger(0)
475         }
476         return
477 }
478
479 func (cl *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
480         return cl.utpSock.DialTimeout(addr, cl.dialTimeout(t))
481 }
482
483 // Returns a connection over UTP or TCP, whichever is first to connect.
484 func (cl *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
485         // Initiate connections via TCP and UTP simultaneously. Use the first one
486         // that succeeds.
487         left := 0
488         if !cl.config.DisableUTP {
489                 left++
490         }
491         if !cl.config.DisableTCP {
492                 left++
493         }
494         resCh := make(chan dialResult, left)
495         if !cl.config.DisableUTP {
496                 go doDial(cl.dialUTP, resCh, true, addr, t)
497         }
498         if !cl.config.DisableTCP {
499                 go doDial(cl.dialTCP, resCh, false, addr, t)
500         }
501         var res dialResult
502         // Wait for a successful connection.
503         for ; left > 0 && res.Conn == nil; left-- {
504                 res = <-resCh
505         }
506         if left > 0 {
507                 // There are still incompleted dials.
508                 go func() {
509                         for ; left > 0; left-- {
510                                 conn := (<-resCh).Conn
511                                 if conn != nil {
512                                         conn.Close()
513                                 }
514                         }
515                 }()
516         }
517         conn = res.Conn
518         utp = res.UTP
519         return
520 }
521
522 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
523         if _, ok := t.halfOpen[addr]; !ok {
524                 panic("invariant broken")
525         }
526         delete(t.halfOpen, addr)
527         cl.openNewConns(t)
528 }
529
530 // Performs initiator handshakes and returns a connection. Returns nil
531 // *connection if no connection for valid reasons.
532 func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
533         c = newConnection()
534         c.conn = nc
535         c.rw = nc
536         c.encrypted = encrypted
537         c.uTP = utp
538         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
539         if err != nil {
540                 return
541         }
542         ok, err := cl.initiateHandshakes(c, t)
543         if !ok {
544                 c = nil
545         }
546         return
547 }
548
549 // Returns nil connection and nil error if no connection could be established
550 // for valid reasons.
551 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
552         nc, utp := cl.dialFirst(addr, t)
553         if nc == nil {
554                 return
555         }
556         c, err = cl.handshakesConnection(nc, t, !cl.config.DisableEncryption, utp)
557         if err != nil {
558                 nc.Close()
559                 return
560         } else if c != nil {
561                 return
562         }
563         nc.Close()
564         if cl.config.DisableEncryption {
565                 // We already tried without encryption.
566                 return
567         }
568         // Try again without encryption, using whichever protocol type worked last
569         // time.
570         if utp {
571                 nc, err = cl.dialUTP(addr, t)
572         } else {
573                 nc, err = cl.dialTCP(addr, t)
574         }
575         if err != nil {
576                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
577                 return
578         }
579         c, err = cl.handshakesConnection(nc, t, false, utp)
580         if err != nil || c == nil {
581                 nc.Close()
582         }
583         return
584 }
585
586 // Called to dial out and run a connection. The addr we're given is already
587 // considered half-open.
588 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
589         c, err := cl.establishOutgoingConn(t, addr)
590         cl.mu.Lock()
591         defer cl.mu.Unlock()
592         // Don't release lock between here and addConnection, unless it's for
593         // failure.
594         cl.noLongerHalfOpen(t, addr)
595         if err != nil {
596                 if cl.config.Debug {
597                         log.Printf("error establishing outgoing connection: %s", err)
598                 }
599                 return
600         }
601         if c == nil {
602                 return
603         }
604         defer c.Close()
605         c.Discovery = ps
606         err = cl.runInitiatedHandshookConn(c, t)
607         if err != nil {
608                 if cl.config.Debug {
609                         log.Printf("error in established outgoing connection: %s", err)
610                 }
611         }
612 }
613
614 // The port number for incoming peer connections. 0 if the client isn't
615 // listening.
616 func (cl *Client) incomingPeerPort() int {
617         listenAddr := cl.ListenAddr()
618         if listenAddr == nil {
619                 return 0
620         }
621         return missinggo.AddrPort(listenAddr)
622 }
623
// addrCompactIP converts a net.Addr to its compact IP representation: 4 bytes
// for an IPv4 address (including IPv4-mapped IPv6) and 16 bytes otherwise, per
// the "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
//
// An error is returned when addr is not in host:port form or when its host is
// not an IP address; the result is never an empty string with a nil error.
func addrCompactIP(addr net.Addr) (string, error) {
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "", err
	}
	// net.ParseIP rejects IPv6 zone identifiers ("fe80::1%eth0"), and the
	// zone has no place in the compact form anyway, so drop it.
	if i := strings.IndexByte(host, '%'); i >= 0 {
		host = host[:i]
	}
	ip := net.ParseIP(host)
	if ip == nil {
		return "", fmt.Errorf("host %q is not an IP address", host)
	}
	if v4 := ip.To4(); v4 != nil {
		return string(v4), nil
	}
	return string(ip.To16()), nil
}
640
// handshakeWriter writes every chunk received on bb to w, in order, until bb
// is closed or a write fails. It then sends exactly one value on done: the
// write error, or nil if everything was written. Chunks still sitting in bb
// after a failed write are not consumed.
func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
	var err error
	for {
		chunk, open := <-bb
		if !open {
			break
		}
		if _, err = w.Write(chunk); err != nil {
			break
		}
	}
	done <- err
}
651
// peerExtensionBytes holds the 8 reserved bytes of a BitTorrent handshake,
// in which peers advertise the extensions they support.
type peerExtensionBytes [8]byte

// peerID is the 20-byte peer ID exchanged in a BitTorrent handshake.
type peerID [20]byte

// SupportsExtended reports whether bit 0x10 of byte 5 is set.
func (pex *peerExtensionBytes) SupportsExtended() bool {
	return (pex[5] & 0x10) == 0x10
}

// SupportsDHT reports whether bit 0x01 of byte 7 is set.
func (pex *peerExtensionBytes) SupportsDHT() bool {
	return (pex[7] & 0x01) == 0x01
}

// SupportsFast reports whether bit 0x04 of byte 7 is set.
func (pex *peerExtensionBytes) SupportsFast() bool {
	return (pex[7] & 0x04) == 0x04
}
668
669 type handshakeResult struct {
670         peerExtensionBytes
671         peerID
672         metainfo.Hash
673 }
674
675 // ih is nil if we expect the peer to declare the InfoHash, such as when the
676 // peer initiated the connection. Returns ok if the handshake was successful,
677 // and err if there was an unexpected condition other than the peer simply
678 // abandoning the handshake.
679 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
680         // Bytes to be sent to the peer. Should never block the sender.
681         postCh := make(chan []byte, 4)
682         // A single error value sent when the writer completes.
683         writeDone := make(chan error, 1)
684         // Performs writes to the socket and ensures posts don't block.
685         go handshakeWriter(sock, postCh, writeDone)
686
687         defer func() {
688                 close(postCh) // Done writing.
689                 if !ok {
690                         return
691                 }
692                 if err != nil {
693                         panic(err)
694                 }
695                 // Wait until writes complete before returning from handshake.
696                 err = <-writeDone
697                 if err != nil {
698                         err = fmt.Errorf("error writing: %s", err)
699                 }
700         }()
701
702         post := func(bb []byte) {
703                 select {
704                 case postCh <- bb:
705                 default:
706                         panic("mustn't block while posting")
707                 }
708         }
709
710         post([]byte(pp.Protocol))
711         post(extensions[:])
712         if ih != nil { // We already know what we want.
713                 post(ih[:])
714                 post(peerID[:])
715         }
716         var b [68]byte
717         _, err = io.ReadFull(sock, b[:68])
718         if err != nil {
719                 err = nil
720                 return
721         }
722         if string(b[:20]) != pp.Protocol {
723                 return
724         }
725         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
726         missinggo.CopyExact(&res.Hash, b[28:48])
727         missinggo.CopyExact(&res.peerID, b[48:68])
728         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
729
730         // TODO: Maybe we can just drop peers here if we're not interested. This
731         // could prevent them trying to reconnect, falsely believing there was
732         // just a problem.
733         if ih == nil { // We were waiting for the peer to tell us what they wanted.
734                 post(res.Hash[:])
735                 post(peerID[:])
736         }
737
738         ok = true
739         return
740 }
741
// deadlineReader reads from r while keeping the read deadline of the
// underlying connection nc fresh, giving the message loop a reader that times
// out when the peer goes silent.
type deadlineReader struct {
	// Connection whose read deadline is pushed forward before every read.
	nc net.Conn
	// Source of the data, typically layered on top of nc.
	r io.Reader
}

// Read pushes the connection's read deadline 150 seconds into the future and
// then reads from the wrapped reader. If the deadline cannot be set, no read
// is attempted and that failure is returned, since reading without a deadline
// could block forever on a dead peer.
func (r deadlineReader) Read(b []byte) (n int, err error) {
	// Keep-alives should be received every 2 mins. Give a bit of gracetime.
	err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
	if err != nil {
		err = fmt.Errorf("error setting read deadline: %s", err)
		return
	}
	return r.r.Read(b)
}
769
// readWriter glues an independent reader and writer together into a single
// io.ReadWriter.
type readWriter struct {
	// Serves Read calls.
	io.Reader
	// Serves Write calls.
	io.Writer
}
774
775 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
776         var protocol [len(pp.Protocol)]byte
777         _, err = io.ReadFull(rw, protocol[:])
778         if err != nil {
779                 return
780         }
781         ret = readWriter{
782                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
783                 rw,
784         }
785         if string(protocol[:]) == pp.Protocol {
786                 return
787         }
788         encrypted = true
789         ret, err = mse.ReceiveHandshake(ret, skeys)
790         return
791 }
792
793 func (cl *Client) receiveSkeys() (ret [][]byte) {
794         for ih := range cl.torrents {
795                 ret = append(ret, ih[:])
796         }
797         return
798 }
799
800 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
801         if c.encrypted {
802                 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
803                 if err != nil {
804                         return
805                 }
806         }
807         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
808         if ih != t.infoHash {
809                 ok = false
810         }
811         return
812 }
813
814 // Do encryption and bittorrent handshakes as receiver.
815 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
816         cl.mu.Lock()
817         skeys := cl.receiveSkeys()
818         cl.mu.Unlock()
819         if !cl.config.DisableEncryption {
820                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
821                 if err != nil {
822                         if err == mse.ErrNoSecretKeyMatch {
823                                 err = nil
824                         }
825                         return
826                 }
827         }
828         ih, ok, err := cl.connBTHandshake(c, nil)
829         if err != nil {
830                 err = fmt.Errorf("error during bt handshake: %s", err)
831                 return
832         }
833         if !ok {
834                 return
835         }
836         cl.mu.Lock()
837         t = cl.torrents[ih]
838         cl.mu.Unlock()
839         return
840 }
841
842 // Returns !ok if handshake failed for valid reasons.
843 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
844         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
845         if err != nil || !ok {
846                 return
847         }
848         ret = res.Hash
849         c.PeerExtensionBytes = res.peerExtensionBytes
850         c.PeerID = res.peerID
851         c.completedHandshake = time.Now()
852         return
853 }
854
855 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) (err error) {
856         if c.PeerID == cl.peerID {
857                 // Only if we initiated the connection is the remote address a
858                 // listen addr for a doppleganger.
859                 connsToSelf.Add(1)
860                 addr := c.conn.RemoteAddr().String()
861                 cl.dopplegangerAddrs[addr] = struct{}{}
862                 return
863         }
864         return cl.runHandshookConn(c, t)
865 }
866
867 func (cl *Client) runReceivedConn(c *connection) (err error) {
868         err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
869         if err != nil {
870                 return
871         }
872         t, err := cl.receiveHandshakes(c)
873         if err != nil {
874                 err = fmt.Errorf("error receiving handshakes: %s", err)
875                 return
876         }
877         if t == nil {
878                 return
879         }
880         cl.mu.Lock()
881         defer cl.mu.Unlock()
882         if c.PeerID == cl.peerID {
883                 return
884         }
885         return cl.runHandshookConn(c, t)
886 }
887
888 func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
889         c.conn.SetWriteDeadline(time.Time{})
890         c.rw = readWriter{
891                 deadlineReader{c.conn, c.rw},
892                 c.rw,
893         }
894         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
895         if !cl.addConnection(t, c) {
896                 return
897         }
898         defer cl.dropConnection(t, c)
899         go c.writer()
900         go c.writeOptimizer(time.Minute)
901         cl.sendInitialMessages(c, t)
902         err = cl.connectionLoop(t, c)
903         if err != nil {
904                 err = fmt.Errorf("error during connection loop: %s", err)
905         }
906         return
907 }
908
909 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
910         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
911                 conn.Post(pp.Message{
912                         Type:       pp.Extended,
913                         ExtendedID: pp.HandshakeExtendedID,
914                         ExtendedPayload: func() []byte {
915                                 d := map[string]interface{}{
916                                         "m": func() (ret map[string]int) {
917                                                 ret = make(map[string]int, 2)
918                                                 ret["ut_metadata"] = metadataExtendedId
919                                                 if !cl.config.DisablePEX {
920                                                         ret["ut_pex"] = pexExtendedId
921                                                 }
922                                                 return
923                                         }(),
924                                         "v": extendedHandshakeClientVersion,
925                                         // No upload queue is implemented yet.
926                                         "reqq": 64,
927                                 }
928                                 if !cl.config.DisableEncryption {
929                                         d["e"] = 1
930                                 }
931                                 if torrent.metadataSizeKnown() {
932                                         d["metadata_size"] = torrent.metadataSize()
933                                 }
934                                 if p := cl.incomingPeerPort(); p != 0 {
935                                         d["p"] = p
936                                 }
937                                 yourip, err := addrCompactIP(conn.remoteAddr())
938                                 if err != nil {
939                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
940                                 } else {
941                                         d["yourip"] = yourip
942                                 }
943                                 // log.Printf("sending %v", d)
944                                 b, err := bencode.Marshal(d)
945                                 if err != nil {
946                                         panic(err)
947                                 }
948                                 return b
949                         }(),
950                 })
951         }
952         if torrent.haveAnyPieces() {
953                 conn.Bitfield(torrent.bitfield())
954         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
955                 conn.Post(pp.Message{
956                         Type: pp.HaveNone,
957                 })
958         }
959         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
960                 conn.Post(pp.Message{
961                         Type: pp.Port,
962                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
963                 })
964         }
965 }
966
// peerUnchoked is invoked when the peer on conn unchokes us. It asks the
// connection to update its requests; the torrent argument is not used.
func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
	conn.updateRequests()
}
970
971 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
972         ok = cn.Cancel(r)
973         if ok {
974                 postedCancels.Add(1)
975         }
976         return
977 }
978
979 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
980         if !cn.RequestPending(r) {
981                 return false
982         }
983         delete(cn.Requests, r)
984         return true
985 }
986
987 func (cl *Client) requestPendingMetadata(t *Torrent, c *connection) {
988         if t.haveInfo() {
989                 return
990         }
991         if c.PeerExtensionIDs["ut_metadata"] == 0 {
992                 // Peer doesn't support this.
993                 return
994         }
995         // Request metadata pieces that we don't have in a random order.
996         var pending []int
997         for index := 0; index < t.metadataPieceCount(); index++ {
998                 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
999                         pending = append(pending, index)
1000                 }
1001         }
1002         for _, i := range mathRand.Perm(len(pending)) {
1003                 c.requestMetadataPiece(pending[i])
1004         }
1005 }
1006
1007 // Process incoming ut_metadata message.
1008 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) (err error) {
1009         var d map[string]int
1010         err = bencode.Unmarshal(payload, &d)
1011         if err != nil {
1012                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1013                 return
1014         }
1015         msgType, ok := d["msg_type"]
1016         if !ok {
1017                 err = errors.New("missing msg_type field")
1018                 return
1019         }
1020         piece := d["piece"]
1021         switch msgType {
1022         case pp.DataMetadataExtensionMsgType:
1023                 if !c.requestedMetadataPiece(piece) {
1024                         err = fmt.Errorf("got unexpected piece %d", piece)
1025                         return
1026                 }
1027                 c.metadataRequests[piece] = false
1028                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1029                 if begin < 0 || begin >= len(payload) {
1030                         err = fmt.Errorf("data has bad offset in payload: %d", begin)
1031                         return
1032                 }
1033                 t.saveMetadataPiece(piece, payload[begin:])
1034                 c.UsefulChunksReceived++
1035                 c.lastUsefulChunkReceived = time.Now()
1036                 t.maybeMetadataCompleted()
1037         case pp.RequestMetadataExtensionMsgType:
1038                 if !t.haveMetadataPiece(piece) {
1039                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1040                         break
1041                 }
1042                 start := (1 << 14) * piece
1043                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1044         case pp.RejectMetadataExtensionMsgType:
1045         default:
1046                 err = errors.New("unknown msg_type value")
1047         }
1048         return
1049 }
1050
1051 func (cl *Client) upload(t *Torrent, c *connection) {
1052         if cl.config.NoUpload {
1053                 return
1054         }
1055         if !c.PeerInterested {
1056                 return
1057         }
1058         seeding := cl.seeding(t)
1059         if !seeding && !t.connHasWantedPieces(c) {
1060                 return
1061         }
1062 another:
1063         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1064                 c.Unchoke()
1065                 for r := range c.PeerRequests {
1066                         err := cl.sendChunk(t, c, r)
1067                         if err != nil {
1068                                 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1069                                         // We had the piece, but not anymore.
1070                                 } else {
1071                                         log.Printf("error sending chunk %+v to peer: %s", r, err)
1072                                 }
1073                                 // If we failed to send a chunk, choke the peer to ensure they
1074                                 // flush all their requests. We've probably dropped a piece,
1075                                 // but there's no way to communicate this to the peer. If they
1076                                 // ask for it again, we'll kick them to allow us to send them
1077                                 // an updated bitfield.
1078                                 break another
1079                         }
1080                         delete(c.PeerRequests, r)
1081                         goto another
1082                 }
1083                 return
1084         }
1085         c.Choke()
1086 }
1087
1088 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
1089         // Count the chunk being sent, even if it isn't.
1090         b := make([]byte, r.Length)
1091         p := t.info.Piece(int(r.Index))
1092         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1093         if n != len(b) {
1094                 if err == nil {
1095                         panic("expected error")
1096                 }
1097                 return err
1098         }
1099         c.Post(pp.Message{
1100                 Type:  pp.Piece,
1101                 Index: r.Index,
1102                 Begin: r.Begin,
1103                 Piece: b,
1104         })
1105         c.chunksSent++
1106         uploadChunksPosted.Add(1)
1107         c.lastChunkSent = time.Now()
1108         return nil
1109 }
1110
1111 // Processes incoming bittorrent messages. The client lock is held upon entry
1112 // and exit.
1113 func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
1114         decoder := pp.Decoder{
1115                 R:         bufio.NewReader(c.rw),
1116                 MaxLength: 256 * 1024,
1117         }
1118         for {
1119                 cl.mu.Unlock()
1120                 var msg pp.Message
1121                 err := decoder.Decode(&msg)
1122                 cl.mu.Lock()
1123                 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1124                         return nil
1125                 }
1126                 if err != nil {
1127                         return err
1128                 }
1129                 c.lastMessageReceived = time.Now()
1130                 if msg.Keepalive {
1131                         receivedKeepalives.Add(1)
1132                         continue
1133                 }
1134                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1135                 switch msg.Type {
1136                 case pp.Choke:
1137                         c.PeerChoked = true
1138                         c.Requests = nil
1139                         // We can then reset our interest.
1140                         c.updateRequests()
1141                 case pp.Reject:
1142                         cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1143                         c.updateRequests()
1144                 case pp.Unchoke:
1145                         c.PeerChoked = false
1146                         cl.peerUnchoked(t, c)
1147                 case pp.Interested:
1148                         c.PeerInterested = true
1149                         cl.upload(t, c)
1150                 case pp.NotInterested:
1151                         c.PeerInterested = false
1152                         c.Choke()
1153                 case pp.Have:
1154                         err = c.peerSentHave(int(msg.Index))
1155                 case pp.Request:
1156                         if c.Choked {
1157                                 break
1158                         }
1159                         if !c.PeerInterested {
1160                                 err = errors.New("peer sent request but isn't interested")
1161                                 break
1162                         }
1163                         if !t.havePiece(msg.Index.Int()) {
1164                                 // This isn't necessarily them screwing up. We can drop pieces
1165                                 // from our storage, and can't communicate this to peers
1166                                 // except by reconnecting.
1167                                 requestsReceivedForMissingPieces.Add(1)
1168                                 err = errors.New("peer requested piece we don't have")
1169                                 break
1170                         }
1171                         if c.PeerRequests == nil {
1172                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1173                         }
1174                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1175                         cl.upload(t, c)
1176                 case pp.Cancel:
1177                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1178                         if !c.PeerCancel(req) {
1179                                 unexpectedCancels.Add(1)
1180                         }
1181                 case pp.Bitfield:
1182                         err = c.peerSentBitfield(msg.Bitfield)
1183                 case pp.HaveAll:
1184                         err = c.peerSentHaveAll()
1185                 case pp.HaveNone:
1186                         err = c.peerSentHaveNone()
1187                 case pp.Piece:
1188                         cl.downloadedChunk(t, c, &msg)
1189                 case pp.Extended:
1190                         switch msg.ExtendedID {
1191                         case pp.HandshakeExtendedID:
1192                                 // TODO: Create a bencode struct for this.
1193                                 var d map[string]interface{}
1194                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1195                                 if err != nil {
1196                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1197                                         break
1198                                 }
1199                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1200                                 if reqq, ok := d["reqq"]; ok {
1201                                         if i, ok := reqq.(int64); ok {
1202                                                 c.PeerMaxRequests = int(i)
1203                                         }
1204                                 }
1205                                 if v, ok := d["v"]; ok {
1206                                         c.PeerClientName = v.(string)
1207                                 }
1208                                 m, ok := d["m"]
1209                                 if !ok {
1210                                         err = errors.New("handshake missing m item")
1211                                         break
1212                                 }
1213                                 mTyped, ok := m.(map[string]interface{})
1214                                 if !ok {
1215                                         err = errors.New("handshake m value is not dict")
1216                                         break
1217                                 }
1218                                 if c.PeerExtensionIDs == nil {
1219                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1220                                 }
1221                                 for name, v := range mTyped {
1222                                         id, ok := v.(int64)
1223                                         if !ok {
1224                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1225                                                 continue
1226                                         }
1227                                         if id == 0 {
1228                                                 delete(c.PeerExtensionIDs, name)
1229                                         } else {
1230                                                 if c.PeerExtensionIDs[name] == 0 {
1231                                                         supportedExtensionMessages.Add(name, 1)
1232                                                 }
1233                                                 c.PeerExtensionIDs[name] = byte(id)
1234                                         }
1235                                 }
1236                                 metadata_sizeUntyped, ok := d["metadata_size"]
1237                                 if ok {
1238                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1239                                         if !ok {
1240                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1241                                         } else {
1242                                                 t.setMetadataSize(metadata_size, cl)
1243                                         }
1244                                 }
1245                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1246                                         cl.requestPendingMetadata(t, c)
1247                                 }
1248                         case metadataExtendedId:
1249                                 err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1250                                 if err != nil {
1251                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1252                                 }
1253                         case pexExtendedId:
1254                                 if cl.config.DisablePEX {
1255                                         break
1256                                 }
1257                                 var pexMsg peerExchangeMessage
1258                                 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1259                                 if err != nil {
1260                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1261                                         break
1262                                 }
1263                                 go func() {
1264                                         cl.mu.Lock()
1265                                         cl.addPeers(t, func() (ret []Peer) {
1266                                                 for i, cp := range pexMsg.Added {
1267                                                         p := Peer{
1268                                                                 IP:     make([]byte, 4),
1269                                                                 Port:   cp.Port,
1270                                                                 Source: peerSourcePEX,
1271                                                         }
1272                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1273                                                                 p.SupportsEncryption = true
1274                                                         }
1275                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1276                                                         ret = append(ret, p)
1277                                                 }
1278                                                 return
1279                                         }())
1280                                         cl.mu.Unlock()
1281                                 }()
1282                         default:
1283                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1284                         }
1285                         if err != nil {
1286                                 // That client uses its own extension IDs for outgoing message
1287                                 // types, which is incorrect.
1288                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1289                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1290                                         return nil
1291                                 }
1292                         }
1293                 case pp.Port:
1294                         if cl.dHT == nil {
1295                                 break
1296                         }
1297                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1298                         if err != nil {
1299                                 panic(err)
1300                         }
1301                         if msg.Port != 0 {
1302                                 pingAddr.Port = int(msg.Port)
1303                         }
1304                         cl.dHT.Ping(pingAddr)
1305                 default:
1306                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1307                 }
1308                 if err != nil {
1309                         return err
1310                 }
1311         }
1312 }
1313
1314 // Returns true if connection is removed from torrent.Conns.
1315 func (cl *Client) deleteConnection(t *Torrent, c *connection) bool {
1316         for i0, _c := range t.conns {
1317                 if _c != c {
1318                         continue
1319                 }
1320                 i1 := len(t.conns) - 1
1321                 if i0 != i1 {
1322                         t.conns[i0] = t.conns[i1]
1323                 }
1324                 t.conns = t.conns[:i1]
1325                 return true
1326         }
1327         return false
1328 }
1329
1330 func (cl *Client) dropConnection(t *Torrent, c *connection) {
1331         cl.event.Broadcast()
1332         c.Close()
1333         if cl.deleteConnection(t, c) {
1334                 cl.openNewConns(t)
1335         }
1336 }
1337
1338 // Returns true if the connection is added.
1339 func (cl *Client) addConnection(t *Torrent, c *connection) bool {
1340         if cl.closed.IsSet() {
1341                 return false
1342         }
1343         select {
1344         case <-t.ceasingNetworking:
1345                 return false
1346         default:
1347         }
1348         if !cl.wantConns(t) {
1349                 return false
1350         }
1351         for _, c0 := range t.conns {
1352                 if c.PeerID == c0.PeerID {
1353                         // Already connected to a client with that ID.
1354                         duplicateClientConns.Add(1)
1355                         return false
1356                 }
1357         }
1358         if len(t.conns) >= socketsPerTorrent {
1359                 c := t.worstBadConn(cl)
1360                 if c == nil {
1361                         return false
1362                 }
1363                 if cl.config.Debug && missinggo.CryHeard() {
1364                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1365                 }
1366                 c.Close()
1367                 cl.deleteConnection(t, c)
1368         }
1369         if len(t.conns) >= socketsPerTorrent {
1370                 panic(len(t.conns))
1371         }
1372         t.conns = append(t.conns, c)
1373         c.t = t
1374         return true
1375 }
1376
1377 func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
1378         if c.closed.IsSet() {
1379                 return false
1380         }
1381         if !t.haveInfo() {
1382                 return c.supportsExtension("ut_metadata")
1383         }
1384         if cl.seeding(t) {
1385                 return c.PeerInterested
1386         }
1387         return t.connHasWantedPieces(c)
1388 }
1389
1390 func (cl *Client) wantConns(t *Torrent) bool {
1391         if !cl.seeding(t) && !t.needData() {
1392                 return false
1393         }
1394         if len(t.conns) < socketsPerTorrent {
1395                 return true
1396         }
1397         return t.worstBadConn(cl) != nil
1398 }
1399
1400 func (cl *Client) openNewConns(t *Torrent) {
1401         select {
1402         case <-t.ceasingNetworking:
1403                 return
1404         default:
1405         }
1406         for len(t.peers) != 0 {
1407                 if !cl.wantConns(t) {
1408                         return
1409                 }
1410                 if len(t.halfOpen) >= cl.halfOpenLimit {
1411                         return
1412                 }
1413                 var (
1414                         k peersKey
1415                         p Peer
1416                 )
1417                 for k, p = range t.peers {
1418                         break
1419                 }
1420                 delete(t.peers, k)
1421                 cl.initiateConn(p, t)
1422         }
1423         t.wantPeers.Broadcast()
1424 }
1425
1426 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
1427         for _, p := range peers {
1428                 if cl.dopplegangerAddr(net.JoinHostPort(
1429                         p.IP.String(),
1430                         strconv.FormatInt(int64(p.Port), 10),
1431                 )) {
1432                         continue
1433                 }
1434                 if _, ok := cl.ipBlockRange(p.IP); ok {
1435                         continue
1436                 }
1437                 if p.Port == 0 {
1438                         // The spec says to scrub these yourselves. Fine.
1439                         continue
1440                 }
1441                 t.addPeer(p, cl)
1442         }
1443 }
1444
1445 func (cl *Client) cachedMetaInfoFilename(ih metainfo.Hash) string {
1446         return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1447 }
1448
1449 func (cl *Client) saveTorrentFile(t *Torrent) error {
1450         path := cl.cachedMetaInfoFilename(t.infoHash)
1451         os.MkdirAll(filepath.Dir(path), 0777)
1452         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1453         if err != nil {
1454                 return fmt.Errorf("error opening file: %s", err)
1455         }
1456         defer f.Close()
1457         e := bencode.NewEncoder(f)
1458         err = e.Encode(t.metainfo())
1459         if err != nil {
1460                 return fmt.Errorf("error marshalling metainfo: %s", err)
1461         }
1462         mi, err := cl.torrentCacheMetaInfo(t.infoHash)
1463         if err != nil {
1464                 // For example, a script kiddy makes us load too many files, and we're
1465                 // able to save the torrent, but not load it again to check it.
1466                 return nil
1467         }
1468         if !bytes.Equal(mi.Info.Hash.Bytes(), t.infoHash[:]) {
1469                 log.Fatalf("%x != %x", mi.Info.Hash, t.infoHash[:])
1470         }
1471         return nil
1472 }
1473
1474 func (cl *Client) setMetaData(t *Torrent, md *metainfo.Info, bytes []byte) (err error) {
1475         err = t.setMetadata(md, bytes)
1476         if err != nil {
1477                 return
1478         }
1479         if !cl.config.DisableMetainfoCache {
1480                 if err := cl.saveTorrentFile(t); err != nil {
1481                         log.Printf("error saving torrent file for %s: %s", t, err)
1482                 }
1483         }
1484         cl.event.Broadcast()
1485         close(t.gotMetainfo)
1486         return
1487 }
1488
1489 // Prepare a Torrent without any attachment to a Client. That means we can
1490 // initialize fields all fields that don't require the Client without locking
1491 // it.
1492 func newTorrent(ih metainfo.Hash) (t *Torrent) {
1493         t = &Torrent{
1494                 infoHash:  ih,
1495                 chunkSize: defaultChunkSize,
1496                 peers:     make(map[peersKey]Peer),
1497
1498                 closing:           make(chan struct{}),
1499                 ceasingNetworking: make(chan struct{}),
1500
1501                 gotMetainfo: make(chan struct{}),
1502
1503                 halfOpen:          make(map[string]struct{}),
1504                 pieceStateChanges: pubsub.NewPubSub(),
1505         }
1506         return
1507 }
1508
// init seeds math/rand with the current Unix time in seconds. The generator
// is used for shuffling the tracker tiers.
func init() {
	seed := time.Now().Unix()
	mathRand.Seed(seed)
}
1513
// trackerTier is the list of tracker URLs that make up one tier.
type trackerTier []string

// shuffleTier randomly permutes tier in place; the trackers within each
// tier must be shuffled before use. At each position the element is swapped
// with one chosen uniformly from the positions up to and including it.
// http://stackoverflow.com/a/12267471/149482
// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
func shuffleTier(tier trackerTier) {
	for pos := 0; pos < len(tier); pos++ {
		pick := mathRand.Intn(pos + 1)
		tier[pick], tier[pos] = tier[pos], tier[pick]
	}
}
1525
1526 func copyTrackers(base []trackerTier) (copy []trackerTier) {
1527         for _, tier := range base {
1528                 copy = append(copy, append(trackerTier(nil), tier...))
1529         }
1530         return
1531 }
1532
1533 func mergeTier(tier trackerTier, newURLs []string) trackerTier {
1534 nextURL:
1535         for _, url := range newURLs {
1536                 for _, trURL := range tier {
1537                         if trURL == url {
1538                                 continue nextURL
1539                         }
1540                 }
1541                 tier = append(tier, url)
1542         }
1543         return tier
1544 }
1545
// Handle is a file-like handle to some torrent data resource. It supports
// sequential reads, seeking, reads at an absolute offset, and closing.
type Handle interface {
	io.ReadSeeker
	io.ReaderAt
	io.Closer
}
1553
1554 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
1555 // metainfo has the correct infohash.
1556 func (cl *Client) torrentCacheMetaInfo(ih metainfo.Hash) (mi *metainfo.MetaInfo, err error) {
1557         if cl.config.DisableMetainfoCache {
1558                 return
1559         }
1560         f, err := os.Open(cl.cachedMetaInfoFilename(ih))
1561         if err != nil {
1562                 if os.IsNotExist(err) {
1563                         err = nil
1564                 }
1565                 return
1566         }
1567         defer f.Close()
1568         dec := bencode.NewDecoder(f)
1569         err = dec.Decode(&mi)
1570         if err != nil {
1571                 return
1572         }
1573         if !bytes.Equal(mi.Info.Hash.Bytes(), ih[:]) {
1574                 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
1575                 return
1576         }
1577         return
1578 }
1579
// Specifies a new torrent for adding to a client. There are helpers for
// magnet URIs and torrent metainfo files (TorrentSpecFromMagnetURI and
// TorrentSpecFromMetaInfo).
type TorrentSpec struct {
	// The tiered tracker URIs.
	Trackers [][]string
	// The infohash identifying the torrent; the client looks torrents up by
	// it.
	InfoHash metainfo.Hash
	// The torrent's info, if it is already known. The magnet URI helper
	// leaves it nil.
	Info     *metainfo.InfoEx
	// The name to use if the Name field from the Info isn't available.
	DisplayName string
	// The chunk size to use for outbound requests. Defaults to 16KiB if not
	// set.
	ChunkSize int
	// The storage to use for the torrent. If nil, the client's default
	// storage is used.
	Storage   storage.I
}
1594
1595 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1596         m, err := metainfo.ParseMagnetURI(uri)
1597         if err != nil {
1598                 return
1599         }
1600         spec = &TorrentSpec{
1601                 Trackers:    [][]string{m.Trackers},
1602                 DisplayName: m.DisplayName,
1603                 InfoHash:    m.InfoHash,
1604         }
1605         return
1606 }
1607
1608 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1609         spec = &TorrentSpec{
1610                 Trackers:    mi.AnnounceList,
1611                 Info:        &mi.Info,
1612                 DisplayName: mi.Info.Name,
1613         }
1614
1615         if len(spec.Trackers) == 0 {
1616                 spec.Trackers = [][]string{[]string{mi.Announce}}
1617         } else {
1618                 spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
1619         }
1620
1621         missinggo.CopyExact(&spec.InfoHash, mi.Info.Hash)
1622         return
1623 }
1624
1625 // Add or merge a torrent spec. If the torrent is already present, the
1626 // trackers will be merged with the existing ones. If the Info isn't yet
1627 // known, it will be set. The display name is replaced if the new spec
1628 // provides one. Returns new if the torrent wasn't already in the client.
1629 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1630         cl.mu.Lock()
1631         defer cl.mu.Unlock()
1632
1633         t, ok := cl.torrents[spec.InfoHash]
1634         if !ok {
1635                 new = true
1636
1637                 // TODO: This doesn't belong in the core client, it's more of a
1638                 // helper.
1639                 if _, ok := cl.bannedTorrents[spec.InfoHash]; ok {
1640                         err = errors.New("banned torrent")
1641                         return
1642                 }
1643                 // TODO: Tidy this up?
1644                 t = newTorrent(spec.InfoHash)
1645                 t.cl = cl
1646                 t.wantPeers.L = &cl.mu
1647                 if spec.ChunkSize != 0 {
1648                         t.chunkSize = pp.Integer(spec.ChunkSize)
1649                 }
1650                 t.storageOpener = spec.Storage
1651                 if t.storageOpener == nil {
1652                         t.storageOpener = cl.defaultStorage
1653                 }
1654         }
1655         if spec.DisplayName != "" {
1656                 t.setDisplayName(spec.DisplayName)
1657         }
1658         // Try to merge in info we have on the torrent. Any err left will
1659         // terminate the function.
1660         if t.info == nil {
1661                 if spec.Info != nil {
1662                         err = cl.setMetaData(t, &spec.Info.Info, spec.Info.Bytes)
1663                 } else {
1664                         var mi *metainfo.MetaInfo
1665                         mi, err = cl.torrentCacheMetaInfo(spec.InfoHash)
1666                         if err != nil {
1667                                 log.Printf("error getting cached metainfo: %s", err)
1668                                 err = nil
1669                         } else if mi != nil {
1670                                 t.addTrackers(mi.AnnounceList)
1671                                 err = cl.setMetaData(t, &mi.Info.Info, mi.Info.Bytes)
1672                         }
1673                 }
1674         }
1675         if err != nil {
1676                 return
1677         }
1678         t.addTrackers(spec.Trackers)
1679
1680         cl.torrents[spec.InfoHash] = t
1681         t.maybeNewConns()
1682
1683         // From this point onwards, we can consider the torrent a part of the
1684         // client.
1685         if new {
1686                 if !cl.config.DisableTrackers {
1687                         go cl.announceTorrentTrackers(t)
1688                 }
1689                 if cl.dHT != nil {
1690                         go cl.announceTorrentDHT(t, true)
1691                 }
1692         }
1693         return
1694 }
1695
1696 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1697         t, ok := cl.torrents[infoHash]
1698         if !ok {
1699                 err = fmt.Errorf("no such torrent")
1700                 return
1701         }
1702         err = t.close()
1703         if err != nil {
1704                 panic(err)
1705         }
1706         delete(cl.torrents, infoHash)
1707         return
1708 }
1709
1710 // Returns true when peers are required, or false if the torrent is closing.
1711 func (cl *Client) waitWantPeers(t *Torrent) bool {
1712         cl.mu.Lock()
1713         defer cl.mu.Unlock()
1714         for {
1715                 select {
1716                 case <-t.ceasingNetworking:
1717                         return false
1718                 default:
1719                 }
1720                 if len(t.peers) > torrentPeersLowWater {
1721                         goto wait
1722                 }
1723                 if t.needData() || cl.seeding(t) {
1724                         return true
1725                 }
1726         wait:
1727                 t.wantPeers.Wait()
1728         }
1729 }
1730
1731 // Returns whether the client should make effort to seed the torrent.
1732 func (cl *Client) seeding(t *Torrent) bool {
1733         if cl.config.NoUpload {
1734                 return false
1735         }
1736         if !cl.config.Seed {
1737                 return false
1738         }
1739         if t.needData() {
1740                 return false
1741         }
1742         return true
1743 }
1744
1745 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
1746         for cl.waitWantPeers(t) {
1747                 // log.Printf("getting peers for %q from DHT", t)
1748                 ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1749                 if err != nil {
1750                         log.Printf("error getting peers from dht: %s", err)
1751                         return
1752                 }
1753                 // Count all the unique addresses we got during this announce.
1754                 allAddrs := make(map[string]struct{})
1755         getPeers:
1756                 for {
1757                         select {
1758                         case v, ok := <-ps.Peers:
1759                                 if !ok {
1760                                         break getPeers
1761                                 }
1762                                 addPeers := make([]Peer, 0, len(v.Peers))
1763                                 for _, cp := range v.Peers {
1764                                         if cp.Port == 0 {
1765                                                 // Can't do anything with this.
1766                                                 continue
1767                                         }
1768                                         addPeers = append(addPeers, Peer{
1769                                                 IP:     cp.IP[:],
1770                                                 Port:   cp.Port,
1771                                                 Source: peerSourceDHT,
1772                                         })
1773                                         key := (&net.UDPAddr{
1774                                                 IP:   cp.IP[:],
1775                                                 Port: cp.Port,
1776                                         }).String()
1777                                         allAddrs[key] = struct{}{}
1778                                 }
1779                                 cl.mu.Lock()
1780                                 cl.addPeers(t, addPeers)
1781                                 numPeers := len(t.peers)
1782                                 cl.mu.Unlock()
1783                                 if numPeers >= torrentPeersHighWater {
1784                                         break getPeers
1785                                 }
1786                         case <-t.ceasingNetworking:
1787                                 ps.Close()
1788                                 return
1789                         }
1790                 }
1791                 ps.Close()
1792                 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
1793         }
1794 }
1795
1796 func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
1797         url_, err := url.Parse(trRawURL)
1798         if err != nil {
1799                 return
1800         }
1801         host, _, err := net.SplitHostPort(url_.Host)
1802         if err != nil {
1803                 host = url_.Host
1804         }
1805         addr, err := net.ResolveIPAddr("ip", host)
1806         if err != nil {
1807                 return
1808         }
1809         cl.mu.RLock()
1810         _, blocked = cl.ipBlockRange(addr.IP)
1811         cl.mu.RUnlock()
1812         return
1813 }
1814
1815 func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) error {
1816         blocked, err := cl.trackerBlockedUnlocked(tr)
1817         if err != nil {
1818                 return fmt.Errorf("error determining if tracker blocked: %s", err)
1819         }
1820         if blocked {
1821                 return fmt.Errorf("tracker blocked: %s", tr)
1822         }
1823         resp, err := tracker.Announce(tr, req)
1824         if err != nil {
1825                 return fmt.Errorf("error announcing: %s", err)
1826         }
1827         var peers []Peer
1828         for _, peer := range resp.Peers {
1829                 peers = append(peers, Peer{
1830                         IP:   peer.IP,
1831                         Port: peer.Port,
1832                 })
1833         }
1834         cl.mu.Lock()
1835         cl.addPeers(t, peers)
1836         cl.mu.Unlock()
1837
1838         // log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1839
1840         time.Sleep(time.Second * time.Duration(resp.Interval))
1841         return nil
1842 }
1843
1844 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
1845         oks := make(chan bool)
1846         outstanding := 0
1847         for _, tier := range trackers {
1848                 for _, tr := range tier {
1849                         outstanding++
1850                         go func(tr string) {
1851                                 err := cl.announceTorrentSingleTracker(tr, req, t)
1852                                 oks <- err == nil
1853                         }(tr)
1854                 }
1855         }
1856         for outstanding > 0 {
1857                 ok := <-oks
1858                 outstanding--
1859                 if ok {
1860                         atLeastOne = true
1861                 }
1862         }
1863         return
1864 }
1865
1866 // Announce torrent to its trackers.
1867 func (cl *Client) announceTorrentTrackers(t *Torrent) {
1868         req := tracker.AnnounceRequest{
1869                 Event:    tracker.Started,
1870                 NumWant:  -1,
1871                 Port:     uint16(cl.incomingPeerPort()),
1872                 PeerId:   cl.peerID,
1873                 InfoHash: t.infoHash,
1874         }
1875         if !cl.waitWantPeers(t) {
1876                 return
1877         }
1878         cl.mu.RLock()
1879         req.Left = t.bytesLeftAnnounce()
1880         trackers := t.trackers
1881         cl.mu.RUnlock()
1882         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
1883                 req.Event = tracker.None
1884         }
1885 newAnnounce:
1886         for cl.waitWantPeers(t) {
1887                 cl.mu.RLock()
1888                 req.Left = t.bytesLeftAnnounce()
1889                 trackers = t.trackers
1890                 cl.mu.RUnlock()
1891                 numTrackersTried := 0
1892                 for _, tier := range trackers {
1893                         for trIndex, tr := range tier {
1894                                 numTrackersTried++
1895                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
1896                                 if err != nil {
1897                                         continue
1898                                 }
1899                                 // Float the successful announce to the top of the tier. If
1900                                 // the trackers list has been changed, we'll be modifying an
1901                                 // old copy so it won't matter.
1902                                 cl.mu.Lock()
1903                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1904                                 cl.mu.Unlock()
1905
1906                                 req.Event = tracker.None
1907                                 continue newAnnounce
1908                         }
1909                 }
1910                 if numTrackersTried != 0 {
1911                         log.Printf("%s: all trackers failed", t)
1912                 }
1913                 // TODO: Wait until trackers are added if there are none.
1914                 time.Sleep(10 * time.Second)
1915         }
1916 }
1917
1918 func (cl *Client) allTorrentsCompleted() bool {
1919         for _, t := range cl.torrents {
1920                 if !t.haveInfo() {
1921                         return false
1922                 }
1923                 if t.numPiecesCompleted() != t.numPieces() {
1924                         return false
1925                 }
1926         }
1927         return true
1928 }
1929
1930 // Returns true when all torrents are completely downloaded and false if the
1931 // client is stopped before that.
1932 func (cl *Client) WaitAll() bool {
1933         cl.mu.Lock()
1934         defer cl.mu.Unlock()
1935         for !cl.allTorrentsCompleted() {
1936                 if cl.closed.IsSet() {
1937                         return false
1938                 }
1939                 cl.event.Wait()
1940         }
1941         return true
1942 }
1943
1944 // Handle a received chunk from a peer.
1945 func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
1946         chunksReceived.Add(1)
1947
1948         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1949
1950         // Request has been satisfied.
1951         if cl.connDeleteRequest(t, c, req) {
1952                 defer c.updateRequests()
1953         } else {
1954                 unexpectedChunksReceived.Add(1)
1955         }
1956
1957         index := int(req.Index)
1958         piece := &t.pieces[index]
1959
1960         // Do we actually want this chunk?
1961         if !t.wantPiece(req) {
1962                 unwantedChunksReceived.Add(1)
1963                 c.UnwantedChunksReceived++
1964                 return
1965         }
1966
1967         c.UsefulChunksReceived++
1968         c.lastUsefulChunkReceived = time.Now()
1969
1970         cl.upload(t, c)
1971
1972         // Need to record that it hasn't been written yet, before we attempt to do
1973         // anything with it.
1974         piece.incrementPendingWrites()
1975         // Record that we have the chunk.
1976         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1977
1978         // Cancel pending requests for this chunk.
1979         for _, c := range t.conns {
1980                 if cl.connCancel(t, c, req) {
1981                         c.updateRequests()
1982                 }
1983         }
1984
1985         cl.mu.Unlock()
1986         // Write the chunk out.
1987         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1988         cl.mu.Lock()
1989
1990         piece.decrementPendingWrites()
1991
1992         if err != nil {
1993                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
1994                 t.pendRequest(req)
1995                 t.updatePieceCompletion(int(msg.Index))
1996                 return
1997         }
1998
1999         // It's important that the piece is potentially queued before we check if
2000         // the piece is still wanted, because if it is queued, it won't be wanted.
2001         if t.pieceAllDirty(index) {
2002                 cl.queuePieceCheck(t, int(req.Index))
2003         }
2004
2005         if c.peerTouchedPieces == nil {
2006                 c.peerTouchedPieces = make(map[int]struct{})
2007         }
2008         c.peerTouchedPieces[index] = struct{}{}
2009
2010         cl.event.Broadcast()
2011         t.publishPieceChange(int(req.Index))
2012         return
2013 }
2014
2015 // Return the connections that touched a piece, and clear the entry while
2016 // doing it.
2017 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
2018         for _, c := range t.conns {
2019                 if _, ok := c.peerTouchedPieces[piece]; ok {
2020                         ret = append(ret, c)
2021                         delete(c.peerTouchedPieces, piece)
2022                 }
2023         }
2024         return
2025 }
2026
2027 func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
2028         p := &t.pieces[piece]
2029         if p.EverHashed {
2030                 // Don't score the first time a piece is hashed, it could be an
2031                 // initial check.
2032                 if correct {
2033                         pieceHashedCorrect.Add(1)
2034                 } else {
2035                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
2036                         pieceHashedNotCorrect.Add(1)
2037                 }
2038         }
2039         p.EverHashed = true
2040         touchers := cl.reapPieceTouches(t, piece)
2041         if correct {
2042                 err := p.Storage().MarkComplete()
2043                 if err != nil {
2044                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
2045                 }
2046                 t.updatePieceCompletion(piece)
2047         } else if len(touchers) != 0 {
2048                 log.Printf("dropping %d conns that touched piece", len(touchers))
2049                 for _, c := range touchers {
2050                         cl.dropConnection(t, c)
2051                 }
2052         }
2053         cl.pieceChanged(t, piece)
2054 }
2055
2056 func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
2057         t.pendingPieces.Remove(piece)
2058         t.pendAllChunkSpecs(piece)
2059         for _, conn := range t.conns {
2060                 conn.Have(piece)
2061                 for r := range conn.Requests {
2062                         if int(r.Index) == piece {
2063                                 conn.Cancel(r)
2064                         }
2065                 }
2066                 // Could check here if peer doesn't have piece, but due to caching
2067                 // some peers may have said they have a piece but they don't.
2068                 cl.upload(t, conn)
2069         }
2070 }
2071
2072 func (cl *Client) onFailedPiece(t *Torrent, piece int) {
2073         if t.pieceAllDirty(piece) {
2074                 t.pendAllChunkSpecs(piece)
2075         }
2076         if !t.wantPieceIndex(piece) {
2077                 return
2078         }
2079         cl.openNewConns(t)
2080         for _, conn := range t.conns {
2081                 if conn.PeerHasPiece(piece) {
2082                         conn.updateRequests()
2083                 }
2084         }
2085 }
2086
2087 func (cl *Client) pieceChanged(t *Torrent, piece int) {
2088         correct := t.pieceComplete(piece)
2089         defer cl.event.Broadcast()
2090         if correct {
2091                 cl.onCompletedPiece(t, piece)
2092         } else {
2093                 cl.onFailedPiece(t, piece)
2094         }
2095         if t.updatePiecePriority(piece) {
2096                 t.piecePriorityChanged(piece)
2097         }
2098         t.publishPieceChange(piece)
2099 }
2100
2101 func (cl *Client) verifyPiece(t *Torrent, piece int) {
2102         cl.mu.Lock()
2103         defer cl.mu.Unlock()
2104         p := &t.pieces[piece]
2105         for p.Hashing || t.storage == nil {
2106                 cl.event.Wait()
2107         }
2108         p.QueuedForHash = false
2109         if t.isClosed() || t.pieceComplete(piece) {
2110                 t.updatePiecePriority(piece)
2111                 t.publishPieceChange(piece)
2112                 return
2113         }
2114         p.Hashing = true
2115         t.publishPieceChange(piece)
2116         cl.mu.Unlock()
2117         sum := t.hashPiece(piece)
2118         cl.mu.Lock()
2119         select {
2120         case <-t.closing:
2121                 return
2122         default:
2123         }
2124         p.Hashing = false
2125         cl.pieceHashed(t, piece, sum == p.Hash)
2126 }
2127
2128 // Returns handles to all the torrents loaded in the Client.
2129 func (cl *Client) Torrents() (ret []*Torrent) {
2130         cl.mu.Lock()
2131         for _, t := range cl.torrents {
2132                 ret = append(ret, t)
2133         }
2134         cl.mu.Unlock()
2135         return
2136 }
2137
2138 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
2139         spec, err := TorrentSpecFromMagnetURI(uri)
2140         if err != nil {
2141                 return
2142         }
2143         T, _, err = cl.AddTorrentSpec(spec)
2144         return
2145 }
2146
2147 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
2148         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2149         var ss []string
2150         missinggo.CastSlice(&ss, mi.Nodes)
2151         cl.AddDHTNodes(ss)
2152         return
2153 }
2154
2155 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
2156         mi, err := metainfo.LoadFromFile(filename)
2157         if err != nil {
2158                 return
2159         }
2160         return cl.AddTorrent(mi)
2161 }
2162
2163 func (cl *Client) DHT() *dht.Server {
2164         return cl.dHT
2165 }
2166
2167 func (cl *Client) AddDHTNodes(nodes []string) {
2168         for _, n := range nodes {
2169                 hmp := missinggo.SplitHostMaybePort(n)
2170                 ip := net.ParseIP(hmp.Host)
2171                 if ip == nil {
2172                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
2173                         continue
2174                 }
2175                 ni := dht.NodeInfo{
2176                         Addr: dht.NewAddr(&net.UDPAddr{
2177                                 IP:   ip,
2178                                 Port: hmp.Port,
2179                         }),
2180                 }
2181                 cl.DHT().AddNode(ni)
2182         }
2183 }