]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Remove deadcode
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "crypto/sha1"
8         "encoding/hex"
9         "errors"
10         "expvar"
11         "fmt"
12         "io"
13         "log"
14         "math/big"
15         mathRand "math/rand"
16         "net"
17         "net/url"
18         "os"
19         "path/filepath"
20         "sort"
21         "strconv"
22         "strings"
23         "time"
24
25         "github.com/anacrolix/missinggo"
26         "github.com/anacrolix/missinggo/bitmap"
27         "github.com/anacrolix/missinggo/pproffd"
28         "github.com/anacrolix/missinggo/pubsub"
29         "github.com/anacrolix/sync"
30         "github.com/anacrolix/utp"
31
32         "github.com/anacrolix/torrent/bencode"
33         "github.com/anacrolix/torrent/dht"
34         "github.com/anacrolix/torrent/iplist"
35         "github.com/anacrolix/torrent/metainfo"
36         "github.com/anacrolix/torrent/mse"
37         pp "github.com/anacrolix/torrent/peer_protocol"
38         "github.com/anacrolix/torrent/storage"
39         "github.com/anacrolix/torrent/tracker"
40 )
41
42 // I could move a lot of these counters to their own file, but I suspect they
43 // may be attached to a Client someday.
44 var (
45         unwantedChunksReceived   = expvar.NewInt("chunksReceivedUnwanted")
46         unexpectedChunksReceived = expvar.NewInt("chunksReceivedUnexpected")
47         chunksReceived           = expvar.NewInt("chunksReceived")
48
49         peersAddedBySource = expvar.NewMap("peersAddedBySource")
50
51         uploadChunksPosted    = expvar.NewInt("uploadChunksPosted")
52         unexpectedCancels     = expvar.NewInt("unexpectedCancels")
53         postedCancels         = expvar.NewInt("postedCancels")
54         duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided")
55
56         pieceHashedCorrect    = expvar.NewInt("pieceHashedCorrect")
57         pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect")
58
59         unsuccessfulDials = expvar.NewInt("dialSuccessful")
60         successfulDials   = expvar.NewInt("dialUnsuccessful")
61
62         acceptUTP    = expvar.NewInt("acceptUTP")
63         acceptTCP    = expvar.NewInt("acceptTCP")
64         acceptReject = expvar.NewInt("acceptReject")
65
66         peerExtensions                    = expvar.NewMap("peerExtensions")
67         completedHandshakeConnectionFlags = expvar.NewMap("completedHandshakeConnectionFlags")
68         // Count of connections to peer with same client ID.
69         connsToSelf = expvar.NewInt("connsToSelf")
70         // Number of completed connections to a client we're already connected with.
71         duplicateClientConns       = expvar.NewInt("duplicateClientConns")
72         receivedMessageTypes       = expvar.NewMap("receivedMessageTypes")
73         receivedKeepalives         = expvar.NewInt("receivedKeepalives")
74         supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
75         postedMessageTypes         = expvar.NewMap("postedMessageTypes")
76         postedKeepalives           = expvar.NewInt("postedKeepalives")
77         // Requests received for pieces we don't have.
78         requestsReceivedForMissingPieces = expvar.NewInt("requestsReceivedForMissingPieces")
79 )
80
81 const (
82         // Justification for set bits follows.
83         //
84         // Extension protocol ([5]|=0x10):
85         // http://www.bittorrent.org/beps/bep_0010.html
86         //
87         // Fast Extension ([7]|=0x04):
88         // http://bittorrent.org/beps/bep_0006.html.
89         // Disabled until AllowedFast is implemented.
90         //
91         // DHT ([7]|=1):
92         // http://www.bittorrent.org/beps/bep_0005.html
93         defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
94
95         socketsPerTorrent     = 80
96         torrentPeersHighWater = 200
97         torrentPeersLowWater  = 50
98
99         // Limit how long handshake can take. This is to reduce the lingering
100         // impact of a few bad apples. 4s loses 1% of successful handshakes that
101         // are obtained with 60s timeout, and 5% of unsuccessful handshakes.
102         handshakesTimeout = 20 * time.Second
103
104         // These are our extended message IDs.
105         metadataExtendedId = iota + 1 // 0 is reserved for deleting keys
106         pexExtendedId
107
108         // Updated occasionally to when there's been some changes to client
109         // behaviour in case other clients are assuming anything of us. See also
110         // `bep20`.
111         extendedHandshakeClientVersion = "go.torrent dev 20150624"
112 )
113
114 // Currently doesn't really queue, but should in the future.
115 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
116         piece := &t.pieces[pieceIndex]
117         if piece.QueuedForHash {
118                 return
119         }
120         piece.QueuedForHash = true
121         t.publishPieceChange(int(pieceIndex))
122         go cl.verifyPiece(t, int(pieceIndex))
123 }
124
125 // Queue a piece check if one isn't already queued, and the piece has never
126 // been checked before.
127 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
128         p := &t.pieces[piece]
129         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
130                 return
131         }
132         cl.queuePieceCheck(t, piece)
133 }
134
135 // Clients contain zero or more Torrents. A client manages a blocklist, the
136 // TCP/UDP protocol ports, and DHT as desired.
137 type Client struct {
138         halfOpenLimit  int
139         peerID         [20]byte
140         listeners      []net.Listener
141         utpSock        *utp.Socket
142         dHT            *dht.Server
143         ipBlockList    iplist.Ranger
144         bannedTorrents map[metainfo.Hash]struct{}
145         config         Config
146         pruneTimer     *time.Timer
147         extensionBytes peerExtensionBytes
148         // Set of addresses that have our client ID. This intentionally will
149         // include ourselves if we end up trying to connect to our own address
150         // through legitimate channels.
151         dopplegangerAddrs map[string]struct{}
152
153         defaultStorage storage.I
154
155         mu     sync.RWMutex
156         event  sync.Cond
157         closed missinggo.Event
158
159         torrents map[metainfo.Hash]*Torrent
160 }
161
162 func (me *Client) IPBlockList() iplist.Ranger {
163         me.mu.Lock()
164         defer me.mu.Unlock()
165         return me.ipBlockList
166 }
167
168 func (me *Client) SetIPBlockList(list iplist.Ranger) {
169         me.mu.Lock()
170         defer me.mu.Unlock()
171         me.ipBlockList = list
172         if me.dHT != nil {
173                 me.dHT.SetIPBlockList(list)
174         }
175 }
176
177 func (me *Client) PeerID() string {
178         return string(me.peerID[:])
179 }
180
181 func (me *Client) ListenAddr() (addr net.Addr) {
182         for _, l := range me.listeners {
183                 addr = l.Addr()
184                 break
185         }
186         return
187 }
188
189 type hashSorter struct {
190         Hashes []metainfo.Hash
191 }
192
193 func (me hashSorter) Len() int {
194         return len(me.Hashes)
195 }
196
197 func (me hashSorter) Less(a, b int) bool {
198         return (&big.Int{}).SetBytes(me.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(me.Hashes[b][:])) < 0
199 }
200
201 func (me hashSorter) Swap(a, b int) {
202         me.Hashes[a], me.Hashes[b] = me.Hashes[b], me.Hashes[a]
203 }
204
205 func (cl *Client) sortedTorrents() (ret []*Torrent) {
206         var hs hashSorter
207         for ih := range cl.torrents {
208                 hs.Hashes = append(hs.Hashes, ih)
209         }
210         sort.Sort(hs)
211         for _, ih := range hs.Hashes {
212                 ret = append(ret, cl.torrent(ih))
213         }
214         return
215 }
216
217 // Writes out a human readable status of the client, such as for writing to a
218 // HTTP status page.
219 func (cl *Client) WriteStatus(_w io.Writer) {
220         cl.mu.RLock()
221         defer cl.mu.RUnlock()
222         w := bufio.NewWriter(_w)
223         defer w.Flush()
224         if addr := cl.ListenAddr(); addr != nil {
225                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
226         } else {
227                 fmt.Fprintln(w, "Not listening!")
228         }
229         fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
230         if cl.dHT != nil {
231                 dhtStats := cl.dHT.Stats()
232                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
233                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
234                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
235                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
236                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
237         }
238         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
239         fmt.Fprintln(w)
240         for _, t := range cl.sortedTorrents() {
241                 if t.name() == "" {
242                         fmt.Fprint(w, "<unknown name>")
243                 } else {
244                         fmt.Fprint(w, t.name())
245                 }
246                 fmt.Fprint(w, "\n")
247                 if t.haveInfo() {
248                         fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length)
249                 } else {
250                         w.WriteString("<missing metainfo>")
251                 }
252                 fmt.Fprint(w, "\n")
253                 t.writeStatus(w, cl)
254                 fmt.Fprintln(w)
255         }
256 }
257
258 func (cl *Client) configDir() string {
259         if cl.config.ConfigDir == "" {
260                 return filepath.Join(os.Getenv("HOME"), ".config/torrent")
261         }
262         return cl.config.ConfigDir
263 }
264
265 // The directory where the Client expects to find and store configuration
266 // data. Defaults to $HOME/.config/torrent.
267 func (cl *Client) ConfigDir() string {
268         return cl.configDir()
269 }
270
271 // Creates a new client.
272 func NewClient(cfg *Config) (cl *Client, err error) {
273         if cfg == nil {
274                 cfg = &Config{}
275         }
276
277         defer func() {
278                 if err != nil {
279                         cl = nil
280                 }
281         }()
282         cl = &Client{
283                 halfOpenLimit:     socketsPerTorrent,
284                 config:            *cfg,
285                 defaultStorage:    cfg.DefaultStorage,
286                 dopplegangerAddrs: make(map[string]struct{}),
287                 torrents:          make(map[metainfo.Hash]*Torrent),
288         }
289         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
290         cl.event.L = &cl.mu
291         if cl.defaultStorage == nil {
292                 cl.defaultStorage = storage.NewFile(cfg.DataDir)
293         }
294         if cfg.IPBlocklist != nil {
295                 cl.ipBlockList = cfg.IPBlocklist
296         }
297
298         if cfg.PeerID != "" {
299                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
300         } else {
301                 o := copy(cl.peerID[:], bep20)
302                 _, err = rand.Read(cl.peerID[o:])
303                 if err != nil {
304                         panic("error generating peer id")
305                 }
306         }
307
308         // Returns the laddr string to listen on for the next Listen call.
309         listenAddr := func() string {
310                 if addr := cl.ListenAddr(); addr != nil {
311                         return addr.String()
312                 }
313                 if cfg.ListenAddr == "" {
314                         return ":50007"
315                 }
316                 return cfg.ListenAddr
317         }
318         if !cl.config.DisableTCP {
319                 var l net.Listener
320                 l, err = net.Listen(func() string {
321                         if cl.config.DisableIPv6 {
322                                 return "tcp4"
323                         } else {
324                                 return "tcp"
325                         }
326                 }(), listenAddr())
327                 if err != nil {
328                         return
329                 }
330                 cl.listeners = append(cl.listeners, l)
331                 go cl.acceptConnections(l, false)
332         }
333         if !cl.config.DisableUTP {
334                 cl.utpSock, err = utp.NewSocket(func() string {
335                         if cl.config.DisableIPv6 {
336                                 return "udp4"
337                         } else {
338                                 return "udp"
339                         }
340                 }(), listenAddr())
341                 if err != nil {
342                         return
343                 }
344                 cl.listeners = append(cl.listeners, cl.utpSock)
345                 go cl.acceptConnections(cl.utpSock, true)
346         }
347         if !cfg.NoDHT {
348                 dhtCfg := cfg.DHTConfig
349                 if dhtCfg.IPBlocklist == nil {
350                         dhtCfg.IPBlocklist = cl.ipBlockList
351                 }
352                 if dhtCfg.Addr == "" {
353                         dhtCfg.Addr = listenAddr()
354                 }
355                 if dhtCfg.Conn == nil && cl.utpSock != nil {
356                         dhtCfg.Conn = cl.utpSock
357                 }
358                 cl.dHT, err = dht.NewServer(&dhtCfg)
359                 if err != nil {
360                         return
361                 }
362         }
363
364         return
365 }
366
367 // Stops the client. All connections to peers are closed and all activity will
368 // come to a halt.
369 func (me *Client) Close() {
370         me.mu.Lock()
371         defer me.mu.Unlock()
372         me.closed.Set()
373         if me.dHT != nil {
374                 me.dHT.Close()
375         }
376         for _, l := range me.listeners {
377                 l.Close()
378         }
379         for _, t := range me.torrents {
380                 t.close()
381         }
382         me.event.Broadcast()
383 }
384
385 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
386
387 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
388         if cl.ipBlockList == nil {
389                 return
390         }
391         ip4 := ip.To4()
392         // If blocklists are enabled, then block non-IPv4 addresses, because
393         // blocklists do not yet support IPv6.
394         if ip4 == nil {
395                 if missinggo.CryHeard() {
396                         log.Printf("blocking non-IPv4 address: %s", ip)
397                 }
398                 r = ipv6BlockRange
399                 blocked = true
400                 return
401         }
402         return cl.ipBlockList.Lookup(ip4)
403 }
404
405 func (cl *Client) waitAccept() {
406         cl.mu.Lock()
407         defer cl.mu.Unlock()
408         for {
409                 for _, t := range cl.torrents {
410                         if cl.wantConns(t) {
411                                 return
412                         }
413                 }
414                 if cl.closed.IsSet() {
415                         return
416                 }
417                 cl.event.Wait()
418         }
419 }
420
421 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
422         for {
423                 cl.waitAccept()
424                 conn, err := l.Accept()
425                 conn = pproffd.WrapNetConn(conn)
426                 if cl.closed.IsSet() {
427                         if conn != nil {
428                                 conn.Close()
429                         }
430                         return
431                 }
432                 if err != nil {
433                         log.Print(err)
434                         // I think something harsher should happen here? Our accept
435                         // routine just fucked off.
436                         return
437                 }
438                 if utp {
439                         acceptUTP.Add(1)
440                 } else {
441                         acceptTCP.Add(1)
442                 }
443                 cl.mu.RLock()
444                 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
445                 _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
446                 cl.mu.RUnlock()
447                 if blocked || doppleganger {
448                         acceptReject.Add(1)
449                         // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
450                         conn.Close()
451                         continue
452                 }
453                 go cl.incomingConnection(conn, utp)
454         }
455 }
456
457 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
458         defer nc.Close()
459         if tc, ok := nc.(*net.TCPConn); ok {
460                 tc.SetLinger(0)
461         }
462         c := newConnection()
463         c.conn = nc
464         c.rw = nc
465         c.Discovery = peerSourceIncoming
466         c.uTP = utp
467         err := cl.runReceivedConn(c)
468         if err != nil {
469                 // log.Print(err)
470         }
471 }
472
473 // Returns a handle to the given torrent, if it's present in the client.
474 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
475         cl.mu.Lock()
476         defer cl.mu.Unlock()
477         t, ok = cl.torrents[ih]
478         return
479 }
480
481 func (me *Client) torrent(ih metainfo.Hash) *Torrent {
482         return me.torrents[ih]
483 }
484
485 type dialResult struct {
486         Conn net.Conn
487         UTP  bool
488 }
489
490 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
491         conn, err := dial(addr, t)
492         if err != nil {
493                 if conn != nil {
494                         conn.Close()
495                 }
496                 conn = nil // Pedantic
497         }
498         ch <- dialResult{conn, utp}
499         if err == nil {
500                 successfulDials.Add(1)
501                 return
502         }
503         unsuccessfulDials.Add(1)
504 }
505
506 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
507         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
508         if ret < minDialTimeout {
509                 ret = minDialTimeout
510         }
511         return
512 }
513
514 // Returns whether an address is known to connect to a client with our own ID.
515 func (me *Client) dopplegangerAddr(addr string) bool {
516         _, ok := me.dopplegangerAddrs[addr]
517         return ok
518 }
519
520 // Start the process of connecting to the given peer for the given torrent if
521 // appropriate.
522 func (me *Client) initiateConn(peer Peer, t *Torrent) {
523         if peer.Id == me.peerID {
524                 return
525         }
526         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
527         if me.dopplegangerAddr(addr) || t.addrActive(addr) {
528                 duplicateConnsAvoided.Add(1)
529                 return
530         }
531         if r, ok := me.ipBlockRange(peer.IP); ok {
532                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
533                 return
534         }
535         t.halfOpen[addr] = struct{}{}
536         go me.outgoingConnection(t, addr, peer.Source)
537 }
538
539 func (me *Client) dialTimeout(t *Torrent) time.Duration {
540         me.mu.Lock()
541         pendingPeers := len(t.peers)
542         me.mu.Unlock()
543         return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, pendingPeers)
544 }
545
546 func (me *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
547         c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t))
548         if err == nil {
549                 c.(*net.TCPConn).SetLinger(0)
550         }
551         return
552 }
553
554 func (me *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
555         return me.utpSock.DialTimeout(addr, me.dialTimeout(t))
556 }
557
558 // Returns a connection over UTP or TCP, whichever is first to connect.
559 func (me *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
560         // Initiate connections via TCP and UTP simultaneously. Use the first one
561         // that succeeds.
562         left := 0
563         if !me.config.DisableUTP {
564                 left++
565         }
566         if !me.config.DisableTCP {
567                 left++
568         }
569         resCh := make(chan dialResult, left)
570         if !me.config.DisableUTP {
571                 go doDial(me.dialUTP, resCh, true, addr, t)
572         }
573         if !me.config.DisableTCP {
574                 go doDial(me.dialTCP, resCh, false, addr, t)
575         }
576         var res dialResult
577         // Wait for a successful connection.
578         for ; left > 0 && res.Conn == nil; left-- {
579                 res = <-resCh
580         }
581         if left > 0 {
582                 // There are still incompleted dials.
583                 go func() {
584                         for ; left > 0; left-- {
585                                 conn := (<-resCh).Conn
586                                 if conn != nil {
587                                         conn.Close()
588                                 }
589                         }
590                 }()
591         }
592         conn = res.Conn
593         utp = res.UTP
594         return
595 }
596
597 func (me *Client) noLongerHalfOpen(t *Torrent, addr string) {
598         if _, ok := t.halfOpen[addr]; !ok {
599                 panic("invariant broken")
600         }
601         delete(t.halfOpen, addr)
602         me.openNewConns(t)
603 }
604
605 // Performs initiator handshakes and returns a connection. Returns nil
606 // *connection if no connection for valid reasons.
607 func (me *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
608         c = newConnection()
609         c.conn = nc
610         c.rw = nc
611         c.encrypted = encrypted
612         c.uTP = utp
613         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
614         if err != nil {
615                 return
616         }
617         ok, err := me.initiateHandshakes(c, t)
618         if !ok {
619                 c = nil
620         }
621         return
622 }
623
624 // Returns nil connection and nil error if no connection could be established
625 // for valid reasons.
626 func (me *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
627         nc, utp := me.dialFirst(addr, t)
628         if nc == nil {
629                 return
630         }
631         c, err = me.handshakesConnection(nc, t, !me.config.DisableEncryption, utp)
632         if err != nil {
633                 nc.Close()
634                 return
635         } else if c != nil {
636                 return
637         }
638         nc.Close()
639         if me.config.DisableEncryption {
640                 // We already tried without encryption.
641                 return
642         }
643         // Try again without encryption, using whichever protocol type worked last
644         // time.
645         if utp {
646                 nc, err = me.dialUTP(addr, t)
647         } else {
648                 nc, err = me.dialTCP(addr, t)
649         }
650         if err != nil {
651                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
652                 return
653         }
654         c, err = me.handshakesConnection(nc, t, false, utp)
655         if err != nil || c == nil {
656                 nc.Close()
657         }
658         return
659 }
660
661 // Called to dial out and run a connection. The addr we're given is already
662 // considered half-open.
663 func (me *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
664         c, err := me.establishOutgoingConn(t, addr)
665         me.mu.Lock()
666         defer me.mu.Unlock()
667         // Don't release lock between here and addConnection, unless it's for
668         // failure.
669         me.noLongerHalfOpen(t, addr)
670         if err != nil {
671                 if me.config.Debug {
672                         log.Printf("error establishing outgoing connection: %s", err)
673                 }
674                 return
675         }
676         if c == nil {
677                 return
678         }
679         defer c.Close()
680         c.Discovery = ps
681         err = me.runInitiatedHandshookConn(c, t)
682         if err != nil {
683                 if me.config.Debug {
684                         log.Printf("error in established outgoing connection: %s", err)
685                 }
686         }
687 }
688
689 // The port number for incoming peer connections. 0 if the client isn't
690 // listening.
691 func (cl *Client) incomingPeerPort() int {
692         listenAddr := cl.ListenAddr()
693         if listenAddr == nil {
694                 return 0
695         }
696         return missinggo.AddrPort(listenAddr)
697 }
698
699 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
700 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
701 func addrCompactIP(addr net.Addr) (string, error) {
702         host, _, err := net.SplitHostPort(addr.String())
703         if err != nil {
704                 return "", err
705         }
706         ip := net.ParseIP(host)
707         if v4 := ip.To4(); v4 != nil {
708                 if len(v4) != 4 {
709                         panic(v4)
710                 }
711                 return string(v4), nil
712         }
713         return string(ip.To16()), nil
714 }
715
716 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
717         var err error
718         for b := range bb {
719                 _, err = w.Write(b)
720                 if err != nil {
721                         break
722                 }
723         }
724         done <- err
725 }
726
727 type (
728         peerExtensionBytes [8]byte
729         peerID             [20]byte
730 )
731
732 func (me *peerExtensionBytes) SupportsExtended() bool {
733         return me[5]&0x10 != 0
734 }
735
736 func (me *peerExtensionBytes) SupportsDHT() bool {
737         return me[7]&0x01 != 0
738 }
739
740 func (me *peerExtensionBytes) SupportsFast() bool {
741         return me[7]&0x04 != 0
742 }
743
744 type handshakeResult struct {
745         peerExtensionBytes
746         peerID
747         metainfo.Hash
748 }
749
750 // ih is nil if we expect the peer to declare the InfoHash, such as when the
751 // peer initiated the connection. Returns ok if the handshake was successful,
752 // and err if there was an unexpected condition other than the peer simply
753 // abandoning the handshake.
754 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
755         // Bytes to be sent to the peer. Should never block the sender.
756         postCh := make(chan []byte, 4)
757         // A single error value sent when the writer completes.
758         writeDone := make(chan error, 1)
759         // Performs writes to the socket and ensures posts don't block.
760         go handshakeWriter(sock, postCh, writeDone)
761
762         defer func() {
763                 close(postCh) // Done writing.
764                 if !ok {
765                         return
766                 }
767                 if err != nil {
768                         panic(err)
769                 }
770                 // Wait until writes complete before returning from handshake.
771                 err = <-writeDone
772                 if err != nil {
773                         err = fmt.Errorf("error writing: %s", err)
774                 }
775         }()
776
777         post := func(bb []byte) {
778                 select {
779                 case postCh <- bb:
780                 default:
781                         panic("mustn't block while posting")
782                 }
783         }
784
785         post([]byte(pp.Protocol))
786         post(extensions[:])
787         if ih != nil { // We already know what we want.
788                 post(ih[:])
789                 post(peerID[:])
790         }
791         var b [68]byte
792         _, err = io.ReadFull(sock, b[:68])
793         if err != nil {
794                 err = nil
795                 return
796         }
797         if string(b[:20]) != pp.Protocol {
798                 return
799         }
800         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
801         missinggo.CopyExact(&res.Hash, b[28:48])
802         missinggo.CopyExact(&res.peerID, b[48:68])
803         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
804
805         // TODO: Maybe we can just drop peers here if we're not interested. This
806         // could prevent them trying to reconnect, falsely believing there was
807         // just a problem.
808         if ih == nil { // We were waiting for the peer to tell us what they wanted.
809                 post(res.Hash[:])
810                 post(peerID[:])
811         }
812
813         ok = true
814         return
815 }
816
817 // Wraps a raw connection and provides the interface we want for using the
818 // connection in the message loop.
819 type deadlineReader struct {
820         nc net.Conn
821         r  io.Reader
822 }
823
824 func (me deadlineReader) Read(b []byte) (n int, err error) {
825         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
826         err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
827         if err != nil {
828                 err = fmt.Errorf("error setting read deadline: %s", err)
829         }
830         n, err = me.r.Read(b)
831         // Convert common errors into io.EOF.
832         // if err != nil {
833         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
834         //              err = io.EOF
835         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
836         //              if n != 0 {
837         //                      panic(n)
838         //              }
839         //              err = io.EOF
840         //      }
841         // }
842         return
843 }
844
845 type readWriter struct {
846         io.Reader
847         io.Writer
848 }
849
850 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
851         var protocol [len(pp.Protocol)]byte
852         _, err = io.ReadFull(rw, protocol[:])
853         if err != nil {
854                 return
855         }
856         ret = readWriter{
857                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
858                 rw,
859         }
860         if string(protocol[:]) == pp.Protocol {
861                 return
862         }
863         encrypted = true
864         ret, err = mse.ReceiveHandshake(ret, skeys)
865         return
866 }
867
868 func (cl *Client) receiveSkeys() (ret [][]byte) {
869         for ih := range cl.torrents {
870                 ret = append(ret, ih[:])
871         }
872         return
873 }
874
875 func (me *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
876         if c.encrypted {
877                 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
878                 if err != nil {
879                         return
880                 }
881         }
882         ih, ok, err := me.connBTHandshake(c, &t.infoHash)
883         if ih != t.infoHash {
884                 ok = false
885         }
886         return
887 }
888
889 // Do encryption and bittorrent handshakes as receiver.
890 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
891         cl.mu.Lock()
892         skeys := cl.receiveSkeys()
893         cl.mu.Unlock()
894         if !cl.config.DisableEncryption {
895                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
896                 if err != nil {
897                         if err == mse.ErrNoSecretKeyMatch {
898                                 err = nil
899                         }
900                         return
901                 }
902         }
903         ih, ok, err := cl.connBTHandshake(c, nil)
904         if err != nil {
905                 err = fmt.Errorf("error during bt handshake: %s", err)
906                 return
907         }
908         if !ok {
909                 return
910         }
911         cl.mu.Lock()
912         t = cl.torrents[ih]
913         cl.mu.Unlock()
914         return
915 }
916
917 // Returns !ok if handshake failed for valid reasons.
918 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
919         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
920         if err != nil || !ok {
921                 return
922         }
923         ret = res.Hash
924         c.PeerExtensionBytes = res.peerExtensionBytes
925         c.PeerID = res.peerID
926         c.completedHandshake = time.Now()
927         return
928 }
929
930 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) (err error) {
931         if c.PeerID == cl.peerID {
932                 // Only if we initiated the connection is the remote address a
933                 // listen addr for a doppleganger.
934                 connsToSelf.Add(1)
935                 addr := c.conn.RemoteAddr().String()
936                 cl.dopplegangerAddrs[addr] = struct{}{}
937                 return
938         }
939         return cl.runHandshookConn(c, t)
940 }
941
942 func (cl *Client) runReceivedConn(c *connection) (err error) {
943         err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
944         if err != nil {
945                 return
946         }
947         t, err := cl.receiveHandshakes(c)
948         if err != nil {
949                 err = fmt.Errorf("error receiving handshakes: %s", err)
950                 return
951         }
952         if t == nil {
953                 return
954         }
955         cl.mu.Lock()
956         defer cl.mu.Unlock()
957         if c.PeerID == cl.peerID {
958                 return
959         }
960         return cl.runHandshookConn(c, t)
961 }
962
963 func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
964         c.conn.SetWriteDeadline(time.Time{})
965         c.rw = readWriter{
966                 deadlineReader{c.conn, c.rw},
967                 c.rw,
968         }
969         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
970         if !cl.addConnection(t, c) {
971                 return
972         }
973         defer cl.dropConnection(t, c)
974         go c.writer()
975         go c.writeOptimizer(time.Minute)
976         cl.sendInitialMessages(c, t)
977         err = cl.connectionLoop(t, c)
978         if err != nil {
979                 err = fmt.Errorf("error during connection loop: %s", err)
980         }
981         return
982 }
983
984 func (me *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
985         if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
986                 conn.Post(pp.Message{
987                         Type:       pp.Extended,
988                         ExtendedID: pp.HandshakeExtendedID,
989                         ExtendedPayload: func() []byte {
990                                 d := map[string]interface{}{
991                                         "m": func() (ret map[string]int) {
992                                                 ret = make(map[string]int, 2)
993                                                 ret["ut_metadata"] = metadataExtendedId
994                                                 if !me.config.DisablePEX {
995                                                         ret["ut_pex"] = pexExtendedId
996                                                 }
997                                                 return
998                                         }(),
999                                         "v": extendedHandshakeClientVersion,
1000                                         // No upload queue is implemented yet.
1001                                         "reqq": 64,
1002                                 }
1003                                 if !me.config.DisableEncryption {
1004                                         d["e"] = 1
1005                                 }
1006                                 if torrent.metadataSizeKnown() {
1007                                         d["metadata_size"] = torrent.metadataSize()
1008                                 }
1009                                 if p := me.incomingPeerPort(); p != 0 {
1010                                         d["p"] = p
1011                                 }
1012                                 yourip, err := addrCompactIP(conn.remoteAddr())
1013                                 if err != nil {
1014                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
1015                                 } else {
1016                                         d["yourip"] = yourip
1017                                 }
1018                                 // log.Printf("sending %v", d)
1019                                 b, err := bencode.Marshal(d)
1020                                 if err != nil {
1021                                         panic(err)
1022                                 }
1023                                 return b
1024                         }(),
1025                 })
1026         }
1027         if torrent.haveAnyPieces() {
1028                 conn.Bitfield(torrent.bitfield())
1029         } else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
1030                 conn.Post(pp.Message{
1031                         Type: pp.HaveNone,
1032                 })
1033         }
1034         if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil {
1035                 conn.Post(pp.Message{
1036                         Type: pp.Port,
1037                         Port: uint16(missinggo.AddrPort(me.dHT.Addr())),
1038                 })
1039         }
1040 }
1041
1042 func (me *Client) peerUnchoked(torrent *Torrent, conn *connection) {
1043         conn.updateRequests()
1044 }
1045
1046 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
1047         ok = cn.Cancel(r)
1048         if ok {
1049                 postedCancels.Add(1)
1050         }
1051         return
1052 }
1053
1054 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
1055         if !cn.RequestPending(r) {
1056                 return false
1057         }
1058         delete(cn.Requests, r)
1059         return true
1060 }
1061
1062 func (cl *Client) requestPendingMetadata(t *Torrent, c *connection) {
1063         if t.haveInfo() {
1064                 return
1065         }
1066         if c.PeerExtensionIDs["ut_metadata"] == 0 {
1067                 // Peer doesn't support this.
1068                 return
1069         }
1070         // Request metadata pieces that we don't have in a random order.
1071         var pending []int
1072         for index := 0; index < t.metadataPieceCount(); index++ {
1073                 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
1074                         pending = append(pending, index)
1075                 }
1076         }
1077         for _, i := range mathRand.Perm(len(pending)) {
1078                 c.requestMetadataPiece(pending[i])
1079         }
1080 }
1081
1082 func (cl *Client) completedMetadata(t *Torrent) {
1083         h := sha1.New()
1084         h.Write(t.metadataBytes)
1085         var ih metainfo.Hash
1086         missinggo.CopyExact(&ih, h.Sum(nil))
1087         if ih != t.infoHash {
1088                 log.Print("bad metadata")
1089                 t.invalidateMetadata()
1090                 return
1091         }
1092         var info metainfo.Info
1093         err := bencode.Unmarshal(t.metadataBytes, &info)
1094         if err != nil {
1095                 log.Printf("error unmarshalling metadata: %s", err)
1096                 t.invalidateMetadata()
1097                 return
1098         }
1099         // TODO(anacrolix): If this fails, I think something harsher should be
1100         // done.
1101         err = cl.setMetaData(t, &info, t.metadataBytes)
1102         if err != nil {
1103                 log.Printf("error setting metadata: %s", err)
1104                 t.invalidateMetadata()
1105                 return
1106         }
1107         if cl.config.Debug {
1108                 log.Printf("%s: got metadata from peers", t)
1109         }
1110 }
1111
1112 // Process incoming ut_metadata message.
1113 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) (err error) {
1114         var d map[string]int
1115         err = bencode.Unmarshal(payload, &d)
1116         if err != nil {
1117                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1118                 return
1119         }
1120         msgType, ok := d["msg_type"]
1121         if !ok {
1122                 err = errors.New("missing msg_type field")
1123                 return
1124         }
1125         piece := d["piece"]
1126         switch msgType {
1127         case pp.DataMetadataExtensionMsgType:
1128                 if t.haveInfo() {
1129                         break
1130                 }
1131                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1132                 if begin < 0 || begin >= len(payload) {
1133                         log.Printf("got bad metadata piece")
1134                         break
1135                 }
1136                 if !c.requestedMetadataPiece(piece) {
1137                         log.Printf("got unexpected metadata piece %d", piece)
1138                         break
1139                 }
1140                 c.metadataRequests[piece] = false
1141                 t.saveMetadataPiece(piece, payload[begin:])
1142                 c.UsefulChunksReceived++
1143                 c.lastUsefulChunkReceived = time.Now()
1144                 if !t.haveAllMetadataPieces() {
1145                         break
1146                 }
1147                 cl.completedMetadata(t)
1148         case pp.RequestMetadataExtensionMsgType:
1149                 if !t.haveMetadataPiece(piece) {
1150                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1151                         break
1152                 }
1153                 start := (1 << 14) * piece
1154                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1155         case pp.RejectMetadataExtensionMsgType:
1156         default:
1157                 err = errors.New("unknown msg_type value")
1158         }
1159         return
1160 }
1161
1162 func (me *Client) upload(t *Torrent, c *connection) {
1163         if me.config.NoUpload {
1164                 return
1165         }
1166         if !c.PeerInterested {
1167                 return
1168         }
1169         seeding := me.seeding(t)
1170         if !seeding && !t.connHasWantedPieces(c) {
1171                 return
1172         }
1173 another:
1174         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1175                 c.Unchoke()
1176                 for r := range c.PeerRequests {
1177                         err := me.sendChunk(t, c, r)
1178                         if err != nil {
1179                                 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1180                                         // We had the piece, but not anymore.
1181                                 } else {
1182                                         log.Printf("error sending chunk %+v to peer: %s", r, err)
1183                                 }
1184                                 // If we failed to send a chunk, choke the peer to ensure they
1185                                 // flush all their requests. We've probably dropped a piece,
1186                                 // but there's no way to communicate this to the peer. If they
1187                                 // ask for it again, we'll kick them to allow us to send them
1188                                 // an updated bitfield.
1189                                 break another
1190                         }
1191                         delete(c.PeerRequests, r)
1192                         goto another
1193                 }
1194                 return
1195         }
1196         c.Choke()
1197 }
1198
1199 func (me *Client) sendChunk(t *Torrent, c *connection, r request) error {
1200         // Count the chunk being sent, even if it isn't.
1201         b := make([]byte, r.Length)
1202         p := t.info.Piece(int(r.Index))
1203         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1204         if n != len(b) {
1205                 if err == nil {
1206                         panic("expected error")
1207                 }
1208                 return err
1209         }
1210         c.Post(pp.Message{
1211                 Type:  pp.Piece,
1212                 Index: r.Index,
1213                 Begin: r.Begin,
1214                 Piece: b,
1215         })
1216         c.chunksSent++
1217         uploadChunksPosted.Add(1)
1218         c.lastChunkSent = time.Now()
1219         return nil
1220 }
1221
1222 // Processes incoming bittorrent messages. The client lock is held upon entry
1223 // and exit.
1224 func (me *Client) connectionLoop(t *Torrent, c *connection) error {
1225         decoder := pp.Decoder{
1226                 R:         bufio.NewReader(c.rw),
1227                 MaxLength: 256 * 1024,
1228         }
1229         for {
1230                 me.mu.Unlock()
1231                 var msg pp.Message
1232                 err := decoder.Decode(&msg)
1233                 me.mu.Lock()
1234                 if me.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1235                         return nil
1236                 }
1237                 if err != nil {
1238                         return err
1239                 }
1240                 c.lastMessageReceived = time.Now()
1241                 if msg.Keepalive {
1242                         receivedKeepalives.Add(1)
1243                         continue
1244                 }
1245                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1246                 switch msg.Type {
1247                 case pp.Choke:
1248                         c.PeerChoked = true
1249                         c.Requests = nil
1250                         // We can then reset our interest.
1251                         c.updateRequests()
1252                 case pp.Reject:
1253                         me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1254                         c.updateRequests()
1255                 case pp.Unchoke:
1256                         c.PeerChoked = false
1257                         me.peerUnchoked(t, c)
1258                 case pp.Interested:
1259                         c.PeerInterested = true
1260                         me.upload(t, c)
1261                 case pp.NotInterested:
1262                         c.PeerInterested = false
1263                         c.Choke()
1264                 case pp.Have:
1265                         err = c.peerSentHave(int(msg.Index))
1266                 case pp.Request:
1267                         if c.Choked {
1268                                 break
1269                         }
1270                         if !c.PeerInterested {
1271                                 err = errors.New("peer sent request but isn't interested")
1272                                 break
1273                         }
1274                         if !t.havePiece(msg.Index.Int()) {
1275                                 // This isn't necessarily them screwing up. We can drop pieces
1276                                 // from our storage, and can't communicate this to peers
1277                                 // except by reconnecting.
1278                                 requestsReceivedForMissingPieces.Add(1)
1279                                 err = errors.New("peer requested piece we don't have")
1280                                 break
1281                         }
1282                         if c.PeerRequests == nil {
1283                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1284                         }
1285                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1286                         me.upload(t, c)
1287                 case pp.Cancel:
1288                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1289                         if !c.PeerCancel(req) {
1290                                 unexpectedCancels.Add(1)
1291                         }
1292                 case pp.Bitfield:
1293                         err = c.peerSentBitfield(msg.Bitfield)
1294                 case pp.HaveAll:
1295                         err = c.peerSentHaveAll()
1296                 case pp.HaveNone:
1297                         err = c.peerSentHaveNone()
1298                 case pp.Piece:
1299                         me.downloadedChunk(t, c, &msg)
1300                 case pp.Extended:
1301                         switch msg.ExtendedID {
1302                         case pp.HandshakeExtendedID:
1303                                 // TODO: Create a bencode struct for this.
1304                                 var d map[string]interface{}
1305                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1306                                 if err != nil {
1307                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1308                                         break
1309                                 }
1310                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1311                                 if reqq, ok := d["reqq"]; ok {
1312                                         if i, ok := reqq.(int64); ok {
1313                                                 c.PeerMaxRequests = int(i)
1314                                         }
1315                                 }
1316                                 if v, ok := d["v"]; ok {
1317                                         c.PeerClientName = v.(string)
1318                                 }
1319                                 m, ok := d["m"]
1320                                 if !ok {
1321                                         err = errors.New("handshake missing m item")
1322                                         break
1323                                 }
1324                                 mTyped, ok := m.(map[string]interface{})
1325                                 if !ok {
1326                                         err = errors.New("handshake m value is not dict")
1327                                         break
1328                                 }
1329                                 if c.PeerExtensionIDs == nil {
1330                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1331                                 }
1332                                 for name, v := range mTyped {
1333                                         id, ok := v.(int64)
1334                                         if !ok {
1335                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1336                                                 continue
1337                                         }
1338                                         if id == 0 {
1339                                                 delete(c.PeerExtensionIDs, name)
1340                                         } else {
1341                                                 if c.PeerExtensionIDs[name] == 0 {
1342                                                         supportedExtensionMessages.Add(name, 1)
1343                                                 }
1344                                                 c.PeerExtensionIDs[name] = byte(id)
1345                                         }
1346                                 }
1347                                 metadata_sizeUntyped, ok := d["metadata_size"]
1348                                 if ok {
1349                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1350                                         if !ok {
1351                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1352                                         } else {
1353                                                 t.setMetadataSize(metadata_size, me)
1354                                         }
1355                                 }
1356                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1357                                         me.requestPendingMetadata(t, c)
1358                                 }
1359                         case metadataExtendedId:
1360                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1361                                 if err != nil {
1362                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1363                                 }
1364                         case pexExtendedId:
1365                                 if me.config.DisablePEX {
1366                                         break
1367                                 }
1368                                 var pexMsg peerExchangeMessage
1369                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1370                                 if err != nil {
1371                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1372                                         break
1373                                 }
1374                                 go func() {
1375                                         me.mu.Lock()
1376                                         me.addPeers(t, func() (ret []Peer) {
1377                                                 for i, cp := range pexMsg.Added {
1378                                                         p := Peer{
1379                                                                 IP:     make([]byte, 4),
1380                                                                 Port:   int(cp.Port),
1381                                                                 Source: peerSourcePEX,
1382                                                         }
1383                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1384                                                                 p.SupportsEncryption = true
1385                                                         }
1386                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1387                                                         ret = append(ret, p)
1388                                                 }
1389                                                 return
1390                                         }())
1391                                         me.mu.Unlock()
1392                                 }()
1393                         default:
1394                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1395                         }
1396                         if err != nil {
1397                                 // That client uses its own extension IDs for outgoing message
1398                                 // types, which is incorrect.
1399                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1400                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1401                                         return nil
1402                                 }
1403                         }
1404                 case pp.Port:
1405                         if me.dHT == nil {
1406                                 break
1407                         }
1408                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1409                         if err != nil {
1410                                 panic(err)
1411                         }
1412                         if msg.Port != 0 {
1413                                 pingAddr.Port = int(msg.Port)
1414                         }
1415                         _, err = me.dHT.Ping(pingAddr)
1416                 default:
1417                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1418                 }
1419                 if err != nil {
1420                         return err
1421                 }
1422         }
1423 }
1424
1425 // Returns true if connection is removed from torrent.Conns.
1426 func (me *Client) deleteConnection(t *Torrent, c *connection) bool {
1427         for i0, _c := range t.conns {
1428                 if _c != c {
1429                         continue
1430                 }
1431                 i1 := len(t.conns) - 1
1432                 if i0 != i1 {
1433                         t.conns[i0] = t.conns[i1]
1434                 }
1435                 t.conns = t.conns[:i1]
1436                 return true
1437         }
1438         return false
1439 }
1440
1441 func (me *Client) dropConnection(t *Torrent, c *connection) {
1442         me.event.Broadcast()
1443         c.Close()
1444         if me.deleteConnection(t, c) {
1445                 me.openNewConns(t)
1446         }
1447 }
1448
1449 // Returns true if the connection is added.
1450 func (me *Client) addConnection(t *Torrent, c *connection) bool {
1451         if me.closed.IsSet() {
1452                 return false
1453         }
1454         select {
1455         case <-t.ceasingNetworking:
1456                 return false
1457         default:
1458         }
1459         if !me.wantConns(t) {
1460                 return false
1461         }
1462         for _, c0 := range t.conns {
1463                 if c.PeerID == c0.PeerID {
1464                         // Already connected to a client with that ID.
1465                         duplicateClientConns.Add(1)
1466                         return false
1467                 }
1468         }
1469         if len(t.conns) >= socketsPerTorrent {
1470                 c := t.worstBadConn(me)
1471                 if c == nil {
1472                         return false
1473                 }
1474                 if me.config.Debug && missinggo.CryHeard() {
1475                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1476                 }
1477                 c.Close()
1478                 me.deleteConnection(t, c)
1479         }
1480         if len(t.conns) >= socketsPerTorrent {
1481                 panic(len(t.conns))
1482         }
1483         t.conns = append(t.conns, c)
1484         c.t = t
1485         return true
1486 }
1487
1488 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1489         t.forReaderOffsetPieces(func(begin, end int) bool {
1490                 ret.AddRange(begin, end)
1491                 return true
1492         })
1493         return
1494 }
1495
1496 func (t *Torrent) needData() bool {
1497         if !t.haveInfo() {
1498                 return true
1499         }
1500         if t.pendingPieces.Len() != 0 {
1501                 return true
1502         }
1503         return !t.readerPieces().IterTyped(func(piece int) bool {
1504                 return t.pieceComplete(piece)
1505         })
1506 }
1507
1508 func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
1509         if c.closed.IsSet() {
1510                 return false
1511         }
1512         if !t.haveInfo() {
1513                 return c.supportsExtension("ut_metadata")
1514         }
1515         if cl.seeding(t) {
1516                 return c.PeerInterested
1517         }
1518         return t.connHasWantedPieces(c)
1519 }
1520
1521 func (me *Client) wantConns(t *Torrent) bool {
1522         if !me.seeding(t) && !t.needData() {
1523                 return false
1524         }
1525         if len(t.conns) < socketsPerTorrent {
1526                 return true
1527         }
1528         return t.worstBadConn(me) != nil
1529 }
1530
1531 func (me *Client) openNewConns(t *Torrent) {
1532         select {
1533         case <-t.ceasingNetworking:
1534                 return
1535         default:
1536         }
1537         for len(t.peers) != 0 {
1538                 if !me.wantConns(t) {
1539                         return
1540                 }
1541                 if len(t.halfOpen) >= me.halfOpenLimit {
1542                         return
1543                 }
1544                 var (
1545                         k peersKey
1546                         p Peer
1547                 )
1548                 for k, p = range t.peers {
1549                         break
1550                 }
1551                 delete(t.peers, k)
1552                 me.initiateConn(p, t)
1553         }
1554         t.wantPeers.Broadcast()
1555 }
1556
1557 func (me *Client) addPeers(t *Torrent, peers []Peer) {
1558         for _, p := range peers {
1559                 if me.dopplegangerAddr(net.JoinHostPort(
1560                         p.IP.String(),
1561                         strconv.FormatInt(int64(p.Port), 10),
1562                 )) {
1563                         continue
1564                 }
1565                 if _, ok := me.ipBlockRange(p.IP); ok {
1566                         continue
1567                 }
1568                 if p.Port == 0 {
1569                         // The spec says to scrub these yourselves. Fine.
1570                         continue
1571                 }
1572                 t.addPeer(p, me)
1573         }
1574 }
1575
1576 func (cl *Client) cachedMetaInfoFilename(ih metainfo.Hash) string {
1577         return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1578 }
1579
1580 func (cl *Client) saveTorrentFile(t *Torrent) error {
1581         path := cl.cachedMetaInfoFilename(t.infoHash)
1582         os.MkdirAll(filepath.Dir(path), 0777)
1583         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1584         if err != nil {
1585                 return fmt.Errorf("error opening file: %s", err)
1586         }
1587         defer f.Close()
1588         e := bencode.NewEncoder(f)
1589         err = e.Encode(t.metainfo())
1590         if err != nil {
1591                 return fmt.Errorf("error marshalling metainfo: %s", err)
1592         }
1593         mi, err := cl.torrentCacheMetaInfo(t.infoHash)
1594         if err != nil {
1595                 // For example, a script kiddy makes us load too many files, and we're
1596                 // able to save the torrent, but not load it again to check it.
1597                 return nil
1598         }
1599         if !bytes.Equal(mi.Info.Hash.Bytes(), t.infoHash[:]) {
1600                 log.Fatalf("%x != %x", mi.Info.Hash, t.infoHash[:])
1601         }
1602         return nil
1603 }
1604
1605 func (cl *Client) setMetaData(t *Torrent, md *metainfo.Info, bytes []byte) (err error) {
1606         err = t.setMetadata(md, bytes)
1607         if err != nil {
1608                 return
1609         }
1610         if !cl.config.DisableMetainfoCache {
1611                 if err := cl.saveTorrentFile(t); err != nil {
1612                         log.Printf("error saving torrent file for %s: %s", t, err)
1613                 }
1614         }
1615         cl.event.Broadcast()
1616         close(t.gotMetainfo)
1617         return
1618 }
1619
1620 // Prepare a Torrent without any attachment to a Client. That means we can
1621 // initialize fields all fields that don't require the Client without locking
1622 // it.
1623 func newTorrent(ih metainfo.Hash) (t *Torrent) {
1624         t = &Torrent{
1625                 infoHash:  ih,
1626                 chunkSize: defaultChunkSize,
1627                 peers:     make(map[peersKey]Peer),
1628
1629                 closing:           make(chan struct{}),
1630                 ceasingNetworking: make(chan struct{}),
1631
1632                 gotMetainfo: make(chan struct{}),
1633
1634                 halfOpen:          make(map[string]struct{}),
1635                 pieceStateChanges: pubsub.NewPubSub(),
1636         }
1637         return
1638 }
1639
1640 func init() {
1641         // For shuffling the tracker tiers.
1642         mathRand.Seed(time.Now().Unix())
1643 }
1644
1645 type trackerTier []string
1646
1647 // The trackers within each tier must be shuffled before use.
1648 // http://stackoverflow.com/a/12267471/149482
1649 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1650 func shuffleTier(tier trackerTier) {
1651         for i := range tier {
1652                 j := mathRand.Intn(i + 1)
1653                 tier[i], tier[j] = tier[j], tier[i]
1654         }
1655 }
1656
1657 func copyTrackers(base []trackerTier) (copy []trackerTier) {
1658         for _, tier := range base {
1659                 copy = append(copy, append(trackerTier(nil), tier...))
1660         }
1661         return
1662 }
1663
1664 func mergeTier(tier trackerTier, newURLs []string) trackerTier {
1665 nextURL:
1666         for _, url := range newURLs {
1667                 for _, trURL := range tier {
1668                         if trURL == url {
1669                                 continue nextURL
1670                         }
1671                 }
1672                 tier = append(tier, url)
1673         }
1674         return tier
1675 }
1676
1677 func (t *Torrent) addTrackers(announceList [][]string) {
1678         newTrackers := copyTrackers(t.trackers)
1679         for tierIndex, tier := range announceList {
1680                 if tierIndex < len(newTrackers) {
1681                         newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1682                 } else {
1683                         newTrackers = append(newTrackers, mergeTier(nil, tier))
1684                 }
1685                 shuffleTier(newTrackers[tierIndex])
1686         }
1687         t.trackers = newTrackers
1688 }
1689
1690 // Don't call this before the info is available.
1691 func (t *Torrent) bytesCompleted() int64 {
1692         if !t.haveInfo() {
1693                 return 0
1694         }
1695         return t.info.TotalLength() - t.bytesLeft()
1696 }
1697
1698 // A file-like handle to some torrent data resource.
1699 type Handle interface {
1700         io.Reader
1701         io.Seeker
1702         io.Closer
1703         io.ReaderAt
1704 }
1705
1706 // Returns handles to the files in the torrent. This requires the metainfo is
1707 // available first.
1708 func (t *Torrent) Files() (ret []File) {
1709         t.cl.mu.Lock()
1710         info := t.Info()
1711         t.cl.mu.Unlock()
1712         if info == nil {
1713                 return
1714         }
1715         var offset int64
1716         for _, fi := range info.UpvertedFiles() {
1717                 ret = append(ret, File{
1718                         t,
1719                         strings.Join(append([]string{info.Name}, fi.Path...), "/"),
1720                         offset,
1721                         fi.Length,
1722                         fi,
1723                 })
1724                 offset += fi.Length
1725         }
1726         return
1727 }
1728
1729 func (t *Torrent) AddPeers(pp []Peer) error {
1730         cl := t.cl
1731         cl.mu.Lock()
1732         defer cl.mu.Unlock()
1733         cl.addPeers(t, pp)
1734         return nil
1735 }
1736
1737 // Marks the entire torrent for download. Requires the info first, see
1738 // GotInfo.
1739 func (t *Torrent) DownloadAll() {
1740         t.cl.mu.Lock()
1741         defer t.cl.mu.Unlock()
1742         t.pendPieceRange(0, t.numPieces())
1743 }
1744
1745 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
1746 // metainfo has the correct infohash.
1747 func (cl *Client) torrentCacheMetaInfo(ih metainfo.Hash) (mi *metainfo.MetaInfo, err error) {
1748         if cl.config.DisableMetainfoCache {
1749                 return
1750         }
1751         f, err := os.Open(cl.cachedMetaInfoFilename(ih))
1752         if err != nil {
1753                 if os.IsNotExist(err) {
1754                         err = nil
1755                 }
1756                 return
1757         }
1758         defer f.Close()
1759         dec := bencode.NewDecoder(f)
1760         err = dec.Decode(&mi)
1761         if err != nil {
1762                 return
1763         }
1764         if !bytes.Equal(mi.Info.Hash.Bytes(), ih[:]) {
1765                 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
1766                 return
1767         }
1768         return
1769 }
1770
1771 // Specifies a new torrent for adding to a client. There are helpers for
1772 // magnet URIs and torrent metainfo files.
1773 type TorrentSpec struct {
1774         // The tiered tracker URIs.
1775         Trackers [][]string
1776         InfoHash metainfo.Hash
1777         Info     *metainfo.InfoEx
1778         // The name to use if the Name field from the Info isn't available.
1779         DisplayName string
1780         // The chunk size to use for outbound requests. Defaults to 16KiB if not
1781         // set.
1782         ChunkSize int
1783         Storage   storage.I
1784 }
1785
1786 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1787         m, err := metainfo.ParseMagnetURI(uri)
1788         if err != nil {
1789                 return
1790         }
1791         spec = &TorrentSpec{
1792                 Trackers:    [][]string{m.Trackers},
1793                 DisplayName: m.DisplayName,
1794                 InfoHash:    m.InfoHash,
1795         }
1796         return
1797 }
1798
1799 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1800         spec = &TorrentSpec{
1801                 Trackers:    mi.AnnounceList,
1802                 Info:        &mi.Info,
1803                 DisplayName: mi.Info.Name,
1804         }
1805
1806         if len(spec.Trackers) == 0 {
1807                 spec.Trackers = [][]string{[]string{mi.Announce}}
1808         } else {
1809                 spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
1810         }
1811
1812         missinggo.CopyExact(&spec.InfoHash, mi.Info.Hash)
1813         return
1814 }
1815
1816 // Add or merge a torrent spec. If the torrent is already present, the
1817 // trackers will be merged with the existing ones. If the Info isn't yet
1818 // known, it will be set. The display name is replaced if the new spec
1819 // provides one. Returns new if the torrent wasn't already in the client.
1820 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1821         cl.mu.Lock()
1822         defer cl.mu.Unlock()
1823
1824         t, ok := cl.torrents[spec.InfoHash]
1825         if !ok {
1826                 new = true
1827
1828                 // TODO: This doesn't belong in the core client, it's more of a
1829                 // helper.
1830                 if _, ok := cl.bannedTorrents[spec.InfoHash]; ok {
1831                         err = errors.New("banned torrent")
1832                         return
1833                 }
1834                 // TODO: Tidy this up?
1835                 t = newTorrent(spec.InfoHash)
1836                 t.cl = cl
1837                 t.wantPeers.L = &cl.mu
1838                 if spec.ChunkSize != 0 {
1839                         t.chunkSize = pp.Integer(spec.ChunkSize)
1840                 }
1841                 t.storageOpener = spec.Storage
1842                 if t.storageOpener == nil {
1843                         t.storageOpener = cl.defaultStorage
1844                 }
1845         }
1846         if spec.DisplayName != "" {
1847                 t.setDisplayName(spec.DisplayName)
1848         }
1849         // Try to merge in info we have on the torrent. Any err left will
1850         // terminate the function.
1851         if t.info == nil {
1852                 if spec.Info != nil {
1853                         err = cl.setMetaData(t, &spec.Info.Info, spec.Info.Bytes)
1854                 } else {
1855                         var mi *metainfo.MetaInfo
1856                         mi, err = cl.torrentCacheMetaInfo(spec.InfoHash)
1857                         if err != nil {
1858                                 log.Printf("error getting cached metainfo: %s", err)
1859                                 err = nil
1860                         } else if mi != nil {
1861                                 t.addTrackers(mi.AnnounceList)
1862                                 err = cl.setMetaData(t, &mi.Info.Info, mi.Info.Bytes)
1863                         }
1864                 }
1865         }
1866         if err != nil {
1867                 return
1868         }
1869         t.addTrackers(spec.Trackers)
1870
1871         cl.torrents[spec.InfoHash] = t
1872         t.maybeNewConns()
1873
1874         // From this point onwards, we can consider the torrent a part of the
1875         // client.
1876         if new {
1877                 if !cl.config.DisableTrackers {
1878                         go cl.announceTorrentTrackers(t)
1879                 }
1880                 if cl.dHT != nil {
1881                         go cl.announceTorrentDHT(t, true)
1882                 }
1883         }
1884         return
1885 }
1886
1887 func (me *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1888         t, ok := me.torrents[infoHash]
1889         if !ok {
1890                 err = fmt.Errorf("no such torrent")
1891                 return
1892         }
1893         err = t.close()
1894         if err != nil {
1895                 panic(err)
1896         }
1897         delete(me.torrents, infoHash)
1898         return
1899 }
1900
1901 // Returns true when peers are required, or false if the torrent is closing.
1902 func (cl *Client) waitWantPeers(t *Torrent) bool {
1903         cl.mu.Lock()
1904         defer cl.mu.Unlock()
1905         for {
1906                 select {
1907                 case <-t.ceasingNetworking:
1908                         return false
1909                 default:
1910                 }
1911                 if len(t.peers) > torrentPeersLowWater {
1912                         goto wait
1913                 }
1914                 if t.needData() || cl.seeding(t) {
1915                         return true
1916                 }
1917         wait:
1918                 t.wantPeers.Wait()
1919         }
1920 }
1921
1922 // Returns whether the client should make effort to seed the torrent.
1923 func (cl *Client) seeding(t *Torrent) bool {
1924         if cl.config.NoUpload {
1925                 return false
1926         }
1927         if !cl.config.Seed {
1928                 return false
1929         }
1930         if t.needData() {
1931                 return false
1932         }
1933         return true
1934 }
1935
1936 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
1937         for cl.waitWantPeers(t) {
1938                 // log.Printf("getting peers for %q from DHT", t)
1939                 ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1940                 if err != nil {
1941                         log.Printf("error getting peers from dht: %s", err)
1942                         return
1943                 }
1944                 // Count all the unique addresses we got during this announce.
1945                 allAddrs := make(map[string]struct{})
1946         getPeers:
1947                 for {
1948                         select {
1949                         case v, ok := <-ps.Peers:
1950                                 if !ok {
1951                                         break getPeers
1952                                 }
1953                                 addPeers := make([]Peer, 0, len(v.Peers))
1954                                 for _, cp := range v.Peers {
1955                                         if cp.Port == 0 {
1956                                                 // Can't do anything with this.
1957                                                 continue
1958                                         }
1959                                         addPeers = append(addPeers, Peer{
1960                                                 IP:     cp.IP[:],
1961                                                 Port:   int(cp.Port),
1962                                                 Source: peerSourceDHT,
1963                                         })
1964                                         key := (&net.UDPAddr{
1965                                                 IP:   cp.IP[:],
1966                                                 Port: int(cp.Port),
1967                                         }).String()
1968                                         allAddrs[key] = struct{}{}
1969                                 }
1970                                 cl.mu.Lock()
1971                                 cl.addPeers(t, addPeers)
1972                                 numPeers := len(t.peers)
1973                                 cl.mu.Unlock()
1974                                 if numPeers >= torrentPeersHighWater {
1975                                         break getPeers
1976                                 }
1977                         case <-t.ceasingNetworking:
1978                                 ps.Close()
1979                                 return
1980                         }
1981                 }
1982                 ps.Close()
1983                 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
1984         }
1985 }
1986
1987 func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
1988         url_, err := url.Parse(trRawURL)
1989         if err != nil {
1990                 return
1991         }
1992         host, _, err := net.SplitHostPort(url_.Host)
1993         if err != nil {
1994                 host = url_.Host
1995         }
1996         addr, err := net.ResolveIPAddr("ip", host)
1997         if err != nil {
1998                 return
1999         }
2000         cl.mu.RLock()
2001         _, blocked = cl.ipBlockRange(addr.IP)
2002         cl.mu.RUnlock()
2003         return
2004 }
2005
2006 func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) error {
2007         blocked, err := cl.trackerBlockedUnlocked(tr)
2008         if err != nil {
2009                 return fmt.Errorf("error determining if tracker blocked: %s", err)
2010         }
2011         if blocked {
2012                 return fmt.Errorf("tracker blocked: %s", tr)
2013         }
2014         resp, err := tracker.Announce(tr, req)
2015         if err != nil {
2016                 return fmt.Errorf("error announcing: %s", err)
2017         }
2018         var peers []Peer
2019         for _, peer := range resp.Peers {
2020                 peers = append(peers, Peer{
2021                         IP:   peer.IP,
2022                         Port: peer.Port,
2023                 })
2024         }
2025         cl.mu.Lock()
2026         cl.addPeers(t, peers)
2027         cl.mu.Unlock()
2028
2029         // log.Printf("%s: %d new peers from %s", t, len(peers), tr)
2030
2031         time.Sleep(time.Second * time.Duration(resp.Interval))
2032         return nil
2033 }
2034
2035 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
2036         oks := make(chan bool)
2037         outstanding := 0
2038         for _, tier := range trackers {
2039                 for _, tr := range tier {
2040                         outstanding++
2041                         go func(tr string) {
2042                                 err := cl.announceTorrentSingleTracker(tr, req, t)
2043                                 oks <- err == nil
2044                         }(tr)
2045                 }
2046         }
2047         for outstanding > 0 {
2048                 ok := <-oks
2049                 outstanding--
2050                 if ok {
2051                         atLeastOne = true
2052                 }
2053         }
2054         return
2055 }
2056
2057 // Announce torrent to its trackers.
2058 func (cl *Client) announceTorrentTrackers(t *Torrent) {
2059         req := tracker.AnnounceRequest{
2060                 Event:    tracker.Started,
2061                 NumWant:  -1,
2062                 Port:     uint16(cl.incomingPeerPort()),
2063                 PeerId:   cl.peerID,
2064                 InfoHash: t.infoHash,
2065         }
2066         if !cl.waitWantPeers(t) {
2067                 return
2068         }
2069         cl.mu.RLock()
2070         req.Left = t.bytesLeftAnnounce()
2071         trackers := t.trackers
2072         cl.mu.RUnlock()
2073         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
2074                 req.Event = tracker.None
2075         }
2076 newAnnounce:
2077         for cl.waitWantPeers(t) {
2078                 cl.mu.RLock()
2079                 req.Left = t.bytesLeftAnnounce()
2080                 trackers = t.trackers
2081                 cl.mu.RUnlock()
2082                 numTrackersTried := 0
2083                 for _, tier := range trackers {
2084                         for trIndex, tr := range tier {
2085                                 numTrackersTried++
2086                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
2087                                 if err != nil {
2088                                         continue
2089                                 }
2090                                 // Float the successful announce to the top of the tier. If
2091                                 // the trackers list has been changed, we'll be modifying an
2092                                 // old copy so it won't matter.
2093                                 cl.mu.Lock()
2094                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
2095                                 cl.mu.Unlock()
2096
2097                                 req.Event = tracker.None
2098                                 continue newAnnounce
2099                         }
2100                 }
2101                 if numTrackersTried != 0 {
2102                         log.Printf("%s: all trackers failed", t)
2103                 }
2104                 // TODO: Wait until trackers are added if there are none.
2105                 time.Sleep(10 * time.Second)
2106         }
2107 }
2108
2109 func (cl *Client) allTorrentsCompleted() bool {
2110         for _, t := range cl.torrents {
2111                 if !t.haveInfo() {
2112                         return false
2113                 }
2114                 if t.numPiecesCompleted() != t.numPieces() {
2115                         return false
2116                 }
2117         }
2118         return true
2119 }
2120
2121 // Returns true when all torrents are completely downloaded and false if the
2122 // client is stopped before that.
2123 func (me *Client) WaitAll() bool {
2124         me.mu.Lock()
2125         defer me.mu.Unlock()
2126         for !me.allTorrentsCompleted() {
2127                 if me.closed.IsSet() {
2128                         return false
2129                 }
2130                 me.event.Wait()
2131         }
2132         return true
2133 }
2134
2135 // Handle a received chunk from a peer.
2136 func (me *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
2137         chunksReceived.Add(1)
2138
2139         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
2140
2141         // Request has been satisfied.
2142         if me.connDeleteRequest(t, c, req) {
2143                 defer c.updateRequests()
2144         } else {
2145                 unexpectedChunksReceived.Add(1)
2146         }
2147
2148         index := int(req.Index)
2149         piece := &t.pieces[index]
2150
2151         // Do we actually want this chunk?
2152         if !t.wantChunk(req) {
2153                 unwantedChunksReceived.Add(1)
2154                 c.UnwantedChunksReceived++
2155                 return
2156         }
2157
2158         c.UsefulChunksReceived++
2159         c.lastUsefulChunkReceived = time.Now()
2160
2161         me.upload(t, c)
2162
2163         // Need to record that it hasn't been written yet, before we attempt to do
2164         // anything with it.
2165         piece.incrementPendingWrites()
2166         // Record that we have the chunk.
2167         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
2168
2169         // Cancel pending requests for this chunk.
2170         for _, c := range t.conns {
2171                 if me.connCancel(t, c, req) {
2172                         c.updateRequests()
2173                 }
2174         }
2175
2176         me.mu.Unlock()
2177         // Write the chunk out.
2178         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
2179         me.mu.Lock()
2180
2181         piece.decrementPendingWrites()
2182
2183         if err != nil {
2184                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
2185                 t.pendRequest(req)
2186                 t.updatePieceCompletion(int(msg.Index))
2187                 return
2188         }
2189
2190         // It's important that the piece is potentially queued before we check if
2191         // the piece is still wanted, because if it is queued, it won't be wanted.
2192         if t.pieceAllDirty(index) {
2193                 me.queuePieceCheck(t, int(req.Index))
2194         }
2195
2196         if c.peerTouchedPieces == nil {
2197                 c.peerTouchedPieces = make(map[int]struct{})
2198         }
2199         c.peerTouchedPieces[index] = struct{}{}
2200
2201         me.event.Broadcast()
2202         t.publishPieceChange(int(req.Index))
2203         return
2204 }
2205
2206 // Return the connections that touched a piece, and clear the entry while
2207 // doing it.
2208 func (me *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
2209         for _, c := range t.conns {
2210                 if _, ok := c.peerTouchedPieces[piece]; ok {
2211                         ret = append(ret, c)
2212                         delete(c.peerTouchedPieces, piece)
2213                 }
2214         }
2215         return
2216 }
2217
2218 func (me *Client) pieceHashed(t *Torrent, piece int, correct bool) {
2219         p := &t.pieces[piece]
2220         if p.EverHashed {
2221                 // Don't score the first time a piece is hashed, it could be an
2222                 // initial check.
2223                 if correct {
2224                         pieceHashedCorrect.Add(1)
2225                 } else {
2226                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
2227                         pieceHashedNotCorrect.Add(1)
2228                 }
2229         }
2230         p.EverHashed = true
2231         touchers := me.reapPieceTouches(t, int(piece))
2232         if correct {
2233                 err := p.Storage().MarkComplete()
2234                 if err != nil {
2235                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
2236                 }
2237                 t.updatePieceCompletion(piece)
2238         } else if len(touchers) != 0 {
2239                 log.Printf("dropping %d conns that touched piece", len(touchers))
2240                 for _, c := range touchers {
2241                         me.dropConnection(t, c)
2242                 }
2243         }
2244         me.pieceChanged(t, int(piece))
2245 }
2246
2247 func (me *Client) onCompletedPiece(t *Torrent, piece int) {
2248         t.pendingPieces.Remove(piece)
2249         t.pendAllChunkSpecs(piece)
2250         for _, conn := range t.conns {
2251                 conn.Have(piece)
2252                 for r := range conn.Requests {
2253                         if int(r.Index) == piece {
2254                                 conn.Cancel(r)
2255                         }
2256                 }
2257                 // Could check here if peer doesn't have piece, but due to caching
2258                 // some peers may have said they have a piece but they don't.
2259                 me.upload(t, conn)
2260         }
2261 }
2262
2263 func (me *Client) onFailedPiece(t *Torrent, piece int) {
2264         if t.pieceAllDirty(piece) {
2265                 t.pendAllChunkSpecs(piece)
2266         }
2267         if !t.wantPiece(piece) {
2268                 return
2269         }
2270         me.openNewConns(t)
2271         for _, conn := range t.conns {
2272                 if conn.PeerHasPiece(piece) {
2273                         conn.updateRequests()
2274                 }
2275         }
2276 }
2277
2278 func (me *Client) pieceChanged(t *Torrent, piece int) {
2279         correct := t.pieceComplete(piece)
2280         defer me.event.Broadcast()
2281         if correct {
2282                 me.onCompletedPiece(t, piece)
2283         } else {
2284                 me.onFailedPiece(t, piece)
2285         }
2286         if t.updatePiecePriority(piece) {
2287                 t.piecePriorityChanged(piece)
2288         }
2289         t.publishPieceChange(piece)
2290 }
2291
2292 func (cl *Client) verifyPiece(t *Torrent, piece int) {
2293         cl.mu.Lock()
2294         defer cl.mu.Unlock()
2295         p := &t.pieces[piece]
2296         for p.Hashing || t.storage == nil {
2297                 cl.event.Wait()
2298         }
2299         p.QueuedForHash = false
2300         if t.isClosed() || t.pieceComplete(piece) {
2301                 t.updatePiecePriority(piece)
2302                 t.publishPieceChange(piece)
2303                 return
2304         }
2305         p.Hashing = true
2306         t.publishPieceChange(piece)
2307         cl.mu.Unlock()
2308         sum := t.hashPiece(piece)
2309         cl.mu.Lock()
2310         select {
2311         case <-t.closing:
2312                 return
2313         default:
2314         }
2315         p.Hashing = false
2316         cl.pieceHashed(t, piece, sum == p.Hash)
2317 }
2318
2319 // Returns handles to all the torrents loaded in the Client.
2320 func (me *Client) Torrents() (ret []*Torrent) {
2321         me.mu.Lock()
2322         for _, t := range me.torrents {
2323                 ret = append(ret, t)
2324         }
2325         me.mu.Unlock()
2326         return
2327 }
2328
2329 func (me *Client) AddMagnet(uri string) (T *Torrent, err error) {
2330         spec, err := TorrentSpecFromMagnetURI(uri)
2331         if err != nil {
2332                 return
2333         }
2334         T, _, err = me.AddTorrentSpec(spec)
2335         return
2336 }
2337
2338 func (me *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
2339         T, _, err = me.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2340         var ss []string
2341         missinggo.CastSlice(&ss, mi.Nodes)
2342         me.AddDHTNodes(ss)
2343         return
2344 }
2345
2346 func (me *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
2347         mi, err := metainfo.LoadFromFile(filename)
2348         if err != nil {
2349                 return
2350         }
2351         return me.AddTorrent(mi)
2352 }
2353
2354 func (me *Client) DHT() *dht.Server {
2355         return me.dHT
2356 }
2357
2358 func (me *Client) AddDHTNodes(nodes []string) {
2359         for _, n := range nodes {
2360                 hmp := missinggo.SplitHostMaybePort(n)
2361                 ip := net.ParseIP(hmp.Host)
2362                 if ip == nil {
2363                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
2364                         continue
2365                 }
2366                 ni := dht.NodeInfo{
2367                         Addr: dht.NewAddr(&net.UDPAddr{
2368                                 IP:   ip,
2369                                 Port: hmp.Port,
2370                         }),
2371                 }
2372                 me.DHT().AddNode(ni)
2373         }
2374 }