]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Remove the last of the "config dir" stuff
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "encoding/hex"
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         "math/big"
13         mathRand "math/rand"
14         "net"
15         "net/url"
16         "sort"
17         "strconv"
18         "strings"
19         "time"
20
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/pproffd"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/sync"
25         "github.com/anacrolix/utp"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/mse"
32         pp "github.com/anacrolix/torrent/peer_protocol"
33         "github.com/anacrolix/torrent/storage"
34         "github.com/anacrolix/torrent/tracker"
35 )
36
37 // Currently doesn't really queue, but should in the future.
38 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
39         piece := &t.pieces[pieceIndex]
40         if piece.QueuedForHash {
41                 return
42         }
43         piece.QueuedForHash = true
44         t.publishPieceChange(pieceIndex)
45         go cl.verifyPiece(t, pieceIndex)
46 }
47
48 // Queue a piece check if one isn't already queued, and the piece has never
49 // been checked before.
50 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
51         p := &t.pieces[piece]
52         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
53                 return
54         }
55         cl.queuePieceCheck(t, piece)
56 }
57
58 // Clients contain zero or more Torrents. A Client manages a blocklist, the
59 // TCP/UDP protocol ports, and DHT as desired.
60 type Client struct {
61         halfOpenLimit  int
62         peerID         [20]byte
63         listeners      []net.Listener
64         utpSock        *utp.Socket
65         dHT            *dht.Server
66         ipBlockList    iplist.Ranger
67         bannedTorrents map[metainfo.Hash]struct{}
68         config         Config
69         pruneTimer     *time.Timer
70         extensionBytes peerExtensionBytes
71         // Set of addresses that have our client ID. This intentionally will
72         // include ourselves if we end up trying to connect to our own address
73         // through legitimate channels.
74         dopplegangerAddrs map[string]struct{}
75
76         defaultStorage storage.I
77
78         mu     sync.RWMutex
79         event  sync.Cond
80         closed missinggo.Event
81
82         torrents map[metainfo.Hash]*Torrent
83 }
84
85 func (cl *Client) IPBlockList() iplist.Ranger {
86         cl.mu.Lock()
87         defer cl.mu.Unlock()
88         return cl.ipBlockList
89 }
90
91 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
92         cl.mu.Lock()
93         defer cl.mu.Unlock()
94         cl.ipBlockList = list
95         if cl.dHT != nil {
96                 cl.dHT.SetIPBlockList(list)
97         }
98 }
99
100 func (cl *Client) PeerID() string {
101         return string(cl.peerID[:])
102 }
103
104 func (cl *Client) ListenAddr() (addr net.Addr) {
105         for _, l := range cl.listeners {
106                 addr = l.Addr()
107                 break
108         }
109         return
110 }
111
112 type hashSorter struct {
113         Hashes []metainfo.Hash
114 }
115
116 func (hs hashSorter) Len() int {
117         return len(hs.Hashes)
118 }
119
120 func (hs hashSorter) Less(a, b int) bool {
121         return (&big.Int{}).SetBytes(hs.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(hs.Hashes[b][:])) < 0
122 }
123
124 func (hs hashSorter) Swap(a, b int) {
125         hs.Hashes[a], hs.Hashes[b] = hs.Hashes[b], hs.Hashes[a]
126 }
127
128 func (cl *Client) sortedTorrents() (ret []*Torrent) {
129         var hs hashSorter
130         for ih := range cl.torrents {
131                 hs.Hashes = append(hs.Hashes, ih)
132         }
133         sort.Sort(hs)
134         for _, ih := range hs.Hashes {
135                 ret = append(ret, cl.torrent(ih))
136         }
137         return
138 }
139
140 // Writes out a human readable status of the client, such as for writing to a
141 // HTTP status page.
142 func (cl *Client) WriteStatus(_w io.Writer) {
143         cl.mu.RLock()
144         defer cl.mu.RUnlock()
145         w := bufio.NewWriter(_w)
146         defer w.Flush()
147         if addr := cl.ListenAddr(); addr != nil {
148                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
149         } else {
150                 fmt.Fprintln(w, "Not listening!")
151         }
152         fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
153         if cl.dHT != nil {
154                 dhtStats := cl.dHT.Stats()
155                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
156                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
157                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
158                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
159                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
160         }
161         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
162         fmt.Fprintln(w)
163         for _, t := range cl.sortedTorrents() {
164                 if t.name() == "" {
165                         fmt.Fprint(w, "<unknown name>")
166                 } else {
167                         fmt.Fprint(w, t.name())
168                 }
169                 fmt.Fprint(w, "\n")
170                 if t.haveInfo() {
171                         fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length)
172                 } else {
173                         w.WriteString("<missing metainfo>")
174                 }
175                 fmt.Fprint(w, "\n")
176                 t.writeStatus(w, cl)
177                 fmt.Fprintln(w)
178         }
179 }
180
181 // Creates a new client.
182 func NewClient(cfg *Config) (cl *Client, err error) {
183         if cfg == nil {
184                 cfg = &Config{}
185         }
186
187         defer func() {
188                 if err != nil {
189                         cl = nil
190                 }
191         }()
192         cl = &Client{
193                 halfOpenLimit:     socketsPerTorrent,
194                 config:            *cfg,
195                 defaultStorage:    cfg.DefaultStorage,
196                 dopplegangerAddrs: make(map[string]struct{}),
197                 torrents:          make(map[metainfo.Hash]*Torrent),
198         }
199         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
200         cl.event.L = &cl.mu
201         if cl.defaultStorage == nil {
202                 cl.defaultStorage = storage.NewFile(cfg.DataDir)
203         }
204         if cfg.IPBlocklist != nil {
205                 cl.ipBlockList = cfg.IPBlocklist
206         }
207
208         if cfg.PeerID != "" {
209                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
210         } else {
211                 o := copy(cl.peerID[:], bep20)
212                 _, err = rand.Read(cl.peerID[o:])
213                 if err != nil {
214                         panic("error generating peer id")
215                 }
216         }
217
218         // Returns the laddr string to listen on for the next Listen call.
219         listenAddr := func() string {
220                 if addr := cl.ListenAddr(); addr != nil {
221                         return addr.String()
222                 }
223                 if cfg.ListenAddr == "" {
224                         return ":50007"
225                 }
226                 return cfg.ListenAddr
227         }
228         if !cl.config.DisableTCP {
229                 var l net.Listener
230                 l, err = net.Listen(func() string {
231                         if cl.config.DisableIPv6 {
232                                 return "tcp4"
233                         } else {
234                                 return "tcp"
235                         }
236                 }(), listenAddr())
237                 if err != nil {
238                         return
239                 }
240                 cl.listeners = append(cl.listeners, l)
241                 go cl.acceptConnections(l, false)
242         }
243         if !cl.config.DisableUTP {
244                 cl.utpSock, err = utp.NewSocket(func() string {
245                         if cl.config.DisableIPv6 {
246                                 return "udp4"
247                         } else {
248                                 return "udp"
249                         }
250                 }(), listenAddr())
251                 if err != nil {
252                         return
253                 }
254                 cl.listeners = append(cl.listeners, cl.utpSock)
255                 go cl.acceptConnections(cl.utpSock, true)
256         }
257         if !cfg.NoDHT {
258                 dhtCfg := cfg.DHTConfig
259                 if dhtCfg.IPBlocklist == nil {
260                         dhtCfg.IPBlocklist = cl.ipBlockList
261                 }
262                 if dhtCfg.Addr == "" {
263                         dhtCfg.Addr = listenAddr()
264                 }
265                 if dhtCfg.Conn == nil && cl.utpSock != nil {
266                         dhtCfg.Conn = cl.utpSock
267                 }
268                 cl.dHT, err = dht.NewServer(&dhtCfg)
269                 if err != nil {
270                         return
271                 }
272         }
273
274         return
275 }
276
277 // Stops the client. All connections to peers are closed and all activity will
278 // come to a halt.
279 func (cl *Client) Close() {
280         cl.mu.Lock()
281         defer cl.mu.Unlock()
282         cl.closed.Set()
283         if cl.dHT != nil {
284                 cl.dHT.Close()
285         }
286         for _, l := range cl.listeners {
287                 l.Close()
288         }
289         for _, t := range cl.torrents {
290                 t.close()
291         }
292         cl.event.Broadcast()
293 }
294
295 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
296
297 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
298         if cl.ipBlockList == nil {
299                 return
300         }
301         ip4 := ip.To4()
302         // If blocklists are enabled, then block non-IPv4 addresses, because
303         // blocklists do not yet support IPv6.
304         if ip4 == nil {
305                 if missinggo.CryHeard() {
306                         log.Printf("blocking non-IPv4 address: %s", ip)
307                 }
308                 r = ipv6BlockRange
309                 blocked = true
310                 return
311         }
312         return cl.ipBlockList.Lookup(ip4)
313 }
314
315 func (cl *Client) waitAccept() {
316         cl.mu.Lock()
317         defer cl.mu.Unlock()
318         for {
319                 for _, t := range cl.torrents {
320                         if cl.wantConns(t) {
321                                 return
322                         }
323                 }
324                 if cl.closed.IsSet() {
325                         return
326                 }
327                 cl.event.Wait()
328         }
329 }
330
331 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
332         for {
333                 cl.waitAccept()
334                 conn, err := l.Accept()
335                 conn = pproffd.WrapNetConn(conn)
336                 if cl.closed.IsSet() {
337                         if conn != nil {
338                                 conn.Close()
339                         }
340                         return
341                 }
342                 if err != nil {
343                         log.Print(err)
344                         // I think something harsher should happen here? Our accept
345                         // routine just fucked off.
346                         return
347                 }
348                 if utp {
349                         acceptUTP.Add(1)
350                 } else {
351                         acceptTCP.Add(1)
352                 }
353                 cl.mu.RLock()
354                 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
355                 _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
356                 cl.mu.RUnlock()
357                 if blocked || doppleganger {
358                         acceptReject.Add(1)
359                         // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
360                         conn.Close()
361                         continue
362                 }
363                 go cl.incomingConnection(conn, utp)
364         }
365 }
366
367 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
368         defer nc.Close()
369         if tc, ok := nc.(*net.TCPConn); ok {
370                 tc.SetLinger(0)
371         }
372         c := newConnection()
373         c.conn = nc
374         c.rw = nc
375         c.Discovery = peerSourceIncoming
376         c.uTP = utp
377         err := cl.runReceivedConn(c)
378         if err != nil {
379                 // log.Print(err)
380         }
381 }
382
383 // Returns a handle to the given torrent, if it's present in the client.
384 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
385         cl.mu.Lock()
386         defer cl.mu.Unlock()
387         t, ok = cl.torrents[ih]
388         return
389 }
390
391 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
392         return cl.torrents[ih]
393 }
394
395 type dialResult struct {
396         Conn net.Conn
397         UTP  bool
398 }
399
400 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
401         conn, err := dial(addr, t)
402         if err != nil {
403                 if conn != nil {
404                         conn.Close()
405                 }
406                 conn = nil // Pedantic
407         }
408         ch <- dialResult{conn, utp}
409         if err == nil {
410                 successfulDials.Add(1)
411                 return
412         }
413         unsuccessfulDials.Add(1)
414 }
415
416 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
417         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
418         if ret < minDialTimeout {
419                 ret = minDialTimeout
420         }
421         return
422 }
423
424 // Returns whether an address is known to connect to a client with our own ID.
425 func (cl *Client) dopplegangerAddr(addr string) bool {
426         _, ok := cl.dopplegangerAddrs[addr]
427         return ok
428 }
429
430 // Start the process of connecting to the given peer for the given torrent if
431 // appropriate.
432 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
433         if peer.Id == cl.peerID {
434                 return
435         }
436         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
437         if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
438                 duplicateConnsAvoided.Add(1)
439                 return
440         }
441         if r, ok := cl.ipBlockRange(peer.IP); ok {
442                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
443                 return
444         }
445         t.halfOpen[addr] = struct{}{}
446         go cl.outgoingConnection(t, addr, peer.Source)
447 }
448
449 func (cl *Client) dialTimeout(t *Torrent) time.Duration {
450         cl.mu.Lock()
451         pendingPeers := len(t.peers)
452         cl.mu.Unlock()
453         return reducedDialTimeout(nominalDialTimeout, cl.halfOpenLimit, pendingPeers)
454 }
455
456 func (cl *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
457         c, err = net.DialTimeout("tcp", addr, cl.dialTimeout(t))
458         if err == nil {
459                 c.(*net.TCPConn).SetLinger(0)
460         }
461         return
462 }
463
464 func (cl *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
465         return cl.utpSock.DialTimeout(addr, cl.dialTimeout(t))
466 }
467
468 // Returns a connection over UTP or TCP, whichever is first to connect.
469 func (cl *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
470         // Initiate connections via TCP and UTP simultaneously. Use the first one
471         // that succeeds.
472         left := 0
473         if !cl.config.DisableUTP {
474                 left++
475         }
476         if !cl.config.DisableTCP {
477                 left++
478         }
479         resCh := make(chan dialResult, left)
480         if !cl.config.DisableUTP {
481                 go doDial(cl.dialUTP, resCh, true, addr, t)
482         }
483         if !cl.config.DisableTCP {
484                 go doDial(cl.dialTCP, resCh, false, addr, t)
485         }
486         var res dialResult
487         // Wait for a successful connection.
488         for ; left > 0 && res.Conn == nil; left-- {
489                 res = <-resCh
490         }
491         if left > 0 {
492                 // There are still incompleted dials.
493                 go func() {
494                         for ; left > 0; left-- {
495                                 conn := (<-resCh).Conn
496                                 if conn != nil {
497                                         conn.Close()
498                                 }
499                         }
500                 }()
501         }
502         conn = res.Conn
503         utp = res.UTP
504         return
505 }
506
507 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
508         if _, ok := t.halfOpen[addr]; !ok {
509                 panic("invariant broken")
510         }
511         delete(t.halfOpen, addr)
512         cl.openNewConns(t)
513 }
514
515 // Performs initiator handshakes and returns a connection. Returns nil
516 // *connection if no connection for valid reasons.
517 func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
518         c = newConnection()
519         c.conn = nc
520         c.rw = nc
521         c.encrypted = encrypted
522         c.uTP = utp
523         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
524         if err != nil {
525                 return
526         }
527         ok, err := cl.initiateHandshakes(c, t)
528         if !ok {
529                 c = nil
530         }
531         return
532 }
533
534 // Returns nil connection and nil error if no connection could be established
535 // for valid reasons.
536 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
537         nc, utp := cl.dialFirst(addr, t)
538         if nc == nil {
539                 return
540         }
541         c, err = cl.handshakesConnection(nc, t, !cl.config.DisableEncryption, utp)
542         if err != nil {
543                 nc.Close()
544                 return
545         } else if c != nil {
546                 return
547         }
548         nc.Close()
549         if cl.config.DisableEncryption {
550                 // We already tried without encryption.
551                 return
552         }
553         // Try again without encryption, using whichever protocol type worked last
554         // time.
555         if utp {
556                 nc, err = cl.dialUTP(addr, t)
557         } else {
558                 nc, err = cl.dialTCP(addr, t)
559         }
560         if err != nil {
561                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
562                 return
563         }
564         c, err = cl.handshakesConnection(nc, t, false, utp)
565         if err != nil || c == nil {
566                 nc.Close()
567         }
568         return
569 }
570
571 // Called to dial out and run a connection. The addr we're given is already
572 // considered half-open.
573 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
574         c, err := cl.establishOutgoingConn(t, addr)
575         cl.mu.Lock()
576         defer cl.mu.Unlock()
577         // Don't release lock between here and addConnection, unless it's for
578         // failure.
579         cl.noLongerHalfOpen(t, addr)
580         if err != nil {
581                 if cl.config.Debug {
582                         log.Printf("error establishing outgoing connection: %s", err)
583                 }
584                 return
585         }
586         if c == nil {
587                 return
588         }
589         defer c.Close()
590         c.Discovery = ps
591         err = cl.runInitiatedHandshookConn(c, t)
592         if err != nil {
593                 if cl.config.Debug {
594                         log.Printf("error in established outgoing connection: %s", err)
595                 }
596         }
597 }
598
599 // The port number for incoming peer connections. 0 if the client isn't
600 // listening.
601 func (cl *Client) incomingPeerPort() int {
602         listenAddr := cl.ListenAddr()
603         if listenAddr == nil {
604                 return 0
605         }
606         return missinggo.AddrPort(listenAddr)
607 }
608
609 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
610 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
611 func addrCompactIP(addr net.Addr) (string, error) {
612         host, _, err := net.SplitHostPort(addr.String())
613         if err != nil {
614                 return "", err
615         }
616         ip := net.ParseIP(host)
617         if v4 := ip.To4(); v4 != nil {
618                 if len(v4) != 4 {
619                         panic(v4)
620                 }
621                 return string(v4), nil
622         }
623         return string(ip.To16()), nil
624 }
625
626 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
627         var err error
628         for b := range bb {
629                 _, err = w.Write(b)
630                 if err != nil {
631                         break
632                 }
633         }
634         done <- err
635 }
636
637 type (
638         peerExtensionBytes [8]byte
639         peerID             [20]byte
640 )
641
642 func (pex *peerExtensionBytes) SupportsExtended() bool {
643         return pex[5]&0x10 != 0
644 }
645
646 func (pex *peerExtensionBytes) SupportsDHT() bool {
647         return pex[7]&0x01 != 0
648 }
649
650 func (pex *peerExtensionBytes) SupportsFast() bool {
651         return pex[7]&0x04 != 0
652 }
653
654 type handshakeResult struct {
655         peerExtensionBytes
656         peerID
657         metainfo.Hash
658 }
659
660 // ih is nil if we expect the peer to declare the InfoHash, such as when the
661 // peer initiated the connection. Returns ok if the handshake was successful,
662 // and err if there was an unexpected condition other than the peer simply
663 // abandoning the handshake.
664 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
665         // Bytes to be sent to the peer. Should never block the sender.
666         postCh := make(chan []byte, 4)
667         // A single error value sent when the writer completes.
668         writeDone := make(chan error, 1)
669         // Performs writes to the socket and ensures posts don't block.
670         go handshakeWriter(sock, postCh, writeDone)
671
672         defer func() {
673                 close(postCh) // Done writing.
674                 if !ok {
675                         return
676                 }
677                 if err != nil {
678                         panic(err)
679                 }
680                 // Wait until writes complete before returning from handshake.
681                 err = <-writeDone
682                 if err != nil {
683                         err = fmt.Errorf("error writing: %s", err)
684                 }
685         }()
686
687         post := func(bb []byte) {
688                 select {
689                 case postCh <- bb:
690                 default:
691                         panic("mustn't block while posting")
692                 }
693         }
694
695         post([]byte(pp.Protocol))
696         post(extensions[:])
697         if ih != nil { // We already know what we want.
698                 post(ih[:])
699                 post(peerID[:])
700         }
701         var b [68]byte
702         _, err = io.ReadFull(sock, b[:68])
703         if err != nil {
704                 err = nil
705                 return
706         }
707         if string(b[:20]) != pp.Protocol {
708                 return
709         }
710         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
711         missinggo.CopyExact(&res.Hash, b[28:48])
712         missinggo.CopyExact(&res.peerID, b[48:68])
713         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
714
715         // TODO: Maybe we can just drop peers here if we're not interested. This
716         // could prevent them trying to reconnect, falsely believing there was
717         // just a problem.
718         if ih == nil { // We were waiting for the peer to tell us what they wanted.
719                 post(res.Hash[:])
720                 post(peerID[:])
721         }
722
723         ok = true
724         return
725 }
726
727 // Wraps a raw connection and provides the interface we want for using the
728 // connection in the message loop.
729 type deadlineReader struct {
730         nc net.Conn
731         r  io.Reader
732 }
733
734 func (r deadlineReader) Read(b []byte) (n int, err error) {
735         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
736         err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
737         if err != nil {
738                 err = fmt.Errorf("error setting read deadline: %s", err)
739         }
740         n, err = r.r.Read(b)
741         // Convert common errors into io.EOF.
742         // if err != nil {
743         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
744         //              err = io.EOF
745         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
746         //              if n != 0 {
747         //                      panic(n)
748         //              }
749         //              err = io.EOF
750         //      }
751         // }
752         return
753 }
754
755 type readWriter struct {
756         io.Reader
757         io.Writer
758 }
759
760 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
761         var protocol [len(pp.Protocol)]byte
762         _, err = io.ReadFull(rw, protocol[:])
763         if err != nil {
764                 return
765         }
766         ret = readWriter{
767                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
768                 rw,
769         }
770         if string(protocol[:]) == pp.Protocol {
771                 return
772         }
773         encrypted = true
774         ret, err = mse.ReceiveHandshake(ret, skeys)
775         return
776 }
777
778 func (cl *Client) receiveSkeys() (ret [][]byte) {
779         for ih := range cl.torrents {
780                 ret = append(ret, ih[:])
781         }
782         return
783 }
784
785 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
786         if c.encrypted {
787                 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
788                 if err != nil {
789                         return
790                 }
791         }
792         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
793         if ih != t.infoHash {
794                 ok = false
795         }
796         return
797 }
798
799 // Do encryption and bittorrent handshakes as receiver.
800 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
801         cl.mu.Lock()
802         skeys := cl.receiveSkeys()
803         cl.mu.Unlock()
804         if !cl.config.DisableEncryption {
805                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
806                 if err != nil {
807                         if err == mse.ErrNoSecretKeyMatch {
808                                 err = nil
809                         }
810                         return
811                 }
812         }
813         ih, ok, err := cl.connBTHandshake(c, nil)
814         if err != nil {
815                 err = fmt.Errorf("error during bt handshake: %s", err)
816                 return
817         }
818         if !ok {
819                 return
820         }
821         cl.mu.Lock()
822         t = cl.torrents[ih]
823         cl.mu.Unlock()
824         return
825 }
826
827 // Returns !ok if handshake failed for valid reasons.
828 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
829         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
830         if err != nil || !ok {
831                 return
832         }
833         ret = res.Hash
834         c.PeerExtensionBytes = res.peerExtensionBytes
835         c.PeerID = res.peerID
836         c.completedHandshake = time.Now()
837         return
838 }
839
840 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) (err error) {
841         if c.PeerID == cl.peerID {
842                 // Only if we initiated the connection is the remote address a
843                 // listen addr for a doppleganger.
844                 connsToSelf.Add(1)
845                 addr := c.conn.RemoteAddr().String()
846                 cl.dopplegangerAddrs[addr] = struct{}{}
847                 return
848         }
849         return cl.runHandshookConn(c, t)
850 }
851
852 func (cl *Client) runReceivedConn(c *connection) (err error) {
853         err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
854         if err != nil {
855                 return
856         }
857         t, err := cl.receiveHandshakes(c)
858         if err != nil {
859                 err = fmt.Errorf("error receiving handshakes: %s", err)
860                 return
861         }
862         if t == nil {
863                 return
864         }
865         cl.mu.Lock()
866         defer cl.mu.Unlock()
867         if c.PeerID == cl.peerID {
868                 return
869         }
870         return cl.runHandshookConn(c, t)
871 }
872
873 func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
874         c.conn.SetWriteDeadline(time.Time{})
875         c.rw = readWriter{
876                 deadlineReader{c.conn, c.rw},
877                 c.rw,
878         }
879         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
880         if !cl.addConnection(t, c) {
881                 return
882         }
883         defer cl.dropConnection(t, c)
884         go c.writer()
885         go c.writeOptimizer(time.Minute)
886         cl.sendInitialMessages(c, t)
887         err = cl.connectionLoop(t, c)
888         if err != nil {
889                 err = fmt.Errorf("error during connection loop: %s", err)
890         }
891         return
892 }
893
894 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
895         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
896                 conn.Post(pp.Message{
897                         Type:       pp.Extended,
898                         ExtendedID: pp.HandshakeExtendedID,
899                         ExtendedPayload: func() []byte {
900                                 d := map[string]interface{}{
901                                         "m": func() (ret map[string]int) {
902                                                 ret = make(map[string]int, 2)
903                                                 ret["ut_metadata"] = metadataExtendedId
904                                                 if !cl.config.DisablePEX {
905                                                         ret["ut_pex"] = pexExtendedId
906                                                 }
907                                                 return
908                                         }(),
909                                         "v": extendedHandshakeClientVersion,
910                                         // No upload queue is implemented yet.
911                                         "reqq": 64,
912                                 }
913                                 if !cl.config.DisableEncryption {
914                                         d["e"] = 1
915                                 }
916                                 if torrent.metadataSizeKnown() {
917                                         d["metadata_size"] = torrent.metadataSize()
918                                 }
919                                 if p := cl.incomingPeerPort(); p != 0 {
920                                         d["p"] = p
921                                 }
922                                 yourip, err := addrCompactIP(conn.remoteAddr())
923                                 if err != nil {
924                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
925                                 } else {
926                                         d["yourip"] = yourip
927                                 }
928                                 // log.Printf("sending %v", d)
929                                 b, err := bencode.Marshal(d)
930                                 if err != nil {
931                                         panic(err)
932                                 }
933                                 return b
934                         }(),
935                 })
936         }
937         if torrent.haveAnyPieces() {
938                 conn.Bitfield(torrent.bitfield())
939         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
940                 conn.Post(pp.Message{
941                         Type: pp.HaveNone,
942                 })
943         }
944         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
945                 conn.Post(pp.Message{
946                         Type: pp.Port,
947                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
948                 })
949         }
950 }
951
952 func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
953         conn.updateRequests()
954 }
955
956 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
957         ok = cn.Cancel(r)
958         if ok {
959                 postedCancels.Add(1)
960         }
961         return
962 }
963
964 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
965         if !cn.RequestPending(r) {
966                 return false
967         }
968         delete(cn.Requests, r)
969         return true
970 }
971
972 func (cl *Client) requestPendingMetadata(t *Torrent, c *connection) {
973         if t.haveInfo() {
974                 return
975         }
976         if c.PeerExtensionIDs["ut_metadata"] == 0 {
977                 // Peer doesn't support this.
978                 return
979         }
980         // Request metadata pieces that we don't have in a random order.
981         var pending []int
982         for index := 0; index < t.metadataPieceCount(); index++ {
983                 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
984                         pending = append(pending, index)
985                 }
986         }
987         for _, i := range mathRand.Perm(len(pending)) {
988                 c.requestMetadataPiece(pending[i])
989         }
990 }
991
992 // Process incoming ut_metadata message.
993 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) (err error) {
994         var d map[string]int
995         err = bencode.Unmarshal(payload, &d)
996         if err != nil {
997                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
998                 return
999         }
1000         msgType, ok := d["msg_type"]
1001         if !ok {
1002                 err = errors.New("missing msg_type field")
1003                 return
1004         }
1005         piece := d["piece"]
1006         switch msgType {
1007         case pp.DataMetadataExtensionMsgType:
1008                 if !c.requestedMetadataPiece(piece) {
1009                         err = fmt.Errorf("got unexpected piece %d", piece)
1010                         return
1011                 }
1012                 c.metadataRequests[piece] = false
1013                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1014                 if begin < 0 || begin >= len(payload) {
1015                         err = fmt.Errorf("data has bad offset in payload: %d", begin)
1016                         return
1017                 }
1018                 t.saveMetadataPiece(piece, payload[begin:])
1019                 c.UsefulChunksReceived++
1020                 c.lastUsefulChunkReceived = time.Now()
1021                 t.maybeMetadataCompleted()
1022         case pp.RequestMetadataExtensionMsgType:
1023                 if !t.haveMetadataPiece(piece) {
1024                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1025                         break
1026                 }
1027                 start := (1 << 14) * piece
1028                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1029         case pp.RejectMetadataExtensionMsgType:
1030         default:
1031                 err = errors.New("unknown msg_type value")
1032         }
1033         return
1034 }
1035
1036 func (cl *Client) upload(t *Torrent, c *connection) {
1037         if cl.config.NoUpload {
1038                 return
1039         }
1040         if !c.PeerInterested {
1041                 return
1042         }
1043         seeding := cl.seeding(t)
1044         if !seeding && !t.connHasWantedPieces(c) {
1045                 return
1046         }
1047 another:
1048         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1049                 c.Unchoke()
1050                 for r := range c.PeerRequests {
1051                         err := cl.sendChunk(t, c, r)
1052                         if err != nil {
1053                                 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1054                                         // We had the piece, but not anymore.
1055                                 } else {
1056                                         log.Printf("error sending chunk %+v to peer: %s", r, err)
1057                                 }
1058                                 // If we failed to send a chunk, choke the peer to ensure they
1059                                 // flush all their requests. We've probably dropped a piece,
1060                                 // but there's no way to communicate this to the peer. If they
1061                                 // ask for it again, we'll kick them to allow us to send them
1062                                 // an updated bitfield.
1063                                 break another
1064                         }
1065                         delete(c.PeerRequests, r)
1066                         goto another
1067                 }
1068                 return
1069         }
1070         c.Choke()
1071 }
1072
1073 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
1074         // Count the chunk being sent, even if it isn't.
1075         b := make([]byte, r.Length)
1076         p := t.info.Piece(int(r.Index))
1077         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1078         if n != len(b) {
1079                 if err == nil {
1080                         panic("expected error")
1081                 }
1082                 return err
1083         }
1084         c.Post(pp.Message{
1085                 Type:  pp.Piece,
1086                 Index: r.Index,
1087                 Begin: r.Begin,
1088                 Piece: b,
1089         })
1090         c.chunksSent++
1091         uploadChunksPosted.Add(1)
1092         c.lastChunkSent = time.Now()
1093         return nil
1094 }
1095
1096 // Processes incoming bittorrent messages. The client lock is held upon entry
1097 // and exit.
1098 func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
1099         decoder := pp.Decoder{
1100                 R:         bufio.NewReader(c.rw),
1101                 MaxLength: 256 * 1024,
1102         }
1103         for {
1104                 cl.mu.Unlock()
1105                 var msg pp.Message
1106                 err := decoder.Decode(&msg)
1107                 cl.mu.Lock()
1108                 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1109                         return nil
1110                 }
1111                 if err != nil {
1112                         return err
1113                 }
1114                 c.lastMessageReceived = time.Now()
1115                 if msg.Keepalive {
1116                         receivedKeepalives.Add(1)
1117                         continue
1118                 }
1119                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1120                 switch msg.Type {
1121                 case pp.Choke:
1122                         c.PeerChoked = true
1123                         c.Requests = nil
1124                         // We can then reset our interest.
1125                         c.updateRequests()
1126                 case pp.Reject:
1127                         cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1128                         c.updateRequests()
1129                 case pp.Unchoke:
1130                         c.PeerChoked = false
1131                         cl.peerUnchoked(t, c)
1132                 case pp.Interested:
1133                         c.PeerInterested = true
1134                         cl.upload(t, c)
1135                 case pp.NotInterested:
1136                         c.PeerInterested = false
1137                         c.Choke()
1138                 case pp.Have:
1139                         err = c.peerSentHave(int(msg.Index))
1140                 case pp.Request:
1141                         if c.Choked {
1142                                 break
1143                         }
1144                         if !c.PeerInterested {
1145                                 err = errors.New("peer sent request but isn't interested")
1146                                 break
1147                         }
1148                         if !t.havePiece(msg.Index.Int()) {
1149                                 // This isn't necessarily them screwing up. We can drop pieces
1150                                 // from our storage, and can't communicate this to peers
1151                                 // except by reconnecting.
1152                                 requestsReceivedForMissingPieces.Add(1)
1153                                 err = errors.New("peer requested piece we don't have")
1154                                 break
1155                         }
1156                         if c.PeerRequests == nil {
1157                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1158                         }
1159                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1160                         cl.upload(t, c)
1161                 case pp.Cancel:
1162                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1163                         if !c.PeerCancel(req) {
1164                                 unexpectedCancels.Add(1)
1165                         }
1166                 case pp.Bitfield:
1167                         err = c.peerSentBitfield(msg.Bitfield)
1168                 case pp.HaveAll:
1169                         err = c.peerSentHaveAll()
1170                 case pp.HaveNone:
1171                         err = c.peerSentHaveNone()
1172                 case pp.Piece:
1173                         cl.downloadedChunk(t, c, &msg)
1174                 case pp.Extended:
1175                         switch msg.ExtendedID {
1176                         case pp.HandshakeExtendedID:
1177                                 // TODO: Create a bencode struct for this.
1178                                 var d map[string]interface{}
1179                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1180                                 if err != nil {
1181                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1182                                         break
1183                                 }
1184                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1185                                 if reqq, ok := d["reqq"]; ok {
1186                                         if i, ok := reqq.(int64); ok {
1187                                                 c.PeerMaxRequests = int(i)
1188                                         }
1189                                 }
1190                                 if v, ok := d["v"]; ok {
1191                                         c.PeerClientName = v.(string)
1192                                 }
1193                                 m, ok := d["m"]
1194                                 if !ok {
1195                                         err = errors.New("handshake missing m item")
1196                                         break
1197                                 }
1198                                 mTyped, ok := m.(map[string]interface{})
1199                                 if !ok {
1200                                         err = errors.New("handshake m value is not dict")
1201                                         break
1202                                 }
1203                                 if c.PeerExtensionIDs == nil {
1204                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1205                                 }
1206                                 for name, v := range mTyped {
1207                                         id, ok := v.(int64)
1208                                         if !ok {
1209                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1210                                                 continue
1211                                         }
1212                                         if id == 0 {
1213                                                 delete(c.PeerExtensionIDs, name)
1214                                         } else {
1215                                                 if c.PeerExtensionIDs[name] == 0 {
1216                                                         supportedExtensionMessages.Add(name, 1)
1217                                                 }
1218                                                 c.PeerExtensionIDs[name] = byte(id)
1219                                         }
1220                                 }
1221                                 metadata_sizeUntyped, ok := d["metadata_size"]
1222                                 if ok {
1223                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1224                                         if !ok {
1225                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1226                                         } else {
1227                                                 t.setMetadataSize(metadata_size, cl)
1228                                         }
1229                                 }
1230                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1231                                         cl.requestPendingMetadata(t, c)
1232                                 }
1233                         case metadataExtendedId:
1234                                 err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1235                                 if err != nil {
1236                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1237                                 }
1238                         case pexExtendedId:
1239                                 if cl.config.DisablePEX {
1240                                         break
1241                                 }
1242                                 var pexMsg peerExchangeMessage
1243                                 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1244                                 if err != nil {
1245                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1246                                         break
1247                                 }
1248                                 go func() {
1249                                         cl.mu.Lock()
1250                                         cl.addPeers(t, func() (ret []Peer) {
1251                                                 for i, cp := range pexMsg.Added {
1252                                                         p := Peer{
1253                                                                 IP:     make([]byte, 4),
1254                                                                 Port:   cp.Port,
1255                                                                 Source: peerSourcePEX,
1256                                                         }
1257                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1258                                                                 p.SupportsEncryption = true
1259                                                         }
1260                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1261                                                         ret = append(ret, p)
1262                                                 }
1263                                                 return
1264                                         }())
1265                                         cl.mu.Unlock()
1266                                 }()
1267                         default:
1268                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1269                         }
1270                         if err != nil {
1271                                 // That client uses its own extension IDs for outgoing message
1272                                 // types, which is incorrect.
1273                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1274                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1275                                         return nil
1276                                 }
1277                         }
1278                 case pp.Port:
1279                         if cl.dHT == nil {
1280                                 break
1281                         }
1282                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1283                         if err != nil {
1284                                 panic(err)
1285                         }
1286                         if msg.Port != 0 {
1287                                 pingAddr.Port = int(msg.Port)
1288                         }
1289                         cl.dHT.Ping(pingAddr)
1290                 default:
1291                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1292                 }
1293                 if err != nil {
1294                         return err
1295                 }
1296         }
1297 }
1298
1299 // Returns true if connection is removed from torrent.Conns.
1300 func (cl *Client) deleteConnection(t *Torrent, c *connection) bool {
1301         for i0, _c := range t.conns {
1302                 if _c != c {
1303                         continue
1304                 }
1305                 i1 := len(t.conns) - 1
1306                 if i0 != i1 {
1307                         t.conns[i0] = t.conns[i1]
1308                 }
1309                 t.conns = t.conns[:i1]
1310                 return true
1311         }
1312         return false
1313 }
1314
1315 func (cl *Client) dropConnection(t *Torrent, c *connection) {
1316         cl.event.Broadcast()
1317         c.Close()
1318         if cl.deleteConnection(t, c) {
1319                 cl.openNewConns(t)
1320         }
1321 }
1322
1323 // Returns true if the connection is added.
1324 func (cl *Client) addConnection(t *Torrent, c *connection) bool {
1325         if cl.closed.IsSet() {
1326                 return false
1327         }
1328         select {
1329         case <-t.ceasingNetworking:
1330                 return false
1331         default:
1332         }
1333         if !cl.wantConns(t) {
1334                 return false
1335         }
1336         for _, c0 := range t.conns {
1337                 if c.PeerID == c0.PeerID {
1338                         // Already connected to a client with that ID.
1339                         duplicateClientConns.Add(1)
1340                         return false
1341                 }
1342         }
1343         if len(t.conns) >= socketsPerTorrent {
1344                 c := t.worstBadConn(cl)
1345                 if c == nil {
1346                         return false
1347                 }
1348                 if cl.config.Debug && missinggo.CryHeard() {
1349                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1350                 }
1351                 c.Close()
1352                 cl.deleteConnection(t, c)
1353         }
1354         if len(t.conns) >= socketsPerTorrent {
1355                 panic(len(t.conns))
1356         }
1357         t.conns = append(t.conns, c)
1358         c.t = t
1359         return true
1360 }
1361
1362 func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
1363         if c.closed.IsSet() {
1364                 return false
1365         }
1366         if !t.haveInfo() {
1367                 return c.supportsExtension("ut_metadata")
1368         }
1369         if cl.seeding(t) {
1370                 return c.PeerInterested
1371         }
1372         return t.connHasWantedPieces(c)
1373 }
1374
1375 func (cl *Client) wantConns(t *Torrent) bool {
1376         if !cl.seeding(t) && !t.needData() {
1377                 return false
1378         }
1379         if len(t.conns) < socketsPerTorrent {
1380                 return true
1381         }
1382         return t.worstBadConn(cl) != nil
1383 }
1384
1385 func (cl *Client) openNewConns(t *Torrent) {
1386         select {
1387         case <-t.ceasingNetworking:
1388                 return
1389         default:
1390         }
1391         for len(t.peers) != 0 {
1392                 if !cl.wantConns(t) {
1393                         return
1394                 }
1395                 if len(t.halfOpen) >= cl.halfOpenLimit {
1396                         return
1397                 }
1398                 var (
1399                         k peersKey
1400                         p Peer
1401                 )
1402                 for k, p = range t.peers {
1403                         break
1404                 }
1405                 delete(t.peers, k)
1406                 cl.initiateConn(p, t)
1407         }
1408         t.wantPeers.Broadcast()
1409 }
1410
1411 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
1412         for _, p := range peers {
1413                 if cl.dopplegangerAddr(net.JoinHostPort(
1414                         p.IP.String(),
1415                         strconv.FormatInt(int64(p.Port), 10),
1416                 )) {
1417                         continue
1418                 }
1419                 if _, ok := cl.ipBlockRange(p.IP); ok {
1420                         continue
1421                 }
1422                 if p.Port == 0 {
1423                         // The spec says to scrub these yourselves. Fine.
1424                         continue
1425                 }
1426                 t.addPeer(p, cl)
1427         }
1428 }
1429
1430 func (cl *Client) setMetaData(t *Torrent, md *metainfo.Info, bytes []byte) (err error) {
1431         err = t.setMetadata(md, bytes)
1432         if err != nil {
1433                 return
1434         }
1435         cl.event.Broadcast()
1436         close(t.gotMetainfo)
1437         return
1438 }
1439
1440 // Prepare a Torrent without any attachment to a Client. That means we can
1441 // initialize fields all fields that don't require the Client without locking
1442 // it.
1443 func newTorrent(ih metainfo.Hash) (t *Torrent) {
1444         t = &Torrent{
1445                 infoHash:  ih,
1446                 chunkSize: defaultChunkSize,
1447                 peers:     make(map[peersKey]Peer),
1448
1449                 closing:           make(chan struct{}),
1450                 ceasingNetworking: make(chan struct{}),
1451
1452                 gotMetainfo: make(chan struct{}),
1453
1454                 halfOpen:          make(map[string]struct{}),
1455                 pieceStateChanges: pubsub.NewPubSub(),
1456         }
1457         return
1458 }
1459
1460 func init() {
1461         // For shuffling the tracker tiers.
1462         mathRand.Seed(time.Now().Unix())
1463 }
1464
1465 type trackerTier []string
1466
1467 // The trackers within each tier must be shuffled before use.
1468 // http://stackoverflow.com/a/12267471/149482
1469 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1470 func shuffleTier(tier trackerTier) {
1471         for i := range tier {
1472                 j := mathRand.Intn(i + 1)
1473                 tier[i], tier[j] = tier[j], tier[i]
1474         }
1475 }
1476
1477 func copyTrackers(base []trackerTier) (copy []trackerTier) {
1478         for _, tier := range base {
1479                 copy = append(copy, append(trackerTier(nil), tier...))
1480         }
1481         return
1482 }
1483
1484 func mergeTier(tier trackerTier, newURLs []string) trackerTier {
1485 nextURL:
1486         for _, url := range newURLs {
1487                 for _, trURL := range tier {
1488                         if trURL == url {
1489                                 continue nextURL
1490                         }
1491                 }
1492                 tier = append(tier, url)
1493         }
1494         return tier
1495 }
1496
1497 // A file-like handle to some torrent data resource.
1498 type Handle interface {
1499         io.Reader
1500         io.Seeker
1501         io.Closer
1502         io.ReaderAt
1503 }
1504
1505 // Specifies a new torrent for adding to a client. There are helpers for
1506 // magnet URIs and torrent metainfo files.
1507 type TorrentSpec struct {
1508         // The tiered tracker URIs.
1509         Trackers [][]string
1510         InfoHash metainfo.Hash
1511         Info     *metainfo.InfoEx
1512         // The name to use if the Name field from the Info isn't available.
1513         DisplayName string
1514         // The chunk size to use for outbound requests. Defaults to 16KiB if not
1515         // set.
1516         ChunkSize int
1517         Storage   storage.I
1518 }
1519
1520 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1521         m, err := metainfo.ParseMagnetURI(uri)
1522         if err != nil {
1523                 return
1524         }
1525         spec = &TorrentSpec{
1526                 Trackers:    [][]string{m.Trackers},
1527                 DisplayName: m.DisplayName,
1528                 InfoHash:    m.InfoHash,
1529         }
1530         return
1531 }
1532
1533 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1534         spec = &TorrentSpec{
1535                 Trackers:    mi.AnnounceList,
1536                 Info:        &mi.Info,
1537                 DisplayName: mi.Info.Name,
1538                 InfoHash:    mi.Info.Hash(),
1539         }
1540         if len(spec.Trackers) == 0 {
1541                 spec.Trackers = [][]string{[]string{mi.Announce}}
1542         } else {
1543                 spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
1544         }
1545         return
1546 }
1547
1548 // Add or merge a torrent spec. If the torrent is already present, the
1549 // trackers will be merged with the existing ones. If the Info isn't yet
1550 // known, it will be set. The display name is replaced if the new spec
1551 // provides one. Returns new if the torrent wasn't already in the client.
1552 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1553         cl.mu.Lock()
1554         defer cl.mu.Unlock()
1555
1556         t, ok := cl.torrents[spec.InfoHash]
1557         if !ok {
1558                 new = true
1559
1560                 // TODO: This doesn't belong in the core client, it's more of a
1561                 // helper.
1562                 if _, ok := cl.bannedTorrents[spec.InfoHash]; ok {
1563                         err = errors.New("banned torrent")
1564                         return
1565                 }
1566                 // TODO: Tidy this up?
1567                 t = newTorrent(spec.InfoHash)
1568                 t.cl = cl
1569                 t.wantPeers.L = &cl.mu
1570                 if spec.ChunkSize != 0 {
1571                         t.chunkSize = pp.Integer(spec.ChunkSize)
1572                 }
1573                 t.storageOpener = spec.Storage
1574                 if t.storageOpener == nil {
1575                         t.storageOpener = cl.defaultStorage
1576                 }
1577         }
1578         if spec.DisplayName != "" {
1579                 t.setDisplayName(spec.DisplayName)
1580         }
1581         // Try to merge in info we have on the torrent. Any err left will
1582         // terminate the function.
1583         if t.info == nil && spec.Info != nil {
1584                 err = cl.setMetaData(t, &spec.Info.Info, spec.Info.Bytes)
1585         }
1586         if err != nil {
1587                 return
1588         }
1589         t.addTrackers(spec.Trackers)
1590
1591         cl.torrents[spec.InfoHash] = t
1592         t.maybeNewConns()
1593
1594         // From this point onwards, we can consider the torrent a part of the
1595         // client.
1596         if new {
1597                 if !cl.config.DisableTrackers {
1598                         go cl.announceTorrentTrackers(t)
1599                 }
1600                 if cl.dHT != nil {
1601                         go cl.announceTorrentDHT(t, true)
1602                 }
1603         }
1604         return
1605 }
1606
1607 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1608         t, ok := cl.torrents[infoHash]
1609         if !ok {
1610                 err = fmt.Errorf("no such torrent")
1611                 return
1612         }
1613         err = t.close()
1614         if err != nil {
1615                 panic(err)
1616         }
1617         delete(cl.torrents, infoHash)
1618         return
1619 }
1620
1621 // Returns true when peers are required, or false if the torrent is closing.
1622 func (cl *Client) waitWantPeers(t *Torrent) bool {
1623         cl.mu.Lock()
1624         defer cl.mu.Unlock()
1625         for {
1626                 select {
1627                 case <-t.ceasingNetworking:
1628                         return false
1629                 default:
1630                 }
1631                 if len(t.peers) > torrentPeersLowWater {
1632                         goto wait
1633                 }
1634                 if t.needData() || cl.seeding(t) {
1635                         return true
1636                 }
1637         wait:
1638                 t.wantPeers.Wait()
1639         }
1640 }
1641
1642 // Returns whether the client should make effort to seed the torrent.
1643 func (cl *Client) seeding(t *Torrent) bool {
1644         if cl.config.NoUpload {
1645                 return false
1646         }
1647         if !cl.config.Seed {
1648                 return false
1649         }
1650         if t.needData() {
1651                 return false
1652         }
1653         return true
1654 }
1655
1656 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
1657         for cl.waitWantPeers(t) {
1658                 // log.Printf("getting peers for %q from DHT", t)
1659                 ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1660                 if err != nil {
1661                         log.Printf("error getting peers from dht: %s", err)
1662                         return
1663                 }
1664                 // Count all the unique addresses we got during this announce.
1665                 allAddrs := make(map[string]struct{})
1666         getPeers:
1667                 for {
1668                         select {
1669                         case v, ok := <-ps.Peers:
1670                                 if !ok {
1671                                         break getPeers
1672                                 }
1673                                 addPeers := make([]Peer, 0, len(v.Peers))
1674                                 for _, cp := range v.Peers {
1675                                         if cp.Port == 0 {
1676                                                 // Can't do anything with this.
1677                                                 continue
1678                                         }
1679                                         addPeers = append(addPeers, Peer{
1680                                                 IP:     cp.IP[:],
1681                                                 Port:   cp.Port,
1682                                                 Source: peerSourceDHT,
1683                                         })
1684                                         key := (&net.UDPAddr{
1685                                                 IP:   cp.IP[:],
1686                                                 Port: cp.Port,
1687                                         }).String()
1688                                         allAddrs[key] = struct{}{}
1689                                 }
1690                                 cl.mu.Lock()
1691                                 cl.addPeers(t, addPeers)
1692                                 numPeers := len(t.peers)
1693                                 cl.mu.Unlock()
1694                                 if numPeers >= torrentPeersHighWater {
1695                                         break getPeers
1696                                 }
1697                         case <-t.ceasingNetworking:
1698                                 ps.Close()
1699                                 return
1700                         }
1701                 }
1702                 ps.Close()
1703                 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
1704         }
1705 }
1706
1707 func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
1708         url_, err := url.Parse(trRawURL)
1709         if err != nil {
1710                 return
1711         }
1712         host, _, err := net.SplitHostPort(url_.Host)
1713         if err != nil {
1714                 host = url_.Host
1715         }
1716         addr, err := net.ResolveIPAddr("ip", host)
1717         if err != nil {
1718                 return
1719         }
1720         cl.mu.RLock()
1721         _, blocked = cl.ipBlockRange(addr.IP)
1722         cl.mu.RUnlock()
1723         return
1724 }
1725
1726 func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) error {
1727         blocked, err := cl.trackerBlockedUnlocked(tr)
1728         if err != nil {
1729                 return fmt.Errorf("error determining if tracker blocked: %s", err)
1730         }
1731         if blocked {
1732                 return fmt.Errorf("tracker blocked: %s", tr)
1733         }
1734         resp, err := tracker.Announce(tr, req)
1735         if err != nil {
1736                 return fmt.Errorf("error announcing: %s", err)
1737         }
1738         var peers []Peer
1739         for _, peer := range resp.Peers {
1740                 peers = append(peers, Peer{
1741                         IP:   peer.IP,
1742                         Port: peer.Port,
1743                 })
1744         }
1745         cl.mu.Lock()
1746         cl.addPeers(t, peers)
1747         cl.mu.Unlock()
1748
1749         // log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1750
1751         time.Sleep(time.Second * time.Duration(resp.Interval))
1752         return nil
1753 }
1754
1755 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
1756         oks := make(chan bool)
1757         outstanding := 0
1758         for _, tier := range trackers {
1759                 for _, tr := range tier {
1760                         outstanding++
1761                         go func(tr string) {
1762                                 err := cl.announceTorrentSingleTracker(tr, req, t)
1763                                 oks <- err == nil
1764                         }(tr)
1765                 }
1766         }
1767         for outstanding > 0 {
1768                 ok := <-oks
1769                 outstanding--
1770                 if ok {
1771                         atLeastOne = true
1772                 }
1773         }
1774         return
1775 }
1776
1777 // Announce torrent to its trackers.
1778 func (cl *Client) announceTorrentTrackers(t *Torrent) {
1779         req := tracker.AnnounceRequest{
1780                 Event:    tracker.Started,
1781                 NumWant:  -1,
1782                 Port:     uint16(cl.incomingPeerPort()),
1783                 PeerId:   cl.peerID,
1784                 InfoHash: t.infoHash,
1785         }
1786         if !cl.waitWantPeers(t) {
1787                 return
1788         }
1789         cl.mu.RLock()
1790         req.Left = t.bytesLeftAnnounce()
1791         trackers := t.trackers
1792         cl.mu.RUnlock()
1793         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
1794                 req.Event = tracker.None
1795         }
1796 newAnnounce:
1797         for cl.waitWantPeers(t) {
1798                 cl.mu.RLock()
1799                 req.Left = t.bytesLeftAnnounce()
1800                 trackers = t.trackers
1801                 cl.mu.RUnlock()
1802                 numTrackersTried := 0
1803                 for _, tier := range trackers {
1804                         for trIndex, tr := range tier {
1805                                 numTrackersTried++
1806                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
1807                                 if err != nil {
1808                                         continue
1809                                 }
1810                                 // Float the successful announce to the top of the tier. If
1811                                 // the trackers list has been changed, we'll be modifying an
1812                                 // old copy so it won't matter.
1813                                 cl.mu.Lock()
1814                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1815                                 cl.mu.Unlock()
1816
1817                                 req.Event = tracker.None
1818                                 continue newAnnounce
1819                         }
1820                 }
1821                 if numTrackersTried != 0 {
1822                         log.Printf("%s: all trackers failed", t)
1823                 }
1824                 // TODO: Wait until trackers are added if there are none.
1825                 time.Sleep(10 * time.Second)
1826         }
1827 }
1828
1829 func (cl *Client) allTorrentsCompleted() bool {
1830         for _, t := range cl.torrents {
1831                 if !t.haveInfo() {
1832                         return false
1833                 }
1834                 if t.numPiecesCompleted() != t.numPieces() {
1835                         return false
1836                 }
1837         }
1838         return true
1839 }
1840
1841 // Returns true when all torrents are completely downloaded and false if the
1842 // client is stopped before that.
1843 func (cl *Client) WaitAll() bool {
1844         cl.mu.Lock()
1845         defer cl.mu.Unlock()
1846         for !cl.allTorrentsCompleted() {
1847                 if cl.closed.IsSet() {
1848                         return false
1849                 }
1850                 cl.event.Wait()
1851         }
1852         return true
1853 }
1854
1855 // Handle a received chunk from a peer.
1856 func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
1857         chunksReceived.Add(1)
1858
1859         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1860
1861         // Request has been satisfied.
1862         if cl.connDeleteRequest(t, c, req) {
1863                 defer c.updateRequests()
1864         } else {
1865                 unexpectedChunksReceived.Add(1)
1866         }
1867
1868         index := int(req.Index)
1869         piece := &t.pieces[index]
1870
1871         // Do we actually want this chunk?
1872         if !t.wantPiece(req) {
1873                 unwantedChunksReceived.Add(1)
1874                 c.UnwantedChunksReceived++
1875                 return
1876         }
1877
1878         c.UsefulChunksReceived++
1879         c.lastUsefulChunkReceived = time.Now()
1880
1881         cl.upload(t, c)
1882
1883         // Need to record that it hasn't been written yet, before we attempt to do
1884         // anything with it.
1885         piece.incrementPendingWrites()
1886         // Record that we have the chunk.
1887         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1888
1889         // Cancel pending requests for this chunk.
1890         for _, c := range t.conns {
1891                 if cl.connCancel(t, c, req) {
1892                         c.updateRequests()
1893                 }
1894         }
1895
1896         cl.mu.Unlock()
1897         // Write the chunk out.
1898         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1899         cl.mu.Lock()
1900
1901         piece.decrementPendingWrites()
1902
1903         if err != nil {
1904                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
1905                 t.pendRequest(req)
1906                 t.updatePieceCompletion(int(msg.Index))
1907                 return
1908         }
1909
1910         // It's important that the piece is potentially queued before we check if
1911         // the piece is still wanted, because if it is queued, it won't be wanted.
1912         if t.pieceAllDirty(index) {
1913                 cl.queuePieceCheck(t, int(req.Index))
1914         }
1915
1916         if c.peerTouchedPieces == nil {
1917                 c.peerTouchedPieces = make(map[int]struct{})
1918         }
1919         c.peerTouchedPieces[index] = struct{}{}
1920
1921         cl.event.Broadcast()
1922         t.publishPieceChange(int(req.Index))
1923         return
1924 }
1925
1926 // Return the connections that touched a piece, and clear the entry while
1927 // doing it.
1928 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
1929         for _, c := range t.conns {
1930                 if _, ok := c.peerTouchedPieces[piece]; ok {
1931                         ret = append(ret, c)
1932                         delete(c.peerTouchedPieces, piece)
1933                 }
1934         }
1935         return
1936 }
1937
1938 func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
1939         p := &t.pieces[piece]
1940         if p.EverHashed {
1941                 // Don't score the first time a piece is hashed, it could be an
1942                 // initial check.
1943                 if correct {
1944                         pieceHashedCorrect.Add(1)
1945                 } else {
1946                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
1947                         pieceHashedNotCorrect.Add(1)
1948                 }
1949         }
1950         p.EverHashed = true
1951         touchers := cl.reapPieceTouches(t, piece)
1952         if correct {
1953                 err := p.Storage().MarkComplete()
1954                 if err != nil {
1955                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1956                 }
1957                 t.updatePieceCompletion(piece)
1958         } else if len(touchers) != 0 {
1959                 log.Printf("dropping %d conns that touched piece", len(touchers))
1960                 for _, c := range touchers {
1961                         cl.dropConnection(t, c)
1962                 }
1963         }
1964         cl.pieceChanged(t, piece)
1965 }
1966
1967 func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
1968         t.pendingPieces.Remove(piece)
1969         t.pendAllChunkSpecs(piece)
1970         for _, conn := range t.conns {
1971                 conn.Have(piece)
1972                 for r := range conn.Requests {
1973                         if int(r.Index) == piece {
1974                                 conn.Cancel(r)
1975                         }
1976                 }
1977                 // Could check here if peer doesn't have piece, but due to caching
1978                 // some peers may have said they have a piece but they don't.
1979                 cl.upload(t, conn)
1980         }
1981 }
1982
1983 func (cl *Client) onFailedPiece(t *Torrent, piece int) {
1984         if t.pieceAllDirty(piece) {
1985                 t.pendAllChunkSpecs(piece)
1986         }
1987         if !t.wantPieceIndex(piece) {
1988                 return
1989         }
1990         cl.openNewConns(t)
1991         for _, conn := range t.conns {
1992                 if conn.PeerHasPiece(piece) {
1993                         conn.updateRequests()
1994                 }
1995         }
1996 }
1997
1998 func (cl *Client) pieceChanged(t *Torrent, piece int) {
1999         correct := t.pieceComplete(piece)
2000         defer cl.event.Broadcast()
2001         if correct {
2002                 cl.onCompletedPiece(t, piece)
2003         } else {
2004                 cl.onFailedPiece(t, piece)
2005         }
2006         if t.updatePiecePriority(piece) {
2007                 t.piecePriorityChanged(piece)
2008         }
2009         t.publishPieceChange(piece)
2010 }
2011
2012 func (cl *Client) verifyPiece(t *Torrent, piece int) {
2013         cl.mu.Lock()
2014         defer cl.mu.Unlock()
2015         p := &t.pieces[piece]
2016         for p.Hashing || t.storage == nil {
2017                 cl.event.Wait()
2018         }
2019         p.QueuedForHash = false
2020         if t.isClosed() || t.pieceComplete(piece) {
2021                 t.updatePiecePriority(piece)
2022                 t.publishPieceChange(piece)
2023                 return
2024         }
2025         p.Hashing = true
2026         t.publishPieceChange(piece)
2027         cl.mu.Unlock()
2028         sum := t.hashPiece(piece)
2029         cl.mu.Lock()
2030         select {
2031         case <-t.closing:
2032                 return
2033         default:
2034         }
2035         p.Hashing = false
2036         cl.pieceHashed(t, piece, sum == p.Hash)
2037 }
2038
2039 // Returns handles to all the torrents loaded in the Client.
2040 func (cl *Client) Torrents() (ret []*Torrent) {
2041         cl.mu.Lock()
2042         for _, t := range cl.torrents {
2043                 ret = append(ret, t)
2044         }
2045         cl.mu.Unlock()
2046         return
2047 }
2048
2049 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
2050         spec, err := TorrentSpecFromMagnetURI(uri)
2051         if err != nil {
2052                 return
2053         }
2054         T, _, err = cl.AddTorrentSpec(spec)
2055         return
2056 }
2057
2058 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
2059         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2060         var ss []string
2061         missinggo.CastSlice(&ss, mi.Nodes)
2062         cl.AddDHTNodes(ss)
2063         return
2064 }
2065
2066 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
2067         mi, err := metainfo.LoadFromFile(filename)
2068         if err != nil {
2069                 return
2070         }
2071         return cl.AddTorrent(mi)
2072 }
2073
2074 func (cl *Client) DHT() *dht.Server {
2075         return cl.dHT
2076 }
2077
2078 func (cl *Client) AddDHTNodes(nodes []string) {
2079         for _, n := range nodes {
2080                 hmp := missinggo.SplitHostMaybePort(n)
2081                 ip := net.ParseIP(hmp.Host)
2082                 if ip == nil {
2083                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
2084                         continue
2085                 }
2086                 ni := dht.NodeInfo{
2087                         Addr: dht.NewAddr(&net.UDPAddr{
2088                                 IP:   ip,
2089                                 Port: hmp.Port,
2090                         }),
2091                 }
2092                 cl.DHT().AddNode(ni)
2093         }
2094 }