]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Add util AddrIP and AddrPort functions
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errorf("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "bytes"
21         "container/heap"
22         "crypto/rand"
23         "crypto/sha1"
24         "errors"
25         "expvar"
26         "fmt"
27         "io"
28         "log"
29         mathRand "math/rand"
30         "net"
31         "os"
32         "strconv"
33         "strings"
34         "sync"
35         "syscall"
36         "time"
37
38         "github.com/h2so5/utp"
39
40         "github.com/anacrolix/libtorgo/metainfo"
41         "github.com/nsf/libtorgo/bencode"
42
43         "bitbucket.org/anacrolix/go.torrent/dht"
44         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
45         "bitbucket.org/anacrolix/go.torrent/tracker"
46         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
47         . "bitbucket.org/anacrolix/go.torrent/util"
48         "bitbucket.org/anacrolix/go.torrent/util/levelmu"
49 )
50
// Counters published through the expvar package, each registered under the
// same name as its variable.
var (
	unusedDownloadedChunksCount = expvar.NewInt("unusedDownloadedChunksCount")
	chunksDownloadedCount       = expvar.NewInt("chunksDownloadedCount")
	peersFoundByDHT             = expvar.NewInt("peersFoundByDHT")
	peersFoundByPEX             = expvar.NewInt("peersFoundByPEX")
	// Incremented each time a Piece message is posted in reply to a peer's
	// request.
	uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
	unexpectedCancels  = expvar.NewInt("unexpectedCancels")
	// Incremented each time connCancel successfully cancels a request.
	postedCancels = expvar.NewInt("postedCancels")
	// Incremented when initiateConn skips an address that is already active
	// for the torrent.
	duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided")
	failedPieceHashes     = expvar.NewInt("failedPieceHashes")
)
62
const (
	// The eight extension bytes this client posts during the peer handshake.
	// Justification for set bits follows.
	//
	// Extension protocol (byte 5, bit 0x10): http://www.bittorrent.org/beps/bep_0010.html
	// DHT (byte 7, bit 0x01): http://www.bittorrent.org/beps/bep_0005.html
	extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"

	// NOTE(review): presumably the per-torrent connection limit; its use is
	// not visible in this part of the file.
	socketsPerTorrent = 40
)
72
73 // Currently doesn't really queue, but should in the future.
74 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
75         piece := t.Pieces[pieceIndex]
76         if piece.QueuedForHash {
77                 return
78         }
79         piece.QueuedForHash = true
80         go cl.verifyPiece(t, pieceIndex)
81 }
82
83 func (cl *Client) queueFirstHash(t *torrent, piece int) {
84         p := t.Pieces[piece]
85         if p.EverHashed || p.Hashing || p.QueuedForHash {
86                 return
87         }
88         cl.queuePieceCheck(t, pp.Integer(piece))
89 }
90
91 // Queues the torrent data for the given region for download. The beginning of
92 // the region is given highest priority to allow a subsequent read at the same
93 // offset to return data ASAP.
94 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
95         me.mu.Lock()
96         defer me.mu.Unlock()
97         t := me.torrent(ih)
98         if t == nil {
99                 return errors.New("no such active torrent")
100         }
101         if !t.haveInfo() {
102                 return errors.New("missing metadata")
103         }
104         me.downloadStrategy.TorrentPrioritize(t, off, len_)
105         for _, cn := range t.Conns {
106                 me.replenishConnRequests(t, cn)
107         }
108         return nil
109 }
110
// dataWait records a read that is waiting for torrent data. WriteStatus
// lists the offsets of these under "Blocked reads".
type dataWait struct {
	// Offset into the torrent data that the read is waiting on.
	offset int64
	// NOTE(review): presumably closed or signalled once the data at offset
	// is available; the signalling code is not in this part of the file.
	ready chan struct{}
}
115
// Client holds the state of a torrent client: its configuration, listeners,
// optional DHT server and the torrents it manages.
type Client struct {
	// When set, peers are never unchoked and their requests are not served.
	noUpload bool
	dataDir  string
	// NewClient sets this to 100.
	halfOpenLimit int
	// Sent to peers in the handshake; also used to detect connections to
	// ourselves.
	peerID [20]byte
	// Listeners accepting incoming peer connections. ListenAddr requires
	// them all to share one address.
	listeners        []net.Listener
	disableTrackers  bool
	downloadStrategy DownloadStrategy
	// nil when DHT is disabled in the config.
	dHT *dht.Server

	// Guards the mutable state below.
	mu levelmu.LevelMutex
	// Condition variable whose locker is mu; Stop broadcasts on it.
	event sync.Cond
	// Closed by Stop to signal shutdown.
	quit chan struct{}

	// Number of connections currently performing the peer handshake.
	handshaking int

	torrents map[InfoHash]*torrent

	// Reads waiting for data, per torrent.
	dataWaits map[*torrent][]dataWait
}
136
137 func (me *Client) PeerID() string {
138         return string(me.peerID[:])
139 }
140
141 func (me *Client) ListenAddr() (addr net.Addr) {
142         for _, l := range me.listeners {
143                 if addr != nil && l.Addr().String() != addr.String() {
144                         panic("listeners exist on different addresses")
145                 }
146                 addr = l.Addr()
147         }
148         return
149 }
150
151 func (cl *Client) WriteStatus(w io.Writer) {
152         cl.mu.LevelLock(1)
153         defer cl.mu.Unlock()
154         fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
155         fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
156         fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
157         if cl.dHT != nil {
158                 fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
159                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
160                 fmt.Fprintf(w, "DHT port: %d\n", addrPort(cl.dHT.LocalAddr()))
161                 fmt.Fprintf(w, "DHT announces: %d\n", cl.dHT.NumConfirmedAnnounces)
162         }
163         cl.downloadStrategy.WriteStatus(w)
164         fmt.Fprintln(w)
165         for _, t := range cl.torrents {
166                 fmt.Fprintf(w, "%s: %f%%\n", t.Name(), func() float32 {
167                         if !t.haveInfo() {
168                                 return 0
169                         } else {
170                                 return 100 * (1 - float32(t.BytesLeft())/float32(t.Length()))
171                         }
172                 }())
173                 fmt.Fprint(w, "Blocked reads:")
174                 for _, dw := range cl.dataWaits[t] {
175                         fmt.Fprintf(w, " %d", dw.offset)
176                 }
177                 fmt.Fprintln(w)
178                 t.WriteStatus(w)
179                 fmt.Fprintln(w)
180         }
181 }
182
183 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
184 // isn't available.
185 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
186         cl.mu.LevelLock(1)
187         defer cl.mu.Unlock()
188         t := cl.torrent(ih)
189         if t == nil {
190                 err = errors.New("unknown torrent")
191                 return
192         }
193         index := pp.Integer(off / int64(t.UsualPieceSize()))
194         // Reading outside the bounds of a file is an error.
195         if index < 0 {
196                 err = os.ErrInvalid
197                 return
198         }
199         if int(index) >= len(t.Pieces) {
200                 err = io.EOF
201                 return
202         }
203         piece := t.Pieces[index]
204         pieceOff := pp.Integer(off % int64(t.UsualPieceSize()))
205         pieceLeft := int(t.PieceLength(index) - pieceOff)
206         if pieceLeft <= 0 {
207                 err = io.EOF
208                 return
209         }
210         if len(p) > pieceLeft {
211                 p = p[:pieceLeft]
212         }
213         for cs, _ := range piece.PendingChunkSpecs {
214                 chunkOff := int64(pieceOff) - int64(cs.Begin)
215                 if chunkOff >= int64(t.PieceLength(index)) {
216                         panic(chunkOff)
217                 }
218                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
219                         // read begins in a pending chunk
220                         err = ErrDataNotReady
221                         return
222                 }
223                 // pending chunk caps available data
224                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
225                         p = p[:-chunkOff]
226                 }
227         }
228         if len(p) == 0 {
229                 panic(len(p))
230         }
231         return t.Data.ReadAt(p, off)
232 }
233
// dhtAddr derives the address for the DHT server from the peer listen
// address: the same host with the port number incremented by one.
//
// An error is returned if listen is nil, if its string form is not a valid
// host:port pair with a decimal port, or if the port is 65535 and so has no
// valid successor.
func dhtAddr(listen net.Addr) (s string, err error) {
	if listen == nil {
		err = errors.New("no listen address to derive DHT address from")
		return
	}
	host, port, err := net.SplitHostPort(listen.String())
	if err != nil {
		return
	}
	// 16 bits rejects anything that is not a valid port number.
	u64, err := strconv.ParseUint(port, 10, 16)
	if err != nil {
		return
	}
	if u64 == 65535 {
		err = fmt.Errorf("listen port %d leaves no valid port for DHT", u64)
		return
	}
	s = net.JoinHostPort(host, strconv.FormatUint(u64+1, 10))
	return
}
246
247 func NewClient(cfg *Config) (cl *Client, err error) {
248         if cfg == nil {
249                 cfg = &Config{}
250         }
251
252         cl = &Client{
253                 noUpload:         cfg.NoUpload,
254                 disableTrackers:  cfg.DisableTrackers,
255                 downloadStrategy: cfg.DownloadStrategy,
256                 halfOpenLimit:    100,
257                 dataDir:          cfg.DataDir,
258
259                 quit:     make(chan struct{}),
260                 torrents: make(map[InfoHash]*torrent),
261
262                 dataWaits: make(map[*torrent][]dataWait),
263         }
264         cl.event.L = &cl.mu
265         cl.mu.Init(2)
266
267         if cfg.PeerID != "" {
268                 CopyExact(&cl.peerID, cfg.PeerID)
269         } else {
270                 o := copy(cl.peerID[:], BEP20)
271                 _, err = rand.Read(cl.peerID[o:])
272                 if err != nil {
273                         panic("error generating peer id")
274                 }
275         }
276
277         if cl.downloadStrategy == nil {
278                 cl.downloadStrategy = &DefaultDownloadStrategy{}
279         }
280
281         // Returns the laddr string to listen on for the next Listen call.
282         listenAddr := func() string {
283                 if addr := cl.ListenAddr(); addr != nil {
284                         return addr.String()
285                 }
286                 return cfg.ListenAddr
287         }
288         var l net.Listener
289         if false {
290                 l, err = net.Listen("tcp", listenAddr())
291                 if err != nil {
292                         return
293                 }
294                 cl.listeners = append(cl.listeners, l)
295                 go cl.acceptConnections(l, false)
296         }
297         if true {
298                 l, err = utp.Listen("utp", listenAddr())
299                 if err != nil {
300                         return
301                 }
302                 cl.listeners = append(cl.listeners, l)
303                 go cl.acceptConnections(l, true)
304         }
305         if !cfg.NoDHT {
306                 var dhtAddr_ string
307                 dhtAddr_, err = dhtAddr(cl.ListenAddr())
308                 if err != nil {
309                         return
310                 }
311                 cl.dHT, err = dht.NewServer(&dht.ServerConfig{
312                         Addr: dhtAddr_,
313                 })
314                 if err != nil {
315                         return
316                 }
317         }
318
319         return
320 }
321
322 func (cl *Client) stopped() bool {
323         select {
324         case <-cl.quit:
325                 return true
326         default:
327                 return false
328         }
329 }
330
331 // Stops the client. All connections to peers are closed and all activity will
332 // come to a halt.
333 func (me *Client) Stop() {
334         me.mu.Lock()
335         close(me.quit)
336         me.event.Broadcast()
337         for _, t := range me.torrents {
338                 t.Close()
339         }
340         me.mu.Unlock()
341 }
342
343 func (cl *Client) acceptConnections(l net.Listener, utp bool) {
344         for {
345                 // We accept all connections immediately, because we don't what
346                 // torrent they're for.
347                 conn, err := l.Accept()
348                 select {
349                 case <-cl.quit:
350                         if conn != nil {
351                                 conn.Close()
352                         }
353                         return
354                 default:
355                 }
356                 if err != nil {
357                         log.Print(err)
358                         return
359                 }
360                 go func() {
361                         if err := cl.runConnection(conn, nil, peerSourceIncoming, utp); err != nil {
362                                 log.Print(err)
363                         }
364                 }()
365         }
366 }
367
368 func (me *Client) torrent(ih InfoHash) *torrent {
369         for _, t := range me.torrents {
370                 if t.InfoHash == ih {
371                         return t
372                 }
373         }
374         return nil
375 }
376
377 // Start the process of connecting to the given peer for the given torrent if
378 // appropriate.
379 func (me *Client) initiateConn(peer Peer, t *torrent) {
380         if peer.Id == me.peerID {
381                 return
382         }
383         addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
384         if t.addrActive(addr) {
385                 duplicateConnsAvoided.Add(1)
386                 return
387         }
388         t.HalfOpen[addr] = struct{}{}
389         go func() {
390                 // Binding to the listener address and dialing via net.Dialer gives
391                 // "address in use" error. It seems it's not possible to dial out from
392                 // this address so that peers associate our local address with our
393                 // listen address.
394                 var (
395                         conn net.Conn
396                         err  error
397                 )
398                 if false {
399                         conn, err = net.DialTimeout("tcp", addr, dialTimeout)
400                 } else {
401                         conn, err = (&utp.Dialer{Timeout: dialTimeout}).Dial("utp", addr)
402                 }
403
404                 // Whether or not the connection attempt succeeds, the half open
405                 // counter should be decremented, and new connection attempts made.
406                 go func() {
407                         me.mu.Lock()
408                         defer me.mu.Unlock()
409                         if _, ok := t.HalfOpen[addr]; !ok {
410                                 panic("invariant broken")
411                         }
412                         delete(t.HalfOpen, addr)
413                         me.openNewConns()
414                 }()
415
416                 if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
417                         return
418                 }
419                 if netOpErr, ok := err.(*net.OpError); ok {
420                         switch netOpErr.Err {
421                         case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
422                                 return
423                         }
424                 }
425                 if err != nil {
426                         log.Printf("error connecting to peer: %s %#v", err, err)
427                         return
428                 }
429                 // log.Printf("connected to %s", conn.RemoteAddr())
430                 err = me.runConnection(conn, t, peer.Source, true)
431                 if err != nil {
432                         log.Print(err)
433                 }
434         }()
435 }
436
437 // The port number for incoming peer connections. 0 if the client isn't
438 // listening.
439 func (cl *Client) incomingPeerPort() int {
440         listenAddr := cl.ListenAddr()
441         if listenAddr == nil {
442                 return 0
443         }
444         return addrPort(listenAddr)
445 }
446
// addrCompactIP converts a net.Addr to its compact IP representation: 4 raw
// bytes for IPv4, otherwise 16, per the "yourip" field of
// http://www.bittorrent.org/beps/bep_0010.html.
//
// An error is returned if the address is not in host:port form or if the
// host is not an IP literal. An IPv6 zone suffix ("%eth0") is ignored.
func addrCompactIP(addr net.Addr) (string, error) {
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "", err
	}
	// net.ParseIP does not accept a zone, and the compact form has no room
	// for one anyway.
	if i := strings.IndexByte(host, '%'); i >= 0 {
		host = host[:i]
	}
	ip := net.ParseIP(host)
	if ip == nil {
		// Without this check a non-IP host would yield "" and a nil error.
		return "", fmt.Errorf("host %q is not an IP address", host)
	}
	if v4 := ip.To4(); v4 != nil {
		return string(v4), nil
	}
	return string(ip.To16()), nil
}
463
// handshakeWriter writes every buffer received on bb to w until bb is closed
// or a write fails. On a failed write w is closed and the remaining buffers
// are left unread. Exactly one value is sent on done: the write error, or nil
// if all writes succeeded.
func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
	done <- func() error {
		for buf := range bb {
			if _, werr := w.Write(buf); werr != nil {
				w.Close()
				return werr
			}
		}
		return nil
	}()
}
475
// peerExtensionBytes holds the eight extension bytes a peer sent in its
// handshake.
type peerExtensionBytes [8]byte

// peerID holds the 20-byte peer ID a peer sent in its handshake.
type peerID [20]byte

// handshakeResult carries what was learned from the remote side's handshake:
// its extension bytes, its peer ID and the info hash it named.
type handshakeResult struct {
	peerExtensionBytes
	peerID
	InfoHash
}
484
485 func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
486         // Bytes to be sent to the peer. Should never block the sender.
487         postCh := make(chan []byte, 4)
488         // A single error value sent when the writer completes.
489         writeDone := make(chan error, 1)
490         // Performs writes to the socket and ensures posts don't block.
491         go handshakeWriter(sock, postCh, writeDone)
492
493         defer func() {
494                 close(postCh) // Done writing.
495                 if !ok {
496                         return
497                 }
498                 if err != nil {
499                         panic(err)
500                 }
501                 // Wait until writes complete before returning from handshake.
502                 err = <-writeDone
503                 if err != nil {
504                         err = fmt.Errorf("error writing during handshake: %s", err)
505                 }
506         }()
507
508         post := func(bb []byte) {
509                 select {
510                 case postCh <- bb:
511                 default:
512                         panic("mustn't block while posting")
513                 }
514         }
515
516         post([]byte(pp.Protocol))
517         post([]byte(extensionBytes))
518         if ih != nil { // We already know what we want.
519                 post(ih[:])
520                 post(peerID[:])
521         }
522         var b [68]byte
523         _, err = io.ReadFull(sock, b[:68])
524         if err != nil {
525                 err = nil
526                 return
527         }
528         if string(b[:20]) != pp.Protocol {
529                 return
530         }
531         CopyExact(&res.peerExtensionBytes, b[20:28])
532         CopyExact(&res.InfoHash, b[28:48])
533         CopyExact(&res.peerID, b[48:68])
534
535         if ih == nil { // We were waiting for the peer to tell us what they wanted.
536                 post(res.InfoHash[:])
537                 post(peerID[:])
538         }
539
540         ok = true
541         return
542 }
543
// peerConn wraps a net.Conn so that every Read gets a fresh deadline and so
// that idle or reset connections look like a clean end of stream.
type peerConn struct {
	net.Conn
}

// Read reads from the wrapped connection with a 150 second deadline. A read
// that times out, or that fails because the peer reset the connection, is
// reported as io.EOF. Any bytes read before a timeout are still returned
// alongside io.EOF. Other errors are passed through unchanged.
func (pc peerConn) Read(b []byte) (n int, err error) {
	err = pc.Conn.SetReadDeadline(time.Now().Add(150 * time.Second))
	if err != nil {
		return
	}
	n, err = pc.Conn.Read(b)
	if err == nil {
		return
	}
	if opError, ok := err.(*net.OpError); ok && opError.Op == "read" {
		cause := opError.Err
		// The errno may be wrapped in an *os.SyscallError.
		if sysErr, ok := cause.(*os.SyscallError); ok {
			cause = sysErr.Err
		}
		if cause == syscall.ECONNRESET {
			err = io.EOF
			return
		}
	}
	if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
		// io.Reader permits n > 0 together with an error, so there is no
		// reason to panic on a partial read.
		err = io.EOF
	}
	return
}
566
567 func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource, uTP bool) (err error) {
568         if tcpConn, ok := sock.(*net.TCPConn); ok {
569                 tcpConn.SetLinger(0)
570         }
571         defer sock.Close()
572         me.mu.Lock()
573         me.handshaking++
574         me.mu.Unlock()
575         // One minute to complete handshake.
576         sock.SetDeadline(time.Now().Add(time.Minute))
577         hsRes, ok, err := handshake(sock, func() *InfoHash {
578                 if torrent == nil {
579                         return nil
580                 } else {
581                         return &torrent.InfoHash
582                 }
583         }(), me.peerID)
584         me.mu.Lock()
585         defer me.mu.Unlock()
586         if me.handshaking == 0 {
587                 panic("handshake count invariant is broken")
588         }
589         me.handshaking--
590         if err != nil {
591                 err = fmt.Errorf("error during handshake: %s", err)
592                 return
593         }
594         if !ok {
595                 return
596         }
597         if hsRes.peerID == me.peerID {
598                 return
599         }
600         torrent = me.torrent(hsRes.InfoHash)
601         if torrent == nil {
602                 return
603         }
604         sock.SetWriteDeadline(time.Time{})
605         sock = peerConn{sock}
606         conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID, uTP)
607         defer conn.Close()
608         conn.Discovery = discovery
609         if !me.addConnection(torrent, conn) {
610                 return
611         }
612         if conn.PeerExtensionBytes[5]&0x10 != 0 {
613                 conn.Post(pp.Message{
614                         Type:       pp.Extended,
615                         ExtendedID: pp.HandshakeExtendedID,
616                         ExtendedPayload: func() []byte {
617                                 d := map[string]interface{}{
618                                         "m": map[string]int{
619                                                 "ut_metadata": 1,
620                                                 "ut_pex":      2,
621                                         },
622                                         "v": "go.torrent dev 20140825", // Just the date
623                                         // No upload queue is implemented yet.
624                                         "reqq": func() int {
625                                                 if me.noUpload {
626                                                         // No need to look strange if it costs us nothing.
627                                                         return 250
628                                                 } else {
629                                                         return 1
630                                                 }
631                                         }(),
632                                 }
633                                 if torrent.metadataSizeKnown() {
634                                         d["metadata_size"] = torrent.metadataSize()
635                                 }
636                                 if p := me.incomingPeerPort(); p != 0 {
637                                         d["p"] = p
638                                 }
639                                 yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
640                                 if err != nil {
641                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
642                                 } else {
643                                         d["yourip"] = yourip
644                                 }
645                                 // log.Printf("sending %v", d)
646                                 b, err := bencode.Marshal(d)
647                                 if err != nil {
648                                         panic(err)
649                                 }
650                                 return b
651                         }(),
652                 })
653         }
654         if torrent.haveAnyPieces() {
655                 conn.Post(pp.Message{
656                         Type:     pp.Bitfield,
657                         Bitfield: torrent.bitfield(),
658                 })
659         }
660         if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil {
661                 addr, _ := me.dHT.LocalAddr().(*net.UDPAddr)
662                 conn.Post(pp.Message{
663                         Type: pp.Port,
664                         Port: uint16(addr.Port),
665                 })
666         }
667         err = me.connectionLoop(torrent, conn)
668         if err != nil {
669                 err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
670         }
671         me.dropConnection(torrent, conn)
672         return
673 }
674
675 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
676         for piece >= len(c.PeerPieces) {
677                 c.PeerPieces = append(c.PeerPieces, false)
678         }
679         c.PeerPieces[piece] = true
680         if !t.havePiece(piece) {
681                 me.replenishConnRequests(t, c)
682         }
683 }
684
// peerUnchoked is called when the peer on conn unchokes us; it replenishes
// the requests on that connection.
func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
	me.replenishConnRequests(torrent, conn)
}
688
689 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
690         ok = cn.Cancel(r)
691         if ok {
692                 postedCancels.Add(1)
693                 cl.downloadStrategy.DeleteRequest(t, r)
694         }
695         return
696 }
697
698 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
699         if !cn.RequestPending(r) {
700                 return
701         }
702         cl.downloadStrategy.DeleteRequest(t, r)
703         delete(cn.Requests, r)
704 }
705
706 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
707         if t.haveInfo() {
708                 return
709         }
710         var pending []int
711         for index := 0; index < t.MetadataPieceCount(); index++ {
712                 if !t.HaveMetadataPiece(index) {
713                         pending = append(pending, index)
714                 }
715         }
716         for _, i := range mathRand.Perm(len(pending)) {
717                 c.Post(pp.Message{
718                         Type:       pp.Extended,
719                         ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
720                         ExtendedPayload: func() []byte {
721                                 b, err := bencode.Marshal(map[string]int{
722                                         "msg_type": 0,
723                                         "piece":    pending[i],
724                                 })
725                                 if err != nil {
726                                         panic(err)
727                                 }
728                                 return b
729                         }(),
730                 })
731         }
732 }
733
734 func (cl *Client) completedMetadata(t *torrent) {
735         h := sha1.New()
736         h.Write(t.MetaData)
737         var ih InfoHash
738         CopyExact(&ih, h.Sum(nil))
739         if ih != t.InfoHash {
740                 log.Print("bad metadata")
741                 t.InvalidateMetadata()
742                 return
743         }
744         var info metainfo.Info
745         err := bencode.Unmarshal(t.MetaData, &info)
746         if err != nil {
747                 log.Printf("error unmarshalling metadata: %s", err)
748                 t.InvalidateMetadata()
749                 return
750         }
751         // TODO(anacrolix): If this fails, I think something harsher should be
752         // done.
753         err = cl.setMetaData(t, info, t.MetaData)
754         if err != nil {
755                 log.Printf("error setting metadata: %s", err)
756                 t.InvalidateMetadata()
757                 return
758         }
759         log.Printf("%s: got metadata from peers", t)
760 }
761
762 // Process incoming ut_metadata message.
763 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
764         var d map[string]int
765         err = bencode.Unmarshal(payload, &d)
766         if err != nil {
767                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
768                 return
769         }
770         msgType, ok := d["msg_type"]
771         if !ok {
772                 err = errors.New("missing msg_type field")
773                 return
774         }
775         piece := d["piece"]
776         switch msgType {
777         case pp.DataMetadataExtensionMsgType:
778                 if t.haveInfo() {
779                         break
780                 }
781                 t.SaveMetadataPiece(piece, payload[len(payload)-metadataPieceSize(d["total_size"], piece):])
782                 if !t.HaveAllMetadataPieces() {
783                         break
784                 }
785                 cl.completedMetadata(t)
786         case pp.RequestMetadataExtensionMsgType:
787                 if !t.HaveMetadataPiece(piece) {
788                         c.Post(t.NewMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
789                         break
790                 }
791                 start := (1 << 14) * piece
792                 c.Post(t.NewMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[start:start+t.metadataPieceSize(piece)]))
793         case pp.RejectMetadataExtensionMsgType:
794         default:
795                 err = errors.New("unknown msg_type value")
796         }
797         return
798 }
799
// peerExchangeMessage is the bencoded form of a peer exchange message, with
// fields mapped to the "added", "added.f" and "dropped" dictionary keys.
type peerExchangeMessage struct {
	Added      CompactPeers   `bencode:"added"`
	AddedFlags []byte         `bencode:"added.f"`
	Dropped    []tracker.Peer `bencode:"dropped"`
}
805
// addrPort returns the port of the given network address as an integer. It
// delegates to AddrPort.
func addrPort(addr net.Addr) int {
	return AddrPort(addr)
}
810
811 // Processes incoming bittorrent messages. The client lock is held upon entry
812 // and exit.
813 func (me *Client) connectionLoop(t *torrent, c *connection) error {
814         decoder := pp.Decoder{
815                 R:         bufio.NewReaderSize(c.Socket, 20*1024),
816                 MaxLength: 256 * 1024,
817         }
818         for {
819                 me.mu.Unlock()
820                 var msg pp.Message
821                 err := decoder.Decode(&msg)
822                 me.mu.Lock()
823                 c.lastMessageReceived = time.Now()
824                 select {
825                 case <-c.closing:
826                         return nil
827                 default:
828                 }
829                 if err != nil {
830                         if me.stopped() || err == io.EOF {
831                                 return nil
832                         }
833                         return err
834                 }
835                 if msg.Keepalive {
836                         continue
837                 }
838                 switch msg.Type {
839                 case pp.Choke:
840                         c.PeerChoked = true
841                         for r := range c.Requests {
842                                 me.connDeleteRequest(t, c, r)
843                         }
844                 case pp.Unchoke:
845                         c.PeerChoked = false
846                         me.peerUnchoked(t, c)
847                 case pp.Interested:
848                         c.PeerInterested = true
849                         // TODO: This should be done from a dedicated unchoking routine.
850                         if me.noUpload {
851                                 break
852                         }
853                         c.Unchoke()
854                 case pp.NotInterested:
855                         c.PeerInterested = false
856                         c.Choke()
857                 case pp.Have:
858                         me.peerGotPiece(t, c, int(msg.Index))
859                 case pp.Request:
860                         if me.noUpload {
861                                 break
862                         }
863                         if c.PeerRequests == nil {
864                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
865                         }
866                         request := newRequest(msg.Index, msg.Begin, msg.Length)
867                         // TODO: Requests should be satisfied from a dedicated upload routine.
868                         // c.PeerRequests[request] = struct{}{}
869                         p := make([]byte, msg.Length)
870                         n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
871                         if err != nil {
872                                 return fmt.Errorf("reading t data to serve request %q: %s", request, err)
873                         }
874                         if n != int(msg.Length) {
875                                 return fmt.Errorf("bad request: %v", msg)
876                         }
877                         c.Post(pp.Message{
878                                 Type:  pp.Piece,
879                                 Index: msg.Index,
880                                 Begin: msg.Begin,
881                                 Piece: p,
882                         })
883                         uploadChunksPosted.Add(1)
884                 case pp.Cancel:
885                         req := newRequest(msg.Index, msg.Begin, msg.Length)
886                         if !c.PeerCancel(req) {
887                                 unexpectedCancels.Add(1)
888                         }
889                 case pp.Bitfield:
890                         if c.PeerPieces != nil {
891                                 err = errors.New("received unexpected bitfield")
892                                 break
893                         }
894                         if t.haveInfo() {
895                                 if len(msg.Bitfield) < t.NumPieces() {
896                                         err = errors.New("received invalid bitfield")
897                                         break
898                                 }
899                                 msg.Bitfield = msg.Bitfield[:t.NumPieces()]
900                         }
901                         c.PeerPieces = msg.Bitfield
902                         for index, has := range c.PeerPieces {
903                                 if has {
904                                         me.peerGotPiece(t, c, index)
905                                 }
906                         }
907                 case pp.Piece:
908                         err = me.downloadedChunk(t, c, &msg)
909                 case pp.Extended:
910                         switch msg.ExtendedID {
911                         case pp.HandshakeExtendedID:
912                                 // TODO: Create a bencode struct for this.
913                                 var d map[string]interface{}
914                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
915                                 if err != nil {
916                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
917                                         break
918                                 }
919                                 // log.Printf("got handshake: %v", d)
920                                 if reqq, ok := d["reqq"]; ok {
921                                         if i, ok := reqq.(int64); ok {
922                                                 c.PeerMaxRequests = int(i)
923                                         }
924                                 }
925                                 if v, ok := d["v"]; ok {
926                                         c.PeerClientName = v.(string)
927                                 }
928                                 m, ok := d["m"]
929                                 if !ok {
930                                         err = errors.New("handshake missing m item")
931                                         break
932                                 }
933                                 mTyped, ok := m.(map[string]interface{})
934                                 if !ok {
935                                         err = errors.New("handshake m value is not dict")
936                                         break
937                                 }
938                                 if c.PeerExtensionIDs == nil {
939                                         c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
940                                 }
941                                 for name, v := range mTyped {
942                                         id, ok := v.(int64)
943                                         if !ok {
944                                                 log.Printf("bad handshake m item extension ID type: %T", v)
945                                                 continue
946                                         }
947                                         if id == 0 {
948                                                 delete(c.PeerExtensionIDs, name)
949                                         } else {
950                                                 c.PeerExtensionIDs[name] = id
951                                         }
952                                 }
953                                 metadata_sizeUntyped, ok := d["metadata_size"]
954                                 if ok {
955                                         metadata_size, ok := metadata_sizeUntyped.(int64)
956                                         if !ok {
957                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
958                                         } else {
959                                                 t.SetMetadataSize(metadata_size)
960                                         }
961                                 }
962                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
963                                         me.requestPendingMetadata(t, c)
964                                 }
965                         case 1:
966                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
967                                 if err != nil {
968                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
969                                 }
970                         case 2:
971                                 var pexMsg peerExchangeMessage
972                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
973                                 if err != nil {
974                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
975                                         break
976                                 }
977                                 go func() {
978                                         err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
979                                                 for _, cp := range pexMsg.Added {
980                                                         p := Peer{
981                                                                 IP:     make([]byte, 4),
982                                                                 Port:   int(cp.Port),
983                                                                 Source: peerSourcePEX,
984                                                         }
985                                                         if n := copy(p.IP, cp.IP[:]); n != 4 {
986                                                                 panic(n)
987                                                         }
988                                                         ret = append(ret, p)
989                                                 }
990                                                 return
991                                         }())
992                                         if err != nil {
993                                                 log.Printf("error adding PEX peers: %s", err)
994                                                 return
995                                         }
996                                         peersFoundByPEX.Add(int64(len(pexMsg.Added)))
997                                 }()
998                         default:
999                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
1000                         }
1001                         if err != nil {
1002                                 // That client uses its own extension IDs for outgoing message
1003                                 // types, which is incorrect.
1004                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
1005                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
1006                                         return nil
1007                                 }
1008                                 // log.Printf("peer extension map: %#v", c.PeerExtensionIDs)
1009                         }
1010                 case pp.Port:
1011                         if me.dHT == nil {
1012                                 break
1013                         }
1014                         pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String())
1015                         if err != nil {
1016                                 panic(err)
1017                         }
1018                         if msg.Port != 0 {
1019                                 pingAddr.Port = int(msg.Port)
1020                         }
1021                         _, err = me.dHT.Ping(pingAddr)
1022                 default:
1023                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1024                 }
1025                 if err != nil {
1026                         return err
1027                 }
1028         }
1029 }
1030
1031 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
1032         for r := range conn.Requests {
1033                 me.connDeleteRequest(torrent, conn, r)
1034         }
1035         conn.Close()
1036         for i0, c := range torrent.Conns {
1037                 if c != conn {
1038                         continue
1039                 }
1040                 i1 := len(torrent.Conns) - 1
1041                 if i0 != i1 {
1042                         torrent.Conns[i0] = torrent.Conns[i1]
1043                 }
1044                 torrent.Conns = torrent.Conns[:i1]
1045                 return
1046         }
1047         panic("connection not found")
1048 }
1049
1050 func (me *Client) addConnection(t *torrent, c *connection) bool {
1051         if me.stopped() {
1052                 return false
1053         }
1054         select {
1055         case <-t.ceasingNetworking:
1056                 return false
1057         default:
1058         }
1059         for _, c0 := range t.Conns {
1060                 if c.PeerID == c0.PeerID {
1061                         // Already connected to a client with that ID.
1062                         return false
1063                 }
1064         }
1065         t.Conns = append(t.Conns, c)
1066         if len(t.Conns) > socketsPerTorrent {
1067                 wcs := t.worstConnsHeap()
1068                 heap.Pop(wcs).(*connection).Close()
1069         }
1070         return true
1071 }
1072
1073 func (me *Client) openNewConns() {
1074         for _, t := range me.torrents {
1075                 select {
1076                 case <-t.ceasingNetworking:
1077                         continue
1078                 default:
1079                 }
1080                 for len(t.Peers) != 0 {
1081                         if len(t.HalfOpen) >= me.halfOpenLimit {
1082                                 return
1083                         }
1084                         if len(t.HalfOpen)+me.handshaking+len(t.Conns) >= socketsPerTorrent {
1085                                 break
1086                         }
1087                         var (
1088                                 k peersKey
1089                                 p Peer
1090                         )
1091                         for k, p = range t.Peers {
1092                                 break
1093                         }
1094                         delete(t.Peers, k)
1095                         me.initiateConn(p, t)
1096                 }
1097         }
1098 }
1099
1100 // Adds peers to the swarm for the torrent corresponding to infoHash.
1101 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
1102         me.mu.Lock()
1103         defer me.mu.Unlock()
1104         t := me.torrent(infoHash)
1105         if t == nil {
1106                 return errors.New("no such torrent")
1107         }
1108         t.AddPeers(peers)
1109         me.openNewConns()
1110         return nil
1111 }
1112
1113 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
1114         err = t.setMetadata(md, cl.dataDir, bytes)
1115         if err != nil {
1116                 return
1117         }
1118         // If the client intends to upload, it needs to know what state pieces are
1119         // in.
1120         if !cl.noUpload {
1121                 // Queue all pieces for hashing. This is done sequentially to avoid
1122                 // spamming goroutines.
1123                 for _, p := range t.Pieces {
1124                         p.QueuedForHash = true
1125                 }
1126                 go func() {
1127                         for i := range t.Pieces {
1128                                 cl.verifyPiece(t, pp.Integer(i))
1129                         }
1130                 }()
1131         }
1132
1133         cl.downloadStrategy.TorrentStarted(t)
1134         select {
1135         case t.gotMetainfo <- &metainfo.MetaInfo{
1136                 Info: metainfo.InfoEx{
1137                         Info: md,
1138                 },
1139                 CreationDate: time.Now().Unix(),
1140                 Comment:      "metadata set in client",
1141                 CreatedBy:    "go.torrent",
1142                 // TODO(anacrolix): Expose trackers given when torrent added.
1143         }:
1144         default:
1145                 panic("shouldn't block")
1146         }
1147         close(t.gotMetainfo)
1148         t.gotMetainfo = nil
1149         return
1150 }
1151
1152 // Prepare a Torrent without any attachment to a Client. That means we can
1153 // initialize fields all fields that don't require the Client without locking
1154 // it.
1155 func newTorrent(ih InfoHash, announceList [][]string, halfOpenLimit int) (t *torrent, err error) {
1156         t = &torrent{
1157                 InfoHash: ih,
1158                 Peers:    make(map[peersKey]Peer, 2000),
1159
1160                 closing:           make(chan struct{}),
1161                 ceasingNetworking: make(chan struct{}),
1162
1163                 gotMetainfo: make(chan *metainfo.MetaInfo, 1),
1164
1165                 HalfOpen: make(map[string]struct{}, halfOpenLimit),
1166         }
1167         t.GotMetainfo = t.gotMetainfo
1168         t.Trackers = make([][]tracker.Client, len(announceList))
1169         for tierIndex := range announceList {
1170                 tier := t.Trackers[tierIndex]
1171                 for _, url := range announceList[tierIndex] {
1172                         tr, err := tracker.New(url)
1173                         if err != nil {
1174                                 log.Print(err)
1175                                 continue
1176                         }
1177                         tier = append(tier, tr)
1178                 }
1179                 // The trackers within each tier must be shuffled before use.
1180                 // http://stackoverflow.com/a/12267471/149482
1181                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1182                 for i := range tier {
1183                         j := mathRand.Intn(i + 1)
1184                         tier[i], tier[j] = tier[j], tier[i]
1185                 }
1186                 t.Trackers[tierIndex] = tier
1187         }
1188         return
1189 }
1190
1191 func (cl *Client) AddMagnet(uri string) (t *torrent, err error) {
1192         m, err := ParseMagnetURI(uri)
1193         if err != nil {
1194                 return
1195         }
1196         t, err = newTorrent(m.InfoHash, [][]string{m.Trackers}, cl.halfOpenLimit)
1197         if err != nil {
1198                 return
1199         }
1200         t.DisplayName = m.DisplayName
1201         cl.mu.Lock()
1202         defer cl.mu.Unlock()
1203         err = cl.addTorrent(t)
1204         if err != nil {
1205                 t.Close()
1206         }
1207         return
1208 }
1209
1210 func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
1211         me.mu.Lock()
1212         defer me.mu.Unlock()
1213         t, ok := me.torrents[infoHash]
1214         if !ok {
1215                 err = fmt.Errorf("no such torrent")
1216                 return
1217         }
1218         err = t.Close()
1219         if err != nil {
1220                 panic(err)
1221         }
1222         delete(me.torrents, infoHash)
1223         me.downloadStrategy.TorrentStopped(t)
1224         for _, dw := range me.dataWaits[t] {
1225                 close(dw.ready)
1226         }
1227         delete(me.dataWaits, t)
1228         return
1229 }
1230
1231 func (me *Client) addTorrent(t *torrent) (err error) {
1232         if _, ok := me.torrents[t.InfoHash]; ok {
1233                 err = fmt.Errorf("torrent infohash collision")
1234                 return
1235         }
1236         me.torrents[t.InfoHash] = t
1237         if !me.disableTrackers {
1238                 go me.announceTorrent(t)
1239         }
1240         if me.dHT != nil {
1241                 go me.announceTorrentDHT(t)
1242         }
1243         return
1244 }
1245
1246 // Adds the torrent to the client.
1247 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
1248         var ih InfoHash
1249         CopyExact(&ih, metaInfo.Info.Hash)
1250         t, err := newTorrent(ih, metaInfo.AnnounceList, me.halfOpenLimit)
1251         if err != nil {
1252                 return
1253         }
1254         me.mu.Lock()
1255         defer me.mu.Unlock()
1256         err = me.addTorrent(t)
1257         if err != nil {
1258                 return
1259         }
1260         err = me.setMetaData(t, metaInfo.Info.Info, metaInfo.Info.Bytes)
1261         if err != nil {
1262                 return
1263         }
1264         return
1265 }
1266
1267 func (me *Client) AddTorrentFromFile(name string) (err error) {
1268         mi, err := metainfo.LoadFromFile(name)
1269         if err != nil {
1270                 err = fmt.Errorf("error loading metainfo from file: %s", err)
1271                 return
1272         }
1273         return me.AddTorrent(mi)
1274 }
1275
1276 func (cl *Client) announceTorrentDHT(t *torrent) {
1277         for {
1278                 ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
1279                 if err != nil {
1280                         log.Printf("error getting peers from dht: %s", err)
1281                         return
1282                 }
1283                 nextScrape := time.After(1 * time.Minute)
1284         getPeers:
1285                 for {
1286                         select {
1287                         case <-nextScrape:
1288                                 break getPeers
1289                         case cps, ok := <-ps.Values:
1290                                 if !ok {
1291                                         break getPeers
1292                                 }
1293                                 peersFoundByDHT.Add(int64(len(cps)))
1294                                 err = cl.AddPeers(t.InfoHash, func() (ret []Peer) {
1295                                         for _, cp := range cps {
1296                                                 ret = append(ret, Peer{
1297                                                         IP:     cp.IP[:],
1298                                                         Port:   int(cp.Port),
1299                                                         Source: peerSourceDHT,
1300                                                 })
1301                                         }
1302                                         return
1303                                 }())
1304                                 if err != nil {
1305                                         log.Printf("error adding peers from dht for torrent %q: %s", t, err)
1306                                         break getPeers
1307                                 }
1308                         case <-t.ceasingNetworking:
1309                                 ps.Close()
1310                                 return
1311                         }
1312                 }
1313                 ps.Close()
1314
1315                 // After a GetPeers, we can announce on the best nodes that gave us an
1316                 // announce token.
1317
1318                 port := cl.incomingPeerPort()
1319                 // If port is zero, then we're not listening, and there's nothing to
1320                 // announce.
1321                 if port != 0 {
1322                         // We can't allow the port to be implied as long as the UTP and
1323                         // DHT ports are different.
1324                         cl.dHT.AnnouncePeer(port, false, t.InfoHash.AsString())
1325                 }
1326         }
1327 }
1328
1329 func (cl *Client) announceTorrent(t *torrent) {
1330         req := tracker.AnnounceRequest{
1331                 Event:    tracker.Started,
1332                 NumWant:  -1,
1333                 Port:     int16(cl.incomingPeerPort()),
1334                 PeerId:   cl.peerID,
1335                 InfoHash: t.InfoHash,
1336         }
1337 newAnnounce:
1338         for {
1339                 select {
1340                 case <-t.ceasingNetworking:
1341                         return
1342                 default:
1343                 }
1344                 cl.mu.Lock()
1345                 req.Left = t.BytesLeft()
1346                 cl.mu.Unlock()
1347                 for _, tier := range t.Trackers {
1348                         for trIndex, tr := range tier {
1349                                 if err := tr.Connect(); err != nil {
1350                                         log.Print(err)
1351                                         continue
1352                                 }
1353                                 resp, err := tr.Announce(&req)
1354                                 if err != nil {
1355                                         log.Print(err)
1356                                         continue
1357                                 }
1358                                 var peers []Peer
1359                                 for _, peer := range resp.Peers {
1360                                         peers = append(peers, Peer{
1361                                                 IP:   peer.IP,
1362                                                 Port: peer.Port,
1363                                         })
1364                                 }
1365                                 err = cl.AddPeers(t.InfoHash, peers)
1366                                 if err != nil {
1367                                         log.Printf("error adding peers to torrent %s: %s", t, err)
1368                                 } else {
1369                                         log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1370                                 }
1371                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1372                                 time.Sleep(time.Second * time.Duration(resp.Interval))
1373                                 req.Event = tracker.None
1374                                 continue newAnnounce
1375                         }
1376                 }
1377                 time.Sleep(5 * time.Second)
1378         }
1379 }
1380
1381 func (cl *Client) allTorrentsCompleted() bool {
1382         for _, t := range cl.torrents {
1383                 if !t.haveInfo() {
1384                         return false
1385                 }
1386                 for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
1387                         i := e.Value.(int)
1388                         if t.Pieces[i].Complete() {
1389                                 continue
1390                         }
1391                         // If the piece isn't complete, make sure it's not because it's
1392                         // never been hashed.
1393                         cl.queueFirstHash(t, i)
1394                         return false
1395                 }
1396         }
1397         return true
1398 }
1399
1400 // Returns true when all torrents are completely downloaded and false if the
1401 // client is stopped before that.
1402 func (me *Client) WaitAll() bool {
1403         me.mu.Lock()
1404         defer me.mu.Unlock()
1405         for !me.allTorrentsCompleted() {
1406                 if me.stopped() {
1407                         return false
1408                 }
1409                 me.event.Wait()
1410         }
1411         return true
1412 }
1413
1414 func (cl *Client) assertRequestHeat() {
1415         dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy)
1416         if !ok {
1417                 return
1418         }
1419         for _, t := range cl.torrents {
1420                 m := make(map[request]int, 3000)
1421                 for _, cn := range t.Conns {
1422                         for r := range cn.Requests {
1423                                 m[r]++
1424                         }
1425                 }
1426                 for r, h := range dds.heat[t] {
1427                         if m[r] != h {
1428                                 panic(fmt.Sprintln(m[r], h))
1429                         }
1430                 }
1431         }
1432 }
1433
1434 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
1435         if !t.haveInfo() {
1436                 return
1437         }
1438         for _, p := range me.downloadStrategy.FillRequests(t, c) {
1439                 // Make sure the state of pieces that would have been requested is
1440                 // known.
1441                 me.queueFirstHash(t, p)
1442         }
1443         //me.assertRequestHeat()
1444         if len(c.Requests) == 0 && !c.PeerChoked {
1445                 c.SetInterested(false)
1446         }
1447 }
1448
1449 // Handle a received chunk from a peer.
1450 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
1451         chunksDownloadedCount.Add(1)
1452
1453         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1454
1455         // Request has been satisfied.
1456         me.connDeleteRequest(t, c, req)
1457
1458         defer me.replenishConnRequests(t, c)
1459
1460         // Do we actually want this chunk?
1461         if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
1462                 unusedDownloadedChunksCount.Add(1)
1463                 c.UnwantedChunksReceived++
1464                 return nil
1465         }
1466
1467         c.UsefulChunksReceived++
1468         c.lastUsefulChunkReceived = time.Now()
1469
1470         // Write the chunk out.
1471         err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1472         if err != nil {
1473                 return fmt.Errorf("error writing chunk: %s", err)
1474         }
1475
1476         // Record that we have the chunk.
1477         delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
1478         me.dataReady(t, req)
1479         if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
1480                 me.queuePieceCheck(t, req.Index)
1481         }
1482         t.PieceBytesLeftChanged(int(req.Index))
1483
1484         // Unprioritize the chunk.
1485         me.downloadStrategy.TorrentGotChunk(t, req)
1486
1487         // Cancel pending requests for this chunk.
1488         for _, c := range t.Conns {
1489                 if me.connCancel(t, c, req) {
1490                         me.replenishConnRequests(t, c)
1491                 }
1492         }
1493
1494         me.downloadStrategy.AssertNotRequested(t, req)
1495
1496         return nil
1497 }
1498
1499 func (cl *Client) dataReady(t *torrent, r request) {
1500         dws := cl.dataWaits[t]
1501         begin := t.requestOffset(r)
1502         end := begin + int64(r.Length)
1503         for i := 0; i < len(dws); {
1504                 dw := dws[i]
1505                 if begin <= dw.offset && dw.offset < end {
1506                         close(dw.ready)
1507                         dws[i] = dws[len(dws)-1]
1508                         dws = dws[:len(dws)-1]
1509                 } else {
1510                         i++
1511                 }
1512         }
1513         cl.dataWaits[t] = dws
1514 }
1515
1516 // Returns a channel that is closed when new data has become available in the
1517 // client.
1518 func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) {
1519         me.mu.Lock()
1520         defer me.mu.Unlock()
1521         ch := make(chan struct{})
1522         ret = ch
1523         t := me.torrents[ih]
1524         if t == nil {
1525                 close(ch)
1526                 return
1527         }
1528         if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) {
1529                 close(ch)
1530                 return
1531         }
1532         me.dataWaits[t] = append(me.dataWaits[t], dataWait{
1533                 offset: off,
1534                 ready:  ch,
1535         })
1536         return
1537 }
1538
1539 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
1540         p := t.Pieces[piece]
1541         if p.EverHashed && !correct {
1542                 log.Printf("%s: piece %d failed hash", t, piece)
1543                 failedPieceHashes.Add(1)
1544         }
1545         p.EverHashed = true
1546         if correct {
1547                 p.PendingChunkSpecs = nil
1548                 me.downloadStrategy.TorrentGotPiece(t, int(piece))
1549                 me.dataReady(t, request{
1550                         pp.Integer(piece),
1551                         chunkSpec{0, pp.Integer(t.PieceLength(piece))},
1552                 })
1553         } else {
1554                 if len(p.PendingChunkSpecs) == 0 {
1555                         t.pendAllChunkSpecs(piece)
1556                 }
1557         }
1558         t.PieceBytesLeftChanged(int(piece))
1559         for _, conn := range t.Conns {
1560                 if correct {
1561                         conn.Post(pp.Message{
1562                                 Type:  pp.Have,
1563                                 Index: pp.Integer(piece),
1564                         })
1565                         // TODO: Cancel requests for this piece.
1566                         for r := range conn.Requests {
1567                                 if r.Index == piece {
1568                                         panic("wat")
1569                                 }
1570                         }
1571                 }
1572                 // Do this even if the piece is correct because new first-hashings may
1573                 // need to be scheduled.
1574                 if conn.PeerHasPiece(piece) {
1575                         me.replenishConnRequests(t, conn)
1576                 }
1577         }
1578         if t.haveAllPieces() && me.noUpload {
1579                 t.CeaseNetworking()
1580         }
1581         me.event.Broadcast()
1582 }
1583
1584 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
1585         cl.mu.Lock()
1586         defer cl.mu.Unlock()
1587         p := t.Pieces[index]
1588         for p.Hashing {
1589                 cl.event.Wait()
1590         }
1591         if t.isClosed() {
1592                 return
1593         }
1594         p.Hashing = true
1595         p.QueuedForHash = false
1596         cl.mu.Unlock()
1597         sum := t.HashPiece(index)
1598         cl.mu.Lock()
1599         select {
1600         case <-t.closing:
1601                 return
1602         default:
1603         }
1604         p.Hashing = false
1605         cl.pieceHashed(t, index, sum == p.Hash)
1606 }
1607
1608 func (me *Client) Torrents() (ret []*torrent) {
1609         me.mu.Lock()
1610         for _, t := range me.torrents {
1611                 ret = append(ret, t)
1612         }
1613         me.mu.Unlock()
1614         return
1615 }