]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
3eef1580cecc54fb7e677d8abab76295736894db
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "container/heap"
7         "crypto/rand"
8         "crypto/sha1"
9         "encoding/hex"
10         "errors"
11         "expvar"
12         "fmt"
13         "io"
14         "log"
15         "math/big"
16         mathRand "math/rand"
17         "net"
18         "net/url"
19         "os"
20         "path/filepath"
21         "sort"
22         "strconv"
23         "strings"
24         "syscall"
25         "time"
26
27         "github.com/anacrolix/sync"
28         "github.com/anacrolix/utp"
29         "github.com/bradfitz/iter"
30
31         "github.com/anacrolix/torrent/bencode"
32         "github.com/anacrolix/torrent/data"
33         filePkg "github.com/anacrolix/torrent/data/file"
34         "github.com/anacrolix/torrent/dht"
35         "github.com/anacrolix/torrent/internal/pieceordering"
36         "github.com/anacrolix/torrent/iplist"
37         "github.com/anacrolix/torrent/logonce"
38         "github.com/anacrolix/torrent/metainfo"
39         "github.com/anacrolix/torrent/mse"
40         pp "github.com/anacrolix/torrent/peer_protocol"
41         "github.com/anacrolix/torrent/tracker"
42         . "github.com/anacrolix/torrent/util"
43 )
44
45 var (
46         unwantedChunksReceived   = expvar.NewInt("chunksReceivedUnwanted")
47         unexpectedChunksReceived = expvar.NewInt("chunksReceivedUnexpected")
48         chunksReceived           = expvar.NewInt("chunksReceived")
49         peersFoundByDHT          = expvar.NewInt("peersFoundByDHT")
50         peersFoundByPEX          = expvar.NewInt("peersFoundByPEX")
51         peersFoundByTracker      = expvar.NewInt("peersFoundByTracker")
52         uploadChunksPosted       = expvar.NewInt("uploadChunksPosted")
53         unexpectedCancels        = expvar.NewInt("unexpectedCancels")
54         postedCancels            = expvar.NewInt("postedCancels")
55         duplicateConnsAvoided    = expvar.NewInt("duplicateConnsAvoided")
56         failedPieceHashes        = expvar.NewInt("failedPieceHashes")
57         unsuccessfulDials        = expvar.NewInt("unsuccessfulDials")
58         successfulDials          = expvar.NewInt("successfulDials")
59         acceptedConns            = expvar.NewInt("acceptedConns")
60         inboundConnsBlocked      = expvar.NewInt("inboundConnsBlocked")
61         peerExtensions           = expvar.NewMap("peerExtensions")
62         // Count of connections to peer with same client ID.
63         connsToSelf = expvar.NewInt("connsToSelf")
64         // Number of completed connections to a client we're already connected with.
65         duplicateClientConns       = expvar.NewInt("duplicateClientConns")
66         receivedMessageTypes       = expvar.NewMap("receivedMessageTypes")
67         supportedExtensionMessages = expvar.NewMap("supportedExtensionMessages")
68 )
69
70 const (
71         // Justification for set bits follows.
72         //
73         // Extension protocol: http://www.bittorrent.org/beps/bep_0010.html ([5]|=0x10)
74         // DHT: http://www.bittorrent.org/beps/bep_0005.html ([7]|=1)
75         // Fast Extension:
76         //       http://bittorrent.org/beps/bep_0006.html ([7]|=4)
77         //   Disabled until AllowedFast is implemented
78         defaultExtensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
79
80         socketsPerTorrent     = 40
81         torrentPeersHighWater = 200
82         torrentPeersLowWater  = 50
83
84         // Limit how long handshake can take. This is to reduce the lingering
85         // impact of a few bad apples. 4s loses 1% of successful handshakes that
86         // are obtained with 60s timeout, and 5% of unsuccessful handshakes.
87         btHandshakeTimeout = 4 * time.Second
88         handshakesTimeout  = 20 * time.Second
89
90         pruneInterval = 10 * time.Second
91
92         metadataExtendedId = iota + 1 // 0 is reserved for deleting keys
93         pexExtendedId
94
95         extendedHandshakeClientVersion = "go.torrent dev 20140825"
96 )
97
98 // Currently doesn't really queue, but should in the future.
99 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
100         piece := t.Pieces[pieceIndex]
101         if piece.QueuedForHash {
102                 return
103         }
104         piece.QueuedForHash = true
105         go cl.verifyPiece(t, pieceIndex)
106 }
107
108 // Queue a piece check if one isn't already queued, and the piece has never
109 // been checked before.
110 func (cl *Client) queueFirstHash(t *torrent, piece int) {
111         p := t.Pieces[piece]
112         if p.EverHashed || p.Hashing || p.QueuedForHash || t.pieceComplete(piece) {
113                 return
114         }
115         cl.queuePieceCheck(t, pp.Integer(piece))
116 }
117
118 // Clients contain zero or more Torrents. A client manages a blocklist, the
119 // TCP/UDP protocol ports, and DHT as desired.
120 type Client struct {
121         halfOpenLimit  int
122         peerID         [20]byte
123         listeners      []net.Listener
124         utpSock        *utp.Socket
125         dHT            *dht.Server
126         ipBlockList    *iplist.IPList
127         bannedTorrents map[InfoHash]struct{}
128         config         Config
129         pruneTimer     *time.Timer
130         extensionBytes peerExtensionBytes
131         // Set of addresses that have our client ID. This intentionally will
132         // include ourselves if we end up trying to connect to our own address
133         // through legitimate channels.
134         dopplegangerAddrs map[string]struct{}
135
136         torrentDataOpener TorrentDataOpener
137
138         mu    sync.RWMutex
139         event sync.Cond
140         quit  chan struct{}
141
142         torrents map[InfoHash]*torrent
143 }
144
145 func (me *Client) IPBlockList() *iplist.IPList {
146         me.mu.Lock()
147         defer me.mu.Unlock()
148         return me.ipBlockList
149 }
150
151 func (me *Client) SetIPBlockList(list *iplist.IPList) {
152         me.mu.Lock()
153         defer me.mu.Unlock()
154         me.ipBlockList = list
155         if me.dHT != nil {
156                 me.dHT.SetIPBlockList(list)
157         }
158 }
159
160 func (me *Client) PeerID() string {
161         return string(me.peerID[:])
162 }
163
164 func (me *Client) ListenAddr() (addr net.Addr) {
165         for _, l := range me.listeners {
166                 addr = l.Addr()
167                 break
168         }
169         return
170 }
171
172 type hashSorter struct {
173         Hashes []InfoHash
174 }
175
176 func (me hashSorter) Len() int {
177         return len(me.Hashes)
178 }
179
180 func (me hashSorter) Less(a, b int) bool {
181         return (&big.Int{}).SetBytes(me.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(me.Hashes[b][:])) < 0
182 }
183
184 func (me hashSorter) Swap(a, b int) {
185         me.Hashes[a], me.Hashes[b] = me.Hashes[b], me.Hashes[a]
186 }
187
188 func (cl *Client) sortedTorrents() (ret []*torrent) {
189         var hs hashSorter
190         for ih := range cl.torrents {
191                 hs.Hashes = append(hs.Hashes, ih)
192         }
193         sort.Sort(hs)
194         for _, ih := range hs.Hashes {
195                 ret = append(ret, cl.torrent(ih))
196         }
197         return
198 }
199
200 // Writes out a human readable status of the client, such as for writing to a
201 // HTTP status page.
202 func (cl *Client) WriteStatus(_w io.Writer) {
203         cl.mu.RLock()
204         defer cl.mu.RUnlock()
205         w := bufio.NewWriter(_w)
206         defer w.Flush()
207         if addr := cl.ListenAddr(); addr != nil {
208                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
209         } else {
210                 fmt.Fprintln(w, "Not listening!")
211         }
212         fmt.Fprintf(w, "Peer ID: %+q\n", cl.peerID)
213         if cl.dHT != nil {
214                 dhtStats := cl.dHT.Stats()
215                 fmt.Fprintf(w, "DHT nodes: %d (%d good)\n", dhtStats.Nodes, dhtStats.GoodNodes)
216                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.ID())
217                 fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.Addr()))
218                 fmt.Fprintf(w, "DHT announces: %d\n", dhtStats.ConfirmedAnnounces)
219                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.OutstandingTransactions)
220         }
221         fmt.Fprintf(w, "# Torrents: %d\n", len(cl.torrents))
222         fmt.Fprintln(w)
223         for _, t := range cl.sortedTorrents() {
224                 if t.Name() == "" {
225                         fmt.Fprint(w, "<unknown name>")
226                 } else {
227                         fmt.Fprint(w, t.Name())
228                 }
229                 fmt.Fprint(w, "\n")
230                 if t.haveInfo() {
231                         fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float32(t.bytesLeft())/float32(t.Length())), t.Length())
232                 } else {
233                         w.WriteString("<missing metainfo>")
234                 }
235                 fmt.Fprint(w, "\n")
236                 t.writeStatus(w, cl)
237                 fmt.Fprintln(w)
238         }
239 }
240
241 // A Data that implements this has a streaming interface that should be
242 // preferred over ReadAt. For example, the data is stored in blocks on the
243 // network and have a fixed cost to open.
244 type SectionOpener interface {
245         // Open a ReadCloser at the given offset into torrent data. n is how many
246         // bytes we intend to read.
247         OpenSection(off, n int64) (io.ReadCloser, error)
248 }
249
250 func dataReadAt(d data.Data, b []byte, off int64) (n int, err error) {
251         // defer func() {
252         //      if err == io.ErrUnexpectedEOF && n != 0 {
253         //              err = nil
254         //      }
255         // }()
256         // log.Println("data read at", len(b), off)
257 again:
258         if ra, ok := d.(io.ReaderAt); ok {
259                 return ra.ReadAt(b, off)
260         }
261         if so, ok := d.(SectionOpener); ok {
262                 var rc io.ReadCloser
263                 rc, err = so.OpenSection(off, int64(len(b)))
264                 if err != nil {
265                         return
266                 }
267                 defer rc.Close()
268                 return io.ReadFull(rc, b)
269         }
270         if dp, ok := super(d); ok {
271                 d = dp.(data.Data)
272                 goto again
273         }
274         panic(fmt.Sprintf("can't read from %T", d))
275 }
276
277 // Calculates the number of pieces to set to Readahead priority, after the
278 // Now, and Next pieces.
279 func readaheadPieces(readahead, pieceLength int64) (ret int) {
280         // Expand the readahead to fit any partial pieces. Subtract 1 for the
281         // "next" piece that is assigned.
282         ret = int((readahead+pieceLength-1)/pieceLength - 1)
283         // Lengthen the "readahead tail" to smooth blockiness that occurs when the
284         // piece length is much larger than the readahead.
285         if ret < 2 {
286                 ret++
287         }
288         return
289 }
290
291 func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
292         index := int(off / int64(t.usualPieceSize()))
293         cl.raisePiecePriority(t, index, PiecePriorityNow)
294         index++
295         if index >= t.numPieces() {
296                 return
297         }
298         cl.raisePiecePriority(t, index, PiecePriorityNext)
299         for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
300                 index++
301                 if index >= t.numPieces() {
302                         break
303                 }
304                 cl.raisePiecePriority(t, index, PiecePriorityReadahead)
305         }
306 }
307
308 func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
309         for n > 0 {
310                 req, ok := t.offsetRequest(off)
311                 if !ok {
312                         break
313                 }
314                 if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
315                         if t.urgent == nil {
316                                 t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize)
317                         }
318                         t.urgent[req] = struct{}{}
319                         cl.event.Broadcast() // Why?
320                         index := int(req.Index)
321                         cl.queueFirstHash(t, index)
322                         cl.pieceChanged(t, index)
323                 }
324                 reqOff := t.requestOffset(req)
325                 n1 := req.Length - pp.Integer(off-reqOff)
326                 off += int64(n1)
327                 n -= int(n1)
328         }
329         // log.Print(t.urgent)
330 }
331
332 func (cl *Client) configDir() string {
333         if cl.config.ConfigDir == "" {
334                 return filepath.Join(os.Getenv("HOME"), ".config/torrent")
335         }
336         return cl.config.ConfigDir
337 }
338
339 // The directory where the Client expects to find and store configuration
340 // data. Defaults to $HOME/.config/torrent.
341 func (cl *Client) ConfigDir() string {
342         return cl.configDir()
343 }
344
345 func (t *torrent) connPendPiece(c *connection, piece int) {
346         c.pendPiece(piece, t.Pieces[piece].Priority)
347 }
348
349 func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
350         if t.Pieces[piece].Priority < priority {
351                 cl.prioritizePiece(t, piece, priority)
352         }
353 }
354
355 func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) {
356         if t.havePiece(piece) {
357                 return
358         }
359         if priority != PiecePriorityNone {
360                 cl.queueFirstHash(t, piece)
361         }
362         p := t.Pieces[piece]
363         if p.Priority != priority {
364                 p.Priority = priority
365                 cl.pieceChanged(t, piece)
366         }
367 }
368
369 func (cl *Client) setEnvBlocklist() (err error) {
370         filename := os.Getenv("TORRENT_BLOCKLIST_FILE")
371         defaultBlocklist := filename == ""
372         if defaultBlocklist {
373                 filename = filepath.Join(cl.configDir(), "blocklist")
374         }
375         f, err := os.Open(filename)
376         if err != nil {
377                 if defaultBlocklist {
378                         err = nil
379                 }
380                 return
381         }
382         defer f.Close()
383         cl.ipBlockList, err = iplist.NewFromReader(f)
384         return
385 }
386
387 func (cl *Client) initBannedTorrents() error {
388         f, err := os.Open(filepath.Join(cl.configDir(), "banned_infohashes"))
389         if err != nil {
390                 if os.IsNotExist(err) {
391                         return nil
392                 }
393                 return fmt.Errorf("error opening banned infohashes file: %s", err)
394         }
395         defer f.Close()
396         scanner := bufio.NewScanner(f)
397         cl.bannedTorrents = make(map[InfoHash]struct{})
398         for scanner.Scan() {
399                 if strings.HasPrefix(strings.TrimSpace(scanner.Text()), "#") {
400                         continue
401                 }
402                 var ihs string
403                 n, err := fmt.Sscanf(scanner.Text(), "%x", &ihs)
404                 if err != nil {
405                         return fmt.Errorf("error reading infohash: %s", err)
406                 }
407                 if n != 1 {
408                         continue
409                 }
410                 if len(ihs) != 20 {
411                         return errors.New("bad infohash")
412                 }
413                 var ih InfoHash
414                 CopyExact(&ih, ihs)
415                 cl.bannedTorrents[ih] = struct{}{}
416         }
417         if err := scanner.Err(); err != nil {
418                 return fmt.Errorf("error scanning file: %s", err)
419         }
420         return nil
421 }
422
423 // Creates a new client.
424 func NewClient(cfg *Config) (cl *Client, err error) {
425         if cfg == nil {
426                 cfg = &Config{}
427         }
428
429         defer func() {
430                 if err != nil {
431                         cl = nil
432                 }
433         }()
434         cl = &Client{
435                 halfOpenLimit: socketsPerTorrent,
436                 config:        *cfg,
437                 torrentDataOpener: func(md *metainfo.Info) data.Data {
438                         return filePkg.TorrentData(md, cfg.DataDir)
439                 },
440                 dopplegangerAddrs: make(map[string]struct{}),
441
442                 quit:     make(chan struct{}),
443                 torrents: make(map[InfoHash]*torrent),
444         }
445         CopyExact(&cl.extensionBytes, defaultExtensionBytes)
446         cl.event.L = &cl.mu
447         if cfg.TorrentDataOpener != nil {
448                 cl.torrentDataOpener = cfg.TorrentDataOpener
449         }
450
451         if !cfg.NoDefaultBlocklist {
452                 err = cl.setEnvBlocklist()
453                 if err != nil {
454                         return
455                 }
456         }
457
458         if err = cl.initBannedTorrents(); err != nil {
459                 err = fmt.Errorf("error initing banned torrents: %s", err)
460                 return
461         }
462
463         if cfg.PeerID != "" {
464                 CopyExact(&cl.peerID, cfg.PeerID)
465         } else {
466                 o := copy(cl.peerID[:], bep20)
467                 _, err = rand.Read(cl.peerID[o:])
468                 if err != nil {
469                         panic("error generating peer id")
470                 }
471         }
472
473         // Returns the laddr string to listen on for the next Listen call.
474         listenAddr := func() string {
475                 if addr := cl.ListenAddr(); addr != nil {
476                         return addr.String()
477                 }
478                 if cfg.ListenAddr == "" {
479                         return ":50007"
480                 }
481                 return cfg.ListenAddr
482         }
483         if !cl.config.DisableTCP {
484                 var l net.Listener
485                 l, err = net.Listen("tcp", listenAddr())
486                 if err != nil {
487                         return
488                 }
489                 cl.listeners = append(cl.listeners, l)
490                 go cl.acceptConnections(l, false)
491         }
492         if !cl.config.DisableUTP {
493                 cl.utpSock, err = utp.NewSocket(listenAddr())
494                 if err != nil {
495                         return
496                 }
497                 cl.listeners = append(cl.listeners, cl.utpSock)
498                 go cl.acceptConnections(cl.utpSock, true)
499         }
500         if !cfg.NoDHT {
501                 dhtCfg := cfg.DHTConfig
502                 if dhtCfg == nil {
503                         dhtCfg = &dht.ServerConfig{
504                                 IPBlocklist: cl.ipBlockList,
505                         }
506                 }
507                 if dhtCfg.Addr == "" {
508                         dhtCfg.Addr = listenAddr()
509                 }
510                 if dhtCfg.Conn == nil && cl.utpSock != nil {
511                         dhtCfg.Conn = cl.utpSock.PacketConn()
512                 }
513                 cl.dHT, err = dht.NewServer(dhtCfg)
514                 if err != nil {
515                         return
516                 }
517         }
518
519         return
520 }
521
522 func (cl *Client) stopped() bool {
523         select {
524         case <-cl.quit:
525                 return true
526         default:
527                 return false
528         }
529 }
530
531 // Stops the client. All connections to peers are closed and all activity will
532 // come to a halt.
533 func (me *Client) Close() {
534         me.mu.Lock()
535         defer me.mu.Unlock()
536         close(me.quit)
537         for _, l := range me.listeners {
538                 l.Close()
539         }
540         me.event.Broadcast()
541         for _, t := range me.torrents {
542                 t.close()
543         }
544 }
545
546 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
547
548 func (cl *Client) ipBlockRange(ip net.IP) (r *iplist.Range) {
549         if cl.ipBlockList == nil {
550                 return
551         }
552         ip4 := ip.To4()
553         if ip4 == nil {
554                 log.Printf("blocking non-IPv4 address: %s", ip)
555                 r = &ipv6BlockRange
556                 return
557         }
558         r = cl.ipBlockList.Lookup(ip4)
559         return
560 }
561
562 func (cl *Client) waitAccept() {
563         cl.mu.Lock()
564         defer cl.mu.Unlock()
565         for {
566                 for _, t := range cl.torrents {
567                         if cl.wantConns(t) {
568                                 return
569                         }
570                 }
571                 cl.event.Wait()
572         }
573 }
574
575 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
576         for {
577                 cl.waitAccept()
578                 // We accept all connections immediately, because we don't know what
579                 // torrent they're for.
580                 conn, err := l.Accept()
581                 select {
582                 case <-cl.quit:
583                         if conn != nil {
584                                 conn.Close()
585                         }
586                         return
587                 default:
588                 }
589                 if err != nil {
590                         log.Print(err)
591                         return
592                 }
593                 acceptedConns.Add(1)
594                 cl.mu.RLock()
595                 doppleganger := cl.dopplegangerAddr(conn.RemoteAddr().String())
596                 blockRange := cl.ipBlockRange(AddrIP(conn.RemoteAddr()))
597                 cl.mu.RUnlock()
598                 if blockRange != nil || doppleganger {
599                         inboundConnsBlocked.Add(1)
600                         // log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
601                         conn.Close()
602                         continue
603                 }
604                 go cl.incomingConnection(conn, utp)
605         }
606 }
607
608 func (cl *Client) incomingConnection(nc net.Conn, utp bool) {
609         defer nc.Close()
610         if tc, ok := nc.(*net.TCPConn); ok {
611                 tc.SetLinger(0)
612         }
613         c := newConnection()
614         c.conn = nc
615         c.rw = nc
616         c.Discovery = peerSourceIncoming
617         c.uTP = utp
618         err := cl.runReceivedConn(c)
619         if err != nil {
620                 // log.Print(err)
621         }
622 }
623
624 // Returns a handle to the given torrent, if it's present in the client.
625 func (cl *Client) Torrent(ih InfoHash) (T Torrent, ok bool) {
626         cl.mu.Lock()
627         defer cl.mu.Unlock()
628         t, ok := cl.torrents[ih]
629         if !ok {
630                 return
631         }
632         T = Torrent{cl, t}
633         return
634 }
635
636 func (me *Client) torrent(ih InfoHash) *torrent {
637         return me.torrents[ih]
638 }
639
640 type dialResult struct {
641         Conn net.Conn
642         UTP  bool
643 }
644
645 func doDial(dial func(addr string, t *torrent) (net.Conn, error), ch chan dialResult, utp bool, addr string, t *torrent) {
646         conn, err := dial(addr, t)
647         if err != nil {
648                 if conn != nil {
649                         conn.Close()
650                 }
651                 conn = nil // Pedantic
652         }
653         ch <- dialResult{conn, utp}
654         if err == nil {
655                 successfulDials.Add(1)
656                 return
657         }
658         unsuccessfulDials.Add(1)
659         if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
660                 return
661         }
662         if netOpErr, ok := err.(*net.OpError); ok {
663                 switch netOpErr.Err {
664                 case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
665                         return
666                 }
667         }
668         if utp && err.Error() == "timed out waiting for ack" {
669                 return
670         }
671         if err != nil {
672                 log.Printf("error dialing %s: %s", addr, err)
673                 return
674         }
675 }
676
677 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
678         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
679         if ret < minDialTimeout {
680                 ret = minDialTimeout
681         }
682         return
683 }
684
685 func (me *Client) dopplegangerAddr(addr string) bool {
686         _, ok := me.dopplegangerAddrs[addr]
687         return ok
688 }
689
690 // Start the process of connecting to the given peer for the given torrent if
691 // appropriate.
692 func (me *Client) initiateConn(peer Peer, t *torrent) {
693         if peer.Id == me.peerID {
694                 return
695         }
696         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
697         if me.dopplegangerAddr(addr) || t.addrActive(addr) {
698                 duplicateConnsAvoided.Add(1)
699                 return
700         }
701         if r := me.ipBlockRange(peer.IP); r != nil {
702                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
703                 return
704         }
705         t.HalfOpen[addr] = struct{}{}
706         go me.outgoingConnection(t, addr, peer.Source)
707 }
708
709 func (me *Client) dialTimeout(t *torrent) time.Duration {
710         me.mu.Lock()
711         pendingPeers := len(t.Peers)
712         me.mu.Unlock()
713         return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, pendingPeers)
714 }
715
716 func (me *Client) dialTCP(addr string, t *torrent) (c net.Conn, err error) {
717         c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t))
718         if err == nil {
719                 c.(*net.TCPConn).SetLinger(0)
720         }
721         return
722 }
723
724 func (me *Client) dialUTP(addr string, t *torrent) (c net.Conn, err error) {
725         return me.utpSock.DialTimeout(addr, me.dialTimeout(t))
726 }
727
728 // Returns a connection over UTP or TCP.
729 func (me *Client) dial(addr string, t *torrent) (conn net.Conn, utp bool) {
730         // Initiate connections via TCP and UTP simultaneously. Use the first one
731         // that succeeds.
732         left := 0
733         if !me.config.DisableUTP {
734                 left++
735         }
736         if !me.config.DisableTCP {
737                 left++
738         }
739         resCh := make(chan dialResult, left)
740         if !me.config.DisableUTP {
741                 go doDial(me.dialUTP, resCh, true, addr, t)
742         }
743         if !me.config.DisableTCP {
744                 go doDial(me.dialTCP, resCh, false, addr, t)
745         }
746         var res dialResult
747         // Wait for a successful connection.
748         for ; left > 0 && res.Conn == nil; left-- {
749                 res = <-resCh
750         }
751         if left > 0 {
752                 // There are still incompleted dials.
753                 go func() {
754                         for ; left > 0; left-- {
755                                 conn := (<-resCh).Conn
756                                 if conn != nil {
757                                         conn.Close()
758                                 }
759                         }
760                 }()
761         }
762         conn = res.Conn
763         utp = res.UTP
764         return
765 }
766
767 func (me *Client) noLongerHalfOpen(t *torrent, addr string) {
768         if _, ok := t.HalfOpen[addr]; !ok {
769                 panic("invariant broken")
770         }
771         delete(t.HalfOpen, addr)
772         me.openNewConns(t)
773 }
774
775 // Returns nil connection and nil error if no connection could be established
776 // for valid reasons.
777 func (me *Client) establishOutgoingConn(t *torrent, addr string) (c *connection, err error) {
778         handshakesConnection := func(nc net.Conn, encrypted, utp bool) (c *connection, err error) {
779                 c = newConnection()
780                 c.conn = nc
781                 c.rw = nc
782                 c.encrypted = encrypted
783                 c.uTP = utp
784                 err = nc.SetDeadline(time.Now().Add(handshakesTimeout))
785                 if err != nil {
786                         return
787                 }
788                 ok, err := me.initiateHandshakes(c, t)
789                 if !ok {
790                         c = nil
791                 }
792                 return
793         }
794         nc, utp := me.dial(addr, t)
795         if nc == nil {
796                 return
797         }
798         c, err = handshakesConnection(nc, !me.config.DisableEncryption, utp)
799         if err != nil {
800                 nc.Close()
801                 return
802         } else if c != nil {
803                 return
804         }
805         nc.Close()
806         // Try again without encryption, using whichever protocol type worked last
807         // time.
808         if me.config.DisableEncryption {
809                 // We already tried without encryption.
810                 return
811         }
812         if utp {
813                 nc, err = me.dialUTP(addr, t)
814         } else {
815                 nc, err = me.dialTCP(addr, t)
816         }
817         if err != nil {
818                 err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
819                 return
820         }
821         c, err = handshakesConnection(nc, false, utp)
822         if err != nil {
823                 nc.Close()
824         }
825         return
826 }
827
828 // Called to dial out and run a connection. The addr we're given is already
829 // considered half-open.
830 func (me *Client) outgoingConnection(t *torrent, addr string, ps peerSource) {
831         c, err := me.establishOutgoingConn(t, addr)
832         me.mu.Lock()
833         defer me.mu.Unlock()
834         // Don't release lock between here and addConnection, unless it's for
835         // failure.
836         me.noLongerHalfOpen(t, addr)
837         if err != nil {
838                 return
839         }
840         if c == nil {
841                 return
842         }
843         defer c.Close()
844         c.Discovery = ps
845         err = me.runInitiatedHandshookConn(c, t)
846         if err != nil {
847                 // log.Print(err)
848         }
849 }
850
851 // The port number for incoming peer connections. 0 if the client isn't
852 // listening.
853 func (cl *Client) incomingPeerPort() int {
854         listenAddr := cl.ListenAddr()
855         if listenAddr == nil {
856                 return 0
857         }
858         return addrPort(listenAddr)
859 }
860
861 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
862 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
863 func addrCompactIP(addr net.Addr) (string, error) {
864         host, _, err := net.SplitHostPort(addr.String())
865         if err != nil {
866                 return "", err
867         }
868         ip := net.ParseIP(host)
869         if v4 := ip.To4(); v4 != nil {
870                 if len(v4) != 4 {
871                         panic(v4)
872                 }
873                 return string(v4), nil
874         }
875         return string(ip.To16()), nil
876 }
877
878 func handshakeWriter(w io.Writer, bb <-chan []byte, done chan<- error) {
879         var err error
880         for b := range bb {
881                 _, err = w.Write(b)
882                 if err != nil {
883                         break
884                 }
885         }
886         done <- err
887 }
888
889 type (
890         peerExtensionBytes [8]byte
891         peerID             [20]byte
892 )
893
894 func (me *peerExtensionBytes) SupportsExtended() bool {
895         return me[5]&0x10 != 0
896 }
897
898 func (me *peerExtensionBytes) SupportsDHT() bool {
899         return me[7]&0x01 != 0
900 }
901
902 func (me *peerExtensionBytes) SupportsFast() bool {
903         return me[7]&0x04 != 0
904 }
905
906 type handshakeResult struct {
907         peerExtensionBytes
908         peerID
909         InfoHash
910 }
911
912 // ih is nil if we expect the peer to declare the InfoHash, such as when the
913 // peer initiated the connection. Returns ok if the handshake was successful,
914 // and err if there was an unexpected condition other than the peer simply
915 // abandoning the handshake.
916 func handshake(sock io.ReadWriter, ih *InfoHash, peerID [20]byte, extensions peerExtensionBytes) (res handshakeResult, ok bool, err error) {
917         // Bytes to be sent to the peer. Should never block the sender.
918         postCh := make(chan []byte, 4)
919         // A single error value sent when the writer completes.
920         writeDone := make(chan error, 1)
921         // Performs writes to the socket and ensures posts don't block.
922         go handshakeWriter(sock, postCh, writeDone)
923
924         defer func() {
925                 close(postCh) // Done writing.
926                 if !ok {
927                         return
928                 }
929                 if err != nil {
930                         panic(err)
931                 }
932                 // Wait until writes complete before returning from handshake.
933                 err = <-writeDone
934                 if err != nil {
935                         err = fmt.Errorf("error writing: %s", err)
936                 }
937         }()
938
939         post := func(bb []byte) {
940                 select {
941                 case postCh <- bb:
942                 default:
943                         panic("mustn't block while posting")
944                 }
945         }
946
947         post([]byte(pp.Protocol))
948         post(extensions[:])
949         if ih != nil { // We already know what we want.
950                 post(ih[:])
951                 post(peerID[:])
952         }
953         var b [68]byte
954         _, err = io.ReadFull(sock, b[:68])
955         if err != nil {
956                 err = nil
957                 return
958         }
959         if string(b[:20]) != pp.Protocol {
960                 return
961         }
962         CopyExact(&res.peerExtensionBytes, b[20:28])
963         CopyExact(&res.InfoHash, b[28:48])
964         CopyExact(&res.peerID, b[48:68])
965         peerExtensions.Add(hex.EncodeToString(res.peerExtensionBytes[:]), 1)
966
967         // TODO: Maybe we can just drop peers here if we're not interested. This
968         // could prevent them trying to reconnect, falsely believing there was
969         // just a problem.
970         if ih == nil { // We were waiting for the peer to tell us what they wanted.
971                 post(res.InfoHash[:])
972                 post(peerID[:])
973         }
974
975         ok = true
976         return
977 }
978
979 // Wraps a raw connection and provides the interface we want for using the
980 // connection in the message loop.
981 type deadlineReader struct {
982         nc net.Conn
983         r  io.Reader
984 }
985
986 func (me deadlineReader) Read(b []byte) (n int, err error) {
987         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
988         err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
989         if err != nil {
990                 err = fmt.Errorf("error setting read deadline: %s", err)
991         }
992         n, err = me.r.Read(b)
993         // Convert common errors into io.EOF.
994         // if err != nil {
995         //      if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
996         //              err = io.EOF
997         //      } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
998         //              if n != 0 {
999         //                      panic(n)
1000         //              }
1001         //              err = io.EOF
1002         //      }
1003         // }
1004         return
1005 }
1006
1007 type readWriter struct {
1008         io.Reader
1009         io.Writer
1010 }
1011
1012 func maybeReceiveEncryptedHandshake(rw io.ReadWriter, skeys [][]byte) (ret io.ReadWriter, encrypted bool, err error) {
1013         var protocol [len(pp.Protocol)]byte
1014         _, err = io.ReadFull(rw, protocol[:])
1015         if err != nil {
1016                 return
1017         }
1018         ret = readWriter{
1019                 io.MultiReader(bytes.NewReader(protocol[:]), rw),
1020                 rw,
1021         }
1022         if string(protocol[:]) == pp.Protocol {
1023                 return
1024         }
1025         encrypted = true
1026         ret, err = mse.ReceiveHandshake(ret, skeys)
1027         return
1028 }
1029
1030 func (cl *Client) receiveSkeys() (ret [][]byte) {
1031         for ih := range cl.torrents {
1032                 ret = append(ret, ih[:])
1033         }
1034         return
1035 }
1036
1037 func (me *Client) initiateHandshakes(c *connection, t *torrent) (ok bool, err error) {
1038         if c.encrypted {
1039                 c.rw, err = mse.InitiateHandshake(c.rw, t.InfoHash[:], nil)
1040                 if err != nil {
1041                         return
1042                 }
1043         }
1044         ih, ok, err := me.connBTHandshake(c, &t.InfoHash)
1045         if ih != t.InfoHash {
1046                 ok = false
1047         }
1048         return
1049 }
1050
1051 // Do encryption and bittorrent handshakes as receiver.
1052 func (cl *Client) receiveHandshakes(c *connection) (t *torrent, err error) {
1053         cl.mu.Lock()
1054         skeys := cl.receiveSkeys()
1055         cl.mu.Unlock()
1056         if !cl.config.DisableEncryption {
1057                 c.rw, c.encrypted, err = maybeReceiveEncryptedHandshake(c.rw, skeys)
1058                 if err != nil {
1059                         if err == mse.ErrNoSecretKeyMatch {
1060                                 err = nil
1061                         }
1062                         return
1063                 }
1064         }
1065         ih, ok, err := cl.connBTHandshake(c, nil)
1066         if err != nil {
1067                 err = fmt.Errorf("error during bt handshake: %s", err)
1068                 return
1069         }
1070         if !ok {
1071                 return
1072         }
1073         cl.mu.Lock()
1074         t = cl.torrents[ih]
1075         cl.mu.Unlock()
1076         return
1077 }
1078
1079 // Returns !ok if handshake failed for valid reasons.
1080 func (cl *Client) connBTHandshake(c *connection, ih *InfoHash) (ret InfoHash, ok bool, err error) {
1081         res, ok, err := handshake(c.rw, ih, cl.peerID, cl.extensionBytes)
1082         if err != nil || !ok {
1083                 return
1084         }
1085         ret = res.InfoHash
1086         c.PeerExtensionBytes = res.peerExtensionBytes
1087         c.PeerID = res.peerID
1088         c.completedHandshake = time.Now()
1089         return
1090 }
1091
1092 func (cl *Client) runInitiatedHandshookConn(c *connection, t *torrent) (err error) {
1093         if c.PeerID == cl.peerID {
1094                 // Only if we initiated the connection is the remote address a
1095                 // listen addr for a doppleganger.
1096                 connsToSelf.Add(1)
1097                 addr := c.conn.RemoteAddr().String()
1098                 cl.dopplegangerAddrs[addr] = struct{}{}
1099                 return
1100         }
1101         return cl.runHandshookConn(c, t)
1102 }
1103
1104 func (cl *Client) runReceivedConn(c *connection) (err error) {
1105         err = c.conn.SetDeadline(time.Now().Add(handshakesTimeout))
1106         if err != nil {
1107                 return
1108         }
1109         t, err := cl.receiveHandshakes(c)
1110         if err != nil {
1111                 err = fmt.Errorf("error receiving handshakes: %s", err)
1112                 return
1113         }
1114         if t == nil {
1115                 return
1116         }
1117         cl.mu.Lock()
1118         defer cl.mu.Unlock()
1119         if c.PeerID == cl.peerID {
1120                 return
1121         }
1122         return cl.runHandshookConn(c, t)
1123 }
1124
1125 func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) {
1126         c.conn.SetWriteDeadline(time.Time{})
1127         c.rw = readWriter{
1128                 deadlineReader{c.conn, c.rw},
1129                 c.rw,
1130         }
1131         if !cl.addConnection(t, c) {
1132                 return
1133         }
1134         defer cl.dropConnection(t, c)
1135         go c.writer()
1136         go c.writeOptimizer(time.Minute)
1137         cl.sendInitialMessages(c, t)
1138         if t.haveInfo() {
1139                 t.initRequestOrdering(c)
1140         }
1141         err = cl.connectionLoop(t, c)
1142         if err != nil {
1143                 err = fmt.Errorf("error during connection loop: %s", err)
1144         }
1145         return
1146 }
1147
1148 func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) {
1149         if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
1150                 conn.Post(pp.Message{
1151                         Type:       pp.Extended,
1152                         ExtendedID: pp.HandshakeExtendedID,
1153                         ExtendedPayload: func() []byte {
1154                                 d := map[string]interface{}{
1155                                         "m": func() (ret map[string]int) {
1156                                                 ret = make(map[string]int, 2)
1157                                                 ret["ut_metadata"] = metadataExtendedId
1158                                                 if !me.config.DisablePEX {
1159                                                         ret["ut_pex"] = pexExtendedId
1160                                                 }
1161                                                 return
1162                                         }(),
1163                                         "v": extendedHandshakeClientVersion,
1164                                         // No upload queue is implemented yet.
1165                                         "reqq": 64,
1166                                 }
1167                                 if !me.config.DisableEncryption {
1168                                         d["e"] = 1
1169                                 }
1170                                 if torrent.metadataSizeKnown() {
1171                                         d["metadata_size"] = torrent.metadataSize()
1172                                 }
1173                                 if p := me.incomingPeerPort(); p != 0 {
1174                                         d["p"] = p
1175                                 }
1176                                 yourip, err := addrCompactIP(conn.remoteAddr())
1177                                 if err != nil {
1178                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
1179                                 } else {
1180                                         d["yourip"] = yourip
1181                                 }
1182                                 // log.Printf("sending %v", d)
1183                                 b, err := bencode.Marshal(d)
1184                                 if err != nil {
1185                                         panic(err)
1186                                 }
1187                                 return b
1188                         }(),
1189                 })
1190         }
1191         if torrent.haveAnyPieces() {
1192                 conn.Post(pp.Message{
1193                         Type:     pp.Bitfield,
1194                         Bitfield: torrent.bitfield(),
1195                 })
1196         } else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
1197                 conn.Post(pp.Message{
1198                         Type: pp.HaveNone,
1199                 })
1200         }
1201         if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil {
1202                 conn.Post(pp.Message{
1203                         Type: pp.Port,
1204                         Port: uint16(AddrPort(me.dHT.Addr())),
1205                 })
1206         }
1207 }
1208
1209 // Randomizes the piece order for this connection. Every connection will be
1210 // given a different ordering. Having it stored per connection saves having to
1211 // randomize during request filling, and constantly recalculate the ordering
1212 // based on piece priorities.
1213 func (t *torrent) initRequestOrdering(c *connection) {
1214         if c.pieceRequestOrder != nil || c.piecePriorities != nil {
1215                 panic("double init of request ordering")
1216         }
1217         c.piecePriorities = mathRand.Perm(t.numPieces())
1218         c.pieceRequestOrder = pieceordering.New()
1219         for i := range iter.N(t.Info.NumPieces()) {
1220                 if !c.PeerHasPiece(i) {
1221                         continue
1222                 }
1223                 if !t.wantPiece(i) {
1224                         continue
1225                 }
1226                 t.connPendPiece(c, i)
1227         }
1228 }
1229
1230 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
1231         if !c.peerHasAll {
1232                 if t.haveInfo() {
1233                         if c.PeerPieces == nil {
1234                                 c.PeerPieces = make([]bool, t.numPieces())
1235                         }
1236                 } else {
1237                         for piece >= len(c.PeerPieces) {
1238                                 c.PeerPieces = append(c.PeerPieces, false)
1239                         }
1240                 }
1241                 c.PeerPieces[piece] = true
1242         }
1243         if t.wantPiece(piece) {
1244                 t.connPendPiece(c, piece)
1245                 me.replenishConnRequests(t, c)
1246         }
1247 }
1248
1249 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
1250         me.replenishConnRequests(torrent, conn)
1251 }
1252
1253 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
1254         ok = cn.Cancel(r)
1255         if ok {
1256                 postedCancels.Add(1)
1257         }
1258         return
1259 }
1260
1261 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) bool {
1262         if !cn.RequestPending(r) {
1263                 return false
1264         }
1265         delete(cn.Requests, r)
1266         return true
1267 }
1268
1269 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
1270         if t.haveInfo() {
1271                 return
1272         }
1273         if c.PeerExtensionIDs["ut_metadata"] == 0 {
1274                 // Peer doesn't support this.
1275                 return
1276         }
1277         // Request metadata pieces that we don't have in a random order.
1278         var pending []int
1279         for index := 0; index < t.metadataPieceCount(); index++ {
1280                 if !t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
1281                         pending = append(pending, index)
1282                 }
1283         }
1284         for _, i := range mathRand.Perm(len(pending)) {
1285                 c.requestMetadataPiece(pending[i])
1286         }
1287 }
1288
1289 func (cl *Client) completedMetadata(t *torrent) {
1290         h := sha1.New()
1291         h.Write(t.MetaData)
1292         var ih InfoHash
1293         CopyExact(&ih, h.Sum(nil))
1294         if ih != t.InfoHash {
1295                 log.Print("bad metadata")
1296                 t.invalidateMetadata()
1297                 return
1298         }
1299         var info metainfo.Info
1300         err := bencode.Unmarshal(t.MetaData, &info)
1301         if err != nil {
1302                 log.Printf("error unmarshalling metadata: %s", err)
1303                 t.invalidateMetadata()
1304                 return
1305         }
1306         // TODO(anacrolix): If this fails, I think something harsher should be
1307         // done.
1308         err = cl.setMetaData(t, &info, t.MetaData)
1309         if err != nil {
1310                 log.Printf("error setting metadata: %s", err)
1311                 t.invalidateMetadata()
1312                 return
1313         }
1314         log.Printf("%s: got metadata from peers", t)
1315 }
1316
1317 // Process incoming ut_metadata message.
1318 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
1319         var d map[string]int
1320         err = bencode.Unmarshal(payload, &d)
1321         if err != nil {
1322                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1323                 return
1324         }
1325         msgType, ok := d["msg_type"]
1326         if !ok {
1327                 err = errors.New("missing msg_type field")
1328                 return
1329         }
1330         piece := d["piece"]
1331         switch msgType {
1332         case pp.DataMetadataExtensionMsgType:
1333                 if t.haveInfo() {
1334                         break
1335                 }
1336                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1337                 if begin < 0 || begin >= len(payload) {
1338                         log.Printf("got bad metadata piece")
1339                         break
1340                 }
1341                 if !c.requestedMetadataPiece(piece) {
1342                         log.Printf("got unexpected metadata piece %d", piece)
1343                         break
1344                 }
1345                 c.metadataRequests[piece] = false
1346                 t.saveMetadataPiece(piece, payload[begin:])
1347                 c.UsefulChunksReceived++
1348                 c.lastUsefulChunkReceived = time.Now()
1349                 if !t.haveAllMetadataPieces() {
1350                         break
1351                 }
1352                 cl.completedMetadata(t)
1353         case pp.RequestMetadataExtensionMsgType:
1354                 if !t.haveMetadataPiece(piece) {
1355                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1356                         break
1357                 }
1358                 start := (1 << 14) * piece
1359                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[start:start+t.metadataPieceSize(piece)]))
1360         case pp.RejectMetadataExtensionMsgType:
1361         default:
1362                 err = errors.New("unknown msg_type value")
1363         }
1364         return
1365 }
1366
1367 type peerExchangeMessage struct {
1368         Added      CompactPeers `bencode:"added"`
1369         AddedFlags []byte       `bencode:"added.f"`
1370         Dropped    CompactPeers `bencode:"dropped"`
1371 }
1372
1373 // Extracts the port as an integer from an address string.
1374 func addrPort(addr net.Addr) int {
1375         return AddrPort(addr)
1376 }
1377
1378 func (cl *Client) peerHasAll(t *torrent, cn *connection) {
1379         cn.peerHasAll = true
1380         cn.PeerPieces = nil
1381         if t.haveInfo() {
1382                 for i := 0; i < t.numPieces(); i++ {
1383                         cl.peerGotPiece(t, cn, i)
1384                 }
1385         }
1386 }
1387
1388 func (me *Client) upload(t *torrent, c *connection) {
1389         if me.config.NoUpload {
1390                 return
1391         }
1392         if !c.PeerInterested {
1393                 return
1394         }
1395         if !me.seeding(t) && !t.connHasWantedPieces(c) {
1396                 return
1397         }
1398 another:
1399         for c.chunksSent < c.UsefulChunksReceived+6 {
1400                 c.Unchoke()
1401                 for r := range c.PeerRequests {
1402                         err := me.sendChunk(t, c, r)
1403                         if err != nil {
1404                                 log.Printf("error sending chunk to peer: %s", err)
1405                         }
1406                         delete(c.PeerRequests, r)
1407                         goto another
1408                 }
1409                 return
1410         }
1411         c.Choke()
1412 }
1413
1414 func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
1415         b := make([]byte, r.Length)
1416         p := t.Info.Piece(int(r.Index))
1417         n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
1418         if err != nil {
1419                 return err
1420         }
1421         if n != len(b) {
1422                 log.Fatal(b)
1423         }
1424         c.Post(pp.Message{
1425                 Type:  pp.Piece,
1426                 Index: r.Index,
1427                 Begin: r.Begin,
1428                 Piece: b,
1429         })
1430         uploadChunksPosted.Add(1)
1431         c.chunksSent++
1432         c.lastChunkSent = time.Now()
1433         return nil
1434 }
1435
1436 // Processes incoming bittorrent messages. The client lock is held upon entry
1437 // and exit.
1438 func (me *Client) connectionLoop(t *torrent, c *connection) error {
1439         decoder := pp.Decoder{
1440                 R:         bufio.NewReader(c.rw),
1441                 MaxLength: 256 * 1024,
1442         }
1443         for {
1444                 me.mu.Unlock()
1445                 var msg pp.Message
1446                 err := decoder.Decode(&msg)
1447                 receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
1448                 me.mu.Lock()
1449                 c.lastMessageReceived = time.Now()
1450                 select {
1451                 case <-c.closing:
1452                         return nil
1453                 default:
1454                 }
1455                 if err != nil {
1456                         if me.stopped() || err == io.EOF {
1457                                 return nil
1458                         }
1459                         return err
1460                 }
1461                 if msg.Keepalive {
1462                         continue
1463                 }
1464                 switch msg.Type {
1465                 case pp.Choke:
1466                         c.PeerChoked = true
1467                         for r := range c.Requests {
1468                                 me.connDeleteRequest(t, c, r)
1469                         }
1470                         // We can then reset our interest.
1471                         me.replenishConnRequests(t, c)
1472                 case pp.Reject:
1473                         me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
1474                         me.replenishConnRequests(t, c)
1475                 case pp.Unchoke:
1476                         c.PeerChoked = false
1477                         me.peerUnchoked(t, c)
1478                 case pp.Interested:
1479                         c.PeerInterested = true
1480                         me.upload(t, c)
1481                 case pp.NotInterested:
1482                         c.PeerInterested = false
1483                         c.Choke()
1484                 case pp.Have:
1485                         me.peerGotPiece(t, c, int(msg.Index))
1486                 case pp.Request:
1487                         if c.Choked {
1488                                 break
1489                         }
1490                         if !c.PeerInterested {
1491                                 err = errors.New("peer sent request but isn't interested")
1492                                 break
1493                         }
1494                         if c.PeerRequests == nil {
1495                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1496                         }
1497                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
1498                         me.upload(t, c)
1499                 case pp.Cancel:
1500                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1501                         if !c.PeerCancel(req) {
1502                                 unexpectedCancels.Add(1)
1503                         }
1504                 case pp.Bitfield:
1505                         if c.PeerPieces != nil || c.peerHasAll {
1506                                 err = errors.New("received unexpected bitfield")
1507                                 break
1508                         }
1509                         if t.haveInfo() {
1510                                 if len(msg.Bitfield) < t.numPieces() {
1511                                         err = errors.New("received invalid bitfield")
1512                                         break
1513                                 }
1514                                 msg.Bitfield = msg.Bitfield[:t.numPieces()]
1515                         }
1516                         c.PeerPieces = msg.Bitfield
1517                         for index, has := range c.PeerPieces {
1518                                 if has {
1519                                         me.peerGotPiece(t, c, index)
1520                                 }
1521                         }
1522                 case pp.HaveAll:
1523                         if c.PeerPieces != nil || c.peerHasAll {
1524                                 err = errors.New("unexpected have-all")
1525                                 break
1526                         }
1527                         me.peerHasAll(t, c)
1528                 case pp.HaveNone:
1529                         if c.peerHasAll || c.PeerPieces != nil {
1530                                 err = errors.New("unexpected have-none")
1531                                 break
1532                         }
1533                         c.PeerPieces = make([]bool, func() int {
1534                                 if t.haveInfo() {
1535                                         return t.numPieces()
1536                                 } else {
1537                                         return 0
1538                                 }
1539                         }())
1540                 case pp.Piece:
1541                         err = me.downloadedChunk(t, c, &msg)
1542                 case pp.Extended:
1543                         switch msg.ExtendedID {
1544                         case pp.HandshakeExtendedID:
1545                                 // TODO: Create a bencode struct for this.
1546                                 var d map[string]interface{}
1547                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1548                                 if err != nil {
1549                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1550                                         break
1551                                 }
1552                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1553                                 if reqq, ok := d["reqq"]; ok {
1554                                         if i, ok := reqq.(int64); ok {
1555                                                 c.PeerMaxRequests = int(i)
1556                                         }
1557                                 }
1558                                 if v, ok := d["v"]; ok {
1559                                         c.PeerClientName = v.(string)
1560                                 }
1561                                 m, ok := d["m"]
1562                                 if !ok {
1563                                         err = errors.New("handshake missing m item")
1564                                         break
1565                                 }
1566                                 mTyped, ok := m.(map[string]interface{})
1567                                 if !ok {
1568                                         err = errors.New("handshake m value is not dict")
1569                                         break
1570                                 }
1571                                 if c.PeerExtensionIDs == nil {
1572                                         c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1573                                 }
1574                                 for name, v := range mTyped {
1575                                         id, ok := v.(int64)
1576                                         if !ok {
1577                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1578                                                 continue
1579                                         }
1580                                         if id == 0 {
1581                                                 delete(c.PeerExtensionIDs, name)
1582                                         } else {
1583                                                 if c.PeerExtensionIDs[name] == 0 {
1584                                                         supportedExtensionMessages.Add(name, 1)
1585                                                 }
1586                                                 c.PeerExtensionIDs[name] = byte(id)
1587                                         }
1588                                 }
1589                                 metadata_sizeUntyped, ok := d["metadata_size"]
1590                                 if ok {
1591                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1592                                         if !ok {
1593                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1594                                         } else {
1595                                                 t.setMetadataSize(metadata_size, me)
1596                                         }
1597                                 }
1598                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1599                                         me.requestPendingMetadata(t, c)
1600                                 }
1601                         case metadataExtendedId:
1602                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1603                                 if err != nil {
1604                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1605                                 }
1606                         case pexExtendedId:
1607                                 if me.config.DisablePEX {
1608                                         break
1609                                 }
1610                                 var pexMsg peerExchangeMessage
1611                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1612                                 if err != nil {
1613                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1614                                         break
1615                                 }
1616                                 go func() {
1617                                         me.mu.Lock()
1618                                         me.addPeers(t, func() (ret []Peer) {
1619                                                 for i, cp := range pexMsg.Added {
1620                                                         p := Peer{
1621                                                                 IP:     make([]byte, 4),
1622                                                                 Port:   int(cp.Port),
1623                                                                 Source: peerSourcePEX,
1624                                                         }
1625                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1626                                                                 p.SupportsEncryption = true
1627                                                         }
1628                                                         CopyExact(p.IP, cp.IP[:])
1629                                                         ret = append(ret, p)
1630                                                 }
1631                                                 return
1632                                         }())
1633                                         me.mu.Unlock()
1634                                         peersFoundByPEX.Add(int64(len(pexMsg.Added)))
1635                                 }()
1636                         default:
1637                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1638                         }
1639                         if err != nil {
1640                                 // That client uses its own extension IDs for outgoing message
1641                                 // types, which is incorrect.
1642                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1643                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1644                                         return nil
1645                                 }
1646                         }
1647                 case pp.Port:
1648                         if me.dHT == nil {
1649                                 break
1650                         }
1651                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1652                         if err != nil {
1653                                 panic(err)
1654                         }
1655                         if msg.Port != 0 {
1656                                 pingAddr.Port = int(msg.Port)
1657                         }
1658                         _, err = me.dHT.Ping(pingAddr)
1659                 default:
1660                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1661                 }
1662                 if err != nil {
1663                         return err
1664                 }
1665         }
1666 }
1667
1668 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
1669         me.event.Broadcast()
1670         for r := range conn.Requests {
1671                 me.connDeleteRequest(torrent, conn, r)
1672         }
1673         conn.Close()
1674         for i0, c := range torrent.Conns {
1675                 if c != conn {
1676                         continue
1677                 }
1678                 i1 := len(torrent.Conns) - 1
1679                 if i0 != i1 {
1680                         torrent.Conns[i0] = torrent.Conns[i1]
1681                 }
1682                 torrent.Conns = torrent.Conns[:i1]
1683                 me.openNewConns(torrent)
1684                 return
1685         }
1686         panic("connection not found")
1687 }
1688
1689 func (me *Client) addConnection(t *torrent, c *connection) bool {
1690         if me.stopped() {
1691                 return false
1692         }
1693         select {
1694         case <-t.ceasingNetworking:
1695                 return false
1696         default:
1697         }
1698         if !me.wantConns(t) {
1699                 return false
1700         }
1701         for _, c0 := range t.Conns {
1702                 if c.PeerID == c0.PeerID {
1703                         // Already connected to a client with that ID.
1704                         duplicateClientConns.Add(1)
1705                         return false
1706                 }
1707         }
1708         t.Conns = append(t.Conns, c)
1709         // TODO: This should probably be done by a routine that kills off bad
1710         // connections, and extra connections killed here instead.
1711         if len(t.Conns) > socketsPerTorrent {
1712                 wcs := t.worstConnsHeap(me)
1713                 heap.Pop(wcs).(*connection).Close()
1714         }
1715         return true
1716 }
1717
1718 func (t *torrent) needData() bool {
1719         if !t.haveInfo() {
1720                 return true
1721         }
1722         for i := range t.Pieces {
1723                 if t.wantPiece(i) {
1724                         return true
1725                 }
1726         }
1727         return false
1728 }
1729
1730 func (cl *Client) usefulConn(t *torrent, c *connection) bool {
1731         // A 30 second grace for initial messages to go through.
1732         if time.Since(c.completedHandshake) < 30*time.Second {
1733                 return true
1734         }
1735         if !t.haveInfo() {
1736                 if !c.supportsExtension("ut_metadata") {
1737                         return false
1738                 }
1739                 if time.Since(c.completedHandshake) < 2*time.Minute {
1740                         return true
1741                 }
1742                 return false
1743         }
1744         if cl.seeding(t) {
1745                 return c.PeerInterested
1746         }
1747         return t.connHasWantedPieces(c)
1748 }
1749
1750 func (t *torrent) numGoodConns(cl *Client) (num int) {
1751         for _, c := range t.Conns {
1752                 if cl.usefulConn(t, c) {
1753                         num++
1754                 }
1755         }
1756         return
1757 }
1758
1759 func (me *Client) wantConns(t *torrent) bool {
1760         if !me.seeding(t) && !t.needData() {
1761                 return false
1762         }
1763         if t.numGoodConns(me) >= socketsPerTorrent {
1764                 return false
1765         }
1766         return true
1767 }
1768
1769 func (me *Client) openNewConns(t *torrent) {
1770         select {
1771         case <-t.ceasingNetworking:
1772                 return
1773         default:
1774         }
1775         for len(t.Peers) != 0 {
1776                 if !me.wantConns(t) {
1777                         return
1778                 }
1779                 if len(t.HalfOpen) >= me.halfOpenLimit {
1780                         return
1781                 }
1782                 var (
1783                         k peersKey
1784                         p Peer
1785                 )
1786                 for k, p = range t.Peers {
1787                         break
1788                 }
1789                 delete(t.Peers, k)
1790                 me.initiateConn(p, t)
1791         }
1792         t.wantPeers.Broadcast()
1793 }
1794
1795 func (me *Client) addPeers(t *torrent, peers []Peer) {
1796         for _, p := range peers {
1797                 if me.dopplegangerAddr(net.JoinHostPort(p.IP.String(), strconv.FormatInt(int64(p.Port), 10))) {
1798                         continue
1799                 }
1800                 if me.ipBlockRange(p.IP) != nil {
1801                         continue
1802                 }
1803                 if p.Port == 0 {
1804                         log.Printf("got bad peer: %v", p)
1805                         continue
1806                 }
1807                 t.addPeer(p)
1808         }
1809         me.openNewConns(t)
1810 }
1811
1812 func (cl *Client) cachedMetaInfoFilename(ih InfoHash) string {
1813         return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1814 }
1815
1816 func (cl *Client) saveTorrentFile(t *torrent) error {
1817         path := cl.cachedMetaInfoFilename(t.InfoHash)
1818         os.MkdirAll(filepath.Dir(path), 0777)
1819         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1820         if err != nil {
1821                 return fmt.Errorf("error opening file: %s", err)
1822         }
1823         defer f.Close()
1824         e := bencode.NewEncoder(f)
1825         err = e.Encode(t.MetaInfo())
1826         if err != nil {
1827                 return fmt.Errorf("error marshalling metainfo: %s", err)
1828         }
1829         mi, err := cl.torrentCacheMetaInfo(t.InfoHash)
1830         if err != nil {
1831                 // For example, a script kiddy makes us load too many files, and we're
1832                 // able to save the torrent, but not load it again to check it.
1833                 return nil
1834         }
1835         if !bytes.Equal(mi.Info.Hash, t.InfoHash[:]) {
1836                 log.Fatalf("%x != %x", mi.Info.Hash, t.InfoHash[:])
1837         }
1838         return nil
1839 }
1840
1841 func (cl *Client) startTorrent(t *torrent) {
1842         if t.Info == nil || t.data == nil {
1843                 panic("nope")
1844         }
1845         // If the client intends to upload, it needs to know what state pieces are
1846         // in.
1847         if !cl.config.NoUpload {
1848                 // Queue all pieces for hashing. This is done sequentially to avoid
1849                 // spamming goroutines.
1850                 for _, p := range t.Pieces {
1851                         p.QueuedForHash = true
1852                 }
1853                 go func() {
1854                         for i := range t.Pieces {
1855                                 cl.verifyPiece(t, pp.Integer(i))
1856                         }
1857                 }()
1858         }
1859 }
1860
1861 // Storage cannot be changed once it's set.
1862 func (cl *Client) setStorage(t *torrent, td data.Data) (err error) {
1863         err = t.setStorage(td)
1864         cl.event.Broadcast()
1865         if err != nil {
1866                 return
1867         }
1868         for index := range iter.N(t.numPieces()) {
1869                 cl.pieceChanged(t, index)
1870         }
1871         cl.startTorrent(t)
1872         return
1873 }
1874
1875 type TorrentDataOpener func(*metainfo.Info) data.Data
1876
1877 func (cl *Client) setMetaData(t *torrent, md *metainfo.Info, bytes []byte) (err error) {
1878         err = t.setMetadata(md, bytes, &cl.mu)
1879         if err != nil {
1880                 return
1881         }
1882         if !cl.config.DisableMetainfoCache {
1883                 if err := cl.saveTorrentFile(t); err != nil {
1884                         log.Printf("error saving torrent file for %s: %s", t, err)
1885                 }
1886         }
1887         cl.event.Broadcast()
1888         close(t.gotMetainfo)
1889         td := cl.torrentDataOpener(md)
1890         err = cl.setStorage(t, td)
1891         return
1892 }
1893
1894 // Prepare a Torrent without any attachment to a Client. That means we can
1895 // initialize fields all fields that don't require the Client without locking
1896 // it.
1897 func newTorrent(ih InfoHash) (t *torrent, err error) {
1898         t = &torrent{
1899                 InfoHash: ih,
1900                 Peers:    make(map[peersKey]Peer),
1901
1902                 closing:           make(chan struct{}),
1903                 ceasingNetworking: make(chan struct{}),
1904
1905                 gotMetainfo: make(chan struct{}),
1906
1907                 HalfOpen: make(map[string]struct{}),
1908         }
1909         t.wantPeers.L = &t.stateMu
1910         return
1911 }
1912
1913 func init() {
1914         // For shuffling the tracker tiers.
1915         mathRand.Seed(time.Now().Unix())
1916 }
1917
1918 // The trackers within each tier must be shuffled before use.
1919 // http://stackoverflow.com/a/12267471/149482
1920 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1921 func shuffleTier(tier []tracker.Client) {
1922         for i := range tier {
1923                 j := mathRand.Intn(i + 1)
1924                 tier[i], tier[j] = tier[j], tier[i]
1925         }
1926 }
1927
1928 func copyTrackers(base [][]tracker.Client) (copy [][]tracker.Client) {
1929         for _, tier := range base {
1930                 copy = append(copy, append([]tracker.Client{}, tier...))
1931         }
1932         return
1933 }
1934
1935 func mergeTier(tier []tracker.Client, newURLs []string) []tracker.Client {
1936 nextURL:
1937         for _, url := range newURLs {
1938                 for _, tr := range tier {
1939                         if tr.URL() == url {
1940                                 continue nextURL
1941                         }
1942                 }
1943                 tr, err := tracker.New(url)
1944                 if err != nil {
1945                         log.Printf("error creating tracker client for %q: %s", url, err)
1946                         continue
1947                 }
1948                 tier = append(tier, tr)
1949         }
1950         return tier
1951 }
1952
1953 func (t *torrent) addTrackers(announceList [][]string) {
1954         newTrackers := copyTrackers(t.Trackers)
1955         for tierIndex, tier := range announceList {
1956                 if tierIndex < len(newTrackers) {
1957                         newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1958                 } else {
1959                         newTrackers = append(newTrackers, mergeTier(nil, tier))
1960                 }
1961                 shuffleTier(newTrackers[tierIndex])
1962         }
1963         t.Trackers = newTrackers
1964 }
1965
1966 // Don't call this before the info is available.
1967 func (t *torrent) BytesCompleted() int64 {
1968         if !t.haveInfo() {
1969                 return 0
1970         }
1971         return t.Info.TotalLength() - t.bytesLeft()
1972 }
1973
1974 // A file-like handle to some torrent data resource.
1975 type Handle interface {
1976         io.Reader
1977         io.Seeker
1978         io.Closer
1979         io.ReaderAt
1980 }
1981
1982 // Returns handles to the files in the torrent. This requires the metainfo is
1983 // available first.
1984 func (t Torrent) Files() (ret []File) {
1985         t.cl.mu.Lock()
1986         info := t.Info()
1987         t.cl.mu.Unlock()
1988         if info == nil {
1989                 return
1990         }
1991         var offset int64
1992         for _, fi := range info.UpvertedFiles() {
1993                 ret = append(ret, File{
1994                         t,
1995                         strings.Join(append([]string{info.Name}, fi.Path...), "/"),
1996                         offset,
1997                         fi.Length,
1998                         fi,
1999                 })
2000                 offset += fi.Length
2001         }
2002         return
2003 }
2004
2005 // Marks the pieces in the given region for download.
2006 func (t Torrent) SetRegionPriority(off, len int64) {
2007         t.cl.mu.Lock()
2008         defer t.cl.mu.Unlock()
2009         pieceSize := int64(t.usualPieceSize())
2010         for i := off / pieceSize; i*pieceSize < off+len; i++ {
2011                 t.cl.raisePiecePriority(t.torrent, int(i), PiecePriorityNormal)
2012         }
2013 }
2014
2015 func (t Torrent) AddPeers(pp []Peer) error {
2016         cl := t.cl
2017         cl.mu.Lock()
2018         defer cl.mu.Unlock()
2019         cl.addPeers(t.torrent, pp)
2020         return nil
2021 }
2022
2023 // Marks the entire torrent for download. Requires the info first, see
2024 // GotInfo.
2025 func (t Torrent) DownloadAll() {
2026         t.cl.mu.Lock()
2027         defer t.cl.mu.Unlock()
2028         for i := range iter.N(t.numPieces()) {
2029                 t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal)
2030         }
2031         // Nice to have the first and last pieces sooner for various interactive
2032         // purposes.
2033         t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead)
2034         t.cl.raisePiecePriority(t.torrent, t.numPieces()-1, PiecePriorityReadahead)
2035 }
2036
2037 // Returns nil metainfo if it isn't in the cache. Checks that the retrieved
2038 // metainfo has the correct infohash.
2039 func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
2040         if cl.config.DisableMetainfoCache {
2041                 return
2042         }
2043         f, err := os.Open(cl.cachedMetaInfoFilename(ih))
2044         if err != nil {
2045                 if os.IsNotExist(err) {
2046                         err = nil
2047                 }
2048                 return
2049         }
2050         defer f.Close()
2051         dec := bencode.NewDecoder(f)
2052         err = dec.Decode(&mi)
2053         if err != nil {
2054                 return
2055         }
2056         if !bytes.Equal(mi.Info.Hash, ih[:]) {
2057                 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
2058                 return
2059         }
2060         return
2061 }
2062
2063 // Specifies a new torrent for adding to a client. There are helpers for
2064 // magnet URIs and torrent metainfo files.
2065 type TorrentSpec struct {
2066         Trackers    [][]string
2067         InfoHash    InfoHash
2068         Info        *metainfo.InfoEx
2069         DisplayName string
2070 }
2071
2072 func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
2073         m, err := ParseMagnetURI(uri)
2074         if err != nil {
2075                 return
2076         }
2077         spec = &TorrentSpec{
2078                 Trackers:    [][]string{m.Trackers},
2079                 DisplayName: m.DisplayName,
2080                 InfoHash:    m.InfoHash,
2081         }
2082         return
2083 }
2084
2085 func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
2086         spec = &TorrentSpec{
2087                 Trackers:    mi.AnnounceList,
2088                 Info:        &mi.Info,
2089                 DisplayName: mi.Info.Name,
2090         }
2091         CopyExact(&spec.InfoHash, &mi.Info.Hash)
2092         return
2093 }
2094
2095 // Add or merge a torrent spec. If the torrent is already present, the
2096 // trackers will be merged with the existing ones. If the Info isn't yet
2097 // known, it will be set. The display name is replaced if the new spec
2098 // provides one. Returns new if the torrent wasn't already in the client.
2099 func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err error) {
2100         T.cl = cl
2101         cl.mu.Lock()
2102         defer cl.mu.Unlock()
2103
2104         t, ok := cl.torrents[spec.InfoHash]
2105         if !ok {
2106                 new = true
2107
2108                 if _, ok := cl.bannedTorrents[spec.InfoHash]; ok {
2109                         err = errors.New("banned torrent")
2110                         return
2111                 }
2112
2113                 t, err = newTorrent(spec.InfoHash)
2114                 if err != nil {
2115                         return
2116                 }
2117         }
2118         if spec.DisplayName != "" {
2119                 t.DisplayName = spec.DisplayName
2120         }
2121         // Try to merge in info we have on the torrent. Any err left will
2122         // terminate the function.
2123         if t.Info == nil {
2124                 if spec.Info != nil {
2125                         err = cl.setMetaData(t, &spec.Info.Info, spec.Info.Bytes)
2126                 } else {
2127                         var mi *metainfo.MetaInfo
2128                         mi, err = cl.torrentCacheMetaInfo(spec.InfoHash)
2129                         if err != nil {
2130                                 log.Printf("error getting cached metainfo: %s", err)
2131                                 err = nil
2132                         } else if mi != nil {
2133                                 t.addTrackers(mi.AnnounceList)
2134                                 err = cl.setMetaData(t, &mi.Info.Info, mi.Info.Bytes)
2135                         }
2136                 }
2137         }
2138         if err != nil {
2139                 return
2140         }
2141         t.addTrackers(spec.Trackers)
2142
2143         cl.torrents[spec.InfoHash] = t
2144         T.torrent = t
2145
2146         // From this point onwards, we can consider the torrent a part of the
2147         // client.
2148         if new {
2149                 t.pruneTimer = time.AfterFunc(0, func() {
2150                         cl.pruneConnectionsUnlocked(T.torrent)
2151                 })
2152                 if !cl.config.DisableTrackers {
2153                         go cl.announceTorrentTrackers(T.torrent)
2154                 }
2155                 if cl.dHT != nil {
2156                         go cl.announceTorrentDHT(T.torrent, true)
2157                 }
2158         }
2159         return
2160 }
2161
2162 // Prunes unused connections. This is required to make space to dial for
2163 // replacements.
2164 func (cl *Client) pruneConnectionsUnlocked(t *torrent) {
2165         select {
2166         case <-t.ceasingNetworking:
2167                 return
2168         case <-t.closing:
2169                 return
2170         default:
2171         }
2172         cl.mu.Lock()
2173         license := len(t.Conns) - (socketsPerTorrent+1)/2
2174         for _, c := range t.Conns {
2175                 if license <= 0 {
2176                         break
2177                 }
2178                 if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
2179                         continue
2180                 }
2181                 if time.Now().Sub(c.completedHandshake) < time.Minute {
2182                         continue
2183                 }
2184                 c.Close()
2185                 license--
2186         }
2187         cl.mu.Unlock()
2188         t.pruneTimer.Reset(pruneInterval)
2189 }
2190
2191 func (me *Client) dropTorrent(infoHash InfoHash) (err error) {
2192         t, ok := me.torrents[infoHash]
2193         if !ok {
2194                 err = fmt.Errorf("no such torrent")
2195                 return
2196         }
2197         err = t.close()
2198         if err != nil {
2199                 panic(err)
2200         }
2201         delete(me.torrents, infoHash)
2202         return
2203 }
2204
2205 // Returns true when peers are required, or false if the torrent is closing.
2206 func (cl *Client) waitWantPeers(t *torrent) bool {
2207         cl.mu.Lock()
2208         defer cl.mu.Unlock()
2209         t.stateMu.Lock()
2210         defer t.stateMu.Unlock()
2211         for {
2212                 select {
2213                 case <-t.ceasingNetworking:
2214                         return false
2215                 default:
2216                 }
2217                 if len(t.Peers) > torrentPeersLowWater {
2218                         goto wait
2219                 }
2220                 if t.needData() || cl.seeding(t) {
2221                         return true
2222                 }
2223         wait:
2224                 cl.mu.Unlock()
2225                 t.wantPeers.Wait()
2226                 t.stateMu.Unlock()
2227                 cl.mu.Lock()
2228                 t.stateMu.Lock()
2229         }
2230 }
2231
2232 // Returns whether the client should make effort to seed the torrent.
2233 func (cl *Client) seeding(t *torrent) bool {
2234         if cl.config.NoUpload {
2235                 return false
2236         }
2237         if !cl.config.Seed {
2238                 return false
2239         }
2240         if t.needData() {
2241                 return false
2242         }
2243         return true
2244 }
2245
2246 func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
2247         for cl.waitWantPeers(t) {
2248                 log.Printf("getting peers for %q from DHT", t)
2249                 ps, err := cl.dHT.Announce(string(t.InfoHash[:]), cl.incomingPeerPort(), impliedPort)
2250                 if err != nil {
2251                         log.Printf("error getting peers from dht: %s", err)
2252                         return
2253                 }
2254                 allAddrs := make(map[string]struct{})
2255         getPeers:
2256                 for {
2257                         select {
2258                         case v, ok := <-ps.Peers:
2259                                 if !ok {
2260                                         break getPeers
2261                                 }
2262                                 peersFoundByDHT.Add(int64(len(v.Peers)))
2263                                 for _, p := range v.Peers {
2264                                         allAddrs[(&net.UDPAddr{
2265                                                 IP:   p.IP[:],
2266                                                 Port: int(p.Port),
2267                                         }).String()] = struct{}{}
2268                                 }
2269                                 // log.Printf("%s: %d new peers from DHT", t, len(v.Peers))
2270                                 cl.mu.Lock()
2271                                 cl.addPeers(t, func() (ret []Peer) {
2272                                         for _, cp := range v.Peers {
2273                                                 ret = append(ret, Peer{
2274                                                         IP:     cp.IP[:],
2275                                                         Port:   int(cp.Port),
2276                                                         Source: peerSourceDHT,
2277                                                 })
2278                                         }
2279                                         return
2280                                 }())
2281                                 numPeers := len(t.Peers)
2282                                 cl.mu.Unlock()
2283                                 if numPeers >= torrentPeersHighWater {
2284                                         break getPeers
2285                                 }
2286                         case <-t.ceasingNetworking:
2287                                 ps.Close()
2288                                 return
2289                         }
2290                 }
2291                 ps.Close()
2292                 log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
2293         }
2294 }
2295
2296 func (cl *Client) trackerBlockedUnlocked(tr tracker.Client) (blocked bool, err error) {
2297         url_, err := url.Parse(tr.URL())
2298         if err != nil {
2299                 return
2300         }
2301         host, _, err := net.SplitHostPort(url_.Host)
2302         if err != nil {
2303                 host = url_.Host
2304         }
2305         addr, err := net.ResolveIPAddr("ip", host)
2306         if err != nil {
2307                 return
2308         }
2309         cl.mu.Lock()
2310         if cl.ipBlockList != nil {
2311                 if cl.ipBlockRange(addr.IP) != nil {
2312                         blocked = true
2313                 }
2314         }
2315         cl.mu.Unlock()
2316         return
2317 }
2318
2319 func (cl *Client) announceTorrentSingleTracker(tr tracker.Client, req *tracker.AnnounceRequest, t *torrent) error {
2320         blocked, err := cl.trackerBlockedUnlocked(tr)
2321         if err != nil {
2322                 return fmt.Errorf("error determining if tracker blocked: %s", err)
2323         }
2324         if blocked {
2325                 return fmt.Errorf("tracker blocked: %s", tr)
2326         }
2327         if err := tr.Connect(); err != nil {
2328                 return fmt.Errorf("error connecting: %s", err)
2329         }
2330         resp, err := tr.Announce(req)
2331         if err != nil {
2332                 return fmt.Errorf("error announcing: %s", err)
2333         }
2334         var peers []Peer
2335         for _, peer := range resp.Peers {
2336                 peers = append(peers, Peer{
2337                         IP:   peer.IP,
2338                         Port: peer.Port,
2339                 })
2340         }
2341         cl.mu.Lock()
2342         cl.addPeers(t, peers)
2343         cl.mu.Unlock()
2344
2345         log.Printf("%s: %d new peers from %s", t, len(peers), tr)
2346         peersFoundByTracker.Add(int64(len(peers)))
2347
2348         time.Sleep(time.Second * time.Duration(resp.Interval))
2349         return nil
2350 }
2351
2352 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers [][]tracker.Client, t *torrent) (atLeastOne bool) {
2353         oks := make(chan bool)
2354         outstanding := 0
2355         for _, tier := range trackers {
2356                 for _, tr := range tier {
2357                         outstanding++
2358                         go func(tr tracker.Client) {
2359                                 err := cl.announceTorrentSingleTracker(tr, req, t)
2360                                 oks <- err == nil
2361                         }(tr)
2362                 }
2363         }
2364         for outstanding > 0 {
2365                 ok := <-oks
2366                 outstanding--
2367                 if ok {
2368                         atLeastOne = true
2369                 }
2370         }
2371         return
2372 }
2373
2374 // Announce torrent to its trackers.
2375 func (cl *Client) announceTorrentTrackers(t *torrent) {
2376         req := tracker.AnnounceRequest{
2377                 Event:    tracker.Started,
2378                 NumWant:  -1,
2379                 Port:     uint16(cl.incomingPeerPort()),
2380                 PeerId:   cl.peerID,
2381                 InfoHash: t.InfoHash,
2382         }
2383         if !cl.waitWantPeers(t) {
2384                 return
2385         }
2386         cl.mu.RLock()
2387         req.Left = uint64(t.bytesLeft())
2388         trackers := t.Trackers
2389         cl.mu.RUnlock()
2390         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
2391                 req.Event = tracker.None
2392         }
2393 newAnnounce:
2394         for cl.waitWantPeers(t) {
2395                 cl.mu.RLock()
2396                 req.Left = uint64(t.bytesLeft())
2397                 trackers = t.Trackers
2398                 cl.mu.RUnlock()
2399                 numTrackersTried := 0
2400                 for _, tier := range trackers {
2401                         for trIndex, tr := range tier {
2402                                 numTrackersTried++
2403                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
2404                                 if err != nil {
2405                                         logonce.Stderr.Printf("%s: error announcing to %s: %s", t, tr, err)
2406                                         continue
2407                                 }
2408                                 // Float the successful announce to the top of the tier. If
2409                                 // the trackers list has been changed, we'll be modifying an
2410                                 // old copy so it won't matter.
2411                                 cl.mu.Lock()
2412                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
2413                                 cl.mu.Unlock()
2414
2415                                 req.Event = tracker.None
2416                                 continue newAnnounce
2417                         }
2418                 }
2419                 if numTrackersTried != 0 {
2420                         log.Printf("%s: all trackers failed", t)
2421                 }
2422                 // TODO: Wait until trackers are added if there are none.
2423                 time.Sleep(10 * time.Second)
2424         }
2425 }
2426
2427 func (cl *Client) allTorrentsCompleted() bool {
2428         for _, t := range cl.torrents {
2429                 if !t.haveInfo() {
2430                         return false
2431                 }
2432                 if t.numPiecesCompleted() != t.numPieces() {
2433                         return false
2434                 }
2435         }
2436         return true
2437 }
2438
2439 // Returns true when all torrents are completely downloaded and false if the
2440 // client is stopped before that.
2441 func (me *Client) WaitAll() bool {
2442         me.mu.Lock()
2443         defer me.mu.Unlock()
2444         for !me.allTorrentsCompleted() {
2445                 if me.stopped() {
2446                         return false
2447                 }
2448                 me.event.Wait()
2449         }
2450         return true
2451 }
2452
2453 func (me *Client) fillRequests(t *torrent, c *connection) {
2454         if c.Interested {
2455                 if c.PeerChoked {
2456                         return
2457                 }
2458                 if len(c.Requests) > c.requestsLowWater {
2459                         return
2460                 }
2461         }
2462         addRequest := func(req request) (again bool) {
2463                 // TODO: Couldn't this check also be done *after* the request?
2464                 if len(c.Requests) >= 64 {
2465                         return false
2466                 }
2467                 return c.Request(req)
2468         }
2469         for req := range t.urgent {
2470                 if !addRequest(req) {
2471                         return
2472                 }
2473         }
2474         for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
2475                 pieceIndex := e.Piece()
2476                 if !c.PeerHasPiece(pieceIndex) {
2477                         panic("piece in request order but peer doesn't have it")
2478                 }
2479                 if !t.wantPiece(pieceIndex) {
2480                         panic("unwanted piece in connection request order")
2481                 }
2482                 piece := t.Pieces[pieceIndex]
2483                 for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex)) {
2484                         r := request{pp.Integer(pieceIndex), cs}
2485                         if !addRequest(r) {
2486                                 return
2487                         }
2488                 }
2489         }
2490         return
2491 }
2492
2493 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
2494         if !t.haveInfo() {
2495                 return
2496         }
2497         me.fillRequests(t, c)
2498         if len(c.Requests) == 0 && !c.PeerChoked {
2499                 // So we're not choked, but we don't want anything right now. We may
2500                 // have completed readahead, and the readahead window has not rolled
2501                 // over to the next piece. Better to stay interested in case we're
2502                 // going to want data in the near future.
2503                 c.SetInterested(!t.haveAllPieces())
2504         }
2505 }
2506
2507 // Handle a received chunk from a peer.
2508 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
2509         chunksReceived.Add(1)
2510
2511         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
2512
2513         // Request has been satisfied.
2514         if me.connDeleteRequest(t, c, req) {
2515                 defer me.replenishConnRequests(t, c)
2516         } else {
2517                 unexpectedChunksReceived.Add(1)
2518         }
2519
2520         piece := t.Pieces[req.Index]
2521
2522         // Do we actually want this chunk?
2523         if !t.wantChunk(req) {
2524                 unwantedChunksReceived.Add(1)
2525                 c.UnwantedChunksReceived++
2526                 return nil
2527         }
2528
2529         c.UsefulChunksReceived++
2530         c.lastUsefulChunkReceived = time.Now()
2531
2532         me.upload(t, c)
2533
2534         // Write the chunk out.
2535         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
2536         if err != nil {
2537                 log.Printf("error writing chunk: %s", err)
2538                 return nil
2539         }
2540
2541         // log.Println("got chunk", req)
2542         piece.Event.Broadcast()
2543         // Record that we have the chunk.
2544         piece.unpendChunkIndex(chunkIndex(req.chunkSpec))
2545         delete(t.urgent, req)
2546         if piece.numPendingChunks() == 0 {
2547                 for _, c := range t.Conns {
2548                         c.pieceRequestOrder.DeletePiece(int(req.Index))
2549                 }
2550                 me.queuePieceCheck(t, req.Index)
2551         }
2552
2553         // Cancel pending requests for this chunk.
2554         for _, c := range t.Conns {
2555                 if me.connCancel(t, c, req) {
2556                         me.replenishConnRequests(t, c)
2557                 }
2558         }
2559
2560         return nil
2561 }
2562
2563 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
2564         p := t.Pieces[piece]
2565         if p.EverHashed && !correct {
2566                 log.Printf("%s: piece %d failed hash", t, piece)
2567                 failedPieceHashes.Add(1)
2568         }
2569         p.EverHashed = true
2570         if correct {
2571                 err := t.data.PieceCompleted(int(piece))
2572                 if err != nil {
2573                         log.Printf("error completing piece: %s", err)
2574                         correct = false
2575                 }
2576         }
2577         me.pieceChanged(t, int(piece))
2578 }
2579
2580 // TODO: Check this isn't called more than once for each piece being correct.
2581 func (me *Client) pieceChanged(t *torrent, piece int) {
2582         correct := t.pieceComplete(piece)
2583         p := t.Pieces[piece]
2584         defer p.Event.Broadcast()
2585         if correct {
2586                 p.Priority = PiecePriorityNone
2587                 p.PendingChunkSpecs = nil
2588                 for req := range t.urgent {
2589                         if int(req.Index) == piece {
2590                                 delete(t.urgent, req)
2591                         }
2592                 }
2593         } else {
2594                 if p.numPendingChunks() == 0 {
2595                         t.pendAllChunkSpecs(int(piece))
2596                 }
2597                 if t.wantPiece(piece) {
2598                         me.openNewConns(t)
2599                 }
2600         }
2601         for _, conn := range t.Conns {
2602                 if correct {
2603                         conn.Post(pp.Message{
2604                                 Type:  pp.Have,
2605                                 Index: pp.Integer(piece),
2606                         })
2607                         // TODO: Cancel requests for this piece.
2608                         for r := range conn.Requests {
2609                                 if int(r.Index) == piece {
2610                                         conn.Cancel(r)
2611                                 }
2612                         }
2613                         conn.pieceRequestOrder.DeletePiece(int(piece))
2614                 }
2615                 if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
2616                         t.connPendPiece(conn, int(piece))
2617                         me.replenishConnRequests(t, conn)
2618                 }
2619         }
2620         me.event.Broadcast()
2621 }
2622
2623 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
2624         cl.mu.Lock()
2625         defer cl.mu.Unlock()
2626         p := t.Pieces[index]
2627         for p.Hashing || t.data == nil {
2628                 cl.event.Wait()
2629         }
2630         p.QueuedForHash = false
2631         if t.isClosed() || t.pieceComplete(int(index)) {
2632                 return
2633         }
2634         p.Hashing = true
2635         cl.mu.Unlock()
2636         sum := t.hashPiece(index)
2637         cl.mu.Lock()
2638         select {
2639         case <-t.closing:
2640                 return
2641         default:
2642         }
2643         p.Hashing = false
2644         cl.pieceHashed(t, index, sum == p.Hash)
2645 }
2646
2647 // Returns handles to all the torrents loaded in the Client.
2648 func (me *Client) Torrents() (ret []Torrent) {
2649         me.mu.Lock()
2650         for _, t := range me.torrents {
2651                 ret = append(ret, Torrent{me, t})
2652         }
2653         me.mu.Unlock()
2654         return
2655 }
2656
2657 func (me *Client) AddMagnet(uri string) (T Torrent, err error) {
2658         spec, err := TorrentSpecFromMagnetURI(uri)
2659         if err != nil {
2660                 return
2661         }
2662         T, _, err = me.AddTorrentSpec(spec)
2663         return
2664 }
2665
2666 func (me *Client) AddTorrent(mi *metainfo.MetaInfo) (T Torrent, err error) {
2667         T, _, err = me.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2668         return
2669 }
2670
2671 func (me *Client) AddTorrentFromFile(filename string) (T Torrent, err error) {
2672         mi, err := metainfo.LoadFromFile(filename)
2673         if err != nil {
2674                 return
2675         }
2676         T, _, err = me.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
2677         return
2678 }