Sergey Matveev's repositories - btrtrc.git/blob - client.go
5625f017ee96cbc3cc74a2fb87a640265ce112c5
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errorf("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "bytes"
21         "container/heap"
22         "crypto/rand"
23         "crypto/sha1"
24         "errors"
25         "expvar"
26         "fmt"
27         "io"
28         "log"
29         "math/big"
30         mathRand "math/rand"
31         "net"
32         "os"
33         "path/filepath"
34         "sort"
35         "strings"
36         "sync"
37         "syscall"
38         "time"
39
40         "github.com/h2so5/utp"
41
42         "github.com/anacrolix/libtorgo/bencode"
43         "github.com/anacrolix/libtorgo/metainfo"
44
45         "bitbucket.org/anacrolix/go.torrent/dht"
46         "bitbucket.org/anacrolix/go.torrent/iplist"
47         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
48         "bitbucket.org/anacrolix/go.torrent/tracker"
49         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
50         . "bitbucket.org/anacrolix/go.torrent/util"
51 )
52
53 var (
54         unusedDownloadedChunksCount = expvar.NewInt("unusedDownloadedChunksCount")
55         chunksDownloadedCount       = expvar.NewInt("chunksDownloadedCount")
56         peersFoundByDHT             = expvar.NewInt("peersFoundByDHT")
57         peersFoundByPEX             = expvar.NewInt("peersFoundByPEX")
58         uploadChunksPosted          = expvar.NewInt("uploadChunksPosted")
59         unexpectedCancels           = expvar.NewInt("unexpectedCancels")
60         postedCancels               = expvar.NewInt("postedCancels")
61         duplicateConnsAvoided       = expvar.NewInt("duplicateConnsAvoided")
62         failedPieceHashes           = expvar.NewInt("failedPieceHashes")
63         unsuccessfulDials           = expvar.NewInt("unsuccessfulDials")
64         successfulDials             = expvar.NewInt("successfulDials")
65         acceptedConns               = expvar.NewInt("acceptedConns")
66         inboundConnsBlocked         = expvar.NewInt("inboundConnsBlocked")
67 )
68
const (
	// extensionBytes are the eight reserved bytes sent in the peer
	// handshake. The bits that are set:
	//
	//   byte 5, 0x10: extension protocol, http://www.bittorrent.org/beps/bep_0010.html
	//   byte 7, 0x01: DHT, http://www.bittorrent.org/beps/bep_0005.html
	extensionBytes = "\x00\x00\x00\x00\x00" + "\x10" + "\x00" + "\x01"

	// socketsPerTorrent is the value NewClient uses for the client's
	// half-open limit.
	socketsPerTorrent = 40
)
78
79 // Currently doesn't really queue, but should in the future.
80 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
81         piece := t.Pieces[pieceIndex]
82         if piece.QueuedForHash {
83                 return
84         }
85         piece.QueuedForHash = true
86         go cl.verifyPiece(t, pieceIndex)
87 }
88
89 func (cl *Client) queueFirstHash(t *torrent, piece int) {
90         p := t.Pieces[piece]
91         if p.EverHashed || p.Hashing || p.QueuedForHash {
92                 return
93         }
94         cl.queuePieceCheck(t, pp.Integer(piece))
95 }
96
97 // Queues the torrent data for the given region for download. The beginning of
98 // the region is given highest priority to allow a subsequent read at the same
99 // offset to return data ASAP.
100 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
101         me.mu.Lock()
102         defer me.mu.Unlock()
103         t := me.torrent(ih)
104         if t == nil {
105                 return errors.New("no such active torrent")
106         }
107         if !t.haveInfo() {
108                 return errors.New("missing metadata")
109         }
110         firstIndex := int(off / int64(t.UsualPieceSize()))
111         for i := 0; i < 5; i++ {
112                 index := firstIndex + i
113                 if index >= t.NumPieces() {
114                         continue
115                 }
116                 me.queueFirstHash(t, index)
117         }
118         me.downloadStrategy.TorrentPrioritize(t, off, len_)
119         for _, cn := range t.Conns {
120                 me.replenishConnRequests(t, cn)
121         }
122         me.openNewConns(t)
123         return nil
124 }
125
// dataWait records a wait on torrent data. Client.WriteStatus lists the
// offsets of a torrent's dataWaits as its "Blocked reads".
type dataWait struct {
	// Offset into the torrent data being waited on.
	offset int64
	// NOTE(review): presumably closed or signalled once data at offset is
	// available; the signalling side is not in this part of the file.
	ready chan struct{}
}
130
131 type Client struct {
132         noUpload         bool
133         dataDir          string
134         halfOpenLimit    int
135         peerID           [20]byte
136         listeners        []net.Listener
137         disableTrackers  bool
138         downloadStrategy DownloadStrategy
139         dHT              *dht.Server
140         disableUTP       bool
141         ipBlockList      *iplist.IPList
142         bannedTorrents   map[InfoHash]struct{}
143
144         mu    sync.RWMutex
145         event sync.Cond
146         quit  chan struct{}
147
148         handshaking int
149
150         torrents map[InfoHash]*torrent
151
152         dataWaits map[*torrent][]dataWait
153 }
154
155 func (me *Client) SetIPBlockList(list *iplist.IPList) {
156         me.mu.Lock()
157         defer me.mu.Unlock()
158         me.ipBlockList = list
159         if me.dHT != nil {
160                 me.dHT.SetIPBlockList(list)
161         }
162 }
163
164 func (me *Client) PeerID() string {
165         return string(me.peerID[:])
166 }
167
168 func (me *Client) ListenAddr() (addr net.Addr) {
169         for _, l := range me.listeners {
170                 if addr != nil && l.Addr().String() != addr.String() {
171                         panic("listeners exist on different addresses")
172                 }
173                 addr = l.Addr()
174         }
175         return
176 }
177
178 type hashSorter struct {
179         Hashes []InfoHash
180 }
181
182 func (me hashSorter) Len() int {
183         return len(me.Hashes)
184 }
185
186 func (me hashSorter) Less(a, b int) bool {
187         return (&big.Int{}).SetBytes(me.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(me.Hashes[b][:])) < 0
188 }
189
190 func (me hashSorter) Swap(a, b int) {
191         me.Hashes[a], me.Hashes[b] = me.Hashes[b], me.Hashes[a]
192 }
193
194 func (cl *Client) sortedTorrents() (ret []*torrent) {
195         var hs hashSorter
196         for ih := range cl.torrents {
197                 hs.Hashes = append(hs.Hashes, ih)
198         }
199         sort.Sort(hs)
200         for _, ih := range hs.Hashes {
201                 ret = append(ret, cl.torrent(ih))
202         }
203         return
204 }
205
206 func (cl *Client) WriteStatus(_w io.Writer) {
207         cl.mu.RLock()
208         defer cl.mu.RUnlock()
209         w := bufio.NewWriter(_w)
210         defer w.Flush()
211         fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
212         fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
213         fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
214         if cl.dHT != nil {
215                 fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
216                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
217                 fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.LocalAddr()))
218                 fmt.Fprintf(w, "DHT announces: %d\n", cl.dHT.NumConfirmedAnnounces)
219         }
220         cl.downloadStrategy.WriteStatus(w)
221         fmt.Fprintln(w)
222         for _, t := range cl.sortedTorrents() {
223                 if t.Name() == "" {
224                         fmt.Fprint(w, "<unknown name>")
225                 } else {
226                         fmt.Fprint(w, t.Name())
227                 }
228                 if t.haveInfo() {
229                         fmt.Fprintf(w, ": %f%% of %d bytes", 100*(1-float32(t.BytesLeft())/float32(t.Length())), t.Length())
230                 }
231                 fmt.Fprint(w, "\n")
232                 fmt.Fprint(w, "Blocked reads:")
233                 for _, dw := range cl.dataWaits[t] {
234                         fmt.Fprintf(w, " %d", dw.offset)
235                 }
236                 fmt.Fprintln(w)
237                 t.WriteStatus(w)
238                 fmt.Fprintln(w)
239         }
240 }
241
242 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
243 // isn't available.
244 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
245         cl.mu.RLock()
246         defer cl.mu.RUnlock()
247         t := cl.torrent(ih)
248         if t == nil {
249                 err = errors.New("unknown torrent")
250                 return
251         }
252         index := pp.Integer(off / int64(t.UsualPieceSize()))
253         // Reading outside the bounds of a file is an error.
254         if index < 0 {
255                 err = os.ErrInvalid
256                 return
257         }
258         if int(index) >= len(t.Pieces) {
259                 err = io.EOF
260                 return
261         }
262         piece := t.Pieces[index]
263         pieceOff := pp.Integer(off % int64(t.UsualPieceSize()))
264         pieceLeft := int(t.PieceLength(index) - pieceOff)
265         if pieceLeft <= 0 {
266                 err = io.EOF
267                 return
268         }
269         if len(p) > pieceLeft {
270                 p = p[:pieceLeft]
271         }
272         for cs, _ := range piece.PendingChunkSpecs {
273                 chunkOff := int64(pieceOff) - int64(cs.Begin)
274                 if chunkOff >= int64(t.PieceLength(index)) {
275                         panic(chunkOff)
276                 }
277                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
278                         // read begins in a pending chunk
279                         err = ErrDataNotReady
280                         return
281                 }
282                 // pending chunk caps available data
283                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
284                         p = p[:-chunkOff]
285                 }
286         }
287         if len(p) == 0 {
288                 panic(len(p))
289         }
290         return t.Data.ReadAt(p, off)
291 }
292
293 func (cl *Client) configDir() string {
294         return filepath.Join(os.Getenv("HOME"), ".config/torrent")
295 }
296
297 func (cl *Client) setEnvBlocklist() (err error) {
298         filename := os.Getenv("TORRENT_BLOCKLIST_FILE")
299         defaultBlocklist := filename == ""
300         if defaultBlocklist {
301                 filename = filepath.Join(cl.configDir(), "blocklist")
302         }
303         f, err := os.Open(filename)
304         if err != nil {
305                 if defaultBlocklist {
306                         err = nil
307                 }
308                 return
309         }
310         defer f.Close()
311         var ranges []iplist.Range
312         scanner := bufio.NewScanner(f)
313         for scanner.Scan() {
314                 r, ok, lineErr := iplist.ParseBlocklistP2PLine(scanner.Text())
315                 if lineErr != nil {
316                         err = fmt.Errorf("error reading torrent blocklist line: %s", lineErr)
317                         return
318                 }
319                 if !ok {
320                         continue
321                 }
322                 ranges = append(ranges, r)
323         }
324         err = scanner.Err()
325         if err != nil {
326                 err = fmt.Errorf("error reading torrent blocklist: %s", err)
327                 return
328         }
329         cl.ipBlockList = iplist.New(ranges)
330         return
331 }
332
333 func (cl *Client) initBannedTorrents() error {
334         f, err := os.Open(filepath.Join(cl.configDir(), "banned_infohashes"))
335         if err != nil {
336                 if os.IsNotExist(err) {
337                         return nil
338                 }
339                 return fmt.Errorf("error opening banned infohashes file: %s", err)
340         }
341         defer f.Close()
342         scanner := bufio.NewScanner(f)
343         cl.bannedTorrents = make(map[InfoHash]struct{})
344         for scanner.Scan() {
345                 var ihs string
346                 n, err := fmt.Sscanf(scanner.Text(), "%x", &ihs)
347                 if err != nil {
348                         return fmt.Errorf("error reading infohash: %s", err)
349                 }
350                 if n != 1 {
351                         continue
352                 }
353                 if len(ihs) != 20 {
354                         return errors.New("bad infohash")
355                 }
356                 var ih InfoHash
357                 CopyExact(&ih, ihs)
358                 cl.bannedTorrents[ih] = struct{}{}
359         }
360         if err := scanner.Err(); err != nil {
361                 return fmt.Errorf("error scanning file: %s", err)
362         }
363         return nil
364 }
365
366 func NewClient(cfg *Config) (cl *Client, err error) {
367         if cfg == nil {
368                 cfg = &Config{}
369         }
370
371         cl = &Client{
372                 noUpload:         cfg.NoUpload,
373                 disableTrackers:  cfg.DisableTrackers,
374                 downloadStrategy: cfg.DownloadStrategy,
375                 halfOpenLimit:    socketsPerTorrent,
376                 dataDir:          cfg.DataDir,
377                 disableUTP:       cfg.DisableUTP,
378
379                 quit:     make(chan struct{}),
380                 torrents: make(map[InfoHash]*torrent),
381
382                 dataWaits: make(map[*torrent][]dataWait),
383         }
384         cl.event.L = &cl.mu
385
386         if !cfg.NoDefaultBlocklist {
387                 err = cl.setEnvBlocklist()
388                 if err != nil {
389                         return
390                 }
391         }
392
393         if err = cl.initBannedTorrents(); err != nil {
394                 err = fmt.Errorf("error initing banned torrents: %s", err)
395                 return
396         }
397
398         if cfg.PeerID != "" {
399                 CopyExact(&cl.peerID, cfg.PeerID)
400         } else {
401                 o := copy(cl.peerID[:], BEP20)
402                 _, err = rand.Read(cl.peerID[o:])
403                 if err != nil {
404                         panic("error generating peer id")
405                 }
406         }
407
408         if cl.downloadStrategy == nil {
409                 cl.downloadStrategy = &DefaultDownloadStrategy{}
410         }
411
412         // Returns the laddr string to listen on for the next Listen call.
413         listenAddr := func() string {
414                 if addr := cl.ListenAddr(); addr != nil {
415                         return addr.String()
416                 }
417                 if cfg.ListenAddr == "" {
418                         return ":50007"
419                 }
420                 return cfg.ListenAddr
421         }
422         if !cfg.DisableTCP {
423                 var l net.Listener
424                 l, err = net.Listen("tcp", listenAddr())
425                 if err != nil {
426                         return
427                 }
428                 cl.listeners = append(cl.listeners, l)
429                 go cl.acceptConnections(l, false)
430         }
431         var utpL *utp.UTPListener
432         if !cfg.DisableUTP {
433                 utpL, err = utp.Listen("utp", listenAddr())
434                 if err != nil {
435                         return
436                 }
437                 cl.listeners = append(cl.listeners, utpL)
438                 go cl.acceptConnections(utpL, true)
439         }
440         if !cfg.NoDHT {
441                 dhtCfg := cfg.DHTConfig
442                 if dhtCfg == nil {
443                         dhtCfg = &dht.ServerConfig{}
444                 }
445                 if dhtCfg.Addr == "" {
446                         dhtCfg.Addr = listenAddr()
447                 }
448                 if dhtCfg.Conn == nil && utpL != nil {
449                         dhtCfg.Conn = utpL.RawConn
450                 }
451                 cl.dHT, err = dht.NewServer(dhtCfg)
452                 if cl.ipBlockList != nil {
453                         cl.dHT.SetIPBlockList(cl.ipBlockList)
454                 }
455                 if err != nil {
456                         return
457                 }
458         }
459
460         return
461 }
462
463 func (cl *Client) stopped() bool {
464         select {
465         case <-cl.quit:
466                 return true
467         default:
468                 return false
469         }
470 }
471
472 // Stops the client. All connections to peers are closed and all activity will
473 // come to a halt.
474 func (me *Client) Stop() {
475         me.mu.Lock()
476         close(me.quit)
477         for _, l := range me.listeners {
478                 l.Close()
479         }
480         me.event.Broadcast()
481         for _, t := range me.torrents {
482                 t.Close()
483         }
484         me.mu.Unlock()
485 }
486
487 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
488
489 func (cl *Client) ipBlockRange(ip net.IP) (r *iplist.Range) {
490         if cl.ipBlockList == nil {
491                 return
492         }
493         ip = ip.To4()
494         if ip == nil {
495                 log.Printf("saw non-IPv4 address")
496                 r = &ipv6BlockRange
497                 return
498         }
499         r = cl.ipBlockList.Lookup(ip)
500         return
501 }
502
503 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
504         for {
505                 // We accept all connections immediately, because we don't know what
506                 // torrent they're for.
507                 conn, err := l.Accept()
508                 select {
509                 case <-cl.quit:
510                         if conn != nil {
511                                 conn.Close()
512                         }
513                         return
514                 default:
515                 }
516                 if err != nil {
517                         log.Print(err)
518                         return
519                 }
520                 acceptedConns.Add(1)
521                 cl.mu.RLock()
522                 blockRange := cl.ipBlockRange(AddrIP(conn.RemoteAddr()))
523                 cl.mu.RUnlock()
524                 if blockRange != nil {
525                         inboundConnsBlocked.Add(1)
526                         log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
527                         continue
528                 }
529                 go func() {
530                         if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
531                                 log.Print(err)
532                         }
533                 }()
534         }
535 }
536
537 func (me *Client) torrent(ih InfoHash) *torrent {
538         for _, t := range me.torrents {
539                 if t.InfoHash == ih {
540                         return t
541                 }
542         }
543         return nil
544 }
545
// dialResult is the outcome of a single dial attempt made by doDial.
type dialResult struct {
	// The established connection, or nil if the dial failed.
	net.Conn
	// Whether the attempt was made over uTP.
	UTP bool
}
550
551 func doDial(dial func() (net.Conn, error), ch chan dialResult, utp bool) {
552         conn, err := dial()
553         if err != nil {
554                 conn = nil // Pedantic
555         }
556         ch <- dialResult{conn, utp}
557         if err == nil {
558                 successfulDials.Add(1)
559                 return
560         }
561         unsuccessfulDials.Add(1)
562         if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
563                 return
564         }
565         if netOpErr, ok := err.(*net.OpError); ok {
566                 switch netOpErr.Err {
567                 case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
568                         return
569                 }
570         }
571         if err != nil {
572                 log.Printf("error connecting to peer: %s %#v", err, err)
573                 return
574         }
575 }
576
577 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
578         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
579         if ret < minDialTimeout {
580                 ret = minDialTimeout
581         }
582         return
583 }
584
585 // Start the process of connecting to the given peer for the given torrent if
586 // appropriate.
587 func (me *Client) initiateConn(peer Peer, t *torrent) {
588         if peer.Id == me.peerID {
589                 return
590         }
591         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
592         if t.addrActive(addr) {
593                 duplicateConnsAvoided.Add(1)
594                 return
595         }
596         if r := me.ipBlockRange(peer.IP); r != nil {
597                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
598                 return
599         }
600         dialTimeout := reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers))
601         t.HalfOpen[addr] = struct{}{}
602         go func() {
603                 // Binding to the listen address and dialing via net.Dialer gives
604                 // "address in use" error. It seems it's not possible to dial out from
605                 // this address so that peers associate our local address with our
606                 // listen address.
607
608                 // Initiate connections via TCP and UTP simultaneously. Use the first
609                 // one that succeeds.
610                 left := 1
611                 if !me.disableUTP {
612                         left++
613                 }
614                 resCh := make(chan dialResult, left)
615                 if !me.disableUTP {
616                         go doDial(func() (net.Conn, error) {
617                                 return (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr)
618                         }, resCh, true)
619                 }
620                 go doDial(func() (net.Conn, error) {
621                         // time.Sleep(time.Second) // Give uTP a bit of a head start.
622                         return net.DialTimeout("tcp", addr, dialTimeout)
623                 }, resCh, false)
624                 var res dialResult
625                 for ; left > 0 && res.Conn == nil; left-- {
626                         res = <-resCh
627                 }
628                 // Whether or not the connection attempt succeeds, the half open
629                 // counter should be decremented, and new connection attempts made.
630                 go func() {
631                         me.mu.Lock()
632                         defer me.mu.Unlock()
633                         if _, ok := t.HalfOpen[addr]; !ok {
634                                 panic("invariant broken")
635                         }
636                         delete(t.HalfOpen, addr)
637                         me.openNewConns(t)
638                 }()
639                 if res.Conn == nil {
640                         return
641                 }
642                 if left > 0 {
643                         go func() {
644                                 for ; left > 0; left-- {
645                                         conn := (<-resCh).Conn
646                                         if conn != nil {
647                                                 conn.Close()
648                                         }
649                                 }
650                         }()
651                 }
652
653                 // log.Printf("connected to %s", conn.RemoteAddr())
654                 err := me.runConnection(res.Conn, t, peer.Source, res.UTP)
655                 if err != nil {
656                         log.Print(err)
657                 }
658         }()
659 }
660
661 // The port number for incoming peer connections. 0 if the client isn't
662 // listening.
663 func (cl *Client) incomingPeerPort() int {
664         listenAddr := cl.ListenAddr()
665         if listenAddr == nil {
666                 return 0
667         }
668         return addrPort(listenAddr)
669 }
670
// addrCompactIP converts a net.Addr to its compact IP representation: the 4
// raw bytes of an IPv4 address or the 16 raw bytes of an IPv6 address, per
// the "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
//
// addr.String() must be of the form "host:port" with an IP address for the
// host. An IPv6 zone, if present, is ignored. An error is returned if the
// string can't be split or the host isn't an IP address; previously such a
// host silently produced an empty string.
func addrCompactIP(addr net.Addr) (string, error) {
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "", err
	}
	// net.ParseIP rejects zoned addresses such as "fe80::1%eth0", and the
	// zone has no place in the compact form anyway.
	if i := strings.Index(host, "%"); i >= 0 {
		host = host[:i]
	}
	ip := net.ParseIP(host)
	if ip == nil {
		return "", fmt.Errorf("host %q is not an IP address", host)
	}
	if v4 := ip.To4(); v4 != nil {
		return string(v4), nil
	}
	return string(ip.To16()), nil
}
687
// handshakeWriter writes every buffer received from bb to w, in order, until
// bb is closed or a write fails. On a failed write w is closed and nothing
// further is read from bb. Exactly one value is sent on done: the write
// error, or nil if all writes succeeded.
func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
	done <- func() error {
		for b := range bb {
			if _, err := w.Write(b); err != nil {
				w.Close()
				return err
			}
		}
		return nil
	}()
}
699
700 type peerExtensionBytes [8]byte
701 type peerID [20]byte
702
703 type handshakeResult struct {
704         peerExtensionBytes
705         peerID
706         InfoHash
707 }
708
709 func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
710         // Bytes to be sent to the peer. Should never block the sender.
711         postCh := make(chan []byte, 4)
712         // A single error value sent when the writer completes.
713         writeDone := make(chan error, 1)
714         // Performs writes to the socket and ensures posts don't block.
715         go handshakeWriter(sock, postCh, writeDone)
716
717         defer func() {
718                 close(postCh) // Done writing.
719                 if !ok {
720                         return
721                 }
722                 if err != nil {
723                         panic(err)
724                 }
725                 // Wait until writes complete before returning from handshake.
726                 err = <-writeDone
727                 if err != nil {
728                         err = fmt.Errorf("error writing during handshake: %s", err)
729                 }
730         }()
731
732         post := func(bb []byte) {
733                 select {
734                 case postCh <- bb:
735                 default:
736                         panic("mustn't block while posting")
737                 }
738         }
739
740         post([]byte(pp.Protocol))
741         post([]byte(extensionBytes))
742         if ih != nil { // We already know what we want.
743                 post(ih[:])
744                 post(peerID[:])
745         }
746         var b [68]byte
747         _, err = io.ReadFull(sock, b[:68])
748         if err != nil {
749                 err = nil
750                 return
751         }
752         if string(b[:20]) != pp.Protocol {
753                 return
754         }
755         CopyExact(&res.peerExtensionBytes, b[20:28])
756         CopyExact(&res.InfoHash, b[28:48])
757         CopyExact(&res.peerID, b[48:68])
758
759         if ih == nil { // We were waiting for the peer to tell us what they wanted.
760                 post(res.InfoHash[:])
761                 post(peerID[:])
762         }
763
764         ok = true
765         return
766 }
767
// peerConn wraps a net.Conn so that every Read carries a fresh deadline and
// reports connection resets and timeouts as io.EOF.
type peerConn struct {
	net.Conn
}

// Read sets a read deadline of 150 seconds from now and then reads from the
// underlying connection. A read that fails with ECONNRESET or with a timeout
// returns io.EOF instead; other errors are passed through unchanged. It
// panics if a timed out read returned data.
func (pc peerConn) Read(b []byte) (n int, err error) {
	// Keep-alives should be received every 2 mins. Give a bit of gracetime.
	if err = pc.Conn.SetReadDeadline(time.Now().Add(2*time.Minute + 30*time.Second)); err != nil {
		return
	}
	n, err = pc.Conn.Read(b)
	if err == nil {
		return
	}
	if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
		return n, io.EOF
	}
	if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
		if n != 0 {
			panic(n)
		}
		return n, io.EOF
	}
	return
}
791
792 func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
793         if tcpConn, ok := sock.(*net.TCPConn); ok {
794                 tcpConn.SetLinger(0)
795         }
796         defer sock.Close()
797         me.mu.Lock()
798         me.handshaking++
799         me.mu.Unlock()
800         // One minute to complete handshake.
801         sock.SetDeadline(time.Now().Add(time.Minute))
802         hsRes, ok, err := handshake(sock, func() *InfoHash {
803                 if torrent == nil {
804                         return nil
805                 } else {
806                         return &torrent.InfoHash
807                 }
808         }(), me.peerID)
809         me.mu.Lock()
810         defer me.mu.Unlock()
811         if me.handshaking == 0 {
812                 panic("handshake count invariant is broken")
813         }
814         me.handshaking--
815         if err != nil {
816                 err = fmt.Errorf("error during handshake: %s", err)
817                 return
818         }
819         if !ok {
820                 return
821         }
822         if hsRes.peerID == me.peerID {
823                 return
824         }
825         torrent = me.torrent(hsRes.InfoHash)
826         if torrent == nil {
827                 return
828         }
829         sock.SetWriteDeadline(time.Time{})
830         sock = peerConn{sock}
831         conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
832         defer conn.Close()
833         conn.Discovery = discovery
834         if !me.addConnection(torrent, conn) {
835                 return
836         }
837         if conn.PeerExtensionBytes[5]&0x10 != 0 {
838                 conn.Post(pp.Message{
839                         Type:       pp.Extended,
840                         ExtendedID: pp.HandshakeExtendedID,
841                         ExtendedPayload: func() []byte {
842                                 d := map[string]interface{}{
843                                         "m": map[string]int{
844                                                 "ut_metadata": 1,
845                                                 "ut_pex":      2,
846                                         },
847                                         "v": "go.torrent dev 20140825", // Just the date
848                                         // No upload queue is implemented yet.
849                                         "reqq": func() int {
850                                                 if me.noUpload {
851                                                         // No need to look strange if it costs us nothing.
852                                                         return 250
853                                                 } else {
854                                                         return 1
855                                                 }
856                                         }(),
857                                 }
858                                 if torrent.metadataSizeKnown() {
859                                         d["metadata_size"] = torrent.metadataSize()
860                                 }
861                                 if p := me.incomingPeerPort(); p != 0 {
862                                         d["p"] = p
863                                 }
864                                 yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
865                                 if err != nil {
866                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
867                                 } else {
868                                         d["yourip"] = yourip
869                                 }
870                                 // log.Printf("sending %v", d)
871                                 b, err := bencode.Marshal(d)
872                                 if err != nil {
873                                         panic(err)
874                                 }
875                                 return b
876                         }(),
877                 })
878         }
879         if torrent.haveAnyPieces() {
880                 conn.Post(pp.Message{
881                         Type:     pp.Bitfield,
882                         Bitfield: torrent.bitfield(),
883                 })
884         }
885         if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil {
886                 addr, _ := me.dHT.LocalAddr().(*net.UDPAddr)
887                 conn.Post(pp.Message{
888                         Type: pp.Port,
889                         Port: uint16(addr.Port),
890                 })
891         }
892         if torrent.haveInfo() {
893                 conn.initPieceOrder(torrent.NumPieces())
894         }
895         err = me.connectionLoop(torrent, conn)
896         if err != nil {
897                 err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
898         }
899         me.dropConnection(torrent, conn)
900         return
901 }
902
903 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
904         for piece >= len(c.PeerPieces) {
905                 c.PeerPieces = append(c.PeerPieces, false)
906         }
907         c.PeerPieces[piece] = true
908         if t.wantPiece(piece) {
909                 me.replenishConnRequests(t, c)
910         }
911 }
912
// peerUnchoked is called from connectionLoop when the peer on conn sends an
// Unchoke message. It replenishes the requests outstanding on conn.
func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
	me.replenishConnRequests(torrent, conn)
}
916
917 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
918         ok = cn.Cancel(r)
919         if ok {
920                 postedCancels.Add(1)
921                 cl.downloadStrategy.DeleteRequest(t, r)
922         }
923         return
924 }
925
926 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
927         if !cn.RequestPending(r) {
928                 return
929         }
930         cl.downloadStrategy.DeleteRequest(t, r)
931         delete(cn.Requests, r)
932 }
933
934 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
935         if t.haveInfo() {
936                 return
937         }
938         var pending []int
939         for index := 0; index < t.MetadataPieceCount(); index++ {
940                 if !t.HaveMetadataPiece(index) {
941                         pending = append(pending, index)
942                 }
943         }
944         for _, i := range mathRand.Perm(len(pending)) {
945                 c.Post(pp.Message{
946                         Type:       pp.Extended,
947                         ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
948                         ExtendedPayload: func() []byte {
949                                 b, err := bencode.Marshal(map[string]int{
950                                         "msg_type": 0,
951                                         "piece":    pending[i],
952                                 })
953                                 if err != nil {
954                                         panic(err)
955                                 }
956                                 return b
957                         }(),
958                 })
959         }
960 }
961
962 func (cl *Client) completedMetadata(t *torrent) {
963         h := sha1.New()
964         h.Write(t.MetaData)
965         var ih InfoHash
966         CopyExact(&ih, h.Sum(nil))
967         if ih != t.InfoHash {
968                 log.Print("bad metadata")
969                 t.InvalidateMetadata()
970                 return
971         }
972         var info metainfo.Info
973         err := bencode.Unmarshal(t.MetaData, &info)
974         if err != nil {
975                 log.Printf("error unmarshalling metadata: %s", err)
976                 t.InvalidateMetadata()
977                 return
978         }
979         // TODO(anacrolix): If this fails, I think something harsher should be
980         // done.
981         err = cl.setMetaData(t, info, t.MetaData)
982         if err != nil {
983                 log.Printf("error setting metadata: %s", err)
984                 t.InvalidateMetadata()
985                 return
986         }
987         log.Printf("%s: got metadata from peers", t)
988 }
989
990 // Process incoming ut_metadata message.
991 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
992         var d map[string]int
993         err = bencode.Unmarshal(payload, &d)
994         if err != nil {
995                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
996                 return
997         }
998         msgType, ok := d["msg_type"]
999         if !ok {
1000                 err = errors.New("missing msg_type field")
1001                 return
1002         }
1003         piece := d["piece"]
1004         switch msgType {
1005         case pp.DataMetadataExtensionMsgType:
1006                 if t.haveInfo() {
1007                         break
1008                 }
1009                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1010                 if begin < 0 || begin >= len(payload) {
1011                         log.Printf("got bad metadata piece")
1012                         break
1013                 }
1014                 t.SaveMetadataPiece(piece, payload[begin:])
1015                 c.UsefulChunksReceived++
1016                 c.lastUsefulChunkReceived = time.Now()
1017                 if !t.HaveAllMetadataPieces() {
1018                         break
1019                 }
1020                 cl.completedMetadata(t)
1021         case pp.RequestMetadataExtensionMsgType:
1022                 if !t.HaveMetadataPiece(piece) {
1023                         c.Post(t.NewMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1024                         break
1025                 }
1026                 start := (1 << 14) * piece
1027                 c.Post(t.NewMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[start:start+t.metadataPieceSize(piece)]))
1028         case pp.RejectMetadataExtensionMsgType:
1029         default:
1030                 err = errors.New("unknown msg_type value")
1031         }
1032         return
1033 }
1034
// peerExchangeMessage is the decoded form of a peer exchange (ut_pex)
// extended message payload. The bencode tags map the dict keys "added",
// "added.f" and "dropped" onto the fields. connectionLoop only consumes
// Added; AddedFlags and Dropped are decoded but not used there.
// NOTE(review): the key names match BEP 11, where "added.f" carries one flag
// byte per added peer — confirm against the spec before relying on it.
type peerExchangeMessage struct {
	Added      CompactPeers   `bencode:"added"`
	AddedFlags []byte         `bencode:"added.f"`
	Dropped    []tracker.Peer `bencode:"dropped"`
}
1040
// Extracts the port as an integer from a network address. This is a thin
// package-local wrapper that delegates to AddrPort (dot-imported from the
// util package).
func addrPort(addr net.Addr) int {
	return AddrPort(addr)
}
1045
1046 // Processes incoming bittorrent messages. The client lock is held upon entry
1047 // and exit.
1048 func (me *Client) connectionLoop(t *torrent, c *connection) error {
1049         decoder := pp.Decoder{
1050                 R:         bufio.NewReader(c.Socket),
1051                 MaxLength: 256 * 1024,
1052         }
1053         for {
1054                 me.mu.Unlock()
1055                 var msg pp.Message
1056                 err := decoder.Decode(&msg)
1057                 me.mu.Lock()
1058                 c.lastMessageReceived = time.Now()
1059                 select {
1060                 case <-c.closing:
1061                         return nil
1062                 default:
1063                 }
1064                 if err != nil {
1065                         if me.stopped() || err == io.EOF {
1066                                 return nil
1067                         }
1068                         return err
1069                 }
1070                 if msg.Keepalive {
1071                         continue
1072                 }
1073                 switch msg.Type {
1074                 case pp.Choke:
1075                         c.PeerChoked = true
1076                         for r := range c.Requests {
1077                                 me.connDeleteRequest(t, c, r)
1078                         }
1079                 case pp.Unchoke:
1080                         c.PeerChoked = false
1081                         me.peerUnchoked(t, c)
1082                 case pp.Interested:
1083                         c.PeerInterested = true
1084                         // TODO: This should be done from a dedicated unchoking routine.
1085                         if me.noUpload {
1086                                 break
1087                         }
1088                         c.Unchoke()
1089                 case pp.NotInterested:
1090                         c.PeerInterested = false
1091                         c.Choke()
1092                 case pp.Have:
1093                         me.peerGotPiece(t, c, int(msg.Index))
1094                 case pp.Request:
1095                         if me.noUpload {
1096                                 break
1097                         }
1098                         if c.PeerRequests == nil {
1099                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1100                         }
1101                         request := newRequest(msg.Index, msg.Begin, msg.Length)
1102                         // TODO: Requests should be satisfied from a dedicated upload routine.
1103                         // c.PeerRequests[request] = struct{}{}
1104                         p := make([]byte, msg.Length)
1105                         n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
1106                         if err != nil {
1107                                 return fmt.Errorf("reading t data to serve request %q: %s", request, err)
1108                         }
1109                         if n != int(msg.Length) {
1110                                 return fmt.Errorf("bad request: %v", msg)
1111                         }
1112                         c.Post(pp.Message{
1113                                 Type:  pp.Piece,
1114                                 Index: msg.Index,
1115                                 Begin: msg.Begin,
1116                                 Piece: p,
1117                         })
1118                         uploadChunksPosted.Add(1)
1119                 case pp.Cancel:
1120                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1121                         if !c.PeerCancel(req) {
1122                                 unexpectedCancels.Add(1)
1123                         }
1124                 case pp.Bitfield:
1125                         if c.PeerPieces != nil {
1126                                 err = errors.New("received unexpected bitfield")
1127                                 break
1128                         }
1129                         if t.haveInfo() {
1130                                 if len(msg.Bitfield) < t.NumPieces() {
1131                                         err = errors.New("received invalid bitfield")
1132                                         break
1133                                 }
1134                                 msg.Bitfield = msg.Bitfield[:t.NumPieces()]
1135                         }
1136                         c.PeerPieces = msg.Bitfield
1137                         for index, has := range c.PeerPieces {
1138                                 if has {
1139                                         me.peerGotPiece(t, c, index)
1140                                 }
1141                         }
1142                 case pp.Piece:
1143                         err = me.downloadedChunk(t, c, &msg)
1144                 case pp.Extended:
1145                         switch msg.ExtendedID {
1146                         case pp.HandshakeExtendedID:
1147                                 // TODO: Create a bencode struct for this.
1148                                 var d map[string]interface{}
1149                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1150                                 if err != nil {
1151                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1152                                         break
1153                                 }
1154                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1155                                 if reqq, ok := d["reqq"]; ok {
1156                                         if i, ok := reqq.(int64); ok {
1157                                                 c.PeerMaxRequests = int(i)
1158                                         }
1159                                 }
1160                                 if v, ok := d["v"]; ok {
1161                                         c.PeerClientName = v.(string)
1162                                 }
1163                                 m, ok := d["m"]
1164                                 if !ok {
1165                                         err = errors.New("handshake missing m item")
1166                                         break
1167                                 }
1168                                 mTyped, ok := m.(map[string]interface{})
1169                                 if !ok {
1170                                         err = errors.New("handshake m value is not dict")
1171                                         break
1172                                 }
1173                                 if c.PeerExtensionIDs == nil {
1174                                         c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
1175                                 }
1176                                 for name, v := range mTyped {
1177                                         id, ok := v.(int64)
1178                                         if !ok {
1179                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1180                                                 continue
1181                                         }
1182                                         if id == 0 {
1183                                                 delete(c.PeerExtensionIDs, name)
1184                                         } else {
1185                                                 c.PeerExtensionIDs[name] = id
1186                                         }
1187                                 }
1188                                 metadata_sizeUntyped, ok := d["metadata_size"]
1189                                 if ok {
1190                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1191                                         if !ok {
1192                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1193                                         } else {
1194                                                 t.SetMetadataSize(metadata_size)
1195                                         }
1196                                 }
1197                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1198                                         me.requestPendingMetadata(t, c)
1199                                 }
1200                         case 1:
1201                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1202                                 if err != nil {
1203                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1204                                 }
1205                         case 2:
1206                                 var pexMsg peerExchangeMessage
1207                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1208                                 if err != nil {
1209                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1210                                         break
1211                                 }
1212                                 go func() {
1213                                         err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
1214                                                 for _, cp := range pexMsg.Added {
1215                                                         p := Peer{
1216                                                                 IP:     make([]byte, 4),
1217                                                                 Port:   int(cp.Port),
1218                                                                 Source: peerSourcePEX,
1219                                                         }
1220                                                         if n := copy(p.IP, cp.IP[:]); n != 4 {
1221                                                                 panic(n)
1222                                                         }
1223                                                         ret = append(ret, p)
1224                                                 }
1225                                                 return
1226                                         }())
1227                                         if err != nil {
1228                                                 log.Printf("error adding PEX peers: %s", err)
1229                                                 return
1230                                         }
1231                                         peersFoundByPEX.Add(int64(len(pexMsg.Added)))
1232                                 }()
1233                         default:
1234                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1235                         }
1236                         if err != nil {
1237                                 // That client uses its own extension IDs for outgoing message
1238                                 // types, which is incorrect.
1239                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1240                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1241                                         return nil
1242                                 }
1243                                 // log.Printf("peer extension map: %#v", c.PeerExtensionIDs)
1244                         }
1245                 case pp.Port:
1246                         if me.dHT == nil {
1247                                 break
1248                         }
1249                         pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String())
1250                         if err != nil {
1251                                 panic(err)
1252                         }
1253                         if msg.Port != 0 {
1254                                 pingAddr.Port = int(msg.Port)
1255                         }
1256                         _, err = me.dHT.Ping(pingAddr)
1257                 default:
1258                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1259                 }
1260                 if err != nil {
1261                         return err
1262                 }
1263         }
1264 }
1265
1266 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
1267         for r := range conn.Requests {
1268                 me.connDeleteRequest(torrent, conn, r)
1269         }
1270         conn.Close()
1271         for i0, c := range torrent.Conns {
1272                 if c != conn {
1273                         continue
1274                 }
1275                 i1 := len(torrent.Conns) - 1
1276                 if i0 != i1 {
1277                         torrent.Conns[i0] = torrent.Conns[i1]
1278                 }
1279                 torrent.Conns = torrent.Conns[:i1]
1280                 me.openNewConns(torrent)
1281                 return
1282         }
1283         panic("connection not found")
1284 }
1285
1286 func (me *Client) addConnection(t *torrent, c *connection) bool {
1287         if me.stopped() {
1288                 return false
1289         }
1290         select {
1291         case <-t.ceasingNetworking:
1292                 return false
1293         default:
1294         }
1295         for _, c0 := range t.Conns {
1296                 if c.PeerID == c0.PeerID {
1297                         // Already connected to a client with that ID.
1298                         return false
1299                 }
1300         }
1301         t.Conns = append(t.Conns, c)
1302         // TODO: This should probably be done by a routine that kills off bad
1303         // connections, and extra connections killed here instead.
1304         if len(t.Conns) > socketsPerTorrent {
1305                 wcs := t.worstConnsHeap()
1306                 heap.Pop(wcs).(*connection).Close()
1307         }
1308         return true
1309 }
1310
1311 func (me *Client) openNewConns(t *torrent) {
1312         select {
1313         case <-t.ceasingNetworking:
1314                 return
1315         default:
1316         }
1317         if t.haveInfo() && !me.downloadStrategy.PendingData(t) {
1318                 return
1319         }
1320         for len(t.Peers) != 0 {
1321                 if len(t.Conns) >= socketsPerTorrent {
1322                         break
1323                 }
1324                 if len(t.HalfOpen)+me.handshaking >= me.halfOpenLimit {
1325                         break
1326                 }
1327                 var (
1328                         k peersKey
1329                         p Peer
1330                 )
1331                 for k, p = range t.Peers {
1332                         break
1333                 }
1334                 delete(t.Peers, k)
1335                 me.initiateConn(p, t)
1336         }
1337         t.wantPeers.Broadcast()
1338 }
1339
1340 // Adds peers to the swarm for the torrent corresponding to infoHash.
1341 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
1342         me.mu.Lock()
1343         defer me.mu.Unlock()
1344         t := me.torrent(infoHash)
1345         if t == nil {
1346                 return errors.New("no such torrent")
1347         }
1348         blocked := 0
1349         for i, p := range peers {
1350                 if me.ipBlockRange(p.IP) == nil {
1351                         continue
1352                 }
1353                 peers[i] = peers[len(peers)-1]
1354                 peers = peers[:len(peers)-1]
1355                 i--
1356                 blocked++
1357         }
1358         if blocked != 0 {
1359                 log.Printf("IP blocklist screened %d peers from being added", blocked)
1360         }
1361         t.AddPeers(peers)
1362         me.openNewConns(t)
1363         return nil
1364 }
1365
1366 func (cl *Client) torrentFileCachePath(ih InfoHash) string {
1367         return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1368 }
1369
1370 func (cl *Client) saveTorrentFile(t *torrent) error {
1371         path := cl.torrentFileCachePath(t.InfoHash)
1372         os.MkdirAll(filepath.Dir(path), 0777)
1373         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1374         if err != nil {
1375                 return fmt.Errorf("error opening file: %s", err)
1376         }
1377         defer f.Close()
1378         e := bencode.NewEncoder(f)
1379         err = e.Encode(t.MetaInfo())
1380         if err != nil {
1381                 return fmt.Errorf("error marshalling metainfo: %s", err)
1382         }
1383         mi, _ := cl.torrentCacheMetaInfo(t.InfoHash)
1384         if !bytes.Equal(mi.Info.Hash, t.InfoHash[:]) {
1385                 log.Fatalf("%x != %x", mi.Info.Hash, t.InfoHash[:])
1386         }
1387         return nil
1388 }
1389
// setMetaData installs the decoded info md (whose raw bencoded form is bytes)
// on torrent t. If t.setMetadata fails, its error is returned and nothing
// else happens. Otherwise, in order: pieces are queued and verified in the
// background (unless uploading is disabled), the download strategy is told
// the torrent has started, the torrent file is written to the cache (a
// failure there is only logged), and t.gotMetainfo is closed to signal that
// the metadata is available.
//
// Note that the parameter named bytes shadows the bytes package inside this
// function.
func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
	err = t.setMetadata(md, cl.dataDir, bytes)
	if err != nil {
		return
	}
	// If the client intends to upload, it needs to know what state pieces are
	// in.
	if !cl.noUpload {
		// Queue all pieces for hashing. This is done sequentially to avoid
		// spamming goroutines.
		for _, p := range t.Pieces {
			p.QueuedForHash = true
		}
		// A single goroutine verifies the pieces one after another.
		go func() {
			for i := range t.Pieces {
				cl.verifyPiece(t, pp.Integer(i))
			}
		}()
	}

	cl.downloadStrategy.TorrentStarted(t)
	// TODO(anacrolix): I think this should be made available as a method, the
	// channel only acts as a signal that the metadata has become available.
	// select {
	// case t.gotMetainfo <- &metainfo.MetaInfo{
	//      Info: metainfo.InfoEx{
	//              Info: md,
	//      },
	//      CreationDate: time.Now().Unix(),
	//      Comment:      "metadata set in client",
	//      CreatedBy:    "go.torrent",
	//      // TODO(anacrolix): Expose trackers given when torrent added.
	// }:
	// default:
	//      panic("shouldn't block")
	// }
	// Failing to cache the torrent file is not fatal; the inner err is scoped
	// to this statement and does not affect the returned error.
	if err := cl.saveTorrentFile(t); err != nil {
		log.Printf("error saving torrent file for %s: %s", t, err)
	}
	// Closing (rather than sending on) the channel wakes every waiter.
	close(t.gotMetainfo)
	// t.gotMetainfo = nil
	return
}
1433
1434 // Prepare a Torrent without any attachment to a Client. That means we can
1435 // initialize fields all fields that don't require the Client without locking
1436 // it.
1437 func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
1438         t = &torrent{
1439                 InfoHash: ih,
1440                 Peers:    make(map[peersKey]Peer, 2000),
1441
1442                 closing:           make(chan struct{}),
1443                 ceasingNetworking: make(chan struct{}),
1444
1445                 gotMetainfo: make(chan struct{}),
1446
1447                 HalfOpen: make(map[string]struct{}, halfOpenLimit),
1448         }
1449         t.wantPeers.L = &t.stateMu
1450         t.GotMetainfo = t.gotMetainfo
1451         t.addTrackers(announceList)
1452         return
1453 }
1454
// init seeds the shared math/rand source used by this package.
func init() {
	// For shuffling the tracker tiers. Seed with nanosecond resolution: a
	// seed in whole seconds gives identical shuffles to every process
	// started within the same second.
	mathRand.Seed(time.Now().UnixNano())
}
1459
1460 // The trackers within each tier must be shuffled before use.
1461 // http://stackoverflow.com/a/12267471/149482
1462 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1463 func shuffleTier(tier []tracker.Client) {
1464         for i := range tier {
1465                 j := mathRand.Intn(i + 1)
1466                 tier[i], tier[j] = tier[j], tier[i]
1467         }
1468 }
1469
1470 func copyTrackers(base [][]tracker.Client) (copy [][]tracker.Client) {
1471         for _, tier := range base {
1472                 copy = append(copy, append([]tracker.Client{}, tier...))
1473         }
1474         return
1475 }
1476
1477 func mergeTier(tier []tracker.Client, newURLs []string) []tracker.Client {
1478 nextURL:
1479         for _, url := range newURLs {
1480                 for _, tr := range tier {
1481                         if tr.URL() == url {
1482                                 continue nextURL
1483                         }
1484                 }
1485                 tr, err := tracker.New(url)
1486                 if err != nil {
1487                         log.Printf("error creating tracker client for %q: %s", url, err)
1488                         continue
1489                 }
1490                 tier = append(tier, tr)
1491         }
1492         return tier
1493 }
1494
1495 func (t *torrent) addTrackers(announceList [][]string) {
1496         newTrackers := copyTrackers(t.Trackers)
1497         for tierIndex, tier := range announceList {
1498                 if tierIndex < len(newTrackers) {
1499                         newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1500                 } else {
1501                         newTrackers = append(newTrackers, mergeTier(nil, tier))
1502                 }
1503                 shuffleTier(newTrackers[tierIndex])
1504         }
1505         t.Trackers = newTrackers
1506 }
1507
// Torrent is the exported handle for a torrent. It pairs the internal torrent
// state (embedded, so its fields and methods are promoted) with the Client
// that owns it, which the handle's methods use to reach the client lock and
// client-level operations.
type Torrent struct {
	cl *Client
	*torrent
}
1512
// AddPeers adds the given peers to this torrent's swarm by delegating to the
// owning Client's AddPeers with this torrent's infohash. Note that the
// parameter name pp shadows the peer_protocol package alias inside this
// method.
func (t Torrent) AddPeers(pp []Peer) error {
	return t.cl.AddPeers(t.torrent.InfoHash, pp)
}
1516
1517 func (t Torrent) DownloadAll() {
1518         t.cl.mu.Lock()
1519         for i := 0; i < t.NumPieces(); i++ {
1520                 t.cl.queueFirstHash(t.torrent, i)
1521         }
1522         t.cl.mu.Unlock()
1523 }
1524
1525 func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
1526         err = me.cl.PrioritizeDataRegion(me.InfoHash, off, int64(len(p)))
1527         if err != nil {
1528                 err = fmt.Errorf("error prioritizing: %s", err)
1529                 return
1530         }
1531         <-me.cl.DataWaiter(me.InfoHash, off)
1532         return me.cl.TorrentReadAt(me.InfoHash, off, p)
1533 }
1534
1535 func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
1536         f, err := os.Open(cl.torrentFileCachePath(ih))
1537         if err != nil {
1538                 if os.IsNotExist(err) {
1539                         err = nil
1540                 }
1541                 return
1542         }
1543         defer f.Close()
1544         dec := bencode.NewDecoder(f)
1545         err = dec.Decode(&mi)
1546         if err != nil {
1547                 return
1548         }
1549         if !bytes.Equal(mi.Info.Hash, ih[:]) {
1550                 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
1551                 return
1552         }
1553         return
1554 }
1555
1556 func (cl *Client) AddMagnet(uri string) (T Torrent, err error) {
1557         m, err := ParseMagnetURI(uri)
1558         if err != nil {
1559                 return
1560         }
1561         mi, err := cl.torrentCacheMetaInfo(m.InfoHash)
1562         if err != nil {
1563                 log.Printf("error getting cached metainfo for %x: %s", m.InfoHash[:], err)
1564         } else if mi != nil {
1565                 cl.AddTorrent(mi)
1566         }
1567         cl.mu.Lock()
1568         defer cl.mu.Unlock()
1569         T, err = cl.addOrMergeTorrent(m.InfoHash, [][]string{m.Trackers})
1570         if err != nil {
1571                 return
1572         }
1573         if m.DisplayName != "" {
1574                 T.DisplayName = m.DisplayName
1575         }
1576         return
1577 }
1578
1579 func (cl *Client) connectionPruner(t *torrent) {
1580         for {
1581                 time.Sleep(15 * time.Second)
1582                 cl.mu.Lock()
1583                 license := len(t.Conns) - (socketsPerTorrent+1)/2
1584                 for _, c := range t.Conns {
1585                         if license <= 0 {
1586                                 break
1587                         }
1588                         if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
1589                                 continue
1590                         }
1591                         if time.Now().Sub(c.completedHandshake) < time.Minute {
1592                                 continue
1593                         }
1594                         c.Close()
1595                         license--
1596                 }
1597                 cl.mu.Unlock()
1598         }
1599 }
1600
1601 func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
1602         me.mu.Lock()
1603         defer me.mu.Unlock()
1604         t, ok := me.torrents[infoHash]
1605         if !ok {
1606                 err = fmt.Errorf("no such torrent")
1607                 return
1608         }
1609         err = t.Close()
1610         if err != nil {
1611                 panic(err)
1612         }
1613         delete(me.torrents, infoHash)
1614         me.downloadStrategy.TorrentStopped(t)
1615         for _, dw := range me.dataWaits[t] {
1616                 close(dw.ready)
1617         }
1618         delete(me.dataWaits, t)
1619         return
1620 }
1621
1622 func (me *Client) addOrMergeTorrent(ih InfoHash, announceList [][]string) (T Torrent, err error) {
1623         if _, ok := me.bannedTorrents[ih]; ok {
1624                 err = errors.New("banned torrent")
1625                 return
1626         }
1627         T.cl = me
1628         var ok bool
1629         T.torrent, ok = me.torrents[ih]
1630         if ok {
1631                 T.torrent.addTrackers(announceList)
1632         } else {
1633                 T.torrent, err = newTorrent(ih, announceList, me.halfOpenLimit)
1634                 if err != nil {
1635                         return
1636                 }
1637                 me.torrents[ih] = T.torrent
1638                 if !me.disableTrackers {
1639                         go me.announceTorrentTrackers(T.torrent)
1640                 }
1641                 if me.dHT != nil {
1642                         go me.announceTorrentDHT(T.torrent, true)
1643                 }
1644                 go me.connectionPruner(T.torrent)
1645         }
1646         return
1647 }
1648
1649 // Adds the torrent to the client.
1650 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (t Torrent, err error) {
1651         var ih InfoHash
1652         CopyExact(&ih, metaInfo.Info.Hash)
1653         me.mu.Lock()
1654         defer me.mu.Unlock()
1655         t, err = me.addOrMergeTorrent(ih, metaInfo.AnnounceList)
1656         if err != nil {
1657                 return
1658         }
1659         if !t.torrent.haveInfo() {
1660                 err = me.setMetaData(t.torrent, metaInfo.Info.Info, metaInfo.Info.Bytes)
1661                 if err != nil {
1662                         return
1663                 }
1664         }
1665         return
1666 }
1667
1668 func (me *Client) AddTorrentFromFile(name string) (t Torrent, err error) {
1669         mi, err := metainfo.LoadFromFile(name)
1670         if err != nil {
1671                 err = fmt.Errorf("error loading metainfo from file: %s", err)
1672                 return
1673         }
1674         return me.AddTorrent(mi)
1675 }
1676
1677 // Returns true when peers are required, or false if the torrent is closing.
1678 func (cl *Client) waitWantPeers(t *torrent) bool {
1679         cl.mu.Lock()
1680         defer cl.mu.Unlock()
1681         t.stateMu.Lock()
1682         defer t.stateMu.Unlock()
1683         for {
1684                 select {
1685                 case <-t.ceasingNetworking:
1686                         return false
1687                 default:
1688                 }
1689                 if len(t.Peers) < socketsPerTorrent*5 {
1690                         return true
1691                 }
1692                 cl.mu.Unlock()
1693                 t.wantPeers.Wait()
1694                 t.stateMu.Unlock()
1695                 cl.mu.Lock()
1696                 t.stateMu.Lock()
1697         }
1698 }
1699
1700 func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
1701         for cl.waitWantPeers(t) {
1702                 log.Printf("getting peers for %q from DHT", t)
1703                 ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
1704                 if err != nil {
1705                         log.Printf("error getting peers from dht: %s", err)
1706                         return
1707                 }
1708         getPeers:
1709                 for {
1710                         select {
1711                         case v, ok := <-ps.Values:
1712                                 if !ok {
1713                                         break getPeers
1714                                 }
1715                                 peersFoundByDHT.Add(int64(len(v.Peers)))
1716                                 err = cl.AddPeers(t.InfoHash, func() (ret []Peer) {
1717                                         for _, cp := range v.Peers {
1718                                                 ret = append(ret, Peer{
1719                                                         IP:     cp.IP[:],
1720                                                         Port:   int(cp.Port),
1721                                                         Source: peerSourceDHT,
1722                                                 })
1723                                         }
1724                                         return
1725                                 }())
1726                                 if err != nil {
1727                                         log.Printf("error adding peers from dht for torrent %q: %s", t, err)
1728                                         break getPeers
1729                                 }
1730                         case <-t.ceasingNetworking:
1731                                 ps.Close()
1732                                 return
1733                         }
1734                 }
1735                 ps.Close()
1736                 log.Printf("finished DHT peer scrape for %s", t)
1737
1738                 // After a GetPeers, we can announce on the best nodes that gave us an
1739                 // announce token.
1740
1741                 port := cl.incomingPeerPort()
1742                 // If port is zero, then we're not listening, and there's nothing to
1743                 // announce.
1744                 if port != 0 {
1745                         // We can't allow the port to be implied as long as the UTP and
1746                         // DHT ports are different.
1747                         err := cl.dHT.AnnouncePeer(port, impliedPort, t.InfoHash.AsString())
1748                         if err != nil {
1749                                 log.Printf("error announcing torrent to DHT: %s", err)
1750                         } else {
1751                                 log.Printf("announced %q to DHT", t)
1752                         }
1753                 }
1754         }
1755 }
1756
1757 func (cl *Client) announceTorrentSingleTracker(tr tracker.Client, req *tracker.AnnounceRequest, t *torrent) error {
1758         if err := tr.Connect(); err != nil {
1759                 return fmt.Errorf("error connecting: %s", err)
1760         }
1761         resp, err := tr.Announce(req)
1762         if err != nil {
1763                 return fmt.Errorf("error announcing: %s", err)
1764         }
1765         var peers []Peer
1766         for _, peer := range resp.Peers {
1767                 peers = append(peers, Peer{
1768                         IP:   peer.IP,
1769                         Port: peer.Port,
1770                 })
1771         }
1772         err = cl.AddPeers(t.InfoHash, peers)
1773         if err != nil {
1774                 log.Printf("error adding peers to torrent %s: %s", t, err)
1775         } else {
1776                 log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1777         }
1778
1779         time.Sleep(time.Second * time.Duration(resp.Interval))
1780         return nil
1781 }
1782
1783 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers [][]tracker.Client, t *torrent) (atLeastOne bool) {
1784         oks := make(chan bool)
1785         outstanding := 0
1786         for _, tier := range trackers {
1787                 for _, tr := range tier {
1788                         outstanding++
1789                         go func(tr tracker.Client) {
1790                                 err := cl.announceTorrentSingleTracker(tr, req, t)
1791                                 if err != nil {
1792                                         log.Printf("error announcing to %s: %s", tr, err)
1793                                 }
1794                                 oks <- err == nil
1795                         }(tr)
1796                 }
1797         }
1798         for outstanding > 0 {
1799                 ok := <-oks
1800                 outstanding--
1801                 if ok {
1802                         atLeastOne = true
1803                 }
1804         }
1805         return
1806 }
1807
1808 // Announce torrent to its trackers.
1809 func (cl *Client) announceTorrentTrackers(t *torrent) {
1810         req := tracker.AnnounceRequest{
1811                 Event:    tracker.Started,
1812                 NumWant:  -1,
1813                 Port:     int16(cl.incomingPeerPort()),
1814                 PeerId:   cl.peerID,
1815                 InfoHash: t.InfoHash,
1816         }
1817         cl.mu.RLock()
1818         req.Left = t.BytesLeft()
1819         trackers := t.Trackers
1820         cl.mu.RUnlock()
1821         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
1822                 req.Event = tracker.None
1823         }
1824 newAnnounce:
1825         for cl.waitWantPeers(t) {
1826                 cl.mu.RLock()
1827                 req.Left = t.BytesLeft()
1828                 trackers = t.Trackers
1829                 cl.mu.RUnlock()
1830                 for _, tier := range trackers {
1831                         for trIndex, tr := range tier {
1832                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
1833                                 if err != nil {
1834                                         log.Printf("error announcing to %s: %s", tr, err)
1835                                         continue
1836                                 }
1837                                 // Float the successful announce to the top of the tier. If
1838                                 // the trackers list has been changed, we'll be modifying an
1839                                 // old copy so it won't matter.
1840                                 cl.mu.Lock()
1841                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1842                                 cl.mu.Unlock()
1843
1844                                 req.Event = tracker.None
1845                                 continue newAnnounce
1846                         }
1847                 }
1848                 // TODO: Wait until trackers are added if there are none.
1849                 time.Sleep(10 * time.Second)
1850         }
1851 }
1852
1853 func (cl *Client) allTorrentsCompleted() bool {
1854         for _, t := range cl.torrents {
1855                 if !t.haveInfo() {
1856                         return false
1857                 }
1858                 for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
1859                         i := e.Value.(int)
1860                         if t.Pieces[i].Complete() {
1861                                 continue
1862                         }
1863                         // If the piece isn't complete, make sure it's not because it's
1864                         // never been hashed.
1865                         cl.queueFirstHash(t, i)
1866                         return false
1867                 }
1868         }
1869         return true
1870 }
1871
1872 // Returns true when all torrents are completely downloaded and false if the
1873 // client is stopped before that.
1874 func (me *Client) WaitAll() bool {
1875         me.mu.Lock()
1876         defer me.mu.Unlock()
1877         for !me.allTorrentsCompleted() {
1878                 if me.stopped() {
1879                         return false
1880                 }
1881                 me.event.Wait()
1882         }
1883         return true
1884 }
1885
1886 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
1887         if !t.haveInfo() {
1888                 return
1889         }
1890         me.downloadStrategy.FillRequests(t, c)
1891         if len(c.Requests) == 0 && !c.PeerChoked {
1892                 c.SetInterested(false)
1893         }
1894 }
1895
1896 // Handle a received chunk from a peer.
1897 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
1898         chunksDownloadedCount.Add(1)
1899
1900         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1901
1902         // Request has been satisfied.
1903         me.connDeleteRequest(t, c, req)
1904
1905         defer me.replenishConnRequests(t, c)
1906
1907         // Do we actually want this chunk?
1908         if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
1909                 unusedDownloadedChunksCount.Add(1)
1910                 c.UnwantedChunksReceived++
1911                 return nil
1912         }
1913
1914         c.UsefulChunksReceived++
1915         c.lastUsefulChunkReceived = time.Now()
1916
1917         // Write the chunk out.
1918         err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1919         if err != nil {
1920                 return fmt.Errorf("error writing chunk: %s", err)
1921         }
1922
1923         // Record that we have the chunk.
1924         delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
1925         me.dataReady(t, req)
1926         if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
1927                 me.queuePieceCheck(t, req.Index)
1928         }
1929         t.PieceBytesLeftChanged(int(req.Index))
1930
1931         // Unprioritize the chunk.
1932         me.downloadStrategy.TorrentGotChunk(t, req)
1933
1934         // Cancel pending requests for this chunk.
1935         for _, c := range t.Conns {
1936                 if me.connCancel(t, c, req) {
1937                         me.replenishConnRequests(t, c)
1938                 }
1939         }
1940
1941         me.downloadStrategy.AssertNotRequested(t, req)
1942
1943         return nil
1944 }
1945
1946 func (cl *Client) dataReady(t *torrent, r request) {
1947         dws := cl.dataWaits[t]
1948         begin := t.requestOffset(r)
1949         end := begin + int64(r.Length)
1950         for i := 0; i < len(dws); {
1951                 dw := dws[i]
1952                 if begin <= dw.offset && dw.offset < end {
1953                         close(dw.ready)
1954                         dws[i] = dws[len(dws)-1]
1955                         dws = dws[:len(dws)-1]
1956                 } else {
1957                         i++
1958                 }
1959         }
1960         cl.dataWaits[t] = dws
1961 }
1962
1963 // Returns a channel that is closed when new data has become available in the
1964 // client.
1965 func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) {
1966         me.mu.Lock()
1967         defer me.mu.Unlock()
1968         ch := make(chan struct{})
1969         ret = ch
1970         t := me.torrents[ih]
1971         if t == nil {
1972                 close(ch)
1973                 return
1974         }
1975         if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) {
1976                 close(ch)
1977                 return
1978         }
1979         me.dataWaits[t] = append(me.dataWaits[t], dataWait{
1980                 offset: off,
1981                 ready:  ch,
1982         })
1983         return
1984 }
1985
1986 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
1987         p := t.Pieces[piece]
1988         if p.EverHashed && !correct {
1989                 log.Printf("%s: piece %d failed hash", t, piece)
1990                 failedPieceHashes.Add(1)
1991         }
1992         p.EverHashed = true
1993         if correct {
1994                 p.PendingChunkSpecs = nil
1995                 me.downloadStrategy.TorrentGotPiece(t, int(piece))
1996                 me.dataReady(t, request{
1997                         pp.Integer(piece),
1998                         chunkSpec{0, pp.Integer(t.PieceLength(piece))},
1999                 })
2000         } else {
2001                 if len(p.PendingChunkSpecs) == 0 {
2002                         t.pendAllChunkSpecs(piece)
2003                 }
2004         }
2005         t.PieceBytesLeftChanged(int(piece))
2006         for _, conn := range t.Conns {
2007                 if correct {
2008                         conn.Post(pp.Message{
2009                                 Type:  pp.Have,
2010                                 Index: pp.Integer(piece),
2011                         })
2012                         // TODO: Cancel requests for this piece.
2013                         for r := range conn.Requests {
2014                                 if r.Index == piece {
2015                                         panic("wat")
2016                                 }
2017                         }
2018                 }
2019         }
2020         if t.haveAllPieces() && me.noUpload {
2021                 t.CeaseNetworking()
2022         }
2023         me.event.Broadcast()
2024 }
2025
2026 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
2027         cl.mu.Lock()
2028         defer cl.mu.Unlock()
2029         p := t.Pieces[index]
2030         for p.Hashing {
2031                 cl.event.Wait()
2032         }
2033         if t.isClosed() {
2034                 return
2035         }
2036         p.Hashing = true
2037         p.QueuedForHash = false
2038         cl.mu.Unlock()
2039         sum := t.HashPiece(index)
2040         cl.mu.Lock()
2041         select {
2042         case <-t.closing:
2043                 return
2044         default:
2045         }
2046         p.Hashing = false
2047         cl.pieceHashed(t, index, sum == p.Hash)
2048 }
2049
2050 func (me *Client) Torrents() (ret []*torrent) {
2051         me.mu.Lock()
2052         for _, t := range me.torrents {
2053                 ret = append(ret, t)
2054         }
2055         me.mu.Unlock()
2056         return
2057 }