]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Make everything on type torrent private
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "crypto/sha1"
8         "encoding/hex"
9         "errors"
10         "expvar"
11         "fmt"
12         "io"
13         "log"
14         "math/big"
15         mathRand "math/rand"
16         "net"
17         "net/url"
18         "os"
19         "path/filepath"
20         "sort"
21         "strconv"
22         "strings"
23         "time"
24
25         "github.com/anacrolix/missinggo"
26         "github.com/anacrolix/missinggo/bitmap"
27         "github.com/anacrolix/missinggo/pproffd"
28         "github.com/anacrolix/missinggo/pubsub"
29         "github.com/anacrolix/sync"
30         "github.com/anacrolix/utp"
31         "github.com/edsrzf/mmap-go"
32
33         "github.com/anacrolix/torrent/bencode"
34         "github.com/anacrolix/torrent/dht"
35         "github.com/anacrolix/torrent/iplist"
36         "github.com/anacrolix/torrent/metainfo"
37         "github.com/anacrolix/torrent/mse"
38         pp "github.com/anacrolix/torrent/peer_protocol"
39         "github.com/anacrolix/torrent/storage"
40         "github.com/anacrolix/torrent/tracker"
41 )
42
43 // I could move a lot of these counters to their own file, but I suspect they
44 // may be attached to a Client someday.
45 var (
46         unwantedChunksReceived   = expvar.NewInt("chunksReceivedUnwanted")
47         unexpectedChunksReceived = expvar.NewInt("chunksReceivedUnexpected")
48         chunksReceived           = expvar.NewInt("chunksReceived")
49
50         peersAddedBySource = expvar.NewMap("peersAddedBySource")
51
52         uploadChunksPosted    = expvar.NewInt("uploadChunksPosted")
53         unexpectedCancels     = expvar.NewInt("unexpectedCancels")
54         postedCancels         = expvar.NewInt("postedCancels")
55         duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided")
56
57         pieceHashedCorrect    = expvar.NewInt("pieceHashedCorrect")
58         pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
59
60         unsuccessfulDials = expvar.NewInt("dialSuccessful")
61         successfulDials   = expvar.NewInt("dialUnsuccessful")
62
63         acceptUTP    = expvar.NewInt("acceptUTP")
64         acceptTCP    = expvar.NewInt("acceptTCP")
65         acceptReject = expvar.NewInt("acceptReject")
66
67         peerExtensions                    = expvar.NewMap("peerExtensions")
68         completedHandshakeConnectionFlags = expvar.NewMap("completedHandshakeConnectionFlags")
69         // Count of connections to peer with same client ID.
70         connsToSelf = expvar.NewInt("connsToSelf")
71         // Number of completed connections to a client we're already connected with.
72         duplicateClientConns       = expvar.NewInt("duplicateClientConns")
73         receivedMessageTypes       = expvar.NewMap("receivedMessageTypes")
74         receivedKeepalives         = expvar.NewInt("receivedKeepalives")
75         supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
76         postedMessageTypes         = expvar.NewMap("postedMessageTypes")
77         postedKeepalives           = expvar.NewInt("postedKeepalives")
78         // Requests received for pieces we don't have.
79         requestsReceivedForMissingPieces = expvar.NewInt("requestsReceivedForMissingPieces")
80 )
81
82 const (
83         // Justification for set bits follows.
84         //
85         // Extension protocol ([5]|=0x10):
86         // http://www.bittorrent.org/beps/bep_0010.html
87         //
88         // Fast Extension ([7]|=0x04):
89         // http://bittorrent.org/beps/bep_0006.html.
90         // Disabled until AllowedFast is implemented.
91         //
92         // DHT ([7]|=1):
93         // http://www.bittorrent.org/beps/bep_0005.html
94         defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
95
96         socketsPerTorrent     = 80
97         torrentPeersHighWater = 200
98         torrentPeersLowWater  = 50
99
100         // Limit how long handshake can take. This is to reduce the lingering
101         // impact of a few bad apples. 4s loses 1% of successful handshakes that
102         // are obtained with 60s timeout, and 5% of unsuccessful handshakes.
103         btHandshakeTimeout = 4 * time.Second
104         handshakesTimeout  = 20 * time.Second
105
106         // These are our extended message IDs.
107         metadataExtendedId = iota + 1 // 0 is reserved for deleting keys
108         pexExtendedId
109
110         // Updated occasionally to when there's been some changes to client
111         // behaviour in case other clients are assuming anything of us. See also
112         // `bep20`.
113         extendedHandshakeClientVersion = "go.torrent dev 20150624"
114 )
115
116 // Currently doesn't really queue, but should in the future.
117 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex int) {
118         piece := &t.pieces[pieceIndex]
119         if piece.QueuedForHash {
120                 return
121         }
122         piece.QueuedForHash = true
123         t.publishPieceChange(int(pieceIndex))
124         go cl.verifyPiece(t, int(pieceIndex))
125 }
126
127 // Queue a piece check if one isn't already queued, and the piece has never
128 // been checked before.
129 func (cl *Client) queueFirstHash(t *torrent, piece int) {
130         p := &t.pieces[piece]
131         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
132                 return
133         }
134         cl.queuePieceCheck(t, piece)
135 }
136
137 // Clients contain zero or more Torrents. A client manages a blocklist, the
138 // TCP/UDP protocol ports, and DHT as desired.
139 type Client struct {
140         halfOpenLimit  int
141         peerID         [20]byte
142         listeners      []net.Listener
143         utpSock        *utp.Socket
144         dHT            *dht.Server
145         ipBlockList    iplist.Ranger
146         bannedTorrents map[metainfo.InfoHash]struct{}
147         config         Config
148         pruneTimer     *time.Timer
149         extensionBytes peerExtensionBytes
150         // Set of addresses that have our client ID. This intentionally will
151         // include ourselves if we end up trying to connect to our own address
152         // through legitimate channels.
153         dopplegangerAddrs map[string]struct{}
154
155         defaultStorage storage.I
156
157         mu     sync.RWMutex
158         event  sync.Cond
159         closed missinggo.Event
160
161         torrents map[metainfo.InfoHash]*torrent
162 }
163
164 func (me *Client) IPBlockList() iplist.Ranger {
165         me.mu.Lock()
166         defer me.mu.Unlock()
167         return me.ipBlockList
168 }
169
170 func (me *Client) SetIPBlockList(list iplist.Ranger) {
171         me.mu.Lock()
172         defer me.mu.Unlock()
173         me.ipBlockList = list
174         if me.dHT != nil {
175                 me.dHT.SetIPBlockList(list)
176         }
177 }
178
179 func (me *Client) PeerID() string {
180         return string(me.peerID[:])
181 }
182
183 func (me *Client) ListenAddr() (addr net.Addr) {
184         for _, l := range me.listeners {
185                 addr = l.Addr()
186                 break
187         }
188         return
189 }
190
191 type hashSorter struct {
192         Hashes []metainfo.InfoHash
193 }
194
195 func (me hashSorter) Len() int {
196         return len(me.Hashes)
197 }
198
199 func (me hashSorter) Less(a, b int) bool {
200         return (&big.Int{}).SetBytes(me.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(me.Hashes[b][:])) < 0
201 }
202
203 func (me hashSorter) Swap(a, b int) {
204         me.Hashes[a], me.Hashes[b] = me.Hashes[b], me.Hashes[a]
205 }
206
207 func (cl *Client) sortedTorrents() (ret []*torrent) {
208         var hs hashSorter
209         for ih := range cl.torrents {
210                 hs.Hashes = append(hs.Hashes, ih)
211         }
212         sort.Sort(hs)
213         for _, ih := range hs.Hashes {
214                 ret = append(ret, cl.torrent(ih))
215         }
216         return
217 }
218
219 // Writes out a human readable status of the client, such as for writing to a
220 // HTTP status page.
221 func (cl *Client) WriteStatus(_w io.Writer) {
222         cl.mu.RLock()
223         defer cl.mu.RUnlock()
224         w := bufio.NewWriter(_w)
225         defer w.Flush()
226         if addr := cl.ListenAddr(); addr != nil {
227                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
228         } else {
229                 fmt.Fprintln(w, "Not listening!")
230         }
231         fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
232         if cl.dHT != nil {
233                 dhtStats := cl.dHT.Stats()
234                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
235                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
236                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
237                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
238                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
239         }
240         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
241         fmt.Fprintln(w)
242         for _, t := range cl.sortedTorrents() {
243                 if t.Name() == "" {
244                         fmt.Fprint(w, "<unknown name>")
245                 } else {
246                         fmt.Fprint(w, t.Name())
247                 }
248                 fmt.Fprint(w, "\n")
249                 if t.haveInfo() {
250                         fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length)
251                 } else {
252                         w.WriteString("<missing metainfo>")
253                 }
254                 fmt.Fprint(w, "\n")
255                 t.writeStatus(w, cl)
256                 fmt.Fprintln(w)
257         }
258 }
259
260 // Calculates the number of pieces to set to Readahead priority, after the
261 // Now, and Next pieces.
262 func readaheadPieces(readahead, pieceLength int64) (ret int) {
263         // Expand the readahead to fit any partial pieces. Subtract 1 for the
264         // "next" piece that is assigned.
265         ret = int((readahead+pieceLength-1)/pieceLength - 1)
266         // Lengthen the "readahead tail" to smooth blockiness that occurs when the
267         // piece length is much larger than the readahead.
268         if ret < 2 {
269                 ret++
270         }
271         return
272 }
273
274 func (cl *Client) configDir() string {
275         if cl.config.ConfigDir == "" {
276                 return filepath.Join(os.Getenv("HOME"), ".config/torrent")
277         }
278         return cl.config.ConfigDir
279 }
280
281 // The directory where the Client expects to find and store configuration
282 // data. Defaults to $HOME/.config/torrent.
283 func (cl *Client) ConfigDir() string {
284         return cl.configDir()
285 }
286
287 func loadPackedBlocklist(filename string) (ret iplist.Ranger, err error) {
288         f, err := os.Open(filename)
289         if os.IsNotExist(err) {
290                 err = nil
291                 return
292         }
293         if err != nil {
294                 return
295         }
296         defer f.Close()
297         mm, err := mmap.Map(f, mmap.RDONLY, 0)
298         if err != nil {
299                 return
300         }
301         ret = iplist.NewFromPacked(mm)
302         return
303 }
304
305 func (cl *Client) setEnvBlocklist() (err error) {
306         filename := os.Getenv("TORRENT_BLOCKLIST_FILE")
307         defaultBlocklist := filename == ""
308         if defaultBlocklist {
309                 cl.ipBlockList, err = loadPackedBlocklist(filepath.Join(cl.configDir(), "packed-blocklist"))
310                 if err != nil {
311                         return
312                 }
313                 if cl.ipBlockList != nil {
314                         return
315                 }
316                 filename = filepath.Join(cl.configDir(), "blocklist")
317         }
318         f, err := os.Open(filename)
319         if err != nil {
320                 if defaultBlocklist {
321                         err = nil
322                 }
323                 return
324         }
325         defer f.Close()
326         cl.ipBlockList, err = iplist.NewFromReader(f)
327         return
328 }
329
330 func (cl *Client) initBannedTorrents() error {
331         f, err := os.Open(filepath.Join(cl.configDir(), "banned_infohashes"))
332         if err != nil {
333                 if os.IsNotExist(err) {
334                         return nil
335                 }
336                 return fmt.Errorf("error opening banned infohashes file: %s", err)
337         }
338         defer f.Close()
339         scanner := bufio.NewScanner(f)
340         cl.bannedTorrents = make(map[metainfo.InfoHash]struct{})
341         for scanner.Scan() {
342                 if strings.HasPrefix(strings.TrimSpace(scanner.Text()), "#") {
343                         continue
344                 }
345                 var ihs string
346                 n, err := fmt.Sscanf(scanner.Text(), "%x", &ihs)
347                 if err != nil {
348                         return fmt.Errorf("error reading infohash: %s", err)
349                 }
350                 if n != 1 {
351                         continue
352                 }
353                 if len(ihs) != 20 {
354                         return errors.New("bad infohash")
355                 }
356                 var ih metainfo.InfoHash
357                 missinggo.CopyExact(&ih, ihs)
358                 cl.bannedTorrents[ih] = struct{}{}
359         }
360         if err := scanner.Err(); err != nil {
361                 return fmt.Errorf("error scanning file: %s", err)
362         }
363         return nil
364 }
365
366 // Creates a new client.
367 func NewClient(cfg *Config) (cl *Client, err error) {
368         if cfg == nil {
369                 cfg = &Config{}
370         }
371
372         defer func() {
373                 if err != nil {
374                         cl = nil
375                 }
376         }()
377         cl = &Client{
378                 halfOpenLimit:     socketsPerTorrent,
379                 config:            *cfg,
380                 defaultStorage:    cfg.DefaultStorage,
381                 dopplegangerAddrs: make(map[string]struct{}),
382                 torrents:          make(map[metainfo.InfoHash]*torrent),
383         }
384         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
385         cl.event.L = &cl.mu
386         if cl.defaultStorage == nil {
387                 cl.defaultStorage = storage.NewFile(cfg.DataDir)
388         }
389         if cfg.IPBlocklist != nil {
390                 cl.ipBlockList = cfg.IPBlocklist
391         } else if !cfg.NoDefaultBlocklist {
392                 err = cl.setEnvBlocklist()
393                 if err != nil {
394                         return
395                 }
396         }
397
398         if err = cl.initBannedTorrents(); err != nil {
399                 err = fmt.Errorf("error initing banned torrents: %s", err)
400                 return
401         }
402
403         if cfg.PeerID != "" {
404                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
405         } else {
406                 o := copy(cl.peerID[:], bep20)
407                 _, err = rand.Read(cl.peerID[o:])
408                 if err != nil {
409                         panic("error generating peer id")
410                 }
411         }
412
413         // Returns the laddr string to listen on for the next Listen call.
414         listenAddr := func() string {
415                 if addr := cl.ListenAddr(); addr != nil {
416                         return addr.String()
417                 }
418                 if cfg.ListenAddr == "" {
419                         return ":50007"
420                 }
421                 return cfg.ListenAddr
422         }
423         if !cl.config.DisableTCP {
424                 var l net.Listener
425                 l, err = net.Listen(func() string {
426                         if cl.config.DisableIPv6 {
427                                 return "tcp4"
428                         } else {
429                                 return "tcp"
430                         }
431                 }(), listenAddr())
432                 if err != nil {
433                         return
434                 }
435                 cl.listeners = append(cl.listeners, l)
436                 go cl.acceptConnections(l, false)
437         }
438         if !cl.config.DisableUTP {
439                 cl.utpSock, err = utp.NewSocket(func() string {
440                         if cl.config.DisableIPv6 {
441                                 return "udp4"
442                         } else {
443                                 return "udp"
444                         }
445                 }(), listenAddr())
446                 if err != nil {
447                         return
448                 }
449                 cl.listeners = append(cl.listeners, cl.utpSock)
450                 go cl.acceptConnections(cl.utpSock, true)
451         }
452         if !cfg.NoDHT {
453                 dhtCfg := cfg.DHTConfig
454                 if dhtCfg.IPBlocklist == nil {
455                         dhtCfg.IPBlocklist = cl.ipBlockList
456                 }
457                 if dhtCfg.Addr == "" {
458                         dhtCfg.Addr = listenAddr()
459                 }
460                 if dhtCfg.Conn == nil && cl.utpSock != nil {
461                         dhtCfg.Conn = cl.utpSock
462                 }
463                 cl.dHT, err = dht.NewServer(&dhtCfg)
464                 if err != nil {
465                         return
466                 }
467         }
468
469         return
470 }
471
472 // Stops the client. All connections to peers are closed and all activity will
473 // come to a halt.
474 func (me *Client) Close() {
475         me.mu.Lock()
476         defer me.mu.Unlock()
477         me.closed.Set()
478         if me.dHT != nil {
479                 me.dHT.Close()
480         }
481         for _, l := range me.listeners {
482                 l.Close()
483         }
484         for _, t := range me.torrents {
485                 t.close()
486         }
487         me.event.Broadcast()
488 }
489
490 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
491
492 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
493         if cl.ipBlockList == nil {
494                 return
495         }
496         ip4 := ip.To4()
497         // If blocklists are enabled, then block non-IPv4 addresses, because
498         // blocklists do not yet support IPv6.
499         if ip4 == nil {
500                 if missinggo.CryHeard() {
501                         log.Printf("blocking non-IPv4 address: %s", ip)
502                 }
503                 r = ipv6BlockRange
504                 blocked = true
505                 return
506         }
507         return cl.ipBlockList.Lookup(ip4)
508 }
509
510 func (cl *Client) waitAccept() {
511         cl.mu.Lock()
512         defer cl.mu.Unlock()
513         for {
514                 for _, t := range cl.torrents {
515                         if cl.wantConns(t) {
516                                 return
517                         }
518                 }
519                 if cl.closed.IsSet() {
520                         return
521                 }
522                 cl.event.Wait()
523         }
524 }
525
526 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
527         for {
528                 cl.waitAccept()
529                 conn, err := l.Accept()
530                 conn = pproffd.WrapNetConn(conn)
531                 if cl.closed.IsSet() {
532                         if conn != nil {
533                                 conn.Close()
534                         }
535                         return
536                 }
537                 if err != nil {
538                         log.Print(err)
539                         // I think something harsher should happen here? Our accept
540                         // routine just fucked off.
541                         return
542                 }
543                 if utp {
544                         acceptUTP.Add(1)
545                 } else {
546                         acceptTCP.Add(1)
547                 }
548                 cl.mu.RLock()
549                 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
550                 _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
551                 cl.mu.RUnlock()
552                 if blocked || doppleganger {
553                         acceptReject.Add(1)
554                         // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
555                         conn.Close()
556                         continue
557                 }
558                 go cl.incomingConnection(conn, utp)
559         }
560 }
561
562 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
563         defer nc.Close()
564         if tc, ok := nc.(*net.TCPConn); ok {
565                 tc.SetLinger(0)
566         }
567         c := newConnection()
568         c.conn = nc
569         c.rw = nc
570         c.Discovery = peerSourceIncoming
571         c.uTP = utp
572         err := cl.runReceivedConn(c)
573         if err != nil {
574                 // log.Print(err)
575         }
576 }
577
578 // Returns a handle to the given torrent, if it's present in the client.
579 func (cl *Client) Torrent(ih metainfo.InfoHash) (T Torrent, ok bool) {
580         cl.mu.Lock()
581         defer cl.mu.Unlock()
582         t, ok := cl.torrents[ih]
583         if !ok {
584                 return
585         }
586         T = Torrent{cl, t}
587         return
588 }
589
590 func (me *Client) torrent(ih metainfo.InfoHash) *torrent {
591         return me.torrents[ih]
592 }
593
594 type dialResult struct {
595         Conn net.Conn
596         UTP  bool
597 }
598
599 func doDial(dial func(addr string, t *torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *torrent) {
600         conn, err := dial(addr, t)
601         if err != nil {
602                 if conn != nil {
603                         conn.Close()
604                 }
605                 conn = nil // Pedantic
606         }
607         ch <- dialResult{conn, utp}
608         if err == nil {
609                 successfulDials.Add(1)
610                 return
611         }
612         unsuccessfulDials.Add(1)
613 }
614
615 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
616         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
617         if ret < minDialTimeout {
618                 ret = minDialTimeout
619         }
620         return
621 }
622
623 // Returns whether an address is known to connect to a client with our own ID.
624 func (me *Client) dopplegangerAddr(addr string) bool {
625         _, ok := me.dopplegangerAddrs[addr]
626         return ok
627 }
628
629 // Start the process of connecting to the given peer for the given torrent if
630 // appropriate.
631 func (me *Client) initiateConn(peer Peer, t *torrent) {
632         if peer.Id == me.peerID {
633                 return
634         }
635         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
636         if me.dopplegangerAddr(addr) || t.addrActive(addr) {
637                 duplicateConnsAvoided.Add(1)
638                 return
639         }
640         if r, ok := me.ipBlockRange(peer.IP); ok {
641                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
642                 return
643         }
644         t.halfOpen[addr] = struct{}{}
645         go me.outgoingConnection(t, addr, peer.Source)
646 }
647
648 func (me *Client) dialTimeout(t *torrent) time.Duration {
649         me.mu.Lock()
650         pendingPeers := len(t.peers)
651         me.mu.Unlock()
652         return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, pendingPeers)
653 }
654
655 func (me *Client) dialTCP(addr string, t *torrent) (c net.Conn, err error) {
656         c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t))
657         if err == nil {
658                 c.(*net.TCPConn).SetLinger(0)
659         }
660         return
661 }
662
663 func (me *Client) dialUTP(addr string, t *torrent) (c net.Conn, err error) {
664         return me.utpSock.DialTimeout(addr, me.dialTimeout(t))
665 }
666
667 // Returns a connection over UTP or TCP, whichever is first to connect.
668 func (me *Client) dialFirst(addr string, t *torrent) (conn net.Conn, utp bool) {
669         // Initiate connections via TCP and UTP simultaneously. Use the first one
670         // that succeeds.
671         left := 0
672         if !me.config.DisableUTP {
673                 left++
674         }
675         if !me.config.DisableTCP {
676                 left++
677         }
678         resCh := make(chan dialResult, left)
679         if !me.config.DisableUTP {
680                 go doDial(me.dialUTP, resCh, true, addr, t)
681         }
682         if !me.config.DisableTCP {
683                 go doDial(me.dialTCP, resCh, false, addr, t)
684         }
685         var res dialResult
686         // Wait for a successful connection.
687         for ; left > 0 && res.Conn == nil; left-- {
688                 res = <-resCh
689         }
690         if left > 0 {
691                 // There are still incompleted dials.
692                 go func() {
693                         for ; left > 0; left-- {
694                                 conn := (<-resCh).Conn
695                                 if conn != nil {
696                                         conn.Close()
697                                 }
698                         }
699                 }()
700         }
701         conn = res.Conn
702         utp = res.UTP
703         return
704 }
705
706 func (me *Client) noLongerHalfOpen(t *torrent, addr string) {
707         if _, ok := t.halfOpen[addr]; !ok {
708                 panic("invariant broken")
709         }
710         delete(t.halfOpen, addr)
711         me.openNewConns(t)
712 }
713
714 // Performs initiator handshakes and returns a connection. Returns nil
715 // *connection if no connection for valid reasons.
716 func (me *Client) handshakesConnection(nc net.Conn, t *torrent, encrypted, utp bool) (c *connection, err error) {
717         c = newConnection()
718         c.conn = nc
719         c.rw = nc
720         c.encrypted = encrypted
721         c.uTP = utp
722         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
723         if err != nil {
724                 return
725         }
726         ok, err := me.initiateHandshakes(c, t)
727         if !ok {
728                 c = nil
729         }
730         return
731 }
732
733 // Returns nil connection and nil error if no connection could be established
734 // for valid reasons.
735 func (me *Client) establishOutgoingConn(t *torrent, addr string) (c *connection, err error) {
736         nc, utp := me.dialFirst(addr, t)
737         if nc == nil {
738                 return
739         }
740         c, err = me.handshakesConnection(nc, t, !me.config.DisableEncryption, utp)
741         if err != nil {
742                 nc.Close()
743                 return
744         } else if c != nil {
745                 return
746         }
747         nc.Close()
748         if me.config.DisableEncryption {
749                 // We already tried without encryption.
750                 return
751         }
752         // Try again without encryption, using whichever protocol type worked last
753         // time.
754         if utp {
755                 nc, err = me.dialUTP(addr, t)
756         } else {
757                 nc, err = me.dialTCP(addr, t)
758         }
759         if err != nil {
760                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
761                 return
762         }
763         c, err = me.handshakesConnection(nc, t, false, utp)
764         if err != nil || c == nil {
765                 nc.Close()
766         }
767         return
768 }
769
770 // Called to dial out and run a connection. The addr we're given is already
771 // considered half-open.
772 func (me *Client) outgoingConnection(t *torrent, addr string, ps peerSource) {
773         c, err := me.establishOutgoingConn(t, addr)
774         me.mu.Lock()
775         defer me.mu.Unlock()
776         // Don't release lock between here and addConnection, unless it's for
777         // failure.
778         me.noLongerHalfOpen(t, addr)
779         if err != nil {
780                 if me.config.Debug {
781                         log.Printf("error establishing outgoing connection: %s", err)
782                 }
783                 return
784         }
785         if c == nil {
786                 return
787         }
788         defer c.Close()
789         c.Discovery = ps
790         err = me.runInitiatedHandshookConn(c, t)
791         if err != nil {
792                 if me.config.Debug {
793                         log.Printf("error in established outgoing connection: %s", err)
794                 }
795         }
796 }
797
798 // The port number for incoming peer connections. 0 if the client isn't
799 // listening.
800 func (cl *Client) incomingPeerPort() int {
801         listenAddr := cl.ListenAddr()
802         if listenAddr == nil {
803                 return 0
804         }
805         return missinggo.AddrPort(listenAddr)
806 }
807
808 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
809 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
810 func addrCompactIP(addr net.Addr) (string, error) {
811         host, _, err := net.SplitHostPort(addr.String())
812         if err != nil {
813                 return "", err
814         }
815         ip := net.ParseIP(host)
816         if v4 := ip.To4(); v4 != nil {
817                 if len(v4) != 4 {
818                         panic(v4)
819                 }
820                 return string(v4), nil
821         }
822         return string(ip.To16()), nil
823 }
824
825 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
826         var err error
827         for b := range bb {
828                 _, err = w.Write(b)
829                 if err != nil {
830                         break
831                 }
832         }
833         done <- err
834 }
835
836 type (
837         peerExtensionBytes [8]byte
838         peerID             [20]byte
839 )
840
841 func (me *peerExtensionBytes) SupportsExtended() bool {
842         return me[5]&0x10 != 0
843 }
844
845 func (me *peerExtensionBytes) SupportsDHT() bool {
846         return me[7]&0x01 != 0
847 }
848
849 func (me *peerExtensionBytes) SupportsFast() bool {
850         return me[7]&0x04 != 0
851 }
852
853 type handshakeResult struct {
854         peerExtensionBytes
855         peerID
856         metainfo.InfoHash
857 }
858
859 // ih is nil if we expect the peer to declare the InfoHash, such as when the
860 // peer initiated the connection. Returns ok if the handshake was successful,
861 // and err if there was an unexpected condition other than the peer simply
862 // abandoning the handshake.
863 func handshake(sock io.ReadWriter, ih *metainfo.InfoHash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
864         // Bytes to be sent to the peer. Should never block the sender.
865         postCh := make(chan []byte, 4)
866         // A single error value sent when the writer completes.
867         writeDone := make(chan error, 1)
868         // Performs writes to the socket and ensures posts don't block.
869         go handshakeWriter(sock, postCh, writeDone)
870
871         defer func() {
872                 close(postCh) // Done writing.
873                 if !ok {
874                         return
875                 }
876                 if err != nil {
877                         panic(err)
878                 }
879                 // Wait until writes complete before returning from handshake.
880                 err = <-writeDone
881                 if err != nil {
882                         err = fmt.Errorf("error writing: %s", err)
883                 }
884         }()
885
886         post := func(bb []byte) {
887                 select {
888                 case postCh <- bb:
889                 default:
890                         panic("mustn't block while posting")
891                 }
892         }
893
894         post([]byte(pp.Protocol))
895         post(extensions[:])
896         if ih != nil { // We already know what we want.
897                 post(ih[:])
898                 post(peerID[:])
899         }
900         var b [68]byte
901         _, err = io.ReadFull(sock, b[:68])
902         if err != nil {
903                 err = nil
904                 return
905         }
906         if string(b[:20]) != pp.Protocol {
907                 return
908         }
909         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
910         missinggo.CopyExact(&res.InfoHash, b[28:48])
911         missinggo.CopyExact(&res.peerID, b[48:68])
912         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
913
914         // TODO: Maybe we can just drop peers here if we're not interested. This
915         // could prevent them trying to reconnect, falsely believing there was
916         // just a problem.
917         if ih == nil { // We were waiting for the peer to tell us what they wanted.
918                 post(res.InfoHash[:])
919                 post(peerID[:])
920         }
921
922         ok = true
923         return
924 }
925
926 // Wraps a raw connection and provides the interface we want for using the
927 // connection in the message loop.
928 type deadlineReader struct {
929         nc net.Conn
930         r  io.Reader
931 }
932
933 func (me deadlineReader) Read(b []byte) (n int, err error) {
934         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
935         err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
936         if err != nil {
937                 err = fmt.Errorf("error setting read deadline: %s", err)
938         }
939         n, err = me.r.Read(b)
940         // Convert common errors into io.EOF.
941         // if err != nil {
942         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
943         //              err = io.EOF
944         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
945         //              if n != 0 {
946         //                      panic(n)
947         //              }
948         //              err = io.EOF
949         //      }
950         // }
951         return
952 }
953
954 type readWriter struct {
955         io.Reader
956         io.Writer
957 }
958
959 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
960         var protocol [len(pp.Protocol)]byte
961         _, err = io.ReadFull(rw, protocol[:])
962         if err != nil {
963                 return
964         }
965         ret = readWriter{
966                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
967                 rw,
968         }
969         if string(protocol[:]) == pp.Protocol {
970                 return
971         }
972         encrypted = true
973         ret, err = mse.ReceiveHandshake(ret, skeys)
974         return
975 }
976
977 func (cl *Client) receiveSkeys() (ret [][]byte) {
978         for ih := range cl.torrents {
979                 ret = append(ret, ih[:])
980         }
981         return
982 }
983
984 func (me *Client) initiateHandshakes(c *connection, t *torrent) (ok bool, err error) {
985         if c.encrypted {
986                 c.rw, err = mse.InitiateHandshake(c.rw, t.InfoHash[:], nil)
987                 if err != nil {
988                         return
989                 }
990         }
991         ih, ok, err := me.connBTHandshake(c, &t.InfoHash)
992         if ih != t.InfoHash {
993                 ok = false
994         }
995         return
996 }
997
998 // Do encryption and bittorrent handshakes as receiver.
999 func (cl *Client) receiveHandshakes(c *connection) (t *torrent, err error) {
1000         cl.mu.Lock()
1001         skeys := cl.receiveSkeys()
1002         cl.mu.Unlock()
1003         if !cl.config.DisableEncryption {
1004                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
1005                 if err != nil {
1006                         if err == mse.ErrNoSecretKeyMatch {
1007                                 err = nil
1008                         }
1009                         return
1010                 }
1011         }
1012         ih, ok, err := cl.connBTHandshake(c, nil)
1013         if err != nil {
1014                 err = fmt.Errorf("error during bt handshake: %s", err)
1015                 return
1016         }
1017         if !ok {
1018                 return
1019         }
1020         cl.mu.Lock()
1021         t = cl.torrents[ih]
1022         cl.mu.Unlock()
1023         return
1024 }
1025
1026 // Returns !ok if handshake failed for valid reasons.
1027 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.InfoHash) (ret metainfo.InfoHash, ok bool, err error) {
1028         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
1029         if err != nil || !ok {
1030                 return
1031         }
1032         ret = res.InfoHash
1033         c.PeerExtensionBytes = res.peerExtensionBytes
1034         c.PeerID = res.peerID
1035         c.completedHandshake = time.Now()
1036         return
1037 }
1038
1039 func (cl *Client) runInitiatedHandshookConn(c *connection, t *torrent) (err error) {
1040         if c.PeerID == cl.peerID {
1041                 // Only if we initiated the connection is the remote address a
1042                 // listen addr for a doppleganger.
1043                 connsToSelf.Add(1)
1044                 addr := c.conn.RemoteAddr().String()
1045                 cl.dopplegangerAddrs[addr] = struct{}{}
1046                 return
1047         }
1048         return cl.runHandshookConn(c, t)
1049 }
1050
1051 func (cl *Client) runReceivedConn(c *connection) (err error) {
1052         err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
1053         if err != nil {
1054                 return
1055         }
1056         t, err := cl.receiveHandshakes(c)
1057         if err != nil {
1058                 err = fmt.Errorf("error receiving handshakes: %s", err)
1059                 return
1060         }
1061         if t == nil {
1062                 return
1063         }
1064         cl.mu.Lock()
1065         defer cl.mu.Unlock()
1066         if c.PeerID == cl.peerID {
1067                 return
1068         }
1069         return cl.runHandshookConn(c, t)
1070 }
1071
1072 func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) {
1073         c.conn.SetWriteDeadline(time.Time{})
1074         c.rw = readWriter{
1075                 deadlineReader{c.conn, c.rw},
1076                 c.rw,
1077         }
1078         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
1079         if !cl.addConnection(t, c) {
1080                 return
1081         }
1082         defer cl.dropConnection(t, c)
1083         go c.writer()
1084         go c.writeOptimizer(time.Minute)
1085         cl.sendInitialMessages(c, t)
1086         err = cl.connectionLoop(t, c)
1087         if err != nil {
1088                 err = fmt.Errorf("error during connection loop: %s", err)
1089         }
1090         return
1091 }
1092
1093 func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) {
1094         if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
1095                 conn.Post(pp.Message{
1096                         Type:       pp.Extended,
1097                         ExtendedID: pp.HandshakeExtendedID,
1098                         ExtendedPayload: func() []byte {
1099                                 d := map[string]interface{}{
1100                                         "m": func() (ret map[string]int) {
1101                                                 ret = make(map[string]int, 2)
1102                                                 ret["ut_metadata"] = metadataExtendedId
1103                                                 if !me.config.DisablePEX {
1104                                                         ret["ut_pex"] = pexExtendedId
1105                                                 }
1106                                                 return
1107                                         }(),
1108                                         "v": extendedHandshakeClientVersion,
1109                                         // No upload queue is implemented yet.
1110                                         "reqq": 64,
1111                                 }
1112                                 if !me.config.DisableEncryption {
1113                                         d["e"] = 1
1114                                 }
1115                                 if torrent.metadataSizeKnown() {
1116                                         d["metadata_size"] = torrent.metadataSize()
1117                                 }
1118                                 if p := me.incomingPeerPort(); p != 0 {
1119                                         d["p"] = p
1120                                 }
1121                                 yourip, err := addrCompactIP(conn.remoteAddr())
1122                                 if err != nil {
1123                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
1124                                 } else {
1125                                         d["yourip"] = yourip
1126                                 }
1127                                 // log.Printf("sending %v", d)
1128                                 b, err := bencode.Marshal(d)
1129                                 if err != nil {
1130                                         panic(err)
1131                                 }
1132                                 return b
1133                         }(),
1134                 })
1135         }
1136         if torrent.haveAnyPieces() {
1137                 conn.Bitfield(torrent.bitfield())
1138         } else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
1139                 conn.Post(pp.Message{
1140                         Type: pp.HaveNone,
1141                 })
1142         }
1143         if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil {
1144                 conn.Post(pp.Message{
1145                         Type: pp.Port,
1146                         Port: uint16(missinggo.AddrPort(me.dHT.Addr())),
1147                 })
1148         }
1149 }
1150
1151 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
1152         conn.updateRequests()
1153 }
1154
1155 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
1156         ok = cn.Cancel(r)
1157         if ok {
1158                 postedCancels.Add(1)
1159         }
1160         return
1161 }
1162
1163 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) bool {
1164         if !cn.RequestPending(r) {
1165                 return false
1166         }
1167         delete(cn.Requests, r)
1168         return true
1169 }
1170
1171 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
1172         if t.haveInfo() {
1173                 return
1174         }
1175         if c.PeerExtensionIDs["ut_metadata"] == 0 {
1176                 // Peer doesn't support this.
1177                 return
1178         }
1179         // Request metadata pieces that we don't have in a random order.
1180         var pending []int
1181         for index := 0; index < t.metadataPieceCount(); index++ {
1182                 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
1183                         pending = append(pending, index)
1184                 }
1185         }
1186         for _, i := range mathRand.Perm(len(pending)) {
1187                 c.requestMetadataPiece(pending[i])
1188         }
1189 }
1190
1191 func (cl *Client) completedMetadata(t *torrent) {
1192         h := sha1.New()
1193         h.Write(t.metadataBytes)
1194         var ih metainfo.InfoHash
1195         missinggo.CopyExact(&ih, h.Sum(nil))
1196         if ih != t.InfoHash {
1197                 log.Print("bad metadata")
1198                 t.invalidateMetadata()
1199                 return
1200         }
1201         var info metainfo.Info
1202         err := bencode.Unmarshal(t.metadataBytes, &info)
1203         if err != nil {
1204                 log.Printf("error unmarshalling metadata: %s", err)
1205                 t.invalidateMetadata()
1206                 return
1207         }
1208         // TODO(anacrolix): If this fails, I think something harsher should be
1209         // done.
1210         err = cl.setMetaData(t, &info, t.metadataBytes)
1211         if err != nil {
1212                 log.Printf("error setting metadata: %s", err)
1213                 t.invalidateMetadata()
1214                 return
1215         }
1216         if cl.config.Debug {
1217                 log.Printf("%s: got metadata from peers", t)
1218         }
1219 }
1220
1221 // Process incoming ut_metadata message.
1222 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
1223         var d map[string]int
1224         err = bencode.Unmarshal(payload, &d)
1225         if err != nil {
1226                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1227                 return
1228         }
1229         msgType, ok := d["msg_type"]
1230         if !ok {
1231                 err = errors.New("missing msg_type field")
1232                 return
1233         }
1234         piece := d["piece"]
1235         switch msgType {
1236         case pp.DataMetadataExtensionMsgType:
1237                 if t.haveInfo() {
1238                         break
1239                 }
1240                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1241                 if begin < 0 || begin >= len(payload) {
1242                         log.Printf("got bad metadata piece")
1243                         break
1244                 }
1245                 if !c.requestedMetadataPiece(piece) {
1246                         log.Printf("got unexpected metadata piece %d", piece)
1247                         break
1248                 }
1249                 c.metadataRequests[piece] = false
1250                 t.saveMetadataPiece(piece, payload[begin:])
1251                 c.UsefulChunksReceived++
1252                 c.lastUsefulChunkReceived = time.Now()
1253                 if !t.haveAllMetadataPieces() {
1254                         break
1255                 }
1256                 cl.completedMetadata(t)
1257         case pp.RequestMetadataExtensionMsgType:
1258                 if !t.haveMetadataPiece(piece) {
1259                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1260                         break
1261                 }
1262                 start := (1 << 14) * piece
1263                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1264         case pp.RejectMetadataExtensionMsgType:
1265         default:
1266                 err = errors.New("unknown msg_type value")
1267         }
1268         return
1269 }
1270
1271 func (me *Client) upload(t *torrent, c *connection) {
1272         if me.config.NoUpload {
1273                 return
1274         }
1275         if !c.PeerInterested {
1276                 return
1277         }
1278         seeding := me.seeding(t)
1279         if !seeding && !t.connHasWantedPieces(c) {
1280                 return
1281         }
1282 another:
1283         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1284                 c.Unchoke()
1285                 for r := range c.PeerRequests {
1286                         err := me.sendChunk(t, c, r)
1287                         if err != nil {
1288                                 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1289                                         // We had the piece, but not anymore.
1290                                 } else {
1291                                         log.Printf("error sending chunk %+v to peer: %s", r, err)
1292                                 }
1293                                 // If we failed to send a chunk, choke the peer to ensure they
1294                                 // flush all their requests. We've probably dropped a piece,
1295                                 // but there's no way to communicate this to the peer. If they
1296                                 // ask for it again, we'll kick them to allow us to send them
1297                                 // an updated bitfield.
1298                                 break another
1299                         }
1300                         delete(c.PeerRequests, r)
1301                         goto another
1302                 }
1303                 return
1304         }
1305         c.Choke()
1306 }
1307
1308 func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
1309         // Count the chunk being sent, even if it isn't.
1310         b := make([]byte, r.Length)
1311         p := t.info.Piece(int(r.Index))
1312         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1313         if n != len(b) {
1314                 if err == nil {
1315                         panic("expected error")
1316                 }
1317                 return err
1318         }
1319         c.Post(pp.Message{
1320                 Type:  pp.Piece,
1321                 Index: r.Index,
1322                 Begin: r.Begin,
1323                 Piece: b,
1324         })
1325         c.chunksSent++
1326         uploadChunksPosted.Add(1)
1327         c.lastChunkSent = time.Now()
1328         return nil
1329 }
1330
1331 // Processes incoming bittorrent messages. The client lock is held upon entry
1332 // and exit.
1333 func (me *Client) connectionLoop(t *torrent, c *connection) error {
1334         decoder := pp.Decoder{
1335                 R:         bufio.NewReader(c.rw),
1336                 MaxLength: 256 * 1024,
1337         }
1338         for {
1339                 me.mu.Unlock()
1340                 var msg pp.Message
1341                 err := decoder.Decode(&msg)
1342                 me.mu.Lock()
1343                 if me.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1344                         return nil
1345                 }
1346                 if err != nil {
1347                         return err
1348                 }
1349                 c.lastMessageReceived = time.Now()
1350                 if msg.Keepalive {
1351                         receivedKeepalives.Add(1)
1352                         continue
1353                 }
1354                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1355                 switch msg.Type {
1356                 case pp.Choke:
1357                         c.PeerChoked = true
1358                         c.Requests = nil
1359                         // We can then reset our interest.
1360                         c.updateRequests()
1361                 case pp.Reject:
1362                         me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1363                         c.updateRequests()
1364                 case pp.Unchoke:
1365                         c.PeerChoked = false
1366                         me.peerUnchoked(t, c)
1367                 case pp.Interested:
1368                         c.PeerInterested = true
1369                         me.upload(t, c)
1370                 case pp.NotInterested:
1371                         c.PeerInterested = false
1372                         c.Choke()
1373                 case pp.Have:
1374                         err = c.peerSentHave(int(msg.Index))
1375                 case pp.Request:
1376                         if c.Choked {
1377                                 break
1378                         }
1379                         if !c.PeerInterested {
1380                                 err = errors.New("peer sent request but isn't interested")
1381                                 break
1382                         }
1383                         if !t.havePiece(msg.Index.Int()) {
1384                                 // This isn't necessarily them screwing up. We can drop pieces
1385                                 // from our storage, and can't communicate this to peers
1386                                 // except by reconnecting.
1387                                 requestsReceivedForMissingPieces.Add(1)
1388                                 err = errors.New("peer requested piece we don't have")
1389                                 break
1390                         }
1391                         if c.PeerRequests == nil {
1392                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1393                         }
1394                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1395                         me.upload(t, c)
1396                 case pp.Cancel:
1397                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1398                         if !c.PeerCancel(req) {
1399                                 unexpectedCancels.Add(1)
1400                         }
1401                 case pp.Bitfield:
1402                         err = c.peerSentBitfield(msg.Bitfield)
1403                 case pp.HaveAll:
1404                         err = c.peerSentHaveAll()
1405                 case pp.HaveNone:
1406                         err = c.peerSentHaveNone()
1407                 case pp.Piece:
1408                         me.downloadedChunk(t, c, &msg)
1409                 case pp.Extended:
1410                         switch msg.ExtendedID {
1411                         case pp.HandshakeExtendedID:
1412                                 // TODO: Create a bencode struct for this.
1413                                 var d map[string]interface{}
1414                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1415                                 if err != nil {
1416                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1417                                         break
1418                                 }
1419                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1420                                 if reqq, ok := d["reqq"]; ok {
1421                                         if i, ok := reqq.(int64); ok {
1422                                                 c.PeerMaxRequests = int(i)
1423                                         }
1424                                 }
1425                                 if v, ok := d["v"]; ok {
1426                                         c.PeerClientName = v.(string)
1427                                 }
1428                                 m, ok := d["m"]
1429                                 if !ok {
1430                                         err = errors.New("handshake missing m item")
1431                                         break
1432                                 }
1433                                 mTyped, ok := m.(map[string]interface{})
1434                                 if !ok {
1435                                         err = errors.New("handshake m value is not dict")
1436                                         break
1437                                 }
1438                                 if c.PeerExtensionIDs == nil {
1439                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1440                                 }
1441                                 for name, v := range mTyped {
1442                                         id, ok := v.(int64)
1443                                         if !ok {
1444                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1445                                                 continue
1446                                         }
1447                                         if id == 0 {
1448                                                 delete(c.PeerExtensionIDs, name)
1449                                         } else {
1450                                                 if c.PeerExtensionIDs[name] == 0 {
1451                                                         supportedExtensionMessages.Add(name, 1)
1452                                                 }
1453                                                 c.PeerExtensionIDs[name] = byte(id)
1454                                         }
1455                                 }
1456                                 metadata_sizeUntyped, ok := d["metadata_size"]
1457                                 if ok {
1458                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1459                                         if !ok {
1460                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1461                                         } else {
1462                                                 t.setMetadataSize(metadata_size, me)
1463                                         }
1464                                 }
1465                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1466                                         me.requestPendingMetadata(t, c)
1467                                 }
1468                         case metadataExtendedId:
1469                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1470                                 if err != nil {
1471                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1472                                 }
1473                         case pexExtendedId:
1474                                 if me.config.DisablePEX {
1475                                         break
1476                                 }
1477                                 var pexMsg peerExchangeMessage
1478                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1479                                 if err != nil {
1480                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1481                                         break
1482                                 }
1483                                 go func() {
1484                                         me.mu.Lock()
1485                                         me.addPeers(t, func() (ret []Peer) {
1486                                                 for i, cp := range pexMsg.Added {
1487                                                         p := Peer{
1488                                                                 IP:     make([]byte, 4),
1489                                                                 Port:   int(cp.Port),
1490                                                                 Source: peerSourcePEX,
1491                                                         }
1492                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1493                                                                 p.SupportsEncryption = true
1494                                                         }
1495                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1496                                                         ret = append(ret, p)
1497                                                 }
1498                                                 return
1499                                         }())
1500                                         me.mu.Unlock()
1501                                 }()
1502                         default:
1503                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1504                         }
1505                         if err != nil {
1506                                 // That client uses its own extension IDs for outgoing message
1507                                 // types, which is incorrect.
1508                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1509                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1510                                         return nil
1511                                 }
1512                         }
1513                 case pp.Port:
1514                         if me.dHT == nil {
1515                                 break
1516                         }
1517                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1518                         if err != nil {
1519                                 panic(err)
1520                         }
1521                         if msg.Port != 0 {
1522                                 pingAddr.Port = int(msg.Port)
1523                         }
1524                         _, err = me.dHT.Ping(pingAddr)
1525                 default:
1526                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1527                 }
1528                 if err != nil {
1529                         return err
1530                 }
1531         }
1532 }
1533
1534 // Returns true if connection is removed from torrent.Conns.
1535 func (me *Client) deleteConnection(t *torrent, c *connection) bool {
1536         for i0, _c := range t.conns {
1537                 if _c != c {
1538                         continue
1539                 }
1540                 i1 := len(t.conns) - 1
1541                 if i0 != i1 {
1542                         t.conns[i0] = t.conns[i1]
1543                 }
1544                 t.conns = t.conns[:i1]
1545                 return true
1546         }
1547         return false
1548 }
1549
1550 func (me *Client) dropConnection(t *torrent, c *connection) {
1551         me.event.Broadcast()
1552         c.Close()
1553         if me.deleteConnection(t, c) {
1554                 me.openNewConns(t)
1555         }
1556 }
1557
1558 // Returns true if the connection is added.
1559 func (me *Client) addConnection(t *torrent, c *connection) bool {
1560         if me.closed.IsSet() {
1561                 return false
1562         }
1563         select {
1564         case <-t.ceasingNetworking:
1565                 return false
1566         default:
1567         }
1568         if !me.wantConns(t) {
1569                 return false
1570         }
1571         for _, c0 := range t.conns {
1572                 if c.PeerID == c0.PeerID {
1573                         // Already connected to a client with that ID.
1574                         duplicateClientConns.Add(1)
1575                         return false
1576                 }
1577         }
1578         if len(t.conns) >= socketsPerTorrent {
1579                 c := t.worstBadConn(me)
1580                 if c == nil {
1581                         return false
1582                 }
1583                 if me.config.Debug && missinggo.CryHeard() {
1584                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1585                 }
1586                 c.Close()
1587                 me.deleteConnection(t, c)
1588         }
1589         if len(t.conns) >= socketsPerTorrent {
1590                 panic(len(t.conns))
1591         }
1592         t.conns = append(t.conns, c)
1593         c.t = t
1594         return true
1595 }
1596
1597 func (t *torrent) readerPieces() (ret bitmap.Bitmap) {
1598         t.forReaderOffsetPieces(func(begin, end int) bool {
1599                 ret.AddRange(begin, end)
1600                 return true
1601         })
1602         return
1603 }
1604
1605 func (t *torrent) needData() bool {
1606         if !t.haveInfo() {
1607                 return true
1608         }
1609         if t.pendingPieces.Len() != 0 {
1610                 return true
1611         }
1612         return !t.readerPieces().IterTyped(func(piece int) bool {
1613                 return t.pieceComplete(piece)
1614         })
1615 }
1616
1617 func (cl *Client) usefulConn(t *torrent, c *connection) bool {
1618         if c.closed.IsSet() {
1619                 return false
1620         }
1621         if !t.haveInfo() {
1622                 return c.supportsExtension("ut_metadata")
1623         }
1624         if cl.seeding(t) {
1625                 return c.PeerInterested
1626         }
1627         return t.connHasWantedPieces(c)
1628 }
1629
1630 func (me *Client) wantConns(t *torrent) bool {
1631         if !me.seeding(t) && !t.needData() {
1632                 return false
1633         }
1634         if len(t.conns) < socketsPerTorrent {
1635                 return true
1636         }
1637         return t.worstBadConn(me) != nil
1638 }
1639
1640 func (me *Client) openNewConns(t *torrent) {
1641         select {
1642         case <-t.ceasingNetworking:
1643                 return
1644         default:
1645         }
1646         for len(t.peers) != 0 {
1647                 if !me.wantConns(t) {
1648                         return
1649                 }
1650                 if len(t.halfOpen) >= me.halfOpenLimit {
1651                         return
1652                 }
1653                 var (
1654                         k peersKey
1655                         p Peer
1656                 )
1657                 for k, p = range t.peers {
1658                         break
1659                 }
1660                 delete(t.peers, k)
1661                 me.initiateConn(p, t)
1662         }
1663         t.wantPeers.Broadcast()
1664 }
1665
1666 func (me *Client) addPeers(t *torrent, peers []Peer) {
1667         for _, p := range peers {
1668                 if me.dopplegangerAddr(net.JoinHostPort(
1669                         p.IP.String(),
1670                         strconv.FormatInt(int64(p.Port), 10),
1671                 )) {
1672                         continue
1673                 }
1674                 if _, ok := me.ipBlockRange(p.IP); ok {
1675                         continue
1676                 }
1677                 if p.Port == 0 {
1678                         // The spec says to scrub these yourselves. Fine.
1679                         continue
1680                 }
1681                 t.addPeer(p, me)
1682         }
1683 }
1684
1685 func (cl *Client) cachedMetaInfoFilename(ih metainfo.InfoHash) string {
1686         return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1687 }
1688
1689 func (cl *Client) saveTorrentFile(t *torrent) error {
1690         path := cl.cachedMetaInfoFilename(t.InfoHash)
1691         os.MkdirAll(filepath.Dir(path), 0777)
1692         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1693         if err != nil {
1694                 return fmt.Errorf("error opening file: %s", err)
1695         }
1696         defer f.Close()
1697         e := bencode.NewEncoder(f)
1698         err = e.Encode(t.MetaInfo())
1699         if err != nil {
1700                 return fmt.Errorf("error marshalling metainfo: %s", err)
1701         }
1702         mi, err := cl.torrentCacheMetaInfo(t.InfoHash)
1703         if err != nil {
1704                 // For example, a script kiddy makes us load too many files, and we're
1705                 // able to save the torrent, but not load it again to check it.
1706                 return nil
1707         }
1708         if !bytes.Equal(mi.Info.Hash.Bytes(), t.InfoHash[:]) {
1709                 log.Fatalf("%x != %x", mi.Info.Hash, t.InfoHash[:])
1710         }
1711         return nil
1712 }
1713
1714 func (cl *Client) setMetaData(t *torrent, md *metainfo.Info, bytes []byte) (err error) {
1715         err = t.setMetadata(md, bytes)
1716         if err != nil {
1717                 return
1718         }
1719         if !cl.config.DisableMetainfoCache {
1720                 if err := cl.saveTorrentFile(t); err != nil {
1721                         log.Printf("error saving torrent file for %s: %s", t, err)
1722                 }
1723         }
1724         cl.event.Broadcast()
1725         close(t.gotMetainfo)
1726         return
1727 }
1728
1729 // Prepare a Torrent without any attachment to a Client. That means we can
1730 // initialize fields all fields that don't require the Client without locking
1731 // it.
1732 func newTorrent(ih metainfo.InfoHash) (t *torrent) {
1733         t = &torrent{
1734                 InfoHash:  ih,
1735                 chunkSize: defaultChunkSize,
1736                 peers:     make(map[peersKey]Peer),
1737
1738                 closing:           make(chan struct{}),
1739                 ceasingNetworking: make(chan struct{}),
1740
1741                 gotMetainfo: make(chan struct{}),
1742
1743                 halfOpen:          make(map[string]struct{}),
1744                 pieceStateChanges: pubsub.NewPubSub(),
1745         }
1746         return
1747 }
1748
1749 func init() {
1750         // For shuffling the tracker tiers.
1751         mathRand.Seed(time.Now().Unix())
1752 }
1753
1754 type trackerTier []string
1755
1756 // The trackers within each tier must be shuffled before use.
1757 // http://stackoverflow.com/a/12267471/149482
1758 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1759 func shuffleTier(tier trackerTier) {
1760         for i := range tier {
1761                 j := mathRand.Intn(i + 1)
1762                 tier[i], tier[j] = tier[j], tier[i]
1763         }
1764 }
1765
1766 func copyTrackers(base []trackerTier) (copy []trackerTier) {
1767         for _, tier := range base {
1768                 copy = append(copy, append(trackerTier(nil), tier...))
1769         }
1770         return
1771 }
1772
1773 func mergeTier(tier trackerTier, newURLs []string) trackerTier {
1774 nextURL:
1775         for _, url := range newURLs {
1776                 for _, trURL := range tier {
1777                         if trURL == url {
1778                                 continue nextURL
1779                         }
1780                 }
1781                 tier = append(tier, url)
1782         }
1783         return tier
1784 }
1785
1786 func (t *torrent) addTrackers(announceList [][]string) {
1787         newTrackers := copyTrackers(t.trackers)
1788         for tierIndex, tier := range announceList {
1789                 if tierIndex < len(newTrackers) {
1790                         newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1791                 } else {
1792                         newTrackers = append(newTrackers, mergeTier(nil, tier))
1793                 }
1794                 shuffleTier(newTrackers[tierIndex])
1795         }
1796         t.trackers = newTrackers
1797 }
1798
1799 // Don't call this before the info is available.
1800 func (t *torrent) bytesCompleted() int64 {
1801         if !t.haveInfo() {
1802                 return 0
1803         }
1804         return t.info.TotalLength() - t.bytesLeft()
1805 }
1806
1807 // A file-like handle to some torrent data resource.
1808 type Handle interface {
1809         io.Reader
1810         io.Seeker
1811         io.Closer
1812         io.ReaderAt
1813 }
1814
1815 // Returns handles to the files in the torrent. This requires the metainfo is
1816 // available first.
1817 func (t Torrent) Files() (ret []File) {
1818         t.cl.mu.Lock()
1819         info := t.Info()
1820         t.cl.mu.Unlock()
1821         if info == nil {
1822                 return
1823         }
1824         var offset int64
1825         for _, fi := range info.UpvertedFiles() {
1826                 ret = append(ret, File{
1827                         t,
1828                         strings.Join(append([]string{info.Name}, fi.Path...), "/"),
1829                         offset,
1830                         fi.Length,
1831                         fi,
1832                 })
1833                 offset += fi.Length
1834         }
1835         return
1836 }
1837
1838 func (t Torrent) AddPeers(pp []Peer) error {
1839         cl := t.cl
1840         cl.mu.Lock()
1841         defer cl.mu.Unlock()
1842         cl.addPeers(t.torrent, pp)
1843         return nil
1844 }
1845
1846 // Marks the entire torrent for download. Requires the info first, see
1847 // GotInfo.
1848 func (t Torrent) DownloadAll() {
1849         t.cl.mu.Lock()
1850         defer t.cl.mu.Unlock()
1851         t.torrent.pendPieceRange(0, t.torrent.numPieces())
1852 }
1853
1854 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
1855 // metainfo has the correct infohash.
1856 func (cl *Client) torrentCacheMetaInfo(ih metainfo.InfoHash) (mi *metainfo.MetaInfo, err error) {
1857         if cl.config.DisableMetainfoCache {
1858                 return
1859         }
1860         f, err := os.Open(cl.cachedMetaInfoFilename(ih))
1861         if err != nil {
1862                 if os.IsNotExist(err) {
1863                         err = nil
1864                 }
1865                 return
1866         }
1867         defer f.Close()
1868         dec := bencode.NewDecoder(f)
1869         err = dec.Decode(&mi)
1870         if err != nil {
1871                 return
1872         }
1873         if !bytes.Equal(mi.Info.Hash.Bytes(), ih[:]) {
1874                 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
1875                 return
1876         }
1877         return
1878 }
1879
1880 // Specifies a new torrent for adding to a client. There are helpers for
1881 // magnet URIs and torrent metainfo files.
1882 type TorrentSpec struct {
1883         // The tiered tracker URIs.
1884         Trackers [][]string
1885         InfoHash metainfo.InfoHash
1886         Info     *metainfo.InfoEx
1887         // The name to use if the Name field from the Info isn't available.
1888         DisplayName string
1889         // The chunk size to use for outbound requests. Defaults to 16KiB if not
1890         // set.
1891         ChunkSize int
1892         Storage   storage.I
1893 }
1894
1895 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1896         m, err := ParseMagnetURI(uri)
1897         if err != nil {
1898                 return
1899         }
1900         spec = &TorrentSpec{
1901                 Trackers:    [][]string{m.Trackers},
1902                 DisplayName: m.DisplayName,
1903                 InfoHash:    m.InfoHash,
1904         }
1905         return
1906 }
1907
1908 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1909         spec = &TorrentSpec{
1910                 Trackers:    mi.AnnounceList,
1911                 Info:        &mi.Info,
1912                 DisplayName: mi.Info.Name,
1913         }
1914
1915         if len(spec.Trackers) == 0 {
1916                 spec.Trackers = [][]string{[]string{mi.Announce}}
1917         } else {
1918                 spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
1919         }
1920
1921         missinggo.CopyExact(&spec.InfoHash, mi.Info.Hash)
1922         return
1923 }
1924
1925 // Add or merge a torrent spec. If the torrent is already present, the
1926 // trackers will be merged with the existing ones. If the Info isn't yet
1927 // known, it will be set. The display name is replaced if the new spec
1928 // provides one. Returns new if the torrent wasn't already in the client.
1929 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err error) {
1930         T.cl = cl
1931         cl.mu.Lock()
1932         defer cl.mu.Unlock()
1933
1934         t, ok := cl.torrents[spec.InfoHash]
1935         if !ok {
1936                 new = true
1937
1938                 // TODO: This doesn't belong in the core client, it's more of a
1939                 // helper.
1940                 if _, ok := cl.bannedTorrents[spec.InfoHash]; ok {
1941                         err = errors.New("banned torrent")
1942                         return
1943                 }
1944                 // TODO: Tidy this up?
1945                 t = newTorrent(spec.InfoHash)
1946                 t.cl = cl
1947                 t.wantPeers.L = &cl.mu
1948                 if spec.ChunkSize != 0 {
1949                         t.chunkSize = pp.Integer(spec.ChunkSize)
1950                 }
1951                 t.storageOpener = spec.Storage
1952                 if t.storageOpener == nil {
1953                         t.storageOpener = cl.defaultStorage
1954                 }
1955         }
1956         if spec.DisplayName != "" {
1957                 t.setDisplayName(spec.DisplayName)
1958         }
1959         // Try to merge in info we have on the torrent. Any err left will
1960         // terminate the function.
1961         if t.info == nil {
1962                 if spec.Info != nil {
1963                         err = cl.setMetaData(t, &spec.Info.Info, spec.Info.Bytes)
1964                 } else {
1965                         var mi *metainfo.MetaInfo
1966                         mi, err = cl.torrentCacheMetaInfo(spec.InfoHash)
1967                         if err != nil {
1968                                 log.Printf("error getting cached metainfo: %s", err)
1969                                 err = nil
1970                         } else if mi != nil {
1971                                 t.addTrackers(mi.AnnounceList)
1972                                 err = cl.setMetaData(t, &mi.Info.Info, mi.Info.Bytes)
1973                         }
1974                 }
1975         }
1976         if err != nil {
1977                 return
1978         }
1979         t.addTrackers(spec.Trackers)
1980
1981         cl.torrents[spec.InfoHash] = t
1982         T.torrent = t
1983
1984         // From this point onwards, we can consider the torrent a part of the
1985         // client.
1986         if new {
1987                 if !cl.config.DisableTrackers {
1988                         go cl.announceTorrentTrackers(T.torrent)
1989                 }
1990                 if cl.dHT != nil {
1991                         go cl.announceTorrentDHT(T.torrent, true)
1992                 }
1993         }
1994         return
1995 }
1996
1997 func (me *Client) dropTorrent(infoHash metainfo.InfoHash) (err error) {
1998         t, ok := me.torrents[infoHash]
1999         if !ok {
2000                 err = fmt.Errorf("no such torrent")
2001                 return
2002         }
2003         err = t.close()
2004         if err != nil {
2005                 panic(err)
2006         }
2007         delete(me.torrents, infoHash)
2008         return
2009 }
2010
2011 // Returns true when peers are required, or false if the torrent is closing.
2012 func (cl *Client) waitWantPeers(t *torrent) bool {
2013         cl.mu.Lock()
2014         defer cl.mu.Unlock()
2015         for {
2016                 select {
2017                 case <-t.ceasingNetworking:
2018                         return false
2019                 default:
2020                 }
2021                 if len(t.peers) > torrentPeersLowWater {
2022                         goto wait
2023                 }
2024                 if t.needData() || cl.seeding(t) {
2025                         return true
2026                 }
2027         wait:
2028                 t.wantPeers.Wait()
2029         }
2030 }
2031
2032 // Returns whether the client should make effort to seed the torrent.
2033 func (cl *Client) seeding(t *torrent) bool {
2034         if cl.config.NoUpload {
2035                 return false
2036         }
2037         if !cl.config.Seed {
2038                 return false
2039         }
2040         if t.needData() {
2041                 return false
2042         }
2043         return true
2044 }
2045
2046 func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
2047         for cl.waitWantPeers(t) {
2048                 // log.Printf("getting peers for %q from DHT", t)
2049                 ps, err := cl.dHT.Announce(string(t.InfoHash[:]), cl.incomingPeerPort(), impliedPort)
2050                 if err != nil {
2051                         log.Printf("error getting peers from dht: %s", err)
2052                         return
2053                 }
2054                 // Count all the unique addresses we got during this announce.
2055                 allAddrs := make(map[string]struct{})
2056         getPeers:
2057                 for {
2058                         select {
2059                         case v, ok := <-ps.Peers:
2060                                 if !ok {
2061                                         break getPeers
2062                                 }
2063                                 addPeers := make([]Peer, 0, len(v.Peers))
2064                                 for _, cp := range v.Peers {
2065                                         if cp.Port == 0 {
2066                                                 // Can't do anything with this.
2067                                                 continue
2068                                         }
2069                                         addPeers = append(addPeers, Peer{
2070                                                 IP:     cp.IP[:],
2071                                                 Port:   int(cp.Port),
2072                                                 Source: peerSourceDHT,
2073                                         })
2074                                         key := (&net.UDPAddr{
2075                                                 IP:   cp.IP[:],
2076                                                 Port: int(cp.Port),
2077                                         }).String()
2078                                         allAddrs[key] = struct{}{}
2079                                 }
2080                                 cl.mu.Lock()
2081                                 cl.addPeers(t, addPeers)
2082                                 numPeers := len(t.peers)
2083                                 cl.mu.Unlock()
2084                                 if numPeers >= torrentPeersHighWater {
2085                                         break getPeers
2086                                 }
2087                         case <-t.ceasingNetworking:
2088                                 ps.Close()
2089                                 return
2090                         }
2091                 }
2092                 ps.Close()
2093                 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
2094         }
2095 }
2096
2097 func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
2098         url_, err := url.Parse(trRawURL)
2099         if err != nil {
2100                 return
2101         }
2102         host, _, err := net.SplitHostPort(url_.Host)
2103         if err != nil {
2104                 host = url_.Host
2105         }
2106         addr, err := net.ResolveIPAddr("ip", host)
2107         if err != nil {
2108                 return
2109         }
2110         cl.mu.RLock()
2111         _, blocked = cl.ipBlockRange(addr.IP)
2112         cl.mu.RUnlock()
2113         return
2114 }
2115
2116 func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *torrent) error {
2117         blocked, err := cl.trackerBlockedUnlocked(tr)
2118         if err != nil {
2119                 return fmt.Errorf("error determining if tracker blocked: %s", err)
2120         }
2121         if blocked {
2122                 return fmt.Errorf("tracker blocked: %s", tr)
2123         }
2124         resp, err := tracker.Announce(tr, req)
2125         if err != nil {
2126                 return fmt.Errorf("error announcing: %s", err)
2127         }
2128         var peers []Peer
2129         for _, peer := range resp.Peers {
2130                 peers = append(peers, Peer{
2131                         IP:   peer.IP,
2132                         Port: peer.Port,
2133                 })
2134         }
2135         cl.mu.Lock()
2136         cl.addPeers(t, peers)
2137         cl.mu.Unlock()
2138
2139         // log.Printf("%s: %d new peers from %s", t, len(peers), tr)
2140
2141         time.Sleep(time.Second * time.Duration(resp.Interval))
2142         return nil
2143 }
2144
2145 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *torrent) (atLeastOne bool) {
2146         oks := make(chan bool)
2147         outstanding := 0
2148         for _, tier := range trackers {
2149                 for _, tr := range tier {
2150                         outstanding++
2151                         go func(tr string) {
2152                                 err := cl.announceTorrentSingleTracker(tr, req, t)
2153                                 oks <- err == nil
2154                         }(tr)
2155                 }
2156         }
2157         for outstanding > 0 {
2158                 ok := <-oks
2159                 outstanding--
2160                 if ok {
2161                         atLeastOne = true
2162                 }
2163         }
2164         return
2165 }
2166
2167 // Announce torrent to its trackers.
2168 func (cl *Client) announceTorrentTrackers(t *torrent) {
2169         req := tracker.AnnounceRequest{
2170                 Event:    tracker.Started,
2171                 NumWant:  -1,
2172                 Port:     uint16(cl.incomingPeerPort()),
2173                 PeerId:   cl.peerID,
2174                 InfoHash: t.InfoHash,
2175         }
2176         if !cl.waitWantPeers(t) {
2177                 return
2178         }
2179         cl.mu.RLock()
2180         req.Left = t.bytesLeftAnnounce()
2181         trackers := t.trackers
2182         cl.mu.RUnlock()
2183         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
2184                 req.Event = tracker.None
2185         }
2186 newAnnounce:
2187         for cl.waitWantPeers(t) {
2188                 cl.mu.RLock()
2189                 req.Left = t.bytesLeftAnnounce()
2190                 trackers = t.trackers
2191                 cl.mu.RUnlock()
2192                 numTrackersTried := 0
2193                 for _, tier := range trackers {
2194                         for trIndex, tr := range tier {
2195                                 numTrackersTried++
2196                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
2197                                 if err != nil {
2198                                         continue
2199                                 }
2200                                 // Float the successful announce to the top of the tier. If
2201                                 // the trackers list has been changed, we'll be modifying an
2202                                 // old copy so it won't matter.
2203                                 cl.mu.Lock()
2204                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
2205                                 cl.mu.Unlock()
2206
2207                                 req.Event = tracker.None
2208                                 continue newAnnounce
2209                         }
2210                 }
2211                 if numTrackersTried != 0 {
2212                         log.Printf("%s: all trackers failed", t)
2213                 }
2214                 // TODO: Wait until trackers are added if there are none.
2215                 time.Sleep(10 * time.Second)
2216         }
2217 }
2218
2219 func (cl *Client) allTorrentsCompleted() bool {
2220         for _, t := range cl.torrents {
2221                 if !t.haveInfo() {
2222                         return false
2223                 }
2224                 if t.numPiecesCompleted() != t.numPieces() {
2225                         return false
2226                 }
2227         }
2228         return true
2229 }
2230
2231 // Returns true when all torrents are completely downloaded and false if the
2232 // client is stopped before that.
2233 func (me *Client) WaitAll() bool {
2234         me.mu.Lock()
2235         defer me.mu.Unlock()
2236         for !me.allTorrentsCompleted() {
2237                 if me.closed.IsSet() {
2238                         return false
2239                 }
2240                 me.event.Wait()
2241         }
2242         return true
2243 }
2244
2245 // Handle a received chunk from a peer.
2246 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) {
2247         chunksReceived.Add(1)
2248
2249         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
2250
2251         // Request has been satisfied.
2252         if me.connDeleteRequest(t, c, req) {
2253                 defer c.updateRequests()
2254         } else {
2255                 unexpectedChunksReceived.Add(1)
2256         }
2257
2258         index := int(req.Index)
2259         piece := &t.pieces[index]
2260
2261         // Do we actually want this chunk?
2262         if !t.wantChunk(req) {
2263                 unwantedChunksReceived.Add(1)
2264                 c.UnwantedChunksReceived++
2265                 return
2266         }
2267
2268         c.UsefulChunksReceived++
2269         c.lastUsefulChunkReceived = time.Now()
2270
2271         me.upload(t, c)
2272
2273         // Need to record that it hasn't been written yet, before we attempt to do
2274         // anything with it.
2275         piece.incrementPendingWrites()
2276         // Record that we have the chunk.
2277         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
2278
2279         // Cancel pending requests for this chunk.
2280         for _, c := range t.conns {
2281                 if me.connCancel(t, c, req) {
2282                         c.updateRequests()
2283                 }
2284         }
2285
2286         me.mu.Unlock()
2287         // Write the chunk out.
2288         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
2289         me.mu.Lock()
2290
2291         piece.decrementPendingWrites()
2292
2293         if err != nil {
2294                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
2295                 t.pendRequest(req)
2296                 t.updatePieceCompletion(int(msg.Index))
2297                 return
2298         }
2299
2300         // It's important that the piece is potentially queued before we check if
2301         // the piece is still wanted, because if it is queued, it won't be wanted.
2302         if t.pieceAllDirty(index) {
2303                 me.queuePieceCheck(t, int(req.Index))
2304         }
2305
2306         if c.peerTouchedPieces == nil {
2307                 c.peerTouchedPieces = make(map[int]struct{})
2308         }
2309         c.peerTouchedPieces[index] = struct{}{}
2310
2311         me.event.Broadcast()
2312         t.publishPieceChange(int(req.Index))
2313         return
2314 }
2315
2316 // Return the connections that touched a piece, and clear the entry while
2317 // doing it.
2318 func (me *Client) reapPieceTouches(t *torrent, piece int) (ret []*connection) {
2319         for _, c := range t.conns {
2320                 if _, ok := c.peerTouchedPieces[piece]; ok {
2321                         ret = append(ret, c)
2322                         delete(c.peerTouchedPieces, piece)
2323                 }
2324         }
2325         return
2326 }
2327
2328 func (me *Client) pieceHashed(t *torrent, piece int, correct bool) {
2329         p := &t.pieces[piece]
2330         if p.EverHashed {
2331                 // Don't score the first time a piece is hashed, it could be an
2332                 // initial check.
2333                 if correct {
2334                         pieceHashedCorrect.Add(1)
2335                 } else {
2336                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
2337                         pieceHashedNotCorrect.Add(1)
2338                 }
2339         }
2340         p.EverHashed = true
2341         touchers := me.reapPieceTouches(t, int(piece))
2342         if correct {
2343                 err := p.Storage().MarkComplete()
2344                 if err != nil {
2345                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
2346                 }
2347                 t.updatePieceCompletion(piece)
2348         } else if len(touchers) != 0 {
2349                 log.Printf("dropping %d conns that touched piece", len(touchers))
2350                 for _, c := range touchers {
2351                         me.dropConnection(t, c)
2352                 }
2353         }
2354         me.pieceChanged(t, int(piece))
2355 }
2356
2357 func (me *Client) onCompletedPiece(t *torrent, piece int) {
2358         t.pendingPieces.Remove(piece)
2359         t.pendAllChunkSpecs(piece)
2360         for _, conn := range t.conns {
2361                 conn.Have(piece)
2362                 for r := range conn.Requests {
2363                         if int(r.Index) == piece {
2364                                 conn.Cancel(r)
2365                         }
2366                 }
2367                 // Could check here if peer doesn't have piece, but due to caching
2368                 // some peers may have said they have a piece but they don't.
2369                 me.upload(t, conn)
2370         }
2371 }
2372
2373 func (me *Client) onFailedPiece(t *torrent, piece int) {
2374         if t.pieceAllDirty(piece) {
2375                 t.pendAllChunkSpecs(piece)
2376         }
2377         if !t.wantPiece(piece) {
2378                 return
2379         }
2380         me.openNewConns(t)
2381         for _, conn := range t.conns {
2382                 if conn.PeerHasPiece(piece) {
2383                         conn.updateRequests()
2384                 }
2385         }
2386 }
2387
2388 func (me *Client) pieceChanged(t *torrent, piece int) {
2389         correct := t.pieceComplete(piece)
2390         defer me.event.Broadcast()
2391         if correct {
2392                 me.onCompletedPiece(t, piece)
2393         } else {
2394                 me.onFailedPiece(t, piece)
2395         }
2396         if t.updatePiecePriority(piece) {
2397                 t.piecePriorityChanged(piece)
2398         }
2399         t.publishPieceChange(piece)
2400 }
2401
2402 func (cl *Client) verifyPiece(t *torrent, piece int) {
2403         cl.mu.Lock()
2404         defer cl.mu.Unlock()
2405         p := &t.pieces[piece]
2406         for p.Hashing || t.storage == nil {
2407                 cl.event.Wait()
2408         }
2409         p.QueuedForHash = false
2410         if t.isClosed() || t.pieceComplete(piece) {
2411                 t.updatePiecePriority(piece)
2412                 t.publishPieceChange(piece)
2413                 return
2414         }
2415         p.Hashing = true
2416         t.publishPieceChange(piece)
2417         cl.mu.Unlock()
2418         sum := t.hashPiece(piece)
2419         cl.mu.Lock()
2420         select {
2421         case <-t.closing:
2422                 return
2423         default:
2424         }
2425         p.Hashing = false
2426         cl.pieceHashed(t, piece, sum == p.Hash)
2427 }
2428
2429 // Returns handles to all the torrents loaded in the Client.
2430 func (me *Client) Torrents() (ret []Torrent) {
2431         me.mu.Lock()
2432         for _, t := range me.torrents {
2433                 ret = append(ret, Torrent{me, t})
2434         }
2435         me.mu.Unlock()
2436         return
2437 }
2438
2439 func (me *Client) AddMagnet(uri string) (T Torrent, err error) {
2440         spec, err := TorrentSpecFromMagnetURI(uri)
2441         if err != nil {
2442                 return
2443         }
2444         T, _, err = me.AddTorrentSpec(spec)
2445         return
2446 }
2447
2448 func (me *Client) AddTorrent(mi *metainfo.MetaInfo) (T Torrent, err error) {
2449         T, _, err = me.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2450         var ss []string
2451         missinggo.CastSlice(&ss, mi.Nodes)
2452         me.AddDHTNodes(ss)
2453         return
2454 }
2455
2456 func (me *Client) AddTorrentFromFile(filename string) (T Torrent, err error) {
2457         mi, err := metainfo.LoadFromFile(filename)
2458         if err != nil {
2459                 return
2460         }
2461         return me.AddTorrent(mi)
2462 }
2463
2464 func (me *Client) DHT() *dht.Server {
2465         return me.dHT
2466 }
2467
2468 func (me *Client) AddDHTNodes(nodes []string) {
2469         for _, n := range nodes {
2470                 hmp := missinggo.SplitHostMaybePort(n)
2471                 ip := net.ParseIP(hmp.Host)
2472                 if ip == nil {
2473                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
2474                         continue
2475                 }
2476                 ni := dht.NodeInfo{
2477                         Addr: dht.NewAddr(&net.UDPAddr{
2478                                 IP:   ip,
2479                                 Port: hmp.Port,
2480                         }),
2481                 }
2482                 me.DHT().AddNode(ni)
2483         }
2484 }