]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Move half-open tracking into per-torrent
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errors("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "bytes"
21         "container/heap"
22         "crypto/rand"
23         "crypto/sha1"
24         "errors"
25         "expvar"
26         "fmt"
27         "io"
28         "log"
29         mathRand "math/rand"
30         "net"
31         "os"
32         "strconv"
33         "strings"
34         "sync"
35         "syscall"
36         "time"
37
38         "github.com/h2so5/utp"
39
40         "github.com/anacrolix/libtorgo/metainfo"
41         "github.com/nsf/libtorgo/bencode"
42
43         "bitbucket.org/anacrolix/go.torrent/dht"
44         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
45         "bitbucket.org/anacrolix/go.torrent/tracker"
46         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
47         . "bitbucket.org/anacrolix/go.torrent/util"
48         "bitbucket.org/anacrolix/go.torrent/util/levelmu"
49 )
50
51 var (
52         unusedDownloadedChunksCount = expvar.NewInt("unusedDownloadedChunksCount")
53         chunksDownloadedCount       = expvar.NewInt("chunksDownloadedCount")
54         peersFoundByDHT             = expvar.NewInt("peersFoundByDHT")
55         peersFoundByPEX             = expvar.NewInt("peersFoundByPEX")
56         uploadChunksPosted          = expvar.NewInt("uploadChunksPosted")
57         unexpectedCancels           = expvar.NewInt("unexpectedCancels")
58         postedCancels               = expvar.NewInt("postedCancels")
59         duplicateConnsAvoided       = expvar.NewInt("duplicateConnsAvoided")
60         failedPieceHashes           = expvar.NewInt("failedPieceHashes")
61 )
62
63 const (
64         // Justification for set bits follows.
65         //
66         // Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
67         // DHT: http://www.bittorrent.org/beps/bep_0005.html
68         extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"
69
70         socketsPerTorrent = 40
71 )
72
73 // Currently doesn't really queue, but should in the future.
74 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
75         piece := t.Pieces[pieceIndex]
76         if piece.QueuedForHash {
77                 return
78         }
79         piece.QueuedForHash = true
80         go cl.verifyPiece(t, pieceIndex)
81 }
82
83 func (cl *Client) queueFirstHash(t *torrent, piece int) {
84         p := t.Pieces[piece]
85         if p.EverHashed || p.Hashing || p.QueuedForHash {
86                 return
87         }
88         cl.queuePieceCheck(t, pp.Integer(piece))
89 }
90
91 // Queues the torrent data for the given region for download. The beginning of
92 // the region is given highest priority to allow a subsequent read at the same
93 // offset to return data ASAP.
94 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
95         me.mu.Lock()
96         defer me.mu.Unlock()
97         t := me.torrent(ih)
98         if t == nil {
99                 return errors.New("no such active torrent")
100         }
101         if !t.haveInfo() {
102                 return errors.New("missing metadata")
103         }
104         me.downloadStrategy.TorrentPrioritize(t, off, len_)
105         for _, cn := range t.Conns {
106                 me.replenishConnRequests(t, cn)
107         }
108         return nil
109 }
110
111 type dataWait struct {
112         offset int64
113         ready  chan struct{}
114 }
115
116 type Client struct {
117         noUpload         bool
118         dataDir          string
119         halfOpenLimit    int
120         peerID           [20]byte
121         listeners        []net.Listener
122         disableTrackers  bool
123         downloadStrategy DownloadStrategy
124         dHT              *dht.Server
125
126         mu    levelmu.LevelMutex
127         event sync.Cond
128         quit  chan struct{}
129
130         handshaking int
131
132         torrents map[InfoHash]*torrent
133
134         dataWaits map[*torrent][]dataWait
135 }
136
137 func (me *Client) ListenAddr() (addr net.Addr) {
138         for _, l := range me.listeners {
139                 if addr != nil && l.Addr().String() != addr.String() {
140                         panic("listeners exist on different addresses")
141                 }
142                 addr = l.Addr()
143         }
144         return
145 }
146
147 func (cl *Client) WriteStatus(w io.Writer) {
148         cl.mu.LevelLock(1)
149         defer cl.mu.Unlock()
150         fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
151         fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
152         fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
153         if cl.dHT != nil {
154                 fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
155                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
156                 fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.LocalAddr()))
157                 fmt.Fprintf(w, "DHT announces: %d\n", cl.dHT.NumConfirmedAnnounces)
158         }
159         cl.downloadStrategy.WriteStatus(w)
160         fmt.Fprintln(w)
161         for _, t := range cl.torrents {
162                 fmt.Fprintf(w, "%s: %f%%\n", t.Name(), func() float32 {
163                         if !t.haveInfo() {
164                                 return 0
165                         } else {
166                                 return 100 * (1 - float32(t.BytesLeft())/float32(t.Length()))
167                         }
168                 }())
169                 fmt.Fprint(w, "Blocked reads:")
170                 for _, dw := range cl.dataWaits[t] {
171                         fmt.Fprintf(w, " %d", dw.offset)
172                 }
173                 fmt.Fprintln(w)
174                 t.WriteStatus(w)
175                 fmt.Fprintln(w)
176         }
177 }
178
179 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
180 // isn't available.
181 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
182         cl.mu.LevelLock(1)
183         defer cl.mu.Unlock()
184         t := cl.torrent(ih)
185         if t == nil {
186                 err = errors.New("unknown torrent")
187                 return
188         }
189         index := pp.Integer(off / int64(t.UsualPieceSize()))
190         // Reading outside the bounds of a file is an error.
191         if index < 0 {
192                 err = os.ErrInvalid
193                 return
194         }
195         if int(index) >= len(t.Pieces) {
196                 err = io.EOF
197                 return
198         }
199         piece := t.Pieces[index]
200         pieceOff := pp.Integer(off % int64(t.UsualPieceSize()))
201         pieceLeft := int(t.PieceLength(index) - pieceOff)
202         if pieceLeft <= 0 {
203                 err = io.EOF
204                 return
205         }
206         if len(p) > pieceLeft {
207                 p = p[:pieceLeft]
208         }
209         for cs, _ := range piece.PendingChunkSpecs {
210                 chunkOff := int64(pieceOff) - int64(cs.Begin)
211                 if chunkOff >= int64(t.PieceLength(index)) {
212                         panic(chunkOff)
213                 }
214                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
215                         // read begins in a pending chunk
216                         err = ErrDataNotReady
217                         return
218                 }
219                 // pending chunk caps available data
220                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
221                         p = p[:-chunkOff]
222                 }
223         }
224         if len(p) == 0 {
225                 panic(len(p))
226         }
227         return t.Data.ReadAt(p, off)
228 }
229
230 func dhtAddr(listen net.Addr) (s string, err error) {
231         host, port, err := net.SplitHostPort(listen.String())
232         if err != nil {
233                 return
234         }
235         i64, err := strconv.ParseInt(port, 0, 0)
236         if err != nil {
237                 return
238         }
239         s = net.JoinHostPort(host, strconv.FormatInt(i64+1, 10))
240         return
241 }
242
243 func NewClient(cfg *Config) (cl *Client, err error) {
244         if cfg == nil {
245                 cfg = &Config{}
246         }
247
248         cl = &Client{
249                 noUpload:         cfg.NoUpload,
250                 disableTrackers:  cfg.DisableTrackers,
251                 downloadStrategy: cfg.DownloadStrategy,
252                 halfOpenLimit:    100,
253                 dataDir:          cfg.DataDir,
254
255                 quit:     make(chan struct{}),
256                 torrents: make(map[InfoHash]*torrent),
257
258                 dataWaits: make(map[*torrent][]dataWait),
259         }
260         cl.event.L = &cl.mu
261         cl.mu.Init(2)
262
263         o := copy(cl.peerID[:], BEP20)
264         _, err = rand.Read(cl.peerID[o:])
265         if err != nil {
266                 panic("error generating peer id")
267         }
268
269         if cl.downloadStrategy == nil {
270                 cl.downloadStrategy = &DefaultDownloadStrategy{}
271         }
272
273         // Returns the laddr string to listen on for the next Listen call.
274         listenAddr := func() string {
275                 if addr := cl.ListenAddr(); addr != nil {
276                         return addr.String()
277                 }
278                 return cfg.ListenAddr
279         }
280         var l net.Listener
281         if false {
282                 l, err = net.Listen("tcp", listenAddr())
283                 if err != nil {
284                         return
285                 }
286                 cl.listeners = append(cl.listeners, l)
287                 go cl.acceptConnections(l, false)
288         }
289         if true {
290                 l, err = utp.Listen("utp", listenAddr())
291                 if err != nil {
292                         return
293                 }
294                 cl.listeners = append(cl.listeners, l)
295                 go cl.acceptConnections(l, true)
296         }
297         if !cfg.NoDHT {
298                 var dhtAddr_ string
299                 dhtAddr_, err = dhtAddr(cl.ListenAddr())
300                 if err != nil {
301                         return
302                 }
303                 cl.dHT, err = dht.NewServer(&dht.ServerConfig{
304                         Addr: dhtAddr_,
305                 })
306                 if err != nil {
307                         return
308                 }
309         }
310
311         return
312 }
313
314 func (cl *Client) stopped() bool {
315         select {
316         case <-cl.quit:
317                 return true
318         default:
319                 return false
320         }
321 }
322
323 // Stops the client. All connections to peers are closed and all activity will
324 // come to a halt.
325 func (me *Client) Stop() {
326         me.mu.Lock()
327         close(me.quit)
328         me.event.Broadcast()
329         for _, t := range me.torrents {
330                 t.Close()
331         }
332         me.mu.Unlock()
333 }
334
335 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
336         for {
337                 // We accept all connections immediately, because we don't what
338                 // torrent they're for.
339                 conn, err := l.Accept()
340                 select {
341                 case <-cl.quit:
342                         if conn != nil {
343                                 conn.Close()
344                         }
345                         return
346                 default:
347                 }
348                 if err != nil {
349                         log.Print(err)
350                         return
351                 }
352                 go func() {
353                         if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
354                                 log.Print(err)
355                         }
356                 }()
357         }
358 }
359
360 func (me *Client) torrent(ih InfoHash) *torrent {
361         for _, t := range me.torrents {
362                 if t.InfoHash == ih {
363                         return t
364                 }
365         }
366         return nil
367 }
368
369 // Start the process of connecting to the given peer for the given torrent if
370 // appropriate.
371 func (me *Client) initiateConn(peer Peer, t *torrent) {
372         if peer.Id == me.peerID {
373                 return
374         }
375         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
376         if t.addrActive(addr) {
377                 duplicateConnsAvoided.Add(1)
378                 return
379         }
380         t.HalfOpen[addr] = struct{}{}
381         go func() {
382                 // Binding to the listener address and dialing via net.Dialer gives
383                 // "address in use" error. It seems it's not possible to dial out from
384                 // this address so that peers associate our local address with our
385                 // listen address.
386                 if false {
387                         conn, err := net.DialTimeout("tcp", addr, dialTimeout)
388                 } else {
389                         conn, err := (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr)
390                 }
391
392                 // Whether or not the connection attempt succeeds, the half open
393                 // counter should be decremented, and new connection attempts made.
394                 go func() {
395                         me.mu.Lock()
396                         defer me.mu.Unlock()
397                         if _, ok := t.HalfOpen[addr]; !ok {
398                                 panic("invariant broken")
399                         }
400                         delete(t.HalfOpen, addr)
401                         me.openNewConns()
402                 }()
403
404                 if netOpErr, ok := err.(*net.OpError); ok {
405                         if netOpErr.Timeout() {
406                                 return
407                         }
408                         switch netOpErr.Err {
409                         case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
410                                 return
411                         }
412                 }
413                 if err != nil {
414                         log.Printf("error connecting to peer: %s %#v", err, err)
415                         return
416                 }
417                 // log.Printf("connected to %s", conn.RemoteAddr())
418                 err = me.runConnection(conn, t, peer.Source, true)
419                 if err != nil {
420                         log.Print(err)
421                 }
422         }()
423 }
424
425 // The port number for incoming peer connections. 0 if the client isn't
426 // listening.
427 func (cl *Client) incomingPeerPort() int {
428         listenAddr := cl.ListenAddr()
429         if listenAddr == nil {
430                 return 0
431         }
432         return addrPort(listenAddr)
433 }
434
435 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes
436 // per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
437 func addrCompactIP(addr net.Addr) (string, error) {
438         host, _, err := net.SplitHostPort(addr.String())
439         if err != nil {
440                 return "", err
441         }
442         ip := net.ParseIP(host)
443         if v4 := ip.To4(); v4 != nil {
444                 if len(v4) != 4 {
445                         panic(v4)
446                 }
447                 return string(v4), nil
448         }
449         return string(ip.To16()), nil
450 }
451
452 func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
453         var err error
454         for b := range bb {
455                 _, err = w.Write(b)
456                 if err != nil {
457                         w.Close()
458                         break
459                 }
460         }
461         done <- err
462 }
463
464 type peerExtensionBytes [8]byte
465 type peerID [20]byte
466
467 type handshakeResult struct {
468         peerExtensionBytes
469         peerID
470         InfoHash
471 }
472
473 func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
474         // Bytes to be sent to the peer. Should never block the sender.
475         postCh := make(chan []byte, 4)
476         // A single error value sent when the writer completes.
477         writeDone := make(chan error, 1)
478         // Performs writes to the socket and ensures posts don't block.
479         go handshakeWriter(sock, postCh, writeDone)
480
481         defer func() {
482                 close(postCh) // Done writing.
483                 if !ok {
484                         return
485                 }
486                 if err != nil {
487                         panic(err)
488                 }
489                 // Wait until writes complete before returning from handshake.
490                 err = <-writeDone
491                 if err != nil {
492                         err = fmt.Errorf("error writing during handshake: %s", err)
493                 }
494         }()
495
496         post := func(bb []byte) {
497                 select {
498                 case postCh <- bb:
499                 default:
500                         panic("mustn't block while posting")
501                 }
502         }
503
504         post([]byte(pp.Protocol))
505         post([]byte(extensionBytes))
506         if ih != nil { // We already know what we want.
507                 post(ih[:])
508                 post(peerID[:])
509         }
510         var b [68]byte
511         _, err = io.ReadFull(sock, b[:68])
512         if err != nil {
513                 err = nil
514                 return
515         }
516         if string(b[:20]) != pp.Protocol {
517                 return
518         }
519         CopyExact(&res.peerExtensionBytes, b[20:28])
520         CopyExact(&res.InfoHash, b[28:48])
521         CopyExact(&res.peerID, b[48:68])
522
523         if ih == nil { // We were waiting for the peer to tell us what they wanted.
524                 post(res.InfoHash[:])
525                 post(peerID[:])
526         }
527
528         ok = true
529         return
530 }
531
532 type peerConn struct {
533         net.Conn
534 }
535
536 func (pc peerConn) Read(b []byte) (n int, err error) {
537         err = pc.Conn.SetReadDeadline(time.Now().Add(150 * time.Second))
538         if err != nil {
539                 return
540         }
541         n, err = pc.Conn.Read(b)
542         if err != nil {
543                 if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
544                         err = io.EOF
545                 } else if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
546                         if n != 0 {
547                                 panic(n)
548                         }
549                         err = io.EOF
550                 }
551         }
552         return
553 }
554
555 func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
556         if tcpConn, ok := sock.(*net.TCPConn); ok {
557                 tcpConn.SetLinger(0)
558         }
559         defer sock.Close()
560         me.mu.Lock()
561         me.handshaking++
562         me.mu.Unlock()
563         // One minute to complete handshake.
564         sock.SetDeadline(time.Now().Add(time.Minute))
565         hsRes, ok, err := handshake(sock, func() *InfoHash {
566                 if torrent == nil {
567                         return nil
568                 } else {
569                         return &torrent.InfoHash
570                 }
571         }(), me.peerID)
572         me.mu.Lock()
573         defer me.mu.Unlock()
574         if me.handshaking == 0 {
575                 panic("handshake count invariant is broken")
576         }
577         me.handshaking--
578         if err != nil {
579                 err = fmt.Errorf("error during handshake: %s", err)
580                 return
581         }
582         if !ok {
583                 return
584         }
585         torrent = me.torrent(hsRes.InfoHash)
586         if torrent == nil {
587                 return
588         }
589         sock.SetWriteDeadline(time.Time{})
590         sock = peerConn{sock}
591         conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
592         defer conn.Close()
593         conn.Discovery = discovery
594         if !me.addConnection(torrent, conn) {
595                 return
596         }
597         if conn.PeerExtensionBytes[5]&0x10 != 0 {
598                 conn.Post(pp.Message{
599                         Type:       pp.Extended,
600                         ExtendedID: pp.HandshakeExtendedID,
601                         ExtendedPayload: func() []byte {
602                                 d := map[string]interface{}{
603                                         "m": map[string]int{
604                                                 "ut_metadata": 1,
605                                                 "ut_pex":      2,
606                                         },
607                                         "v": "go.torrent dev 20140825", // Just the date
608                                         // No upload queue is implemented yet.
609                                         "reqq": func() int {
610                                                 if me.noUpload {
611                                                         // No need to look strange if it costs us nothing.
612                                                         return 250
613                                                 } else {
614                                                         return 1
615                                                 }
616                                         }(),
617                                 }
618                                 if torrent.metadataSizeKnown() {
619                                         d["metadata_size"] = torrent.metadataSize()
620                                 }
621                                 if p := me.incomingPeerPort(); p != 0 {
622                                         d["p"] = p
623                                 }
624                                 yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
625                                 if err != nil {
626                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
627                                 } else {
628                                         d["yourip"] = yourip
629                                 }
630                                 // log.Printf("sending %v", d)
631                                 b, err := bencode.Marshal(d)
632                                 if err != nil {
633                                         panic(err)
634                                 }
635                                 return b
636                         }(),
637                 })
638         }
639         if torrent.haveAnyPieces() {
640                 conn.Post(pp.Message{
641                         Type:     pp.Bitfield,
642                         Bitfield: torrent.bitfield(),
643                 })
644         }
645         if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil {
646                 addr, _ := me.dHT.LocalAddr().(*net.UDPAddr)
647                 conn.Post(pp.Message{
648                         Type: pp.Port,
649                         Port: uint16(addr.Port),
650                 })
651         }
652         err = me.connectionLoop(torrent, conn)
653         if err != nil {
654                 err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
655         }
656         me.dropConnection(torrent, conn)
657         return
658 }
659
660 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
661         for piece >= len(c.PeerPieces) {
662                 c.PeerPieces = append(c.PeerPieces, false)
663         }
664         c.PeerPieces[piece] = true
665         if !t.havePiece(piece) {
666                 me.replenishConnRequests(t, c)
667         }
668 }
669
670 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
671         me.replenishConnRequests(torrent, conn)
672 }
673
674 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
675         ok = cn.Cancel(r)
676         if ok {
677                 postedCancels.Add(1)
678                 cl.downloadStrategy.DeleteRequest(t, r)
679         }
680         return
681 }
682
683 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
684         if !cn.RequestPending(r) {
685                 return
686         }
687         cl.downloadStrategy.DeleteRequest(t, r)
688         delete(cn.Requests, r)
689 }
690
691 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
692         if t.haveInfo() {
693                 return
694         }
695         var pending []int
696         for index := 0; index < t.MetadataPieceCount(); index++ {
697                 if !t.HaveMetadataPiece(index) {
698                         pending = append(pending, index)
699                 }
700         }
701         for _, i := range mathRand.Perm(len(pending)) {
702                 c.Post(pp.Message{
703                         Type:       pp.Extended,
704                         ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
705                         ExtendedPayload: func() []byte {
706                                 b, err := bencode.Marshal(map[string]int{
707                                         "msg_type": 0,
708                                         "piece":    pending[i],
709                                 })
710                                 if err != nil {
711                                         panic(err)
712                                 }
713                                 return b
714                         }(),
715                 })
716         }
717 }
718
719 func (cl *Client) completedMetadata(t *torrent) {
720         h := sha1.New()
721         h.Write(t.MetaData)
722         var ih InfoHash
723         CopyExact(&ih, h.Sum(nil))
724         if ih != t.InfoHash {
725                 log.Print("bad metadata")
726                 t.InvalidateMetadata()
727                 return
728         }
729         var info metainfo.Info
730         err := bencode.Unmarshal(t.MetaData, &info)
731         if err != nil {
732                 log.Printf("error unmarshalling metadata: %s", err)
733                 t.InvalidateMetadata()
734                 return
735         }
736         // TODO(anacrolix): If this fails, I think something harsher should be
737         // done.
738         err = cl.setMetaData(t, info, t.MetaData)
739         if err != nil {
740                 log.Printf("error setting metadata: %s", err)
741                 t.InvalidateMetadata()
742                 return
743         }
744         log.Printf("%s: got metadata from peers", t)
745 }
746
747 // Process incoming ut_metadata message.
748 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
749         var d map[string]int
750         err = bencode.Unmarshal(payload, &d)
751         if err != nil {
752                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
753                 return
754         }
755         msgType, ok := d["msg_type"]
756         if !ok {
757                 err = errors.New("missing msg_type field")
758                 return
759         }
760         piece := d["piece"]
761         switch msgType {
762         case pp.DataMetadataExtensionMsgType:
763                 if t.haveInfo() {
764                         break
765                 }
766                 t.SaveMetadataPiece(piece, payload[len(payload)-metadataPieceSize(d["total_size"], piece):])
767                 if !t.HaveAllMetadataPieces() {
768                         break
769                 }
770                 cl.completedMetadata(t)
771         case pp.RequestMetadataExtensionMsgType:
772                 if !t.HaveMetadataPiece(piece) {
773                         c.Post(t.NewMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
774                         break
775                 }
776                 start := (1 << 14) * piece
777                 c.Post(t.NewMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[start:start+t.metadataPieceSize(piece)]))
778         case pp.RejectMetadataExtensionMsgType:
779         default:
780                 err = errors.New("unknown msg_type value")
781         }
782         return
783 }
784
785 type peerExchangeMessage struct {
786         Added      CompactPeers   `bencode:"added"`
787         AddedFlags []byte         `bencode:"added.f"`
788         Dropped    []tracker.Peer `bencode:"dropped"`
789 }
790
791 // Extracts the port as an integer from an address string.
792 func addrPort(addr net.Addr) int {
793         _, port, err := net.SplitHostPort(addr.String())
794         if err != nil {
795                 panic(err)
796         }
797         i64, err := strconv.ParseInt(port, 0, 0)
798         if err != nil {
799                 panic(err)
800         }
801         return int(i64)
802 }
803
804 // Processes incoming bittorrent messages. The client lock is held upon entry
805 // and exit.
806 func (me *Client) connectionLoop(t *torrent, c *connection) error {
807         decoder := pp.Decoder{
808                 R:         bufio.NewReaderSize(c.Socket, 20*1024),
809                 MaxLength: 256 * 1024,
810         }
811         for {
812                 me.mu.Unlock()
813                 var msg pp.Message
814                 err := decoder.Decode(&msg)
815                 me.mu.Lock()
816                 c.lastMessageReceived = time.Now()
817                 select {
818                 case <-c.closing:
819                         return nil
820                 default:
821                 }
822                 if err != nil {
823                         if me.stopped() || err == io.EOF {
824                                 return nil
825                         }
826                         return err
827                 }
828                 if msg.Keepalive {
829                         continue
830                 }
831                 switch msg.Type {
832                 case pp.Choke:
833                         c.PeerChoked = true
834                         for r := range c.Requests {
835                                 me.connDeleteRequest(t, c, r)
836                         }
837                 case pp.Unchoke:
838                         c.PeerChoked = false
839                         me.peerUnchoked(t, c)
840                 case pp.Interested:
841                         c.PeerInterested = true
842                         // TODO: This should be done from a dedicated unchoking routine.
843                         if me.noUpload {
844                                 break
845                         }
846                         c.Unchoke()
847                 case pp.NotInterested:
848                         c.PeerInterested = false
849                         c.Choke()
850                 case pp.Have:
851                         me.peerGotPiece(t, c, int(msg.Index))
852                 case pp.Request:
853                         if me.noUpload {
854                                 break
855                         }
856                         if c.PeerRequests == nil {
857                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
858                         }
859                         request := newRequest(msg.Index, msg.Begin, msg.Length)
860                         // TODO: Requests should be satisfied from a dedicated upload routine.
861                         // c.PeerRequests[request] = struct{}{}
862                         p := make([]byte, msg.Length)
863                         n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
864                         if err != nil {
865                                 return fmt.Errorf("reading t data to serve request %q: %s", request, err)
866                         }
867                         if n != int(msg.Length) {
868                                 return fmt.Errorf("bad request: %v", msg)
869                         }
870                         c.Post(pp.Message{
871                                 Type:  pp.Piece,
872                                 Index: msg.Index,
873                                 Begin: msg.Begin,
874                                 Piece: p,
875                         })
876                         uploadChunksPosted.Add(1)
877                 case pp.Cancel:
878                         req := newRequest(msg.Index, msg.Begin, msg.Length)
879                         if !c.PeerCancel(req) {
880                                 unexpectedCancels.Add(1)
881                         }
882                 case pp.Bitfield:
883                         if c.PeerPieces != nil {
884                                 err = errors.New("received unexpected bitfield")
885                                 break
886                         }
887                         if t.haveInfo() {
888                                 if len(msg.Bitfield) < t.NumPieces() {
889                                         err = errors.New("received invalid bitfield")
890                                         break
891                                 }
892                                 msg.Bitfield = msg.Bitfield[:t.NumPieces()]
893                         }
894                         c.PeerPieces = msg.Bitfield
895                         for index, has := range c.PeerPieces {
896                                 if has {
897                                         me.peerGotPiece(t, c, index)
898                                 }
899                         }
900                 case pp.Piece:
901                         err = me.downloadedChunk(t, c, &msg)
902                 case pp.Extended:
903                         switch msg.ExtendedID {
904                         case pp.HandshakeExtendedID:
905                                 // TODO: Create a bencode struct for this.
906                                 var d map[string]interface{}
907                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
908                                 if err != nil {
909                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
910                                         break
911                                 }
912                                 // log.Printf("got handshake: %v", d)
913                                 if reqq, ok := d["reqq"]; ok {
914                                         if i, ok := reqq.(int64); ok {
915                                                 c.PeerMaxRequests = int(i)
916                                         }
917                                 }
918                                 if v, ok := d["v"]; ok {
919                                         c.PeerClientName = v.(string)
920                                 }
921                                 m, ok := d["m"]
922                                 if !ok {
923                                         err = errors.New("handshake missing m item")
924                                         break
925                                 }
926                                 mTyped, ok := m.(map[string]interface{})
927                                 if !ok {
928                                         err = errors.New("handshake m value is not dict")
929                                         break
930                                 }
931                                 if c.PeerExtensionIDs == nil {
932                                         c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
933                                 }
934                                 for name, v := range mTyped {
935                                         id, ok := v.(int64)
936                                         if !ok {
937                                                 log.Printf("bad handshake m item extension ID type: %T", v)
938                                                 continue
939                                         }
940                                         if id == 0 {
941                                                 delete(c.PeerExtensionIDs, name)
942                                         } else {
943                                                 c.PeerExtensionIDs[name] = id
944                                         }
945                                 }
946                                 metadata_sizeUntyped, ok := d["metadata_size"]
947                                 if ok {
948                                         metadata_size, ok := metadata_sizeUntyped.(int64)
949                                         if !ok {
950                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
951                                         } else {
952                                                 t.SetMetadataSize(metadata_size)
953                                         }
954                                 }
955                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
956                                         me.requestPendingMetadata(t, c)
957                                 }
958                         case 1:
959                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
960                                 if err != nil {
961                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
962                                 }
963                         case 2:
964                                 var pexMsg peerExchangeMessage
965                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
966                                 if err != nil {
967                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
968                                         break
969                                 }
970                                 go func() {
971                                         err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
972                                                 for _, cp := range pexMsg.Added {
973                                                         p := Peer{
974                                                                 IP:     make([]byte, 4),
975                                                                 Port:   int(cp.Port),
976                                                                 Source: peerSourcePEX,
977                                                         }
978                                                         if n := copy(p.IP, cp.IP[:]); n != 4 {
979                                                                 panic(n)
980                                                         }
981                                                         ret = append(ret, p)
982                                                 }
983                                                 return
984                                         }())
985                                         if err != nil {
986                                                 log.Printf("error adding PEX peers: %s", err)
987                                                 return
988                                         }
989                                         peersFoundByPEX.Add(int64(len(pexMsg.Added)))
990                                 }()
991                         default:
992                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
993                         }
994                         if err != nil {
995                                 // That client uses its own extension IDs for outgoing message
996                                 // types, which is incorrect.
997                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
998                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
999                                         return nil
1000                                 }
1001                                 // log.Printf("peer extension map: %#v", c.PeerExtensionIDs)
1002                         }
1003                 case pp.Port:
1004                         if me.dHT == nil {
1005                                 break
1006                         }
1007                         pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String())
1008                         if err != nil {
1009                                 panic(err)
1010                         }
1011                         if msg.Port != 0 {
1012                                 pingAddr.Port = int(msg.Port)
1013                         }
1014                         _, err = me.dHT.Ping(pingAddr)
1015                 default:
1016                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1017                 }
1018                 if err != nil {
1019                         return err
1020                 }
1021         }
1022 }
1023
1024 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
1025         for r := range conn.Requests {
1026                 me.connDeleteRequest(torrent, conn, r)
1027         }
1028         conn.Close()
1029         for i0, c := range torrent.Conns {
1030                 if c != conn {
1031                         continue
1032                 }
1033                 i1 := len(torrent.Conns) - 1
1034                 if i0 != i1 {
1035                         torrent.Conns[i0] = torrent.Conns[i1]
1036                 }
1037                 torrent.Conns = torrent.Conns[:i1]
1038                 return
1039         }
1040         panic("connection not found")
1041 }
1042
1043 func (me *Client) addConnection(t *torrent, c *connection) bool {
1044         if me.stopped() {
1045                 return false
1046         }
1047         select {
1048         case <-t.ceasingNetworking:
1049                 return false
1050         default:
1051         }
1052         for _, c0 := range t.Conns {
1053                 if c.PeerID == c0.PeerID {
1054                         // Already connected to a client with that ID.
1055                         return false
1056                 }
1057         }
1058         t.Conns = append(t.Conns, c)
1059         if len(t.Conns) > socketsPerTorrent {
1060                 wcs := t.worstConnsHeap()
1061                 heap.Pop(wcs).(*connection).Close()
1062         }
1063         return true
1064 }
1065
1066 func (me *Client) openNewConns() {
1067         for _, t := range me.torrents {
1068                 select {
1069                 case <-t.ceasingNetworking:
1070                         continue
1071                 default:
1072                 }
1073                 for len(t.Peers) != 0 {
1074                         if len(t.HalfOpen) >= me.halfOpenLimit {
1075                                 return
1076                         }
1077                         if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent {
1078                                 break
1079                         }
1080                         var (
1081                                 k peersKey
1082                                 p Peer
1083                         )
1084                         for k, p = range t.Peers {
1085                                 break
1086                         }
1087                         delete(t.Peers, k)
1088                         me.initiateConn(p, t)
1089                 }
1090         }
1091 }
1092
1093 // Adds peers to the swarm for the torrent corresponding to infoHash.
1094 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
1095         me.mu.Lock()
1096         defer me.mu.Unlock()
1097         t := me.torrent(infoHash)
1098         if t == nil {
1099                 return errors.New("no such torrent")
1100         }
1101         t.AddPeers(peers)
1102         me.openNewConns()
1103         return nil
1104 }
1105
1106 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
1107         err = t.setMetadata(md, cl.dataDir, bytes)
1108         if err != nil {
1109                 return
1110         }
1111         // If the client intends to upload, it needs to know what state pieces are
1112         // in.
1113         if !cl.noUpload {
1114                 // Queue all pieces for hashing. This is done sequentially to avoid
1115                 // spamming goroutines.
1116                 for _, p := range t.Pieces {
1117                         p.QueuedForHash = true
1118                 }
1119                 go func() {
1120                         for i := range t.Pieces {
1121                                 cl.verifyPiece(t, pp.Integer(i))
1122                         }
1123                 }()
1124         }
1125
1126         cl.downloadStrategy.TorrentStarted(t)
1127         select {
1128         case t.gotMetainfo <- &metainfo.MetaInfo{
1129                 Info: metainfo.InfoEx{
1130                         Info: md,
1131                 },
1132                 CreationDate: time.Now().Unix(),
1133                 Comment:      "metadata set in client",
1134                 CreatedBy:    "go.torrent",
1135                 // TODO(anacrolix): Expose trackers given when torrent added.
1136         }:
1137         default:
1138                 panic("shouldn't block")
1139         }
1140         close(t.gotMetainfo)
1141         t.gotMetainfo = nil
1142         return
1143 }
1144
1145 // Prepare a Torrent without any attachment to a Client. That means we can
1146 // initialize fields all fields that don't require the Client without locking
1147 // it.
1148 func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
1149         t = &torrent{
1150                 InfoHash: ih,
1151                 Peers:    make(map[peersKey]Peer, 2000),
1152
1153                 closing:           make(chan struct{}),
1154                 ceasingNetworking: make(chan struct{}),
1155
1156                 gotMetainfo: make(chan *metainfo.MetaInfo, 1),
1157
1158                 HalfOpen: make(map[string]struct{}, halfOpenLimit),
1159         }
1160         t.GotMetainfo = t.gotMetainfo
1161         t.Trackers = make([][]tracker.Client, len(announceList))
1162         for tierIndex := range announceList {
1163                 tier := t.Trackers[tierIndex]
1164                 for _, url := range announceList[tierIndex] {
1165                         tr, err := tracker.New(url)
1166                         if err != nil {
1167                                 log.Print(err)
1168                                 continue
1169                         }
1170                         tier = append(tier, tr)
1171                 }
1172                 // The trackers within each tier must be shuffled before use.
1173                 // http://stackoverflow.com/a/12267471/149482
1174                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1175                 for i := range tier {
1176                         j := mathRand.Intn(i + 1)
1177                         tier[i], tier[j] = tier[j], tier[i]
1178                 }
1179                 t.Trackers[tierIndex] = tier
1180         }
1181         return
1182 }
1183
1184 func (cl *Client) AddMagnet(uri string) (t *torrent, err error) {
1185         m, err := ParseMagnetURI(uri)
1186         if err != nil {
1187                 return
1188         }
1189         t, err = newTorrent(m.InfoHash, [][]string{m.Trackers}, cl.halfOpenLimit)
1190         if err != nil {
1191                 return
1192         }
1193         t.DisplayName = m.DisplayName
1194         cl.mu.Lock()
1195         defer cl.mu.Unlock()
1196         err = cl.addTorrent(t)
1197         if err != nil {
1198                 t.Close()
1199         }
1200         return
1201 }
1202
1203 func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
1204         me.mu.Lock()
1205         defer me.mu.Unlock()
1206         t, ok := me.torrents[infoHash]
1207         if !ok {
1208                 err = fmt.Errorf("no such torrent")
1209                 return
1210         }
1211         err = t.Close()
1212         if err != nil {
1213                 panic(err)
1214         }
1215         delete(me.torrents, infoHash)
1216         me.downloadStrategy.TorrentStopped(t)
1217         for _, dw := range me.dataWaits[t] {
1218                 close(dw.ready)
1219         }
1220         delete(me.dataWaits, t)
1221         return
1222 }
1223
1224 func (me *Client) addTorrent(t *torrent) (err error) {
1225         if _, ok := me.torrents[t.InfoHash]; ok {
1226                 err = fmt.Errorf("torrent infohash collision")
1227                 return
1228         }
1229         me.torrents[t.InfoHash] = t
1230         if !me.disableTrackers {
1231                 go me.announceTorrent(t)
1232         }
1233         if me.dHT != nil {
1234                 go me.announceTorrentDHT(t)
1235         }
1236         return
1237 }
1238
1239 // Adds the torrent to the client.
1240 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
1241         var ih InfoHash
1242         CopyExact(&ih, metaInfo.Info.Hash)
1243         t, err := newTorrent(ih, metaInfo.AnnounceList, me.halfOpenLimit)
1244         if err != nil {
1245                 return
1246         }
1247         me.mu.Lock()
1248         defer me.mu.Unlock()
1249         err = me.addTorrent(t)
1250         if err != nil {
1251                 return
1252         }
1253         err = me.setMetaData(t, metaInfo.Info.Info, metaInfo.Info.Bytes)
1254         if err != nil {
1255                 return
1256         }
1257         return
1258 }
1259
1260 func (me *Client) AddTorrentFromFile(name string) (err error) {
1261         mi, err := metainfo.LoadFromFile(name)
1262         if err != nil {
1263                 err = fmt.Errorf("error loading metainfo from file: %s", err)
1264                 return
1265         }
1266         return me.AddTorrent(mi)
1267 }
1268
1269 func (cl *Client) announceTorrentDHT(t *torrent) {
1270         for {
1271                 ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
1272                 if err != nil {
1273                         log.Printf("error getting peers from dht: %s", err)
1274                         return
1275                 }
1276                 nextScrape := time.After(1 * time.Minute)
1277         getPeers:
1278                 for {
1279                         select {
1280                         case <-nextScrape:
1281                                 break getPeers
1282                         case cps, ok := <-ps.Values:
1283                                 if !ok {
1284                                         break getPeers
1285                                 }
1286                                 peersFoundByDHT.Add(int64(len(cps)))
1287                                 err = cl.AddPeers(t.InfoHash, func() (ret []Peer) {
1288                                         for _, cp := range cps {
1289                                                 ret = append(ret, Peer{
1290                                                         IP:     cp.IP[:],
1291                                                         Port:   int(cp.Port),
1292                                                         Source: peerSourceDHT,
1293                                                 })
1294                                         }
1295                                         return
1296                                 }())
1297                                 if err != nil {
1298                                         log.Printf("error adding peers from dht for torrent %q: %s", t, err)
1299                                         break getPeers
1300                                 }
1301                         case <-t.ceasingNetworking:
1302                                 ps.Close()
1303                                 return
1304                         }
1305                 }
1306                 ps.Close()
1307
1308                 // After a GetPeers, we can announce on the best nodes that gave us an
1309                 // announce token.
1310
1311                 port := cl.incomingPeerPort()
1312                 // If port is zero, then we're not listening, and there's nothing to
1313                 // announce.
1314                 if port != 0 {
1315                         // We can't allow the port to be implied as long as the UTP and
1316                         // DHT ports are different.
1317                         cl.dHT.AnnouncePeer(port, false, t.InfoHash.AsString())
1318                 }
1319         }
1320 }
1321
1322 func (cl *Client) announceTorrent(t *torrent) {
1323         req := tracker.AnnounceRequest{
1324                 Event:    tracker.Started,
1325                 NumWant:  -1,
1326                 Port:     int16(cl.incomingPeerPort()),
1327                 PeerId:   cl.peerID,
1328                 InfoHash: t.InfoHash,
1329         }
1330 newAnnounce:
1331         for {
1332                 select {
1333                 case <-t.ceasingNetworking:
1334                         return
1335                 default:
1336                 }
1337                 cl.mu.Lock()
1338                 req.Left = t.BytesLeft()
1339                 cl.mu.Unlock()
1340                 for _, tier := range t.Trackers {
1341                         for trIndex, tr := range tier {
1342                                 if err := tr.Connect(); err != nil {
1343                                         log.Print(err)
1344                                         continue
1345                                 }
1346                                 resp, err := tr.Announce(&req)
1347                                 if err != nil {
1348                                         log.Print(err)
1349                                         continue
1350                                 }
1351                                 var peers []Peer
1352                                 for _, peer := range resp.Peers {
1353                                         peers = append(peers, Peer{
1354                                                 IP:   peer.IP,
1355                                                 Port: peer.Port,
1356                                         })
1357                                 }
1358                                 err = cl.AddPeers(t.InfoHash, peers)
1359                                 if err != nil {
1360                                         log.Printf("error adding peers to torrent %s: %s", t, err)
1361                                 } else {
1362                                         log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1363                                 }
1364                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1365                                 time.Sleep(time.Second * time.Duration(resp.Interval))
1366                                 req.Event = tracker.None
1367                                 continue newAnnounce
1368                         }
1369                 }
1370                 time.Sleep(5 * time.Second)
1371         }
1372 }
1373
1374 func (cl *Client) allTorrentsCompleted() bool {
1375         for _, t := range cl.torrents {
1376                 if !t.haveInfo() {
1377                         return false
1378                 }
1379                 for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
1380                         i := e.Value.(int)
1381                         if t.Pieces[i].Complete() {
1382                                 continue
1383                         }
1384                         // If the piece isn't complete, make sure it's not because it's
1385                         // never been hashed.
1386                         cl.queueFirstHash(t, i)
1387                         return false
1388                 }
1389         }
1390         return true
1391 }
1392
1393 // Returns true when all torrents are completely downloaded and false if the
1394 // client is stopped before that.
1395 func (me *Client) WaitAll() bool {
1396         me.mu.Lock()
1397         defer me.mu.Unlock()
1398         for !me.allTorrentsCompleted() {
1399                 if me.stopped() {
1400                         return false
1401                 }
1402                 me.event.Wait()
1403         }
1404         return true
1405 }
1406
1407 func (cl *Client) assertRequestHeat() {
1408         dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy)
1409         if !ok {
1410                 return
1411         }
1412         for _, t := range cl.torrents {
1413                 m := make(map[request]int, 3000)
1414                 for _, cn := range t.Conns {
1415                         for r := range cn.Requests {
1416                                 m[r]++
1417                         }
1418                 }
1419                 for r, h := range dds.heat[t] {
1420                         if m[r] != h {
1421                                 panic(fmt.Sprintln(m[r], h))
1422                         }
1423                 }
1424         }
1425 }
1426
1427 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
1428         if !t.haveInfo() {
1429                 return
1430         }
1431         for _, p := range me.downloadStrategy.FillRequests(t, c) {
1432                 // Make sure the state of pieces that would have been requested is
1433                 // known.
1434                 me.queueFirstHash(t, p)
1435         }
1436         //me.assertRequestHeat()
1437         if len(c.Requests) == 0 && !c.PeerChoked {
1438                 c.SetInterested(false)
1439         }
1440 }
1441
1442 // Handle a received chunk from a peer.
1443 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
1444         chunksDownloadedCount.Add(1)
1445
1446         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1447
1448         // Request has been satisfied.
1449         me.connDeleteRequest(t, c, req)
1450
1451         defer me.replenishConnRequests(t, c)
1452
1453         // Do we actually want this chunk?
1454         if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
1455                 unusedDownloadedChunksCount.Add(1)
1456                 c.UnwantedChunksReceived++
1457                 return nil
1458         }
1459
1460         c.UsefulChunksReceived++
1461         c.lastUsefulChunkReceived = time.Now()
1462
1463         // Write the chunk out.
1464         err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1465         if err != nil {
1466                 return fmt.Errorf("error writing chunk: %s", err)
1467         }
1468
1469         // Record that we have the chunk.
1470         delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
1471         me.dataReady(t, req)
1472         if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
1473                 me.queuePieceCheck(t, req.Index)
1474         }
1475         t.PieceBytesLeftChanged(int(req.Index))
1476
1477         // Unprioritize the chunk.
1478         me.downloadStrategy.TorrentGotChunk(t, req)
1479
1480         // Cancel pending requests for this chunk.
1481         for _, c := range t.Conns {
1482                 if me.connCancel(t, c, req) {
1483                         me.replenishConnRequests(t, c)
1484                 }
1485         }
1486
1487         me.downloadStrategy.AssertNotRequested(t, req)
1488
1489         return nil
1490 }
1491
1492 func (cl *Client) dataReady(t *torrent, r request) {
1493         dws := cl.dataWaits[t]
1494         begin := t.requestOffset(r)
1495         end := begin + int64(r.Length)
1496         for i := 0; i < len(dws); {
1497                 dw := dws[i]
1498                 if begin <= dw.offset && dw.offset < end {
1499                         close(dw.ready)
1500                         dws[i] = dws[len(dws)-1]
1501                         dws = dws[:len(dws)-1]
1502                 } else {
1503                         i++
1504                 }
1505         }
1506         cl.dataWaits[t] = dws
1507 }
1508
1509 // Returns a channel that is closed when new data has become available in the
1510 // client.
1511 func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) {
1512         me.mu.Lock()
1513         defer me.mu.Unlock()
1514         ch := make(chan struct{})
1515         ret = ch
1516         t := me.torrents[ih]
1517         if t == nil {
1518                 close(ch)
1519                 return
1520         }
1521         if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) {
1522                 close(ch)
1523                 return
1524         }
1525         me.dataWaits[t] = append(me.dataWaits[t], dataWait{
1526                 offset: off,
1527                 ready:  ch,
1528         })
1529         return
1530 }
1531
1532 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
1533         p := t.Pieces[piece]
1534         if p.EverHashed && !correct {
1535                 log.Printf("%s: piece %d failed hash", t, piece)
1536                 failedPieceHashes.Add(1)
1537         }
1538         p.EverHashed = true
1539         if correct {
1540                 p.PendingChunkSpecs = nil
1541                 me.downloadStrategy.TorrentGotPiece(t, int(piece))
1542                 me.dataReady(t, request{
1543                         pp.Integer(piece),
1544                         chunkSpec{0, pp.Integer(t.PieceLength(piece))},
1545                 })
1546         } else {
1547                 if len(p.PendingChunkSpecs) == 0 {
1548                         t.pendAllChunkSpecs(piece)
1549                 }
1550         }
1551         t.PieceBytesLeftChanged(int(piece))
1552         for _, conn := range t.Conns {
1553                 if correct {
1554                         conn.Post(pp.Message{
1555                                 Type:  pp.Have,
1556                                 Index: pp.Integer(piece),
1557                         })
1558                         // TODO: Cancel requests for this piece.
1559                         for r := range conn.Requests {
1560                                 if r.Index == piece {
1561                                         panic("wat")
1562                                 }
1563                         }
1564                 }
1565                 // Do this even if the piece is correct because new first-hashings may
1566                 // need to be scheduled.
1567                 if conn.PeerHasPiece(piece) {
1568                         me.replenishConnRequests(t, conn)
1569                 }
1570         }
1571         if t.haveAllPieces() && me.noUpload {
1572                 t.CeaseNetworking()
1573         }
1574         me.event.Broadcast()
1575 }
1576
1577 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
1578         cl.mu.Lock()
1579         defer cl.mu.Unlock()
1580         p := t.Pieces[index]
1581         for p.Hashing {
1582                 cl.event.Wait()
1583         }
1584         if t.isClosed() {
1585                 return
1586         }
1587         p.Hashing = true
1588         p.QueuedForHash = false
1589         cl.mu.Unlock()
1590         sum := t.HashPiece(index)
1591         cl.mu.Lock()
1592         select {
1593         case <-t.closing:
1594                 return
1595         default:
1596         }
1597         p.Hashing = false
1598         cl.pieceHashed(t, index, sum == p.Hash)
1599 }
1600
1601 func (me *Client) Torrents() (ret []*torrent) {
1602         me.mu.Lock()
1603         for _, t := range me.torrents {
1604                 ret = append(ret, t)
1605         }
1606         me.mu.Unlock()
1607         return
1608 }