]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
cd65f55de54994213f2b8c759148b0129b673c2b
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errorf("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "bytes"
21         "container/heap"
22         "crypto/rand"
23         "crypto/sha1"
24         "errors"
25         "expvar"
26         "fmt"
27         "io"
28         "log"
29         "math/big"
30         mathRand "math/rand"
31         "net"
32         "os"
33         "path/filepath"
34         "sort"
35         "strings"
36         "syscall"
37         "time"
38
39         "bitbucket.org/anacrolix/go.torrent/dht"
40         "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
41         "bitbucket.org/anacrolix/go.torrent/iplist"
42         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
43         "bitbucket.org/anacrolix/go.torrent/tracker"
44         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
45         . "bitbucket.org/anacrolix/go.torrent/util"
46         "bitbucket.org/anacrolix/sync"
47         "bitbucket.org/anacrolix/utp"
48         "github.com/anacrolix/libtorgo/bencode"
49         "github.com/anacrolix/libtorgo/metainfo"
50 )
51
// Process-wide counters published through expvar. Each is registered under
// the same name as the Go identifier.
var (
	// Chunk transfer accounting.
	unusedDownloadedChunksCount = expvar.NewInt("unusedDownloadedChunksCount")
	chunksDownloadedCount       = expvar.NewInt("chunksDownloadedCount")
	uploadChunksPosted          = expvar.NewInt("uploadChunksPosted")
	failedPieceHashes           = expvar.NewInt("failedPieceHashes")

	// Peer discovery, by source.
	peersFoundByDHT = expvar.NewInt("peersFoundByDHT")
	peersFoundByPEX = expvar.NewInt("peersFoundByPEX")

	// Request cancellation.
	unexpectedCancels = expvar.NewInt("unexpectedCancels")
	postedCancels     = expvar.NewInt("postedCancels")

	// Connection establishment, outbound and inbound.
	duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided")
	unsuccessfulDials     = expvar.NewInt("unsuccessfulDials")
	successfulDials       = expvar.NewInt("successfulDials")
	acceptedConns         = expvar.NewInt("acceptedConns")
	inboundConnsBlocked   = expvar.NewInt("inboundConnsBlocked")
)
67
// extensionBytes is the 8-byte reserved field we send in the peer handshake.
// Justification for set bits follows.
//
// Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
// (bit 0x10 of byte 5, the same bit we test on the peer's reserved bytes
// before sending an extended handshake).
// DHT: http://www.bittorrent.org/beps/bep_0005.html
// (presumably the 0x01 bit of the final byte).
const extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"

const (
	// socketsPerTorrent also seeds the client's half-open connection limit.
	socketsPerTorrent = 40
	// Peer-list water marks for a torrent. NOTE(review): their consumers are
	// outside this part of the file.
	torrentPeersHighWater = 1000
	torrentPeersLowWater  = socketsPerTorrent * 5
)
79
80 // Currently doesn't really queue, but should in the future.
81 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
82         piece := t.Pieces[pieceIndex]
83         if piece.QueuedForHash {
84                 return
85         }
86         piece.QueuedForHash = true
87         go cl.verifyPiece(t, pieceIndex)
88 }
89
90 func (cl *Client) queueFirstHash(t *torrent, piece int) {
91         p := t.Pieces[piece]
92         if p.EverHashed || p.Hashing || p.QueuedForHash {
93                 return
94         }
95         cl.queuePieceCheck(t, pp.Integer(piece))
96 }
97
98 // Queues the torrent data for the given region for download. The beginning of
99 // the region is given highest priority to allow a subsequent read at the same
100 // offset to return data ASAP.
101 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
102         me.mu.Lock()
103         defer me.mu.Unlock()
104         t := me.torrent(ih)
105         if t == nil {
106                 return errors.New("no such active torrent")
107         }
108         if !t.haveInfo() {
109                 return errors.New("missing metadata")
110         }
111         firstIndex := int(off / int64(t.UsualPieceSize()))
112         for i := 0; i < 5; i++ {
113                 index := firstIndex + i
114                 if index >= t.NumPieces() {
115                         continue
116                 }
117                 me.queueFirstHash(t, index)
118         }
119         me.downloadStrategy.TorrentPrioritize(t, off, len_)
120         for _, cn := range t.Conns {
121                 me.replenishConnRequests(t, cn)
122         }
123         me.openNewConns(t)
124         return nil
125 }
126
// dataWait describes one entry in Client.dataWaits; WriteStatus lists these
// per torrent under "Blocked reads".
type dataWait struct {
	// offset is the value WriteStatus prints for this blocked read.
	offset int64
	// ready is presumably signalled once the data at offset can be read.
	// NOTE(review): confirm against the code that fills dataWaits.
	ready chan struct{}
}
131
132 type Client struct {
133         noUpload         bool
134         dataDir          string
135         halfOpenLimit    int
136         peerID           [20]byte
137         listeners        []net.Listener
138         utpSock          *utp.Socket
139         disableTrackers  bool
140         downloadStrategy DownloadStrategy
141         dHT              *dht.Server
142         disableUTP       bool
143         disableTCP       bool
144         ipBlockList      *iplist.IPList
145         bannedTorrents   map[InfoHash]struct{}
146
147         mu    sync.RWMutex
148         event sync.Cond
149         quit  chan struct{}
150
151         handshaking int
152
153         torrents map[InfoHash]*torrent
154
155         dataWaits map[*torrent][]dataWait
156 }
157
158 func (me *Client) SetIPBlockList(list *iplist.IPList) {
159         me.mu.Lock()
160         defer me.mu.Unlock()
161         me.ipBlockList = list
162         if me.dHT != nil {
163                 me.dHT.SetIPBlockList(list)
164         }
165 }
166
167 func (me *Client) PeerID() string {
168         return string(me.peerID[:])
169 }
170
171 func (me *Client) ListenAddr() (addr net.Addr) {
172         for _, l := range me.listeners {
173                 if addr != nil && l.Addr().String() != addr.String() {
174                         panic("listeners exist on different addresses")
175                 }
176                 addr = l.Addr()
177         }
178         return
179 }
180
181 type hashSorter struct {
182         Hashes []InfoHash
183 }
184
185 func (me hashSorter) Len() int {
186         return len(me.Hashes)
187 }
188
189 func (me hashSorter) Less(a, b int) bool {
190         return (&big.Int{}).SetBytes(me.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(me.Hashes[b][:])) < 0
191 }
192
193 func (me hashSorter) Swap(a, b int) {
194         me.Hashes[a], me.Hashes[b] = me.Hashes[b], me.Hashes[a]
195 }
196
197 func (cl *Client) sortedTorrents() (ret []*torrent) {
198         var hs hashSorter
199         for ih := range cl.torrents {
200                 hs.Hashes = append(hs.Hashes, ih)
201         }
202         sort.Sort(hs)
203         for _, ih := range hs.Hashes {
204                 ret = append(ret, cl.torrent(ih))
205         }
206         return
207 }
208
209 func (cl *Client) WriteStatus(_w io.Writer) {
210         cl.mu.RLock()
211         defer cl.mu.RUnlock()
212         w := bufio.NewWriter(_w)
213         defer w.Flush()
214         fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
215         fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
216         fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
217         if cl.dHT != nil {
218                 dhtStats := cl.dHT.Stats()
219                 fmt.Fprintf(w, "DHT nodes: %d (%d good)\n", dhtStats.NumNodes, dhtStats.NumGoodNodes)
220                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
221                 fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.LocalAddr()))
222                 fmt.Fprintf(w, "DHT announces: %d\n", cl.dHT.NumConfirmedAnnounces)
223                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.NumOutstandingTransactions)
224         }
225         cl.downloadStrategy.WriteStatus(w)
226         fmt.Fprintln(w)
227         for _, t := range cl.sortedTorrents() {
228                 if t.Name() == "" {
229                         fmt.Fprint(w, "<unknown name>")
230                 } else {
231                         fmt.Fprint(w, t.Name())
232                 }
233                 if t.haveInfo() {
234                         fmt.Fprintf(w, ": %f%% of %d bytes", 100*(1-float32(t.BytesLeft())/float32(t.Length())), t.Length())
235                 }
236                 fmt.Fprint(w, "\n")
237                 fmt.Fprint(w, "Blocked reads:")
238                 for _, dw := range cl.dataWaits[t] {
239                         fmt.Fprintf(w, " %d", dw.offset)
240                 }
241                 fmt.Fprintln(w)
242                 t.WriteStatus(w)
243                 fmt.Fprintln(w)
244         }
245 }
246
247 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
248 // isn't available.
249 func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
250         cl.mu.Lock()
251         defer cl.mu.Unlock()
252         index := int(off / int64(t.UsualPieceSize()))
253         // Reading outside the bounds of a file is an error.
254         if index < 0 {
255                 err = os.ErrInvalid
256                 return
257         }
258         if int(index) >= len(t.Pieces) {
259                 err = io.EOF
260                 return
261         }
262         piece := t.Pieces[index]
263         pieceOff := pp.Integer(off % int64(t.UsualPieceSize()))
264         pieceLeft := int(t.PieceLength(pp.Integer(index)) - pieceOff)
265         if pieceLeft <= 0 {
266                 err = io.EOF
267                 return
268         }
269         cl.readRaisePiecePriorities(t, off, int64(len(p)))
270         if len(p) > pieceLeft {
271                 p = p[:pieceLeft]
272         }
273         if len(p) == 0 {
274                 panic(len(p))
275         }
276         for !piece.Complete() {
277                 piece.Event.Wait()
278         }
279         return t.Data.ReadAt(p, off)
280 }
281
282 func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) {
283         index := int(off / int64(t.UsualPieceSize()))
284         cl.raisePiecePriority(t, index, piecePriorityNow)
285         index++
286         if index >= t.NumPieces() {
287                 return
288         }
289         cl.raisePiecePriority(t, index, piecePriorityNext)
290         for i := 0; i < t.numConnsUnchoked()-2; i++ {
291                 index++
292                 if index >= t.NumPieces() {
293                         break
294                 }
295                 cl.raisePiecePriority(t, index, piecePriorityReadahead)
296         }
297 }
298
299 func (cl *Client) configDir() string {
300         return filepath.Join(os.Getenv("HOME"), ".config/torrent")
301 }
302
303 func (cl *Client) ConfigDir() string {
304         return cl.configDir()
305 }
306
307 func (t *torrent) connPendPiece(c *connection, piece int) {
308         c.pendPiece(piece, t.Pieces[piece].Priority)
309 }
310
311 func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
312         if t.Pieces[piece].Priority < priority {
313                 cl.prioritizePiece(t, piece, priority)
314         }
315 }
316
317 func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) {
318         if t.havePiece(piece) {
319                 return
320         }
321         cl.queueFirstHash(t, piece)
322         t.Pieces[piece].Priority = priority
323         if t.wantPiece(piece) {
324                 for _, c := range t.Conns {
325                         if c.PeerHasPiece(pp.Integer(piece)) {
326                                 t.connPendPiece(c, piece)
327                                 cl.replenishConnRequests(t, c)
328                         }
329                 }
330         }
331 }
332
333 func (cl *Client) setEnvBlocklist() (err error) {
334         filename := os.Getenv("TORRENT_BLOCKLIST_FILE")
335         defaultBlocklist := filename == ""
336         if defaultBlocklist {
337                 filename = filepath.Join(cl.configDir(), "blocklist")
338         }
339         f, err := os.Open(filename)
340         if err != nil {
341                 if defaultBlocklist {
342                         err = nil
343                 }
344                 return
345         }
346         defer f.Close()
347         var ranges []iplist.Range
348         scanner := bufio.NewScanner(f)
349         for scanner.Scan() {
350                 r, ok, lineErr := iplist.ParseBlocklistP2PLine(scanner.Text())
351                 if lineErr != nil {
352                         err = fmt.Errorf("error reading torrent blocklist line: %s", lineErr)
353                         return
354                 }
355                 if !ok {
356                         continue
357                 }
358                 ranges = append(ranges, r)
359         }
360         err = scanner.Err()
361         if err != nil {
362                 err = fmt.Errorf("error reading torrent blocklist: %s", err)
363                 return
364         }
365         cl.ipBlockList = iplist.New(ranges)
366         return
367 }
368
369 func (cl *Client) initBannedTorrents() error {
370         f, err := os.Open(filepath.Join(cl.configDir(), "banned_infohashes"))
371         if err != nil {
372                 if os.IsNotExist(err) {
373                         return nil
374                 }
375                 return fmt.Errorf("error opening banned infohashes file: %s", err)
376         }
377         defer f.Close()
378         scanner := bufio.NewScanner(f)
379         cl.bannedTorrents = make(map[InfoHash]struct{})
380         for scanner.Scan() {
381                 var ihs string
382                 n, err := fmt.Sscanf(scanner.Text(), "%x", &ihs)
383                 if err != nil {
384                         return fmt.Errorf("error reading infohash: %s", err)
385                 }
386                 if n != 1 {
387                         continue
388                 }
389                 if len(ihs) != 20 {
390                         return errors.New("bad infohash")
391                 }
392                 var ih InfoHash
393                 CopyExact(&ih, ihs)
394                 cl.bannedTorrents[ih] = struct{}{}
395         }
396         if err := scanner.Err(); err != nil {
397                 return fmt.Errorf("error scanning file: %s", err)
398         }
399         return nil
400 }
401
402 func NewClient(cfg *Config) (cl *Client, err error) {
403         if cfg == nil {
404                 cfg = &Config{}
405         }
406
407         cl = &Client{
408                 noUpload:         cfg.NoUpload,
409                 disableTrackers:  cfg.DisableTrackers,
410                 downloadStrategy: cfg.DownloadStrategy,
411                 halfOpenLimit:    socketsPerTorrent,
412                 dataDir:          cfg.DataDir,
413                 disableUTP:       cfg.DisableUTP,
414                 disableTCP:       cfg.DisableTCP,
415
416                 quit:     make(chan struct{}),
417                 torrents: make(map[InfoHash]*torrent),
418
419                 dataWaits: make(map[*torrent][]dataWait),
420         }
421         // TODO: Write my own UTP library à² _ಠ
422         // cl.disableUTP = true
423         cl.event.L = &cl.mu
424
425         if !cfg.NoDefaultBlocklist {
426                 err = cl.setEnvBlocklist()
427                 if err != nil {
428                         return
429                 }
430         }
431
432         if err = cl.initBannedTorrents(); err != nil {
433                 err = fmt.Errorf("error initing banned torrents: %s", err)
434                 return
435         }
436
437         if cfg.PeerID != "" {
438                 CopyExact(&cl.peerID, cfg.PeerID)
439         } else {
440                 o := copy(cl.peerID[:], BEP20)
441                 _, err = rand.Read(cl.peerID[o:])
442                 if err != nil {
443                         panic("error generating peer id")
444                 }
445         }
446
447         if cl.downloadStrategy == nil {
448                 cl.downloadStrategy = &DefaultDownloadStrategy{}
449         }
450
451         // Returns the laddr string to listen on for the next Listen call.
452         listenAddr := func() string {
453                 if addr := cl.ListenAddr(); addr != nil {
454                         return addr.String()
455                 }
456                 if cfg.ListenAddr == "" {
457                         return ":50007"
458                 }
459                 return cfg.ListenAddr
460         }
461         if !cl.disableTCP {
462                 var l net.Listener
463                 l, err = net.Listen("tcp", listenAddr())
464                 if err != nil {
465                         return
466                 }
467                 cl.listeners = append(cl.listeners, l)
468                 go cl.acceptConnections(l, false)
469         }
470         if !cl.disableUTP {
471                 cl.utpSock, err = utp.NewSocket(listenAddr())
472                 if err != nil {
473                         return
474                 }
475                 cl.listeners = append(cl.listeners, cl.utpSock)
476                 go cl.acceptConnections(cl.utpSock, true)
477         }
478         if !cfg.NoDHT {
479                 dhtCfg := cfg.DHTConfig
480                 if dhtCfg == nil {
481                         dhtCfg = &dht.ServerConfig{}
482                 }
483                 if dhtCfg.Addr == "" {
484                         dhtCfg.Addr = listenAddr()
485                 }
486                 if dhtCfg.Conn == nil && cl.utpSock != nil {
487                         dhtCfg.Conn = cl.utpSock
488                 }
489                 cl.dHT, err = dht.NewServer(dhtCfg)
490                 if cl.ipBlockList != nil {
491                         cl.dHT.SetIPBlockList(cl.ipBlockList)
492                 }
493                 if err != nil {
494                         return
495                 }
496         }
497
498         return
499 }
500
501 func (cl *Client) stopped() bool {
502         select {
503         case <-cl.quit:
504                 return true
505         default:
506                 return false
507         }
508 }
509
510 // Stops the client. All connections to peers are closed and all activity will
511 // come to a halt.
512 func (me *Client) Stop() {
513         me.mu.Lock()
514         close(me.quit)
515         for _, l := range me.listeners {
516                 l.Close()
517         }
518         me.event.Broadcast()
519         for _, t := range me.torrents {
520                 t.Close()
521         }
522         me.mu.Unlock()
523 }
524
525 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
526
527 func (cl *Client) ipBlockRange(ip net.IP) (r *iplist.Range) {
528         if cl.ipBlockList == nil {
529                 return
530         }
531         ip = ip.To4()
532         if ip == nil {
533                 log.Printf("saw non-IPv4 address")
534                 r = &ipv6BlockRange
535                 return
536         }
537         r = cl.ipBlockList.Lookup(ip)
538         return
539 }
540
541 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
542         for {
543                 // We accept all connections immediately, because we don't know what
544                 // torrent they're for.
545                 conn, err := l.Accept()
546                 select {
547                 case <-cl.quit:
548                         if conn != nil {
549                                 conn.Close()
550                         }
551                         return
552                 default:
553                 }
554                 if err != nil {
555                         log.Print(err)
556                         return
557                 }
558                 acceptedConns.Add(1)
559                 cl.mu.RLock()
560                 blockRange := cl.ipBlockRange(AddrIP(conn.RemoteAddr()))
561                 cl.mu.RUnlock()
562                 if blockRange != nil {
563                         inboundConnsBlocked.Add(1)
564                         log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
565                         conn.Close()
566                         continue
567                 }
568                 go func() {
569                         if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
570                                 log.Print(err)
571                         }
572                 }()
573         }
574 }
575
576 func (me *Client) torrent(ih InfoHash) *torrent {
577         for _, t := range me.torrents {
578                 if t.InfoHash == ih {
579                         return t
580                 }
581         }
582         return nil
583 }
584
// dialResult is what doDial sends on its channel when a dial attempt ends.
type dialResult struct {
	// Conn is the established connection, or nil if the dial failed.
	net.Conn
	// UTP is the flag doDial was called with; initiateConn sets it for the
	// uTP attempt and clears it for the TCP attempt.
	UTP bool
}
589
590 func doDial(dial func() (net.Conn, error), ch chan dialResult, utp bool) {
591         conn, err := dial()
592         if err != nil {
593                 if conn != nil {
594                         conn.Close()
595                 }
596                 conn = nil // Pedantic
597         }
598         ch <- dialResult{conn, utp}
599         if err == nil {
600                 successfulDials.Add(1)
601                 return
602         }
603         unsuccessfulDials.Add(1)
604         if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
605                 return
606         }
607         if netOpErr, ok := err.(*net.OpError); ok {
608                 switch netOpErr.Err {
609                 case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
610                         return
611                 }
612         }
613         if err != nil {
614                 log.Printf("error connecting to peer: %s %#v", err, err)
615                 return
616         }
617 }
618
619 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
620         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
621         if ret < minDialTimeout {
622                 ret = minDialTimeout
623         }
624         return
625 }
626
627 // Start the process of connecting to the given peer for the given torrent if
628 // appropriate.
629 func (me *Client) initiateConn(peer Peer, t *torrent) {
630         if peer.Id == me.peerID {
631                 return
632         }
633         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
634         if t.addrActive(addr) {
635                 duplicateConnsAvoided.Add(1)
636                 return
637         }
638         if r := me.ipBlockRange(peer.IP); r != nil {
639                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
640                 return
641         }
642         dialTimeout := reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers))
643         t.HalfOpen[addr] = struct{}{}
644         go func() {
645                 // Binding to the listen address and dialing via net.Dialer gives
646                 // "address in use" error. It seems it's not possible to dial out from
647                 // this address so that peers associate our local address with our
648                 // listen address.
649
650                 // Initiate connections via TCP and UTP simultaneously. Use the first
651                 // one that succeeds.
652                 left := 0
653                 if !me.disableUTP {
654                         left++
655                 }
656                 if !me.disableTCP {
657                         left++
658                 }
659                 resCh := make(chan dialResult, left)
660                 if !me.disableUTP {
661                         go doDial(func() (net.Conn, error) {
662                                 return me.utpSock.DialTimeout(addr, dialTimeout)
663                         }, resCh, true)
664                 }
665                 if !me.disableTCP {
666                         go doDial(func() (net.Conn, error) {
667                                 // time.Sleep(time.Second) // Give uTP a bit of a head start.
668                                 return net.DialTimeout("tcp", addr, dialTimeout)
669                         }, resCh, false)
670                 }
671                 var res dialResult
672                 for ; left > 0 && res.Conn == nil; left-- {
673                         res = <-resCh
674                 }
675                 // Whether or not the connection attempt succeeds, the half open
676                 // counter should be decremented, and new connection attempts made.
677                 go func() {
678                         me.mu.Lock()
679                         defer me.mu.Unlock()
680                         if _, ok := t.HalfOpen[addr]; !ok {
681                                 panic("invariant broken")
682                         }
683                         delete(t.HalfOpen, addr)
684                         me.openNewConns(t)
685                 }()
686                 if res.Conn == nil {
687                         return
688                 }
689                 if left > 0 {
690                         go func() {
691                                 for ; left > 0; left-- {
692                                         conn := (<-resCh).Conn
693                                         if conn != nil {
694                                                 conn.Close()
695                                         }
696                                 }
697                         }()
698                 }
699
700                 // log.Printf("connected to %s", conn.RemoteAddr())
701                 err := me.runConnection(res.Conn, t, peer.Source, res.UTP)
702                 if err != nil {
703                         log.Print(err)
704                 }
705         }()
706 }
707
708 // The port number for incoming peer connections. 0 if the client isn't
709 // listening.
710 func (cl *Client) incomingPeerPort() int {
711         listenAddr := cl.ListenAddr()
712         if listenAddr == nil {
713                 return 0
714         }
715         return addrPort(listenAddr)
716 }
717
// addrCompactIP converts a net.Addr to its compact IP representation: the
// raw address bytes as a string, 4 bytes for IPv4 and 16 for IPv6, per the
// "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
//
// An error is returned if the address isn't in host:port form, or if the
// host part isn't an IP address literal.
func addrCompactIP(addr net.Addr) (string, error) {
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "", err
	}
	ip := net.ParseIP(host)
	if ip == nil {
		// Without this check an unparsable host would come back as an empty
		// string with no error.
		return "", fmt.Errorf("host %q is not an IP address", host)
	}
	if v4 := ip.To4(); v4 != nil {
		return string(v4), nil
	}
	return string(ip.To16()), nil
}
734
// handshakeWriter writes each buffer received from bb to w, in order, until
// bb is closed or a write fails. On a failed write w is closed and no further
// buffers are taken. Exactly one value is sent on done: the write error, or
// nil if every write succeeded.
func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
	var writeErr error
	for {
		b, more := <-bb
		if !more {
			break
		}
		if _, writeErr = w.Write(b); writeErr != nil {
			w.Close()
			break
		}
	}
	done <- writeErr
}
746
747 type peerExtensionBytes [8]byte
748 type peerID [20]byte
749
750 type handshakeResult struct {
751         peerExtensionBytes
752         peerID
753         InfoHash
754 }
755
756 func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
757         // Bytes to be sent to the peer. Should never block the sender.
758         postCh := make(chan []byte, 4)
759         // A single error value sent when the writer completes.
760         writeDone := make(chan error, 1)
761         // Performs writes to the socket and ensures posts don't block.
762         go handshakeWriter(sock, postCh, writeDone)
763
764         defer func() {
765                 close(postCh) // Done writing.
766                 if !ok {
767                         return
768                 }
769                 if err != nil {
770                         panic(err)
771                 }
772                 // Wait until writes complete before returning from handshake.
773                 err = <-writeDone
774                 if err != nil {
775                         err = fmt.Errorf("error writing during handshake: %s", err)
776                 }
777         }()
778
779         post := func(bb []byte) {
780                 select {
781                 case postCh <- bb:
782                 default:
783                         panic("mustn't block while posting")
784                 }
785         }
786
787         post([]byte(pp.Protocol))
788         post([]byte(extensionBytes))
789         if ih != nil { // We already know what we want.
790                 post(ih[:])
791                 post(peerID[:])
792         }
793         var b [68]byte
794         _, err = io.ReadFull(sock, b[:68])
795         if err != nil {
796                 err = nil
797                 return
798         }
799         if string(b[:20]) != pp.Protocol {
800                 return
801         }
802         CopyExact(&res.peerExtensionBytes, b[20:28])
803         CopyExact(&res.InfoHash, b[28:48])
804         CopyExact(&res.peerID, b[48:68])
805
806         if ih == nil { // We were waiting for the peer to tell us what they wanted.
807                 post(res.InfoHash[:])
808                 post(peerID[:])
809         }
810
811         ok = true
812         return
813 }
814
// peerConn wraps a connection to a peer, applying a read deadline to every
// read and turning connection resets and read timeouts into io.EOF.
type peerConn struct {
	net.Conn
}

// Read reads from the underlying connection with a fresh 150 second read
// deadline. A connection reset by the peer, or a read that times out, is
// reported as io.EOF. If the deadline cannot be set, that error is returned
// and no read is attempted.
func (pc peerConn) Read(b []byte) (n int, err error) {
	// Keep-alives should be received every 2 mins. Give a bit of gracetime.
	err = pc.Conn.SetReadDeadline(time.Now().Add(150 * time.Second))
	if err != nil {
		// Return now; otherwise this error would be overwritten by the result
		// of the read below and never seen.
		err = fmt.Errorf("error setting read deadline: %s", err)
		return
	}
	n, err = pc.Conn.Read(b)
	if err == nil {
		return
	}
	if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
		err = io.EOF
	} else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
		// A timed-out read is expected to have delivered no data.
		if n != 0 {
			panic(n)
		}
		err = io.EOF
	}
	return
}
838
839 func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
840         if tcpConn, ok := sock.(*net.TCPConn); ok {
841                 tcpConn.SetLinger(0)
842         }
843         defer sock.Close()
844         me.mu.Lock()
845         me.handshaking++
846         me.mu.Unlock()
847         // One minute to complete handshake.
848         sock.SetDeadline(time.Now().Add(time.Minute))
849         hsRes, ok, err := handshake(sock, func() *InfoHash {
850                 if torrent == nil {
851                         return nil
852                 } else {
853                         return &torrent.InfoHash
854                 }
855         }(), me.peerID)
856         me.mu.Lock()
857         defer me.mu.Unlock()
858         if me.handshaking == 0 {
859                 panic("handshake count invariant is broken")
860         }
861         me.handshaking--
862         if err != nil {
863                 err = fmt.Errorf("error during handshake: %s", err)
864                 return
865         }
866         if !ok {
867                 return
868         }
869         if hsRes.peerID == me.peerID {
870                 return
871         }
872         torrent = me.torrent(hsRes.InfoHash)
873         if torrent == nil {
874                 return
875         }
876         sock.SetWriteDeadline(time.Time{})
877         sock = peerConn{sock}
878         conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
879         defer conn.Close()
880         conn.Discovery = discovery
881         if !me.addConnection(torrent, conn) {
882                 return
883         }
884         if conn.PeerExtensionBytes[5]&0x10 != 0 {
885                 conn.Post(pp.Message{
886                         Type:       pp.Extended,
887                         ExtendedID: pp.HandshakeExtendedID,
888                         ExtendedPayload: func() []byte {
889                                 d := map[string]interface{}{
890                                         "m": map[string]int{
891                                                 "ut_metadata": 1,
892                                                 "ut_pex":      2,
893                                         },
894                                         "v": "go.torrent dev 20140825", // Just the date
895                                         // No upload queue is implemented yet.
896                                         "reqq": func() int {
897                                                 if me.noUpload {
898                                                         // No need to look strange if it costs us nothing.
899                                                         return 250
900                                                 } else {
901                                                         return 1
902                                                 }
903                                         }(),
904                                 }
905                                 if torrent.metadataSizeKnown() {
906                                         d["metadata_size"] = torrent.metadataSize()
907                                 }
908                                 if p := me.incomingPeerPort(); p != 0 {
909                                         d["p"] = p
910                                 }
911                                 yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
912                                 if err != nil {
913                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
914                                 } else {
915                                         d["yourip"] = yourip
916                                 }
917                                 // log.Printf("sending %v", d)
918                                 b, err := bencode.Marshal(d)
919                                 if err != nil {
920                                         panic(err)
921                                 }
922                                 return b
923                         }(),
924                 })
925         }
926         if torrent.haveAnyPieces() {
927                 conn.Post(pp.Message{
928                         Type:     pp.Bitfield,
929                         Bitfield: torrent.bitfield(),
930                 })
931         }
932         if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil {
933                 conn.Post(pp.Message{
934                         Type: pp.Port,
935                         Port: uint16(AddrPort(me.dHT.LocalAddr())),
936                 })
937         }
938         if torrent.haveInfo() {
939                 torrent.initRequestOrdering(conn)
940                 me.replenishConnRequests(torrent, conn)
941         }
942         err = me.connectionLoop(torrent, conn)
943         if err != nil {
944                 err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
945         }
946         me.dropConnection(torrent, conn)
947         return
948 }
949
950 func (t *torrent) initRequestOrdering(c *connection) {
951         if c.pieceRequestOrder != nil || c.piecePriorities != nil {
952                 panic("double init of request ordering")
953         }
954         c.piecePriorities = mathRand.Perm(t.NumPieces())
955         c.pieceRequestOrder = pieceordering.New()
956         for i := 0; i < t.NumPieces(); i++ {
957                 if !c.PeerHasPiece(pp.Integer(i)) {
958                         continue
959                 }
960                 if !t.wantPiece(i) {
961                         continue
962                 }
963                 t.connPendPiece(c, i)
964         }
965 }
966
967 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
968         if t.haveInfo() {
969                 if c.PeerPieces == nil {
970                         c.PeerPieces = make([]bool, t.NumPieces())
971                 }
972         } else {
973                 for piece >= len(c.PeerPieces) {
974                         c.PeerPieces = append(c.PeerPieces, false)
975                 }
976         }
977         c.PeerPieces[piece] = true
978         if t.wantPiece(piece) {
979                 t.connPendPiece(c, piece)
980                 me.replenishConnRequests(t, c)
981         }
982 }
983
984 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
985         me.replenishConnRequests(torrent, conn)
986 }
987
988 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
989         ok = cn.Cancel(r)
990         if ok {
991                 postedCancels.Add(1)
992                 cl.downloadStrategy.DeleteRequest(t, r)
993         }
994         return
995 }
996
997 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
998         if !cn.RequestPending(r) {
999                 return
1000         }
1001         cl.downloadStrategy.DeleteRequest(t, r)
1002         delete(cn.Requests, r)
1003 }
1004
1005 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
1006         if t.haveInfo() {
1007                 return
1008         }
1009         var pending []int
1010         for index := 0; index < t.MetadataPieceCount(); index++ {
1011                 if !t.HaveMetadataPiece(index) {
1012                         pending = append(pending, index)
1013                 }
1014         }
1015         for _, i := range mathRand.Perm(len(pending)) {
1016                 c.Post(pp.Message{
1017                         Type:       pp.Extended,
1018                         ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
1019                         ExtendedPayload: func() []byte {
1020                                 b, err := bencode.Marshal(map[string]int{
1021                                         "msg_type": 0,
1022                                         "piece":    pending[i],
1023                                 })
1024                                 if err != nil {
1025                                         panic(err)
1026                                 }
1027                                 return b
1028                         }(),
1029                 })
1030         }
1031 }
1032
1033 func (cl *Client) completedMetadata(t *torrent) {
1034         h := sha1.New()
1035         h.Write(t.MetaData)
1036         var ih InfoHash
1037         CopyExact(&ih, h.Sum(nil))
1038         if ih != t.InfoHash {
1039                 log.Print("bad metadata")
1040                 t.InvalidateMetadata()
1041                 return
1042         }
1043         var info metainfo.Info
1044         err := bencode.Unmarshal(t.MetaData, &info)
1045         if err != nil {
1046                 log.Printf("error unmarshalling metadata: %s", err)
1047                 t.InvalidateMetadata()
1048                 return
1049         }
1050         // TODO(anacrolix): If this fails, I think something harsher should be
1051         // done.
1052         err = cl.setMetaData(t, info, t.MetaData)
1053         if err != nil {
1054                 log.Printf("error setting metadata: %s", err)
1055                 t.InvalidateMetadata()
1056                 return
1057         }
1058         log.Printf("%s: got metadata from peers", t)
1059 }
1060
1061 // Process incoming ut_metadata message.
1062 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
1063         var d map[string]int
1064         err = bencode.Unmarshal(payload, &d)
1065         if err != nil {
1066                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1067                 return
1068         }
1069         msgType, ok := d["msg_type"]
1070         if !ok {
1071                 err = errors.New("missing msg_type field")
1072                 return
1073         }
1074         piece := d["piece"]
1075         switch msgType {
1076         case pp.DataMetadataExtensionMsgType:
1077                 if t.haveInfo() {
1078                         break
1079                 }
1080                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1081                 if begin < 0 || begin >= len(payload) {
1082                         log.Printf("got bad metadata piece")
1083                         break
1084                 }
1085                 t.SaveMetadataPiece(piece, payload[begin:])
1086                 c.UsefulChunksReceived++
1087                 c.lastUsefulChunkReceived = time.Now()
1088                 if !t.HaveAllMetadataPieces() {
1089                         break
1090                 }
1091                 cl.completedMetadata(t)
1092         case pp.RequestMetadataExtensionMsgType:
1093                 if !t.HaveMetadataPiece(piece) {
1094                         c.Post(t.NewMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1095                         break
1096                 }
1097                 start := (1 << 14) * piece
1098                 c.Post(t.NewMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[start:start+t.metadataPieceSize(piece)]))
1099         case pp.RejectMetadataExtensionMsgType:
1100         default:
1101                 err = errors.New("unknown msg_type value")
1102         }
1103         return
1104 }
1105
// peerExchangeMessage is the bencoded payload of a peer exchange (PEX)
// extended message, as decoded in connectionLoop.
type peerExchangeMessage struct {
	// Peers the sender reports as added; connectionLoop feeds these to
	// AddPeers.
	Added CompactPeers `bencode:"added"`
	// Presumably one flag byte per entry in Added; not read by the code
	// visible here.
	AddedFlags []byte `bencode:"added.f"`
	// Presumably peers the sender reports as dropped; not read by the code
	// visible here.
	Dropped []tracker.Peer `bencode:"dropped"`
}
1111
1112 // Extracts the port as an integer from an address string.
1113 func addrPort(addr net.Addr) int {
1114         return AddrPort(addr)
1115 }
1116
1117 // Processes incoming bittorrent messages. The client lock is held upon entry
1118 // and exit.
1119 func (me *Client) connectionLoop(t *torrent, c *connection) error {
1120         decoder := pp.Decoder{
1121                 R:         bufio.NewReader(c.Socket),
1122                 MaxLength: 256 * 1024,
1123         }
1124         for {
1125                 me.mu.Unlock()
1126                 var msg pp.Message
1127                 err := decoder.Decode(&msg)
1128                 me.mu.Lock()
1129                 c.lastMessageReceived = time.Now()
1130                 select {
1131                 case <-c.closing:
1132                         return nil
1133                 default:
1134                 }
1135                 if err != nil {
1136                         if me.stopped() || err == io.EOF {
1137                                 return nil
1138                         }
1139                         return err
1140                 }
1141                 if msg.Keepalive {
1142                         continue
1143                 }
1144                 switch msg.Type {
1145                 case pp.Choke:
1146                         c.PeerChoked = true
1147                         for r := range c.Requests {
1148                                 me.connDeleteRequest(t, c, r)
1149                         }
1150                         // We can then reset our interest.
1151                         me.replenishConnRequests(t, c)
1152                 case pp.Unchoke:
1153                         c.PeerChoked = false
1154                         me.peerUnchoked(t, c)
1155                 case pp.Interested:
1156                         c.PeerInterested = true
1157                         // TODO: This should be done from a dedicated unchoking routine.
1158                         if me.noUpload {
1159                                 break
1160                         }
1161                         c.Unchoke()
1162                 case pp.NotInterested:
1163                         c.PeerInterested = false
1164                         c.Choke()
1165                 case pp.Have:
1166                         me.peerGotPiece(t, c, int(msg.Index))
1167                 case pp.Request:
1168                         if me.noUpload {
1169                                 break
1170                         }
1171                         if c.PeerRequests == nil {
1172                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1173                         }
1174                         request := newRequest(msg.Index, msg.Begin, msg.Length)
1175                         // TODO: Requests should be satisfied from a dedicated upload routine.
1176                         // c.PeerRequests[request] = struct{}{}
1177                         p := make([]byte, msg.Length)
1178                         n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
1179                         if err != nil {
1180                                 return fmt.Errorf("reading t data to serve request %q: %s", request, err)
1181                         }
1182                         if n != int(msg.Length) {
1183                                 return fmt.Errorf("bad request: %v", msg)
1184                         }
1185                         c.Post(pp.Message{
1186                                 Type:  pp.Piece,
1187                                 Index: msg.Index,
1188                                 Begin: msg.Begin,
1189                                 Piece: p,
1190                         })
1191                         uploadChunksPosted.Add(1)
1192                 case pp.Cancel:
1193                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1194                         if !c.PeerCancel(req) {
1195                                 unexpectedCancels.Add(1)
1196                         }
1197                 case pp.Bitfield:
1198                         if c.PeerPieces != nil {
1199                                 err = errors.New("received unexpected bitfield")
1200                                 break
1201                         }
1202                         if t.haveInfo() {
1203                                 if len(msg.Bitfield) < t.NumPieces() {
1204                                         err = errors.New("received invalid bitfield")
1205                                         break
1206                                 }
1207                                 msg.Bitfield = msg.Bitfield[:t.NumPieces()]
1208                         }
1209                         c.PeerPieces = msg.Bitfield
1210                         for index, has := range c.PeerPieces {
1211                                 if has {
1212                                         me.peerGotPiece(t, c, index)
1213                                 }
1214                         }
1215                 case pp.Piece:
1216                         err = me.downloadedChunk(t, c, &msg)
1217                 case pp.Extended:
1218                         switch msg.ExtendedID {
1219                         case pp.HandshakeExtendedID:
1220                                 // TODO: Create a bencode struct for this.
1221                                 var d map[string]interface{}
1222                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1223                                 if err != nil {
1224                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1225                                         break
1226                                 }
1227                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1228                                 if reqq, ok := d["reqq"]; ok {
1229                                         if i, ok := reqq.(int64); ok {
1230                                                 c.PeerMaxRequests = int(i)
1231                                         }
1232                                 }
1233                                 if v, ok := d["v"]; ok {
1234                                         c.PeerClientName = v.(string)
1235                                 }
1236                                 m, ok := d["m"]
1237                                 if !ok {
1238                                         err = errors.New("handshake missing m item")
1239                                         break
1240                                 }
1241                                 mTyped, ok := m.(map[string]interface{})
1242                                 if !ok {
1243                                         err = errors.New("handshake m value is not dict")
1244                                         break
1245                                 }
1246                                 if c.PeerExtensionIDs == nil {
1247                                         c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
1248                                 }
1249                                 for name, v := range mTyped {
1250                                         id, ok := v.(int64)
1251                                         if !ok {
1252                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1253                                                 continue
1254                                         }
1255                                         if id == 0 {
1256                                                 delete(c.PeerExtensionIDs, name)
1257                                         } else {
1258                                                 c.PeerExtensionIDs[name] = id
1259                                         }
1260                                 }
1261                                 metadata_sizeUntyped, ok := d["metadata_size"]
1262                                 if ok {
1263                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1264                                         if !ok {
1265                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1266                                         } else {
1267                                                 t.SetMetadataSize(metadata_size)
1268                                         }
1269                                 }
1270                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1271                                         me.requestPendingMetadata(t, c)
1272                                 }
1273                         case 1:
1274                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1275                                 if err != nil {
1276                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1277                                 }
1278                         case 2:
1279                                 var pexMsg peerExchangeMessage
1280                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1281                                 if err != nil {
1282                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1283                                         break
1284                                 }
1285                                 go func() {
1286                                         err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
1287                                                 for _, cp := range pexMsg.Added {
1288                                                         p := Peer{
1289                                                                 IP:     make([]byte, 4),
1290                                                                 Port:   int(cp.Port),
1291                                                                 Source: peerSourcePEX,
1292                                                         }
1293                                                         if n := copy(p.IP, cp.IP[:]); n != 4 {
1294                                                                 panic(n)
1295                                                         }
1296                                                         ret = append(ret, p)
1297                                                 }
1298                                                 return
1299                                         }())
1300                                         if err != nil {
1301                                                 log.Printf("error adding PEX peers: %s", err)
1302                                                 return
1303                                         }
1304                                         peersFoundByPEX.Add(int64(len(pexMsg.Added)))
1305                                 }()
1306                         default:
1307                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1308                         }
1309                         if err != nil {
1310                                 // That client uses its own extension IDs for outgoing message
1311                                 // types, which is incorrect.
1312                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1313                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1314                                         return nil
1315                                 }
1316                                 // log.Printf("peer extension map: %#v", c.PeerExtensionIDs)
1317                         }
1318                 case pp.Port:
1319                         if me.dHT == nil {
1320                                 break
1321                         }
1322                         pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String())
1323                         if err != nil {
1324                                 panic(err)
1325                         }
1326                         if msg.Port != 0 {
1327                                 pingAddr.Port = int(msg.Port)
1328                         }
1329                         _, err = me.dHT.Ping(pingAddr)
1330                 default:
1331                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1332                 }
1333                 if err != nil {
1334                         return err
1335                 }
1336         }
1337 }
1338
1339 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
1340         for r := range conn.Requests {
1341                 me.connDeleteRequest(torrent, conn, r)
1342         }
1343         conn.Close()
1344         for i0, c := range torrent.Conns {
1345                 if c != conn {
1346                         continue
1347                 }
1348                 i1 := len(torrent.Conns) - 1
1349                 if i0 != i1 {
1350                         torrent.Conns[i0] = torrent.Conns[i1]
1351                 }
1352                 torrent.Conns = torrent.Conns[:i1]
1353                 me.openNewConns(torrent)
1354                 return
1355         }
1356         panic("connection not found")
1357 }
1358
1359 func (me *Client) addConnection(t *torrent, c *connection) bool {
1360         if me.stopped() {
1361                 return false
1362         }
1363         select {
1364         case <-t.ceasingNetworking:
1365                 return false
1366         default:
1367         }
1368         if !me.wantConns(t) {
1369                 return false
1370         }
1371         for _, c0 := range t.Conns {
1372                 if c.PeerID == c0.PeerID {
1373                         // Already connected to a client with that ID.
1374                         return false
1375                 }
1376         }
1377         t.Conns = append(t.Conns, c)
1378         // TODO: This should probably be done by a routine that kills off bad
1379         // connections, and extra connections killed here instead.
1380         if len(t.Conns) > socketsPerTorrent {
1381                 wcs := t.worstConnsHeap()
1382                 heap.Pop(wcs).(*connection).Close()
1383         }
1384         return true
1385 }
1386
1387 func (t *torrent) needData() bool {
1388         if !t.haveInfo() {
1389                 return true
1390         }
1391         for i := range t.Pieces {
1392                 if t.wantPiece(i) {
1393                         return true
1394                 }
1395         }
1396         return false
1397 }
1398
1399 // TODO: I'm sure there's something here to do with seeding.
1400 func (t *torrent) badConn(c *connection) bool {
1401         if time.Now().Sub(c.completedHandshake) < 30*time.Second {
1402                 return false
1403         }
1404         if !t.haveInfo() {
1405                 return !c.supportsExtension("ut_metadata")
1406         }
1407         return !t.connHasWantedPieces(c)
1408 }
1409
1410 func (t *torrent) numGoodConns() (num int) {
1411         for _, c := range t.Conns {
1412                 if !t.badConn(c) {
1413                         num++
1414                 }
1415         }
1416         return
1417 }
1418
1419 func (me *Client) wantConns(t *torrent) bool {
1420         if !t.needData() && me.noUpload {
1421                 return false
1422         }
1423         if t.numGoodConns() >= socketsPerTorrent {
1424                 return false
1425         }
1426         return true
1427 }
1428
1429 func (me *Client) openNewConns(t *torrent) {
1430         select {
1431         case <-t.ceasingNetworking:
1432                 return
1433         default:
1434         }
1435         for len(t.Peers) != 0 {
1436                 if !me.wantConns(t) {
1437                         return
1438                 }
1439                 if len(t.HalfOpen) >= me.halfOpenLimit {
1440                         return
1441                 }
1442                 var (
1443                         k peersKey
1444                         p Peer
1445                 )
1446                 for k, p = range t.Peers {
1447                         break
1448                 }
1449                 delete(t.Peers, k)
1450                 me.initiateConn(p, t)
1451         }
1452         t.wantPeers.Broadcast()
1453 }
1454
1455 func (me *Client) addPeers(t *torrent, peers []Peer) {
1456         blocked := 0
1457         for i, p := range peers {
1458                 if me.ipBlockRange(p.IP) == nil {
1459                         continue
1460                 }
1461                 peers[i] = peers[len(peers)-1]
1462                 peers = peers[:len(peers)-1]
1463                 i--
1464                 blocked++
1465         }
1466         if blocked != 0 {
1467                 log.Printf("IP blocklist screened %d peers from being added", blocked)
1468         }
1469         t.AddPeers(peers)
1470         me.openNewConns(t)
1471 }
1472
1473 // Adds peers to the swarm for the torrent corresponding to infoHash.
1474 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
1475         me.mu.Lock()
1476         defer me.mu.Unlock()
1477         t := me.torrent(infoHash)
1478         if t == nil {
1479                 return errors.New("no such torrent")
1480         }
1481         me.addPeers(t, peers)
1482         return nil
1483 }
1484
1485 func (cl *Client) torrentFileCachePath(ih InfoHash) string {
1486         return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1487 }
1488
1489 func (cl *Client) saveTorrentFile(t *torrent) error {
1490         path := cl.torrentFileCachePath(t.InfoHash)
1491         os.MkdirAll(filepath.Dir(path), 0777)
1492         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1493         if err != nil {
1494                 return fmt.Errorf("error opening file: %s", err)
1495         }
1496         defer f.Close()
1497         e := bencode.NewEncoder(f)
1498         err = e.Encode(t.MetaInfo())
1499         if err != nil {
1500                 return fmt.Errorf("error marshalling metainfo: %s", err)
1501         }
1502         mi, _ := cl.torrentCacheMetaInfo(t.InfoHash)
1503         if !bytes.Equal(mi.Info.Hash, t.InfoHash[:]) {
1504                 log.Fatalf("%x != %x", mi.Info.Hash, t.InfoHash[:])
1505         }
1506         return nil
1507 }
1508
1509 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
1510         err = t.setMetadata(md, cl.dataDir, bytes, &cl.mu)
1511         if err != nil {
1512                 return
1513         }
1514         // If the client intends to upload, it needs to know what state pieces are
1515         // in.
1516         if !cl.noUpload {
1517                 // Queue all pieces for hashing. This is done sequentially to avoid
1518                 // spamming goroutines.
1519                 for _, p := range t.Pieces {
1520                         p.QueuedForHash = true
1521                 }
1522                 go func() {
1523                         for i := range t.Pieces {
1524                                 cl.verifyPiece(t, pp.Integer(i))
1525                         }
1526                 }()
1527         }
1528
1529         cl.downloadStrategy.TorrentStarted(t)
1530         if err := cl.saveTorrentFile(t); err != nil {
1531                 log.Printf("error saving torrent file for %s: %s", t, err)
1532         }
1533         close(t.gotMetainfo)
1534         return
1535 }
1536
1537 // Prepare a Torrent without any attachment to a Client. That means we can
1538 // initialize fields all fields that don't require the Client without locking
1539 // it.
1540 func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
1541         t = &torrent{
1542                 InfoHash: ih,
1543                 Peers:    make(map[peersKey]Peer),
1544
1545                 closing:           make(chan struct{}),
1546                 ceasingNetworking: make(chan struct{}),
1547
1548                 gotMetainfo: make(chan struct{}),
1549
1550                 HalfOpen: make(map[string]struct{}),
1551         }
1552         t.wantPeers.L = &t.stateMu
1553         t.GotMetainfo = t.gotMetainfo
1554         t.addTrackers(announceList)
1555         return
1556 }
1557
// init seeds the shared math/rand source from the current Unix time in
// seconds.
func init() {
	// For shuffling the tracker tiers.
	seed := time.Now().Unix()
	mathRand.Seed(seed)
}
1562
1563 // The trackers within each tier must be shuffled before use.
1564 // http://stackoverflow.com/a/12267471/149482
1565 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1566 func shuffleTier(tier []tracker.Client) {
1567         for i := range tier {
1568                 j := mathRand.Intn(i + 1)
1569                 tier[i], tier[j] = tier[j], tier[i]
1570         }
1571 }
1572
1573 func copyTrackers(base [][]tracker.Client) (copy [][]tracker.Client) {
1574         for _, tier := range base {
1575                 copy = append(copy, append([]tracker.Client{}, tier...))
1576         }
1577         return
1578 }
1579
1580 func mergeTier(tier []tracker.Client, newURLs []string) []tracker.Client {
1581 nextURL:
1582         for _, url := range newURLs {
1583                 for _, tr := range tier {
1584                         if tr.URL() == url {
1585                                 continue nextURL
1586                         }
1587                 }
1588                 tr, err := tracker.New(url)
1589                 if err != nil {
1590                         log.Printf("error creating tracker client for %q: %s", url, err)
1591                         continue
1592                 }
1593                 tier = append(tier, tr)
1594         }
1595         return tier
1596 }
1597
1598 func (t *torrent) addTrackers(announceList [][]string) {
1599         newTrackers := copyTrackers(t.Trackers)
1600         for tierIndex, tier := range announceList {
1601                 if tierIndex < len(newTrackers) {
1602                         newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1603                 } else {
1604                         newTrackers = append(newTrackers, mergeTier(nil, tier))
1605                 }
1606                 shuffleTier(newTrackers[tierIndex])
1607         }
1608         t.Trackers = newTrackers
1609 }
1610
1611 type Torrent struct {
1612         cl *Client
1613         *torrent
1614 }
1615
1616 type File struct {
1617         t      Torrent
1618         path   string
1619         offset int64
1620         length int64
1621 }
1622
1623 func (f *File) Length() int64 {
1624         return f.length
1625 }
1626
1627 func (f *File) PrioritizeRegion(off, len int64) {
1628         if off < 0 || off >= f.length {
1629                 return
1630         }
1631         if off+len > f.length {
1632                 len = f.length - off
1633         }
1634         off += f.offset
1635         f.t.SetRegionPriority(off, len)
1636 }
1637
1638 // Returns handles to the files in the torrent. This requires the metainfo is
1639 // available first.
1640 func (t Torrent) Files() (ret []File) {
1641         var offset int64
1642         for _, fi := range t.Info.UpvertedFiles() {
1643                 ret = append(ret, File{
1644                         t,
1645                         strings.Join(fi.Path, "/"),
1646                         offset,
1647                         fi.Length,
1648                 })
1649                 offset += fi.Length
1650         }
1651         return
1652 }
1653
1654 func (t Torrent) SetRegionPriority(off, len int64) {
1655         t.cl.mu.Lock()
1656         defer t.cl.mu.Unlock()
1657         pieceSize := int64(t.UsualPieceSize())
1658         for i := off / pieceSize; i*pieceSize < off+len; i++ {
1659                 t.cl.prioritizePiece(t.torrent, int(i), piecePriorityNormal)
1660         }
1661 }
1662
1663 func (t Torrent) MetainfoFilepath() string {
1664         return filepath.Join(t.cl.ConfigDir(), "torrents", t.InfoHash.HexString()+".torrent")
1665 }
1666
1667 func (t Torrent) AddPeers(pp []Peer) error {
1668         return t.cl.AddPeers(t.torrent.InfoHash, pp)
1669 }
1670
1671 func (t Torrent) DownloadAll() {
1672         t.cl.mu.Lock()
1673         for i := 0; i < t.NumPieces(); i++ {
1674                 // TODO: Leave higher priorities as they were?
1675                 t.cl.prioritizePiece(t.torrent, i, piecePriorityNormal)
1676         }
1677         // Nice to have the first and last pieces soon for various interactive
1678         // purposes.
1679         t.cl.prioritizePiece(t.torrent, 0, piecePriorityReadahead)
1680         t.cl.prioritizePiece(t.torrent, t.NumPieces()-1, piecePriorityReadahead)
1681         t.cl.mu.Unlock()
1682 }
1683
1684 func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
1685         return me.cl.torrentReadAt(me.torrent, off, p)
1686 }
1687
1688 func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
1689         f, err := os.Open(cl.torrentFileCachePath(ih))
1690         if err != nil {
1691                 if os.IsNotExist(err) {
1692                         err = nil
1693                 }
1694                 return
1695         }
1696         defer f.Close()
1697         dec := bencode.NewDecoder(f)
1698         err = dec.Decode(&mi)
1699         if err != nil {
1700                 return
1701         }
1702         if !bytes.Equal(mi.Info.Hash, ih[:]) {
1703                 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
1704                 return
1705         }
1706         return
1707 }
1708
1709 func (cl *Client) AddMagnet(uri string) (T Torrent, err error) {
1710         m, err := ParseMagnetURI(uri)
1711         if err != nil {
1712                 return
1713         }
1714         mi, err := cl.torrentCacheMetaInfo(m.InfoHash)
1715         if err != nil {
1716                 log.Printf("error getting cached metainfo for %x: %s", m.InfoHash[:], err)
1717         } else if mi != nil {
1718                 cl.AddTorrent(mi)
1719         }
1720         cl.mu.Lock()
1721         defer cl.mu.Unlock()
1722         T, err = cl.addOrMergeTorrent(m.InfoHash, [][]string{m.Trackers})
1723         if err != nil {
1724                 return
1725         }
1726         if m.DisplayName != "" {
1727                 T.DisplayName = m.DisplayName
1728         }
1729         return
1730 }
1731
1732 // Actively prunes unused connections. This is required to make space to dial
1733 // for replacements.
1734 func (cl *Client) connectionPruner(t *torrent) {
1735         for {
1736                 time.Sleep(15 * time.Second)
1737                 cl.mu.Lock()
1738                 select {
1739                 case <-t.ceasingNetworking:
1740                 default:
1741                 }
1742                 license := len(t.Conns) - (socketsPerTorrent+1)/2
1743                 for _, c := range t.Conns {
1744                         if license <= 0 {
1745                                 break
1746                         }
1747                         if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
1748                                 continue
1749                         }
1750                         if time.Now().Sub(c.completedHandshake) < time.Minute {
1751                                 continue
1752                         }
1753                         c.Close()
1754                         license--
1755                 }
1756                 cl.mu.Unlock()
1757         }
1758 }
1759
1760 func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
1761         me.mu.Lock()
1762         defer me.mu.Unlock()
1763         t, ok := me.torrents[infoHash]
1764         if !ok {
1765                 err = fmt.Errorf("no such torrent")
1766                 return
1767         }
1768         err = t.Close()
1769         if err != nil {
1770                 panic(err)
1771         }
1772         delete(me.torrents, infoHash)
1773         me.downloadStrategy.TorrentStopped(t)
1774         for _, dw := range me.dataWaits[t] {
1775                 close(dw.ready)
1776         }
1777         delete(me.dataWaits, t)
1778         return
1779 }
1780
1781 func (me *Client) addOrMergeTorrent(ih InfoHash, announceList [][]string) (T Torrent, err error) {
1782         if _, ok := me.bannedTorrents[ih]; ok {
1783                 err = errors.New("banned torrent")
1784                 return
1785         }
1786         T.cl = me
1787         var ok bool
1788         T.torrent, ok = me.torrents[ih]
1789         if ok {
1790                 T.torrent.addTrackers(announceList)
1791         } else {
1792                 T.torrent, err = newTorrent(ih, announceList, me.halfOpenLimit)
1793                 if err != nil {
1794                         return
1795                 }
1796                 me.torrents[ih] = T.torrent
1797                 if !me.disableTrackers {
1798                         go me.announceTorrentTrackers(T.torrent)
1799                 }
1800                 if me.dHT != nil {
1801                         go me.announceTorrentDHT(T.torrent, true)
1802                 }
1803                 go me.connectionPruner(T.torrent)
1804         }
1805         return
1806 }
1807
1808 // Adds the torrent to the client.
1809 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (t Torrent, err error) {
1810         var ih InfoHash
1811         CopyExact(&ih, metaInfo.Info.Hash)
1812         me.mu.Lock()
1813         defer me.mu.Unlock()
1814         t, err = me.addOrMergeTorrent(ih, metaInfo.AnnounceList)
1815         if err != nil {
1816                 return
1817         }
1818         if !t.torrent.haveInfo() {
1819                 err = me.setMetaData(t.torrent, metaInfo.Info.Info, metaInfo.Info.Bytes)
1820                 if err != nil {
1821                         return
1822                 }
1823         }
1824         return
1825 }
1826
1827 func (me *Client) AddTorrentFromFile(name string) (t Torrent, err error) {
1828         mi, err := metainfo.LoadFromFile(name)
1829         if err != nil {
1830                 err = fmt.Errorf("error loading metainfo from file: %s", err)
1831                 return
1832         }
1833         return me.AddTorrent(mi)
1834 }
1835
1836 // Returns true when peers are required, or false if the torrent is closing.
1837 func (cl *Client) waitWantPeers(t *torrent) bool {
1838         cl.mu.Lock()
1839         defer cl.mu.Unlock()
1840         t.stateMu.Lock()
1841         defer t.stateMu.Unlock()
1842         for {
1843                 select {
1844                 case <-t.ceasingNetworking:
1845                         return false
1846                 default:
1847                 }
1848                 if len(t.Peers) < torrentPeersLowWater {
1849                         return true
1850                 }
1851                 cl.mu.Unlock()
1852                 t.wantPeers.Wait()
1853                 t.stateMu.Unlock()
1854                 cl.mu.Lock()
1855                 t.stateMu.Lock()
1856         }
1857 }
1858
1859 func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
1860         for cl.waitWantPeers(t) {
1861                 log.Printf("getting peers for %q from DHT", t)
1862                 ps, err := cl.dHT.Announce(string(t.InfoHash[:]), cl.incomingPeerPort(), impliedPort)
1863                 if err != nil {
1864                         log.Printf("error getting peers from dht: %s", err)
1865                         return
1866                 }
1867                 allAddrs := make(map[string]struct{})
1868         getPeers:
1869                 for {
1870                         select {
1871                         case v, ok := <-ps.Values:
1872                                 if !ok {
1873                                         break getPeers
1874                                 }
1875                                 peersFoundByDHT.Add(int64(len(v.Peers)))
1876                                 for _, p := range v.Peers {
1877                                         allAddrs[(&net.UDPAddr{
1878                                                 IP:   p.IP[:],
1879                                                 Port: int(p.Port),
1880                                         }).String()] = struct{}{}
1881                                 }
1882                                 // log.Printf("%s: %d new peers from DHT", t, len(v.Peers))
1883                                 cl.mu.Lock()
1884                                 cl.addPeers(t, func() (ret []Peer) {
1885                                         for _, cp := range v.Peers {
1886                                                 ret = append(ret, Peer{
1887                                                         IP:     cp.IP[:],
1888                                                         Port:   int(cp.Port),
1889                                                         Source: peerSourceDHT,
1890                                                 })
1891                                         }
1892                                         return
1893                                 }())
1894                                 numPeers := len(t.Peers)
1895                                 cl.mu.Unlock()
1896                                 if numPeers >= torrentPeersHighWater {
1897                                         break getPeers
1898                                 }
1899                         case <-t.ceasingNetworking:
1900                                 ps.Close()
1901                                 return
1902                         }
1903                 }
1904                 ps.Close()
1905                 log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
1906
1907         }
1908 }
1909
1910 func (cl *Client) announceTorrentSingleTracker(tr tracker.Client, req *tracker.AnnounceRequest, t *torrent) error {
1911         if err := tr.Connect(); err != nil {
1912                 return fmt.Errorf("error connecting: %s", err)
1913         }
1914         resp, err := tr.Announce(req)
1915         if err != nil {
1916                 return fmt.Errorf("error announcing: %s", err)
1917         }
1918         var peers []Peer
1919         for _, peer := range resp.Peers {
1920                 peers = append(peers, Peer{
1921                         IP:   peer.IP,
1922                         Port: peer.Port,
1923                 })
1924         }
1925         err = cl.AddPeers(t.InfoHash, peers)
1926         if err != nil {
1927                 log.Printf("error adding peers to torrent %s: %s", t, err)
1928         } else {
1929                 log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1930         }
1931
1932         time.Sleep(time.Second * time.Duration(resp.Interval))
1933         return nil
1934 }
1935
1936 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers [][]tracker.Client, t *torrent) (atLeastOne bool) {
1937         oks := make(chan bool)
1938         outstanding := 0
1939         for _, tier := range trackers {
1940                 for _, tr := range tier {
1941                         outstanding++
1942                         go func(tr tracker.Client) {
1943                                 err := cl.announceTorrentSingleTracker(tr, req, t)
1944                                 oks <- err == nil
1945                         }(tr)
1946                 }
1947         }
1948         for outstanding > 0 {
1949                 ok := <-oks
1950                 outstanding--
1951                 if ok {
1952                         atLeastOne = true
1953                 }
1954         }
1955         return
1956 }
1957
1958 // Announce torrent to its trackers.
1959 func (cl *Client) announceTorrentTrackers(t *torrent) {
1960         req := tracker.AnnounceRequest{
1961                 Event:    tracker.Started,
1962                 NumWant:  -1,
1963                 Port:     int16(cl.incomingPeerPort()),
1964                 PeerId:   cl.peerID,
1965                 InfoHash: t.InfoHash,
1966         }
1967         cl.mu.RLock()
1968         req.Left = t.BytesLeft()
1969         trackers := t.Trackers
1970         cl.mu.RUnlock()
1971         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
1972                 req.Event = tracker.None
1973         }
1974 newAnnounce:
1975         for cl.waitWantPeers(t) {
1976                 cl.mu.RLock()
1977                 req.Left = t.BytesLeft()
1978                 trackers = t.Trackers
1979                 cl.mu.RUnlock()
1980                 numTrackersTried := 0
1981                 for _, tier := range trackers {
1982                         for trIndex, tr := range tier {
1983                                 numTrackersTried++
1984                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
1985                                 if err != nil {
1986                                         continue
1987                                 }
1988                                 // Float the successful announce to the top of the tier. If
1989                                 // the trackers list has been changed, we'll be modifying an
1990                                 // old copy so it won't matter.
1991                                 cl.mu.Lock()
1992                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1993                                 cl.mu.Unlock()
1994
1995                                 req.Event = tracker.None
1996                                 continue newAnnounce
1997                         }
1998                 }
1999                 if numTrackersTried != 0 {
2000                         log.Printf("%s: all trackers failed", t)
2001                 }
2002                 // TODO: Wait until trackers are added if there are none.
2003                 time.Sleep(10 * time.Second)
2004         }
2005 }
2006
2007 func (cl *Client) allTorrentsCompleted() bool {
2008         for _, t := range cl.torrents {
2009                 if !t.haveInfo() {
2010                         return false
2011                 }
2012                 if t.NumPiecesCompleted() != t.NumPieces() {
2013                         return false
2014                 }
2015         }
2016         return true
2017 }
2018
2019 // Returns true when all torrents are completely downloaded and false if the
2020 // client is stopped before that.
2021 func (me *Client) WaitAll() bool {
2022         me.mu.Lock()
2023         defer me.mu.Unlock()
2024         for !me.allTorrentsCompleted() {
2025                 if me.stopped() {
2026                         return false
2027                 }
2028                 me.event.Wait()
2029         }
2030         return true
2031 }
2032
2033 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
2034         if !t.haveInfo() {
2035                 return
2036         }
2037         me.downloadStrategy.FillRequests(t, c)
2038         if len(c.Requests) == 0 && !c.PeerChoked {
2039                 c.SetInterested(false)
2040         }
2041 }
2042
2043 // Handle a received chunk from a peer.
2044 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
2045         chunksDownloadedCount.Add(1)
2046
2047         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
2048
2049         // Request has been satisfied.
2050         me.connDeleteRequest(t, c, req)
2051
2052         defer me.replenishConnRequests(t, c)
2053
2054         // Do we actually want this chunk?
2055         if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
2056                 unusedDownloadedChunksCount.Add(1)
2057                 c.UnwantedChunksReceived++
2058                 return nil
2059         }
2060
2061         c.UsefulChunksReceived++
2062         c.lastUsefulChunkReceived = time.Now()
2063
2064         // Write the chunk out.
2065         err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
2066         if err != nil {
2067                 return fmt.Errorf("error writing chunk: %s", err)
2068         }
2069
2070         // Record that we have the chunk.
2071         delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
2072         me.dataReady(t, req)
2073         if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
2074                 for _, c := range t.Conns {
2075                         c.pieceRequestOrder.DeletePiece(int(req.Index))
2076                 }
2077                 me.queuePieceCheck(t, req.Index)
2078         }
2079
2080         // Unprioritize the chunk.
2081         me.downloadStrategy.TorrentGotChunk(t, req)
2082
2083         // Cancel pending requests for this chunk.
2084         for _, c := range t.Conns {
2085                 if me.connCancel(t, c, req) {
2086                         me.replenishConnRequests(t, c)
2087                 }
2088         }
2089
2090         me.downloadStrategy.AssertNotRequested(t, req)
2091
2092         return nil
2093 }
2094
2095 func (cl *Client) dataReady(t *torrent, r request) {
2096         dws := cl.dataWaits[t]
2097         begin := t.requestOffset(r)
2098         end := begin + int64(r.Length)
2099         for i := 0; i < len(dws); {
2100                 dw := dws[i]
2101                 if begin <= dw.offset && dw.offset < end {
2102                         close(dw.ready)
2103                         dws[i] = dws[len(dws)-1]
2104                         dws = dws[:len(dws)-1]
2105                 } else {
2106                         i++
2107                 }
2108         }
2109         cl.dataWaits[t] = dws
2110 }
2111
2112 // Returns a channel that is closed when new data has become available in the
2113 // client.
2114 func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) {
2115         me.mu.Lock()
2116         defer me.mu.Unlock()
2117         ch := make(chan struct{})
2118         ret = ch
2119         t := me.torrents[ih]
2120         if t == nil {
2121                 close(ch)
2122                 return
2123         }
2124         if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) {
2125                 close(ch)
2126                 return
2127         }
2128         me.dataWaits[t] = append(me.dataWaits[t], dataWait{
2129                 offset: off,
2130                 ready:  ch,
2131         })
2132         return
2133 }
2134
2135 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
2136         p := t.Pieces[piece]
2137         if p.EverHashed && !correct {
2138                 log.Printf("%s: piece %d failed hash", t, piece)
2139                 failedPieceHashes.Add(1)
2140         }
2141         p.EverHashed = true
2142         if correct {
2143                 p.Priority = piecePriorityNone
2144                 p.PendingChunkSpecs = nil
2145                 p.Event.Broadcast()
2146                 me.downloadStrategy.TorrentGotPiece(t, int(piece))
2147                 me.dataReady(t, request{
2148                         pp.Integer(piece),
2149                         chunkSpec{0, pp.Integer(t.PieceLength(piece))},
2150                 })
2151         } else {
2152                 if len(p.PendingChunkSpecs) == 0 {
2153                         t.pendAllChunkSpecs(piece)
2154                 }
2155                 if p.Priority != piecePriorityNone {
2156                         me.openNewConns(t)
2157                 }
2158         }
2159         for _, conn := range t.Conns {
2160                 if correct {
2161                         conn.Post(pp.Message{
2162                                 Type:  pp.Have,
2163                                 Index: pp.Integer(piece),
2164                         })
2165                         // TODO: Cancel requests for this piece.
2166                         for r := range conn.Requests {
2167                                 if r.Index == piece {
2168                                         panic("wat")
2169                                 }
2170                         }
2171                         conn.pieceRequestOrder.DeletePiece(int(piece))
2172                 }
2173                 if t.wantPiece(int(piece)) && conn.PeerHasPiece(piece) {
2174                         t.connPendPiece(conn, int(piece))
2175                         me.replenishConnRequests(t, conn)
2176                 }
2177         }
2178         if t.haveAllPieces() && me.noUpload {
2179                 t.CeaseNetworking()
2180         }
2181         me.event.Broadcast()
2182 }
2183
2184 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
2185         cl.mu.Lock()
2186         defer cl.mu.Unlock()
2187         p := t.Pieces[index]
2188         for p.Hashing {
2189                 cl.event.Wait()
2190         }
2191         if t.isClosed() {
2192                 return
2193         }
2194         p.Hashing = true
2195         p.QueuedForHash = false
2196         cl.mu.Unlock()
2197         sum := t.HashPiece(index)
2198         cl.mu.Lock()
2199         select {
2200         case <-t.closing:
2201                 return
2202         default:
2203         }
2204         p.Hashing = false
2205         cl.pieceHashed(t, index, sum == p.Hash)
2206 }
2207
2208 func (me *Client) Torrents() (ret []Torrent) {
2209         me.mu.Lock()
2210         for _, t := range me.torrents {
2211                 ret = append(ret, Torrent{me, t})
2212         }
2213         me.mu.Unlock()
2214         return
2215 }