]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Merge all our global consts and vars into global.go
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "crypto/rand"
7         "crypto/sha1"
8         "encoding/hex"
9         "errors"
10         "fmt"
11         "io"
12         "log"
13         "math/big"
14         mathRand "math/rand"
15         "net"
16         "net/url"
17         "os"
18         "path/filepath"
19         "sort"
20         "strconv"
21         "strings"
22         "time"
23
24         "github.com/anacrolix/missinggo"
25         "github.com/anacrolix/missinggo/bitmap"
26         "github.com/anacrolix/missinggo/pproffd"
27         "github.com/anacrolix/missinggo/pubsub"
28         "github.com/anacrolix/sync"
29         "github.com/anacrolix/utp"
30
31         "github.com/anacrolix/torrent/bencode"
32         "github.com/anacrolix/torrent/dht"
33         "github.com/anacrolix/torrent/iplist"
34         "github.com/anacrolix/torrent/metainfo"
35         "github.com/anacrolix/torrent/mse"
36         pp "github.com/anacrolix/torrent/peer_protocol"
37         "github.com/anacrolix/torrent/storage"
38         "github.com/anacrolix/torrent/tracker"
39 )
40
41 // Currently doesn't really queue, but should in the future.
42 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex int) {
43         piece := &t.pieces[pieceIndex]
44         if piece.QueuedForHash {
45                 return
46         }
47         piece.QueuedForHash = true
48         t.publishPieceChange(pieceIndex)
49         go cl.verifyPiece(t, pieceIndex)
50 }
51
52 // Queue a piece check if one isn't already queued, and the piece has never
53 // been checked before.
54 func (cl *Client) queueFirstHash(t *Torrent, piece int) {
55         p := &t.pieces[piece]
56         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
57                 return
58         }
59         cl.queuePieceCheck(t, piece)
60 }
61
62 // Clients contain zero or more Torrents. A client manages a blocklist, the
63 // TCP/UDP protocol ports, and DHT as desired.
64 type Client struct {
65         halfOpenLimit  int
66         peerID         [20]byte
67         listeners      []net.Listener
68         utpSock        *utp.Socket
69         dHT            *dht.Server
70         ipBlockList    iplist.Ranger
71         bannedTorrents map[metainfo.Hash]struct{}
72         config         Config
73         pruneTimer     *time.Timer
74         extensionBytes peerExtensionBytes
75         // Set of addresses that have our client ID. This intentionally will
76         // include ourselves if we end up trying to connect to our own address
77         // through legitimate channels.
78         dopplegangerAddrs map[string]struct{}
79
80         defaultStorage storage.I
81
82         mu     sync.RWMutex
83         event  sync.Cond
84         closed missinggo.Event
85
86         torrents map[metainfo.Hash]*Torrent
87 }
88
89 func (me *Client) IPBlockList() iplist.Ranger {
90         me.mu.Lock()
91         defer me.mu.Unlock()
92         return me.ipBlockList
93 }
94
95 func (me *Client) SetIPBlockList(list iplist.Ranger) {
96         me.mu.Lock()
97         defer me.mu.Unlock()
98         me.ipBlockList = list
99         if me.dHT != nil {
100                 me.dHT.SetIPBlockList(list)
101         }
102 }
103
104 func (me *Client) PeerID() string {
105         return string(me.peerID[:])
106 }
107
108 func (me *Client) ListenAddr() (addr net.Addr) {
109         for _, l := range me.listeners {
110                 addr = l.Addr()
111                 break
112         }
113         return
114 }
115
116 type hashSorter struct {
117         Hashes []metainfo.Hash
118 }
119
120 func (me hashSorter) Len() int {
121         return len(me.Hashes)
122 }
123
124 func (me hashSorter) Less(a, b int) bool {
125         return (&big.Int{}).SetBytes(me.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(me.Hashes[b][:])) < 0
126 }
127
128 func (me hashSorter) Swap(a, b int) {
129         me.Hashes[a], me.Hashes[b] = me.Hashes[b], me.Hashes[a]
130 }
131
132 func (cl *Client) sortedTorrents() (ret []*Torrent) {
133         var hs hashSorter
134         for ih := range cl.torrents {
135                 hs.Hashes = append(hs.Hashes, ih)
136         }
137         sort.Sort(hs)
138         for _, ih := range hs.Hashes {
139                 ret = append(ret, cl.torrent(ih))
140         }
141         return
142 }
143
144 // Writes out a human readable status of the client, such as for writing to a
145 // HTTP status page.
146 func (cl *Client) WriteStatus(_w io.Writer) {
147         cl.mu.RLock()
148         defer cl.mu.RUnlock()
149         w := bufio.NewWriter(_w)
150         defer w.Flush()
151         if addr := cl.ListenAddr(); addr != nil {
152                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
153         } else {
154                 fmt.Fprintln(w, "Not listening!")
155         }
156         fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
157         if cl.dHT != nil {
158                 dhtStats := cl.dHT.Stats()
159                 fmt.Fprintf(w, "DHT nodes: %d (%d good, %d banned)\n", dhtStats.Nodes, dhtStats.GoodNodes, dhtStats.BadNodes)
160                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
161                 fmt.Fprintf(w, "DHT port: %d\n", missinggo.AddrPort(cl.dHT.Addr()))
162                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
163                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
164         }
165         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
166         fmt.Fprintln(w)
167         for _, t := range cl.sortedTorrents() {
168                 if t.name() == "" {
169                         fmt.Fprint(w, "<unknown name>")
170                 } else {
171                         fmt.Fprint(w, t.name())
172                 }
173                 fmt.Fprint(w, "\n")
174                 if t.haveInfo() {
175                         fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float64(t.bytesLeft())/float64(t.length)), t.length)
176                 } else {
177                         w.WriteString("<missing metainfo>")
178                 }
179                 fmt.Fprint(w, "\n")
180                 t.writeStatus(w, cl)
181                 fmt.Fprintln(w)
182         }
183 }
184
185 func (cl *Client) configDir() string {
186         if cl.config.ConfigDir == "" {
187                 return filepath.Join(os.Getenv("HOME"), ".config/torrent")
188         }
189         return cl.config.ConfigDir
190 }
191
192 // The directory where the Client expects to find and store configuration
193 // data. Defaults to $HOME/.config/torrent.
194 func (cl *Client) ConfigDir() string {
195         return cl.configDir()
196 }
197
198 // Creates a new client.
199 func NewClient(cfg *Config) (cl *Client, err error) {
200         if cfg == nil {
201                 cfg = &Config{}
202         }
203
204         defer func() {
205                 if err != nil {
206                         cl = nil
207                 }
208         }()
209         cl = &Client{
210                 halfOpenLimit:     socketsPerTorrent,
211                 config:            *cfg,
212                 defaultStorage:    cfg.DefaultStorage,
213                 dopplegangerAddrs: make(map[string]struct{}),
214                 torrents:          make(map[metainfo.Hash]*Torrent),
215         }
216         missinggo.CopyExact(&cl.extensionBytes, defaultExtensionBytes)
217         cl.event.L = &cl.mu
218         if cl.defaultStorage == nil {
219                 cl.defaultStorage = storage.NewFile(cfg.DataDir)
220         }
221         if cfg.IPBlocklist != nil {
222                 cl.ipBlockList = cfg.IPBlocklist
223         }
224
225         if cfg.PeerID != "" {
226                 missinggo.CopyExact(&cl.peerID, cfg.PeerID)
227         } else {
228                 o := copy(cl.peerID[:], bep20)
229                 _, err = rand.Read(cl.peerID[o:])
230                 if err != nil {
231                         panic("error generating peer id")
232                 }
233         }
234
235         // Returns the laddr string to listen on for the next Listen call.
236         listenAddr := func() string {
237                 if addr := cl.ListenAddr(); addr != nil {
238                         return addr.String()
239                 }
240                 if cfg.ListenAddr == "" {
241                         return ":50007"
242                 }
243                 return cfg.ListenAddr
244         }
245         if !cl.config.DisableTCP {
246                 var l net.Listener
247                 l, err = net.Listen(func() string {
248                         if cl.config.DisableIPv6 {
249                                 return "tcp4"
250                         } else {
251                                 return "tcp"
252                         }
253                 }(), listenAddr())
254                 if err != nil {
255                         return
256                 }
257                 cl.listeners = append(cl.listeners, l)
258                 go cl.acceptConnections(l, false)
259         }
260         if !cl.config.DisableUTP {
261                 cl.utpSock, err = utp.NewSocket(func() string {
262                         if cl.config.DisableIPv6 {
263                                 return "udp4"
264                         } else {
265                                 return "udp"
266                         }
267                 }(), listenAddr())
268                 if err != nil {
269                         return
270                 }
271                 cl.listeners = append(cl.listeners, cl.utpSock)
272                 go cl.acceptConnections(cl.utpSock, true)
273         }
274         if !cfg.NoDHT {
275                 dhtCfg := cfg.DHTConfig
276                 if dhtCfg.IPBlocklist == nil {
277                         dhtCfg.IPBlocklist = cl.ipBlockList
278                 }
279                 if dhtCfg.Addr == "" {
280                         dhtCfg.Addr = listenAddr()
281                 }
282                 if dhtCfg.Conn == nil && cl.utpSock != nil {
283                         dhtCfg.Conn = cl.utpSock
284                 }
285                 cl.dHT, err = dht.NewServer(&dhtCfg)
286                 if err != nil {
287                         return
288                 }
289         }
290
291         return
292 }
293
294 // Stops the client. All connections to peers are closed and all activity will
295 // come to a halt.
296 func (me *Client) Close() {
297         me.mu.Lock()
298         defer me.mu.Unlock()
299         me.closed.Set()
300         if me.dHT != nil {
301                 me.dHT.Close()
302         }
303         for _, l := range me.listeners {
304                 l.Close()
305         }
306         for _, t := range me.torrents {
307                 t.close()
308         }
309         me.event.Broadcast()
310 }
311
312 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
313
314 func (cl *Client) ipBlockRange(ip net.IP) (r iplist.Range, blocked bool) {
315         if cl.ipBlockList == nil {
316                 return
317         }
318         ip4 := ip.To4()
319         // If blocklists are enabled, then block non-IPv4 addresses, because
320         // blocklists do not yet support IPv6.
321         if ip4 == nil {
322                 if missinggo.CryHeard() {
323                         log.Printf("blocking non-IPv4 address: %s", ip)
324                 }
325                 r = ipv6BlockRange
326                 blocked = true
327                 return
328         }
329         return cl.ipBlockList.Lookup(ip4)
330 }
331
332 func (cl *Client) waitAccept() {
333         cl.mu.Lock()
334         defer cl.mu.Unlock()
335         for {
336                 for _, t := range cl.torrents {
337                         if cl.wantConns(t) {
338                                 return
339                         }
340                 }
341                 if cl.closed.IsSet() {
342                         return
343                 }
344                 cl.event.Wait()
345         }
346 }
347
348 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
349         for {
350                 cl.waitAccept()
351                 conn, err := l.Accept()
352                 conn = pproffd.WrapNetConn(conn)
353                 if cl.closed.IsSet() {
354                         if conn != nil {
355                                 conn.Close()
356                         }
357                         return
358                 }
359                 if err != nil {
360                         log.Print(err)
361                         // I think something harsher should happen here? Our accept
362                         // routine just fucked off.
363                         return
364                 }
365                 if utp {
366                         acceptUTP.Add(1)
367                 } else {
368                         acceptTCP.Add(1)
369                 }
370                 cl.mu.RLock()
371                 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
372                 _, blocked := cl.ipBlockRange(missinggo.AddrIP(conn.RemoteAddr()))
373                 cl.mu.RUnlock()
374                 if blocked || doppleganger {
375                         acceptReject.Add(1)
376                         // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
377                         conn.Close()
378                         continue
379                 }
380                 go cl.incomingConnection(conn, utp)
381         }
382 }
383
384 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
385         defer nc.Close()
386         if tc, ok := nc.(*net.TCPConn); ok {
387                 tc.SetLinger(0)
388         }
389         c := newConnection()
390         c.conn = nc
391         c.rw = nc
392         c.Discovery = peerSourceIncoming
393         c.uTP = utp
394         err := cl.runReceivedConn(c)
395         if err != nil {
396                 // log.Print(err)
397         }
398 }
399
400 // Returns a handle to the given torrent, if it's present in the client.
401 func (cl *Client) Torrent(ih metainfo.Hash) (t *Torrent, ok bool) {
402         cl.mu.Lock()
403         defer cl.mu.Unlock()
404         t, ok = cl.torrents[ih]
405         return
406 }
407
408 func (me *Client) torrent(ih metainfo.Hash) *Torrent {
409         return me.torrents[ih]
410 }
411
412 type dialResult struct {
413         Conn net.Conn
414         UTP  bool
415 }
416
417 func doDial(dial func(addr string, t *Torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *Torrent) {
418         conn, err := dial(addr, t)
419         if err != nil {
420                 if conn != nil {
421                         conn.Close()
422                 }
423                 conn = nil // Pedantic
424         }
425         ch <- dialResult{conn, utp}
426         if err == nil {
427                 successfulDials.Add(1)
428                 return
429         }
430         unsuccessfulDials.Add(1)
431 }
432
433 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
434         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
435         if ret < minDialTimeout {
436                 ret = minDialTimeout
437         }
438         return
439 }
440
441 // Returns whether an address is known to connect to a client with our own ID.
442 func (me *Client) dopplegangerAddr(addr string) bool {
443         _, ok := me.dopplegangerAddrs[addr]
444         return ok
445 }
446
447 // Start the process of connecting to the given peer for the given torrent if
448 // appropriate.
449 func (me *Client) initiateConn(peer Peer, t *Torrent) {
450         if peer.Id == me.peerID {
451                 return
452         }
453         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
454         if me.dopplegangerAddr(addr) || t.addrActive(addr) {
455                 duplicateConnsAvoided.Add(1)
456                 return
457         }
458         if r, ok := me.ipBlockRange(peer.IP); ok {
459                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
460                 return
461         }
462         t.halfOpen[addr] = struct{}{}
463         go me.outgoingConnection(t, addr, peer.Source)
464 }
465
466 func (me *Client) dialTimeout(t *Torrent) time.Duration {
467         me.mu.Lock()
468         pendingPeers := len(t.peers)
469         me.mu.Unlock()
470         return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, pendingPeers)
471 }
472
473 func (me *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
474         c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t))
475         if err == nil {
476                 c.(*net.TCPConn).SetLinger(0)
477         }
478         return
479 }
480
481 func (me *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
482         return me.utpSock.DialTimeout(addr, me.dialTimeout(t))
483 }
484
485 // Returns a connection over UTP or TCP, whichever is first to connect.
486 func (me *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
487         // Initiate connections via TCP and UTP simultaneously. Use the first one
488         // that succeeds.
489         left := 0
490         if !me.config.DisableUTP {
491                 left++
492         }
493         if !me.config.DisableTCP {
494                 left++
495         }
496         resCh := make(chan dialResult, left)
497         if !me.config.DisableUTP {
498                 go doDial(me.dialUTP, resCh, true, addr, t)
499         }
500         if !me.config.DisableTCP {
501                 go doDial(me.dialTCP, resCh, false, addr, t)
502         }
503         var res dialResult
504         // Wait for a successful connection.
505         for ; left > 0 && res.Conn == nil; left-- {
506                 res = <-resCh
507         }
508         if left > 0 {
509                 // There are still incompleted dials.
510                 go func() {
511                         for ; left > 0; left-- {
512                                 conn := (<-resCh).Conn
513                                 if conn != nil {
514                                         conn.Close()
515                                 }
516                         }
517                 }()
518         }
519         conn = res.Conn
520         utp = res.UTP
521         return
522 }
523
524 func (me *Client) noLongerHalfOpen(t *Torrent, addr string) {
525         if _, ok := t.halfOpen[addr]; !ok {
526                 panic("invariant broken")
527         }
528         delete(t.halfOpen, addr)
529         me.openNewConns(t)
530 }
531
532 // Performs initiator handshakes and returns a connection. Returns nil
533 // *connection if no connection for valid reasons.
534 func (me *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
535         c = newConnection()
536         c.conn = nc
537         c.rw = nc
538         c.encrypted = encrypted
539         c.uTP = utp
540         err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
541         if err != nil {
542                 return
543         }
544         ok, err := me.initiateHandshakes(c, t)
545         if !ok {
546                 c = nil
547         }
548         return
549 }
550
551 // Returns nil connection and nil error if no connection could be established
552 // for valid reasons.
553 func (me *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
554         nc, utp := me.dialFirst(addr, t)
555         if nc == nil {
556                 return
557         }
558         c, err = me.handshakesConnection(nc, t, !me.config.DisableEncryption, utp)
559         if err != nil {
560                 nc.Close()
561                 return
562         } else if c != nil {
563                 return
564         }
565         nc.Close()
566         if me.config.DisableEncryption {
567                 // We already tried without encryption.
568                 return
569         }
570         // Try again without encryption, using whichever protocol type worked last
571         // time.
572         if utp {
573                 nc, err = me.dialUTP(addr, t)
574         } else {
575                 nc, err = me.dialTCP(addr, t)
576         }
577         if err != nil {
578                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
579                 return
580         }
581         c, err = me.handshakesConnection(nc, t, false, utp)
582         if err != nil || c == nil {
583                 nc.Close()
584         }
585         return
586 }
587
588 // Called to dial out and run a connection. The addr we're given is already
589 // considered half-open.
590 func (me *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
591         c, err := me.establishOutgoingConn(t, addr)
592         me.mu.Lock()
593         defer me.mu.Unlock()
594         // Don't release lock between here and addConnection, unless it's for
595         // failure.
596         me.noLongerHalfOpen(t, addr)
597         if err != nil {
598                 if me.config.Debug {
599                         log.Printf("error establishing outgoing connection: %s", err)
600                 }
601                 return
602         }
603         if c == nil {
604                 return
605         }
606         defer c.Close()
607         c.Discovery = ps
608         err = me.runInitiatedHandshookConn(c, t)
609         if err != nil {
610                 if me.config.Debug {
611                         log.Printf("error in established outgoing connection: %s", err)
612                 }
613         }
614 }
615
616 // The port number for incoming peer connections. 0 if the client isn't
617 // listening.
618 func (cl *Client) incomingPeerPort() int {
619         listenAddr := cl.ListenAddr()
620         if listenAddr == nil {
621                 return 0
622         }
623         return missinggo.AddrPort(listenAddr)
624 }
625
626 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
627 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
628 func addrCompactIP(addr net.Addr) (string, error) {
629         host, _, err := net.SplitHostPort(addr.String())
630         if err != nil {
631                 return "", err
632         }
633         ip := net.ParseIP(host)
634         if v4 := ip.To4(); v4 != nil {
635                 if len(v4) != 4 {
636                         panic(v4)
637                 }
638                 return string(v4), nil
639         }
640         return string(ip.To16()), nil
641 }
642
643 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
644         var err error
645         for b := range bb {
646                 _, err = w.Write(b)
647                 if err != nil {
648                         break
649                 }
650         }
651         done <- err
652 }
653
654 type (
655         peerExtensionBytes [8]byte
656         peerID             [20]byte
657 )
658
659 func (me *peerExtensionBytes) SupportsExtended() bool {
660         return me[5]&0x10 != 0
661 }
662
663 func (me *peerExtensionBytes) SupportsDHT() bool {
664         return me[7]&0x01 != 0
665 }
666
667 func (me *peerExtensionBytes) SupportsFast() bool {
668         return me[7]&0x04 != 0
669 }
670
671 type handshakeResult struct {
672         peerExtensionBytes
673         peerID
674         metainfo.Hash
675 }
676
677 // ih is nil if we expect the peer to declare the InfoHash, such as when the
678 // peer initiated the connection. Returns ok if the handshake was successful,
679 // and err if there was an unexpected condition other than the peer simply
680 // abandoning the handshake.
681 func handshake(sock io.ReadWriter, ih *metainfo.Hash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
682         // Bytes to be sent to the peer. Should never block the sender.
683         postCh := make(chan []byte, 4)
684         // A single error value sent when the writer completes.
685         writeDone := make(chan error, 1)
686         // Performs writes to the socket and ensures posts don't block.
687         go handshakeWriter(sock, postCh, writeDone)
688
689         defer func() {
690                 close(postCh) // Done writing.
691                 if !ok {
692                         return
693                 }
694                 if err != nil {
695                         panic(err)
696                 }
697                 // Wait until writes complete before returning from handshake.
698                 err = <-writeDone
699                 if err != nil {
700                         err = fmt.Errorf("error writing: %s", err)
701                 }
702         }()
703
704         post := func(bb []byte) {
705                 select {
706                 case postCh <- bb:
707                 default:
708                         panic("mustn't block while posting")
709                 }
710         }
711
712         post([]byte(pp.Protocol))
713         post(extensions[:])
714         if ih != nil { // We already know what we want.
715                 post(ih[:])
716                 post(peerID[:])
717         }
718         var b [68]byte
719         _, err = io.ReadFull(sock, b[:68])
720         if err != nil {
721                 err = nil
722                 return
723         }
724         if string(b[:20]) != pp.Protocol {
725                 return
726         }
727         missinggo.CopyExact(&res.peerExtensionBytes, b[20:28])
728         missinggo.CopyExact(&res.Hash, b[28:48])
729         missinggo.CopyExact(&res.peerID, b[48:68])
730         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
731
732         // TODO: Maybe we can just drop peers here if we're not interested. This
733         // could prevent them trying to reconnect, falsely believing there was
734         // just a problem.
735         if ih == nil { // We were waiting for the peer to tell us what they wanted.
736                 post(res.Hash[:])
737                 post(peerID[:])
738         }
739
740         ok = true
741         return
742 }
743
744 // Wraps a raw connection and provides the interface we want for using the
745 // connection in the message loop.
746 type deadlineReader struct {
747         nc net.Conn
748         r  io.Reader
749 }
750
751 func (me deadlineReader) Read(b []byte) (n int, err error) {
752         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
753         err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
754         if err != nil {
755                 err = fmt.Errorf("error setting read deadline: %s", err)
756         }
757         n, err = me.r.Read(b)
758         // Convert common errors into io.EOF.
759         // if err != nil {
760         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
761         //              err = io.EOF
762         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
763         //              if n != 0 {
764         //                      panic(n)
765         //              }
766         //              err = io.EOF
767         //      }
768         // }
769         return
770 }
771
772 type readWriter struct {
773         io.Reader
774         io.Writer
775 }
776
777 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
778         var protocol [len(pp.Protocol)]byte
779         _, err = io.ReadFull(rw, protocol[:])
780         if err != nil {
781                 return
782         }
783         ret = readWriter{
784                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
785                 rw,
786         }
787         if string(protocol[:]) == pp.Protocol {
788                 return
789         }
790         encrypted = true
791         ret, err = mse.ReceiveHandshake(ret, skeys)
792         return
793 }
794
795 func (cl *Client) receiveSkeys() (ret [][]byte) {
796         for ih := range cl.torrents {
797                 ret = append(ret, ih[:])
798         }
799         return
800 }
801
802 func (me *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
803         if c.encrypted {
804                 c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
805                 if err != nil {
806                         return
807                 }
808         }
809         ih, ok, err := me.connBTHandshake(c, &t.infoHash)
810         if ih != t.infoHash {
811                 ok = false
812         }
813         return
814 }
815
816 // Do encryption and bittorrent handshakes as receiver.
817 func (cl *Client) receiveHandshakes(c *connection) (t *Torrent, err error) {
818         cl.mu.Lock()
819         skeys := cl.receiveSkeys()
820         cl.mu.Unlock()
821         if !cl.config.DisableEncryption {
822                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
823                 if err != nil {
824                         if err == mse.ErrNoSecretKeyMatch {
825                                 err = nil
826                         }
827                         return
828                 }
829         }
830         ih, ok, err := cl.connBTHandshake(c, nil)
831         if err != nil {
832                 err = fmt.Errorf("error during bt handshake: %s", err)
833                 return
834         }
835         if !ok {
836                 return
837         }
838         cl.mu.Lock()
839         t = cl.torrents[ih]
840         cl.mu.Unlock()
841         return
842 }
843
844 // Returns !ok if handshake failed for valid reasons.
845 func (cl *Client) connBTHandshake(c *connection, ih *metainfo.Hash) (ret metainfo.Hash, ok bool, err error) {
846         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
847         if err != nil || !ok {
848                 return
849         }
850         ret = res.Hash
851         c.PeerExtensionBytes = res.peerExtensionBytes
852         c.PeerID = res.peerID
853         c.completedHandshake = time.Now()
854         return
855 }
856
857 func (cl *Client) runInitiatedHandshookConn(c *connection, t *Torrent) (err error) {
858         if c.PeerID == cl.peerID {
859                 // Only if we initiated the connection is the remote address a
860                 // listen addr for a doppleganger.
861                 connsToSelf.Add(1)
862                 addr := c.conn.RemoteAddr().String()
863                 cl.dopplegangerAddrs[addr] = struct{}{}
864                 return
865         }
866         return cl.runHandshookConn(c, t)
867 }
868
869 func (cl *Client) runReceivedConn(c *connection) (err error) {
870         err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
871         if err != nil {
872                 return
873         }
874         t, err := cl.receiveHandshakes(c)
875         if err != nil {
876                 err = fmt.Errorf("error receiving handshakes: %s", err)
877                 return
878         }
879         if t == nil {
880                 return
881         }
882         cl.mu.Lock()
883         defer cl.mu.Unlock()
884         if c.PeerID == cl.peerID {
885                 return
886         }
887         return cl.runHandshookConn(c, t)
888 }
889
890 func (cl *Client) runHandshookConn(c *connection, t *Torrent) (err error) {
891         c.conn.SetWriteDeadline(time.Time{})
892         c.rw = readWriter{
893                 deadlineReader{c.conn, c.rw},
894                 c.rw,
895         }
896         completedHandshakeConnectionFlags.Add(c.connectionFlags(), 1)
897         if !cl.addConnection(t, c) {
898                 return
899         }
900         defer cl.dropConnection(t, c)
901         go c.writer()
902         go c.writeOptimizer(time.Minute)
903         cl.sendInitialMessages(c, t)
904         err = cl.connectionLoop(t, c)
905         if err != nil {
906                 err = fmt.Errorf("error during connection loop: %s", err)
907         }
908         return
909 }
910
911 func (me *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
912         if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
913                 conn.Post(pp.Message{
914                         Type:       pp.Extended,
915                         ExtendedID: pp.HandshakeExtendedID,
916                         ExtendedPayload: func() []byte {
917                                 d := map[string]interface{}{
918                                         "m": func() (ret map[string]int) {
919                                                 ret = make(map[string]int, 2)
920                                                 ret["ut_metadata"] = metadataExtendedId
921                                                 if !me.config.DisablePEX {
922                                                         ret["ut_pex"] = pexExtendedId
923                                                 }
924                                                 return
925                                         }(),
926                                         "v": extendedHandshakeClientVersion,
927                                         // No upload queue is implemented yet.
928                                         "reqq": 64,
929                                 }
930                                 if !me.config.DisableEncryption {
931                                         d["e"] = 1
932                                 }
933                                 if torrent.metadataSizeKnown() {
934                                         d["metadata_size"] = torrent.metadataSize()
935                                 }
936                                 if p := me.incomingPeerPort(); p != 0 {
937                                         d["p"] = p
938                                 }
939                                 yourip, err := addrCompactIP(conn.remoteAddr())
940                                 if err != nil {
941                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
942                                 } else {
943                                         d["yourip"] = yourip
944                                 }
945                                 // log.Printf("sending %v", d)
946                                 b, err := bencode.Marshal(d)
947                                 if err != nil {
948                                         panic(err)
949                                 }
950                                 return b
951                         }(),
952                 })
953         }
954         if torrent.haveAnyPieces() {
955                 conn.Bitfield(torrent.bitfield())
956         } else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
957                 conn.Post(pp.Message{
958                         Type: pp.HaveNone,
959                 })
960         }
961         if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil {
962                 conn.Post(pp.Message{
963                         Type: pp.Port,
964                         Port: uint16(missinggo.AddrPort(me.dHT.Addr())),
965                 })
966         }
967 }
968
969 func (me *Client) peerUnchoked(torrent *Torrent, conn *connection) {
970         conn.updateRequests()
971 }
972
973 func (cl *Client) connCancel(t *Torrent, cn *connection, r request) (ok bool) {
974         ok = cn.Cancel(r)
975         if ok {
976                 postedCancels.Add(1)
977         }
978         return
979 }
980
981 func (cl *Client) connDeleteRequest(t *Torrent, cn *connection, r request) bool {
982         if !cn.RequestPending(r) {
983                 return false
984         }
985         delete(cn.Requests, r)
986         return true
987 }
988
989 func (cl *Client) requestPendingMetadata(t *Torrent, c *connection) {
990         if t.haveInfo() {
991                 return
992         }
993         if c.PeerExtensionIDs["ut_metadata"] == 0 {
994                 // Peer doesn't support this.
995                 return
996         }
997         // Request metadata pieces that we don't have in a random order.
998         var pending []int
999         for index := 0; index < t.metadataPieceCount(); index++ {
1000                 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
1001                         pending = append(pending, index)
1002                 }
1003         }
1004         for _, i := range mathRand.Perm(len(pending)) {
1005                 c.requestMetadataPiece(pending[i])
1006         }
1007 }
1008
1009 func (cl *Client) completedMetadata(t *Torrent) {
1010         h := sha1.New()
1011         h.Write(t.metadataBytes)
1012         var ih metainfo.Hash
1013         missinggo.CopyExact(&ih, h.Sum(nil))
1014         if ih != t.infoHash {
1015                 log.Print("bad metadata")
1016                 t.invalidateMetadata()
1017                 return
1018         }
1019         var info metainfo.Info
1020         err := bencode.Unmarshal(t.metadataBytes, &info)
1021         if err != nil {
1022                 log.Printf("error unmarshalling metadata: %s", err)
1023                 t.invalidateMetadata()
1024                 return
1025         }
1026         // TODO(anacrolix): If this fails, I think something harsher should be
1027         // done.
1028         err = cl.setMetaData(t, &info, t.metadataBytes)
1029         if err != nil {
1030                 log.Printf("error setting metadata: %s", err)
1031                 t.invalidateMetadata()
1032                 return
1033         }
1034         if cl.config.Debug {
1035                 log.Printf("%s: got metadata from peers", t)
1036         }
1037 }
1038
1039 // Process incoming ut_metadata message.
1040 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *Torrent, c *connection) (err error) {
1041         var d map[string]int
1042         err = bencode.Unmarshal(payload, &d)
1043         if err != nil {
1044                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1045                 return
1046         }
1047         msgType, ok := d["msg_type"]
1048         if !ok {
1049                 err = errors.New("missing msg_type field")
1050                 return
1051         }
1052         piece := d["piece"]
1053         switch msgType {
1054         case pp.DataMetadataExtensionMsgType:
1055                 if t.haveInfo() {
1056                         break
1057                 }
1058                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1059                 if begin < 0 || begin >= len(payload) {
1060                         log.Printf("got bad metadata piece")
1061                         break
1062                 }
1063                 if !c.requestedMetadataPiece(piece) {
1064                         log.Printf("got unexpected metadata piece %d", piece)
1065                         break
1066                 }
1067                 c.metadataRequests[piece] = false
1068                 t.saveMetadataPiece(piece, payload[begin:])
1069                 c.UsefulChunksReceived++
1070                 c.lastUsefulChunkReceived = time.Now()
1071                 if !t.haveAllMetadataPieces() {
1072                         break
1073                 }
1074                 cl.completedMetadata(t)
1075         case pp.RequestMetadataExtensionMsgType:
1076                 if !t.haveMetadataPiece(piece) {
1077                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1078                         break
1079                 }
1080                 start := (1 << 14) * piece
1081                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.metadataBytes[start:start+t.metadataPieceSize(piece)]))
1082         case pp.RejectMetadataExtensionMsgType:
1083         default:
1084                 err = errors.New("unknown msg_type value")
1085         }
1086         return
1087 }
1088
1089 func (me *Client) upload(t *Torrent, c *connection) {
1090         if me.config.NoUpload {
1091                 return
1092         }
1093         if !c.PeerInterested {
1094                 return
1095         }
1096         seeding := me.seeding(t)
1097         if !seeding && !t.connHasWantedPieces(c) {
1098                 return
1099         }
1100 another:
1101         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1102                 c.Unchoke()
1103                 for r := range c.PeerRequests {
1104                         err := me.sendChunk(t, c, r)
1105                         if err != nil {
1106                                 if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
1107                                         // We had the piece, but not anymore.
1108                                 } else {
1109                                         log.Printf("error sending chunk %+v to peer: %s", r, err)
1110                                 }
1111                                 // If we failed to send a chunk, choke the peer to ensure they
1112                                 // flush all their requests. We've probably dropped a piece,
1113                                 // but there's no way to communicate this to the peer. If they
1114                                 // ask for it again, we'll kick them to allow us to send them
1115                                 // an updated bitfield.
1116                                 break another
1117                         }
1118                         delete(c.PeerRequests, r)
1119                         goto another
1120                 }
1121                 return
1122         }
1123         c.Choke()
1124 }
1125
1126 func (me *Client) sendChunk(t *Torrent, c *connection, r request) error {
1127         // Count the chunk being sent, even if it isn't.
1128         b := make([]byte, r.Length)
1129         p := t.info.Piece(int(r.Index))
1130         n, err := t.readAt(b, p.Offset()+int64(r.Begin))
1131         if n != len(b) {
1132                 if err == nil {
1133                         panic("expected error")
1134                 }
1135                 return err
1136         }
1137         c.Post(pp.Message{
1138                 Type:  pp.Piece,
1139                 Index: r.Index,
1140                 Begin: r.Begin,
1141                 Piece: b,
1142         })
1143         c.chunksSent++
1144         uploadChunksPosted.Add(1)
1145         c.lastChunkSent = time.Now()
1146         return nil
1147 }
1148
1149 // Processes incoming bittorrent messages. The client lock is held upon entry
1150 // and exit.
1151 func (me *Client) connectionLoop(t *Torrent, c *connection) error {
1152         decoder := pp.Decoder{
1153                 R:         bufio.NewReader(c.rw),
1154                 MaxLength: 256 * 1024,
1155         }
1156         for {
1157                 me.mu.Unlock()
1158                 var msg pp.Message
1159                 err := decoder.Decode(&msg)
1160                 me.mu.Lock()
1161                 if me.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
1162                         return nil
1163                 }
1164                 if err != nil {
1165                         return err
1166                 }
1167                 c.lastMessageReceived = time.Now()
1168                 if msg.Keepalive {
1169                         receivedKeepalives.Add(1)
1170                         continue
1171                 }
1172                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1173                 switch msg.Type {
1174                 case pp.Choke:
1175                         c.PeerChoked = true
1176                         c.Requests = nil
1177                         // We can then reset our interest.
1178                         c.updateRequests()
1179                 case pp.Reject:
1180                         me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1181                         c.updateRequests()
1182                 case pp.Unchoke:
1183                         c.PeerChoked = false
1184                         me.peerUnchoked(t, c)
1185                 case pp.Interested:
1186                         c.PeerInterested = true
1187                         me.upload(t, c)
1188                 case pp.NotInterested:
1189                         c.PeerInterested = false
1190                         c.Choke()
1191                 case pp.Have:
1192                         err = c.peerSentHave(int(msg.Index))
1193                 case pp.Request:
1194                         if c.Choked {
1195                                 break
1196                         }
1197                         if !c.PeerInterested {
1198                                 err = errors.New("peer sent request but isn't interested")
1199                                 break
1200                         }
1201                         if !t.havePiece(msg.Index.Int()) {
1202                                 // This isn't necessarily them screwing up. We can drop pieces
1203                                 // from our storage, and can't communicate this to peers
1204                                 // except by reconnecting.
1205                                 requestsReceivedForMissingPieces.Add(1)
1206                                 err = errors.New("peer requested piece we don't have")
1207                                 break
1208                         }
1209                         if c.PeerRequests == nil {
1210                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1211                         }
1212                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1213                         me.upload(t, c)
1214                 case pp.Cancel:
1215                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1216                         if !c.PeerCancel(req) {
1217                                 unexpectedCancels.Add(1)
1218                         }
1219                 case pp.Bitfield:
1220                         err = c.peerSentBitfield(msg.Bitfield)
1221                 case pp.HaveAll:
1222                         err = c.peerSentHaveAll()
1223                 case pp.HaveNone:
1224                         err = c.peerSentHaveNone()
1225                 case pp.Piece:
1226                         me.downloadedChunk(t, c, &msg)
1227                 case pp.Extended:
1228                         switch msg.ExtendedID {
1229                         case pp.HandshakeExtendedID:
1230                                 // TODO: Create a bencode struct for this.
1231                                 var d map[string]interface{}
1232                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1233                                 if err != nil {
1234                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1235                                         break
1236                                 }
1237                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1238                                 if reqq, ok := d["reqq"]; ok {
1239                                         if i, ok := reqq.(int64); ok {
1240                                                 c.PeerMaxRequests = int(i)
1241                                         }
1242                                 }
1243                                 if v, ok := d["v"]; ok {
1244                                         c.PeerClientName = v.(string)
1245                                 }
1246                                 m, ok := d["m"]
1247                                 if !ok {
1248                                         err = errors.New("handshake missing m item")
1249                                         break
1250                                 }
1251                                 mTyped, ok := m.(map[string]interface{})
1252                                 if !ok {
1253                                         err = errors.New("handshake m value is not dict")
1254                                         break
1255                                 }
1256                                 if c.PeerExtensionIDs == nil {
1257                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1258                                 }
1259                                 for name, v := range mTyped {
1260                                         id, ok := v.(int64)
1261                                         if !ok {
1262                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1263                                                 continue
1264                                         }
1265                                         if id == 0 {
1266                                                 delete(c.PeerExtensionIDs, name)
1267                                         } else {
1268                                                 if c.PeerExtensionIDs[name] == 0 {
1269                                                         supportedExtensionMessages.Add(name, 1)
1270                                                 }
1271                                                 c.PeerExtensionIDs[name] = byte(id)
1272                                         }
1273                                 }
1274                                 metadata_sizeUntyped, ok := d["metadata_size"]
1275                                 if ok {
1276                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1277                                         if !ok {
1278                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1279                                         } else {
1280                                                 t.setMetadataSize(metadata_size, me)
1281                                         }
1282                                 }
1283                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1284                                         me.requestPendingMetadata(t, c)
1285                                 }
1286                         case metadataExtendedId:
1287                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1288                                 if err != nil {
1289                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1290                                 }
1291                         case pexExtendedId:
1292                                 if me.config.DisablePEX {
1293                                         break
1294                                 }
1295                                 var pexMsg peerExchangeMessage
1296                                 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1297                                 if err != nil {
1298                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1299                                         break
1300                                 }
1301                                 go func() {
1302                                         me.mu.Lock()
1303                                         me.addPeers(t, func() (ret []Peer) {
1304                                                 for i, cp := range pexMsg.Added {
1305                                                         p := Peer{
1306                                                                 IP:     make([]byte, 4),
1307                                                                 Port:   cp.Port,
1308                                                                 Source: peerSourcePEX,
1309                                                         }
1310                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1311                                                                 p.SupportsEncryption = true
1312                                                         }
1313                                                         missinggo.CopyExact(p.IP, cp.IP[:])
1314                                                         ret = append(ret, p)
1315                                                 }
1316                                                 return
1317                                         }())
1318                                         me.mu.Unlock()
1319                                 }()
1320                         default:
1321                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1322                         }
1323                         if err != nil {
1324                                 // That client uses its own extension IDs for outgoing message
1325                                 // types, which is incorrect.
1326                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1327                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1328                                         return nil
1329                                 }
1330                         }
1331                 case pp.Port:
1332                         if me.dHT == nil {
1333                                 break
1334                         }
1335                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1336                         if err != nil {
1337                                 panic(err)
1338                         }
1339                         if msg.Port != 0 {
1340                                 pingAddr.Port = int(msg.Port)
1341                         }
1342                         me.dHT.Ping(pingAddr)
1343                 default:
1344                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1345                 }
1346                 if err != nil {
1347                         return err
1348                 }
1349         }
1350 }
1351
1352 // Returns true if connection is removed from torrent.Conns.
1353 func (me *Client) deleteConnection(t *Torrent, c *connection) bool {
1354         for i0, _c := range t.conns {
1355                 if _c != c {
1356                         continue
1357                 }
1358                 i1 := len(t.conns) - 1
1359                 if i0 != i1 {
1360                         t.conns[i0] = t.conns[i1]
1361                 }
1362                 t.conns = t.conns[:i1]
1363                 return true
1364         }
1365         return false
1366 }
1367
1368 func (me *Client) dropConnection(t *Torrent, c *connection) {
1369         me.event.Broadcast()
1370         c.Close()
1371         if me.deleteConnection(t, c) {
1372                 me.openNewConns(t)
1373         }
1374 }
1375
1376 // Returns true if the connection is added.
1377 func (me *Client) addConnection(t *Torrent, c *connection) bool {
1378         if me.closed.IsSet() {
1379                 return false
1380         }
1381         select {
1382         case <-t.ceasingNetworking:
1383                 return false
1384         default:
1385         }
1386         if !me.wantConns(t) {
1387                 return false
1388         }
1389         for _, c0 := range t.conns {
1390                 if c.PeerID == c0.PeerID {
1391                         // Already connected to a client with that ID.
1392                         duplicateClientConns.Add(1)
1393                         return false
1394                 }
1395         }
1396         if len(t.conns) >= socketsPerTorrent {
1397                 c := t.worstBadConn(me)
1398                 if c == nil {
1399                         return false
1400                 }
1401                 if me.config.Debug && missinggo.CryHeard() {
1402                         log.Printf("%s: dropping connection to make room for new one:\n    %s", t, c)
1403                 }
1404                 c.Close()
1405                 me.deleteConnection(t, c)
1406         }
1407         if len(t.conns) >= socketsPerTorrent {
1408                 panic(len(t.conns))
1409         }
1410         t.conns = append(t.conns, c)
1411         c.t = t
1412         return true
1413 }
1414
1415 func (t *Torrent) readerPieces() (ret bitmap.Bitmap) {
1416         t.forReaderOffsetPieces(func(begin, end int) bool {
1417                 ret.AddRange(begin, end)
1418                 return true
1419         })
1420         return
1421 }
1422
1423 func (t *Torrent) needData() bool {
1424         if !t.haveInfo() {
1425                 return true
1426         }
1427         if t.pendingPieces.Len() != 0 {
1428                 return true
1429         }
1430         return !t.readerPieces().IterTyped(func(piece int) bool {
1431                 return t.pieceComplete(piece)
1432         })
1433 }
1434
1435 func (cl *Client) usefulConn(t *Torrent, c *connection) bool {
1436         if c.closed.IsSet() {
1437                 return false
1438         }
1439         if !t.haveInfo() {
1440                 return c.supportsExtension("ut_metadata")
1441         }
1442         if cl.seeding(t) {
1443                 return c.PeerInterested
1444         }
1445         return t.connHasWantedPieces(c)
1446 }
1447
1448 func (me *Client) wantConns(t *Torrent) bool {
1449         if !me.seeding(t) && !t.needData() {
1450                 return false
1451         }
1452         if len(t.conns) < socketsPerTorrent {
1453                 return true
1454         }
1455         return t.worstBadConn(me) != nil
1456 }
1457
1458 func (me *Client) openNewConns(t *Torrent) {
1459         select {
1460         case <-t.ceasingNetworking:
1461                 return
1462         default:
1463         }
1464         for len(t.peers) != 0 {
1465                 if !me.wantConns(t) {
1466                         return
1467                 }
1468                 if len(t.halfOpen) >= me.halfOpenLimit {
1469                         return
1470                 }
1471                 var (
1472                         k peersKey
1473                         p Peer
1474                 )
1475                 for k, p = range t.peers {
1476                         break
1477                 }
1478                 delete(t.peers, k)
1479                 me.initiateConn(p, t)
1480         }
1481         t.wantPeers.Broadcast()
1482 }
1483
1484 func (me *Client) addPeers(t *Torrent, peers []Peer) {
1485         for _, p := range peers {
1486                 if me.dopplegangerAddr(net.JoinHostPort(
1487                         p.IP.String(),
1488                         strconv.FormatInt(int64(p.Port), 10),
1489                 )) {
1490                         continue
1491                 }
1492                 if _, ok := me.ipBlockRange(p.IP); ok {
1493                         continue
1494                 }
1495                 if p.Port == 0 {
1496                         // The spec says to scrub these yourselves. Fine.
1497                         continue
1498                 }
1499                 t.addPeer(p, me)
1500         }
1501 }
1502
1503 func (cl *Client) cachedMetaInfoFilename(ih metainfo.Hash) string {
1504         return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1505 }
1506
1507 func (cl *Client) saveTorrentFile(t *Torrent) error {
1508         path := cl.cachedMetaInfoFilename(t.infoHash)
1509         os.MkdirAll(filepath.Dir(path), 0777)
1510         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1511         if err != nil {
1512                 return fmt.Errorf("error opening file: %s", err)
1513         }
1514         defer f.Close()
1515         e := bencode.NewEncoder(f)
1516         err = e.Encode(t.metainfo())
1517         if err != nil {
1518                 return fmt.Errorf("error marshalling metainfo: %s", err)
1519         }
1520         mi, err := cl.torrentCacheMetaInfo(t.infoHash)
1521         if err != nil {
1522                 // For example, a script kiddy makes us load too many files, and we're
1523                 // able to save the torrent, but not load it again to check it.
1524                 return nil
1525         }
1526         if !bytes.Equal(mi.Info.Hash.Bytes(), t.infoHash[:]) {
1527                 log.Fatalf("%x != %x", mi.Info.Hash, t.infoHash[:])
1528         }
1529         return nil
1530 }
1531
1532 func (cl *Client) setMetaData(t *Torrent, md *metainfo.Info, bytes []byte) (err error) {
1533         err = t.setMetadata(md, bytes)
1534         if err != nil {
1535                 return
1536         }
1537         if !cl.config.DisableMetainfoCache {
1538                 if err := cl.saveTorrentFile(t); err != nil {
1539                         log.Printf("error saving torrent file for %s: %s", t, err)
1540                 }
1541         }
1542         cl.event.Broadcast()
1543         close(t.gotMetainfo)
1544         return
1545 }
1546
1547 // Prepare a Torrent without any attachment to a Client. That means we can
1548 // initialize fields all fields that don't require the Client without locking
1549 // it.
1550 func newTorrent(ih metainfo.Hash) (t *Torrent) {
1551         t = &Torrent{
1552                 infoHash:  ih,
1553                 chunkSize: defaultChunkSize,
1554                 peers:     make(map[peersKey]Peer),
1555
1556                 closing:           make(chan struct{}),
1557                 ceasingNetworking: make(chan struct{}),
1558
1559                 gotMetainfo: make(chan struct{}),
1560
1561                 halfOpen:          make(map[string]struct{}),
1562                 pieceStateChanges: pubsub.NewPubSub(),
1563         }
1564         return
1565 }
1566
1567 func init() {
1568         // For shuffling the tracker tiers.
1569         mathRand.Seed(time.Now().Unix())
1570 }
1571
1572 type trackerTier []string
1573
1574 // The trackers within each tier must be shuffled before use.
1575 // http://stackoverflow.com/a/12267471/149482
1576 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1577 func shuffleTier(tier trackerTier) {
1578         for i := range tier {
1579                 j := mathRand.Intn(i + 1)
1580                 tier[i], tier[j] = tier[j], tier[i]
1581         }
1582 }
1583
1584 func copyTrackers(base []trackerTier) (copy []trackerTier) {
1585         for _, tier := range base {
1586                 copy = append(copy, append(trackerTier(nil), tier...))
1587         }
1588         return
1589 }
1590
1591 func mergeTier(tier trackerTier, newURLs []string) trackerTier {
1592 nextURL:
1593         for _, url := range newURLs {
1594                 for _, trURL := range tier {
1595                         if trURL == url {
1596                                 continue nextURL
1597                         }
1598                 }
1599                 tier = append(tier, url)
1600         }
1601         return tier
1602 }
1603
1604 func (t *Torrent) addTrackers(announceList [][]string) {
1605         newTrackers := copyTrackers(t.trackers)
1606         for tierIndex, tier := range announceList {
1607                 if tierIndex < len(newTrackers) {
1608                         newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1609                 } else {
1610                         newTrackers = append(newTrackers, mergeTier(nil, tier))
1611                 }
1612                 shuffleTier(newTrackers[tierIndex])
1613         }
1614         t.trackers = newTrackers
1615 }
1616
1617 // Don't call this before the info is available.
1618 func (t *Torrent) bytesCompleted() int64 {
1619         if !t.haveInfo() {
1620                 return 0
1621         }
1622         return t.info.TotalLength() - t.bytesLeft()
1623 }
1624
1625 // A file-like handle to some torrent data resource.
1626 type Handle interface {
1627         io.Reader
1628         io.Seeker
1629         io.Closer
1630         io.ReaderAt
1631 }
1632
1633 // Returns handles to the files in the torrent. This requires the metainfo is
1634 // available first.
1635 func (t *Torrent) Files() (ret []File) {
1636         t.cl.mu.Lock()
1637         info := t.Info()
1638         t.cl.mu.Unlock()
1639         if info == nil {
1640                 return
1641         }
1642         var offset int64
1643         for _, fi := range info.UpvertedFiles() {
1644                 ret = append(ret, File{
1645                         t,
1646                         strings.Join(append([]string{info.Name}, fi.Path...), "/"),
1647                         offset,
1648                         fi.Length,
1649                         fi,
1650                 })
1651                 offset += fi.Length
1652         }
1653         return
1654 }
1655
1656 func (t *Torrent) AddPeers(pp []Peer) error {
1657         cl := t.cl
1658         cl.mu.Lock()
1659         defer cl.mu.Unlock()
1660         cl.addPeers(t, pp)
1661         return nil
1662 }
1663
1664 // Marks the entire torrent for download. Requires the info first, see
1665 // GotInfo.
1666 func (t *Torrent) DownloadAll() {
1667         t.cl.mu.Lock()
1668         defer t.cl.mu.Unlock()
1669         t.pendPieceRange(0, t.numPieces())
1670 }
1671
1672 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
1673 // metainfo has the correct infohash.
1674 func (cl *Client) torrentCacheMetaInfo(ih metainfo.Hash) (mi *metainfo.MetaInfo, err error) {
1675         if cl.config.DisableMetainfoCache {
1676                 return
1677         }
1678         f, err := os.Open(cl.cachedMetaInfoFilename(ih))
1679         if err != nil {
1680                 if os.IsNotExist(err) {
1681                         err = nil
1682                 }
1683                 return
1684         }
1685         defer f.Close()
1686         dec := bencode.NewDecoder(f)
1687         err = dec.Decode(&mi)
1688         if err != nil {
1689                 return
1690         }
1691         if !bytes.Equal(mi.Info.Hash.Bytes(), ih[:]) {
1692                 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
1693                 return
1694         }
1695         return
1696 }
1697
1698 // Specifies a new torrent for adding to a client. There are helpers for
1699 // magnet URIs and torrent metainfo files.
1700 type TorrentSpec struct {
1701         // The tiered tracker URIs.
1702         Trackers [][]string
1703         InfoHash metainfo.Hash
1704         Info     *metainfo.InfoEx
1705         // The name to use if the Name field from the Info isn't available.
1706         DisplayName string
1707         // The chunk size to use for outbound requests. Defaults to 16KiB if not
1708         // set.
1709         ChunkSize int
1710         Storage   storage.I
1711 }
1712
1713 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
1714         m, err := metainfo.ParseMagnetURI(uri)
1715         if err != nil {
1716                 return
1717         }
1718         spec = &TorrentSpec{
1719                 Trackers:    [][]string{m.Trackers},
1720                 DisplayName: m.DisplayName,
1721                 InfoHash:    m.InfoHash,
1722         }
1723         return
1724 }
1725
1726 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
1727         spec = &TorrentSpec{
1728                 Trackers:    mi.AnnounceList,
1729                 Info:        &mi.Info,
1730                 DisplayName: mi.Info.Name,
1731         }
1732
1733         if len(spec.Trackers) == 0 {
1734                 spec.Trackers = [][]string{[]string{mi.Announce}}
1735         } else {
1736                 spec.Trackers[0] = append(spec.Trackers[0], mi.Announce)
1737         }
1738
1739         missinggo.CopyExact(&spec.InfoHash, mi.Info.Hash)
1740         return
1741 }
1742
1743 // Add or merge a torrent spec. If the torrent is already present, the
1744 // trackers will be merged with the existing ones. If the Info isn't yet
1745 // known, it will be set. The display name is replaced if the new spec
1746 // provides one. Returns new if the torrent wasn't already in the client.
1747 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
1748         cl.mu.Lock()
1749         defer cl.mu.Unlock()
1750
1751         t, ok := cl.torrents[spec.InfoHash]
1752         if !ok {
1753                 new = true
1754
1755                 // TODO: This doesn't belong in the core client, it's more of a
1756                 // helper.
1757                 if _, ok := cl.bannedTorrents[spec.InfoHash]; ok {
1758                         err = errors.New("banned torrent")
1759                         return
1760                 }
1761                 // TODO: Tidy this up?
1762                 t = newTorrent(spec.InfoHash)
1763                 t.cl = cl
1764                 t.wantPeers.L = &cl.mu
1765                 if spec.ChunkSize != 0 {
1766                         t.chunkSize = pp.Integer(spec.ChunkSize)
1767                 }
1768                 t.storageOpener = spec.Storage
1769                 if t.storageOpener == nil {
1770                         t.storageOpener = cl.defaultStorage
1771                 }
1772         }
1773         if spec.DisplayName != "" {
1774                 t.setDisplayName(spec.DisplayName)
1775         }
1776         // Try to merge in info we have on the torrent. Any err left will
1777         // terminate the function.
1778         if t.info == nil {
1779                 if spec.Info != nil {
1780                         err = cl.setMetaData(t, &spec.Info.Info, spec.Info.Bytes)
1781                 } else {
1782                         var mi *metainfo.MetaInfo
1783                         mi, err = cl.torrentCacheMetaInfo(spec.InfoHash)
1784                         if err != nil {
1785                                 log.Printf("error getting cached metainfo: %s", err)
1786                                 err = nil
1787                         } else if mi != nil {
1788                                 t.addTrackers(mi.AnnounceList)
1789                                 err = cl.setMetaData(t, &mi.Info.Info, mi.Info.Bytes)
1790                         }
1791                 }
1792         }
1793         if err != nil {
1794                 return
1795         }
1796         t.addTrackers(spec.Trackers)
1797
1798         cl.torrents[spec.InfoHash] = t
1799         t.maybeNewConns()
1800
1801         // From this point onwards, we can consider the torrent a part of the
1802         // client.
1803         if new {
1804                 if !cl.config.DisableTrackers {
1805                         go cl.announceTorrentTrackers(t)
1806                 }
1807                 if cl.dHT != nil {
1808                         go cl.announceTorrentDHT(t, true)
1809                 }
1810         }
1811         return
1812 }
1813
1814 func (me *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
1815         t, ok := me.torrents[infoHash]
1816         if !ok {
1817                 err = fmt.Errorf("no such torrent")
1818                 return
1819         }
1820         err = t.close()
1821         if err != nil {
1822                 panic(err)
1823         }
1824         delete(me.torrents, infoHash)
1825         return
1826 }
1827
1828 // Returns true when peers are required, or false if the torrent is closing.
1829 func (cl *Client) waitWantPeers(t *Torrent) bool {
1830         cl.mu.Lock()
1831         defer cl.mu.Unlock()
1832         for {
1833                 select {
1834                 case <-t.ceasingNetworking:
1835                         return false
1836                 default:
1837                 }
1838                 if len(t.peers) > torrentPeersLowWater {
1839                         goto wait
1840                 }
1841                 if t.needData() || cl.seeding(t) {
1842                         return true
1843                 }
1844         wait:
1845                 t.wantPeers.Wait()
1846         }
1847 }
1848
1849 // Returns whether the client should make effort to seed the torrent.
1850 func (cl *Client) seeding(t *Torrent) bool {
1851         if cl.config.NoUpload {
1852                 return false
1853         }
1854         if !cl.config.Seed {
1855                 return false
1856         }
1857         if t.needData() {
1858                 return false
1859         }
1860         return true
1861 }
1862
1863 func (cl *Client) announceTorrentDHT(t *Torrent, impliedPort bool) {
1864         for cl.waitWantPeers(t) {
1865                 // log.Printf("getting peers for %q from DHT", t)
1866                 ps, err := cl.dHT.Announce(string(t.infoHash[:]), cl.incomingPeerPort(), impliedPort)
1867                 if err != nil {
1868                         log.Printf("error getting peers from dht: %s", err)
1869                         return
1870                 }
1871                 // Count all the unique addresses we got during this announce.
1872                 allAddrs := make(map[string]struct{})
1873         getPeers:
1874                 for {
1875                         select {
1876                         case v, ok := <-ps.Peers:
1877                                 if !ok {
1878                                         break getPeers
1879                                 }
1880                                 addPeers := make([]Peer, 0, len(v.Peers))
1881                                 for _, cp := range v.Peers {
1882                                         if cp.Port == 0 {
1883                                                 // Can't do anything with this.
1884                                                 continue
1885                                         }
1886                                         addPeers = append(addPeers, Peer{
1887                                                 IP:     cp.IP[:],
1888                                                 Port:   cp.Port,
1889                                                 Source: peerSourceDHT,
1890                                         })
1891                                         key := (&net.UDPAddr{
1892                                                 IP:   cp.IP[:],
1893                                                 Port: cp.Port,
1894                                         }).String()
1895                                         allAddrs[key] = struct{}{}
1896                                 }
1897                                 cl.mu.Lock()
1898                                 cl.addPeers(t, addPeers)
1899                                 numPeers := len(t.peers)
1900                                 cl.mu.Unlock()
1901                                 if numPeers >= torrentPeersHighWater {
1902                                         break getPeers
1903                                 }
1904                         case <-t.ceasingNetworking:
1905                                 ps.Close()
1906                                 return
1907                         }
1908                 }
1909                 ps.Close()
1910                 // log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
1911         }
1912 }
1913
1914 func (cl *Client) trackerBlockedUnlocked(trRawURL string) (blocked bool, err error) {
1915         url_, err := url.Parse(trRawURL)
1916         if err != nil {
1917                 return
1918         }
1919         host, _, err := net.SplitHostPort(url_.Host)
1920         if err != nil {
1921                 host = url_.Host
1922         }
1923         addr, err := net.ResolveIPAddr("ip", host)
1924         if err != nil {
1925                 return
1926         }
1927         cl.mu.RLock()
1928         _, blocked = cl.ipBlockRange(addr.IP)
1929         cl.mu.RUnlock()
1930         return
1931 }
1932
1933 func (cl *Client) announceTorrentSingleTracker(tr string, req *tracker.AnnounceRequest, t *Torrent) error {
1934         blocked, err := cl.trackerBlockedUnlocked(tr)
1935         if err != nil {
1936                 return fmt.Errorf("error determining if tracker blocked: %s", err)
1937         }
1938         if blocked {
1939                 return fmt.Errorf("tracker blocked: %s", tr)
1940         }
1941         resp, err := tracker.Announce(tr, req)
1942         if err != nil {
1943                 return fmt.Errorf("error announcing: %s", err)
1944         }
1945         var peers []Peer
1946         for _, peer := range resp.Peers {
1947                 peers = append(peers, Peer{
1948                         IP:   peer.IP,
1949                         Port: peer.Port,
1950                 })
1951         }
1952         cl.mu.Lock()
1953         cl.addPeers(t, peers)
1954         cl.mu.Unlock()
1955
1956         // log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1957
1958         time.Sleep(time.Second * time.Duration(resp.Interval))
1959         return nil
1960 }
1961
1962 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers []trackerTier, t *Torrent) (atLeastOne bool) {
1963         oks := make(chan bool)
1964         outstanding := 0
1965         for _, tier := range trackers {
1966                 for _, tr := range tier {
1967                         outstanding++
1968                         go func(tr string) {
1969                                 err := cl.announceTorrentSingleTracker(tr, req, t)
1970                                 oks <- err == nil
1971                         }(tr)
1972                 }
1973         }
1974         for outstanding > 0 {
1975                 ok := <-oks
1976                 outstanding--
1977                 if ok {
1978                         atLeastOne = true
1979                 }
1980         }
1981         return
1982 }
1983
1984 // Announce torrent to its trackers.
1985 func (cl *Client) announceTorrentTrackers(t *Torrent) {
1986         req := tracker.AnnounceRequest{
1987                 Event:    tracker.Started,
1988                 NumWant:  -1,
1989                 Port:     uint16(cl.incomingPeerPort()),
1990                 PeerId:   cl.peerID,
1991                 InfoHash: t.infoHash,
1992         }
1993         if !cl.waitWantPeers(t) {
1994                 return
1995         }
1996         cl.mu.RLock()
1997         req.Left = t.bytesLeftAnnounce()
1998         trackers := t.trackers
1999         cl.mu.RUnlock()
2000         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
2001                 req.Event = tracker.None
2002         }
2003 newAnnounce:
2004         for cl.waitWantPeers(t) {
2005                 cl.mu.RLock()
2006                 req.Left = t.bytesLeftAnnounce()
2007                 trackers = t.trackers
2008                 cl.mu.RUnlock()
2009                 numTrackersTried := 0
2010                 for _, tier := range trackers {
2011                         for trIndex, tr := range tier {
2012                                 numTrackersTried++
2013                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
2014                                 if err != nil {
2015                                         continue
2016                                 }
2017                                 // Float the successful announce to the top of the tier. If
2018                                 // the trackers list has been changed, we'll be modifying an
2019                                 // old copy so it won't matter.
2020                                 cl.mu.Lock()
2021                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
2022                                 cl.mu.Unlock()
2023
2024                                 req.Event = tracker.None
2025                                 continue newAnnounce
2026                         }
2027                 }
2028                 if numTrackersTried != 0 {
2029                         log.Printf("%s: all trackers failed", t)
2030                 }
2031                 // TODO: Wait until trackers are added if there are none.
2032                 time.Sleep(10 * time.Second)
2033         }
2034 }
2035
2036 func (cl *Client) allTorrentsCompleted() bool {
2037         for _, t := range cl.torrents {
2038                 if !t.haveInfo() {
2039                         return false
2040                 }
2041                 if t.numPiecesCompleted() != t.numPieces() {
2042                         return false
2043                 }
2044         }
2045         return true
2046 }
2047
2048 // Returns true when all torrents are completely downloaded and false if the
2049 // client is stopped before that.
2050 func (me *Client) WaitAll() bool {
2051         me.mu.Lock()
2052         defer me.mu.Unlock()
2053         for !me.allTorrentsCompleted() {
2054                 if me.closed.IsSet() {
2055                         return false
2056                 }
2057                 me.event.Wait()
2058         }
2059         return true
2060 }
2061
2062 // Handle a received chunk from a peer.
2063 func (me *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
2064         chunksReceived.Add(1)
2065
2066         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
2067
2068         // Request has been satisfied.
2069         if me.connDeleteRequest(t, c, req) {
2070                 defer c.updateRequests()
2071         } else {
2072                 unexpectedChunksReceived.Add(1)
2073         }
2074
2075         index := int(req.Index)
2076         piece := &t.pieces[index]
2077
2078         // Do we actually want this chunk?
2079         if !t.wantChunk(req) {
2080                 unwantedChunksReceived.Add(1)
2081                 c.UnwantedChunksReceived++
2082                 return
2083         }
2084
2085         c.UsefulChunksReceived++
2086         c.lastUsefulChunkReceived = time.Now()
2087
2088         me.upload(t, c)
2089
2090         // Need to record that it hasn't been written yet, before we attempt to do
2091         // anything with it.
2092         piece.incrementPendingWrites()
2093         // Record that we have the chunk.
2094         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
2095
2096         // Cancel pending requests for this chunk.
2097         for _, c := range t.conns {
2098                 if me.connCancel(t, c, req) {
2099                         c.updateRequests()
2100                 }
2101         }
2102
2103         me.mu.Unlock()
2104         // Write the chunk out.
2105         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
2106         me.mu.Lock()
2107
2108         piece.decrementPendingWrites()
2109
2110         if err != nil {
2111                 log.Printf("%s: error writing chunk %v: %s", t, req, err)
2112                 t.pendRequest(req)
2113                 t.updatePieceCompletion(int(msg.Index))
2114                 return
2115         }
2116
2117         // It's important that the piece is potentially queued before we check if
2118         // the piece is still wanted, because if it is queued, it won't be wanted.
2119         if t.pieceAllDirty(index) {
2120                 me.queuePieceCheck(t, int(req.Index))
2121         }
2122
2123         if c.peerTouchedPieces == nil {
2124                 c.peerTouchedPieces = make(map[int]struct{})
2125         }
2126         c.peerTouchedPieces[index] = struct{}{}
2127
2128         me.event.Broadcast()
2129         t.publishPieceChange(int(req.Index))
2130         return
2131 }
2132
2133 // Return the connections that touched a piece, and clear the entry while
2134 // doing it.
2135 func (me *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
2136         for _, c := range t.conns {
2137                 if _, ok := c.peerTouchedPieces[piece]; ok {
2138                         ret = append(ret, c)
2139                         delete(c.peerTouchedPieces, piece)
2140                 }
2141         }
2142         return
2143 }
2144
2145 func (me *Client) pieceHashed(t *Torrent, piece int, correct bool) {
2146         p := &t.pieces[piece]
2147         if p.EverHashed {
2148                 // Don't score the first time a piece is hashed, it could be an
2149                 // initial check.
2150                 if correct {
2151                         pieceHashedCorrect.Add(1)
2152                 } else {
2153                         log.Printf("%s: piece %d (%x) failed hash", t, piece, p.Hash)
2154                         pieceHashedNotCorrect.Add(1)
2155                 }
2156         }
2157         p.EverHashed = true
2158         touchers := me.reapPieceTouches(t, piece)
2159         if correct {
2160                 err := p.Storage().MarkComplete()
2161                 if err != nil {
2162                         log.Printf("%T: error completing piece %d: %s", t.storage, piece, err)
2163                 }
2164                 t.updatePieceCompletion(piece)
2165         } else if len(touchers) != 0 {
2166                 log.Printf("dropping %d conns that touched piece", len(touchers))
2167                 for _, c := range touchers {
2168                         me.dropConnection(t, c)
2169                 }
2170         }
2171         me.pieceChanged(t, piece)
2172 }
2173
2174 func (me *Client) onCompletedPiece(t *Torrent, piece int) {
2175         t.pendingPieces.Remove(piece)
2176         t.pendAllChunkSpecs(piece)
2177         for _, conn := range t.conns {
2178                 conn.Have(piece)
2179                 for r := range conn.Requests {
2180                         if int(r.Index) == piece {
2181                                 conn.Cancel(r)
2182                         }
2183                 }
2184                 // Could check here if peer doesn't have piece, but due to caching
2185                 // some peers may have said they have a piece but they don't.
2186                 me.upload(t, conn)
2187         }
2188 }
2189
2190 func (me *Client) onFailedPiece(t *Torrent, piece int) {
2191         if t.pieceAllDirty(piece) {
2192                 t.pendAllChunkSpecs(piece)
2193         }
2194         if !t.wantPiece(piece) {
2195                 return
2196         }
2197         me.openNewConns(t)
2198         for _, conn := range t.conns {
2199                 if conn.PeerHasPiece(piece) {
2200                         conn.updateRequests()
2201                 }
2202         }
2203 }
2204
2205 func (me *Client) pieceChanged(t *Torrent, piece int) {
2206         correct := t.pieceComplete(piece)
2207         defer me.event.Broadcast()
2208         if correct {
2209                 me.onCompletedPiece(t, piece)
2210         } else {
2211                 me.onFailedPiece(t, piece)
2212         }
2213         if t.updatePiecePriority(piece) {
2214                 t.piecePriorityChanged(piece)
2215         }
2216         t.publishPieceChange(piece)
2217 }
2218
2219 func (cl *Client) verifyPiece(t *Torrent, piece int) {
2220         cl.mu.Lock()
2221         defer cl.mu.Unlock()
2222         p := &t.pieces[piece]
2223         for p.Hashing || t.storage == nil {
2224                 cl.event.Wait()
2225         }
2226         p.QueuedForHash = false
2227         if t.isClosed() || t.pieceComplete(piece) {
2228                 t.updatePiecePriority(piece)
2229                 t.publishPieceChange(piece)
2230                 return
2231         }
2232         p.Hashing = true
2233         t.publishPieceChange(piece)
2234         cl.mu.Unlock()
2235         sum := t.hashPiece(piece)
2236         cl.mu.Lock()
2237         select {
2238         case <-t.closing:
2239                 return
2240         default:
2241         }
2242         p.Hashing = false
2243         cl.pieceHashed(t, piece, sum == p.Hash)
2244 }
2245
2246 // Returns handles to all the torrents loaded in the Client.
2247 func (me *Client) Torrents() (ret []*Torrent) {
2248         me.mu.Lock()
2249         for _, t := range me.torrents {
2250                 ret = append(ret, t)
2251         }
2252         me.mu.Unlock()
2253         return
2254 }
2255
2256 func (me *Client) AddMagnet(uri string) (T *Torrent, err error) {
2257         spec, err := TorrentSpecFromMagnetURI(uri)
2258         if err != nil {
2259                 return
2260         }
2261         T, _, err = me.AddTorrentSpec(spec)
2262         return
2263 }
2264
2265 func (me *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
2266         T, _, err = me.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2267         var ss []string
2268         missinggo.CastSlice(&ss, mi.Nodes)
2269         me.AddDHTNodes(ss)
2270         return
2271 }
2272
2273 func (me *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
2274         mi, err := metainfo.LoadFromFile(filename)
2275         if err != nil {
2276                 return
2277         }
2278         return me.AddTorrent(mi)
2279 }
2280
2281 func (me *Client) DHT() *dht.Server {
2282         return me.dHT
2283 }
2284
2285 func (me *Client) AddDHTNodes(nodes []string) {
2286         for _, n := range nodes {
2287                 hmp := missinggo.SplitHostMaybePort(n)
2288                 ip := net.ParseIP(hmp.Host)
2289                 if ip == nil {
2290                         log.Printf("won't add DHT node with bad IP: %q", hmp.Host)
2291                         continue
2292                 }
2293                 ni := dht.NodeInfo{
2294                         Addr: dht.NewAddr(&net.UDPAddr{
2295                                 IP:   ip,
2296                                 Port: hmp.Port,
2297                         }),
2298                 }
2299                 me.DHT().AddNode(ni)
2300         }
2301 }