]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Rework setting of info bytes
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "encoding/hex"
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         "math/big"
13         mathRand "math/rand"
14         "net"
15         "net/url"
16         "sort"
17         "strconv"
18         "strings"
19         "time"
20
21         "github.com/anacrolix/missinggo"
22         "github.com/anacrolix/missinggo/pproffd"
23         "github.com/anacrolix/missinggo/pubsub"
24         "github.com/anacrolix/sync"
25         "github.com/anacrolix/utp"
26
27         "github.com/anacrolix/torrent/bencode"
28         "github.com/anacrolix/torrent/dht"
29         "github.com/anacrolix/torrent/iplist"
30         "github.com/anacrolix/torrent/metainfo"
31         "github.com/anacrolix/torrent/mse"
32         pp "github.com/anacrolix/torrent/peer_protocol"
33         "github.com/anacrolix/torrent/storage"
34         "github.com/anacrolix/torrent/tracker"
35 )
36
37 // Currently doesn't really queue, but should in the future.
38 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
39         piece := &t.pieces[pieceIndex]
40         if piece.QueuedForHash {
41                 return
42         }
43         piece.QueuedForHash = true
44         t.publishPieceChange(pieceIndex)
45         go cl.verifyPiece(t, pieceIndex)
46 }
47
48 // Queue a piece check if one isn't already queued, and the piece has never
49 // been checked before.
50 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
51         p := &t.pieces[piece]
52         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
53                 return
54         }
55         cl.queuePieceCheck(t, piece)
56 }
57
// Clients contain zero or more Torrents. A Client manages a blocklist, the
// TCP/UDP protocol ports, and DHT as desired.
//
// Field notes, as far as this file shows:
//   - halfOpenLimit is passed to reducedDialTimeout when computing dial
//     timeouts.
//   - peerID is the 20-byte ID sent in handshakes; it is taken from the
//     config or generated in NewClient.
//   - listeners holds the TCP listener and/or the uTP socket, in the order
//     they were opened; the first one provides ListenAddr.
//   - utpSock and dHT are nil when uTP or DHT are disabled in the config.
//   - config is a copy of the Config given to NewClient.
//   - extensionBytes are the reserved bytes advertised in our handshake.
//   - mu guards the client's state; event is a condition variable that uses
//     mu as its locker, and closed is set by Close.
//   - torrents maps info hashes to the torrents managed by this client.
type Client struct {
	halfOpenLimit  int
	peerID         [20]byte
	listeners      []net.Listener
	utpSock        *utp.Socket
	dHT            *dht.Server
	ipBlockList    iplist.Ranger
	config         Config
	pruneTimer     *time.Timer
	extensionBytes peerExtensionBytes
	// Set of addresses that have our client ID. This intentionally will
	// include ourselves if we end up trying to connect to our own address
	// through legitimate channels.
	dopplegangerAddrs map[string]struct{}

	defaultStorage storage.I

	mu     sync.RWMutex
	event  sync.Cond
	closed missinggo.Event

	torrents map[metainfo.Hash]*Torrent
}
83
84 func (cl *Client) IPBlockList() iplist.Ranger {
85         cl.mu.Lock()
86         defer cl.mu.Unlock()
87         return cl.ipBlockList
88 }
89
90 func (cl *Client) SetIPBlockList(list iplist.Ranger) {
91         cl.mu.Lock()
92         defer cl.mu.Unlock()
93         cl.ipBlockList = list
94         if cl.dHT != nil {
95                 cl.dHT.SetIPBlockList(list)
96         }
97 }
98
99 func (cl *Client) PeerID() string {
100         return string(cl.peerID[:])
101 }
102
103 func (cl *Client) ListenAddr() (addr net.Addr) {
104         for _, l := range cl.listeners {
105                 addr = l.Addr()
106                 break
107         }
108         return
109 }
110
111 type hashSorter struct {
112         Hashes []metainfo.Hash
113 }
114
115 func (hs hashSorter) Len() int {
116         return len(hs.Hashes)
117 }
118
119 func (hs hashSorter) Less(a, b int) bool {
120         return (&big.Int{}).SetBytes(hs.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(hs.Hashes[b][:])) < 0
121 }
122
123 func (hs hashSorter) Swap(a, b int) {
124         hs.Hashes[a], hs.Hashes[b] = hs.Hashes[b], hs.Hashes[a]
125 }
126
127 func (cl *Client) sortedTorrents() (ret []*Torrent) {
128         var hs hashSorter
129         for ih := range cl.torrents {
130                 hs.Hashes = append(hs.Hashes, ih)
131         }
132         sort.Sort(hs)
133         for _, ih := range hs.Hashes {
134                 ret = append(ret, cl.torrent(ih))
135         }
136         return
137 }
138
139 // Writes out a human readable status of the client, such as for writing to a
140 // HTTP status page.
141 func (cl *Client) WriteStatus(_w io.Writer) {
142         cl.mu.RLock()
143         defer cl.mu.RUnlock()
144         w := bufio.NewWriter(_w)
145         defer w.Flush()
146         if addr := cl.ListenAddr(); addr != nil {
147                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
148         } else {
149                 fmt.Fprintln(w, "Not listening!")
150         }
151         fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
152         if cl.dHT != nil {
153                 dhtStats := cl.dHT.Stats()
154                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
155                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
156                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
157                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
158                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
159         }
160         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
161         fmt.Fprintln(w)
162         for _, t := range cl.sortedTorrents() {
163                 if t.name() == "" {
164                         fmt.Fprint(w, "<unknown name>")
165                 } else {
166                         fmt.Fprint(w, t.name())
167                 }
168                 fmt.Fprint(w, "\n")
169                 if t.haveInfo() {
170                         fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length)
171                 } else {
172                         w.WriteString("<missing metainfo>")
173                 }
174                 fmt.Fprint(w, "\n")
175                 t.writeStatus(w, cl)
176                 fmt.Fprintln(w)
177         }
178 }
179
180 // Creates a new client.
181 func NewClient(cfg *Config) (cl *Client, err error) {
182         if cfg == nil {
183                 cfg = &Config{}
184         }
185
186         defer func() {
187                 if err != nil {
188                         cl = nil
189                 }
190         }()
191         cl = &Client{
192                 halfOpenLimit:     socketsPerTorrent,
193                 config:            *cfg,
194                 defaultStorage:    cfg.DefaultStorage,
195                 dopplegangerAddrs: make(map[string]struct{}),
196                 torrents:          make(map[metainfo.Hash]*Torrent),
197         }
198         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
199         cl.event.L = &cl.mu
200         if cl.defaultStorage == nil {
201                 cl.defaultStorage = storage.NewFile(cfg.DataDir)
202         }
203         if cfg.IPBlocklist != nil {
204                 cl.ipBlockList = cfg.IPBlocklist
205         }
206
207         if cfg.PeerID != "" {
208                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
209         } else {
210                 o := copy(cl.peerID[:], bep20)
211                 _, err = rand.Read(cl.peerID[o:])
212                 if err != nil {
213                         panic("error generating peer id")
214                 }
215         }
216
217         // Returns the laddr string to listen on for the next Listen call.
218         listenAddr := func() string {
219                 if addr := cl.ListenAddr(); addr != nil {
220                         return addr.String()
221                 }
222                 if cfg.ListenAddr == "" {
223                         return ":50007"
224                 }
225                 return cfg.ListenAddr
226         }
227         if !cl.config.DisableTCP {
228                 var l net.Listener
229                 l, err = net.Listen(func() string {
230                         if cl.config.DisableIPv6 {
231                                 return "tcp4"
232                         } else {
233                                 return "tcp"
234                         }
235                 }(), listenAddr())
236                 if err != nil {
237                         return
238                 }
239                 cl.listeners = append(cl.listeners, l)
240                 go cl.acceptConnections(l, false)
241         }
242         if !cl.config.DisableUTP {
243                 cl.utpSock, err = utp.NewSocket(func() string {
244                         if cl.config.DisableIPv6 {
245                                 return "udp4"
246                         } else {
247                                 return "udp"
248                         }
249                 }(), listenAddr())
250                 if err != nil {
251                         return
252                 }
253                 cl.listeners = append(cl.listeners, cl.utpSock)
254                 go cl.acceptConnections(cl.utpSock, true)
255         }
256         if !cfg.NoDHT {
257                 dhtCfg := cfg.DHTConfig
258                 if dhtCfg.IPBlocklist == nil {
259                         dhtCfg.IPBlocklist = cl.ipBlockList
260                 }
261                 if dhtCfg.Addr == "" {
262                         dhtCfg.Addr = listenAddr()
263                 }
264                 if dhtCfg.Conn == nil && cl.utpSock != nil {
265                         dhtCfg.Conn = cl.utpSock
266                 }
267                 cl.dHT, err = dht.NewServer(&dhtCfg)
268                 if err != nil {
269                         return
270                 }
271         }
272
273         return
274 }
275
276 // Stops the client. All connections to peers are closed and all activity will
277 // come to a halt.
278 func (cl *Client) Close() {
279         cl.mu.Lock()
280         defer cl.mu.Unlock()
281         cl.closed.Set()
282         if cl.dHT != nil {
283                 cl.dHT.Close()
284         }
285         for _, l := range cl.listeners {
286                 l.Close()
287         }
288         for _, t := range cl.torrents {
289                 t.close()
290         }
291         cl.event.Broadcast()
292 }
293
294 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
295
296 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
297         if cl.ipBlockList == nil {
298                 return
299         }
300         ip4 := ip.To4()
301         // If blocklists are enabled, then block non-IPv4 addresses, because
302         // blocklists do not yet support IPv6.
303         if ip4 == nil {
304                 if missinggo.CryHeard() {
305                         log.Printf("blocking non-IPv4 address: %s", ip)
306                 }
307                 r = ipv6BlockRange
308                 blocked = true
309                 return
310         }
311         return cl.ipBlockList.Lookup(ip4)
312 }
313
314 func (cl *Client) waitAccept() {
315         cl.mu.Lock()
316         defer cl.mu.Unlock()
317         for {
318                 for _, t := range cl.torrents {
319                         if cl.wantConns(t) {
320                                 return
321                         }
322                 }
323                 if cl.closed.IsSet() {
324                         return
325                 }
326                 cl.event.Wait()
327         }
328 }
329
330 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
331         for {
332                 cl.waitAccept()
333                 conn, err := l.Accept()
334                 conn = pproffd.WrapNetConn(conn)
335                 if cl.closed.IsSet() {
336                         if conn != nil {
337                                 conn.Close()
338                         }
339                         return
340                 }
341                 if err != nil {
342                         log.Print(err)
343                         // I think something harsher should happen here? Our accept
344                         // routine just fucked off.
345                         return
346                 }
347                 if utp {
348                         acceptUTP.Add(1)
349                 } else {
350                         acceptTCP.Add(1)
351                 }
352                 cl.mu.RLock()
353                 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
354                 _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
355                 cl.mu.RUnlock()
356                 if blocked || doppleganger {
357                         acceptReject.Add(1)
358                         // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
359                         conn.Close()
360                         continue
361                 }
362                 go cl.incomingConnection(conn, utp)
363         }
364 }
365
366 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
367         defer nc.Close()
368         if tc, ok := nc.(*net.TCPConn); ok {
369                 tc.SetLinger(0)
370         }
371         c := cl.newConnection(nc)
372         c.Discovery = peerSourceIncoming
373         c.uTP = utp
374         err := cl.runReceivedConn(c)
375         if err != nil {
376                 // log.Print(err)
377         }
378 }
379
380 // Returns a handle to the given torrent, if it's present in the client.
381 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
382         cl.mu.Lock()
383         defer cl.mu.Unlock()
384         t, ok = cl.torrents[ih]
385         return
386 }
387
388 func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
389         return cl.torrents[ih]
390 }
391
392 type dialResult struct {
393         Conn net.Conn
394         UTP  bool
395 }
396
397 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
398         conn, err := dial(addr, t)
399         if err != nil {
400                 if conn != nil {
401                         conn.Close()
402                 }
403                 conn = nil // Pedantic
404         }
405         ch <- dialResult{conn, utp}
406         if err == nil {
407                 successfulDials.Add(1)
408                 return
409         }
410         unsuccessfulDials.Add(1)
411 }
412
413 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
414         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
415         if ret < minDialTimeout {
416                 ret = minDialTimeout
417         }
418         return
419 }
420
421 // Returns whether an address is known to connect to a client with our own ID.
422 func (cl *Client) dopplegangerAddr(addr string) bool {
423         _, ok := cl.dopplegangerAddrs[addr]
424         return ok
425 }
426
427 // Start the process of connecting to the given peer for the given torrent if
428 // appropriate.
429 func (cl *Client) initiateConn(peer Peer, t *Torrent) {
430         if peer.Id == cl.peerID {
431                 return
432         }
433         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
434         if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
435                 duplicateConnsAvoided.Add(1)
436                 return
437         }
438         if r, ok := cl.ipBlockRange(peer.IP); ok {
439                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
440                 return
441         }
442         t.halfOpen[addr] = struct{}{}
443         go cl.outgoingConnection(t, addr, peer.Source)
444 }
445
446 func (cl *Client) dialTimeout(t *Torrent) time.Duration {
447         cl.mu.Lock()
448         pendingPeers := len(t.peers)
449         cl.mu.Unlock()
450         return reducedDialTimeout(nominalDialTimeout, cl.halfOpenLimit, pendingPeers)
451 }
452
453 func (cl *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
454         c, err = net.DialTimeout("tcp", addr, cl.dialTimeout(t))
455         if err == nil {
456                 c.(*net.TCPConn).SetLinger(0)
457         }
458         return
459 }
460
461 func (cl *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
462         return cl.utpSock.DialTimeout(addr, cl.dialTimeout(t))
463 }
464
465 // Returns a connection over UTP or TCP, whichever is first to connect.
466 func (cl *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
467         // Initiate connections via TCP and UTP simultaneously. Use the first one
468         // that succeeds.
469         left := 0
470         if !cl.config.DisableUTP {
471                 left++
472         }
473         if !cl.config.DisableTCP {
474                 left++
475         }
476         resCh := make(chan dialResult, left)
477         if !cl.config.DisableUTP {
478                 go doDial(cl.dialUTP, resCh, true, addr, t)
479         }
480         if !cl.config.DisableTCP {
481                 go doDial(cl.dialTCP, resCh, false, addr, t)
482         }
483         var res dialResult
484         // Wait for a successful connection.
485         for ; left > 0 && res.Conn == nil; left-- {
486                 res = <-resCh
487         }
488         if left > 0 {
489                 // There are still incompleted dials.
490                 go func() {
491                         for ; left > 0; left-- {
492                                 conn := (<-resCh).Conn
493                                 if conn != nil {
494                                         conn.Close()
495                                 }
496                         }
497                 }()
498         }
499         conn = res.Conn
500         utp = res.UTP
501         return
502 }
503
504 func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
505         if _, ok := t.halfOpen[addr]; !ok {
506                 panic("invariant broken")
507         }
508         delete(t.halfOpen, addr)
509         cl.openNewConns(t)
510 }
511
512 // Performs initiator handshakes and returns a connection. Returns nil
513 // *connection if no connection for valid reasons.
514 func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
515         c = cl.newConnection(nc)
516         c.encrypted = encrypted
517         c.uTP = utp
518         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
519         if err != nil {
520                 return
521         }
522         ok, err := cl.initiateHandshakes(c, t)
523         if !ok {
524                 c = nil
525         }
526         return
527 }
528
529 // Returns nil connection and nil error if no connection could be established
530 // for valid reasons.
531 func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
532         nc, utp := cl.dialFirst(addr, t)
533         if nc == nil {
534                 return
535         }
536         c, err = cl.handshakesConnection(nc, t, !cl.config.DisableEncryption, utp)
537         if err != nil {
538                 nc.Close()
539                 return
540         } else if c != nil {
541                 return
542         }
543         nc.Close()
544         if cl.config.DisableEncryption {
545                 // We already tried without encryption.
546                 return
547         }
548         // Try again without encryption, using whichever protocol type worked last
549         // time.
550         if utp {
551                 nc, err = cl.dialUTP(addr, t)
552         } else {
553                 nc, err = cl.dialTCP(addr, t)
554         }
555         if err != nil {
556                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
557                 return
558         }
559         c, err = cl.handshakesConnection(nc, t, false, utp)
560         if err != nil || c == nil {
561                 nc.Close()
562         }
563         return
564 }
565
566 // Called to dial out and run a connection. The addr we're given is already
567 // considered half-open.
568 func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
569         c, err := cl.establishOutgoingConn(t, addr)
570         cl.mu.Lock()
571         defer cl.mu.Unlock()
572         // Don't release lock between here and addConnection, unless it's for
573         // failure.
574         cl.noLongerHalfOpen(t, addr)
575         if err != nil {
576                 if cl.config.Debug {
577                         log.Printf("error establishing outgoing connection: %s", err)
578                 }
579                 return
580         }
581         if c == nil {
582                 return
583         }
584         defer c.Close()
585         c.Discovery = ps
586         err = cl.runInitiatedHandshookConn(c, t)
587         if err != nil {
588                 if cl.config.Debug {
589                         log.Printf("error in established outgoing connection: %s", err)
590                 }
591         }
592 }
593
594 // The port number for incoming peer connections. 0 if the client isn't
595 // listening.
596 func (cl *Client) incomingPeerPort() int {
597         listenAddr := cl.ListenAddr()
598         if listenAddr == nil {
599                 return 0
600         }
601         return missinggo.AddrPort(listenAddr)
602 }
603
// addrCompactIP converts a net.Addr to its compact IP representation: 4 raw
// bytes for IPv4 or 16 raw bytes for IPv6, as used by the "yourip" field of
// http://www.bittorrent.org/beps/bep_0010.html.
//
// addr.String() must be in "host:port" form. An IPv6 zone ("%eth0") is
// dropped, since it has no compact encoding. An error is returned if the
// address cannot be split into host and port, or if the host is not an IP
// address (for example a hostname); previously such hosts silently produced
// an empty string.
func addrCompactIP(addr net.Addr) (string, error) {
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "", err
	}
	// net.ParseIP rejects zoned literals such as "fe80::1%eth0".
	if i := strings.IndexByte(host, '%'); i >= 0 {
		host = host[:i]
	}
	ip := net.ParseIP(host)
	if ip == nil {
		return "", fmt.Errorf("host %q is not an IP address", host)
	}
	if v4 := ip.To4(); v4 != nil {
		return string(v4), nil
	}
	return string(ip.To16()), nil
}
620
// handshakeWriter writes every slice received from bb to w until bb is closed
// or a write fails, then sends exactly one value on done: the first write
// error, or nil if all writes succeeded.
func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
	done <- func() error {
		for b := range bb {
			if _, err := w.Write(b); err != nil {
				return err
			}
		}
		return nil
	}()
}
631
type (
	// peerExtensionBytes holds the 8 reserved bytes of a handshake, whose
	// bits advertise protocol extensions.
	peerExtensionBytes [8]byte
	// peerID is a 20-byte peer identifier.
	peerID [20]byte
)

// SupportsExtended reports whether bit 0x10 of byte 5 is set.
func (pex *peerExtensionBytes) SupportsExtended() bool {
	const extendedBit = 0x10
	return pex[5]&extendedBit == extendedBit
}

// SupportsDHT reports whether bit 0x01 of byte 7 is set.
func (pex *peerExtensionBytes) SupportsDHT() bool {
	const dhtBit = 0x01
	return pex[7]&dhtBit == dhtBit
}

// SupportsFast reports whether bit 0x04 of byte 7 is set.
func (pex *peerExtensionBytes) SupportsFast() bool {
	const fastBit = 0x04
	return pex[7]&fastBit == fastBit
}
648
// handshakeResult holds the values parsed from a peer's handshake: its
// reserved extension bytes, its peer ID, and the info hash it sent. All three
// are embedded, so they are addressed by their type names.
type handshakeResult struct {
	peerExtensionBytes
	peerID
	metainfo.Hash
}
654
// handshake performs the BitTorrent handshake over sock.
//
// ih is nil if we expect the peer to declare the InfoHash, such as when the
// peer initiated the connection. Returns ok if the handshake was successful,
// and err if there was an unexpected condition other than the peer simply
// abandoning the handshake.
//
// Our side of the handshake is written by a separate goroutine so that
// sending never blocks reading. When ih is non-nil the whole handshake is
// queued up front; when it is nil, only the protocol string and extension
// bytes are sent first, and the info hash (echoing the peer's) and our peer
// ID follow once the peer's handshake has been read.
//
// A failed read or a protocol string mismatch yields ok == false with a nil
// err. On success the function waits for the writer to finish, and a write
// failure is returned as err with ok still true.
func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
	// Bytes to be sent to the peer. Should never block the sender.
	postCh := make(chan []byte, 4)
	// A single error value sent when the writer completes.
	writeDone := make(chan error, 1)
	// Performs writes to the socket and ensures posts don't block.
	go handshakeWriter(sock, postCh, writeDone)

	defer func() {
		close(postCh) // Done writing.
		// On failure, return without waiting for the writer.
		if !ok {
			return
		}
		// ok is only set on the success path, where err is still nil.
		if err != nil {
			panic(err)
		}
		// Wait until writes complete before returning from handshake.
		err = <-writeDone
		if err != nil {
			err = fmt.Errorf("error writing: %s", err)
		}
	}()

	// Queues bytes for the writer; postCh has room for all four posts made
	// by this function, so this must never block.
	post := func(bb []byte) {
		select {
		case postCh <- bb:
		default:
			panic("mustn't block while posting")
		}
	}

	post([]byte(pp.Protocol))
	post(extensions[:])
	if ih != nil { // We already know what we want.
		post(ih[:])
		post(peerID[:])
	}
	// The peer's handshake: 20 bytes protocol, 8 bytes extensions, 20 bytes
	// info hash, 20 bytes peer ID.
	var b [68]byte
	_, err = io.ReadFull(sock, b[:68])
	if err != nil {
		// The peer abandoning the handshake is not treated as an error.
		err = nil
		return
	}
	if string(b[:20]) != pp.Protocol {
		return
	}
	missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
	missinggo.CopyExact(&res.Hash, b[28:48])
	missinggo.CopyExact(&res.peerID, b[48:68])
	peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)

	// TODO: Maybe we can just drop peers here if we're not interested. This
	// could prevent them trying to reconnect, falsely believing there was
	// just a problem.
	if ih == nil { // We were waiting for the peer to tell us what they wanted.
		post(res.Hash[:])
		post(peerID[:])
	}

	ok = true
	return
}
721
// deadlineReader wraps a raw connection and provides the interface we want
// for using the connection in the message loop: every Read first pushes the
// read deadline on nc forward, then reads from r.
type deadlineReader struct {
	nc net.Conn
	r  io.Reader
}

// Read extends the read deadline on the underlying connection and then reads
// from the wrapped reader.
//
// If the deadline cannot be set, that error is returned (wrapped) and no read
// is attempted; previously it was overwritten by the result of the read and
// lost. Errors from the read itself are returned unchanged; they are not
// converted into io.EOF.
func (r deadlineReader) Read(b []byte) (n int, err error) {
	// Keep-alives should be received every 2 mins. Give a bit of gracetime.
	err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
	if err != nil {
		err = fmt.Errorf("error setting read deadline: %s", err)
		return
	}
	return r.r.Read(b)
}
749
750 type readWriter struct {
751         io.Reader
752         io.Writer
753 }
754
755 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
756         var protocol [len(pp.Protocol)]byte
757         _, err = io.ReadFull(rw, protocol[:])
758         if err != nil {
759                 return
760         }
761         ret = readWriter{
762                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
763                 rw,
764         }
765         if string(protocol[:]) == pp.Protocol {
766                 return
767         }
768         encrypted = true
769         ret, err = mse.ReceiveHandshake(ret, skeys)
770         return
771 }
772
773 func (cl *Client) receiveSkeys() (ret [][]byte) {
774         for ih := range cl.torrents {
775                 ret = append(ret, ih[:])
776         }
777         return
778 }
779
780 func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
781         if c.encrypted {
782                 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
783                 if err != nil {
784                         return
785                 }
786         }
787         ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
788         if ih != t.infoHash {
789                 ok = false
790         }
791         return
792 }
793
794 // Do encryption and bittorrent handshakes as receiver.
795 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
796         cl.mu.Lock()
797         skeys := cl.receiveSkeys()
798         cl.mu.Unlock()
799         if !cl.config.DisableEncryption {
800                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
801                 if err != nil {
802                         if err == mse.ErrNoSecretKeyMatch {
803                                 err = nil
804                         }
805                         return
806                 }
807         }
808         ih, ok, err := cl.connBTHandshake(c, nil)
809         if err != nil {
810                 err = fmt.Errorf("error during bt handshake: %s", err)
811                 return
812         }
813         if !ok {
814                 return
815         }
816         cl.mu.Lock()
817         t = cl.torrents[ih]
818         cl.mu.Unlock()
819         return
820 }
821
822 // Returns !ok if handshake failed for valid reasons.
823 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
824         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
825         if err != nil || !ok {
826                 return
827         }
828         ret = res.Hash
829         c.PeerExtensionBytes = res.peerExtensionBytes
830         c.PeerID = res.peerID
831         c.completedHandshake = time.Now()
832         return
833 }
834
835 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) (err error) {
836         if c.PeerID == cl.peerID {
837                 // Only if we initiated the connection is the remote address a
838                 // listen addr for a doppleganger.
839                 connsToSelf.Add(1)
840                 addr := c.conn.RemoteAddr().String()
841                 cl.dopplegangerAddrs[addr] = struct{}{}
842                 return
843         }
844         return cl.runHandshookConn(c, t)
845 }
846
847 func (cl *Client) runReceivedConn(c *connection) (err error) {
848         err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
849         if err != nil {
850                 return
851         }
852         t, err := cl.receiveHandshakes(c)
853         if err != nil {
854                 err = fmt.Errorf("error receiving handshakes: %s", err)
855                 return
856         }
857         if t == nil {
858                 return
859         }
860         cl.mu.Lock()
861         defer cl.mu.Unlock()
862         if c.PeerID == cl.peerID {
863                 return
864         }
865         return cl.runHandshookConn(c, t)
866 }
867
868 func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
869         c.conn.SetWriteDeadline(time.Time{})
870         c.rw = readWriter{
871                 deadlineReader{c.conn, c.rw},
872                 c.rw,
873         }
874         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
875         if !cl.addConnection(t, c) {
876                 return
877         }
878         defer cl.dropConnection(t, c)
879         go c.writer(time.Minute)
880         cl.sendInitialMessages(c, t)
881         err = cl.connectionLoop(t, c)
882         if err != nil {
883                 err = fmt.Errorf("error during connection loop: %s", err)
884         }
885         return
886 }
887
888 func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
889         if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
890                 conn.Post(pp.Message{
891                         Type:       pp.Extended,
892                         ExtendedID: pp.HandshakeExtendedID,
893                         ExtendedPayload: func() []byte {
894                                 d := map[string]interface{}{
895                                         "m": func() (ret map[string]int) {
896                                                 ret = make(map[string]int, 2)
897                                                 ret["ut_metadata"] = metadataExtendedId
898                                                 if !cl.config.DisablePEX {
899                                                         ret["ut_pex"] = pexExtendedId
900                                                 }
901                                                 return
902                                         }(),
903                                         "v": extendedHandshakeClientVersion,
904                                         // No upload queue is implemented yet.
905                                         "reqq": 64,
906                                 }
907                                 if !cl.config.DisableEncryption {
908                                         d["e"] = 1
909                                 }
910                                 if torrent.metadataSizeKnown() {
911                                         d["metadata_size"] = torrent.metadataSize()
912                                 }
913                                 if p := cl.incomingPeerPort(); p != 0 {
914                                         d["p"] = p
915                                 }
916                                 yourip, err := addrCompactIP(conn.remoteAddr())
917                                 if err != nil {
918                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
919                                 } else {
920                                         d["yourip"] = yourip
921                                 }
922                                 // log.Printf("sending %v", d)
923                                 b, err := bencode.Marshal(d)
924                                 if err != nil {
925                                         panic(err)
926                                 }
927                                 return b
928                         }(),
929                 })
930         }
931         if torrent.haveAnyPieces() {
932                 conn.Bitfield(torrent.bitfield())
933         } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
934                 conn.Post(pp.Message{
935                         Type: pp.HaveNone,
936                 })
937         }
938         if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
939                 conn.Post(pp.Message{
940                         Type: pp.Port,
941                         Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
942                 })
943         }
944 }
945
946 func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
947         conn.updateRequests()
948 }
949
950 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
951         ok = cn.Cancel(r)
952         if ok {
953                 postedCancels.Add(1)
954         }
955         return
956 }
957
958 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
959         if !cn.RequestPending(r) {
960                 return false
961         }
962         delete(cn.Requests, r)
963         return true
964 }
965
966 func (cl *Client) requestPendingMetadata(t *Torrent, c *connection) {
967         if t.haveInfo() {
968                 return
969         }
970         if c.PeerExtensionIDs["ut_metadata"] == 0 {
971                 // Peer doesn't support this.
972                 return
973         }
974         // Request metadata pieces that we don't have in a random order.
975         var pending []int
976         for index := 0; index < t.metadataPieceCount(); index++ {
977                 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
978                         pending = append(pending, index)
979                 }
980         }
981         for _, i := range mathRand.Perm(len(pending)) {
982                 c.requestMetadataPiece(pending[i])
983         }
984 }
985
986 // Process incoming ut_metadata message.
987 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) (err error) {
988         var d map[string]int
989         err = bencode.Unmarshal(payload, &d)
990         if err != nil {
991                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
992                 return
993         }
994         msgType, ok := d["msg_type"]
995         if !ok {
996                 err = errors.New("missing msg_type field")
997                 return
998         }
999         piece := d["piece"]
1000         switch msgType {
1001         case pp.DataMetadataExtensionMsgType:
1002                 if !c.requestedMetadataPiece(piece) {
1003                         err = fmt.Errorf("got unexpected piece %d", piece)
1004                         return
1005                 }
1006                 c.metadataRequests[piece] = false
1007                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1008                 if begin < 0 || begin >= len(payload) {
1009                         err = fmt.Errorf("data has bad offset in payload: %d", begin)
1010                         return
1011                 }
1012                 t.saveMetadataPiece(piece, payload[begin:])
1013                 c.UsefulChunksReceived++
1014                 c.lastUsefulChunkReceived = time.Now()
1015                 t.maybeMetadataCompleted()
1016         case pp.RequestMetadataExtensionMsgType:
1017                 if !t.haveMetadataPiece(piece) {
1018                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1019                         break
1020                 }
1021                 start := (1 << 14) * piece
1022                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1023         case pp.RejectMetadataExtensionMsgType:
1024         default:
1025                 err = errors.New("unknown msg_type value")
1026         }
1027         return
1028 }
1029
1030 func (cl *Client) upload(t *Torrent, c *connection) {
1031         if cl.config.NoUpload {
1032                 return
1033         }
1034         if !c.PeerInterested {
1035                 return
1036         }
1037         seeding := cl.seeding(t)
1038         if !seeding && !t.connHasWantedPieces(c) {
1039                 return
1040         }
1041 another:
1042         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1043                 c.Unchoke()
1044                 for r := range c.PeerRequests {
1045                         err := cl.sendChunk(t, c, r)
1046                         if err != nil {
1047                                 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1048                                         // We had the piece, but not anymore.
1049                                 } else {
1050                                         log.Printf("error sending chunk %+v to peer: %s", r, err)
1051                                 }
1052                                 // If we failed to send a chunk, choke the peer to ensure they
1053                                 // flush all their requests. We've probably dropped a piece,
1054                                 // but there's no way to communicate this to the peer. If they
1055                                 // ask for it again, we'll kick them to allow us to send them
1056                                 // an updated bitfield.
1057                                 break another
1058                         }
1059                         delete(c.PeerRequests, r)
1060                         goto another
1061                 }
1062                 return
1063         }
1064         c.Choke()
1065 }
1066
1067 func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
1068         // Count the chunk being sent, even if it isn't.
1069         b := make([]byte, r.Length)
1070         p := t.info.Piece(int(r.Index))
1071         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1072         if n != len(b) {
1073                 if err == nil {
1074                         panic("expected error")
1075                 }
1076                 return err
1077         }
1078         c.Post(pp.Message{
1079                 Type:  pp.Piece,
1080                 Index: r.Index,
1081                 Begin: r.Begin,
1082                 Piece: b,
1083         })
1084         c.chunksSent++
1085         uploadChunksPosted.Add(1)
1086         c.lastChunkSent = time.Now()
1087         return nil
1088 }
1089
1090 // Processes incoming bittorrent messages. The client lock is held upon entry
1091 // and exit.
1092 func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
1093         decoder := pp.Decoder{
1094                 R:         bufio.NewReader(c.rw),
1095                 MaxLength: 256 * 1024,
1096         }
1097         for {
1098                 cl.mu.Unlock()
1099                 var msg pp.Message
1100                 err := decoder.Decode(&msg)
1101                 cl.mu.Lock()
1102                 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1103                         return nil
1104                 }
1105                 if err != nil {
1106                         return err
1107                 }
1108                 c.lastMessageReceived = time.Now()
1109                 if msg.Keepalive {
1110                         receivedKeepalives.Add(1)
1111                         continue
1112                 }
1113                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1114                 switch msg.Type {
1115                 case pp.Choke:
1116                         c.PeerChoked = true
1117                         c.Requests = nil
1118                         // We can then reset our interest.
1119                         c.updateRequests()
1120                 case pp.Reject:
1121                         cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1122                         c.updateRequests()
1123                 case pp.Unchoke:
1124                         c.PeerChoked = false
1125                         cl.peerUnchoked(t, c)
1126                 case pp.Interested:
1127                         c.PeerInterested = true
1128                         cl.upload(t, c)
1129                 case pp.NotInterested:
1130                         c.PeerInterested = false
1131                         c.Choke()
1132                 case pp.Have:
1133                         err = c.peerSentHave(int(msg.Index))
1134                 case pp.Request:
1135                         if c.Choked {
1136                                 break
1137                         }
1138                         if !c.PeerInterested {
1139                                 err = errors.New("peer sent request but isn't interested")
1140                                 break
1141                         }
1142                         if !t.havePiece(msg.Index.Int()) {
1143                                 // This isn't necessarily them screwing up. We can drop pieces
1144                                 // from our storage, and can't communicate this to peers
1145                                 // except by reconnecting.
1146                                 requestsReceivedForMissingPieces.Add(1)
1147                                 err = errors.New("peer requested piece we don't have")
1148                                 break
1149                         }
1150                         if c.PeerRequests == nil {
1151                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1152                         }
1153                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1154                         cl.upload(t, c)
1155                 case pp.Cancel:
1156                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1157                         if !c.PeerCancel(req) {
1158                                 unexpectedCancels.Add(1)
1159                         }
1160                 case pp.Bitfield:
1161                         err = c.peerSentBitfield(msg.Bitfield)
1162                 case pp.HaveAll:
1163                         err = c.peerSentHaveAll()
1164                 case pp.HaveNone:
1165                         err = c.peerSentHaveNone()
1166                 case pp.Piece:
1167                         cl.downloadedChunk(t, c, &msg)
1168                 case pp.Extended:
1169                         switch msg.ExtendedID {
1170                         case pp.HandshakeExtendedID:
1171                                 // TODO: Create a bencode struct for this.
1172                                 var d map[string]interface{}
1173                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1174                                 if err != nil {
1175                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1176                                         break
1177                                 }
1178                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1179                                 if reqq, ok := d["reqq"]; ok {
1180                                         if i, ok := reqq.(int64); ok {
1181                                                 c.PeerMaxRequests = int(i)
1182                                         }
1183                                 }
1184                                 if v, ok := d["v"]; ok {
1185                                         c.PeerClientName = v.(string)
1186                                 }
1187                                 m, ok := d["m"]
1188                                 if !ok {
1189                                         err = errors.New("handshake missing m item")
1190                                         break
1191                                 }
1192                                 mTyped, ok := m.(map[string]interface{})
1193                                 if !ok {
1194                                         err = errors.New("handshake m value is not dict")
1195                                         break
1196                                 }
1197                                 if c.PeerExtensionIDs == nil {
1198                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1199                                 }
1200                                 for name, v := range mTyped {
1201                                         id, ok := v.(int64)
1202                                         if !ok {
1203                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1204                                                 continue
1205                                         }
1206                                         if id == 0 {
1207                                                 delete(c.PeerExtensionIDs, name)
1208                                         } else {
1209                                                 if c.PeerExtensionIDs[name] == 0 {
1210                                                         supportedExtensionMessages.Add(name, 1)
1211                                                 }
1212                                                 c.PeerExtensionIDs[name] = byte(id)
1213                                         }
1214                                 }
1215                                 metadata_sizeUntyped, ok := d["metadata_size"]
1216                                 if ok {
1217                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1218                                         if !ok {
1219                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1220                                         } else {
1221                                                 t.setMetadataSize(metadata_size, cl)
1222                                         }
1223                                 }
1224                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1225                                         cl.requestPendingMetadata(t, c)
1226                                 }
1227                         case metadataExtendedId:
1228                                 err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1229                                 if err != nil {
1230                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1231                                 }
1232                         case pexExtendedId:
1233                                 if cl.config.DisablePEX {
1234                                         break
1235                                 }
1236                                 var pexMsg peerExchangeMessage
1237                                 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1238                                 if err != nil {
1239                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1240                                         break
1241                                 }
1242                                 go func() {
1243                                         cl.mu.Lock()
1244                                         cl.addPeers(t, func() (ret []Peer) {
1245                                                 for i, cp := range pexMsg.Added {
1246                                                         p := Peer{
1247                                                                 IP:     make([]byte, 4),
1248                                                                 Port:   cp.Port,
1249                                                                 Source: peerSourcePEX,
1250                                                         }
1251                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1252                                                                 p.SupportsEncryption = true
1253                                                         }
1254                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1255                                                         ret = append(ret, p)
1256                                                 }
1257                                                 return
1258                                         }())
1259                                         cl.mu.Unlock()
1260                                 }()
1261                         default:
1262                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1263                         }
1264                         if err != nil {
1265                                 // That client uses its own extension IDs for outgoing message
1266                                 // types, which is incorrect.
1267                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1268                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1269                                         return nil
1270                                 }
1271                         }
1272                 case pp.Port:
1273                         if cl.dHT == nil {
1274                                 break
1275                         }
1276                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1277                         if err != nil {
1278                                 panic(err)
1279                         }
1280                         if msg.Port != 0 {
1281                                 pingAddr.Port = int(msg.Port)
1282                         }
1283                         cl.dHT.Ping(pingAddr)
1284                 default:
1285                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1286                 }
1287                 if err != nil {
1288                         return err
1289                 }
1290         }
1291 }
1292
1293 // Returns true if connection is removed from torrent.Conns.
1294 func (cl *Client) deleteConnection(t *Torrent, c *connection) bool {
1295         for i0, _c := range t.conns {
1296                 if _c != c {
1297                         continue
1298                 }
1299                 i1 := len(t.conns) - 1
1300                 if i0 != i1 {
1301                         t.conns[i0] = t.conns[i1]
1302                 }
1303                 t.conns = t.conns[:i1]
1304                 return true
1305         }
1306         return false
1307 }
1308
1309 func (cl *Client) dropConnection(t *Torrent, c *connection) {
1310         cl.event.Broadcast()
1311         c.Close()
1312         if cl.deleteConnection(t, c) {
1313                 cl.openNewConns(t)
1314         }
1315 }
1316
1317 // Returns true if the connection is added.
1318 func (cl *Client) addConnection(t *Torrent, c *connection) bool {
1319         if cl.closed.IsSet() {
1320                 return false
1321         }
1322         select {
1323         case <-t.ceasingNetworking:
1324                 return false
1325         default:
1326         }
1327         if !cl.wantConns(t) {
1328                 return false
1329         }
1330         for _, c0 := range t.conns {
1331                 if c.PeerID == c0.PeerID {
1332                         // Already connected to a client with that ID.
1333                         duplicateClientConns.Add(1)
1334                         return false
1335                 }
1336         }
1337         if len(t.conns) >= socketsPerTorrent {
1338                 c := t.worstBadConn(cl)
1339                 if c == nil {
1340                         return false
1341                 }
1342                 if cl.config.Debug && missinggo.CryHeard() {
1343                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1344                 }
1345                 c.Close()
1346                 cl.deleteConnection(t, c)
1347         }
1348         if len(t.conns) >= socketsPerTorrent {
1349                 panic(len(t.conns))
1350         }
1351         t.conns = append(t.conns, c)
1352         c.t = t
1353         return true
1354 }
1355
1356 func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
1357         if c.closed.IsSet() {
1358                 return false
1359         }
1360         if !t.haveInfo() {
1361                 return c.supportsExtension("ut_metadata")
1362         }
1363         if cl.seeding(t) {
1364                 return c.PeerInterested
1365         }
1366         return t.connHasWantedPieces(c)
1367 }
1368
1369 func (cl *Client) wantConns(t *Torrent) bool {
1370         if !cl.seeding(t) && !t.needData() {
1371                 return false
1372         }
1373         if len(t.conns) < socketsPerTorrent {
1374                 return true
1375         }
1376         return t.worstBadConn(cl) != nil
1377 }
1378
1379 func (cl *Client) openNewConns(t *Torrent) {
1380         select {
1381         case <-t.ceasingNetworking:
1382                 return
1383         default:
1384         }
1385         for len(t.peers) != 0 {
1386                 if !cl.wantConns(t) {
1387                         return
1388                 }
1389                 if len(t.halfOpen) >= cl.halfOpenLimit {
1390                         return
1391                 }
1392                 var (
1393                         k peersKey
1394                         p Peer
1395                 )
1396                 for k, p = range t.peers {
1397                         break
1398                 }
1399                 delete(t.peers, k)
1400                 cl.initiateConn(p, t)
1401         }
1402         t.wantPeers.Broadcast()
1403 }
1404
1405 func (cl *Client) addPeers(t *Torrent, peers []Peer) {
1406         for _, p := range peers {
1407                 if cl.dopplegangerAddr(net.JoinHostPort(
1408                         p.IP.String(),
1409                         strconv.FormatInt(int64(p.Port), 10),
1410                 )) {
1411                         continue
1412                 }
1413                 if _, ok := cl.ipBlockRange(p.IP); ok {
1414                         continue
1415                 }
1416                 if p.Port == 0 {
1417                         // The spec says to scrub these yourselves. Fine.
1418                         continue
1419                 }
1420                 t.addPeer(p, cl)
1421         }
1422 }
1423
1424 // Prepare a Torrent without any attachment to a Client. That means we can
1425 // initialize fields all fields that don't require the Client without locking
1426 // it.
1427 func (cl *Client) newTorrent(ih metainfo.Hash) (t *Torrent) {
1428         t = &Torrent{
1429                 cl:        cl,
1430                 infoHash:  ih,
1431                 chunkSize: defaultChunkSize,
1432                 peers:     make(map[peersKey]Peer),
1433
1434                 closing:           make(chan struct{}),
1435                 ceasingNetworking: make(chan struct{}),
1436
1437                 halfOpen:          make(map[string]struct{}),
1438                 pieceStateChanges: pubsub.NewPubSub(),
1439
1440                 storageOpener: cl.defaultStorage,
1441         }
1442         t.wantPeers.L = &cl.mu
1443         return
1444 }
1445
// init seeds the package-level math/rand source from the wall clock. In this
// part of the file that source drives the tracker tier shuffle and the order
// of metadata piece requests.
func init() {
	seed := time.Now().Unix()
	mathRand.Seed(seed)
}
1450
// trackerTier is one tier of tracker URLs.
type trackerTier []string

// shuffleTier randomly permutes tier in place; the trackers within each tier
// must be shuffled before use.
// http://stackoverflow.com/a/12267471/149482
// http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
func shuffleTier(tier trackerTier) {
	for i := 0; i < len(tier); i++ {
		// Swap element i with a uniformly chosen element from [0, i].
		pick := mathRand.Intn(i + 1)
		tier[pick], tier[i] = tier[i], tier[pick]
	}
}
1462
1463 func copyTrackers(base []trackerTier) (copy []trackerTier) {
1464         for _, tier := range base {
1465                 copy = append(copy, append(trackerTier(nil), tier...))
1466         }
1467         return
1468 }
1469
1470 func mergeTier(tier trackerTier, newURLs []string) trackerTier {
1471 nextURL:
1472         for _, url := range newURLs {
1473                 for _, trURL := range tier {
1474                         if trURL == url {
1475                                 continue nextURL
1476                         }
1477                 }
1478                 tier = append(tier, url)
1479         }
1480         return tier
1481 }
1482
// Handle is a file-like handle to some torrent data resource. It supports
// sequential reads, seeking, reads at an explicit offset, and closing.
type Handle interface {
	io.ReadSeeker
	io.Closer
	io.ReaderAt
}
1490
1491 // Specifies a new torrent for adding to a client. There are helpers for
1492 // magnet URIs and torrent metainfo files.
1493 type TorrentSpec struct {
1494         // The tiered tracker URIs.
1495         Trackers [][]string
1496         InfoHash metainfo.Hash
1497         Info     *metainfo.InfoEx
1498         // The name to use if the Name field from the Info isn't available.
1499         DisplayName string
1500         // The chunk size to use for outbound requests. Defaults to 16KiB if not
1501         // set.
1502         ChunkSize int
1503         Storage   storage.I
1504 }
1505
1506 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1507         m, err := metainfo.ParseMagnetURI(uri)
1508         if err != nil {
1509                 return
1510         }
1511         spec = &TorrentSpec{
1512                 Trackers:    [][]string{m.Trackers},
1513                 DisplayName: m.DisplayName,
1514                 InfoHash:    m.InfoHash,
1515         }
1516         return
1517 }
1518
1519 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1520         spec = &TorrentSpec{
1521                 Trackers:    mi.AnnounceList,
1522                 Info:        &mi.Info,
1523                 DisplayName: mi.Info.Name,
1524                 InfoHash:    mi.Info.Hash(),
1525         }
1526         if len(spec.Trackers) == 0 {
1527                 spec.Trackers = [][]string{[]string{mi.Announce}}
1528         } else {
1529                 spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
1530         }
1531         return
1532 }
1533
1534 func (cl *Client) AddTorrentInfoHash(infoHash metainfo.Hash) (t *Torrent, new bool) {
1535         cl.mu.Lock()
1536         defer cl.mu.Unlock()
1537         t, ok := cl.torrents[infoHash]
1538         if ok {
1539                 return
1540         }
1541         new = true
1542         t = cl.newTorrent(infoHash)
1543         if !cl.config.DisableTrackers {
1544                 go cl.announceTorrentTrackers(t)
1545         }
1546         if cl.dHT != nil {
1547                 go cl.announceTorrentDHT(t, true)
1548         }
1549         cl.torrents[infoHash] = t
1550         return
1551 }
1552
1553 // Add or merge a torrent spec. If the torrent is already present, the
1554 // trackers will be merged with the existing ones. If the Info isn't yet
1555 // known, it will be set. The display name is replaced if the new spec
1556 // provides one. Returns new if the torrent wasn't already in the client.
1557 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1558         t, new = cl.AddTorrentInfoHash(spec.InfoHash)
1559
1560         if spec.DisplayName != "" {
1561                 t.SetDisplayName(spec.DisplayName)
1562         }
1563
1564         if spec.Info != nil {
1565                 err = t.SetInfoBytes(spec.Info.Bytes)
1566                 if err != nil {
1567                         return
1568                 }
1569         }
1570
1571         cl.mu.Lock()
1572         defer cl.mu.Unlock()
1573
1574         t.addTrackers(spec.Trackers)
1575
1576         t.maybeNewConns()
1577
1578         return
1579 }
1580
1581 func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1582         t, ok := cl.torrents[infoHash]
1583         if !ok {
1584                 err = fmt.Errorf("no such torrent")
1585                 return
1586         }
1587         err = t.close()
1588         if err != nil {
1589                 panic(err)
1590         }
1591         delete(cl.torrents, infoHash)
1592         return
1593 }
1594
1595 // Returns true when peers are required, or false if the torrent is closing.
1596 func (cl *Client) waitWantPeers(t *Torrent) bool {
1597         cl.mu.Lock()
1598         defer cl.mu.Unlock()
1599         for {
1600                 select {
1601                 case <-t.ceasingNetworking:
1602                         return false
1603                 default:
1604                 }
1605                 if len(t.peers) > torrentPeersLowWater {
1606                         goto wait
1607                 }
1608                 if t.needData() || cl.seeding(t) {
1609                         return true
1610                 }
1611         wait:
1612                 t.wantPeers.Wait()
1613         }
1614 }
1615
1616 // Returns whether the client should make effort to seed the torrent.
1617 func (cl *Client) seeding(t *Torrent) bool {
1618         if cl.config.NoUpload {
1619                 return false
1620         }
1621         if !cl.config.Seed {
1622                 return false
1623         }
1624         if t.needData() {
1625                 return false
1626         }
1627         return true
1628 }
1629
1630 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
1631         for cl.waitWantPeers(t) {
1632                 // log.Printf("getting peers for %q from DHT", t)
1633                 ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1634                 if err != nil {
1635                         log.Printf("error getting peers from dht: %s", err)
1636                         return
1637                 }
1638                 // Count all the unique addresses we got during this announce.
1639                 allAddrs := make(map[string]struct{})
1640         getPeers:
1641                 for {
1642                         select {
1643                         case v, ok := <-ps.Peers:
1644                                 if !ok {
1645                                         break getPeers
1646                                 }
1647                                 addPeers := make([]Peer, 0, len(v.Peers))
1648                                 for _, cp := range v.Peers {
1649                                         if cp.Port == 0 {
1650                                                 // Can't do anything with this.
1651                                                 continue
1652                                         }
1653                                         addPeers = append(addPeers, Peer{
1654                                                 IP:     cp.IP[:],
1655                                                 Port:   cp.Port,
1656                                                 Source: peerSourceDHT,
1657                                         })
1658                                         key := (&net.UDPAddr{
1659                                                 IP:   cp.IP[:],
1660                                                 Port: cp.Port,
1661                                         }).String()
1662                                         allAddrs[key] = struct{}{}
1663                                 }
1664                                 cl.mu.Lock()
1665                                 cl.addPeers(t, addPeers)
1666                                 numPeers := len(t.peers)
1667                                 cl.mu.Unlock()
1668                                 if numPeers >= torrentPeersHighWater {
1669                                         break getPeers
1670                                 }
1671                         case <-t.ceasingNetworking:
1672                                 ps.Close()
1673                                 return
1674                         }
1675                 }
1676                 ps.Close()
1677                 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
1678         }
1679 }
1680
1681 func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
1682         url_, err := url.Parse(trRawURL)
1683         if err != nil {
1684                 return
1685         }
1686         host, _, err := net.SplitHostPort(url_.Host)
1687         if err != nil {
1688                 host = url_.Host
1689         }
1690         addr, err := net.ResolveIPAddr("ip", host)
1691         if err != nil {
1692                 return
1693         }
1694         cl.mu.RLock()
1695         _, blocked = cl.ipBlockRange(addr.IP)
1696         cl.mu.RUnlock()
1697         return
1698 }
1699
1700 func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) error {
1701         blocked, err := cl.trackerBlockedUnlocked(tr)
1702         if err != nil {
1703                 return fmt.Errorf("error determining if tracker blocked: %s", err)
1704         }
1705         if blocked {
1706                 return fmt.Errorf("tracker blocked: %s", tr)
1707         }
1708         resp, err := tracker.Announce(tr, req)
1709         if err != nil {
1710                 return fmt.Errorf("error announcing: %s", err)
1711         }
1712         var peers []Peer
1713         for _, peer := range resp.Peers {
1714                 peers = append(peers, Peer{
1715                         IP:   peer.IP,
1716                         Port: peer.Port,
1717                 })
1718         }
1719         cl.mu.Lock()
1720         cl.addPeers(t, peers)
1721         cl.mu.Unlock()
1722
1723         // log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1724
1725         time.Sleep(time.Second * time.Duration(resp.Interval))
1726         return nil
1727 }
1728
1729 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
1730         oks := make(chan bool)
1731         outstanding := 0
1732         for _, tier := range trackers {
1733                 for _, tr := range tier {
1734                         outstanding++
1735                         go func(tr string) {
1736                                 err := cl.announceTorrentSingleTracker(tr, req, t)
1737                                 oks <- err == nil
1738                         }(tr)
1739                 }
1740         }
1741         for outstanding > 0 {
1742                 ok := <-oks
1743                 outstanding--
1744                 if ok {
1745                         atLeastOne = true
1746                 }
1747         }
1748         return
1749 }
1750
1751 // Announce torrent to its trackers.
1752 func (cl *Client) announceTorrentTrackers(t *Torrent) {
1753         req := tracker.AnnounceRequest{
1754                 Event:    tracker.Started,
1755                 NumWant:  -1,
1756                 Port:     uint16(cl.incomingPeerPort()),
1757                 PeerId:   cl.peerID,
1758                 InfoHash: t.infoHash,
1759         }
1760         if !cl.waitWantPeers(t) {
1761                 return
1762         }
1763         cl.mu.RLock()
1764         req.Left = t.bytesLeftAnnounce()
1765         trackers := t.trackers
1766         cl.mu.RUnlock()
1767         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
1768                 req.Event = tracker.None
1769         }
1770 newAnnounce:
1771         for cl.waitWantPeers(t) {
1772                 cl.mu.RLock()
1773                 req.Left = t.bytesLeftAnnounce()
1774                 trackers = t.trackers
1775                 cl.mu.RUnlock()
1776                 numTrackersTried := 0
1777                 for _, tier := range trackers {
1778                         for trIndex, tr := range tier {
1779                                 numTrackersTried++
1780                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
1781                                 if err != nil {
1782                                         continue
1783                                 }
1784                                 // Float the successful announce to the top of the tier. If
1785                                 // the trackers list has been changed, we'll be modifying an
1786                                 // old copy so it won't matter.
1787                                 cl.mu.Lock()
1788                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1789                                 cl.mu.Unlock()
1790
1791                                 req.Event = tracker.None
1792                                 continue newAnnounce
1793                         }
1794                 }
1795                 if numTrackersTried != 0 {
1796                         log.Printf("%s: all trackers failed", t)
1797                 }
1798                 // TODO: Wait until trackers are added if there are none.
1799                 time.Sleep(10 * time.Second)
1800         }
1801 }
1802
1803 func (cl *Client) allTorrentsCompleted() bool {
1804         for _, t := range cl.torrents {
1805                 if !t.haveInfo() {
1806                         return false
1807                 }
1808                 if t.numPiecesCompleted() != t.numPieces() {
1809                         return false
1810                 }
1811         }
1812         return true
1813 }
1814
1815 // Returns true when all torrents are completely downloaded and false if the
1816 // client is stopped before that.
1817 func (cl *Client) WaitAll() bool {
1818         cl.mu.Lock()
1819         defer cl.mu.Unlock()
1820         for !cl.allTorrentsCompleted() {
1821                 if cl.closed.IsSet() {
1822                         return false
1823                 }
1824                 cl.event.Wait()
1825         }
1826         return true
1827 }
1828
1829 // Handle a received chunk from a peer.
1830 func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
1831         chunksReceived.Add(1)
1832
1833         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1834
1835         // Request has been satisfied.
1836         if cl.connDeleteRequest(t, c, req) {
1837                 defer c.updateRequests()
1838         } else {
1839                 unexpectedChunksReceived.Add(1)
1840         }
1841
1842         index := int(req.Index)
1843         piece := &t.pieces[index]
1844
1845         // Do we actually want this chunk?
1846         if !t.wantPiece(req) {
1847                 unwantedChunksReceived.Add(1)
1848                 c.UnwantedChunksReceived++
1849                 return
1850         }
1851
1852         c.UsefulChunksReceived++
1853         c.lastUsefulChunkReceived = time.Now()
1854
1855         cl.upload(t, c)
1856
1857         // Need to record that it hasn't been written yet, before we attempt to do
1858         // anything with it.
1859         piece.incrementPendingWrites()
1860         // Record that we have the chunk.
1861         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1862
1863         // Cancel pending requests for this chunk.
1864         for _, c := range t.conns {
1865                 if cl.connCancel(t, c, req) {
1866                         c.updateRequests()
1867                 }
1868         }
1869
1870         cl.mu.Unlock()
1871         // Write the chunk out.
1872         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1873         cl.mu.Lock()
1874
1875         piece.decrementPendingWrites()
1876
1877         if err != nil {
1878                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
1879                 t.pendRequest(req)
1880                 t.updatePieceCompletion(int(msg.Index))
1881                 return
1882         }
1883
1884         // It's important that the piece is potentially queued before we check if
1885         // the piece is still wanted, because if it is queued, it won't be wanted.
1886         if t.pieceAllDirty(index) {
1887                 cl.queuePieceCheck(t, int(req.Index))
1888         }
1889
1890         if c.peerTouchedPieces == nil {
1891                 c.peerTouchedPieces = make(map[int]struct{})
1892         }
1893         c.peerTouchedPieces[index] = struct{}{}
1894
1895         cl.event.Broadcast()
1896         t.publishPieceChange(int(req.Index))
1897         return
1898 }
1899
1900 // Return the connections that touched a piece, and clear the entry while
1901 // doing it.
1902 func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
1903         for _, c := range t.conns {
1904                 if _, ok := c.peerTouchedPieces[piece]; ok {
1905                         ret = append(ret, c)
1906                         delete(c.peerTouchedPieces, piece)
1907                 }
1908         }
1909         return
1910 }
1911
1912 func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
1913         p := &t.pieces[piece]
1914         if p.EverHashed {
1915                 // Don't score the first time a piece is hashed, it could be an
1916                 // initial check.
1917                 if correct {
1918                         pieceHashedCorrect.Add(1)
1919                 } else {
1920                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
1921                         pieceHashedNotCorrect.Add(1)
1922                 }
1923         }
1924         p.EverHashed = true
1925         touchers := cl.reapPieceTouches(t, piece)
1926         if correct {
1927                 err := p.Storage().MarkComplete()
1928                 if err != nil {
1929                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
1930                 }
1931                 t.updatePieceCompletion(piece)
1932         } else if len(touchers) != 0 {
1933                 log.Printf("dropping %d conns that touched piece", len(touchers))
1934                 for _, c := range touchers {
1935                         cl.dropConnection(t, c)
1936                 }
1937         }
1938         cl.pieceChanged(t, piece)
1939 }
1940
1941 func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
1942         t.pendingPieces.Remove(piece)
1943         t.pendAllChunkSpecs(piece)
1944         for _, conn := range t.conns {
1945                 conn.Have(piece)
1946                 for r := range conn.Requests {
1947                         if int(r.Index) == piece {
1948                                 conn.Cancel(r)
1949                         }
1950                 }
1951                 // Could check here if peer doesn't have piece, but due to caching
1952                 // some peers may have said they have a piece but they don't.
1953                 cl.upload(t, conn)
1954         }
1955 }
1956
1957 func (cl *Client) onFailedPiece(t *Torrent, piece int) {
1958         if t.pieceAllDirty(piece) {
1959                 t.pendAllChunkSpecs(piece)
1960         }
1961         if !t.wantPieceIndex(piece) {
1962                 return
1963         }
1964         cl.openNewConns(t)
1965         for _, conn := range t.conns {
1966                 if conn.PeerHasPiece(piece) {
1967                         conn.updateRequests()
1968                 }
1969         }
1970 }
1971
1972 func (cl *Client) pieceChanged(t *Torrent, piece int) {
1973         correct := t.pieceComplete(piece)
1974         defer cl.event.Broadcast()
1975         if correct {
1976                 cl.onCompletedPiece(t, piece)
1977         } else {
1978                 cl.onFailedPiece(t, piece)
1979         }
1980         if t.updatePiecePriority(piece) {
1981                 t.piecePriorityChanged(piece)
1982         }
1983         t.publishPieceChange(piece)
1984 }
1985
1986 func (cl *Client) verifyPiece(t *Torrent, piece int) {
1987         cl.mu.Lock()
1988         defer cl.mu.Unlock()
1989         p := &t.pieces[piece]
1990         for p.Hashing || t.storage == nil {
1991                 cl.event.Wait()
1992         }
1993         p.QueuedForHash = false
1994         if t.isClosed() || t.pieceComplete(piece) {
1995                 t.updatePiecePriority(piece)
1996                 t.publishPieceChange(piece)
1997                 return
1998         }
1999         p.Hashing = true
2000         t.publishPieceChange(piece)
2001         cl.mu.Unlock()
2002         sum := t.hashPiece(piece)
2003         cl.mu.Lock()
2004         select {
2005         case <-t.closing:
2006                 return
2007         default:
2008         }
2009         p.Hashing = false
2010         cl.pieceHashed(t, piece, sum == p.Hash)
2011 }
2012
2013 // Returns handles to all the torrents loaded in the Client.
2014 func (cl *Client) Torrents() (ret []*Torrent) {
2015         cl.mu.Lock()
2016         for _, t := range cl.torrents {
2017                 ret = append(ret, t)
2018         }
2019         cl.mu.Unlock()
2020         return
2021 }
2022
2023 func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
2024         spec, err := TorrentSpecFromMagnetURI(uri)
2025         if err != nil {
2026                 return
2027         }
2028         T, _, err = cl.AddTorrentSpec(spec)
2029         return
2030 }
2031
2032 func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
2033         T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2034         var ss []string
2035         missinggo.CastSlice(&ss, mi.Nodes)
2036         cl.AddDHTNodes(ss)
2037         return
2038 }
2039
2040 func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
2041         mi, err := metainfo.LoadFromFile(filename)
2042         if err != nil {
2043                 return
2044         }
2045         return cl.AddTorrent(mi)
2046 }
2047
// DHT returns the client's DHT server as stored in cl.dHT.
// NOTE(review): presumably nil when the client runs without DHT — confirm
// against the client constructor.
func (cl *Client) DHT() *dht.Server {
	return cl.dHT
}
2051
2052 func (cl *Client) AddDHTNodes(nodes []string) {
2053         for _, n := range nodes {
2054                 hmp := missinggo.SplitHostMaybePort(n)
2055                 ip := net.ParseIP(hmp.Host)
2056                 if ip == nil {
2057                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
2058                         continue
2059                 }
2060                 ni := dht.NodeInfo{
2061                         Addr: dht.NewAddr(&net.UDPAddr{
2062                                 IP:   ip,
2063                                 Port: hmp.Port,
2064                         }),
2065                 }
2066                 cl.DHT().AddNode(ni)
2067         }
2068 }