]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
66b2d81ebcecc0bcdd95c5ca5d4a7d175849f6a3
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "crypto/sha1"
8         "encoding/hex"
9         "errors"
10         "expvar"
11         "fmt"
12         "io"
13         "log"
14         "math/big"
15         mathRand "math/rand"
16         "net"
17         "net/url"
18         "os"
19         "path/filepath"
20         "sort"
21         "strconv"
22         "strings"
23         "time"
24
25         "github.com/anacrolix/missinggo"
26         "github.com/anacrolix/missinggo/bitmap"
27         "github.com/anacrolix/missinggo/pproffd"
28         "github.com/anacrolix/missinggo/pubsub"
29         "github.com/anacrolix/sync"
30         "github.com/anacrolix/utp"
31         "github.com/edsrzf/mmap-go"
32
33         "github.com/anacrolix/torrent/bencode"
34         "github.com/anacrolix/torrent/dht"
35         "github.com/anacrolix/torrent/iplist"
36         "github.com/anacrolix/torrent/metainfo"
37         "github.com/anacrolix/torrent/mse"
38         pp "github.com/anacrolix/torrent/peer_protocol"
39         "github.com/anacrolix/torrent/storage"
40         "github.com/anacrolix/torrent/tracker"
41 )
42
43 // I could move a lot of these counters to their own file, but I suspect they
44 // may be attached to a Client someday.
45 var (
46         unwantedChunksReceived   = expvar.NewInt("chunksReceivedUnwanted")
47         unexpectedChunksReceived = expvar.NewInt("chunksReceivedUnexpected")
48         chunksReceived           = expvar.NewInt("chunksReceived")
49
50         peersAddedBySource = expvar.NewMap("peersAddedBySource")
51
52         uploadChunksPosted    = expvar.NewInt("uploadChunksPosted")
53         unexpectedCancels     = expvar.NewInt("unexpectedCancels")
54         postedCancels         = expvar.NewInt("postedCancels")
55         duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided")
56
57         pieceHashedCorrect    = expvar.NewInt("pieceHashedCorrect")
58         pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
59
60         unsuccessfulDials = expvar.NewInt("dialSuccessful")
61         successfulDials   = expvar.NewInt("dialUnsuccessful")
62
63         acceptUTP    = expvar.NewInt("acceptUTP")
64         acceptTCP    = expvar.NewInt("acceptTCP")
65         acceptReject = expvar.NewInt("acceptReject")
66
67         peerExtensions                    = expvar.NewMap("peerExtensions")
68         completedHandshakeConnectionFlags = expvar.NewMap("completedHandshakeConnectionFlags")
69         // Count of connections to peer with same client ID.
70         connsToSelf = expvar.NewInt("connsToSelf")
71         // Number of completed connections to a client we're already connected with.
72         duplicateClientConns       = expvar.NewInt("duplicateClientConns")
73         receivedMessageTypes       = expvar.NewMap("receivedMessageTypes")
74         receivedKeepalives         = expvar.NewInt("receivedKeepalives")
75         supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
76         postedMessageTypes         = expvar.NewMap("postedMessageTypes")
77         postedKeepalives           = expvar.NewInt("postedKeepalives")
78         // Requests received for pieces we don't have.
79         requestsReceivedForMissingPieces = expvar.NewInt("requestsReceivedForMissingPieces")
80 )
81
82 const (
83         // Justification for set bits follows.
84         //
85         // Extension protocol ([5]|=0x10):
86         // http://www.bittorrent.org/beps/bep_0010.html
87         //
88         // Fast Extension ([7]|=0x04):
89         // http://bittorrent.org/beps/bep_0006.html.
90         // Disabled until AllowedFast is implemented.
91         //
92         // DHT ([7]|=1):
93         // http://www.bittorrent.org/beps/bep_0005.html
94         defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
95
96         socketsPerTorrent     = 80
97         torrentPeersHighWater = 200
98         torrentPeersLowWater  = 50
99
100         // Limit how long handshake can take. This is to reduce the lingering
101         // impact of a few bad apples. 4s loses 1% of successful handshakes that
102         // are obtained with 60s timeout, and 5% of unsuccessful handshakes.
103         btHandshakeTimeout = 4 * time.Second
104         handshakesTimeout  = 20 * time.Second
105
106         // These are our extended message IDs.
107         metadataExtendedId = iota + 1 // 0 is reserved for deleting keys
108         pexExtendedId
109
110         // Updated occasionally to when there's been some changes to client
111         // behaviour in case other clients are assuming anything of us. See also
112         // `bep20`.
113         extendedHandshakeClientVersion = "go.torrent dev 20150624"
114 )
115
116 // Currently doesn't really queue, but should in the future.
117 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
118         piece := &t.pieces[pieceIndex]
119         if piece.QueuedForHash {
120                 return
121         }
122         piece.QueuedForHash = true
123         t.publishPieceChange(int(pieceIndex))
124         go cl.verifyPiece(t, int(pieceIndex))
125 }
126
127 // Queue a piece check if one isn't already queued, and the piece has never
128 // been checked before.
129 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
130         p := &t.pieces[piece]
131         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
132                 return
133         }
134         cl.queuePieceCheck(t, piece)
135 }
136
137 // Clients contain zero or more Torrents. A client manages a blocklist, the
138 // TCP/UDP protocol ports, and DHT as desired.
139 type Client struct {
140         halfOpenLimit  int
141         peerID         [20]byte
142         listeners      []net.Listener
143         utpSock        *utp.Socket
144         dHT            *dht.Server
145         ipBlockList    iplist.Ranger
146         bannedTorrents map[metainfo.Hash]struct{}
147         config         Config
148         pruneTimer     *time.Timer
149         extensionBytes peerExtensionBytes
150         // Set of addresses that have our client ID. This intentionally will
151         // include ourselves if we end up trying to connect to our own address
152         // through legitimate channels.
153         dopplegangerAddrs map[string]struct{}
154
155         defaultStorage storage.I
156
157         mu     sync.RWMutex
158         event  sync.Cond
159         closed missinggo.Event
160
161         torrents map[metainfo.Hash]*Torrent
162 }
163
164 func (me *Client) IPBlockList() iplist.Ranger {
165         me.mu.Lock()
166         defer me.mu.Unlock()
167         return me.ipBlockList
168 }
169
170 func (me *Client) SetIPBlockList(list iplist.Ranger) {
171         me.mu.Lock()
172         defer me.mu.Unlock()
173         me.ipBlockList = list
174         if me.dHT != nil {
175                 me.dHT.SetIPBlockList(list)
176         }
177 }
178
179 func (me *Client) PeerID() string {
180         return string(me.peerID[:])
181 }
182
183 func (me *Client) ListenAddr() (addr net.Addr) {
184         for _, l := range me.listeners {
185                 addr = l.Addr()
186                 break
187         }
188         return
189 }
190
191 type hashSorter struct {
192         Hashes []metainfo.Hash
193 }
194
195 func (me hashSorter) Len() int {
196         return len(me.Hashes)
197 }
198
199 func (me hashSorter) Less(a, b int) bool {
200         return (&big.Int{}).SetBytes(me.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(me.Hashes[b][:])) < 0
201 }
202
203 func (me hashSorter) Swap(a, b int) {
204         me.Hashes[a], me.Hashes[b] = me.Hashes[b], me.Hashes[a]
205 }
206
207 func (cl *Client) sortedTorrents() (ret []*Torrent) {
208         var hs hashSorter
209         for ih := range cl.torrents {
210                 hs.Hashes = append(hs.Hashes, ih)
211         }
212         sort.Sort(hs)
213         for _, ih := range hs.Hashes {
214                 ret = append(ret, cl.torrent(ih))
215         }
216         return
217 }
218
219 // Writes out a human readable status of the client, such as for writing to a
220 // HTTP status page.
221 func (cl *Client) WriteStatus(_w io.Writer) {
222         cl.mu.RLock()
223         defer cl.mu.RUnlock()
224         w := bufio.NewWriter(_w)
225         defer w.Flush()
226         if addr := cl.ListenAddr(); addr != nil {
227                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
228         } else {
229                 fmt.Fprintln(w, "Not listening!")
230         }
231         fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
232         if cl.dHT != nil {
233                 dhtStats := cl.dHT.Stats()
234                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
235                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
236                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
237                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
238                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
239         }
240         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
241         fmt.Fprintln(w)
242         for _, t := range cl.sortedTorrents() {
243                 if t.name() == "" {
244                         fmt.Fprint(w, "<unknown name>")
245                 } else {
246                         fmt.Fprint(w, t.name())
247                 }
248                 fmt.Fprint(w, "\n")
249                 if t.haveInfo() {
250                         fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length)
251                 } else {
252                         w.WriteString("<missing metainfo>")
253                 }
254                 fmt.Fprint(w, "\n")
255                 t.writeStatus(w, cl)
256                 fmt.Fprintln(w)
257         }
258 }
259
260 // Calculates the number of pieces to set to Readahead priority, after the
261 // Now, and Next pieces.
262 func readaheadPieces(readahead, pieceLength int64) (ret int) {
263         // Expand the readahead to fit any partial pieces. Subtract 1 for the
264         // "next" piece that is assigned.
265         ret = int((readahead+pieceLength-1)/pieceLength - 1)
266         // Lengthen the "readahead tail" to smooth blockiness that occurs when the
267         // piece length is much larger than the readahead.
268         if ret < 2 {
269                 ret++
270         }
271         return
272 }
273
274 func (cl *Client) configDir() string {
275         if cl.config.ConfigDir == "" {
276                 return filepath.Join(os.Getenv("HOME"), ".config/torrent")
277         }
278         return cl.config.ConfigDir
279 }
280
281 // The directory where the Client expects to find and store configuration
282 // data. Defaults to $HOME/.config/torrent.
283 func (cl *Client) ConfigDir() string {
284         return cl.configDir()
285 }
286
287 func loadPackedBlocklist(filename string) (ret iplist.Ranger, err error) {
288         f, err := os.Open(filename)
289         if os.IsNotExist(err) {
290                 err = nil
291                 return
292         }
293         if err != nil {
294                 return
295         }
296         defer f.Close()
297         mm, err := mmap.Map(f, mmap.RDONLY, 0)
298         if err != nil {
299                 return
300         }
301         ret = iplist.NewFromPacked(mm)
302         return
303 }
304
305 func (cl *Client) setEnvBlocklist() (err error) {
306         filename := os.Getenv("TORRENT_BLOCKLIST_FILE")
307         defaultBlocklist := filename == ""
308         if defaultBlocklist {
309                 cl.ipBlockList, err = loadPackedBlocklist(filepath.Join(cl.configDir(), "packed-blocklist"))
310                 if err != nil {
311                         return
312                 }
313                 if cl.ipBlockList != nil {
314                         return
315                 }
316                 filename = filepath.Join(cl.configDir(), "blocklist")
317         }
318         f, err := os.Open(filename)
319         if err != nil {
320                 if defaultBlocklist {
321                         err = nil
322                 }
323                 return
324         }
325         defer f.Close()
326         cl.ipBlockList, err = iplist.NewFromReader(f)
327         return
328 }
329
330 func (cl *Client) initBannedTorrents() error {
331         f, err := os.Open(filepath.Join(cl.configDir(), "banned_infohashes"))
332         if err != nil {
333                 if os.IsNotExist(err) {
334                         return nil
335                 }
336                 return fmt.Errorf("error opening banned infohashes file: %s", err)
337         }
338         defer f.Close()
339         scanner := bufio.NewScanner(f)
340         cl.bannedTorrents = make(map[metainfo.Hash]struct{})
341         for scanner.Scan() {
342                 if strings.HasPrefix(strings.TrimSpace(scanner.Text()), "#") {
343                         continue
344                 }
345                 var ihs string
346                 n, err := fmt.Sscanf(scanner.Text(), "%x", &ihs)
347                 if err != nil {
348                         return fmt.Errorf("error reading infohash: %s", err)
349                 }
350                 if n != 1 {
351                         continue
352                 }
353                 if len(ihs) != 20 {
354                         return errors.New("bad infohash")
355                 }
356                 var ih metainfo.Hash
357                 missinggo.CopyExact(&ih, ihs)
358                 cl.bannedTorrents[ih] = struct{}{}
359         }
360         if err := scanner.Err(); err != nil {
361                 return fmt.Errorf("error scanning file: %s", err)
362         }
363         return nil
364 }
365
366 // Creates a new client.
367 func NewClient(cfg *Config) (cl *Client, err error) {
368         if cfg == nil {
369                 cfg = &Config{}
370         }
371
372         defer func() {
373                 if err != nil {
374                         cl = nil
375                 }
376         }()
377         cl = &Client{
378                 halfOpenLimit:     socketsPerTorrent,
379                 config:            *cfg,
380                 defaultStorage:    cfg.DefaultStorage,
381                 dopplegangerAddrs: make(map[string]struct{}),
382                 torrents:          make(map[metainfo.Hash]*Torrent),
383         }
384         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
385         cl.event.L = &cl.mu
386         if cl.defaultStorage == nil {
387                 cl.defaultStorage = storage.NewFile(cfg.DataDir)
388         }
389         if cfg.IPBlocklist != nil {
390                 cl.ipBlockList = cfg.IPBlocklist
391         } else if !cfg.NoDefaultBlocklist {
392                 err = cl.setEnvBlocklist()
393                 if err != nil {
394                         return
395                 }
396         }
397
398         if err = cl.initBannedTorrents(); err != nil {
399                 err = fmt.Errorf("error initing banned torrents: %s", err)
400                 return
401         }
402
403         if cfg.PeerID != "" {
404                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
405         } else {
406                 o := copy(cl.peerID[:], bep20)
407                 _, err = rand.Read(cl.peerID[o:])
408                 if err != nil {
409                         panic("error generating peer id")
410                 }
411         }
412
413         // Returns the laddr string to listen on for the next Listen call.
414         listenAddr := func() string {
415                 if addr := cl.ListenAddr(); addr != nil {
416                         return addr.String()
417                 }
418                 if cfg.ListenAddr == "" {
419                         return ":50007"
420                 }
421                 return cfg.ListenAddr
422         }
423         if !cl.config.DisableTCP {
424                 var l net.Listener
425                 l, err = net.Listen(func() string {
426                         if cl.config.DisableIPv6 {
427                                 return "tcp4"
428                         } else {
429                                 return "tcp"
430                         }
431                 }(), listenAddr())
432                 if err != nil {
433                         return
434                 }
435                 cl.listeners = append(cl.listeners, l)
436                 go cl.acceptConnections(l, false)
437         }
438         if !cl.config.DisableUTP {
439                 cl.utpSock, err = utp.NewSocket(func() string {
440                         if cl.config.DisableIPv6 {
441                                 return "udp4"
442                         } else {
443                                 return "udp"
444                         }
445                 }(), listenAddr())
446                 if err != nil {
447                         return
448                 }
449                 cl.listeners = append(cl.listeners, cl.utpSock)
450                 go cl.acceptConnections(cl.utpSock, true)
451         }
452         if !cfg.NoDHT {
453                 dhtCfg := cfg.DHTConfig
454                 if dhtCfg.IPBlocklist == nil {
455                         dhtCfg.IPBlocklist = cl.ipBlockList
456                 }
457                 if dhtCfg.Addr == "" {
458                         dhtCfg.Addr = listenAddr()
459                 }
460                 if dhtCfg.Conn == nil && cl.utpSock != nil {
461                         dhtCfg.Conn = cl.utpSock
462                 }
463                 cl.dHT, err = dht.NewServer(&dhtCfg)
464                 if err != nil {
465                         return
466                 }
467         }
468
469         return
470 }
471
472 // Stops the client. All connections to peers are closed and all activity will
473 // come to a halt.
474 func (me *Client) Close() {
475         me.mu.Lock()
476         defer me.mu.Unlock()
477         me.closed.Set()
478         if me.dHT != nil {
479                 me.dHT.Close()
480         }
481         for _, l := range me.listeners {
482                 l.Close()
483         }
484         for _, t := range me.torrents {
485                 t.close()
486         }
487         me.event.Broadcast()
488 }
489
490 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
491
492 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
493         if cl.ipBlockList == nil {
494                 return
495         }
496         ip4 := ip.To4()
497         // If blocklists are enabled, then block non-IPv4 addresses, because
498         // blocklists do not yet support IPv6.
499         if ip4 == nil {
500                 if missinggo.CryHeard() {
501                         log.Printf("blocking non-IPv4 address: %s", ip)
502                 }
503                 r = ipv6BlockRange
504                 blocked = true
505                 return
506         }
507         return cl.ipBlockList.Lookup(ip4)
508 }
509
510 func (cl *Client) waitAccept() {
511         cl.mu.Lock()
512         defer cl.mu.Unlock()
513         for {
514                 for _, t := range cl.torrents {
515                         if cl.wantConns(t) {
516                                 return
517                         }
518                 }
519                 if cl.closed.IsSet() {
520                         return
521                 }
522                 cl.event.Wait()
523         }
524 }
525
526 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
527         for {
528                 cl.waitAccept()
529                 conn, err := l.Accept()
530                 conn = pproffd.WrapNetConn(conn)
531                 if cl.closed.IsSet() {
532                         if conn != nil {
533                                 conn.Close()
534                         }
535                         return
536                 }
537                 if err != nil {
538                         log.Print(err)
539                         // I think something harsher should happen here? Our accept
540                         // routine just fucked off.
541                         return
542                 }
543                 if utp {
544                         acceptUTP.Add(1)
545                 } else {
546                         acceptTCP.Add(1)
547                 }
548                 cl.mu.RLock()
549                 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
550                 _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
551                 cl.mu.RUnlock()
552                 if blocked || doppleganger {
553                         acceptReject.Add(1)
554                         // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
555                         conn.Close()
556                         continue
557                 }
558                 go cl.incomingConnection(conn, utp)
559         }
560 }
561
562 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
563         defer nc.Close()
564         if tc, ok := nc.(*net.TCPConn); ok {
565                 tc.SetLinger(0)
566         }
567         c := newConnection()
568         c.conn = nc
569         c.rw = nc
570         c.Discovery = peerSourceIncoming
571         c.uTP = utp
572         err := cl.runReceivedConn(c)
573         if err != nil {
574                 // log.Print(err)
575         }
576 }
577
578 // Returns a handle to the given torrent, if it's present in the client.
579 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
580         cl.mu.Lock()
581         defer cl.mu.Unlock()
582         t, ok = cl.torrents[ih]
583         return
584 }
585
586 func (me *Client) torrent(ih metainfo.Hash) *Torrent {
587         return me.torrents[ih]
588 }
589
590 type dialResult struct {
591         Conn net.Conn
592         UTP  bool
593 }
594
595 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
596         conn, err := dial(addr, t)
597         if err != nil {
598                 if conn != nil {
599                         conn.Close()
600                 }
601                 conn = nil // Pedantic
602         }
603         ch <- dialResult{conn, utp}
604         if err == nil {
605                 successfulDials.Add(1)
606                 return
607         }
608         unsuccessfulDials.Add(1)
609 }
610
611 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
612         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
613         if ret < minDialTimeout {
614                 ret = minDialTimeout
615         }
616         return
617 }
618
619 // Returns whether an address is known to connect to a client with our own ID.
620 func (me *Client) dopplegangerAddr(addr string) bool {
621         _, ok := me.dopplegangerAddrs[addr]
622         return ok
623 }
624
625 // Start the process of connecting to the given peer for the given torrent if
626 // appropriate.
627 func (me *Client) initiateConn(peer Peer, t *Torrent) {
628         if peer.Id == me.peerID {
629                 return
630         }
631         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
632         if me.dopplegangerAddr(addr) || t.addrActive(addr) {
633                 duplicateConnsAvoided.Add(1)
634                 return
635         }
636         if r, ok := me.ipBlockRange(peer.IP); ok {
637                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
638                 return
639         }
640         t.halfOpen[addr] = struct{}{}
641         go me.outgoingConnection(t, addr, peer.Source)
642 }
643
644 func (me *Client) dialTimeout(t *Torrent) time.Duration {
645         me.mu.Lock()
646         pendingPeers := len(t.peers)
647         me.mu.Unlock()
648         return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, pendingPeers)
649 }
650
651 func (me *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
652         c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t))
653         if err == nil {
654                 c.(*net.TCPConn).SetLinger(0)
655         }
656         return
657 }
658
659 func (me *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
660         return me.utpSock.DialTimeout(addr, me.dialTimeout(t))
661 }
662
663 // Returns a connection over UTP or TCP, whichever is first to connect.
664 func (me *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
665         // Initiate connections via TCP and UTP simultaneously. Use the first one
666         // that succeeds.
667         left := 0
668         if !me.config.DisableUTP {
669                 left++
670         }
671         if !me.config.DisableTCP {
672                 left++
673         }
674         resCh := make(chan dialResult, left)
675         if !me.config.DisableUTP {
676                 go doDial(me.dialUTP, resCh, true, addr, t)
677         }
678         if !me.config.DisableTCP {
679                 go doDial(me.dialTCP, resCh, false, addr, t)
680         }
681         var res dialResult
682         // Wait for a successful connection.
683         for ; left > 0 && res.Conn == nil; left-- {
684                 res = <-resCh
685         }
686         if left > 0 {
687                 // There are still incompleted dials.
688                 go func() {
689                         for ; left > 0; left-- {
690                                 conn := (<-resCh).Conn
691                                 if conn != nil {
692                                         conn.Close()
693                                 }
694                         }
695                 }()
696         }
697         conn = res.Conn
698         utp = res.UTP
699         return
700 }
701
702 func (me *Client) noLongerHalfOpen(t *Torrent, addr string) {
703         if _, ok := t.halfOpen[addr]; !ok {
704                 panic("invariant broken")
705         }
706         delete(t.halfOpen, addr)
707         me.openNewConns(t)
708 }
709
710 // Performs initiator handshakes and returns a connection. Returns nil
711 // *connection if no connection for valid reasons.
712 func (me *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
713         c = newConnection()
714         c.conn = nc
715         c.rw = nc
716         c.encrypted = encrypted
717         c.uTP = utp
718         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
719         if err != nil {
720                 return
721         }
722         ok, err := me.initiateHandshakes(c, t)
723         if !ok {
724                 c = nil
725         }
726         return
727 }
728
729 // Returns nil connection and nil error if no connection could be established
730 // for valid reasons.
731 func (me *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
732         nc, utp := me.dialFirst(addr, t)
733         if nc == nil {
734                 return
735         }
736         c, err = me.handshakesConnection(nc, t, !me.config.DisableEncryption, utp)
737         if err != nil {
738                 nc.Close()
739                 return
740         } else if c != nil {
741                 return
742         }
743         nc.Close()
744         if me.config.DisableEncryption {
745                 // We already tried without encryption.
746                 return
747         }
748         // Try again without encryption, using whichever protocol type worked last
749         // time.
750         if utp {
751                 nc, err = me.dialUTP(addr, t)
752         } else {
753                 nc, err = me.dialTCP(addr, t)
754         }
755         if err != nil {
756                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
757                 return
758         }
759         c, err = me.handshakesConnection(nc, t, false, utp)
760         if err != nil || c == nil {
761                 nc.Close()
762         }
763         return
764 }
765
766 // Called to dial out and run a connection. The addr we're given is already
767 // considered half-open.
768 func (me *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
769         c, err := me.establishOutgoingConn(t, addr)
770         me.mu.Lock()
771         defer me.mu.Unlock()
772         // Don't release lock between here and addConnection, unless it's for
773         // failure.
774         me.noLongerHalfOpen(t, addr)
775         if err != nil {
776                 if me.config.Debug {
777                         log.Printf("error establishing outgoing connection: %s", err)
778                 }
779                 return
780         }
781         if c == nil {
782                 return
783         }
784         defer c.Close()
785         c.Discovery = ps
786         err = me.runInitiatedHandshookConn(c, t)
787         if err != nil {
788                 if me.config.Debug {
789                         log.Printf("error in established outgoing connection: %s", err)
790                 }
791         }
792 }
793
794 // The port number for incoming peer connections. 0 if the client isn't
795 // listening.
796 func (cl *Client) incomingPeerPort() int {
797         listenAddr := cl.ListenAddr()
798         if listenAddr == nil {
799                 return 0
800         }
801         return missinggo.AddrPort(listenAddr)
802 }
803
804 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
805 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
806 func addrCompactIP(addr net.Addr) (string, error) {
807         host, _, err := net.SplitHostPort(addr.String())
808         if err != nil {
809                 return "", err
810         }
811         ip := net.ParseIP(host)
812         if v4 := ip.To4(); v4 != nil {
813                 if len(v4) != 4 {
814                         panic(v4)
815                 }
816                 return string(v4), nil
817         }
818         return string(ip.To16()), nil
819 }
820
821 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
822         var err error
823         for b := range bb {
824                 _, err = w.Write(b)
825                 if err != nil {
826                         break
827                 }
828         }
829         done <- err
830 }
831
832 type (
833         peerExtensionBytes [8]byte
834         peerID             [20]byte
835 )
836
837 func (me *peerExtensionBytes) SupportsExtended() bool {
838         return me[5]&0x10 != 0
839 }
840
841 func (me *peerExtensionBytes) SupportsDHT() bool {
842         return me[7]&0x01 != 0
843 }
844
845 func (me *peerExtensionBytes) SupportsFast() bool {
846         return me[7]&0x04 != 0
847 }
848
849 type handshakeResult struct {
850         peerExtensionBytes
851         peerID
852         metainfo.Hash
853 }
854
855 // ih is nil if we expect the peer to declare the InfoHash, such as when the
856 // peer initiated the connection. Returns ok if the handshake was successful,
857 // and err if there was an unexpected condition other than the peer simply
858 // abandoning the handshake.
859 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
860         // Bytes to be sent to the peer. Should never block the sender.
861         postCh := make(chan []byte, 4)
862         // A single error value sent when the writer completes.
863         writeDone := make(chan error, 1)
864         // Performs writes to the socket and ensures posts don't block.
865         go handshakeWriter(sock, postCh, writeDone)
866
867         defer func() {
868                 close(postCh) // Done writing.
869                 if !ok {
870                         return
871                 }
872                 if err != nil {
873                         panic(err)
874                 }
875                 // Wait until writes complete before returning from handshake.
876                 err = <-writeDone
877                 if err != nil {
878                         err = fmt.Errorf("error writing: %s", err)
879                 }
880         }()
881
882         post := func(bb []byte) {
883                 select {
884                 case postCh <- bb:
885                 default:
886                         panic("mustn't block while posting")
887                 }
888         }
889
890         post([]byte(pp.Protocol))
891         post(extensions[:])
892         if ih != nil { // We already know what we want.
893                 post(ih[:])
894                 post(peerID[:])
895         }
896         var b [68]byte
897         _, err = io.ReadFull(sock, b[:68])
898         if err != nil {
899                 err = nil
900                 return
901         }
902         if string(b[:20]) != pp.Protocol {
903                 return
904         }
905         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
906         missinggo.CopyExact(&res.Hash, b[28:48])
907         missinggo.CopyExact(&res.peerID, b[48:68])
908         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
909
910         // TODO: Maybe we can just drop peers here if we're not interested. This
911         // could prevent them trying to reconnect, falsely believing there was
912         // just a problem.
913         if ih == nil { // We were waiting for the peer to tell us what they wanted.
914                 post(res.Hash[:])
915                 post(peerID[:])
916         }
917
918         ok = true
919         return
920 }
921
922 // Wraps a raw connection and provides the interface we want for using the
923 // connection in the message loop.
924 type deadlineReader struct {
925         nc net.Conn
926         r  io.Reader
927 }
928
929 func (me deadlineReader) Read(b []byte) (n int, err error) {
930         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
931         err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
932         if err != nil {
933                 err = fmt.Errorf("error setting read deadline: %s", err)
934         }
935         n, err = me.r.Read(b)
936         // Convert common errors into io.EOF.
937         // if err != nil {
938         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
939         //              err = io.EOF
940         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
941         //              if n != 0 {
942         //                      panic(n)
943         //              }
944         //              err = io.EOF
945         //      }
946         // }
947         return
948 }
949
950 type readWriter struct {
951         io.Reader
952         io.Writer
953 }
954
955 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
956         var protocol [len(pp.Protocol)]byte
957         _, err = io.ReadFull(rw, protocol[:])
958         if err != nil {
959                 return
960         }
961         ret = readWriter{
962                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
963                 rw,
964         }
965         if string(protocol[:]) == pp.Protocol {
966                 return
967         }
968         encrypted = true
969         ret, err = mse.ReceiveHandshake(ret, skeys)
970         return
971 }
972
973 func (cl *Client) receiveSkeys() (ret [][]byte) {
974         for ih := range cl.torrents {
975                 ret = append(ret, ih[:])
976         }
977         return
978 }
979
980 func (me *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
981         if c.encrypted {
982                 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
983                 if err != nil {
984                         return
985                 }
986         }
987         ih, ok, err := me.connBTHandshake(c, &t.infoHash)
988         if ih != t.infoHash {
989                 ok = false
990         }
991         return
992 }
993
994 // Do encryption and bittorrent handshakes as receiver.
995 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
996         cl.mu.Lock()
997         skeys := cl.receiveSkeys()
998         cl.mu.Unlock()
999         if !cl.config.DisableEncryption {
1000                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
1001                 if err != nil {
1002                         if err == mse.ErrNoSecretKeyMatch {
1003                                 err = nil
1004                         }
1005                         return
1006                 }
1007         }
1008         ih, ok, err := cl.connBTHandshake(c, nil)
1009         if err != nil {
1010                 err = fmt.Errorf("error during bt handshake: %s", err)
1011                 return
1012         }
1013         if !ok {
1014                 return
1015         }
1016         cl.mu.Lock()
1017         t = cl.torrents[ih]
1018         cl.mu.Unlock()
1019         return
1020 }
1021
1022 // Returns !ok if handshake failed for valid reasons.
1023 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
1024         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
1025         if err != nil || !ok {
1026                 return
1027         }
1028         ret = res.Hash
1029         c.PeerExtensionBytes = res.peerExtensionBytes
1030         c.PeerID = res.peerID
1031         c.completedHandshake = time.Now()
1032         return
1033 }
1034
1035 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) (err error) {
1036         if c.PeerID == cl.peerID {
1037                 // Only if we initiated the connection is the remote address a
1038                 // listen addr for a doppleganger.
1039                 connsToSelf.Add(1)
1040                 addr := c.conn.RemoteAddr().String()
1041                 cl.dopplegangerAddrs[addr] = struct{}{}
1042                 return
1043         }
1044         return cl.runHandshookConn(c, t)
1045 }
1046
1047 func (cl *Client) runReceivedConn(c *connection) (err error) {
1048         err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
1049         if err != nil {
1050                 return
1051         }
1052         t, err := cl.receiveHandshakes(c)
1053         if err != nil {
1054                 err = fmt.Errorf("error receiving handshakes: %s", err)
1055                 return
1056         }
1057         if t == nil {
1058                 return
1059         }
1060         cl.mu.Lock()
1061         defer cl.mu.Unlock()
1062         if c.PeerID == cl.peerID {
1063                 return
1064         }
1065         return cl.runHandshookConn(c, t)
1066 }
1067
1068 func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
1069         c.conn.SetWriteDeadline(time.Time{})
1070         c.rw = readWriter{
1071                 deadlineReader{c.conn, c.rw},
1072                 c.rw,
1073         }
1074         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
1075         if !cl.addConnection(t, c) {
1076                 return
1077         }
1078         defer cl.dropConnection(t, c)
1079         go c.writer()
1080         go c.writeOptimizer(time.Minute)
1081         cl.sendInitialMessages(c, t)
1082         err = cl.connectionLoop(t, c)
1083         if err != nil {
1084                 err = fmt.Errorf("error during connection loop: %s", err)
1085         }
1086         return
1087 }
1088
1089 func (me *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
1090         if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
1091                 conn.Post(pp.Message{
1092                         Type:       pp.Extended,
1093                         ExtendedID: pp.HandshakeExtendedID,
1094                         ExtendedPayload: func() []byte {
1095                                 d := map[string]interface{}{
1096                                         "m": func() (ret map[string]int) {
1097                                                 ret = make(map[string]int, 2)
1098                                                 ret["ut_metadata"] = metadataExtendedId
1099                                                 if !me.config.DisablePEX {
1100                                                         ret["ut_pex"] = pexExtendedId
1101                                                 }
1102                                                 return
1103                                         }(),
1104                                         "v": extendedHandshakeClientVersion,
1105                                         // No upload queue is implemented yet.
1106                                         "reqq": 64,
1107                                 }
1108                                 if !me.config.DisableEncryption {
1109                                         d["e"] = 1
1110                                 }
1111                                 if torrent.metadataSizeKnown() {
1112                                         d["metadata_size"] = torrent.metadataSize()
1113                                 }
1114                                 if p := me.incomingPeerPort(); p != 0 {
1115                                         d["p"] = p
1116                                 }
1117                                 yourip, err := addrCompactIP(conn.remoteAddr())
1118                                 if err != nil {
1119                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
1120                                 } else {
1121                                         d["yourip"] = yourip
1122                                 }
1123                                 // log.Printf("sending %v", d)
1124                                 b, err := bencode.Marshal(d)
1125                                 if err != nil {
1126                                         panic(err)
1127                                 }
1128                                 return b
1129                         }(),
1130                 })
1131         }
1132         if torrent.haveAnyPieces() {
1133                 conn.Bitfield(torrent.bitfield())
1134         } else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
1135                 conn.Post(pp.Message{
1136                         Type: pp.HaveNone,
1137                 })
1138         }
1139         if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil {
1140                 conn.Post(pp.Message{
1141                         Type: pp.Port,
1142                         Port: uint16(missinggo.AddrPort(me.dHT.Addr())),
1143                 })
1144         }
1145 }
1146
1147 func (me *Client) peerUnchoked(torrent *Torrent, conn *connection) {
1148         conn.updateRequests()
1149 }
1150
1151 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
1152         ok = cn.Cancel(r)
1153         if ok {
1154                 postedCancels.Add(1)
1155         }
1156         return
1157 }
1158
1159 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
1160         if !cn.RequestPending(r) {
1161                 return false
1162         }
1163         delete(cn.Requests, r)
1164         return true
1165 }
1166
1167 func (cl *Client) requestPendingMetadata(t *Torrent, c *connection) {
1168         if t.haveInfo() {
1169                 return
1170         }
1171         if c.PeerExtensionIDs["ut_metadata"] == 0 {
1172                 // Peer doesn't support this.
1173                 return
1174         }
1175         // Request metadata pieces that we don't have in a random order.
1176         var pending []int
1177         for index := 0; index < t.metadataPieceCount(); index++ {
1178                 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
1179                         pending = append(pending, index)
1180                 }
1181         }
1182         for _, i := range mathRand.Perm(len(pending)) {
1183                 c.requestMetadataPiece(pending[i])
1184         }
1185 }
1186
1187 func (cl *Client) completedMetadata(t *Torrent) {
1188         h := sha1.New()
1189         h.Write(t.metadataBytes)
1190         var ih metainfo.Hash
1191         missinggo.CopyExact(&ih, h.Sum(nil))
1192         if ih != t.infoHash {
1193                 log.Print("bad metadata")
1194                 t.invalidateMetadata()
1195                 return
1196         }
1197         var info metainfo.Info
1198         err := bencode.Unmarshal(t.metadataBytes, &info)
1199         if err != nil {
1200                 log.Printf("error unmarshalling metadata: %s", err)
1201                 t.invalidateMetadata()
1202                 return
1203         }
1204         // TODO(anacrolix): If this fails, I think something harsher should be
1205         // done.
1206         err = cl.setMetaData(t, &info, t.metadataBytes)
1207         if err != nil {
1208                 log.Printf("error setting metadata: %s", err)
1209                 t.invalidateMetadata()
1210                 return
1211         }
1212         if cl.config.Debug {
1213                 log.Printf("%s: got metadata from peers", t)
1214         }
1215 }
1216
1217 // Process incoming ut_metadata message.
1218 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) (err error) {
1219         var d map[string]int
1220         err = bencode.Unmarshal(payload, &d)
1221         if err != nil {
1222                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1223                 return
1224         }
1225         msgType, ok := d["msg_type"]
1226         if !ok {
1227                 err = errors.New("missing msg_type field")
1228                 return
1229         }
1230         piece := d["piece"]
1231         switch msgType {
1232         case pp.DataMetadataExtensionMsgType:
1233                 if t.haveInfo() {
1234                         break
1235                 }
1236                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1237                 if begin < 0 || begin >= len(payload) {
1238                         log.Printf("got bad metadata piece")
1239                         break
1240                 }
1241                 if !c.requestedMetadataPiece(piece) {
1242                         log.Printf("got unexpected metadata piece %d", piece)
1243                         break
1244                 }
1245                 c.metadataRequests[piece] = false
1246                 t.saveMetadataPiece(piece, payload[begin:])
1247                 c.UsefulChunksReceived++
1248                 c.lastUsefulChunkReceived = time.Now()
1249                 if !t.haveAllMetadataPieces() {
1250                         break
1251                 }
1252                 cl.completedMetadata(t)
1253         case pp.RequestMetadataExtensionMsgType:
1254                 if !t.haveMetadataPiece(piece) {
1255                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1256                         break
1257                 }
1258                 start := (1 << 14) * piece
1259                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1260         case pp.RejectMetadataExtensionMsgType:
1261         default:
1262                 err = errors.New("unknown msg_type value")
1263         }
1264         return
1265 }
1266
1267 func (me *Client) upload(t *Torrent, c *connection) {
1268         if me.config.NoUpload {
1269                 return
1270         }
1271         if !c.PeerInterested {
1272                 return
1273         }
1274         seeding := me.seeding(t)
1275         if !seeding && !t.connHasWantedPieces(c) {
1276                 return
1277         }
1278 another:
1279         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1280                 c.Unchoke()
1281                 for r := range c.PeerRequests {
1282                         err := me.sendChunk(t, c, r)
1283                         if err != nil {
1284                                 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1285                                         // We had the piece, but not anymore.
1286                                 } else {
1287                                         log.Printf("error sending chunk %+v to peer: %s", r, err)
1288                                 }
1289                                 // If we failed to send a chunk, choke the peer to ensure they
1290                                 // flush all their requests. We've probably dropped a piece,
1291                                 // but there's no way to communicate this to the peer. If they
1292                                 // ask for it again, we'll kick them to allow us to send them
1293                                 // an updated bitfield.
1294                                 break another
1295                         }
1296                         delete(c.PeerRequests, r)
1297                         goto another
1298                 }
1299                 return
1300         }
1301         c.Choke()
1302 }
1303
1304 func (me *Client) sendChunk(t *Torrent, c *connection, r request) error {
1305         // Count the chunk being sent, even if it isn't.
1306         b := make([]byte, r.Length)
1307         p := t.info.Piece(int(r.Index))
1308         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1309         if n != len(b) {
1310                 if err == nil {
1311                         panic("expected error")
1312                 }
1313                 return err
1314         }
1315         c.Post(pp.Message{
1316                 Type:  pp.Piece,
1317                 Index: r.Index,
1318                 Begin: r.Begin,
1319                 Piece: b,
1320         })
1321         c.chunksSent++
1322         uploadChunksPosted.Add(1)
1323         c.lastChunkSent = time.Now()
1324         return nil
1325 }
1326
1327 // Processes incoming bittorrent messages. The client lock is held upon entry
1328 // and exit.
1329 func (me *Client) connectionLoop(t *Torrent, c *connection) error {
1330         decoder := pp.Decoder{
1331                 R:         bufio.NewReader(c.rw),
1332                 MaxLength: 256 * 1024,
1333         }
1334         for {
1335                 me.mu.Unlock()
1336                 var msg pp.Message
1337                 err := decoder.Decode(&msg)
1338                 me.mu.Lock()
1339                 if me.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1340                         return nil
1341                 }
1342                 if err != nil {
1343                         return err
1344                 }
1345                 c.lastMessageReceived = time.Now()
1346                 if msg.Keepalive {
1347                         receivedKeepalives.Add(1)
1348                         continue
1349                 }
1350                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1351                 switch msg.Type {
1352                 case pp.Choke:
1353                         c.PeerChoked = true
1354                         c.Requests = nil
1355                         // We can then reset our interest.
1356                         c.updateRequests()
1357                 case pp.Reject:
1358                         me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1359                         c.updateRequests()
1360                 case pp.Unchoke:
1361                         c.PeerChoked = false
1362                         me.peerUnchoked(t, c)
1363                 case pp.Interested:
1364                         c.PeerInterested = true
1365                         me.upload(t, c)
1366                 case pp.NotInterested:
1367                         c.PeerInterested = false
1368                         c.Choke()
1369                 case pp.Have:
1370                         err = c.peerSentHave(int(msg.Index))
1371                 case pp.Request:
1372                         if c.Choked {
1373                                 break
1374                         }
1375                         if !c.PeerInterested {
1376                                 err = errors.New("peer sent request but isn't interested")
1377                                 break
1378                         }
1379                         if !t.havePiece(msg.Index.Int()) {
1380                                 // This isn't necessarily them screwing up. We can drop pieces
1381                                 // from our storage, and can't communicate this to peers
1382                                 // except by reconnecting.
1383                                 requestsReceivedForMissingPieces.Add(1)
1384                                 err = errors.New("peer requested piece we don't have")
1385                                 break
1386                         }
1387                         if c.PeerRequests == nil {
1388                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1389                         }
1390                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1391                         me.upload(t, c)
1392                 case pp.Cancel:
1393                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1394                         if !c.PeerCancel(req) {
1395                                 unexpectedCancels.Add(1)
1396                         }
1397                 case pp.Bitfield:
1398                         err = c.peerSentBitfield(msg.Bitfield)
1399                 case pp.HaveAll:
1400                         err = c.peerSentHaveAll()
1401                 case pp.HaveNone:
1402                         err = c.peerSentHaveNone()
1403                 case pp.Piece:
1404                         me.downloadedChunk(t, c, &msg)
1405                 case pp.Extended:
1406                         switch msg.ExtendedID {
1407                         case pp.HandshakeExtendedID:
1408                                 // TODO: Create a bencode struct for this.
1409                                 var d map[string]interface{}
1410                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1411                                 if err != nil {
1412                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1413                                         break
1414                                 }
1415                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1416                                 if reqq, ok := d["reqq"]; ok {
1417                                         if i, ok := reqq.(int64); ok {
1418                                                 c.PeerMaxRequests = int(i)
1419                                         }
1420                                 }
1421                                 if v, ok := d["v"]; ok {
1422                                         c.PeerClientName = v.(string)
1423                                 }
1424                                 m, ok := d["m"]
1425                                 if !ok {
1426                                         err = errors.New("handshake missing m item")
1427                                         break
1428                                 }
1429                                 mTyped, ok := m.(map[string]interface{})
1430                                 if !ok {
1431                                         err = errors.New("handshake m value is not dict")
1432                                         break
1433                                 }
1434                                 if c.PeerExtensionIDs == nil {
1435                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1436                                 }
1437                                 for name, v := range mTyped {
1438                                         id, ok := v.(int64)
1439                                         if !ok {
1440                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1441                                                 continue
1442                                         }
1443                                         if id == 0 {
1444                                                 delete(c.PeerExtensionIDs, name)
1445                                         } else {
1446                                                 if c.PeerExtensionIDs[name] == 0 {
1447                                                         supportedExtensionMessages.Add(name, 1)
1448                                                 }
1449                                                 c.PeerExtensionIDs[name] = byte(id)
1450                                         }
1451                                 }
1452                                 metadata_sizeUntyped, ok := d["metadata_size"]
1453                                 if ok {
1454                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1455                                         if !ok {
1456                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1457                                         } else {
1458                                                 t.setMetadataSize(metadata_size, me)
1459                                         }
1460                                 }
1461                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1462                                         me.requestPendingMetadata(t, c)
1463                                 }
1464                         case metadataExtendedId:
1465                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1466                                 if err != nil {
1467                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1468                                 }
1469                         case pexExtendedId:
1470                                 if me.config.DisablePEX {
1471                                         break
1472                                 }
1473                                 var pexMsg peerExchangeMessage
1474                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1475                                 if err != nil {
1476                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1477                                         break
1478                                 }
1479                                 go func() {
1480                                         me.mu.Lock()
1481                                         me.addPeers(t, func() (ret []Peer) {
1482                                                 for i, cp := range pexMsg.Added {
1483                                                         p := Peer{
1484                                                                 IP:     make([]byte, 4),
1485                                                                 Port:   int(cp.Port),
1486                                                                 Source: peerSourcePEX,
1487                                                         }
1488                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1489                                                                 p.SupportsEncryption = true
1490                                                         }
1491                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1492                                                         ret = append(ret, p)
1493                                                 }
1494                                                 return
1495                                         }())
1496                                         me.mu.Unlock()
1497                                 }()
1498                         default:
1499                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1500                         }
1501                         if err != nil {
1502                                 // That client uses its own extension IDs for outgoing message
1503                                 // types, which is incorrect.
1504                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1505                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1506                                         return nil
1507                                 }
1508                         }
1509                 case pp.Port:
1510                         if me.dHT == nil {
1511                                 break
1512                         }
1513                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1514                         if err != nil {
1515                                 panic(err)
1516                         }
1517                         if msg.Port != 0 {
1518                                 pingAddr.Port = int(msg.Port)
1519                         }
1520                         _, err = me.dHT.Ping(pingAddr)
1521                 default:
1522                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1523                 }
1524                 if err != nil {
1525                         return err
1526                 }
1527         }
1528 }
1529
1530 // Returns true if connection is removed from torrent.Conns.
1531 func (me *Client) deleteConnection(t *Torrent, c *connection) bool {
1532         for i0, _c := range t.conns {
1533                 if _c != c {
1534                         continue
1535                 }
1536                 i1 := len(t.conns) - 1
1537                 if i0 != i1 {
1538                         t.conns[i0] = t.conns[i1]
1539                 }
1540                 t.conns = t.conns[:i1]
1541                 return true
1542         }
1543         return false
1544 }
1545
1546 func (me *Client) dropConnection(t *Torrent, c *connection) {
1547         me.event.Broadcast()
1548         c.Close()
1549         if me.deleteConnection(t, c) {
1550                 me.openNewConns(t)
1551         }
1552 }
1553
1554 // Returns true if the connection is added.
1555 func (me *Client) addConnection(t *Torrent, c *connection) bool {
1556         if me.closed.IsSet() {
1557                 return false
1558         }
1559         select {
1560         case <-t.ceasingNetworking:
1561                 return false
1562         default:
1563         }
1564         if !me.wantConns(t) {
1565                 return false
1566         }
1567         for _, c0 := range t.conns {
1568                 if c.PeerID == c0.PeerID {
1569                         // Already connected to a client with that ID.
1570                         duplicateClientConns.Add(1)
1571                         return false
1572                 }
1573         }
1574         if len(t.conns) >= socketsPerTorrent {
1575                 c := t.worstBadConn(me)
1576                 if c == nil {
1577                         return false
1578                 }
1579                 if me.config.Debug && missinggo.CryHeard() {
1580                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1581                 }
1582                 c.Close()
1583                 me.deleteConnection(t, c)
1584         }
1585         if len(t.conns) >= socketsPerTorrent {
1586                 panic(len(t.conns))
1587         }
1588         t.conns = append(t.conns, c)
1589         c.t = t
1590         return true
1591 }
1592
1593 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1594         t.forReaderOffsetPieces(func(begin, end int) bool {
1595                 ret.AddRange(begin, end)
1596                 return true
1597         })
1598         return
1599 }
1600
1601 func (t *Torrent) needData() bool {
1602         if !t.haveInfo() {
1603                 return true
1604         }
1605         if t.pendingPieces.Len() != 0 {
1606                 return true
1607         }
1608         return !t.readerPieces().IterTyped(func(piece int) bool {
1609                 return t.pieceComplete(piece)
1610         })
1611 }
1612
1613 func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
1614         if c.closed.IsSet() {
1615                 return false
1616         }
1617         if !t.haveInfo() {
1618                 return c.supportsExtension("ut_metadata")
1619         }
1620         if cl.seeding(t) {
1621                 return c.PeerInterested
1622         }
1623         return t.connHasWantedPieces(c)
1624 }
1625
1626 func (me *Client) wantConns(t *Torrent) bool {
1627         if !me.seeding(t) && !t.needData() {
1628                 return false
1629         }
1630         if len(t.conns) < socketsPerTorrent {
1631                 return true
1632         }
1633         return t.worstBadConn(me) != nil
1634 }
1635
1636 func (me *Client) openNewConns(t *Torrent) {
1637         select {
1638         case <-t.ceasingNetworking:
1639                 return
1640         default:
1641         }
1642         for len(t.peers) != 0 {
1643                 if !me.wantConns(t) {
1644                         return
1645                 }
1646                 if len(t.halfOpen) >= me.halfOpenLimit {
1647                         return
1648                 }
1649                 var (
1650                         k peersKey
1651                         p Peer
1652                 )
1653                 for k, p = range t.peers {
1654                         break
1655                 }
1656                 delete(t.peers, k)
1657                 me.initiateConn(p, t)
1658         }
1659         t.wantPeers.Broadcast()
1660 }
1661
1662 func (me *Client) addPeers(t *Torrent, peers []Peer) {
1663         for _, p := range peers {
1664                 if me.dopplegangerAddr(net.JoinHostPort(
1665                         p.IP.String(),
1666                         strconv.FormatInt(int64(p.Port), 10),
1667                 )) {
1668                         continue
1669                 }
1670                 if _, ok := me.ipBlockRange(p.IP); ok {
1671                         continue
1672                 }
1673                 if p.Port == 0 {
1674                         // The spec says to scrub these yourselves. Fine.
1675                         continue
1676                 }
1677                 t.addPeer(p, me)
1678         }
1679 }
1680
1681 func (cl *Client) cachedMetaInfoFilename(ih metainfo.Hash) string {
1682         return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1683 }
1684
1685 func (cl *Client) saveTorrentFile(t *Torrent) error {
1686         path := cl.cachedMetaInfoFilename(t.infoHash)
1687         os.MkdirAll(filepath.Dir(path), 0777)
1688         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1689         if err != nil {
1690                 return fmt.Errorf("error opening file: %s", err)
1691         }
1692         defer f.Close()
1693         e := bencode.NewEncoder(f)
1694         err = e.Encode(t.metainfo())
1695         if err != nil {
1696                 return fmt.Errorf("error marshalling metainfo: %s", err)
1697         }
1698         mi, err := cl.torrentCacheMetaInfo(t.infoHash)
1699         if err != nil {
1700                 // For example, a script kiddy makes us load too many files, and we're
1701                 // able to save the torrent, but not load it again to check it.
1702                 return nil
1703         }
1704         if !bytes.Equal(mi.Info.Hash.Bytes(), t.infoHash[:]) {
1705                 log.Fatalf("%x != %x", mi.Info.Hash, t.infoHash[:])
1706         }
1707         return nil
1708 }
1709
1710 func (cl *Client) setMetaData(t *Torrent, md *metainfo.Info, bytes []byte) (err error) {
1711         err = t.setMetadata(md, bytes)
1712         if err != nil {
1713                 return
1714         }
1715         if !cl.config.DisableMetainfoCache {
1716                 if err := cl.saveTorrentFile(t); err != nil {
1717                         log.Printf("error saving torrent file for %s: %s", t, err)
1718                 }
1719         }
1720         cl.event.Broadcast()
1721         close(t.gotMetainfo)
1722         return
1723 }
1724
1725 // Prepare a Torrent without any attachment to a Client. That means we can
1726 // initialize fields all fields that don't require the Client without locking
1727 // it.
1728 func newTorrent(ih metainfo.Hash) (t *Torrent) {
1729         t = &Torrent{
1730                 infoHash:  ih,
1731                 chunkSize: defaultChunkSize,
1732                 peers:     make(map[peersKey]Peer),
1733
1734                 closing:           make(chan struct{}),
1735                 ceasingNetworking: make(chan struct{}),
1736
1737                 gotMetainfo: make(chan struct{}),
1738
1739                 halfOpen:          make(map[string]struct{}),
1740                 pieceStateChanges: pubsub.NewPubSub(),
1741         }
1742         return
1743 }
1744
1745 func init() {
1746         // For shuffling the tracker tiers.
1747         mathRand.Seed(time.Now().Unix())
1748 }
1749
1750 type trackerTier []string
1751
1752 // The trackers within each tier must be shuffled before use.
1753 // http://stackoverflow.com/a/12267471/149482
1754 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1755 func shuffleTier(tier trackerTier) {
1756         for i := range tier {
1757                 j := mathRand.Intn(i + 1)
1758                 tier[i], tier[j] = tier[j], tier[i]
1759         }
1760 }
1761
1762 func copyTrackers(base []trackerTier) (copy []trackerTier) {
1763         for _, tier := range base {
1764                 copy = append(copy, append(trackerTier(nil), tier...))
1765         }
1766         return
1767 }
1768
1769 func mergeTier(tier trackerTier, newURLs []string) trackerTier {
1770 nextURL:
1771         for _, url := range newURLs {
1772                 for _, trURL := range tier {
1773                         if trURL == url {
1774                                 continue nextURL
1775                         }
1776                 }
1777                 tier = append(tier, url)
1778         }
1779         return tier
1780 }
1781
1782 func (t *Torrent) addTrackers(announceList [][]string) {
1783         newTrackers := copyTrackers(t.trackers)
1784         for tierIndex, tier := range announceList {
1785                 if tierIndex < len(newTrackers) {
1786                         newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1787                 } else {
1788                         newTrackers = append(newTrackers, mergeTier(nil, tier))
1789                 }
1790                 shuffleTier(newTrackers[tierIndex])
1791         }
1792         t.trackers = newTrackers
1793 }
1794
1795 // Don't call this before the info is available.
1796 func (t *Torrent) bytesCompleted() int64 {
1797         if !t.haveInfo() {
1798                 return 0
1799         }
1800         return t.info.TotalLength() - t.bytesLeft()
1801 }
1802
1803 // A file-like handle to some torrent data resource.
1804 type Handle interface {
1805         io.Reader
1806         io.Seeker
1807         io.Closer
1808         io.ReaderAt
1809 }
1810
1811 // Returns handles to the files in the torrent. This requires the metainfo is
1812 // available first.
1813 func (t *Torrent) Files() (ret []File) {
1814         t.cl.mu.Lock()
1815         info := t.Info()
1816         t.cl.mu.Unlock()
1817         if info == nil {
1818                 return
1819         }
1820         var offset int64
1821         for _, fi := range info.UpvertedFiles() {
1822                 ret = append(ret, File{
1823                         t,
1824                         strings.Join(append([]string{info.Name}, fi.Path...), "/"),
1825                         offset,
1826                         fi.Length,
1827                         fi,
1828                 })
1829                 offset += fi.Length
1830         }
1831         return
1832 }
1833
1834 func (t *Torrent) AddPeers(pp []Peer) error {
1835         cl := t.cl
1836         cl.mu.Lock()
1837         defer cl.mu.Unlock()
1838         cl.addPeers(t, pp)
1839         return nil
1840 }
1841
1842 // Marks the entire torrent for download. Requires the info first, see
1843 // GotInfo.
1844 func (t *Torrent) DownloadAll() {
1845         t.cl.mu.Lock()
1846         defer t.cl.mu.Unlock()
1847         t.pendPieceRange(0, t.numPieces())
1848 }
1849
1850 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
1851 // metainfo has the correct infohash.
1852 func (cl *Client) torrentCacheMetaInfo(ih metainfo.Hash) (mi *metainfo.MetaInfo, err error) {
1853         if cl.config.DisableMetainfoCache {
1854                 return
1855         }
1856         f, err := os.Open(cl.cachedMetaInfoFilename(ih))
1857         if err != nil {
1858                 if os.IsNotExist(err) {
1859                         err = nil
1860                 }
1861                 return
1862         }
1863         defer f.Close()
1864         dec := bencode.NewDecoder(f)
1865         err = dec.Decode(&mi)
1866         if err != nil {
1867                 return
1868         }
1869         if !bytes.Equal(mi.Info.Hash.Bytes(), ih[:]) {
1870                 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
1871                 return
1872         }
1873         return
1874 }
1875
1876 // Specifies a new torrent for adding to a client. There are helpers for
1877 // magnet URIs and torrent metainfo files.
1878 type TorrentSpec struct {
1879         // The tiered tracker URIs.
1880         Trackers [][]string
1881         InfoHash metainfo.Hash
1882         Info     *metainfo.InfoEx
1883         // The name to use if the Name field from the Info isn't available.
1884         DisplayName string
1885         // The chunk size to use for outbound requests. Defaults to 16KiB if not
1886         // set.
1887         ChunkSize int
1888         Storage   storage.I
1889 }
1890
1891 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1892         m, err := metainfo.ParseMagnetURI(uri)
1893         if err != nil {
1894                 return
1895         }
1896         spec = &TorrentSpec{
1897                 Trackers:    [][]string{m.Trackers},
1898                 DisplayName: m.DisplayName,
1899                 InfoHash:    m.InfoHash,
1900         }
1901         return
1902 }
1903
1904 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1905         spec = &TorrentSpec{
1906                 Trackers:    mi.AnnounceList,
1907                 Info:        &mi.Info,
1908                 DisplayName: mi.Info.Name,
1909         }
1910
1911         if len(spec.Trackers) == 0 {
1912                 spec.Trackers = [][]string{[]string{mi.Announce}}
1913         } else {
1914                 spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
1915         }
1916
1917         missinggo.CopyExact(&spec.InfoHash, mi.Info.Hash)
1918         return
1919 }
1920
1921 // Add or merge a torrent spec. If the torrent is already present, the
1922 // trackers will be merged with the existing ones. If the Info isn't yet
1923 // known, it will be set. The display name is replaced if the new spec
1924 // provides one. Returns new if the torrent wasn't already in the client.
1925 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1926         cl.mu.Lock()
1927         defer cl.mu.Unlock()
1928
1929         t, ok := cl.torrents[spec.InfoHash]
1930         if !ok {
1931                 new = true
1932
1933                 // TODO: This doesn't belong in the core client, it's more of a
1934                 // helper.
1935                 if _, ok := cl.bannedTorrents[spec.InfoHash]; ok {
1936                         err = errors.New("banned torrent")
1937                         return
1938                 }
1939                 // TODO: Tidy this up?
1940                 t = newTorrent(spec.InfoHash)
1941                 t.cl = cl
1942                 t.wantPeers.L = &cl.mu
1943                 if spec.ChunkSize != 0 {
1944                         t.chunkSize = pp.Integer(spec.ChunkSize)
1945                 }
1946                 t.storageOpener = spec.Storage
1947                 if t.storageOpener == nil {
1948                         t.storageOpener = cl.defaultStorage
1949                 }
1950         }
1951         if spec.DisplayName != "" {
1952                 t.setDisplayName(spec.DisplayName)
1953         }
1954         // Try to merge in info we have on the torrent. Any err left will
1955         // terminate the function.
1956         if t.info == nil {
1957                 if spec.Info != nil {
1958                         err = cl.setMetaData(t, &spec.Info.Info, spec.Info.Bytes)
1959                 } else {
1960                         var mi *metainfo.MetaInfo
1961                         mi, err = cl.torrentCacheMetaInfo(spec.InfoHash)
1962                         if err != nil {
1963                                 log.Printf("error getting cached metainfo: %s", err)
1964                                 err = nil
1965                         } else if mi != nil {
1966                                 t.addTrackers(mi.AnnounceList)
1967                                 err = cl.setMetaData(t, &mi.Info.Info, mi.Info.Bytes)
1968                         }
1969                 }
1970         }
1971         if err != nil {
1972                 return
1973         }
1974         t.addTrackers(spec.Trackers)
1975
1976         cl.torrents[spec.InfoHash] = t
1977         t.maybeNewConns()
1978
1979         // From this point onwards, we can consider the torrent a part of the
1980         // client.
1981         if new {
1982                 if !cl.config.DisableTrackers {
1983                         go cl.announceTorrentTrackers(t)
1984                 }
1985                 if cl.dHT != nil {
1986                         go cl.announceTorrentDHT(t, true)
1987                 }
1988         }
1989         return
1990 }
1991
1992 func (me *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1993         t, ok := me.torrents[infoHash]
1994         if !ok {
1995                 err = fmt.Errorf("no such torrent")
1996                 return
1997         }
1998         err = t.close()
1999         if err != nil {
2000                 panic(err)
2001         }
2002         delete(me.torrents, infoHash)
2003         return
2004 }
2005
2006 // Returns true when peers are required, or false if the torrent is closing.
2007 func (cl *Client) waitWantPeers(t *Torrent) bool {
2008         cl.mu.Lock()
2009         defer cl.mu.Unlock()
2010         for {
2011                 select {
2012                 case <-t.ceasingNetworking:
2013                         return false
2014                 default:
2015                 }
2016                 if len(t.peers) > torrentPeersLowWater {
2017                         goto wait
2018                 }
2019                 if t.needData() || cl.seeding(t) {
2020                         return true
2021                 }
2022         wait:
2023                 t.wantPeers.Wait()
2024         }
2025 }
2026
2027 // Returns whether the client should make effort to seed the torrent.
2028 func (cl *Client) seeding(t *Torrent) bool {
2029         if cl.config.NoUpload {
2030                 return false
2031         }
2032         if !cl.config.Seed {
2033                 return false
2034         }
2035         if t.needData() {
2036                 return false
2037         }
2038         return true
2039 }
2040
2041 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
2042         for cl.waitWantPeers(t) {
2043                 // log.Printf("getting peers for %q from DHT", t)
2044                 ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
2045                 if err != nil {
2046                         log.Printf("error getting peers from dht: %s", err)
2047                         return
2048                 }
2049                 // Count all the unique addresses we got during this announce.
2050                 allAddrs := make(map[string]struct{})
2051         getPeers:
2052                 for {
2053                         select {
2054                         case v, ok := <-ps.Peers:
2055                                 if !ok {
2056                                         break getPeers
2057                                 }
2058                                 addPeers := make([]Peer, 0, len(v.Peers))
2059                                 for _, cp := range v.Peers {
2060                                         if cp.Port == 0 {
2061                                                 // Can't do anything with this.
2062                                                 continue
2063                                         }
2064                                         addPeers = append(addPeers, Peer{
2065                                                 IP:     cp.IP[:],
2066                                                 Port:   int(cp.Port),
2067                                                 Source: peerSourceDHT,
2068                                         })
2069                                         key := (&net.UDPAddr{
2070                                                 IP:   cp.IP[:],
2071                                                 Port: int(cp.Port),
2072                                         }).String()
2073                                         allAddrs[key] = struct{}{}
2074                                 }
2075                                 cl.mu.Lock()
2076                                 cl.addPeers(t, addPeers)
2077                                 numPeers := len(t.peers)
2078                                 cl.mu.Unlock()
2079                                 if numPeers >= torrentPeersHighWater {
2080                                         break getPeers
2081                                 }
2082                         case <-t.ceasingNetworking:
2083                                 ps.Close()
2084                                 return
2085                         }
2086                 }
2087                 ps.Close()
2088                 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
2089         }
2090 }
2091
2092 func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
2093         url_, err := url.Parse(trRawURL)
2094         if err != nil {
2095                 return
2096         }
2097         host, _, err := net.SplitHostPort(url_.Host)
2098         if err != nil {
2099                 host = url_.Host
2100         }
2101         addr, err := net.ResolveIPAddr("ip", host)
2102         if err != nil {
2103                 return
2104         }
2105         cl.mu.RLock()
2106         _, blocked = cl.ipBlockRange(addr.IP)
2107         cl.mu.RUnlock()
2108         return
2109 }
2110
2111 func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) error {
2112         blocked, err := cl.trackerBlockedUnlocked(tr)
2113         if err != nil {
2114                 return fmt.Errorf("error determining if tracker blocked: %s", err)
2115         }
2116         if blocked {
2117                 return fmt.Errorf("tracker blocked: %s", tr)
2118         }
2119         resp, err := tracker.Announce(tr, req)
2120         if err != nil {
2121                 return fmt.Errorf("error announcing: %s", err)
2122         }
2123         var peers []Peer
2124         for _, peer := range resp.Peers {
2125                 peers = append(peers, Peer{
2126                         IP:   peer.IP,
2127                         Port: peer.Port,
2128                 })
2129         }
2130         cl.mu.Lock()
2131         cl.addPeers(t, peers)
2132         cl.mu.Unlock()
2133
2134         // log.Printf("%s: %d new peers from %s", t, len(peers), tr)
2135
2136         time.Sleep(time.Second * time.Duration(resp.Interval))
2137         return nil
2138 }
2139
2140 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
2141         oks := make(chan bool)
2142         outstanding := 0
2143         for _, tier := range trackers {
2144                 for _, tr := range tier {
2145                         outstanding++
2146                         go func(tr string) {
2147                                 err := cl.announceTorrentSingleTracker(tr, req, t)
2148                                 oks <- err == nil
2149                         }(tr)
2150                 }
2151         }
2152         for outstanding > 0 {
2153                 ok := <-oks
2154                 outstanding--
2155                 if ok {
2156                         atLeastOne = true
2157                 }
2158         }
2159         return
2160 }
2161
2162 // Announce torrent to its trackers.
2163 func (cl *Client) announceTorrentTrackers(t *Torrent) {
2164         req := tracker.AnnounceRequest{
2165                 Event:    tracker.Started,
2166                 NumWant:  -1,
2167                 Port:     uint16(cl.incomingPeerPort()),
2168                 PeerId:   cl.peerID,
2169                 InfoHash: t.infoHash,
2170         }
2171         if !cl.waitWantPeers(t) {
2172                 return
2173         }
2174         cl.mu.RLock()
2175         req.Left = t.bytesLeftAnnounce()
2176         trackers := t.trackers
2177         cl.mu.RUnlock()
2178         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
2179                 req.Event = tracker.None
2180         }
2181 newAnnounce:
2182         for cl.waitWantPeers(t) {
2183                 cl.mu.RLock()
2184                 req.Left = t.bytesLeftAnnounce()
2185                 trackers = t.trackers
2186                 cl.mu.RUnlock()
2187                 numTrackersTried := 0
2188                 for _, tier := range trackers {
2189                         for trIndex, tr := range tier {
2190                                 numTrackersTried++
2191                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
2192                                 if err != nil {
2193                                         continue
2194                                 }
2195                                 // Float the successful announce to the top of the tier. If
2196                                 // the trackers list has been changed, we'll be modifying an
2197                                 // old copy so it won't matter.
2198                                 cl.mu.Lock()
2199                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
2200                                 cl.mu.Unlock()
2201
2202                                 req.Event = tracker.None
2203                                 continue newAnnounce
2204                         }
2205                 }
2206                 if numTrackersTried != 0 {
2207                         log.Printf("%s: all trackers failed", t)
2208                 }
2209                 // TODO: Wait until trackers are added if there are none.
2210                 time.Sleep(10 * time.Second)
2211         }
2212 }
2213
2214 func (cl *Client) allTorrentsCompleted() bool {
2215         for _, t := range cl.torrents {
2216                 if !t.haveInfo() {
2217                         return false
2218                 }
2219                 if t.numPiecesCompleted() != t.numPieces() {
2220                         return false
2221                 }
2222         }
2223         return true
2224 }
2225
2226 // Returns true when all torrents are completely downloaded and false if the
2227 // client is stopped before that.
2228 func (me *Client) WaitAll() bool {
2229         me.mu.Lock()
2230         defer me.mu.Unlock()
2231         for !me.allTorrentsCompleted() {
2232                 if me.closed.IsSet() {
2233                         return false
2234                 }
2235                 me.event.Wait()
2236         }
2237         return true
2238 }
2239
2240 // Handle a received chunk from a peer.
2241 func (me *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
2242         chunksReceived.Add(1)
2243
2244         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
2245
2246         // Request has been satisfied.
2247         if me.connDeleteRequest(t, c, req) {
2248                 defer c.updateRequests()
2249         } else {
2250                 unexpectedChunksReceived.Add(1)
2251         }
2252
2253         index := int(req.Index)
2254         piece := &t.pieces[index]
2255
2256         // Do we actually want this chunk?
2257         if !t.wantChunk(req) {
2258                 unwantedChunksReceived.Add(1)
2259                 c.UnwantedChunksReceived++
2260                 return
2261         }
2262
2263         c.UsefulChunksReceived++
2264         c.lastUsefulChunkReceived = time.Now()
2265
2266         me.upload(t, c)
2267
2268         // Need to record that it hasn't been written yet, before we attempt to do
2269         // anything with it.
2270         piece.incrementPendingWrites()
2271         // Record that we have the chunk.
2272         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
2273
2274         // Cancel pending requests for this chunk.
2275         for _, c := range t.conns {
2276                 if me.connCancel(t, c, req) {
2277                         c.updateRequests()
2278                 }
2279         }
2280
2281         me.mu.Unlock()
2282         // Write the chunk out.
2283         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
2284         me.mu.Lock()
2285
2286         piece.decrementPendingWrites()
2287
2288         if err != nil {
2289                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
2290                 t.pendRequest(req)
2291                 t.updatePieceCompletion(int(msg.Index))
2292                 return
2293         }
2294
2295         // It's important that the piece is potentially queued before we check if
2296         // the piece is still wanted, because if it is queued, it won't be wanted.
2297         if t.pieceAllDirty(index) {
2298                 me.queuePieceCheck(t, int(req.Index))
2299         }
2300
2301         if c.peerTouchedPieces == nil {
2302                 c.peerTouchedPieces = make(map[int]struct{})
2303         }
2304         c.peerTouchedPieces[index] = struct{}{}
2305
2306         me.event.Broadcast()
2307         t.publishPieceChange(int(req.Index))
2308         return
2309 }
2310
2311 // Return the connections that touched a piece, and clear the entry while
2312 // doing it.
2313 func (me *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
2314         for _, c := range t.conns {
2315                 if _, ok := c.peerTouchedPieces[piece]; ok {
2316                         ret = append(ret, c)
2317                         delete(c.peerTouchedPieces, piece)
2318                 }
2319         }
2320         return
2321 }
2322
2323 func (me *Client) pieceHashed(t *Torrent, piece int, correct bool) {
2324         p := &t.pieces[piece]
2325         if p.EverHashed {
2326                 // Don't score the first time a piece is hashed, it could be an
2327                 // initial check.
2328                 if correct {
2329                         pieceHashedCorrect.Add(1)
2330                 } else {
2331                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
2332                         pieceHashedNotCorrect.Add(1)
2333                 }
2334         }
2335         p.EverHashed = true
2336         touchers := me.reapPieceTouches(t, int(piece))
2337         if correct {
2338                 err := p.Storage().MarkComplete()
2339                 if err != nil {
2340                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
2341                 }
2342                 t.updatePieceCompletion(piece)
2343         } else if len(touchers) != 0 {
2344                 log.Printf("dropping %d conns that touched piece", len(touchers))
2345                 for _, c := range touchers {
2346                         me.dropConnection(t, c)
2347                 }
2348         }
2349         me.pieceChanged(t, int(piece))
2350 }
2351
2352 func (me *Client) onCompletedPiece(t *Torrent, piece int) {
2353         t.pendingPieces.Remove(piece)
2354         t.pendAllChunkSpecs(piece)
2355         for _, conn := range t.conns {
2356                 conn.Have(piece)
2357                 for r := range conn.Requests {
2358                         if int(r.Index) == piece {
2359                                 conn.Cancel(r)
2360                         }
2361                 }
2362                 // Could check here if peer doesn't have piece, but due to caching
2363                 // some peers may have said they have a piece but they don't.
2364                 me.upload(t, conn)
2365         }
2366 }
2367
2368 func (me *Client) onFailedPiece(t *Torrent, piece int) {
2369         if t.pieceAllDirty(piece) {
2370                 t.pendAllChunkSpecs(piece)
2371         }
2372         if !t.wantPiece(piece) {
2373                 return
2374         }
2375         me.openNewConns(t)
2376         for _, conn := range t.conns {
2377                 if conn.PeerHasPiece(piece) {
2378                         conn.updateRequests()
2379                 }
2380         }
2381 }
2382
2383 func (me *Client) pieceChanged(t *Torrent, piece int) {
2384         correct := t.pieceComplete(piece)
2385         defer me.event.Broadcast()
2386         if correct {
2387                 me.onCompletedPiece(t, piece)
2388         } else {
2389                 me.onFailedPiece(t, piece)
2390         }
2391         if t.updatePiecePriority(piece) {
2392                 t.piecePriorityChanged(piece)
2393         }
2394         t.publishPieceChange(piece)
2395 }
2396
2397 func (cl *Client) verifyPiece(t *Torrent, piece int) {
2398         cl.mu.Lock()
2399         defer cl.mu.Unlock()
2400         p := &t.pieces[piece]
2401         for p.Hashing || t.storage == nil {
2402                 cl.event.Wait()
2403         }
2404         p.QueuedForHash = false
2405         if t.isClosed() || t.pieceComplete(piece) {
2406                 t.updatePiecePriority(piece)
2407                 t.publishPieceChange(piece)
2408                 return
2409         }
2410         p.Hashing = true
2411         t.publishPieceChange(piece)
2412         cl.mu.Unlock()
2413         sum := t.hashPiece(piece)
2414         cl.mu.Lock()
2415         select {
2416         case <-t.closing:
2417                 return
2418         default:
2419         }
2420         p.Hashing = false
2421         cl.pieceHashed(t, piece, sum == p.Hash)
2422 }
2423
2424 // Returns handles to all the torrents loaded in the Client.
2425 func (me *Client) Torrents() (ret []*Torrent) {
2426         me.mu.Lock()
2427         for _, t := range me.torrents {
2428                 ret = append(ret, t)
2429         }
2430         me.mu.Unlock()
2431         return
2432 }
2433
2434 func (me *Client) AddMagnet(uri string) (T *Torrent, err error) {
2435         spec, err := TorrentSpecFromMagnetURI(uri)
2436         if err != nil {
2437                 return
2438         }
2439         T, _, err = me.AddTorrentSpec(spec)
2440         return
2441 }
2442
2443 func (me *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
2444         T, _, err = me.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2445         var ss []string
2446         missinggo.CastSlice(&ss, mi.Nodes)
2447         me.AddDHTNodes(ss)
2448         return
2449 }
2450
2451 func (me *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
2452         mi, err := metainfo.LoadFromFile(filename)
2453         if err != nil {
2454                 return
2455         }
2456         return me.AddTorrent(mi)
2457 }
2458
2459 func (me *Client) DHT() *dht.Server {
2460         return me.dHT
2461 }
2462
2463 func (me *Client) AddDHTNodes(nodes []string) {
2464         for _, n := range nodes {
2465                 hmp := missinggo.SplitHostMaybePort(n)
2466                 ip := net.ParseIP(hmp.Host)
2467                 if ip == nil {
2468                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
2469                         continue
2470                 }
2471                 ni := dht.NodeInfo{
2472                         Addr: dht.NewAddr(&net.UDPAddr{
2473                                 IP:   ip,
2474                                 Port: hmp.Port,
2475                         }),
2476                 }
2477                 me.DHT().AddNode(ni)
2478         }
2479 }