]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
fa63e446d494e92735254af68a7896aa94867b60
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "encoding/hex"
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         "math/big"
13         mathRand "math/rand"
14         "net"
15         "net/url"
16         "sort"
17         "strconv"
18         "strings"
19         "time"
20
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/pproffd"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/sync"
25         "github.com/anacrolix/utp"
26         "github.com/dustin/go-humanize"
27
28         "github.com/anacrolix/torrent/bencode"
29         "github.com/anacrolix/torrent/dht"
30         "github.com/anacrolix/torrent/dht/krpc"
31         "github.com/anacrolix/torrent/iplist"
32         "github.com/anacrolix/torrent/metainfo"
33         "github.com/anacrolix/torrent/mse"
34         pp "github.com/anacrolix/torrent/peer_protocol"
35         "github.com/anacrolix/torrent/storage"
36 )
37
38 // Currently doesn't really queue, but should in the future.
39 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
40         piece := &t.pieces[pieceIndex]
41         if piece.QueuedForHash {
42                 return
43         }
44         piece.QueuedForHash = true
45         t.publishPieceChange(pieceIndex)
46         go cl.verifyPiece(t, pieceIndex)
47 }
48
49 // Queue a piece check if one isn't already queued, and the piece has never
50 // been checked before.
51 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
52         p := &t.pieces[piece]
53         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
54                 return
55         }
56         cl.queuePieceCheck(t, piece)
57 }
58
59 // Clients contain zero or more Torrents. A Client manages a blocklist, the
60 // TCP/UDP protocol ports, and DHT as desired.
61 type Client struct {
62         halfOpenLimit int
63         peerID        [20]byte
64         // The net.Addr.String part that should be common to all active listeners.
65         listenAddr     string
66         tcpListener    net.Listener
67         utpSock        *utp.Socket
68         dHT            *dht.Server
69         ipBlockList    iplist.Ranger
70         config         Config
71         extensionBytes peerExtensionBytes
72         // Set of addresses that have our client ID. This intentionally will
73         // include ourselves if we end up trying to connect to our own address
74         // through legitimate channels.
75         dopplegangerAddrs map[string]struct{}
76         badPeerIPs        map[string]struct{}
77
78         defaultStorage storage.Client
79
80         mu     sync.RWMutex
81         event  sync.Cond
82         closed missinggo.Event
83
84         torrents map[metainfo.Hash]*Torrent
85 }
86
87 func (cl *Client) IPBlockList() iplist.Ranger {
88         cl.mu.Lock()
89         defer cl.mu.Unlock()
90         return cl.ipBlockList
91 }
92
93 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
94         cl.mu.Lock()
95         defer cl.mu.Unlock()
96         cl.ipBlockList = list
97         if cl.dHT != nil {
98                 cl.dHT.SetIPBlockList(list)
99         }
100 }
101
102 func (cl *Client) PeerID() string {
103         return string(cl.peerID[:])
104 }
105
106 type torrentAddr string
107
108 func (me torrentAddr) Network() string { return "" }
109
110 func (me torrentAddr) String() string { return string(me) }
111
112 func (cl *Client) ListenAddr() net.Addr {
113         if cl.listenAddr == "" {
114                 return nil
115         }
116         return torrentAddr(cl.listenAddr)
117 }
118
119 type hashSorter struct {
120         Hashes []metainfo.Hash
121 }
122
123 func (hs hashSorter) Len() int {
124         return len(hs.Hashes)
125 }
126
127 func (hs hashSorter) Less(a, b int) bool {
128         return (&big.Int{}).SetBytes(hs.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(hs.Hashes[b][:])) < 0
129 }
130
131 func (hs hashSorter) Swap(a, b int) {
132         hs.Hashes[a], hs.Hashes[b] = hs.Hashes[b], hs.Hashes[a]
133 }
134
135 func (cl *Client) sortedTorrents() (ret []*Torrent) {
136         var hs hashSorter
137         for ih := range cl.torrents {
138                 hs.Hashes = append(hs.Hashes, ih)
139         }
140         sort.Sort(hs)
141         for _, ih := range hs.Hashes {
142                 ret = append(ret, cl.torrent(ih))
143         }
144         return
145 }
146
147 // Writes out a human readable status of the client, such as for writing to a
148 // HTTP status page.
149 func (cl *Client) WriteStatus(_w io.Writer) {
150         cl.mu.RLock()
151         defer cl.mu.RUnlock()
152         w := bufio.NewWriter(_w)
153         defer w.Flush()
154         if addr := cl.ListenAddr(); addr != nil {
155                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
156         } else {
157                 fmt.Fprintln(w, "Not listening!")
158         }
159         fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
160         fmt.Fprintf(w, "Banned IPs: %d\n", len(cl.badPeerIPs))
161         if cl.dHT != nil {
162                 dhtStats := cl.dHT.Stats()
163                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
164                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
165                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
166                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
167                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
168         }
169         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
170         fmt.Fprintln(w)
171         for _, t := range cl.sortedTorrents() {
172                 if t.name() == "" {
173                         fmt.Fprint(w, "<unknown name>")
174                 } else {
175                         fmt.Fprint(w, t.name())
176                 }
177                 fmt.Fprint(w, "\n")
178                 if t.haveInfo() {
179                         fmt.Fprintf(w, "%f%% of %d bytes (%s)", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length, humanize.Bytes(uint64(t.length)))
180                 } else {
181                         w.WriteString("<missing metainfo>")
182                 }
183                 fmt.Fprint(w, "\n")
184                 t.writeStatus(w, cl)
185                 fmt.Fprintln(w)
186         }
187 }
188
189 func listenUTP(networkSuffix, addr string) (*utp.Socket, error) {
190         return utp.NewSocket("udp"+networkSuffix, addr)
191 }
192
193 func listenTCP(networkSuffix, addr string) (net.Listener, error) {
194         return net.Listen("tcp"+networkSuffix, addr)
195 }
196
197 func listenBothSameDynamicPort(networkSuffix, host string) (tcpL net.Listener, utpSock *utp.Socket, listenedAddr string, err error) {
198         for {
199                 tcpL, err = listenTCP(networkSuffix, net.JoinHostPort(host, "0"))
200                 if err != nil {
201                         return
202                 }
203                 listenedAddr = tcpL.Addr().String()
204                 utpSock, err = listenUTP(networkSuffix, listenedAddr)
205                 if err == nil {
206                         return
207                 }
208                 tcpL.Close()
209                 if !strings.Contains(err.Error(), "address already in use") {
210                         return
211                 }
212         }
213 }
214
215 // Listen to enabled protocols, ensuring ports match.
216 func listen(tcp, utp bool, networkSuffix, addr string) (tcpL net.Listener, utpSock *utp.Socket, listenedAddr string, err error) {
217         if addr == "" {
218                 addr = ":50007"
219         }
220         host, port, err := missinggo.ParseHostPort(addr)
221         if err != nil {
222                 return
223         }
224         if tcp && utp && port == 0 {
225                 // If both protocols are active, they need to have the same port.
226                 return listenBothSameDynamicPort(networkSuffix, host)
227         }
228         defer func() {
229                 if err != nil {
230                         listenedAddr = ""
231                 }
232         }()
233         if tcp {
234                 tcpL, err = listenTCP(networkSuffix, addr)
235                 if err != nil {
236                         return
237                 }
238                 defer func() {
239                         if err != nil {
240                                 tcpL.Close()
241                         }
242                 }()
243                 listenedAddr = tcpL.Addr().String()
244         }
245         if utp {
246                 utpSock, err = listenUTP(networkSuffix, addr)
247                 if err != nil {
248                         return
249                 }
250                 listenedAddr = utpSock.Addr().String()
251         }
252         return
253 }
254
255 // Creates a new client.
256 func NewClient(cfg *Config) (cl *Client, err error) {
257         if cfg == nil {
258                 cfg = &Config{}
259         }
260
261         defer func() {
262                 if err != nil {
263                         cl = nil
264                 }
265         }()
266         cl = &Client{
267                 halfOpenLimit:     socketsPerTorrent,
268                 config:            *cfg,
269                 defaultStorage:    cfg.DefaultStorage,
270                 dopplegangerAddrs: make(map[string]struct{}),
271                 torrents:          make(map[metainfo.Hash]*Torrent),
272         }
273         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
274         cl.event.L = &cl.mu
275         if cl.defaultStorage == nil {
276                 cl.defaultStorage = storage.NewFile(cfg.DataDir)
277         }
278         if cfg.IPBlocklist != nil {
279                 cl.ipBlockList = cfg.IPBlocklist
280         }
281
282         if cfg.PeerID != "" {
283                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
284         } else {
285                 o := copy(cl.peerID[:], bep20)
286                 _, err = rand.Read(cl.peerID[o:])
287                 if err != nil {
288                         panic("error generating peer id")
289                 }
290         }
291
292         cl.tcpListener, cl.utpSock, cl.listenAddr, err = listen(
293                 !cl.config.DisableTCP,
294                 !cl.config.DisableUTP,
295                 func() string {
296                         if cl.config.DisableIPv6 {
297                                 return "4"
298                         } else {
299                                 return ""
300                         }
301                 }(),
302                 cl.config.ListenAddr)
303         if err != nil {
304                 return
305         }
306         if cl.tcpListener != nil {
307                 go cl.acceptConnections(cl.tcpListener, false)
308         }
309         if cl.utpSock != nil {
310                 go cl.acceptConnections(cl.utpSock, true)
311         }
312         if !cfg.NoDHT {
313                 dhtCfg := cfg.DHTConfig
314                 if dhtCfg.IPBlocklist == nil {
315                         dhtCfg.IPBlocklist = cl.ipBlockList
316                 }
317                 dhtCfg.Addr = firstNonEmptyString(dhtCfg.Addr, cl.listenAddr, cl.config.ListenAddr)
318                 if dhtCfg.Conn == nil && cl.utpSock != nil {
319                         dhtCfg.Conn = cl.utpSock
320                 }
321                 cl.dHT, err = dht.NewServer(&dhtCfg)
322                 if err != nil {
323                         return
324                 }
325         }
326
327         return
328 }
329
330 func firstNonEmptyString(ss ...string) string {
331         for _, s := range ss {
332                 if s != "" {
333                         return s
334                 }
335         }
336         return ""
337 }
338
339 // Stops the client. All connections to peers are closed and all activity will
340 // come to a halt.
341 func (cl *Client) Close() {
342         cl.mu.Lock()
343         defer cl.mu.Unlock()
344         cl.closed.Set()
345         if cl.dHT != nil {
346                 cl.dHT.Close()
347         }
348         if cl.utpSock != nil {
349                 cl.utpSock.Close()
350         }
351         if cl.tcpListener != nil {
352                 cl.tcpListener.Close()
353         }
354         for _, t := range cl.torrents {
355                 t.close()
356         }
357         cl.event.Broadcast()
358 }
359
360 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
361
362 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
363         if cl.ipBlockList == nil {
364                 return
365         }
366         ip4 := ip.To4()
367         // If blocklists are enabled, then block non-IPv4 addresses, because
368         // blocklists do not yet support IPv6.
369         if ip4 == nil {
370                 if missinggo.CryHeard() {
371                         log.Printf("blocking non-IPv4 address: %s", ip)
372                 }
373                 r = ipv6BlockRange
374                 blocked = true
375                 return
376         }
377         return cl.ipBlockList.Lookup(ip4)
378 }
379
380 func (cl *Client) waitAccept() {
381         cl.mu.Lock()
382         defer cl.mu.Unlock()
383         for {
384                 for _, t := range cl.torrents {
385                         if cl.wantConns(t) {
386                                 return
387                         }
388                 }
389                 if cl.closed.IsSet() {
390                         return
391                 }
392                 cl.event.Wait()
393         }
394 }
395
396 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
397         for {
398                 cl.waitAccept()
399                 conn, err := l.Accept()
400                 conn = pproffd.WrapNetConn(conn)
401                 if cl.closed.IsSet() {
402                         if conn != nil {
403                                 conn.Close()
404                         }
405                         return
406                 }
407                 if err != nil {
408                         log.Print(err)
409                         // I think something harsher should happen here? Our accept
410                         // routine just fucked off.
411                         return
412                 }
413                 if utp {
414                         acceptUTP.Add(1)
415                 } else {
416                         acceptTCP.Add(1)
417                 }
418                 cl.mu.RLock()
419                 reject := cl.badPeerIPPort(
420                         missinggo.AddrIP(conn.RemoteAddr()),
421                         missinggo.AddrPort(conn.RemoteAddr()))
422                 cl.mu.RUnlock()
423                 if reject {
424                         acceptReject.Add(1)
425                         conn.Close()
426                         continue
427                 }
428                 go cl.incomingConnection(conn, utp)
429         }
430 }
431
432 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
433         defer nc.Close()
434         if tc, ok := nc.(*net.TCPConn); ok {
435                 tc.SetLinger(0)
436         }
437         c := cl.newConnection(nc)
438         c.Discovery = peerSourceIncoming
439         c.uTP = utp
440         cl.runReceivedConn(c)
441 }
442
443 // Returns a handle to the given torrent, if it's present in the client.
444 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
445         cl.mu.Lock()
446         defer cl.mu.Unlock()
447         t, ok = cl.torrents[ih]
448         return
449 }
450
451 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
452         return cl.torrents[ih]
453 }
454
455 type dialResult struct {
456         Conn net.Conn
457         UTP  bool
458 }
459
460 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
461         conn, err := dial(addr, t)
462         if err != nil {
463                 if conn != nil {
464                         conn.Close()
465                 }
466                 conn = nil // Pedantic
467         }
468         ch <- dialResult{conn, utp}
469         if err == nil {
470                 successfulDials.Add(1)
471                 return
472         }
473         unsuccessfulDials.Add(1)
474 }
475
476 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
477         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
478         if ret < minDialTimeout {
479                 ret = minDialTimeout
480         }
481         return
482 }
483
484 // Returns whether an address is known to connect to a client with our own ID.
485 func (cl *Client) dopplegangerAddr(addr string) bool {
486         _, ok := cl.dopplegangerAddrs[addr]
487         return ok
488 }
489
490 // Start the process of connecting to the given peer for the given torrent if
491 // appropriate.
492 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
493         if peer.Id == cl.peerID {
494                 return
495         }
496         if cl.badPeerIPPort(peer.IP, peer.Port) {
497                 return
498         }
499         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
500         if t.addrActive(addr) {
501                 return
502         }
503         t.halfOpen[addr] = struct{}{}
504         go cl.outgoingConnection(t, addr, peer.Source)
505 }
506
507 func (cl *Client) dialTimeout(t *Torrent) time.Duration {
508         cl.mu.Lock()
509         pendingPeers := len(t.peers)
510         cl.mu.Unlock()
511         return reducedDialTimeout(nominalDialTimeout, cl.halfOpenLimit, pendingPeers)
512 }
513
514 func (cl *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
515         c, err = net.DialTimeout("tcp", addr, cl.dialTimeout(t))
516         if err == nil {
517                 c.(*net.TCPConn).SetLinger(0)
518         }
519         return
520 }
521
522 func (cl *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
523         return cl.utpSock.DialTimeout(addr, cl.dialTimeout(t))
524 }
525
526 // Returns a connection over UTP or TCP, whichever is first to connect.
527 func (cl *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
528         // Initiate connections via TCP and UTP simultaneously. Use the first one
529         // that succeeds.
530         left := 0
531         if !cl.config.DisableUTP {
532                 left++
533         }
534         if !cl.config.DisableTCP {
535                 left++
536         }
537         resCh := make(chan dialResult, left)
538         if !cl.config.DisableUTP {
539                 go doDial(cl.dialUTP, resCh, true, addr, t)
540         }
541         if !cl.config.DisableTCP {
542                 go doDial(cl.dialTCP, resCh, false, addr, t)
543         }
544         var res dialResult
545         // Wait for a successful connection.
546         for ; left > 0 && res.Conn == nil; left-- {
547                 res = <-resCh
548         }
549         if left > 0 {
550                 // There are still incompleted dials.
551                 go func() {
552                         for ; left > 0; left-- {
553                                 conn := (<-resCh).Conn
554                                 if conn != nil {
555                                         conn.Close()
556                                 }
557                         }
558                 }()
559         }
560         conn = res.Conn
561         utp = res.UTP
562         return
563 }
564
565 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
566         if _, ok := t.halfOpen[addr]; !ok {
567                 panic("invariant broken")
568         }
569         delete(t.halfOpen, addr)
570         cl.openNewConns(t)
571 }
572
573 // Performs initiator handshakes and returns a connection. Returns nil
574 // *connection if no connection for valid reasons.
575 func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
576         c = cl.newConnection(nc)
577         c.encrypted = encrypted
578         c.uTP = utp
579         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
580         if err != nil {
581                 return
582         }
583         ok, err := cl.initiateHandshakes(c, t)
584         if !ok {
585                 c = nil
586         }
587         return
588 }
589
590 // Returns nil connection and nil error if no connection could be established
591 // for valid reasons.
592 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
593         nc, utp := cl.dialFirst(addr, t)
594         if nc == nil {
595                 return
596         }
597         c, err = cl.handshakesConnection(nc, t, !cl.config.DisableEncryption, utp)
598         if err != nil {
599                 nc.Close()
600                 return
601         } else if c != nil {
602                 return
603         }
604         nc.Close()
605         if cl.config.DisableEncryption {
606                 // We already tried without encryption.
607                 return
608         }
609         // Try again without encryption, using whichever protocol type worked last
610         // time.
611         if utp {
612                 nc, err = cl.dialUTP(addr, t)
613         } else {
614                 nc, err = cl.dialTCP(addr, t)
615         }
616         if err != nil {
617                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
618                 return
619         }
620         c, err = cl.handshakesConnection(nc, t, false, utp)
621         if err != nil || c == nil {
622                 nc.Close()
623         }
624         return
625 }
626
627 // Called to dial out and run a connection. The addr we're given is already
628 // considered half-open.
629 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
630         c, err := cl.establishOutgoingConn(t, addr)
631         cl.mu.Lock()
632         defer cl.mu.Unlock()
633         // Don't release lock between here and addConnection, unless it's for
634         // failure.
635         cl.noLongerHalfOpen(t, addr)
636         if err != nil {
637                 if cl.config.Debug {
638                         log.Printf("error establishing outgoing connection: %s", err)
639                 }
640                 return
641         }
642         if c == nil {
643                 return
644         }
645         defer c.Close()
646         c.Discovery = ps
647         cl.runInitiatedHandshookConn(c, t)
648 }
649
650 // The port number for incoming peer connections. 0 if the client isn't
651 // listening.
652 func (cl *Client) incomingPeerPort() int {
653         if cl.listenAddr == "" {
654                 return 0
655         }
656         _, port, err := missinggo.ParseHostPort(cl.listenAddr)
657         if err != nil {
658                 panic(err)
659         }
660         return port
661 }
662
663 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
664 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
665 func addrCompactIP(addr net.Addr) (string, error) {
666         host, _, err := net.SplitHostPort(addr.String())
667         if err != nil {
668                 return "", err
669         }
670         ip := net.ParseIP(host)
671         if v4 := ip.To4(); v4 != nil {
672                 if len(v4) != 4 {
673                         panic(v4)
674                 }
675                 return string(v4), nil
676         }
677         return string(ip.To16()), nil
678 }
679
680 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
681         var err error
682         for b := range bb {
683                 _, err = w.Write(b)
684                 if err != nil {
685                         break
686                 }
687         }
688         done <- err
689 }
690
691 type (
692         peerExtensionBytes [8]byte
693         peerID             [20]byte
694 )
695
696 func (pex *peerExtensionBytes) SupportsExtended() bool {
697         return pex[5]&0x10 != 0
698 }
699
700 func (pex *peerExtensionBytes) SupportsDHT() bool {
701         return pex[7]&0x01 != 0
702 }
703
704 func (pex *peerExtensionBytes) SupportsFast() bool {
705         return pex[7]&0x04 != 0
706 }
707
708 type handshakeResult struct {
709         peerExtensionBytes
710         peerID
711         metainfo.Hash
712 }
713
714 // ih is nil if we expect the peer to declare the InfoHash, such as when the
715 // peer initiated the connection. Returns ok if the handshake was successful,
716 // and err if there was an unexpected condition other than the peer simply
717 // abandoning the handshake.
718 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
719         // Bytes to be sent to the peer. Should never block the sender.
720         postCh := make(chan []byte, 4)
721         // A single error value sent when the writer completes.
722         writeDone := make(chan error, 1)
723         // Performs writes to the socket and ensures posts don't block.
724         go handshakeWriter(sock, postCh, writeDone)
725
726         defer func() {
727                 close(postCh) // Done writing.
728                 if !ok {
729                         return
730                 }
731                 if err != nil {
732                         panic(err)
733                 }
734                 // Wait until writes complete before returning from handshake.
735                 err = <-writeDone
736                 if err != nil {
737                         err = fmt.Errorf("error writing: %s", err)
738                 }
739         }()
740
741         post := func(bb []byte) {
742                 select {
743                 case postCh <- bb:
744                 default:
745                         panic("mustn't block while posting")
746                 }
747         }
748
749         post([]byte(pp.Protocol))
750         post(extensions[:])
751         if ih != nil { // We already know what we want.
752                 post(ih[:])
753                 post(peerID[:])
754         }
755         var b [68]byte
756         _, err = io.ReadFull(sock, b[:68])
757         if err != nil {
758                 err = nil
759                 return
760         }
761         if string(b[:20]) != pp.Protocol {
762                 return
763         }
764         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
765         missinggo.CopyExact(&res.Hash, b[28:48])
766         missinggo.CopyExact(&res.peerID, b[48:68])
767         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
768
769         // TODO: Maybe we can just drop peers here if we're not interested. This
770         // could prevent them trying to reconnect, falsely believing there was
771         // just a problem.
772         if ih == nil { // We were waiting for the peer to tell us what they wanted.
773                 post(res.Hash[:])
774                 post(peerID[:])
775         }
776
777         ok = true
778         return
779 }
780
781 // Wraps a raw connection and provides the interface we want for using the
782 // connection in the message loop.
783 type deadlineReader struct {
784         nc net.Conn
785         r  io.Reader
786 }
787
788 func (r deadlineReader) Read(b []byte) (n int, err error) {
789         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
790         err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
791         if err != nil {
792                 err = fmt.Errorf("error setting read deadline: %s", err)
793         }
794         n, err = r.r.Read(b)
795         // Convert common errors into io.EOF.
796         // if err != nil {
797         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
798         //              err = io.EOF
799         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
800         //              if n != 0 {
801         //                      panic(n)
802         //              }
803         //              err = io.EOF
804         //      }
805         // }
806         return
807 }
808
809 type readWriter struct {
810         io.Reader
811         io.Writer
812 }
813
814 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
815         var protocol [len(pp.Protocol)]byte
816         _, err = io.ReadFull(rw, protocol[:])
817         if err != nil {
818                 return
819         }
820         ret = readWriter{
821                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
822                 rw,
823         }
824         if string(protocol[:]) == pp.Protocol {
825                 return
826         }
827         encrypted = true
828         ret, err = mse.ReceiveHandshake(ret, skeys)
829         return
830 }
831
832 func (cl *Client) receiveSkeys() (ret [][]byte) {
833         for ih := range cl.torrents {
834                 ret = append(ret, ih[:])
835         }
836         return
837 }
838
839 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
840         if c.encrypted {
841                 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
842                 if err != nil {
843                         return
844                 }
845         }
846         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
847         if ih != t.infoHash {
848                 ok = false
849         }
850         return
851 }
852
853 // Do encryption and bittorrent handshakes as receiver.
854 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
855         cl.mu.Lock()
856         skeys := cl.receiveSkeys()
857         cl.mu.Unlock()
858         if !cl.config.DisableEncryption {
859                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
860                 if err != nil {
861                         if err == mse.ErrNoSecretKeyMatch {
862                                 err = nil
863                         }
864                         return
865                 }
866         }
867         ih, ok, err := cl.connBTHandshake(c, nil)
868         if err != nil {
869                 err = fmt.Errorf("error during bt handshake: %s", err)
870                 return
871         }
872         if !ok {
873                 return
874         }
875         cl.mu.Lock()
876         t = cl.torrents[ih]
877         cl.mu.Unlock()
878         return
879 }
880
881 // Returns !ok if handshake failed for valid reasons.
882 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
883         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
884         if err != nil || !ok {
885                 return
886         }
887         ret = res.Hash
888         c.PeerExtensionBytes = res.peerExtensionBytes
889         c.PeerID = res.peerID
890         c.completedHandshake = time.Now()
891         return
892 }
893
894 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) {
895         if c.PeerID == cl.peerID {
896                 connsToSelf.Add(1)
897                 addr := c.conn.RemoteAddr().String()
898                 cl.dopplegangerAddrs[addr] = struct{}{}
899                 return
900         }
901         cl.runHandshookConn(c, t)
902 }
903
904 func (cl *Client) runReceivedConn(c *connection) {
905         err := c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
906         if err != nil {
907                 panic(err)
908         }
909         t, err := cl.receiveHandshakes(c)
910         if err != nil {
911                 if cl.config.Debug {
912                         log.Printf("error receiving handshakes: %s", err)
913                 }
914                 return
915         }
916         if t == nil {
917                 return
918         }
919         cl.mu.Lock()
920         defer cl.mu.Unlock()
921         if c.PeerID == cl.peerID {
922                 // Because the remote address is not necessarily the same as its
923                 // client's torrent listen address, we won't record the remote address
924                 // as a doppleganger. Instead, the initiator can record *us* as the
925                 // doppleganger.
926                 return
927         }
928         cl.runHandshookConn(c, t)
929 }
930
931 func (cl *Client) runHandshookConn(c *connection, t *Torrent) {
932         c.conn.SetWriteDeadline(time.Time{})
933         c.rw = readWriter{
934                 deadlineReader{c.conn, c.rw},
935                 c.rw,
936         }
937         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
938         if !cl.addConnection(t, c) {
939                 return
940         }
941         defer t.dropConnection(c)
942         go c.writer(time.Minute)
943         cl.sendInitialMessages(c, t)
944         err := cl.connectionLoop(t, c)
945         if err != nil && cl.config.Debug {
946                 log.Printf("error during connection loop: %s", err)
947         }
948 }
949
950 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
951         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
952                 conn.Post(pp.Message{
953                         Type:       pp.Extended,
954                         ExtendedID: pp.HandshakeExtendedID,
955                         ExtendedPayload: func() []byte {
956                                 d := map[string]interface{}{
957                                         "m": func() (ret map[string]int) {
958                                                 ret = make(map[string]int, 2)
959                                                 ret["ut_metadata"] = metadataExtendedId
960                                                 if !cl.config.DisablePEX {
961                                                         ret["ut_pex"] = pexExtendedId
962                                                 }
963                                                 return
964                                         }(),
965                                         "v": extendedHandshakeClientVersion,
966                                         // No upload queue is implemented yet.
967                                         "reqq": 64,
968                                 }
969                                 if !cl.config.DisableEncryption {
970                                         d["e"] = 1
971                                 }
972                                 if torrent.metadataSizeKnown() {
973                                         d["metadata_size"] = torrent.metadataSize()
974                                 }
975                                 if p := cl.incomingPeerPort(); p != 0 {
976                                         d["p"] = p
977                                 }
978                                 yourip, err := addrCompactIP(conn.remoteAddr())
979                                 if err != nil {
980                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
981                                 } else {
982                                         d["yourip"] = yourip
983                                 }
984                                 // log.Printf("sending %v", d)
985                                 b, err := bencode.Marshal(d)
986                                 if err != nil {
987                                         panic(err)
988                                 }
989                                 return b
990                         }(),
991                 })
992         }
993         if torrent.haveAnyPieces() {
994                 conn.Bitfield(torrent.bitfield())
995         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
996                 conn.Post(pp.Message{
997                         Type: pp.HaveNone,
998                 })
999         }
1000         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
1001                 conn.Post(pp.Message{
1002                         Type: pp.Port,
1003                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
1004                 })
1005         }
1006 }
1007
1008 func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
1009         conn.updateRequests()
1010 }
1011
1012 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
1013         ok = cn.Cancel(r)
1014         if ok {
1015                 postedCancels.Add(1)
1016         }
1017         return
1018 }
1019
1020 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
1021         if !cn.RequestPending(r) {
1022                 return false
1023         }
1024         delete(cn.Requests, r)
1025         return true
1026 }
1027
1028 // Process incoming ut_metadata message.
1029 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) (err error) {
1030         var d map[string]int
1031         err = bencode.Unmarshal(payload, &d)
1032         if err != nil {
1033                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1034                 return
1035         }
1036         msgType, ok := d["msg_type"]
1037         if !ok {
1038                 err = errors.New("missing msg_type field")
1039                 return
1040         }
1041         piece := d["piece"]
1042         switch msgType {
1043         case pp.DataMetadataExtensionMsgType:
1044                 if !c.requestedMetadataPiece(piece) {
1045                         err = fmt.Errorf("got unexpected piece %d", piece)
1046                         return
1047                 }
1048                 c.metadataRequests[piece] = false
1049                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1050                 if begin < 0 || begin >= len(payload) {
1051                         err = fmt.Errorf("data has bad offset in payload: %d", begin)
1052                         return
1053                 }
1054                 t.saveMetadataPiece(piece, payload[begin:])
1055                 c.UsefulChunksReceived++
1056                 c.lastUsefulChunkReceived = time.Now()
1057                 t.maybeMetadataCompleted()
1058         case pp.RequestMetadataExtensionMsgType:
1059                 if !t.haveMetadataPiece(piece) {
1060                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1061                         break
1062                 }
1063                 start := (1 << 14) * piece
1064                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1065         case pp.RejectMetadataExtensionMsgType:
1066         default:
1067                 err = errors.New("unknown msg_type value")
1068         }
1069         return
1070 }
1071
1072 func (cl *Client) upload(t *Torrent, c *connection) {
1073         if cl.config.NoUpload {
1074                 return
1075         }
1076         if !c.PeerInterested {
1077                 return
1078         }
1079         seeding := t.seeding()
1080         if !seeding && !t.connHasWantedPieces(c) {
1081                 return
1082         }
1083 another:
1084         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1085                 c.Unchoke()
1086                 for r := range c.PeerRequests {
1087                         err := cl.sendChunk(t, c, r)
1088                         if err != nil {
1089                                 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1090                                         // We had the piece, but not anymore.
1091                                 } else {
1092                                         log.Printf("error sending chunk %+v to peer: %s", r, err)
1093                                 }
1094                                 // If we failed to send a chunk, choke the peer to ensure they
1095                                 // flush all their requests. We've probably dropped a piece,
1096                                 // but there's no way to communicate this to the peer. If they
1097                                 // ask for it again, we'll kick them to allow us to send them
1098                                 // an updated bitfield.
1099                                 break another
1100                         }
1101                         delete(c.PeerRequests, r)
1102                         goto another
1103                 }
1104                 return
1105         }
1106         c.Choke()
1107 }
1108
1109 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
1110         // Count the chunk being sent, even if it isn't.
1111         b := make([]byte, r.Length)
1112         p := t.info.Piece(int(r.Index))
1113         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1114         if n != len(b) {
1115                 if err == nil {
1116                         panic("expected error")
1117                 }
1118                 return err
1119         }
1120         c.Post(pp.Message{
1121                 Type:  pp.Piece,
1122                 Index: r.Index,
1123                 Begin: r.Begin,
1124                 Piece: b,
1125         })
1126         c.chunksSent++
1127         uploadChunksPosted.Add(1)
1128         c.lastChunkSent = time.Now()
1129         return nil
1130 }
1131
1132 // Processes incoming bittorrent messages. The client lock is held upon entry
1133 // and exit. Returning will end the connection.
1134 func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
1135         decoder := pp.Decoder{
1136                 R:         bufio.NewReader(c.rw),
1137                 MaxLength: 256 * 1024,
1138         }
1139         for {
1140                 cl.mu.Unlock()
1141                 var msg pp.Message
1142                 err := decoder.Decode(&msg)
1143                 cl.mu.Lock()
1144                 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1145                         return nil
1146                 }
1147                 if err != nil {
1148                         return err
1149                 }
1150                 c.lastMessageReceived = time.Now()
1151                 if msg.Keepalive {
1152                         receivedKeepalives.Add(1)
1153                         continue
1154                 }
1155                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1156                 switch msg.Type {
1157                 case pp.Choke:
1158                         c.PeerChoked = true
1159                         c.Requests = nil
1160                         // We can then reset our interest.
1161                         c.updateRequests()
1162                 case pp.Reject:
1163                         cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1164                         c.updateRequests()
1165                 case pp.Unchoke:
1166                         c.PeerChoked = false
1167                         cl.peerUnchoked(t, c)
1168                 case pp.Interested:
1169                         c.PeerInterested = true
1170                         cl.upload(t, c)
1171                 case pp.NotInterested:
1172                         c.PeerInterested = false
1173                         c.Choke()
1174                 case pp.Have:
1175                         err = c.peerSentHave(int(msg.Index))
1176                 case pp.Request:
1177                         if c.Choked {
1178                                 break
1179                         }
1180                         if !c.PeerInterested {
1181                                 err = errors.New("peer sent request but isn't interested")
1182                                 break
1183                         }
1184                         if !t.havePiece(msg.Index.Int()) {
1185                                 // This isn't necessarily them screwing up. We can drop pieces
1186                                 // from our storage, and can't communicate this to peers
1187                                 // except by reconnecting.
1188                                 requestsReceivedForMissingPieces.Add(1)
1189                                 err = errors.New("peer requested piece we don't have")
1190                                 break
1191                         }
1192                         if c.PeerRequests == nil {
1193                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1194                         }
1195                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1196                         cl.upload(t, c)
1197                 case pp.Cancel:
1198                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1199                         if !c.PeerCancel(req) {
1200                                 unexpectedCancels.Add(1)
1201                         }
1202                 case pp.Bitfield:
1203                         err = c.peerSentBitfield(msg.Bitfield)
1204                 case pp.HaveAll:
1205                         err = c.peerSentHaveAll()
1206                 case pp.HaveNone:
1207                         err = c.peerSentHaveNone()
1208                 case pp.Piece:
1209                         cl.downloadedChunk(t, c, &msg)
1210                 case pp.Extended:
1211                         switch msg.ExtendedID {
1212                         case pp.HandshakeExtendedID:
1213                                 // TODO: Create a bencode struct for this.
1214                                 var d map[string]interface{}
1215                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1216                                 if err != nil {
1217                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1218                                         break
1219                                 }
1220                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1221                                 if reqq, ok := d["reqq"]; ok {
1222                                         if i, ok := reqq.(int64); ok {
1223                                                 c.PeerMaxRequests = int(i)
1224                                         }
1225                                 }
1226                                 if v, ok := d["v"]; ok {
1227                                         c.PeerClientName = v.(string)
1228                                 }
1229                                 m, ok := d["m"]
1230                                 if !ok {
1231                                         err = errors.New("handshake missing m item")
1232                                         break
1233                                 }
1234                                 mTyped, ok := m.(map[string]interface{})
1235                                 if !ok {
1236                                         err = errors.New("handshake m value is not dict")
1237                                         break
1238                                 }
1239                                 if c.PeerExtensionIDs == nil {
1240                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1241                                 }
1242                                 for name, v := range mTyped {
1243                                         id, ok := v.(int64)
1244                                         if !ok {
1245                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1246                                                 continue
1247                                         }
1248                                         if id == 0 {
1249                                                 delete(c.PeerExtensionIDs, name)
1250                                         } else {
1251                                                 if c.PeerExtensionIDs[name] == 0 {
1252                                                         supportedExtensionMessages.Add(name, 1)
1253                                                 }
1254                                                 c.PeerExtensionIDs[name] = byte(id)
1255                                         }
1256                                 }
1257                                 metadata_sizeUntyped, ok := d["metadata_size"]
1258                                 if ok {
1259                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1260                                         if !ok {
1261                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1262                                         } else {
1263                                                 err = t.setMetadataSize(metadata_size)
1264                                                 if err != nil {
1265                                                         err = fmt.Errorf("error setting metadata size to %d", metadata_size)
1266                                                         break
1267                                                 }
1268                                         }
1269                                 }
1270                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1271                                         c.requestPendingMetadata()
1272                                 }
1273                         case metadataExtendedId:
1274                                 err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1275                                 if err != nil {
1276                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1277                                 }
1278                         case pexExtendedId:
1279                                 if cl.config.DisablePEX {
1280                                         break
1281                                 }
1282                                 var pexMsg peerExchangeMessage
1283                                 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1284                                 if err != nil {
1285                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1286                                         break
1287                                 }
1288                                 go func() {
1289                                         cl.mu.Lock()
1290                                         cl.addPeers(t, func() (ret []Peer) {
1291                                                 for i, cp := range pexMsg.Added {
1292                                                         p := Peer{
1293                                                                 IP:     make([]byte, 4),
1294                                                                 Port:   cp.Port,
1295                                                                 Source: peerSourcePEX,
1296                                                         }
1297                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1298                                                                 p.SupportsEncryption = true
1299                                                         }
1300                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1301                                                         ret = append(ret, p)
1302                                                 }
1303                                                 return
1304                                         }())
1305                                         cl.mu.Unlock()
1306                                 }()
1307                         default:
1308                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1309                         }
1310                         if err != nil {
1311                                 // That client uses its own extension IDs for outgoing message
1312                                 // types, which is incorrect.
1313                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1314                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1315                                         return nil
1316                                 }
1317                         }
1318                 case pp.Port:
1319                         if cl.dHT == nil {
1320                                 break
1321                         }
1322                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1323                         if err != nil {
1324                                 panic(err)
1325                         }
1326                         if msg.Port != 0 {
1327                                 pingAddr.Port = int(msg.Port)
1328                         }
1329                         cl.dHT.Ping(pingAddr)
1330                 default:
1331                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1332                 }
1333                 if err != nil {
1334                         return err
1335                 }
1336         }
1337 }
1338
1339 // Returns true if the connection is added.
1340 func (cl *Client) addConnection(t *Torrent, c *connection) bool {
1341         if cl.closed.IsSet() {
1342                 return false
1343         }
1344         if !cl.wantConns(t) {
1345                 return false
1346         }
1347         for _, c0 := range t.conns {
1348                 if c.PeerID == c0.PeerID {
1349                         // Already connected to a client with that ID.
1350                         duplicateClientConns.Add(1)
1351                         return false
1352                 }
1353         }
1354         if len(t.conns) >= socketsPerTorrent {
1355                 c := t.worstBadConn(cl)
1356                 if c == nil {
1357                         return false
1358                 }
1359                 if cl.config.Debug && missinggo.CryHeard() {
1360                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1361                 }
1362                 c.Close()
1363                 t.deleteConnection(c)
1364         }
1365         if len(t.conns) >= socketsPerTorrent {
1366                 panic(len(t.conns))
1367         }
1368         t.conns = append(t.conns, c)
1369         c.t = t
1370         return true
1371 }
1372
1373 func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
1374         if c.closed.IsSet() {
1375                 return false
1376         }
1377         if !t.haveInfo() {
1378                 return c.supportsExtension("ut_metadata")
1379         }
1380         if t.seeding() {
1381                 return c.PeerInterested
1382         }
1383         return t.connHasWantedPieces(c)
1384 }
1385
1386 func (cl *Client) wantConns(t *Torrent) bool {
1387         if !t.seeding() && !t.needData() {
1388                 return false
1389         }
1390         if len(t.conns) < socketsPerTorrent {
1391                 return true
1392         }
1393         return t.worstBadConn(cl) != nil
1394 }
1395
1396 func (cl *Client) openNewConns(t *Torrent) {
1397         defer t.updateWantPeersEvent()
1398         for len(t.peers) != 0 {
1399                 if !cl.wantConns(t) {
1400                         return
1401                 }
1402                 if len(t.halfOpen) >= cl.halfOpenLimit {
1403                         return
1404                 }
1405                 var (
1406                         k peersKey
1407                         p Peer
1408                 )
1409                 for k, p = range t.peers {
1410                         break
1411                 }
1412                 delete(t.peers, k)
1413                 cl.initiateConn(p, t)
1414         }
1415 }
1416
1417 func (cl *Client) badPeerIPPort(ip net.IP, port int) bool {
1418         if port == 0 {
1419                 return true
1420         }
1421         if cl.dopplegangerAddr(net.JoinHostPort(ip.String(), strconv.FormatInt(int64(port), 10))) {
1422                 return true
1423         }
1424         if _, ok := cl.ipBlockRange(ip); ok {
1425                 return true
1426         }
1427         if _, ok := cl.badPeerIPs[ip.String()]; ok {
1428                 return true
1429         }
1430         return false
1431 }
1432
1433 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
1434         for _, p := range peers {
1435                 if cl.badPeerIPPort(p.IP, p.Port) {
1436                         continue
1437                 }
1438                 t.addPeer(p, cl)
1439         }
1440 }
1441
1442 // Prepare a Torrent without any attachment to a Client. That means we can
1443 // initialize fields all fields that don't require the Client without locking
1444 // it.
1445 func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
1446         t = &Torrent{
1447                 cl:        cl,
1448                 infoHash:  ih,
1449                 chunkSize: defaultChunkSize,
1450                 peers:     make(map[peersKey]Peer),
1451
1452                 halfOpen:          make(map[string]struct{}),
1453                 pieceStateChanges: pubsub.NewPubSub(),
1454
1455                 storageOpener: cl.defaultStorage,
1456         }
1457         return
1458 }
1459
1460 func init() {
1461         // For shuffling the tracker tiers.
1462         mathRand.Seed(time.Now().Unix())
1463 }
1464
1465 type trackerTier []string
1466
1467 // The trackers within each tier must be shuffled before use.
1468 // http://stackoverflow.com/a/12267471/149482
1469 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1470 func shuffleTier(tier trackerTier) {
1471         for i := range tier {
1472                 j := mathRand.Intn(i + 1)
1473                 tier[i], tier[j] = tier[j], tier[i]
1474         }
1475 }
1476
1477 // A file-like handle to some torrent data resource.
1478 type Handle interface {
1479         io.Reader
1480         io.Seeker
1481         io.Closer
1482         io.ReaderAt
1483 }
1484
1485 // Specifies a new torrent for adding to a client. There are helpers for
1486 // magnet URIs and torrent metainfo files.
1487 type TorrentSpec struct {
1488         // The tiered tracker URIs.
1489         Trackers [][]string
1490         InfoHash metainfo.Hash
1491         Info     *metainfo.InfoEx
1492         // The name to use if the Name field from the Info isn't available.
1493         DisplayName string
1494         // The chunk size to use for outbound requests. Defaults to 16KiB if not
1495         // set.
1496         ChunkSize int
1497         Storage   storage.Client
1498 }
1499
1500 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1501         m, err := metainfo.ParseMagnetURI(uri)
1502         if err != nil {
1503                 return
1504         }
1505         spec = &TorrentSpec{
1506                 Trackers:    [][]string{m.Trackers},
1507                 DisplayName: m.DisplayName,
1508                 InfoHash:    m.InfoHash,
1509         }
1510         return
1511 }
1512
1513 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1514         spec = &TorrentSpec{
1515                 Trackers:    mi.AnnounceList,
1516                 Info:        &mi.Info,
1517                 DisplayName: mi.Info.Name,
1518                 InfoHash:    mi.Info.Hash(),
1519         }
1520         if spec.Trackers == nil && mi.Announce != "" {
1521                 spec.Trackers = [][]string{{mi.Announce}}
1522         }
1523         return
1524 }
1525
1526 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1527         cl.mu.Lock()
1528         defer cl.mu.Unlock()
1529         t, ok := cl.torrents[infoHash]
1530         if ok {
1531                 return
1532         }
1533         new = true
1534         t = cl.newTorrent(infoHash)
1535         if cl.dHT != nil {
1536                 go t.announceDHT(true)
1537         }
1538         cl.torrents[infoHash] = t
1539         t.updateWantPeersEvent()
1540         return
1541 }
1542
1543 // Add or merge a torrent spec. If the torrent is already present, the
1544 // trackers will be merged with the existing ones. If the Info isn't yet
1545 // known, it will be set. The display name is replaced if the new spec
1546 // provides one. Returns new if the torrent wasn't already in the client.
1547 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1548         t, new = cl.AddTorrentInfoHash(spec.InfoHash)
1549         if spec.DisplayName != "" {
1550                 t.SetDisplayName(spec.DisplayName)
1551         }
1552         if spec.Info != nil {
1553                 err = t.SetInfoBytes(spec.Info.Bytes)
1554                 if err != nil {
1555                         return
1556                 }
1557         }
1558         cl.mu.Lock()
1559         defer cl.mu.Unlock()
1560         if spec.ChunkSize != 0 {
1561                 t.chunkSize = pp.Integer(spec.ChunkSize)
1562         }
1563         t.addTrackers(spec.Trackers)
1564         t.maybeNewConns()
1565         return
1566 }
1567
1568 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1569         t, ok := cl.torrents[infoHash]
1570         if !ok {
1571                 err = fmt.Errorf("no such torrent")
1572                 return
1573         }
1574         err = t.close()
1575         if err != nil {
1576                 panic(err)
1577         }
1578         delete(cl.torrents, infoHash)
1579         return
1580 }
1581
1582 func (cl *Client) prepareTrackerAnnounceUnlocked(announceURL string) (blocked bool, urlToUse string, host string, err error) {
1583         _url, err := url.Parse(announceURL)
1584         if err != nil {
1585                 return
1586         }
1587         hmp := missinggo.SplitHostMaybePort(_url.Host)
1588         if hmp.Err != nil {
1589                 err = hmp.Err
1590                 return
1591         }
1592         addr, err := net.ResolveIPAddr("ip", hmp.Host)
1593         if err != nil {
1594                 return
1595         }
1596         cl.mu.RLock()
1597         _, blocked = cl.ipBlockRange(addr.IP)
1598         cl.mu.RUnlock()
1599         host = _url.Host
1600         hmp.Host = addr.String()
1601         _url.Host = hmp.String()
1602         urlToUse = _url.String()
1603         return
1604 }
1605
1606 func (cl *Client) allTorrentsCompleted() bool {
1607         for _, t := range cl.torrents {
1608                 if !t.haveInfo() {
1609                         return false
1610                 }
1611                 if t.numPiecesCompleted() != t.numPieces() {
1612                         return false
1613                 }
1614         }
1615         return true
1616 }
1617
1618 // Returns true when all torrents are completely downloaded and false if the
1619 // client is stopped before that.
1620 func (cl *Client) WaitAll() bool {
1621         cl.mu.Lock()
1622         defer cl.mu.Unlock()
1623         for !cl.allTorrentsCompleted() {
1624                 if cl.closed.IsSet() {
1625                         return false
1626                 }
1627                 cl.event.Wait()
1628         }
1629         return true
1630 }
1631
1632 // Handle a received chunk from a peer.
1633 func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
1634         chunksReceived.Add(1)
1635
1636         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1637
1638         // Request has been satisfied.
1639         if cl.connDeleteRequest(t, c, req) {
1640                 defer c.updateRequests()
1641         } else {
1642                 unexpectedChunksReceived.Add(1)
1643         }
1644
1645         index := int(req.Index)
1646         piece := &t.pieces[index]
1647
1648         // Do we actually want this chunk?
1649         if !t.wantPiece(req) {
1650                 unwantedChunksReceived.Add(1)
1651                 c.UnwantedChunksReceived++
1652                 return
1653         }
1654
1655         c.UsefulChunksReceived++
1656         c.lastUsefulChunkReceived = time.Now()
1657
1658         cl.upload(t, c)
1659
1660         // Need to record that it hasn't been written yet, before we attempt to do
1661         // anything with it.
1662         piece.incrementPendingWrites()
1663         // Record that we have the chunk.
1664         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1665
1666         // Cancel pending requests for this chunk.
1667         for _, c := range t.conns {
1668                 if cl.connCancel(t, c, req) {
1669                         c.updateRequests()
1670                 }
1671         }
1672
1673         cl.mu.Unlock()
1674         // Write the chunk out. Note that the upper bound on chunk writing
1675         // concurrency will be the number of connections.
1676         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1677         cl.mu.Lock()
1678
1679         piece.decrementPendingWrites()
1680
1681         if err != nil {
1682                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
1683                 t.pendRequest(req)
1684                 t.updatePieceCompletion(int(msg.Index))
1685                 return
1686         }
1687
1688         // It's important that the piece is potentially queued before we check if
1689         // the piece is still wanted, because if it is queued, it won't be wanted.
1690         if t.pieceAllDirty(index) {
1691                 cl.queuePieceCheck(t, int(req.Index))
1692         }
1693
1694         if c.peerTouchedPieces == nil {
1695                 c.peerTouchedPieces = make(map[int]struct{})
1696         }
1697         c.peerTouchedPieces[index] = struct{}{}
1698
1699         cl.event.Broadcast()
1700         t.publishPieceChange(int(req.Index))
1701         return
1702 }
1703
1704 // Return the connections that touched a piece, and clear the entry while
1705 // doing it.
1706 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
1707         for _, c := range t.conns {
1708                 if _, ok := c.peerTouchedPieces[piece]; ok {
1709                         ret = append(ret, c)
1710                         delete(c.peerTouchedPieces, piece)
1711                 }
1712         }
1713         return
1714 }
1715
1716 func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
1717         p := &t.pieces[piece]
1718         if p.EverHashed {
1719                 // Don't score the first time a piece is hashed, it could be an
1720                 // initial check.
1721                 if correct {
1722                         pieceHashedCorrect.Add(1)
1723                 } else {
1724                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
1725                         pieceHashedNotCorrect.Add(1)
1726                 }
1727         }
1728         p.EverHashed = true
1729         touchers := cl.reapPieceTouches(t, piece)
1730         if correct {
1731                 for _, c := range touchers {
1732                         c.goodPiecesDirtied++
1733                 }
1734                 err := p.Storage().MarkComplete()
1735                 if err != nil {
1736                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1737                 }
1738                 t.updatePieceCompletion(piece)
1739         } else if len(touchers) != 0 {
1740                 log.Printf("dropping and banning %d conns that touched piece", len(touchers))
1741                 for _, c := range touchers {
1742                         c.badPiecesDirtied++
1743                         t.cl.banPeerIP(missinggo.AddrIP(c.remoteAddr()))
1744                         t.dropConnection(c)
1745                 }
1746         }
1747         cl.pieceChanged(t, piece)
1748 }
1749
1750 func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
1751         t.pendingPieces.Remove(piece)
1752         t.pendAllChunkSpecs(piece)
1753         for _, conn := range t.conns {
1754                 conn.Have(piece)
1755                 for r := range conn.Requests {
1756                         if int(r.Index) == piece {
1757                                 conn.Cancel(r)
1758                         }
1759                 }
1760                 // Could check here if peer doesn't have piece, but due to caching
1761                 // some peers may have said they have a piece but they don't.
1762                 cl.upload(t, conn)
1763         }
1764 }
1765
1766 func (cl *Client) onFailedPiece(t *Torrent, piece int) {
1767         if t.pieceAllDirty(piece) {
1768                 t.pendAllChunkSpecs(piece)
1769         }
1770         if !t.wantPieceIndex(piece) {
1771                 return
1772         }
1773         cl.openNewConns(t)
1774         for _, conn := range t.conns {
1775                 if conn.PeerHasPiece(piece) {
1776                         conn.updateRequests()
1777                 }
1778         }
1779 }
1780
1781 func (cl *Client) pieceChanged(t *Torrent, piece int) {
1782         correct := t.pieceComplete(piece)
1783         defer cl.event.Broadcast()
1784         if correct {
1785                 cl.onCompletedPiece(t, piece)
1786         } else {
1787                 cl.onFailedPiece(t, piece)
1788         }
1789         if t.updatePiecePriority(piece) {
1790                 t.piecePriorityChanged(piece)
1791         }
1792         t.publishPieceChange(piece)
1793 }
1794
1795 func (cl *Client) verifyPiece(t *Torrent, piece int) {
1796         cl.mu.Lock()
1797         defer cl.mu.Unlock()
1798         p := &t.pieces[piece]
1799         for p.Hashing || t.storage == nil {
1800                 cl.event.Wait()
1801         }
1802         p.QueuedForHash = false
1803         if t.closed.IsSet() || t.pieceComplete(piece) {
1804                 t.updatePiecePriority(piece)
1805                 t.publishPieceChange(piece)
1806                 return
1807         }
1808         p.Hashing = true
1809         t.publishPieceChange(piece)
1810         cl.mu.Unlock()
1811         sum := t.hashPiece(piece)
1812         cl.mu.Lock()
1813         p.Hashing = false
1814         cl.pieceHashed(t, piece, sum == p.Hash)
1815 }
1816
1817 // Returns handles to all the torrents loaded in the Client.
1818 func (cl *Client) Torrents() (ret []*Torrent) {
1819         cl.mu.Lock()
1820         for _, t := range cl.torrents {
1821                 ret = append(ret, t)
1822         }
1823         cl.mu.Unlock()
1824         return
1825 }
1826
1827 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
1828         spec, err := TorrentSpecFromMagnetURI(uri)
1829         if err != nil {
1830                 return
1831         }
1832         T, _, err = cl.AddTorrentSpec(spec)
1833         return
1834 }
1835
1836 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
1837         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
1838         var ss []string
1839         missinggo.CastSlice(&ss, mi.Nodes)
1840         cl.AddDHTNodes(ss)
1841         return
1842 }
1843
1844 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
1845         mi, err := metainfo.LoadFromFile(filename)
1846         if err != nil {
1847                 return
1848         }
1849         return cl.AddTorrent(mi)
1850 }
1851
1852 func (cl *Client) DHT() *dht.Server {
1853         return cl.dHT
1854 }
1855
1856 func (cl *Client) AddDHTNodes(nodes []string) {
1857         for _, n := range nodes {
1858                 hmp := missinggo.SplitHostMaybePort(n)
1859                 ip := net.ParseIP(hmp.Host)
1860                 if ip == nil {
1861                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
1862                         continue
1863                 }
1864                 ni := krpc.NodeInfo{
1865                         Addr: &net.UDPAddr{
1866                                 IP:   ip,
1867                                 Port: hmp.Port,
1868                         },
1869                 }
1870                 cl.DHT().AddNode(ni)
1871         }
1872 }
1873
1874 func (cl *Client) banPeerIP(ip net.IP) {
1875         if cl.badPeerIPs == nil {
1876                 cl.badPeerIPs = make(map[string]struct{})
1877         }
1878         cl.badPeerIPs[ip.String()] = struct{}{}
1879 }