]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Make blob data stateful
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errors("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "bytes"
21         "container/heap"
22         "crypto/rand"
23         "crypto/sha1"
24         "errors"
25         "expvar"
26         "fmt"
27         "io"
28         "log"
29         "math/big"
30         mathRand "math/rand"
31         "net"
32         "os"
33         "path/filepath"
34         "sort"
35         "strings"
36         "syscall"
37         "time"
38
39         filePkg "bitbucket.org/anacrolix/go.torrent/data/file"
40         "bitbucket.org/anacrolix/go.torrent/dht"
41         "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
42         "bitbucket.org/anacrolix/go.torrent/iplist"
43         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
44         "bitbucket.org/anacrolix/go.torrent/tracker"
45         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
46         . "bitbucket.org/anacrolix/go.torrent/util"
47         "bitbucket.org/anacrolix/sync"
48         "bitbucket.org/anacrolix/utp"
49         "github.com/anacrolix/libtorgo/bencode"
50         "github.com/anacrolix/libtorgo/metainfo"
51 )
52
53 var (
54         unusedDownloadedChunksCount = expvar.NewInt("unusedDownloadedChunksCount")
55         chunksDownloadedCount       = expvar.NewInt("chunksDownloadedCount")
56         peersFoundByDHT             = expvar.NewInt("peersFoundByDHT")
57         peersFoundByPEX             = expvar.NewInt("peersFoundByPEX")
58         uploadChunksPosted          = expvar.NewInt("uploadChunksPosted")
59         unexpectedCancels           = expvar.NewInt("unexpectedCancels")
60         postedCancels               = expvar.NewInt("postedCancels")
61         duplicateConnsAvoided       = expvar.NewInt("duplicateConnsAvoided")
62         failedPieceHashes           = expvar.NewInt("failedPieceHashes")
63         unsuccessfulDials           = expvar.NewInt("unsuccessfulDials")
64         successfulDials             = expvar.NewInt("successfulDials")
65         acceptedConns               = expvar.NewInt("acceptedConns")
66         inboundConnsBlocked         = expvar.NewInt("inboundConnsBlocked")
67 )
68
69 const (
70         // Justification for set bits follows.
71         //
72         // Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
73         // DHT: http://www.bittorrent.org/beps/bep_0005.html
74         extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
75
76         socketsPerTorrent     = 40
77         torrentPeersHighWater = 200
78         torrentPeersLowWater  = 50
79
80         // Limit how long handshake can take. This is to reduce the lingering
81         // impact of a few bad apples. 4s loses 1% of successful handshakes that
82         // are obtained with 60s timeout, and 5% of unsuccessful handshakes.
83         handshakeTimeout = 4 * time.Second
84
85         pruneInterval = 10 * time.Second
86 )
87
88 // Currently doesn't really queue, but should in the future.
89 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
90         piece := t.Pieces[pieceIndex]
91         if piece.QueuedForHash {
92                 return
93         }
94         piece.QueuedForHash = true
95         go cl.verifyPiece(t, pieceIndex)
96 }
97
98 // Queue a piece check if one isn't already queued, and the piece has never
99 // been checked before.
100 func (cl *Client) queueFirstHash(t *torrent, piece int) {
101         p := t.Pieces[piece]
102         if p.EverHashed || p.Hashing || p.QueuedForHash || p.Complete() {
103                 return
104         }
105         cl.queuePieceCheck(t, pp.Integer(piece))
106 }
107
108 type Client struct {
109         noUpload         bool
110         dataDir          string
111         halfOpenLimit    int
112         peerID           [20]byte
113         listeners        []net.Listener
114         utpSock          *utp.Socket
115         disableTrackers  bool
116         downloadStrategy DownloadStrategy
117         dHT              *dht.Server
118         disableUTP       bool
119         disableTCP       bool
120         ipBlockList      *iplist.IPList
121         bannedTorrents   map[InfoHash]struct{}
122         _configDir       string
123         config           Config
124         pruneTimer       *time.Timer
125
126         torrentDataOpener TorrentDataOpener
127
128         mu    sync.RWMutex
129         event sync.Cond
130         quit  chan struct{}
131
132         handshaking int
133
134         torrents map[InfoHash]*torrent
135 }
136
137 func (me *Client) IPBlockList() *iplist.IPList {
138         me.mu.Lock()
139         defer me.mu.Unlock()
140         return me.ipBlockList
141 }
142
143 func (me *Client) SetIPBlockList(list *iplist.IPList) {
144         me.mu.Lock()
145         defer me.mu.Unlock()
146         me.ipBlockList = list
147         if me.dHT != nil {
148                 me.dHT.SetIPBlockList(list)
149         }
150 }
151
152 func (me *Client) PeerID() string {
153         return string(me.peerID[:])
154 }
155
156 func (me *Client) ListenAddr() (addr net.Addr) {
157         for _, l := range me.listeners {
158                 if addr != nil && l.Addr().String() != addr.String() {
159                         panic("listeners exist on different addresses")
160                 }
161                 addr = l.Addr()
162         }
163         return
164 }
165
166 type hashSorter struct {
167         Hashes []InfoHash
168 }
169
170 func (me hashSorter) Len() int {
171         return len(me.Hashes)
172 }
173
174 func (me hashSorter) Less(a, b int) bool {
175         return (&big.Int{}).SetBytes(me.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(me.Hashes[b][:])) < 0
176 }
177
178 func (me hashSorter) Swap(a, b int) {
179         me.Hashes[a], me.Hashes[b] = me.Hashes[b], me.Hashes[a]
180 }
181
182 func (cl *Client) sortedTorrents() (ret []*torrent) {
183         var hs hashSorter
184         for ih := range cl.torrents {
185                 hs.Hashes = append(hs.Hashes, ih)
186         }
187         sort.Sort(hs)
188         for _, ih := range hs.Hashes {
189                 ret = append(ret, cl.torrent(ih))
190         }
191         return
192 }
193
194 func (cl *Client) WriteStatus(_w io.Writer) {
195         cl.mu.RLock()
196         defer cl.mu.RUnlock()
197         w := bufio.NewWriter(_w)
198         defer w.Flush()
199         if addr := cl.ListenAddr(); addr != nil {
200                 fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
201         } else {
202                 fmt.Println("Not listening!")
203         }
204         fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
205         fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
206         if cl.dHT != nil {
207                 dhtStats := cl.dHT.Stats()
208                 fmt.Fprintf(w, "DHT nodes: %d (%d good)\n", dhtStats.NumNodes, dhtStats.NumGoodNodes)
209                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
210                 fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.LocalAddr()))
211                 fmt.Fprintf(w, "DHT announces: %d\n", cl.dHT.NumConfirmedAnnounces)
212                 fmt.Fprintf(w, "Outstanding transactions: %d\n", dhtStats.NumOutstandingTransactions)
213         }
214         cl.downloadStrategy.WriteStatus(w)
215         fmt.Fprintln(w)
216         for _, t := range cl.sortedTorrents() {
217                 if t.Name() == "" {
218                         fmt.Fprint(w, "<unknown name>")
219                 } else {
220                         fmt.Fprint(w, t.Name())
221                 }
222                 fmt.Fprint(w, "\n")
223                 if t.haveInfo() {
224                         fmt.Fprintf(w, "%f%% of %d bytes", 100*(1-float32(t.bytesLeft())/float32(t.Length())), t.Length())
225                 } else {
226                         w.WriteString("<missing metainfo>")
227                 }
228                 fmt.Fprint(w, "\n")
229                 t.WriteStatus(w)
230                 fmt.Fprintln(w)
231         }
232 }
233
234 // Read torrent data at the given offset. Will block until it is available.
235 func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
236         cl.mu.Lock()
237         defer cl.mu.Unlock()
238         index := int(off / int64(t.usualPieceSize()))
239         // Reading outside the bounds of a file is an error.
240         if index < 0 {
241                 err = os.ErrInvalid
242                 return
243         }
244         if int(index) >= len(t.Pieces) {
245                 err = io.EOF
246                 return
247         }
248         piece := t.Pieces[index]
249         pieceOff := pp.Integer(off % int64(t.usualPieceSize()))
250         pieceLeft := int(t.PieceLength(pp.Integer(index)) - pieceOff)
251         if pieceLeft <= 0 {
252                 err = io.EOF
253                 return
254         }
255         cl.readRaisePiecePriorities(t, off, int64(len(p)))
256         if len(p) > pieceLeft {
257                 p = p[:pieceLeft]
258         }
259         if len(p) == 0 {
260                 panic(len(p))
261         }
262         for !piece.Complete() && !t.isClosed() {
263                 piece.Event.Wait()
264         }
265         return t.data.ReadAt(p, off)
266 }
267
268 func (cl *Client) readRaisePiecePriorities(t *torrent, off, _len int64) {
269         index := int(off / int64(t.usualPieceSize()))
270         cl.raisePiecePriority(t, index, piecePriorityNow)
271         index++
272         if index >= t.numPieces() {
273                 return
274         }
275         cl.raisePiecePriority(t, index, piecePriorityNext)
276         for i := 0; i < t.numConnsUnchoked()/2; i++ {
277                 index++
278                 if index >= t.numPieces() {
279                         break
280                 }
281                 cl.raisePiecePriority(t, index, piecePriorityReadahead)
282         }
283 }
284
285 func (cl *Client) configDir() string {
286         if cl._configDir == "" {
287                 return filepath.Join(os.Getenv("HOME"), ".config/torrent")
288         }
289         return cl._configDir
290 }
291
292 func (cl *Client) ConfigDir() string {
293         return cl.configDir()
294 }
295
296 func (t *torrent) connPendPiece(c *connection, piece int) {
297         c.pendPiece(piece, t.Pieces[piece].Priority)
298 }
299
300 func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
301         if t.Pieces[piece].Priority < priority {
302                 cl.prioritizePiece(t, piece, priority)
303         }
304 }
305
306 func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) {
307         if t.havePiece(piece) {
308                 return
309         }
310         cl.queueFirstHash(t, piece)
311         t.Pieces[piece].Priority = priority
312         if t.wantPiece(piece) {
313                 for _, c := range t.Conns {
314                         if c.PeerHasPiece(pp.Integer(piece)) {
315                                 t.connPendPiece(c, piece)
316                                 cl.replenishConnRequests(t, c)
317                         }
318                 }
319         }
320 }
321
322 func (cl *Client) setEnvBlocklist() (err error) {
323         filename := os.Getenv("TORRENT_BLOCKLIST_FILE")
324         defaultBlocklist := filename == ""
325         if defaultBlocklist {
326                 filename = filepath.Join(cl.configDir(), "blocklist")
327         }
328         f, err := os.Open(filename)
329         if err != nil {
330                 if defaultBlocklist {
331                         err = nil
332                 }
333                 return
334         }
335         defer f.Close()
336         var ranges []iplist.Range
337         uniqStrs := make(map[string]string)
338         scanner := bufio.NewScanner(f)
339         for scanner.Scan() {
340                 r, ok, lineErr := iplist.ParseBlocklistP2PLine(scanner.Bytes())
341                 if lineErr != nil {
342                         err = fmt.Errorf("error reading torrent blocklist line: %s", lineErr)
343                         return
344                 }
345                 if !ok {
346                         continue
347                 }
348                 if s, ok := uniqStrs[r.Description]; ok {
349                         r.Description = s
350                 } else {
351                         uniqStrs[r.Description] = r.Description
352                 }
353                 ranges = append(ranges, r)
354         }
355         err = scanner.Err()
356         if err != nil {
357                 err = fmt.Errorf("error reading torrent blocklist: %s", err)
358                 return
359         }
360         cl.ipBlockList = iplist.New(ranges)
361         return
362 }
363
364 func (cl *Client) initBannedTorrents() error {
365         f, err := os.Open(filepath.Join(cl.configDir(), "banned_infohashes"))
366         if err != nil {
367                 if os.IsNotExist(err) {
368                         return nil
369                 }
370                 return fmt.Errorf("error opening banned infohashes file: %s", err)
371         }
372         defer f.Close()
373         scanner := bufio.NewScanner(f)
374         cl.bannedTorrents = make(map[InfoHash]struct{})
375         for scanner.Scan() {
376                 var ihs string
377                 n, err := fmt.Sscanf(scanner.Text(), "%x", &ihs)
378                 if err != nil {
379                         return fmt.Errorf("error reading infohash: %s", err)
380                 }
381                 if n != 1 {
382                         continue
383                 }
384                 if len(ihs) != 20 {
385                         return errors.New("bad infohash")
386                 }
387                 var ih InfoHash
388                 CopyExact(&ih, ihs)
389                 cl.bannedTorrents[ih] = struct{}{}
390         }
391         if err := scanner.Err(); err != nil {
392                 return fmt.Errorf("error scanning file: %s", err)
393         }
394         return nil
395 }
396
397 func NewClient(cfg *Config) (cl *Client, err error) {
398         if cfg == nil {
399                 cfg = &Config{}
400         }
401
402         cl = &Client{
403                 noUpload:         cfg.NoUpload,
404                 disableTrackers:  cfg.DisableTrackers,
405                 downloadStrategy: cfg.DownloadStrategy,
406                 halfOpenLimit:    socketsPerTorrent,
407                 dataDir:          cfg.DataDir,
408                 disableUTP:       cfg.DisableUTP,
409                 disableTCP:       cfg.DisableTCP,
410                 _configDir:       cfg.ConfigDir,
411                 config:           *cfg,
412                 torrentDataOpener: func(md *metainfo.Info) (StatelessData, error) {
413                         return filePkg.TorrentData(md, cfg.DataDir), nil
414                 },
415
416                 quit:     make(chan struct{}),
417                 torrents: make(map[InfoHash]*torrent),
418         }
419         cl.event.L = &cl.mu
420         if cfg.TorrentDataOpener != nil {
421                 cl.torrentDataOpener = cfg.TorrentDataOpener
422         }
423
424         if !cfg.NoDefaultBlocklist {
425                 err = cl.setEnvBlocklist()
426                 if err != nil {
427                         return
428                 }
429         }
430
431         if err = cl.initBannedTorrents(); err != nil {
432                 err = fmt.Errorf("error initing banned torrents: %s", err)
433                 return
434         }
435
436         if cfg.PeerID != "" {
437                 CopyExact(&cl.peerID, cfg.PeerID)
438         } else {
439                 o := copy(cl.peerID[:], BEP20)
440                 _, err = rand.Read(cl.peerID[o:])
441                 if err != nil {
442                         panic("error generating peer id")
443                 }
444         }
445
446         if cl.downloadStrategy == nil {
447                 cl.downloadStrategy = &DefaultDownloadStrategy{}
448         }
449
450         // Returns the laddr string to listen on for the next Listen call.
451         listenAddr := func() string {
452                 if addr := cl.ListenAddr(); addr != nil {
453                         return addr.String()
454                 }
455                 if cfg.ListenAddr == "" {
456                         return ":50007"
457                 }
458                 return cfg.ListenAddr
459         }
460         if !cl.disableTCP {
461                 var l net.Listener
462                 l, err = net.Listen("tcp", listenAddr())
463                 if err != nil {
464                         return
465                 }
466                 cl.listeners = append(cl.listeners, l)
467                 go cl.acceptConnections(l, false)
468         }
469         if !cl.disableUTP {
470                 cl.utpSock, err = utp.NewSocket(listenAddr())
471                 if err != nil {
472                         return
473                 }
474                 cl.listeners = append(cl.listeners, cl.utpSock)
475                 go cl.acceptConnections(cl.utpSock, true)
476         }
477         if !cfg.NoDHT {
478                 dhtCfg := cfg.DHTConfig
479                 if dhtCfg == nil {
480                         dhtCfg = &dht.ServerConfig{}
481                 }
482                 if dhtCfg.Addr == "" {
483                         dhtCfg.Addr = listenAddr()
484                 }
485                 if dhtCfg.Conn == nil && cl.utpSock != nil {
486                         dhtCfg.Conn = cl.utpSock.PacketConn()
487                 }
488                 cl.dHT, err = dht.NewServer(dhtCfg)
489                 if cl.ipBlockList != nil {
490                         cl.dHT.SetIPBlockList(cl.ipBlockList)
491                 }
492                 if err != nil {
493                         return
494                 }
495         }
496
497         return
498 }
499
500 func (cl *Client) stopped() bool {
501         select {
502         case <-cl.quit:
503                 return true
504         default:
505                 return false
506         }
507 }
508
509 // Stops the client. All connections to peers are closed and all activity will
510 // come to a halt.
511 func (me *Client) Stop() {
512         me.mu.Lock()
513         defer me.mu.Unlock()
514         close(me.quit)
515         for _, l := range me.listeners {
516                 l.Close()
517         }
518         me.event.Broadcast()
519         for _, t := range me.torrents {
520                 t.close()
521         }
522 }
523
524 var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
525
526 func (cl *Client) ipBlockRange(ip net.IP) (r *iplist.Range) {
527         if cl.ipBlockList == nil {
528                 return
529         }
530         ip = ip.To4()
531         if ip == nil {
532                 log.Printf("saw non-IPv4 address")
533                 r = &ipv6BlockRange
534                 return
535         }
536         r = cl.ipBlockList.Lookup(ip)
537         return
538 }
539
540 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
541         for {
542                 // We accept all connections immediately, because we don't know what
543                 // torrent they're for.
544                 conn, err := l.Accept()
545                 select {
546                 case <-cl.quit:
547                         if conn != nil {
548                                 conn.Close()
549                         }
550                         return
551                 default:
552                 }
553                 if err != nil {
554                         log.Print(err)
555                         return
556                 }
557                 acceptedConns.Add(1)
558                 cl.mu.RLock()
559                 blockRange := cl.ipBlockRange(AddrIP(conn.RemoteAddr()))
560                 cl.mu.RUnlock()
561                 if blockRange != nil {
562                         inboundConnsBlocked.Add(1)
563                         log.Printf("inbound connection from %s blocked by %s", conn.RemoteAddr(), blockRange)
564                         conn.Close()
565                         continue
566                 }
567                 go func() {
568                         if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
569                                 log.Print(err)
570                         }
571                 }()
572         }
573 }
574
575 func (me *Client) torrent(ih InfoHash) *torrent {
576         for _, t := range me.torrents {
577                 if t.InfoHash == ih {
578                         return t
579                 }
580         }
581         return nil
582 }
583
584 type dialResult struct {
585         net.Conn
586         UTP bool
587 }
588
589 func doDial(dial func(addr string) (net.Conn, error), ch chan dialResult, utp bool, addr string) {
590         conn, err := dial(addr)
591         if err != nil {
592                 if conn != nil {
593                         conn.Close()
594                 }
595                 conn = nil // Pedantic
596         }
597         ch <- dialResult{conn, utp}
598         if err == nil {
599                 successfulDials.Add(1)
600                 return
601         }
602         unsuccessfulDials.Add(1)
603         if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
604                 return
605         }
606         if netOpErr, ok := err.(*net.OpError); ok {
607                 switch netOpErr.Err {
608                 case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
609                         return
610                 }
611         }
612         if err != nil {
613                 log.Printf("error dialing %s: %s", addr, err)
614                 return
615         }
616 }
617
618 func reducedDialTimeout(max time.Duration, halfOpenLimit int, pendingPeers int) (ret time.Duration) {
619         ret = max / time.Duration((pendingPeers+halfOpenLimit)/halfOpenLimit)
620         if ret < minDialTimeout {
621                 ret = minDialTimeout
622         }
623         return
624 }
625
626 // Start the process of connecting to the given peer for the given torrent if
627 // appropriate.
628 func (me *Client) initiateConn(peer Peer, t *torrent) {
629         if peer.Id == me.peerID {
630                 return
631         }
632         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
633         if t.addrActive(addr) {
634                 duplicateConnsAvoided.Add(1)
635                 return
636         }
637         if r := me.ipBlockRange(peer.IP); r != nil {
638                 log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
639                 return
640         }
641         dialTimeout := reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, len(t.Peers))
642         t.HalfOpen[addr] = struct{}{}
643         go func() {
644                 // Binding to the listen address and dialing via net.Dialer gives
645                 // "address in use" error. It seems it's not possible to dial out from
646                 // this address so that peers associate our local address with our
647                 // listen address.
648
649                 // Initiate connections via TCP and UTP simultaneously. Use the first
650                 // one that succeeds.
651                 left := 0
652                 if !me.disableUTP {
653                         left++
654                 }
655                 if !me.disableTCP {
656                         left++
657                 }
658                 resCh := make(chan dialResult, left)
659                 if !me.disableUTP {
660                         go doDial(func(addr string) (net.Conn, error) {
661                                 return me.utpSock.DialTimeout(addr, dialTimeout)
662                         }, resCh, true, addr)
663                 }
664                 if !me.disableTCP {
665                         go doDial(func(addr string) (net.Conn, error) {
666                                 // time.Sleep(time.Second) // Give uTP a bit of a head start.
667                                 return net.DialTimeout("tcp", addr, dialTimeout)
668                         }, resCh, false, addr)
669                 }
670                 var res dialResult
671                 for ; left > 0 && res.Conn == nil; left-- {
672                         res = <-resCh
673                 }
674                 // Whether or not the connection attempt succeeds, the half open
675                 // counter should be decremented, and new connection attempts made.
676                 go func() {
677                         me.mu.Lock()
678                         defer me.mu.Unlock()
679                         if _, ok := t.HalfOpen[addr]; !ok {
680                                 panic("invariant broken")
681                         }
682                         delete(t.HalfOpen, addr)
683                         me.openNewConns(t)
684                 }()
685                 if res.Conn == nil {
686                         return
687                 }
688                 if left > 0 {
689                         go func() {
690                                 for ; left > 0; left-- {
691                                         conn := (<-resCh).Conn
692                                         if conn != nil {
693                                                 conn.Close()
694                                         }
695                                 }
696                         }()
697                 }
698
699                 // log.Printf("connected to %s", conn.RemoteAddr())
700                 err := me.runConnection(res.Conn, t, peer.Source, res.UTP)
701                 if err != nil {
702                         log.Print(err)
703                 }
704         }()
705 }
706
707 // The port number for incoming peer connections. 0 if the client isn't
708 // listening.
709 func (cl *Client) incomingPeerPort() int {
710         listenAddr := cl.ListenAddr()
711         if listenAddr == nil {
712                 return 0
713         }
714         return addrPort(listenAddr)
715 }
716
717 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
718 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
719 func addrCompactIP(addr net.Addr) (string, error) {
720         host, _, err := net.SplitHostPort(addr.String())
721         if err != nil {
722                 return "", err
723         }
724         ip := net.ParseIP(host)
725         if v4 := ip.To4(); v4 != nil {
726                 if len(v4) != 4 {
727                         panic(v4)
728                 }
729                 return string(v4), nil
730         }
731         return string(ip.To16()), nil
732 }
733
734 func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
735         var err error
736         for b := range bb {
737                 _, err = w.Write(b)
738                 if err != nil {
739                         w.Close()
740                         break
741                 }
742         }
743         done <- err
744 }
745
746 type peerExtensionBytes [8]byte
747 type peerID [20]byte
748
749 type handshakeResult struct {
750         peerExtensionBytes
751         peerID
752         InfoHash
753 }
754
755 // ih is nil if we expect the peer to declare the InfoHash, such as when the
756 // peer initiated the connection. Returns ok if the handshake was successful,
757 // and err if there was an unexpected condition other than the peer simply
758 // abandoning the handshake.
759 func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
760         // Bytes to be sent to the peer. Should never block the sender.
761         postCh := make(chan []byte, 4)
762         // A single error value sent when the writer completes.
763         writeDone := make(chan error, 1)
764         // Performs writes to the socket and ensures posts don't block.
765         go handshakeWriter(sock, postCh, writeDone)
766
767         defer func() {
768                 close(postCh) // Done writing.
769                 if !ok {
770                         return
771                 }
772                 if err != nil {
773                         panic(err)
774                 }
775                 // Wait until writes complete before returning from handshake.
776                 err = <-writeDone
777                 if err != nil {
778                         err = fmt.Errorf("error writing during handshake: %s", err)
779                 }
780         }()
781
782         post := func(bb []byte) {
783                 select {
784                 case postCh <- bb:
785                 default:
786                         panic("mustn't block while posting")
787                 }
788         }
789
790         post([]byte(pp.Protocol))
791         post([]byte(extensionBytes))
792         if ih != nil { // We already know what we want.
793                 post(ih[:])
794                 post(peerID[:])
795         }
796         var b [68]byte
797         _, err = io.ReadFull(sock, b[:68])
798         if err != nil {
799                 err = nil
800                 return
801         }
802         if string(b[:20]) != pp.Protocol {
803                 return
804         }
805         CopyExact(&res.peerExtensionBytes, b[20:28])
806         CopyExact(&res.InfoHash, b[28:48])
807         CopyExact(&res.peerID, b[48:68])
808
809         if ih == nil { // We were waiting for the peer to tell us what they wanted.
810                 post(res.InfoHash[:])
811                 post(peerID[:])
812         }
813
814         ok = true
815         return
816 }
817
818 // Wraps a raw connection and provides the interface we want for using the
819 // connection in the message loop.
820 type peerConn struct {
821         net.Conn
822 }
823
824 func (pc peerConn) Read(b []byte) (n int, err error) {
825         // Keep-alives should be received every 2 mins. Give a bit of gracetime.
826         err = pc.Conn.SetReadDeadline(time.Now().Add(150 * time.Second))
827         if err != nil {
828                 err = fmt.Errorf("error setting read deadline: %s", err)
829         }
830         n, err = pc.Conn.Read(b)
831         // Convert common errors into io.EOF.
832         if err != nil {
833                 if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
834                         err = io.EOF
835                 } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
836                         if n != 0 {
837                                 panic(n)
838                         }
839                         err = io.EOF
840                 }
841         }
842         return
843 }
844
845 func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
846         if tcpConn, ok := sock.(*net.TCPConn); ok {
847                 tcpConn.SetLinger(0)
848         }
849         defer sock.Close()
850         err = sock.SetDeadline(time.Now().Add(handshakeTimeout))
851         if err != nil {
852                 err = fmt.Errorf("couldn't set handshake deadline: %s", err)
853                 return
854         }
855         me.mu.Lock()
856         me.handshaking++
857         me.mu.Unlock()
858         hsRes, ok, err := handshake(sock, func() *InfoHash {
859                 if torrent == nil {
860                         return nil
861                 } else {
862                         return &torrent.InfoHash
863                 }
864         }(), me.peerID)
865         me.mu.Lock()
866         defer me.mu.Unlock()
867         if me.handshaking == 0 {
868                 panic("handshake count invariant is broken")
869         }
870         me.handshaking--
871         if err != nil {
872                 err = fmt.Errorf("error during handshake: %s", err)
873                 return
874         }
875         if !ok {
876                 return
877         }
878         if hsRes.peerID == me.peerID {
879                 return
880         }
881         torrent = me.torrent(hsRes.InfoHash)
882         if torrent == nil {
883                 return
884         }
885         sock.SetWriteDeadline(time.Time{})
886         sock = peerConn{sock}
887         conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
888         defer conn.Close()
889         conn.Discovery = discovery
890         if !me.addConnection(torrent, conn) {
891                 return
892         }
893         if conn.PeerExtensionBytes[5]&0x10 != 0 {
894                 conn.Post(pp.Message{
895                         Type:       pp.Extended,
896                         ExtendedID: pp.HandshakeExtendedID,
897                         ExtendedPayload: func() []byte {
898                                 d := map[string]interface{}{
899                                         "m": map[string]int{
900                                                 "ut_metadata": 1,
901                                                 "ut_pex":      2,
902                                         },
903                                         "v": "go.torrent dev 20140825", // Just the date
904                                         // No upload queue is implemented yet.
905                                         "reqq": func() int {
906                                                 if me.noUpload {
907                                                         // No need to look strange if it costs us nothing.
908                                                         return 250
909                                                 } else {
910                                                         return 1
911                                                 }
912                                         }(),
913                                 }
914                                 if torrent.metadataSizeKnown() {
915                                         d["metadata_size"] = torrent.metadataSize()
916                                 }
917                                 if p := me.incomingPeerPort(); p != 0 {
918                                         d["p"] = p
919                                 }
920                                 yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
921                                 if err != nil {
922                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
923                                 } else {
924                                         d["yourip"] = yourip
925                                 }
926                                 // log.Printf("sending %v", d)
927                                 b, err := bencode.Marshal(d)
928                                 if err != nil {
929                                         panic(err)
930                                 }
931                                 return b
932                         }(),
933                 })
934         }
935         if torrent.haveAnyPieces() {
936                 conn.Post(pp.Message{
937                         Type:     pp.Bitfield,
938                         Bitfield: torrent.bitfield(),
939                 })
940         }
941         if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil {
942                 conn.Post(pp.Message{
943                         Type: pp.Port,
944                         Port: uint16(AddrPort(me.dHT.LocalAddr())),
945                 })
946         }
947         if torrent.haveInfo() {
948                 torrent.initRequestOrdering(conn)
949                 me.replenishConnRequests(torrent, conn)
950         }
951         err = me.connectionLoop(torrent, conn)
952         if err != nil {
953                 err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
954         }
955         me.dropConnection(torrent, conn)
956         return
957 }
958
959 func (t *torrent) initRequestOrdering(c *connection) {
960         if c.pieceRequestOrder != nil || c.piecePriorities != nil {
961                 panic("double init of request ordering")
962         }
963         c.piecePriorities = mathRand.Perm(t.numPieces())
964         c.pieceRequestOrder = pieceordering.New()
965         for i := 0; i < t.numPieces(); i++ {
966                 if !c.PeerHasPiece(pp.Integer(i)) {
967                         continue
968                 }
969                 if !t.wantPiece(i) {
970                         continue
971                 }
972                 t.connPendPiece(c, i)
973         }
974 }
975
976 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
977         if t.haveInfo() {
978                 if c.PeerPieces == nil {
979                         c.PeerPieces = make([]bool, t.numPieces())
980                 }
981         } else {
982                 for piece >= len(c.PeerPieces) {
983                         c.PeerPieces = append(c.PeerPieces, false)
984                 }
985         }
986         c.PeerPieces[piece] = true
987         if t.wantPiece(piece) {
988                 t.connPendPiece(c, piece)
989                 me.replenishConnRequests(t, c)
990         }
991 }
992
993 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
994         me.replenishConnRequests(torrent, conn)
995 }
996
997 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
998         ok = cn.Cancel(r)
999         if ok {
1000                 postedCancels.Add(1)
1001                 cl.downloadStrategy.DeleteRequest(t, r)
1002         }
1003         return
1004 }
1005
1006 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
1007         if !cn.RequestPending(r) {
1008                 return
1009         }
1010         cl.downloadStrategy.DeleteRequest(t, r)
1011         delete(cn.Requests, r)
1012 }
1013
1014 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
1015         if t.haveInfo() {
1016                 return
1017         }
1018         var pending []int
1019         for index := 0; index < t.metadataPieceCount(); index++ {
1020                 if !t.haveMetadataPiece(index) {
1021                         pending = append(pending, index)
1022                 }
1023         }
1024         for _, i := range mathRand.Perm(len(pending)) {
1025                 c.Post(pp.Message{
1026                         Type:       pp.Extended,
1027                         ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
1028                         ExtendedPayload: func() []byte {
1029                                 b, err := bencode.Marshal(map[string]int{
1030                                         "msg_type": 0,
1031                                         "piece":    pending[i],
1032                                 })
1033                                 if err != nil {
1034                                         panic(err)
1035                                 }
1036                                 return b
1037                         }(),
1038                 })
1039         }
1040 }
1041
1042 func (cl *Client) completedMetadata(t *torrent) {
1043         h := sha1.New()
1044         h.Write(t.MetaData)
1045         var ih InfoHash
1046         CopyExact(&ih, h.Sum(nil))
1047         if ih != t.InfoHash {
1048                 log.Print("bad metadata")
1049                 t.invalidateMetadata()
1050                 return
1051         }
1052         var info metainfo.Info
1053         err := bencode.Unmarshal(t.MetaData, &info)
1054         if err != nil {
1055                 log.Printf("error unmarshalling metadata: %s", err)
1056                 t.invalidateMetadata()
1057                 return
1058         }
1059         // TODO(anacrolix): If this fails, I think something harsher should be
1060         // done.
1061         err = cl.setMetaData(t, info, t.MetaData)
1062         if err != nil {
1063                 log.Printf("error setting metadata: %s", err)
1064                 t.invalidateMetadata()
1065                 return
1066         }
1067         log.Printf("%s: got metadata from peers", t)
1068 }
1069
1070 // Process incoming ut_metadata message.
1071 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
1072         var d map[string]int
1073         err = bencode.Unmarshal(payload, &d)
1074         if err != nil {
1075                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
1076                 return
1077         }
1078         msgType, ok := d["msg_type"]
1079         if !ok {
1080                 err = errors.New("missing msg_type field")
1081                 return
1082         }
1083         piece := d["piece"]
1084         switch msgType {
1085         case pp.DataMetadataExtensionMsgType:
1086                 if t.haveInfo() {
1087                         break
1088                 }
1089                 begin := len(payload) - metadataPieceSize(d["total_size"], piece)
1090                 if begin < 0 || begin >= len(payload) {
1091                         log.Printf("got bad metadata piece")
1092                         break
1093                 }
1094                 t.SaveMetadataPiece(piece, payload[begin:])
1095                 c.UsefulChunksReceived++
1096                 c.lastUsefulChunkReceived = time.Now()
1097                 if !t.haveAllMetadataPieces() {
1098                         break
1099                 }
1100                 cl.completedMetadata(t)
1101         case pp.RequestMetadataExtensionMsgType:
1102                 if !t.haveMetadataPiece(piece) {
1103                         c.Post(t.newMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
1104                         break
1105                 }
1106                 start := (1 << 14) * piece
1107                 c.Post(t.newMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[start:start+t.metadataPieceSize(piece)]))
1108         case pp.RejectMetadataExtensionMsgType:
1109         default:
1110                 err = errors.New("unknown msg_type value")
1111         }
1112         return
1113 }
1114
1115 type peerExchangeMessage struct {
1116         Added      CompactPeers   `bencode:"added"`
1117         AddedFlags []byte         `bencode:"added.f"`
1118         Dropped    []tracker.Peer `bencode:"dropped"`
1119 }
1120
1121 // Extracts the port as an integer from an address string.
1122 func addrPort(addr net.Addr) int {
1123         return AddrPort(addr)
1124 }
1125
1126 // Processes incoming bittorrent messages. The client lock is held upon entry
1127 // and exit.
1128 func (me *Client) connectionLoop(t *torrent, c *connection) error {
1129         decoder := pp.Decoder{
1130                 R:         bufio.NewReader(c.Socket),
1131                 MaxLength: 256 * 1024,
1132         }
1133         for {
1134                 me.mu.Unlock()
1135                 var msg pp.Message
1136                 err := decoder.Decode(&msg)
1137                 me.mu.Lock()
1138                 c.lastMessageReceived = time.Now()
1139                 select {
1140                 case <-c.closing:
1141                         return nil
1142                 default:
1143                 }
1144                 if err != nil {
1145                         if me.stopped() || err == io.EOF {
1146                                 return nil
1147                         }
1148                         return err
1149                 }
1150                 if msg.Keepalive {
1151                         continue
1152                 }
1153                 switch msg.Type {
1154                 case pp.Choke:
1155                         c.PeerChoked = true
1156                         for r := range c.Requests {
1157                                 me.connDeleteRequest(t, c, r)
1158                         }
1159                         // We can then reset our interest.
1160                         me.replenishConnRequests(t, c)
1161                 case pp.Unchoke:
1162                         c.PeerChoked = false
1163                         me.peerUnchoked(t, c)
1164                 case pp.Interested:
1165                         c.PeerInterested = true
1166                         // TODO: This should be done from a dedicated unchoking routine.
1167                         if me.noUpload {
1168                                 break
1169                         }
1170                         c.Unchoke()
1171                 case pp.NotInterested:
1172                         c.PeerInterested = false
1173                         c.Choke()
1174                 case pp.Have:
1175                         me.peerGotPiece(t, c, int(msg.Index))
1176                 case pp.Request:
1177                         if me.noUpload {
1178                                 break
1179                         }
1180                         if c.PeerRequests == nil {
1181                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
1182                         }
1183                         request := newRequest(msg.Index, msg.Begin, msg.Length)
1184                         // TODO: Requests should be satisfied from a dedicated upload
1185                         // routine.
1186                         // c.PeerRequests[request] = struct{}{}
1187                         p := make([]byte, msg.Length)
1188                         n, err := t.data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
1189                         if err != nil {
1190                                 return fmt.Errorf("reading t data to serve request %q: %s", request, err)
1191                         }
1192                         if n != int(msg.Length) {
1193                                 return fmt.Errorf("bad request: %v", msg)
1194                         }
1195                         c.Post(pp.Message{
1196                                 Type:  pp.Piece,
1197                                 Index: msg.Index,
1198                                 Begin: msg.Begin,
1199                                 Piece: p,
1200                         })
1201                         uploadChunksPosted.Add(1)
1202                 case pp.Cancel:
1203                         req := newRequest(msg.Index, msg.Begin, msg.Length)
1204                         if !c.PeerCancel(req) {
1205                                 unexpectedCancels.Add(1)
1206                         }
1207                 case pp.Bitfield:
1208                         if c.PeerPieces != nil {
1209                                 err = errors.New("received unexpected bitfield")
1210                                 break
1211                         }
1212                         if t.haveInfo() {
1213                                 if len(msg.Bitfield) < t.numPieces() {
1214                                         err = errors.New("received invalid bitfield")
1215                                         break
1216                                 }
1217                                 msg.Bitfield = msg.Bitfield[:t.numPieces()]
1218                         }
1219                         c.PeerPieces = msg.Bitfield
1220                         for index, has := range c.PeerPieces {
1221                                 if has {
1222                                         me.peerGotPiece(t, c, index)
1223                                 }
1224                         }
1225                 case pp.Piece:
1226                         err = me.downloadedChunk(t, c, &msg)
1227                 case pp.Extended:
1228                         switch msg.ExtendedID {
1229                         case pp.HandshakeExtendedID:
1230                                 // TODO: Create a bencode struct for this.
1231                                 var d map[string]interface{}
1232                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
1233                                 if err != nil {
1234                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
1235                                         break
1236                                 }
1237                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
1238                                 if reqq, ok := d["reqq"]; ok {
1239                                         if i, ok := reqq.(int64); ok {
1240                                                 c.PeerMaxRequests = int(i)
1241                                         }
1242                                 }
1243                                 if v, ok := d["v"]; ok {
1244                                         c.PeerClientName = v.(string)
1245                                 }
1246                                 m, ok := d["m"]
1247                                 if !ok {
1248                                         err = errors.New("handshake missing m item")
1249                                         break
1250                                 }
1251                                 mTyped, ok := m.(map[string]interface{})
1252                                 if !ok {
1253                                         err = errors.New("handshake m value is not dict")
1254                                         break
1255                                 }
1256                                 if c.PeerExtensionIDs == nil {
1257                                         c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
1258                                 }
1259                                 for name, v := range mTyped {
1260                                         id, ok := v.(int64)
1261                                         if !ok {
1262                                                 log.Printf("bad handshake m item extension ID type: %T", v)
1263                                                 continue
1264                                         }
1265                                         if id == 0 {
1266                                                 delete(c.PeerExtensionIDs, name)
1267                                         } else {
1268                                                 c.PeerExtensionIDs[name] = id
1269                                         }
1270                                 }
1271                                 metadata_sizeUntyped, ok := d["metadata_size"]
1272                                 if ok {
1273                                         metadata_size, ok := metadata_sizeUntyped.(int64)
1274                                         if !ok {
1275                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1276                                         } else {
1277                                                 t.SetMetadataSize(metadata_size)
1278                                         }
1279                                 }
1280                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1281                                         me.requestPendingMetadata(t, c)
1282                                 }
1283                         case 1:
1284                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
1285                                 if err != nil {
1286                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
1287                                 }
1288                         case 2:
1289                                 var pexMsg peerExchangeMessage
1290                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
1291                                 if err != nil {
1292                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
1293                                         break
1294                                 }
1295                                 go func() {
1296                                         err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
1297                                                 for _, cp := range pexMsg.Added {
1298                                                         p := Peer{
1299                                                                 IP:     make([]byte, 4),
1300                                                                 Port:   int(cp.Port),
1301                                                                 Source: peerSourcePEX,
1302                                                         }
1303                                                         if n := copy(p.IP, cp.IP[:]); n != 4 {
1304                                                                 panic(n)
1305                                                         }
1306                                                         ret = append(ret, p)
1307                                                 }
1308                                                 return
1309                                         }())
1310                                         if err != nil {
1311                                                 log.Printf("error adding PEX peers: %s", err)
1312                                                 return
1313                                         }
1314                                         peersFoundByPEX.Add(int64(len(pexMsg.Added)))
1315                                 }()
1316                         default:
1317                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1318                         }
1319                         if err != nil {
1320                                 // That client uses its own extension IDs for outgoing message
1321                                 // types, which is incorrect.
1322                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1323                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1324                                         return nil
1325                                 }
1326                                 // log.Printf("peer extension map: %#v", c.PeerExtensionIDs)
1327                         }
1328                 case pp.Port:
1329                         if me.dHT == nil {
1330                                 break
1331                         }
1332                         pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String())
1333                         if err != nil {
1334                                 panic(err)
1335                         }
1336                         if msg.Port != 0 {
1337                                 pingAddr.Port = int(msg.Port)
1338                         }
1339                         _, err = me.dHT.Ping(pingAddr)
1340                 default:
1341                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1342                 }
1343                 if err != nil {
1344                         return err
1345                 }
1346         }
1347 }
1348
1349 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
1350         for r := range conn.Requests {
1351                 me.connDeleteRequest(torrent, conn, r)
1352         }
1353         conn.Close()
1354         for i0, c := range torrent.Conns {
1355                 if c != conn {
1356                         continue
1357                 }
1358                 i1 := len(torrent.Conns) - 1
1359                 if i0 != i1 {
1360                         torrent.Conns[i0] = torrent.Conns[i1]
1361                 }
1362                 torrent.Conns = torrent.Conns[:i1]
1363                 me.openNewConns(torrent)
1364                 return
1365         }
1366         panic("connection not found")
1367 }
1368
1369 func (me *Client) addConnection(t *torrent, c *connection) bool {
1370         if me.stopped() {
1371                 return false
1372         }
1373         select {
1374         case <-t.ceasingNetworking:
1375                 return false
1376         default:
1377         }
1378         if !me.wantConns(t) {
1379                 return false
1380         }
1381         for _, c0 := range t.Conns {
1382                 if c.PeerID == c0.PeerID {
1383                         // Already connected to a client with that ID.
1384                         return false
1385                 }
1386         }
1387         t.Conns = append(t.Conns, c)
1388         // TODO: This should probably be done by a routine that kills off bad
1389         // connections, and extra connections killed here instead.
1390         if len(t.Conns) > socketsPerTorrent {
1391                 wcs := t.worstConnsHeap()
1392                 heap.Pop(wcs).(*connection).Close()
1393         }
1394         return true
1395 }
1396
1397 func (t *torrent) needData() bool {
1398         if !t.haveInfo() {
1399                 return true
1400         }
1401         for i := range t.Pieces {
1402                 if t.wantPiece(i) {
1403                         return true
1404                 }
1405         }
1406         return false
1407 }
1408
1409 // TODO: I'm sure there's something here to do with seeding.
1410 func (t *torrent) badConn(c *connection) bool {
1411         if time.Now().Sub(c.completedHandshake) < 30*time.Second {
1412                 return false
1413         }
1414         if !t.haveInfo() {
1415                 return !c.supportsExtension("ut_metadata")
1416         }
1417         return !t.connHasWantedPieces(c)
1418 }
1419
1420 func (t *torrent) numGoodConns() (num int) {
1421         for _, c := range t.Conns {
1422                 if !t.badConn(c) {
1423                         num++
1424                 }
1425         }
1426         return
1427 }
1428
1429 func (me *Client) wantConns(t *torrent) bool {
1430         if !t.needData() && me.noUpload {
1431                 return false
1432         }
1433         if t.numGoodConns() >= socketsPerTorrent {
1434                 return false
1435         }
1436         return true
1437 }
1438
1439 func (me *Client) openNewConns(t *torrent) {
1440         select {
1441         case <-t.ceasingNetworking:
1442                 return
1443         default:
1444         }
1445         for len(t.Peers) != 0 {
1446                 if !me.wantConns(t) {
1447                         return
1448                 }
1449                 if len(t.HalfOpen) >= me.halfOpenLimit {
1450                         return
1451                 }
1452                 var (
1453                         k peersKey
1454                         p Peer
1455                 )
1456                 for k, p = range t.Peers {
1457                         break
1458                 }
1459                 delete(t.Peers, k)
1460                 me.initiateConn(p, t)
1461         }
1462         t.wantPeers.Broadcast()
1463 }
1464
1465 func (me *Client) addPeers(t *torrent, peers []Peer) {
1466         blocked := 0
1467         for i, p := range peers {
1468                 if me.ipBlockRange(p.IP) == nil {
1469                         continue
1470                 }
1471                 peers[i] = peers[len(peers)-1]
1472                 peers = peers[:len(peers)-1]
1473                 i--
1474                 blocked++
1475         }
1476         if blocked != 0 {
1477                 log.Printf("IP blocklist screened %d peers from being added", blocked)
1478         }
1479         t.AddPeers(peers)
1480         me.openNewConns(t)
1481 }
1482
1483 // Adds peers to the swarm for the torrent corresponding to infoHash.
1484 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
1485         me.mu.Lock()
1486         defer me.mu.Unlock()
1487         t := me.torrent(infoHash)
1488         if t == nil {
1489                 return errors.New("no such torrent")
1490         }
1491         me.addPeers(t, peers)
1492         return nil
1493 }
1494
1495 func (cl *Client) torrentFileCachePath(ih InfoHash) string {
1496         return filepath.Join(cl.configDir(), "torrents", ih.HexString()+".torrent")
1497 }
1498
1499 func (cl *Client) saveTorrentFile(t *torrent) error {
1500         path := cl.torrentFileCachePath(t.InfoHash)
1501         os.MkdirAll(filepath.Dir(path), 0777)
1502         f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
1503         if err != nil {
1504                 return fmt.Errorf("error opening file: %s", err)
1505         }
1506         defer f.Close()
1507         e := bencode.NewEncoder(f)
1508         err = e.Encode(t.MetaInfo())
1509         if err != nil {
1510                 return fmt.Errorf("error marshalling metainfo: %s", err)
1511         }
1512         mi, err := cl.torrentCacheMetaInfo(t.InfoHash)
1513         if err != nil {
1514                 // For example, a script kiddy makes us load too many files, and we're
1515                 // able to save the torrent, but not load it again to check it.
1516                 return nil
1517         }
1518         if !bytes.Equal(mi.Info.Hash, t.InfoHash[:]) {
1519                 log.Fatalf("%x != %x", mi.Info.Hash, t.InfoHash[:])
1520         }
1521         return nil
1522 }
1523
1524 func (cl *Client) startTorrent(t *torrent) {
1525         if t.Info == nil || t.data == nil {
1526                 panic("nope")
1527         }
1528         // If the client intends to upload, it needs to know what state pieces are
1529         // in.
1530         if !cl.noUpload {
1531                 // Queue all pieces for hashing. This is done sequentially to avoid
1532                 // spamming goroutines.
1533                 for _, p := range t.Pieces {
1534                         p.QueuedForHash = true
1535                 }
1536                 go func() {
1537                         for i := range t.Pieces {
1538                                 cl.verifyPiece(t, pp.Integer(i))
1539                         }
1540                 }()
1541         }
1542         cl.downloadStrategy.TorrentStarted(t)
1543 }
1544
1545 // Storage cannot be changed once it's set.
1546 func (cl *Client) setStorage(t *torrent, td Data) (err error) {
1547         err = t.setStorage(td)
1548         cl.event.Broadcast()
1549         if err != nil {
1550                 return
1551         }
1552         cl.startTorrent(t)
1553         return
1554 }
1555
1556 type TorrentDataOpener func(*metainfo.Info) (StatelessData, error)
1557
1558 type statelessDataWrapper struct {
1559         StatelessData
1560 }
1561
1562 func (statelessDataWrapper) PieceComplete(int) bool   { return false }
1563 func (statelessDataWrapper) PieceCompleted(int) error { return nil }
1564
1565 var _ Data = statelessDataWrapper{}
1566
1567 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
1568         err = t.setMetadata(md, bytes, &cl.mu)
1569         if err != nil {
1570                 return
1571         }
1572         if !cl.config.DisableMetainfoCache {
1573                 if err := cl.saveTorrentFile(t); err != nil {
1574                         log.Printf("error saving torrent file for %s: %s", t, err)
1575                 }
1576         }
1577         if strings.Contains(strings.ToLower(md.Name), "porn") {
1578                 cl.dropTorrent(t.InfoHash)
1579                 err = errors.New("no porn plx")
1580                 return
1581         }
1582         close(t.gotMetainfo)
1583         stateless, err := cl.torrentDataOpener(&md)
1584         if err != nil {
1585                 return
1586         }
1587         td, ok := stateless.(Data)
1588         if !ok {
1589                 td = statelessDataWrapper{stateless}
1590         }
1591         err = cl.setStorage(t, td)
1592         return
1593 }
1594
1595 // Prepare a Torrent without any attachment to a Client. That means we can
1596 // initialize fields all fields that don't require the Client without locking
1597 // it.
1598 func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
1599         t = &torrent{
1600                 InfoHash: ih,
1601                 Peers:    make(map[peersKey]Peer),
1602
1603                 closing:           make(chan struct{}),
1604                 ceasingNetworking: make(chan struct{}),
1605
1606                 gotMetainfo: make(chan struct{}),
1607
1608                 HalfOpen: make(map[string]struct{}),
1609         }
1610         t.wantPeers.L = &t.stateMu
1611         t.GotMetainfo = t.gotMetainfo
1612         t.addTrackers(announceList)
1613         return
1614 }
1615
1616 func init() {
1617         // For shuffling the tracker tiers.
1618         mathRand.Seed(time.Now().Unix())
1619 }
1620
1621 // The trackers within each tier must be shuffled before use.
1622 // http://stackoverflow.com/a/12267471/149482
1623 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1624 func shuffleTier(tier []tracker.Client) {
1625         for i := range tier {
1626                 j := mathRand.Intn(i + 1)
1627                 tier[i], tier[j] = tier[j], tier[i]
1628         }
1629 }
1630
1631 func copyTrackers(base [][]tracker.Client) (copy [][]tracker.Client) {
1632         for _, tier := range base {
1633                 copy = append(copy, append([]tracker.Client{}, tier...))
1634         }
1635         return
1636 }
1637
1638 func mergeTier(tier []tracker.Client, newURLs []string) []tracker.Client {
1639 nextURL:
1640         for _, url := range newURLs {
1641                 for _, tr := range tier {
1642                         if tr.URL() == url {
1643                                 continue nextURL
1644                         }
1645                 }
1646                 tr, err := tracker.New(url)
1647                 if err != nil {
1648                         log.Printf("error creating tracker client for %q: %s", url, err)
1649                         continue
1650                 }
1651                 tier = append(tier, tr)
1652         }
1653         return tier
1654 }
1655
1656 func (t *torrent) addTrackers(announceList [][]string) {
1657         newTrackers := copyTrackers(t.Trackers)
1658         for tierIndex, tier := range announceList {
1659                 if tierIndex < len(newTrackers) {
1660                         newTrackers[tierIndex] = mergeTier(newTrackers[tierIndex], tier)
1661                 } else {
1662                         newTrackers = append(newTrackers, mergeTier(nil, tier))
1663                 }
1664                 shuffleTier(newTrackers[tierIndex])
1665         }
1666         t.Trackers = newTrackers
1667 }
1668
1669 type Torrent struct {
1670         cl *Client
1671         *torrent
1672 }
1673
1674 func (t Torrent) NumPieces() int {
1675         return t.numPieces()
1676 }
1677
1678 func (t Torrent) Drop() {
1679         t.cl.dropTorrent(t.InfoHash)
1680 }
1681
1682 type File struct {
1683         t      Torrent
1684         path   string
1685         offset int64
1686         length int64
1687         fi     metainfo.FileInfo
1688 }
1689
1690 func (f File) FileInfo() metainfo.FileInfo {
1691         return f.fi
1692 }
1693
1694 func (f File) Path() string {
1695         return f.path
1696 }
1697
1698 func (f File) ReadAt(p []byte, off int64) (n int, err error) {
1699         maxLen := f.length - off
1700         if int64(len(p)) > maxLen {
1701                 p = p[:maxLen]
1702         }
1703         return f.t.ReadAt(p, off+f.offset)
1704 }
1705
1706 func (f *File) Length() int64 {
1707         return f.length
1708 }
1709
1710 type FilePieceState struct {
1711         Length int64
1712         State  byte
1713 }
1714
1715 func (f *File) Progress() (ret []FilePieceState) {
1716         pieceSize := int64(f.t.usualPieceSize())
1717         off := f.offset % pieceSize
1718         remaining := f.length
1719         for i := int(f.offset / pieceSize); ; i++ {
1720                 if remaining == 0 {
1721                         break
1722                 }
1723                 len1 := pieceSize - off
1724                 if len1 > remaining {
1725                         len1 = remaining
1726                 }
1727                 ret = append(ret, FilePieceState{len1, f.t.pieceStatusChar(i)})
1728                 off = 0
1729                 remaining -= len1
1730         }
1731         return
1732 }
1733
1734 func (f *File) PrioritizeRegion(off, len int64) {
1735         if off < 0 || off >= f.length {
1736                 return
1737         }
1738         if off+len > f.length {
1739                 len = f.length - off
1740         }
1741         off += f.offset
1742         f.t.SetRegionPriority(off, len)
1743 }
1744
1745 // Returns handles to the files in the torrent. This requires the metainfo is
1746 // available first.
1747 func (t Torrent) Files() (ret []File) {
1748         select {
1749         case <-t.GotMetainfo:
1750         default:
1751                 return
1752         }
1753         var offset int64
1754         for _, fi := range t.Info.UpvertedFiles() {
1755                 ret = append(ret, File{
1756                         t,
1757                         strings.Join(append([]string{t.Info.Name}, fi.Path...), "/"),
1758                         offset,
1759                         fi.Length,
1760                         fi,
1761                 })
1762                 offset += fi.Length
1763         }
1764         return
1765 }
1766
1767 func (t Torrent) SetRegionPriority(off, len int64) {
1768         t.cl.mu.Lock()
1769         defer t.cl.mu.Unlock()
1770         pieceSize := int64(t.usualPieceSize())
1771         for i := off / pieceSize; i*pieceSize < off+len; i++ {
1772                 t.cl.prioritizePiece(t.torrent, int(i), piecePriorityNormal)
1773         }
1774 }
1775
1776 func (t Torrent) MetainfoFilepath() string {
1777         return filepath.Join(t.cl.ConfigDir(), "torrents", t.InfoHash.HexString()+".torrent")
1778 }
1779
1780 func (t Torrent) AddPeers(pp []Peer) error {
1781         return t.cl.AddPeers(t.torrent.InfoHash, pp)
1782 }
1783
1784 func (t Torrent) DownloadAll() {
1785         t.cl.mu.Lock()
1786         for i := 0; i < t.numPieces(); i++ {
1787                 // TODO: Leave higher priorities as they were?
1788                 t.cl.prioritizePiece(t.torrent, i, piecePriorityNormal)
1789         }
1790         // Nice to have the first and last pieces soon for various interactive
1791         // purposes.
1792         t.cl.prioritizePiece(t.torrent, 0, piecePriorityReadahead)
1793         t.cl.prioritizePiece(t.torrent, t.numPieces()-1, piecePriorityReadahead)
1794         t.cl.mu.Unlock()
1795 }
1796
1797 func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
1798         return me.cl.torrentReadAt(me.torrent, off, p)
1799 }
1800
1801 // Returns nil metainfo if it isn't in the cache.
1802 func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
1803         if cl.config.DisableMetainfoCache {
1804                 return
1805         }
1806         f, err := os.Open(cl.torrentFileCachePath(ih))
1807         if err != nil {
1808                 if os.IsNotExist(err) {
1809                         err = nil
1810                 }
1811                 return
1812         }
1813         defer f.Close()
1814         dec := bencode.NewDecoder(f)
1815         err = dec.Decode(&mi)
1816         if err != nil {
1817                 return
1818         }
1819         if !bytes.Equal(mi.Info.Hash, ih[:]) {
1820                 err = fmt.Errorf("cached torrent has wrong infohash: %x != %x", mi.Info.Hash, ih[:])
1821                 return
1822         }
1823         return
1824 }
1825
1826 func (cl *Client) AddMagnet(uri string) (T Torrent, err error) {
1827         m, err := ParseMagnetURI(uri)
1828         if err != nil {
1829                 return
1830         }
1831         mi, err := cl.torrentCacheMetaInfo(m.InfoHash)
1832         if err != nil {
1833                 log.Printf("error getting cached metainfo for %x: %s", m.InfoHash[:], err)
1834         } else if mi != nil {
1835                 _, err = cl.AddTorrent(mi)
1836                 if err != nil {
1837                         return
1838                 }
1839         }
1840         cl.mu.Lock()
1841         defer cl.mu.Unlock()
1842         T, err = cl.addOrMergeTorrent(m.InfoHash, [][]string{m.Trackers})
1843         if err != nil {
1844                 return
1845         }
1846         if m.DisplayName != "" {
1847                 T.DisplayName = m.DisplayName
1848         }
1849         return
1850 }
1851
1852 // Prunes unused connections. This is required to make space to dial for
1853 // replacements.
1854 func (cl *Client) pruneConnectionsUnlocked(t *torrent) {
1855         select {
1856         case <-t.ceasingNetworking:
1857                 return
1858         case <-t.closing:
1859                 return
1860         default:
1861         }
1862         cl.mu.Lock()
1863         license := len(t.Conns) - (socketsPerTorrent+1)/2
1864         for _, c := range t.Conns {
1865                 if license <= 0 {
1866                         break
1867                 }
1868                 if time.Now().Sub(c.lastUsefulChunkReceived) < time.Minute {
1869                         continue
1870                 }
1871                 if time.Now().Sub(c.completedHandshake) < time.Minute {
1872                         continue
1873                 }
1874                 c.Close()
1875                 license--
1876         }
1877         cl.mu.Unlock()
1878         t.pruneTimer.Reset(pruneInterval)
1879 }
1880
1881 func (me *Client) dropTorrent(infoHash InfoHash) (err error) {
1882         t, ok := me.torrents[infoHash]
1883         if !ok {
1884                 err = fmt.Errorf("no such torrent")
1885                 return
1886         }
1887         err = t.close()
1888         if err != nil {
1889                 panic(err)
1890         }
1891         delete(me.torrents, infoHash)
1892         me.downloadStrategy.TorrentStopped(t)
1893         return
1894 }
1895
1896 func (me *Client) addOrMergeTorrent(ih InfoHash, announceList [][]string) (T Torrent, err error) {
1897         if _, ok := me.bannedTorrents[ih]; ok {
1898                 err = errors.New("banned torrent")
1899                 return
1900         }
1901         T.cl = me
1902         var ok bool
1903         T.torrent, ok = me.torrents[ih]
1904         if ok {
1905                 T.torrent.addTrackers(announceList)
1906         } else {
1907                 T.torrent, err = newTorrent(ih, announceList, me.halfOpenLimit)
1908                 if err != nil {
1909                         return
1910                 }
1911                 me.torrents[ih] = T.torrent
1912                 if !me.disableTrackers {
1913                         go me.announceTorrentTrackers(T.torrent)
1914                 }
1915                 if me.dHT != nil {
1916                         go me.announceTorrentDHT(T.torrent, true)
1917                 }
1918                 T.torrent.pruneTimer = time.AfterFunc(0, func() {
1919                         me.pruneConnectionsUnlocked(T.torrent)
1920                 })
1921         }
1922         return
1923 }
1924
1925 // Adds a torrent to the client.
1926 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (t Torrent, err error) {
1927         var ih InfoHash
1928         CopyExact(&ih, metaInfo.Info.Hash)
1929         me.mu.Lock()
1930         defer me.mu.Unlock()
1931         t, err = me.addOrMergeTorrent(ih, metaInfo.AnnounceList)
1932         if err != nil {
1933                 return
1934         }
1935         if !t.torrent.haveInfo() {
1936                 err = me.setMetaData(t.torrent, metaInfo.Info.Info, metaInfo.Info.Bytes)
1937                 if err != nil {
1938                         return
1939                 }
1940         }
1941         return
1942 }
1943
1944 func (me *Client) AddTorrentFromFile(name string) (t Torrent, err error) {
1945         mi, err := metainfo.LoadFromFile(name)
1946         if err != nil {
1947                 err = fmt.Errorf("error loading metainfo from file: %s", err)
1948                 return
1949         }
1950         return me.AddTorrent(mi)
1951 }
1952
1953 // Returns true when peers are required, or false if the torrent is closing.
1954 func (cl *Client) waitWantPeers(t *torrent) bool {
1955         cl.mu.Lock()
1956         defer cl.mu.Unlock()
1957         t.stateMu.Lock()
1958         defer t.stateMu.Unlock()
1959         for {
1960                 select {
1961                 case <-t.ceasingNetworking:
1962                         return false
1963                 default:
1964                 }
1965                 if len(t.Peers) < torrentPeersLowWater && t.needData() {
1966                         return true
1967                 }
1968                 cl.mu.Unlock()
1969                 t.wantPeers.Wait()
1970                 t.stateMu.Unlock()
1971                 cl.mu.Lock()
1972                 t.stateMu.Lock()
1973         }
1974 }
1975
1976 func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
1977         for cl.waitWantPeers(t) {
1978                 log.Printf("getting peers for %q from DHT", t)
1979                 ps, err := cl.dHT.Announce(string(t.InfoHash[:]), cl.incomingPeerPort(), impliedPort)
1980                 if err != nil {
1981                         log.Printf("error getting peers from dht: %s", err)
1982                         return
1983                 }
1984                 allAddrs := make(map[string]struct{})
1985         getPeers:
1986                 for {
1987                         select {
1988                         case v, ok := <-ps.Values:
1989                                 if !ok {
1990                                         break getPeers
1991                                 }
1992                                 peersFoundByDHT.Add(int64(len(v.Peers)))
1993                                 for _, p := range v.Peers {
1994                                         allAddrs[(&net.UDPAddr{
1995                                                 IP:   p.IP[:],
1996                                                 Port: int(p.Port),
1997                                         }).String()] = struct{}{}
1998                                 }
1999                                 // log.Printf("%s: %d new peers from DHT", t, len(v.Peers))
2000                                 cl.mu.Lock()
2001                                 cl.addPeers(t, func() (ret []Peer) {
2002                                         for _, cp := range v.Peers {
2003                                                 ret = append(ret, Peer{
2004                                                         IP:     cp.IP[:],
2005                                                         Port:   int(cp.Port),
2006                                                         Source: peerSourceDHT,
2007                                                 })
2008                                         }
2009                                         return
2010                                 }())
2011                                 numPeers := len(t.Peers)
2012                                 cl.mu.Unlock()
2013                                 if numPeers >= torrentPeersHighWater {
2014                                         break getPeers
2015                                 }
2016                         case <-t.ceasingNetworking:
2017                                 ps.Close()
2018                                 return
2019                         }
2020                 }
2021                 ps.Close()
2022                 log.Printf("finished DHT peer scrape for %s: %d peers", t, len(allAddrs))
2023
2024         }
2025 }
2026
2027 func (cl *Client) announceTorrentSingleTracker(tr tracker.Client, req *tracker.AnnounceRequest, t *torrent) error {
2028         if err := tr.Connect(); err != nil {
2029                 return fmt.Errorf("error connecting: %s", err)
2030         }
2031         resp, err := tr.Announce(req)
2032         if err != nil {
2033                 return fmt.Errorf("error announcing: %s", err)
2034         }
2035         var peers []Peer
2036         for _, peer := range resp.Peers {
2037                 peers = append(peers, Peer{
2038                         IP:   peer.IP,
2039                         Port: peer.Port,
2040                 })
2041         }
2042         err = cl.AddPeers(t.InfoHash, peers)
2043         if err != nil {
2044                 log.Printf("error adding peers to torrent %s: %s", t, err)
2045         } else {
2046                 log.Printf("%s: %d new peers from %s", t, len(peers), tr)
2047         }
2048
2049         time.Sleep(time.Second * time.Duration(resp.Interval))
2050         return nil
2051 }
2052
2053 func (cl *Client) announceTorrentTrackersFastStart(req *tracker.AnnounceRequest, trackers [][]tracker.Client, t *torrent) (atLeastOne bool) {
2054         oks := make(chan bool)
2055         outstanding := 0
2056         for _, tier := range trackers {
2057                 for _, tr := range tier {
2058                         outstanding++
2059                         go func(tr tracker.Client) {
2060                                 err := cl.announceTorrentSingleTracker(tr, req, t)
2061                                 oks <- err == nil
2062                         }(tr)
2063                 }
2064         }
2065         for outstanding > 0 {
2066                 ok := <-oks
2067                 outstanding--
2068                 if ok {
2069                         atLeastOne = true
2070                 }
2071         }
2072         return
2073 }
2074
2075 // Announce torrent to its trackers.
2076 func (cl *Client) announceTorrentTrackers(t *torrent) {
2077         req := tracker.AnnounceRequest{
2078                 Event:    tracker.Started,
2079                 NumWant:  -1,
2080                 Port:     int16(cl.incomingPeerPort()),
2081                 PeerId:   cl.peerID,
2082                 InfoHash: t.InfoHash,
2083         }
2084         if !cl.waitWantPeers(t) {
2085                 return
2086         }
2087         cl.mu.RLock()
2088         req.Left = t.bytesLeft()
2089         trackers := t.Trackers
2090         cl.mu.RUnlock()
2091         if cl.announceTorrentTrackersFastStart(&req, trackers, t) {
2092                 req.Event = tracker.None
2093         }
2094 newAnnounce:
2095         for cl.waitWantPeers(t) {
2096                 cl.mu.RLock()
2097                 req.Left = t.bytesLeft()
2098                 trackers = t.Trackers
2099                 cl.mu.RUnlock()
2100                 numTrackersTried := 0
2101                 for _, tier := range trackers {
2102                         for trIndex, tr := range tier {
2103                                 numTrackersTried++
2104                                 err := cl.announceTorrentSingleTracker(tr, &req, t)
2105                                 if err != nil {
2106                                         continue
2107                                 }
2108                                 // Float the successful announce to the top of the tier. If
2109                                 // the trackers list has been changed, we'll be modifying an
2110                                 // old copy so it won't matter.
2111                                 cl.mu.Lock()
2112                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
2113                                 cl.mu.Unlock()
2114
2115                                 req.Event = tracker.None
2116                                 continue newAnnounce
2117                         }
2118                 }
2119                 if numTrackersTried != 0 {
2120                         log.Printf("%s: all trackers failed", t)
2121                 }
2122                 // TODO: Wait until trackers are added if there are none.
2123                 time.Sleep(10 * time.Second)
2124         }
2125 }
2126
2127 func (cl *Client) allTorrentsCompleted() bool {
2128         for _, t := range cl.torrents {
2129                 if !t.haveInfo() {
2130                         return false
2131                 }
2132                 if t.numPiecesCompleted() != t.numPieces() {
2133                         return false
2134                 }
2135         }
2136         return true
2137 }
2138
2139 // Returns true when all torrents are completely downloaded and false if the
2140 // client is stopped before that.
2141 func (me *Client) WaitAll() bool {
2142         me.mu.Lock()
2143         defer me.mu.Unlock()
2144         for !me.allTorrentsCompleted() {
2145                 if me.stopped() {
2146                         return false
2147                 }
2148                 me.event.Wait()
2149         }
2150         return true
2151 }
2152
2153 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
2154         if !t.haveInfo() {
2155                 return
2156         }
2157         me.downloadStrategy.FillRequests(t, c)
2158         if len(c.Requests) == 0 && !c.PeerChoked {
2159                 c.SetInterested(false)
2160         }
2161 }
2162
2163 // Handle a received chunk from a peer.
2164 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
2165         chunksDownloadedCount.Add(1)
2166
2167         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
2168
2169         // Request has been satisfied.
2170         me.connDeleteRequest(t, c, req)
2171
2172         defer me.replenishConnRequests(t, c)
2173
2174         // Do we actually want this chunk?
2175         if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
2176                 unusedDownloadedChunksCount.Add(1)
2177                 c.UnwantedChunksReceived++
2178                 return nil
2179         }
2180
2181         c.UsefulChunksReceived++
2182         c.lastUsefulChunkReceived = time.Now()
2183
2184         // Write the chunk out.
2185         err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
2186         if err != nil {
2187                 return fmt.Errorf("error writing chunk: %s", err)
2188         }
2189
2190         // Record that we have the chunk.
2191         delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
2192         if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
2193                 for _, c := range t.Conns {
2194                         c.pieceRequestOrder.DeletePiece(int(req.Index))
2195                 }
2196                 me.queuePieceCheck(t, req.Index)
2197         }
2198
2199         // Unprioritize the chunk.
2200         me.downloadStrategy.TorrentGotChunk(t, req)
2201
2202         // Cancel pending requests for this chunk.
2203         for _, c := range t.Conns {
2204                 if me.connCancel(t, c, req) {
2205                         me.replenishConnRequests(t, c)
2206                 }
2207         }
2208
2209         me.downloadStrategy.AssertNotRequested(t, req)
2210
2211         return nil
2212 }
2213
2214 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
2215         p := t.Pieces[piece]
2216         if p.EverHashed && !correct {
2217                 log.Printf("%s: piece %d failed hash", t, piece)
2218                 failedPieceHashes.Add(1)
2219         }
2220         p.EverHashed = true
2221         if correct {
2222                 err := t.data.PieceCompleted(int(piece))
2223                 if err != nil {
2224                         log.Printf("error completing piece: %s", err)
2225                         correct = false
2226                 }
2227         }
2228         if correct {
2229                 p.Priority = piecePriorityNone
2230                 p.PendingChunkSpecs = nil
2231                 p.complete = true
2232                 p.Event.Broadcast()
2233                 me.downloadStrategy.TorrentGotPiece(t, int(piece))
2234         } else {
2235                 if len(p.PendingChunkSpecs) == 0 {
2236                         t.pendAllChunkSpecs(piece)
2237                 }
2238                 if p.Priority != piecePriorityNone {
2239                         me.openNewConns(t)
2240                 }
2241         }
2242         for _, conn := range t.Conns {
2243                 if correct {
2244                         conn.Post(pp.Message{
2245                                 Type:  pp.Have,
2246                                 Index: pp.Integer(piece),
2247                         })
2248                         // TODO: Cancel requests for this piece.
2249                         for r := range conn.Requests {
2250                                 if r.Index == piece {
2251                                         panic("wat")
2252                                 }
2253                         }
2254                         conn.pieceRequestOrder.DeletePiece(int(piece))
2255                 }
2256                 if t.wantPiece(int(piece)) && conn.PeerHasPiece(piece) {
2257                         t.connPendPiece(conn, int(piece))
2258                         me.replenishConnRequests(t, conn)
2259                 }
2260         }
2261         if t.haveAllPieces() && me.noUpload {
2262                 t.ceaseNetworking()
2263         }
2264         me.event.Broadcast()
2265 }
2266
2267 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
2268         cl.mu.Lock()
2269         defer cl.mu.Unlock()
2270         p := t.Pieces[index]
2271         for p.Hashing || t.data == nil {
2272                 cl.event.Wait()
2273         }
2274         p.QueuedForHash = false
2275         if t.isClosed() || p.complete {
2276                 return
2277         }
2278         p.Hashing = true
2279         cl.mu.Unlock()
2280         sum := t.hashPiece(index)
2281         cl.mu.Lock()
2282         select {
2283         case <-t.closing:
2284                 return
2285         default:
2286         }
2287         p.Hashing = false
2288         cl.pieceHashed(t, index, sum == p.Hash)
2289 }
2290
2291 func (cl *Client) Torrent(ih InfoHash) (t Torrent, ok bool) {
2292         cl.mu.Lock()
2293         defer cl.mu.Unlock()
2294         t.torrent, ok = cl.torrents[ih]
2295         t.cl = cl
2296         return
2297 }
2298
2299 func (me *Client) Torrents() (ret []Torrent) {
2300         me.mu.Lock()
2301         for _, t := range me.torrents {
2302                 ret = append(ret, Torrent{me, t})
2303         }
2304         me.mu.Unlock()
2305         return
2306 }