]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Actually announce to DHT, don't just get peers
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errorf("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
import (
	"bufio"
	"bytes"
	"container/heap"
	"crypto/rand"
	"crypto/sha1"
	"errors"
	"expvar"
	"fmt"
	"io"
	"log"
	mathRand "math/rand"
	"net"
	"os"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"bitbucket.org/anacrolix/go.torrent/dht"
	pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
	"bitbucket.org/anacrolix/go.torrent/tracker"
	_ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
	. "bitbucket.org/anacrolix/go.torrent/util"
	"bitbucket.org/anacrolix/go.torrent/util/levelmu"
	"github.com/anacrolix/libtorgo/metainfo"
	"github.com/nsf/libtorgo/bencode"
)
49
// Process-wide counters published through the expvar package. Each one is
// registered under the same name as its Go identifier.

// Chunk-level download accounting.
var unusedDownloadedChunksCount = expvar.NewInt("unusedDownloadedChunksCount")
var chunksDownloadedCount = expvar.NewInt("chunksDownloadedCount")

// Peers learned about, by discovery mechanism.
var peersFoundByDHT = expvar.NewInt("peersFoundByDHT")
var peersFoundByPEX = expvar.NewInt("peersFoundByPEX")

// Upload and cancel message accounting.
var uploadChunksPosted = expvar.NewInt("uploadChunksPosted")
var unexpectedCancels = expvar.NewInt("unexpectedCancels")
var postedCancels = expvar.NewInt("postedCancels")

// Connection and piece verification accounting.
var duplicateConnsAvoided = expvar.NewInt("duplicateConnsAvoided")
var failedPieceHashes = expvar.NewInt("failedPieceHashes")
61
const (
	// Justification for set bits follows.
	//
	// Extension protocol: http://www.bittorrent.org/beps/bep_0010.html
	// DHT: http://www.bittorrent.org/beps/bep_0005.html
	//
	// These are the eight reserved bytes sent in the peer handshake. Bit 0x10
	// of byte 5 advertises the extension protocol and bit 0x01 of byte 7
	// advertises DHT; the same bits are tested on the peer's reserved bytes
	// in runConnection.
	extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x01"

	// NOTE(review): presumably the cap on peer connections per torrent; the
	// use site is not in this part of the file - confirm.
	socketsPerTorrent = 40
)
71
72 // Currently doesn't really queue, but should in the future.
73 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
74         piece := t.Pieces[pieceIndex]
75         if piece.QueuedForHash {
76                 return
77         }
78         piece.QueuedForHash = true
79         go cl.verifyPiece(t, pieceIndex)
80 }
81
82 func (cl *Client) queueFirstHash(t *torrent, piece int) {
83         p := t.Pieces[piece]
84         if p.EverHashed || p.Hashing || p.QueuedForHash {
85                 return
86         }
87         cl.queuePieceCheck(t, pp.Integer(piece))
88 }
89
90 // Queues the torrent data for the given region for download. The beginning of
91 // the region is given highest priority to allow a subsequent read at the same
92 // offset to return data ASAP.
93 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
94         me.mu.Lock()
95         defer me.mu.Unlock()
96         t := me.torrent(ih)
97         if t == nil {
98                 return errors.New("no such active torrent")
99         }
100         if !t.haveInfo() {
101                 return errors.New("missing metadata")
102         }
103         me.downloadStrategy.TorrentPrioritize(t, off, len_)
104         for _, cn := range t.Conns {
105                 me.replenishConnRequests(t, cn)
106         }
107         return nil
108 }
109
// dataWait describes a pending wait on torrent data at a given offset.
// Client.WriteStatus lists the offsets under "Blocked reads".
type dataWait struct {
	// Offset the waiter is interested in.
	offset int64
	// NOTE(review): presumably closed or signalled once data at offset is
	// available; the signalling code is not in this part of the file.
	ready  chan struct{}
}
114
// Client is a torrent client: it owns the listening socket, the optional DHT
// server and the set of torrents, all guarded by mu.
type Client struct {
	// When set, the client neither unchokes peers nor serves their requests.
	noUpload         bool
	dataDir          string
	// Set to 100 by NewClient.
	halfOpenLimit    int
	// Our peer ID, sent in every handshake.
	peerID           [20]byte
	// TCP listener for incoming peer connections.
	listener         net.Listener
	disableTrackers  bool
	downloadStrategy DownloadStrategy
	// DHT server; nil when DHT is disabled in the config.
	dHT              *dht.Server

	mu    levelmu.LevelMutex
	// Condition variable whose locker is mu; broadcast by Stop.
	event sync.Cond
	// Closed by Stop to signal shutdown.
	quit  chan struct{}

	// Number of outgoing dials that have not completed yet.
	halfOpen    int
	// Number of connections currently performing the peer handshake.
	handshaking int
	torrents    map[InfoHash]*torrent

	// Pending data waits per torrent, reported by WriteStatus.
	dataWaits map[*torrent][]dataWait
}
135
136 func (me *Client) ListenAddr() (addr net.Addr) {
137         for _, l := range me.listeners {
138                 if addr != nil && l.Addr().String() != addr.String() {
139                         panic("listeners exist on different addresses")
140                 }
141                 addr = l.Addr()
142         }
143         return
144 }
145
146 func (cl *Client) WriteStatus(w io.Writer) {
147         cl.mu.LevelLock(1)
148         defer cl.mu.Unlock()
149         fmt.Fprintf(w, "Listening on %s\n", cl.ListenAddr())
150         fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
151         fmt.Fprintf(w, "Half open outgoing connections: %d\n", cl.halfOpen)
152         fmt.Fprintf(w, "Handshaking: %d\n", cl.handshaking)
153         if cl.dHT != nil {
154                 fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
155                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
156         }
157         cl.downloadStrategy.WriteStatus(w)
158         fmt.Fprintln(w)
159         for _, t := range cl.torrents {
160                 fmt.Fprintf(w, "%s: %f%%\n", t.Name(), func() float32 {
161                         if !t.haveInfo() {
162                                 return 0
163                         } else {
164                                 return 100 * (1 - float32(t.BytesLeft())/float32(t.Length()))
165                         }
166                 }())
167                 fmt.Fprint(w, "Blocked reads:")
168                 for _, dw := range cl.dataWaits[t] {
169                         fmt.Fprintf(w, " %d", dw.offset)
170                 }
171                 fmt.Fprintln(w)
172                 t.WriteStatus(w)
173                 fmt.Fprintln(w)
174         }
175 }
176
177 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
178 // isn't available.
179 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
180         cl.mu.LevelLock(1)
181         defer cl.mu.Unlock()
182         t := cl.torrent(ih)
183         if t == nil {
184                 err = errors.New("unknown torrent")
185                 return
186         }
187         index := pp.Integer(off / int64(t.UsualPieceSize()))
188         // Reading outside the bounds of a file is an error.
189         if index < 0 {
190                 err = os.ErrInvalid
191                 return
192         }
193         if int(index) >= len(t.Pieces) {
194                 err = io.EOF
195                 return
196         }
197         piece := t.Pieces[index]
198         pieceOff := pp.Integer(off % int64(t.UsualPieceSize()))
199         pieceLeft := int(t.PieceLength(index) - pieceOff)
200         if pieceLeft <= 0 {
201                 err = io.EOF
202                 return
203         }
204         if len(p) > pieceLeft {
205                 p = p[:pieceLeft]
206         }
207         for cs, _ := range piece.PendingChunkSpecs {
208                 chunkOff := int64(pieceOff) - int64(cs.Begin)
209                 if chunkOff >= int64(t.PieceLength(index)) {
210                         panic(chunkOff)
211                 }
212                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
213                         // read begins in a pending chunk
214                         err = ErrDataNotReady
215                         return
216                 }
217                 // pending chunk caps available data
218                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
219                         p = p[:-chunkOff]
220                 }
221         }
222         if len(p) == 0 {
223                 panic(len(p))
224         }
225         return t.Data.ReadAt(p, off)
226 }
227
228 func NewClient(cfg *Config) (cl *Client, err error) {
229         if cfg == nil {
230                 cfg = &Config{}
231         }
232
233         cl = &Client{
234                 noUpload:         cfg.NoUpload,
235                 disableTrackers:  cfg.DisableTrackers,
236                 downloadStrategy: cfg.DownloadStrategy,
237                 halfOpenLimit:    100,
238                 dataDir:          cfg.DataDir,
239
240                 quit:     make(chan struct{}),
241                 torrents: make(map[InfoHash]*torrent),
242
243                 dataWaits: make(map[*torrent][]dataWait),
244         }
245         cl.event.L = &cl.mu
246         cl.mu.Init(2)
247
248         o := copy(cl.peerID[:], BEP20)
249         _, err = rand.Read(cl.peerID[o:])
250         if err != nil {
251                 panic("error generating peer id")
252         }
253
254         if cl.downloadStrategy == nil {
255                 cl.downloadStrategy = &DefaultDownloadStrategy{}
256         }
257
258         cl.listener, err = net.Listen("tcp", cfg.ListenAddr)
259         if err != nil {
260                 return
261         }
262         if cl.listener != nil {
263                 go cl.acceptConnections()
264         }
265
266         if !cfg.NoDHT {
267                 cl.dHT, err = dht.NewServer(&dht.ServerConfig{
268                         Addr: cfg.ListenAddr,
269                 })
270                 if err != nil {
271                         return
272                 }
273         }
274
275         return
276 }
277
278 func (cl *Client) stopped() bool {
279         select {
280         case <-cl.quit:
281                 return true
282         default:
283                 return false
284         }
285 }
286
287 // Stops the client. All connections to peers are closed and all activity will
288 // come to a halt.
289 func (me *Client) Stop() {
290         me.mu.Lock()
291         close(me.quit)
292         me.event.Broadcast()
293         for _, t := range me.torrents {
294                 t.Close()
295         }
296         me.mu.Unlock()
297 }
298
299 func (cl *Client) acceptConnections() {
300         for {
301                 // We accept all connections immediately, because we don't what
302                 // torrent they're for.
303                 conn, err := cl.listener.Accept()
304                 select {
305                 case <-cl.quit:
306                         if conn != nil {
307                                 conn.Close()
308                         }
309                         return
310                 default:
311                 }
312                 if err != nil {
313                         log.Print(err)
314                         return
315                 }
316                 go func() {
317                         if err := cl.runConnection(conn, nil, peerSourceIncoming); err != nil {
318                                 log.Print(err)
319                         }
320                 }()
321         }
322 }
323
324 func (me *Client) torrent(ih InfoHash) *torrent {
325         for _, t := range me.torrents {
326                 if t.InfoHash == ih {
327                         return t
328                 }
329         }
330         return nil
331 }
332
333 // Start the process of connecting to the given peer for the given torrent if
334 // appropriate.
335 func (me *Client) initiateConn(peer Peer, torrent *torrent) {
336         if peer.Id == me.peerID {
337                 return
338         }
339         addr := &net.TCPAddr{
340                 IP:   peer.IP,
341                 Port: peer.Port,
342         }
343         // Don't connect to the same address twice for the same torrent.
344         for _, c := range torrent.Conns {
345                 if c.Socket.RemoteAddr().String() == addr.String() {
346                         duplicateConnsAvoided.Add(1)
347                         return
348                 }
349         }
350         me.halfOpen++
351         go func() {
352                 // Binding to the listener address and dialing via net.Dialer gives
353                 // "address in use" error. It seems it's not possible to dial out from
354                 // this address so that peers associate our local address with our
355                 // listen address.
356                 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
357
358                 // Whether or not the connection attempt succeeds, the half open
359                 // counter should be decremented, and new connection attempts made.
360                 go func() {
361                         me.mu.Lock()
362                         defer me.mu.Unlock()
363                         if me.halfOpen == 0 {
364                                 panic("assert")
365                         }
366                         me.halfOpen--
367                         me.openNewConns()
368                 }()
369
370                 if netOpErr, ok := err.(*net.OpError); ok {
371                         if netOpErr.Timeout() {
372                                 return
373                         }
374                         switch netOpErr.Err {
375                         case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
376                                 return
377                         }
378                 }
379                 if err != nil {
380                         log.Printf("error connecting to peer: %s %#v", err, err)
381                         return
382                 }
383                 // log.Printf("connected to %s", conn.RemoteAddr())
384                 err = me.runConnection(conn, torrent, peer.Source)
385                 if err != nil {
386                         log.Print(err)
387                 }
388         }()
389 }
390
391 // The port number for incoming peer connections. 0 if the client isn't
392 // listening.
393 func (cl *Client) incomingPeerPort() int {
394         listenAddr := cl.ListenAddr()
395         if listenAddr == nil {
396                 return 0
397         }
398         return addrPort(listenAddr)
399 }
400
// addrCompactIP converts the host part of addr to its compact IP
// representation: 4 raw bytes for IPv4 and 16 for IPv6, as used by the
// "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
//
// An error is returned if addr's string form is not "host:port" or if the
// host is not a literal IP address (for example a hostname or a zoned IPv6
// address). Without that check such hosts would silently produce an empty
// string.
func addrCompactIP(addr net.Addr) (string, error) {
	host, _, err := net.SplitHostPort(addr.String())
	if err != nil {
		return "", err
	}
	ip := net.ParseIP(host)
	if ip == nil {
		return "", fmt.Errorf("host %q is not an IP address", host)
	}
	// To4 returns a 4-byte slice for IPv4 (including IPv4-mapped IPv6
	// addresses) and nil otherwise.
	if v4 := ip.To4(); v4 != nil {
		return string(v4), nil
	}
	return string(ip.To16()), nil
}
417
// handshakeWriter writes every slice received from bb to w, in order, until
// bb is closed or a write fails. On a write failure w is closed and no
// further slices are consumed. Exactly one value is sent on done: the write
// error, or nil if all writes succeeded.
func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
	done <- func() error {
		for chunk := range bb {
			if _, werr := w.Write(chunk); werr != nil {
				w.Close()
				return werr
			}
		}
		return nil
	}()
}
429
430 type peerExtensionBytes [8]byte
431 type peerID [20]byte
432
433 type handshakeResult struct {
434         peerExtensionBytes
435         peerID
436         InfoHash
437 }
438
439 func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
440         // Bytes to be sent to the peer. Should never block the sender.
441         postCh := make(chan []byte, 4)
442         // A single error value sent when the writer completes.
443         writeDone := make(chan error, 1)
444         // Performs writes to the socket and ensures posts don't block.
445         go handshakeWriter(sock, postCh, writeDone)
446
447         defer func() {
448                 close(postCh) // Done writing.
449                 if !ok {
450                         return
451                 }
452                 if err != nil {
453                         panic(err)
454                 }
455                 // Wait until writes complete before returning from handshake.
456                 err = <-writeDone
457                 if err != nil {
458                         err = fmt.Errorf("error writing during handshake: %s", err)
459                 }
460         }()
461
462         post := func(bb []byte) {
463                 select {
464                 case postCh <- bb:
465                 default:
466                         panic("mustn't block while posting")
467                 }
468         }
469
470         post([]byte(pp.Protocol))
471         post([]byte(extensionBytes))
472         if ih != nil { // We already know what we want.
473                 post(ih[:])
474                 post(peerID[:])
475         }
476         var b [68]byte
477         _, err = io.ReadFull(sock, b[:68])
478         if err != nil {
479                 err = nil
480                 return
481         }
482         if string(b[:20]) != pp.Protocol {
483                 return
484         }
485         CopyExact(&res.peerExtensionBytes, b[20:28])
486         CopyExact(&res.InfoHash, b[28:48])
487         CopyExact(&res.peerID, b[48:68])
488
489         if ih == nil { // We were waiting for the peer to tell us what they wanted.
490                 post(res.InfoHash[:])
491                 post(peerID[:])
492         }
493
494         ok = true
495         return
496 }
497
// peerConn wraps a net.Conn, overriding Read to apply a fresh read deadline
// on every call and to report connection resets and timeouts as io.EOF.
type peerConn struct {
	net.Conn
}

// Read sets a 150 second read deadline and then reads from the underlying
// connection. A "read" *net.OpError carrying ECONNRESET, and any timeout,
// are returned as io.EOF; other errors are returned unchanged.
func (pc peerConn) Read(b []byte) (n int, err error) {
	if err = pc.Conn.SetReadDeadline(time.Now().Add(150 * time.Second)); err != nil {
		return
	}
	n, err = pc.Conn.Read(b)
	if err == nil {
		return
	}
	opErr, isOpErr := err.(*net.OpError)
	if isOpErr && opErr.Op == "read" && opErr.Err == syscall.ECONNRESET {
		return n, io.EOF
	}
	if netErr, isNetErr := err.(net.Error); isNetErr && netErr.Timeout() {
		// A timed-out read is expected to have delivered nothing.
		if n != 0 {
			panic(n)
		}
		return n, io.EOF
	}
	return
}
520
521 func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource) (err error) {
522         if tcpConn, ok := sock.(*net.TCPConn); ok {
523                 tcpConn.SetLinger(0)
524         }
525         defer sock.Close()
526         me.mu.Lock()
527         me.handshaking++
528         me.mu.Unlock()
529         // One minute to complete handshake.
530         sock.SetDeadline(time.Now().Add(time.Minute))
531         hsRes, ok, err := handshake(sock, func() *InfoHash {
532                 if torrent == nil {
533                         return nil
534                 } else {
535                         return &torrent.InfoHash
536                 }
537         }(), me.peerID)
538         me.mu.Lock()
539         defer me.mu.Unlock()
540         if me.handshaking == 0 {
541                 panic("handshake count invariant is broken")
542         }
543         me.handshaking--
544         if err != nil {
545                 err = fmt.Errorf("error during handshake: %s", err)
546                 return
547         }
548         if !ok {
549                 return
550         }
551         torrent = me.torrent(hsRes.InfoHash)
552         if torrent == nil {
553                 return
554         }
555         sock.SetWriteDeadline(time.Time{})
556         sock = peerConn{sock}
557         conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID)
558         defer conn.Close()
559         conn.Discovery = discovery
560         if !me.addConnection(torrent, conn) {
561                 return
562         }
563         if conn.PeerExtensionBytes[5]&0x10 != 0 {
564                 conn.Post(pp.Message{
565                         Type:       pp.Extended,
566                         ExtendedID: pp.HandshakeExtendedID,
567                         ExtendedPayload: func() []byte {
568                                 d := map[string]interface{}{
569                                         "m": map[string]int{
570                                                 "ut_metadata": 1,
571                                                 "ut_pex":      2,
572                                         },
573                                         "v": "go.torrent dev 20140825", // Just the date
574                                         // No upload queue is implemented yet.
575                                         "reqq": func() int {
576                                                 if me.noUpload {
577                                                         // No need to look strange if it costs us nothing.
578                                                         return 250
579                                                 } else {
580                                                         return 1
581                                                 }
582                                         }(),
583                                 }
584                                 if torrent.metadataSizeKnown() {
585                                         d["metadata_size"] = torrent.metadataSize()
586                                 }
587                                 if p := me.incomingPeerPort(); p != 0 {
588                                         d["p"] = p
589                                 }
590                                 yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
591                                 if err != nil {
592                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
593                                 } else {
594                                         d["yourip"] = yourip
595                                 }
596                                 // log.Printf("sending %v", d)
597                                 b, err := bencode.Marshal(d)
598                                 if err != nil {
599                                         panic(err)
600                                 }
601                                 return b
602                         }(),
603                 })
604         }
605         if torrent.haveAnyPieces() {
606                 conn.Post(pp.Message{
607                         Type:     pp.Bitfield,
608                         Bitfield: torrent.bitfield(),
609                 })
610         }
611         if conn.PeerExtensionBytes[7]&0x01 != 0 && me.dHT != nil {
612                 addr, _ := me.dHT.LocalAddr().(*net.UDPAddr)
613                 conn.Post(pp.Message{
614                         Type: pp.Port,
615                         Port: uint16(addr.Port),
616                 })
617         }
618         err = me.connectionLoop(torrent, conn)
619         if err != nil {
620                 err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
621         }
622         me.dropConnection(torrent, conn)
623         return
624 }
625
626 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
627         for piece >= len(c.PeerPieces) {
628                 c.PeerPieces = append(c.PeerPieces, false)
629         }
630         c.PeerPieces[piece] = true
631         if !t.havePiece(piece) {
632                 me.replenishConnRequests(t, c)
633         }
634 }
635
// peerUnchoked is called when the peer on conn has unchoked us; it
// replenishes the requests on that connection.
func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
	me.replenishConnRequests(torrent, conn)
}
639
640 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
641         ok = cn.Cancel(r)
642         if ok {
643                 postedCancels.Add(1)
644                 cl.downloadStrategy.DeleteRequest(t, r)
645         }
646         return
647 }
648
649 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
650         if !cn.RequestPending(r) {
651                 return
652         }
653         cl.downloadStrategy.DeleteRequest(t, r)
654         delete(cn.Requests, r)
655 }
656
657 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
658         if t.haveInfo() {
659                 return
660         }
661         var pending []int
662         for index := 0; index < t.MetadataPieceCount(); index++ {
663                 if !t.HaveMetadataPiece(index) {
664                         pending = append(pending, index)
665                 }
666         }
667         for _, i := range mathRand.Perm(len(pending)) {
668                 c.Post(pp.Message{
669                         Type:       pp.Extended,
670                         ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
671                         ExtendedPayload: func() []byte {
672                                 b, err := bencode.Marshal(map[string]int{
673                                         "msg_type": 0,
674                                         "piece":    pending[i],
675                                 })
676                                 if err != nil {
677                                         panic(err)
678                                 }
679                                 return b
680                         }(),
681                 })
682         }
683 }
684
685 func (cl *Client) completedMetadata(t *torrent) {
686         h := sha1.New()
687         h.Write(t.MetaData)
688         var ih InfoHash
689         CopyExact(&ih, h.Sum(nil))
690         if ih != t.InfoHash {
691                 log.Print("bad metadata")
692                 t.InvalidateMetadata()
693                 return
694         }
695         var info metainfo.Info
696         err := bencode.Unmarshal(t.MetaData, &info)
697         if err != nil {
698                 log.Printf("error unmarshalling metadata: %s", err)
699                 t.InvalidateMetadata()
700                 return
701         }
702         // TODO(anacrolix): If this fails, I think something harsher should be
703         // done.
704         err = cl.setMetaData(t, info, t.MetaData)
705         if err != nil {
706                 log.Printf("error setting metadata: %s", err)
707                 t.InvalidateMetadata()
708                 return
709         }
710         log.Printf("%s: got metadata from peers", t)
711 }
712
713 // Process incoming ut_metadata message.
714 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
715         var d map[string]int
716         err = bencode.Unmarshal(payload, &d)
717         if err != nil {
718                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
719                 return
720         }
721         msgType, ok := d["msg_type"]
722         if !ok {
723                 err = errors.New("missing msg_type field")
724                 return
725         }
726         piece := d["piece"]
727         switch msgType {
728         case pp.DataMetadataExtensionMsgType:
729                 if t.haveInfo() {
730                         break
731                 }
732                 t.SaveMetadataPiece(piece, payload[len(payload)-metadataPieceSize(d["total_size"], piece):])
733                 if !t.HaveAllMetadataPieces() {
734                         break
735                 }
736                 cl.completedMetadata(t)
737         case pp.RequestMetadataExtensionMsgType:
738                 if !t.HaveMetadataPiece(piece) {
739                         c.Post(t.NewMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
740                         break
741                 }
742                 start := (1 << 14) * piece
743                 c.Post(t.NewMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[start:start+t.metadataPieceSize(piece)]))
744         case pp.RejectMetadataExtensionMsgType:
745         default:
746                 err = errors.New("unknown msg_type value")
747         }
748         return
749 }
750
// peerExchangeMessage is the bencoded form of a peer exchange message, with
// fields mapped to the "added", "added.f" and "dropped" keys.
// NOTE(review): presumably the ut_pex payload advertised in the extension
// handshake; the decoding site is not in this part of the file - confirm.
type peerExchangeMessage struct {
	Added      CompactPeers   `bencode:"added"`
	AddedFlags []byte         `bencode:"added.f"`
	Dropped    []tracker.Peer `bencode:"dropped"`
}
756
// addrPort extracts the port number from addr's "host:port" string form.
//
// The port is parsed as a decimal number. It panics if the string form has
// no port part or the port is not a decimal integer.
func addrPort(addr net.Addr) int {
	_, port, err := net.SplitHostPort(addr.String())
	if err != nil {
		panic(err)
	}
	// Atoi is strictly base 10; a base-inferring parse would read a leading
	// "0" or "0x" as octal or hexadecimal.
	n, err := strconv.Atoi(port)
	if err != nil {
		panic(err)
	}
	return n
}
769
770 // Processes incoming bittorrent messages. The client lock is held upon entry
771 // and exit.
772 func (me *Client) connectionLoop(t *torrent, c *connection) error {
773         decoder := pp.Decoder{
774                 R:         bufio.NewReaderSize(c.Socket, 20*1024),
775                 MaxLength: 256 * 1024,
776         }
777         for {
778                 me.mu.Unlock()
779                 var msg pp.Message
780                 err := decoder.Decode(&msg)
781                 me.mu.Lock()
782                 c.lastMessageReceived = time.Now()
783                 select {
784                 case <-c.closing:
785                         return nil
786                 default:
787                 }
788                 if err != nil {
789                         if me.stopped() || err == io.EOF {
790                                 return nil
791                         }
792                         return err
793                 }
794                 if msg.Keepalive {
795                         continue
796                 }
797                 switch msg.Type {
798                 case pp.Choke:
799                         c.PeerChoked = true
800                         for r := range c.Requests {
801                                 me.connDeleteRequest(t, c, r)
802                         }
803                 case pp.Unchoke:
804                         c.PeerChoked = false
805                         me.peerUnchoked(t, c)
806                 case pp.Interested:
807                         c.PeerInterested = true
808                         // TODO: This should be done from a dedicated unchoking routine.
809                         if me.noUpload {
810                                 break
811                         }
812                         c.Unchoke()
813                 case pp.NotInterested:
814                         c.PeerInterested = false
815                         c.Choke()
816                 case pp.Have:
817                         me.peerGotPiece(t, c, int(msg.Index))
818                 case pp.Request:
819                         if me.noUpload {
820                                 break
821                         }
822                         if c.PeerRequests == nil {
823                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
824                         }
825                         request := newRequest(msg.Index, msg.Begin, msg.Length)
826                         // TODO: Requests should be satisfied from a dedicated upload routine.
827                         // c.PeerRequests[request] = struct{}{}
828                         p := make([]byte, msg.Length)
829                         n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
830                         if err != nil {
831                                 return fmt.Errorf("reading t data to serve request %q: %s", request, err)
832                         }
833                         if n != int(msg.Length) {
834                                 return fmt.Errorf("bad request: %v", msg)
835                         }
836                         c.Post(pp.Message{
837                                 Type:  pp.Piece,
838                                 Index: msg.Index,
839                                 Begin: msg.Begin,
840                                 Piece: p,
841                         })
842                         uploadChunksPosted.Add(1)
843                 case pp.Cancel:
844                         req := newRequest(msg.Index, msg.Begin, msg.Length)
845                         if !c.PeerCancel(req) {
846                                 unexpectedCancels.Add(1)
847                         }
848                 case pp.Bitfield:
849                         if c.PeerPieces != nil {
850                                 err = errors.New("received unexpected bitfield")
851                                 break
852                         }
853                         if t.haveInfo() {
854                                 if len(msg.Bitfield) < t.NumPieces() {
855                                         err = errors.New("received invalid bitfield")
856                                         break
857                                 }
858                                 msg.Bitfield = msg.Bitfield[:t.NumPieces()]
859                         }
860                         c.PeerPieces = msg.Bitfield
861                         for index, has := range c.PeerPieces {
862                                 if has {
863                                         me.peerGotPiece(t, c, index)
864                                 }
865                         }
866                 case pp.Piece:
867                         err = me.downloadedChunk(t, c, &msg)
868                 case pp.Extended:
869                         switch msg.ExtendedID {
870                         case pp.HandshakeExtendedID:
871                                 // TODO: Create a bencode struct for this.
872                                 var d map[string]interface{}
873                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
874                                 if err != nil {
875                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
876                                         break
877                                 }
878                                 // log.Printf("got handshake: %v", d)
879                                 if reqq, ok := d["reqq"]; ok {
880                                         if i, ok := reqq.(int64); ok {
881                                                 c.PeerMaxRequests = int(i)
882                                         }
883                                 }
884                                 if v, ok := d["v"]; ok {
885                                         c.PeerClientName = v.(string)
886                                 }
887                                 m, ok := d["m"]
888                                 if !ok {
889                                         err = errors.New("handshake missing m item")
890                                         break
891                                 }
892                                 mTyped, ok := m.(map[string]interface{})
893                                 if !ok {
894                                         err = errors.New("handshake m value is not dict")
895                                         break
896                                 }
897                                 if c.PeerExtensionIDs == nil {
898                                         c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
899                                 }
900                                 for name, v := range mTyped {
901                                         id, ok := v.(int64)
902                                         if !ok {
903                                                 log.Printf("bad handshake m item extension ID type: %T", v)
904                                                 continue
905                                         }
906                                         if id == 0 {
907                                                 delete(c.PeerExtensionIDs, name)
908                                         } else {
909                                                 c.PeerExtensionIDs[name] = id
910                                         }
911                                 }
912                                 metadata_sizeUntyped, ok := d["metadata_size"]
913                                 if ok {
914                                         metadata_size, ok := metadata_sizeUntyped.(int64)
915                                         if !ok {
916                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
917                                         } else {
918                                                 t.SetMetadataSize(metadata_size)
919                                         }
920                                 }
921                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
922                                         me.requestPendingMetadata(t, c)
923                                 }
924                         case 1:
925                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
926                                 if err != nil {
927                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
928                                 }
929                         case 2:
930                                 var pexMsg peerExchangeMessage
931                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
932                                 if err != nil {
933                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
934                                         break
935                                 }
936                                 go func() {
937                                         err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
938                                                 for _, cp := range pexMsg.Added {
939                                                         p := Peer{
940                                                                 IP:     make([]byte, 4),
941                                                                 Port:   int(cp.Port),
942                                                                 Source: peerSourcePEX,
943                                                         }
944                                                         if n := copy(p.IP, cp.IP[:]); n != 4 {
945                                                                 panic(n)
946                                                         }
947                                                         ret = append(ret, p)
948                                                 }
949                                                 return
950                                         }())
951                                         if err != nil {
952                                                 log.Printf("error adding PEX peers: %s", err)
953                                                 return
954                                         }
955                                         peersFoundByPEX.Add(int64(len(pexMsg.Added)))
956                                 }()
957                         default:
958                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
959                         }
960                         if err != nil {
961                                 // That client uses its own extension IDs for outgoing message
962                                 // types, which is incorrect.
963                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
964                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
965                                         return nil
966                                 }
967                                 // log.Printf("peer extension map: %#v", c.PeerExtensionIDs)
968                         }
969                 case pp.Port:
970                         if me.dHT == nil {
971                                 break
972                         }
973                         pingAddr, err := net.ResolveUDPAddr("", c.Socket.RemoteAddr().String())
974                         if err != nil {
975                                 panic(err)
976                         }
977                         if msg.Port != 0 {
978                                 pingAddr.Port = int(msg.Port)
979                         }
980                         _, err = me.dHT.Ping(pingAddr)
981                 default:
982                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
983                 }
984                 if err != nil {
985                         return err
986                 }
987         }
988 }
989
990 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
991         for r := range conn.Requests {
992                 me.connDeleteRequest(torrent, conn, r)
993         }
994         conn.Close()
995         for i0, c := range torrent.Conns {
996                 if c != conn {
997                         continue
998                 }
999                 i1 := len(torrent.Conns) - 1
1000                 if i0 != i1 {
1001                         torrent.Conns[i0] = torrent.Conns[i1]
1002                 }
1003                 torrent.Conns = torrent.Conns[:i1]
1004                 return
1005         }
1006         panic("connection not found")
1007 }
1008
1009 func (me *Client) addConnection(t *torrent, c *connection) bool {
1010         if me.stopped() {
1011                 return false
1012         }
1013         select {
1014         case <-t.ceasingNetworking:
1015                 return false
1016         default:
1017         }
1018         for _, c0 := range t.Conns {
1019                 if c.PeerID == c0.PeerID {
1020                         // Already connected to a client with that ID.
1021                         return false
1022                 }
1023         }
1024         t.Conns = append(t.Conns, c)
1025         if len(t.Conns) > socketsPerTorrent {
1026                 wcs := t.worstConnsHeap()
1027                 heap.Pop(wcs).(*connection).Close()
1028         }
1029         return true
1030 }
1031
1032 func (me *Client) openNewConns() {
1033         for _, t := range me.torrents {
1034                 select {
1035                 case <-t.ceasingNetworking:
1036                         continue
1037                 default:
1038                 }
1039                 for len(t.Peers) != 0 {
1040                         if me.halfOpen >= me.halfOpenLimit {
1041                                 return
1042                         }
1043                         if me.halfOpen+me.handshaking+len(t.Conns) >= socketsPerTorrent {
1044                                 break
1045                         }
1046                         var (
1047                                 k peersKey
1048                                 p Peer
1049                         )
1050                         for k, p = range t.Peers {
1051                                 break
1052                         }
1053                         delete(t.Peers, k)
1054                         me.initiateConn(p, t)
1055                 }
1056         }
1057 }
1058
1059 // Adds peers to the swarm for the torrent corresponding to infoHash.
1060 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
1061         me.mu.Lock()
1062         defer me.mu.Unlock()
1063         t := me.torrent(infoHash)
1064         if t == nil {
1065                 return errors.New("no such torrent")
1066         }
1067         t.AddPeers(peers)
1068         me.openNewConns()
1069         return nil
1070 }
1071
1072 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
1073         err = t.setMetadata(md, cl.dataDir, bytes)
1074         if err != nil {
1075                 return
1076         }
1077         // If the client intends to upload, it needs to know what state pieces are
1078         // in.
1079         if !cl.noUpload {
1080                 // Queue all pieces for hashing. This is done sequentially to avoid
1081                 // spamming goroutines.
1082                 for _, p := range t.Pieces {
1083                         p.QueuedForHash = true
1084                 }
1085                 go func() {
1086                         for i := range t.Pieces {
1087                                 cl.verifyPiece(t, pp.Integer(i))
1088                         }
1089                 }()
1090         }
1091
1092         cl.downloadStrategy.TorrentStarted(t)
1093         select {
1094         case t.gotMetainfo <- &metainfo.MetaInfo{
1095                 Info: metainfo.InfoEx{
1096                         Info: md,
1097                 },
1098                 CreationDate: time.Now().Unix(),
1099                 Comment:      "metadata set in client",
1100                 CreatedBy:    "go.torrent",
1101                 // TODO(anacrolix): Expose trackers given when torrent added.
1102         }:
1103         default:
1104                 panic("shouldn't block")
1105         }
1106         close(t.gotMetainfo)
1107         t.gotMetainfo = nil
1108         return
1109 }
1110
1111 // Prepare a Torrent without any attachment to a Client. That means we can
1112 // initialize fields all fields that don't require the Client without locking
1113 // it.
1114 func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
1115         t = &torrent{
1116                 InfoHash: ih,
1117                 Peers:    make(map[peersKey]Peer, 2000),
1118
1119                 closing:           make(chan struct{}),
1120                 ceasingNetworking: make(chan struct{}),
1121
1122                 gotMetainfo: make(chan *metainfo.MetaInfo, 1),
1123         }
1124         t.GotMetainfo = t.gotMetainfo
1125         t.Trackers = make([][]tracker.Client, len(announceList))
1126         for tierIndex := range announceList {
1127                 tier := t.Trackers[tierIndex]
1128                 for _, url := range announceList[tierIndex] {
1129                         tr, err := tracker.New(url)
1130                         if err != nil {
1131                                 log.Print(err)
1132                                 continue
1133                         }
1134                         tier = append(tier, tr)
1135                 }
1136                 // The trackers within each tier must be shuffled before use.
1137                 // http://stackoverflow.com/a/12267471/149482
1138                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
1139                 for i := range tier {
1140                         j := mathRand.Intn(i + 1)
1141                         tier[i], tier[j] = tier[j], tier[i]
1142                 }
1143                 t.Trackers[tierIndex] = tier
1144         }
1145         return
1146 }
1147
1148 func (cl *Client) AddMagnet(uri string) (t *torrent, err error) {
1149         m, err := ParseMagnetURI(uri)
1150         if err != nil {
1151                 return
1152         }
1153         t, err = newTorrent(m.InfoHash, [][]string{m.Trackers})
1154         if err != nil {
1155                 return
1156         }
1157         t.DisplayName = m.DisplayName
1158         cl.mu.Lock()
1159         defer cl.mu.Unlock()
1160         err = cl.addTorrent(t)
1161         if err != nil {
1162                 t.Close()
1163         }
1164         return
1165 }
1166
1167 func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
1168         me.mu.Lock()
1169         defer me.mu.Unlock()
1170         t, ok := me.torrents[infoHash]
1171         if !ok {
1172                 err = fmt.Errorf("no such torrent")
1173                 return
1174         }
1175         err = t.Close()
1176         if err != nil {
1177                 panic(err)
1178         }
1179         delete(me.torrents, infoHash)
1180         me.downloadStrategy.TorrentStopped(t)
1181         for _, dw := range me.dataWaits[t] {
1182                 close(dw.ready)
1183         }
1184         delete(me.dataWaits, t)
1185         return
1186 }
1187
1188 func (me *Client) addTorrent(t *torrent) (err error) {
1189         if _, ok := me.torrents[t.InfoHash]; ok {
1190                 err = fmt.Errorf("torrent infohash collision")
1191                 return
1192         }
1193         me.torrents[t.InfoHash] = t
1194         if !me.disableTrackers {
1195                 go me.announceTorrent(t)
1196         }
1197         if me.dHT != nil {
1198                 go me.announceTorrentDHT(t)
1199         }
1200         return
1201 }
1202
1203 // Adds the torrent to the client.
1204 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
1205         var ih InfoHash
1206         CopyExact(&ih, metaInfo.Info.Hash)
1207         t, err := newTorrent(ih, metaInfo.AnnounceList)
1208         if err != nil {
1209                 return
1210         }
1211         me.mu.Lock()
1212         defer me.mu.Unlock()
1213         err = me.addTorrent(t)
1214         if err != nil {
1215                 return
1216         }
1217         err = me.setMetaData(t, metaInfo.Info.Info, metaInfo.Info.Bytes)
1218         if err != nil {
1219                 return
1220         }
1221         return
1222 }
1223
1224 func (me *Client) AddTorrentFromFile(name string) (err error) {
1225         mi, err := metainfo.LoadFromFile(name)
1226         if err != nil {
1227                 err = fmt.Errorf("error loading metainfo from file: %s", err)
1228                 return
1229         }
1230         return me.AddTorrent(mi)
1231 }
1232
1233 func (cl *Client) announceTorrentDHT(t *torrent) {
1234         for {
1235                 ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
1236                 if err != nil {
1237                         log.Printf("error getting peers from dht: %s", err)
1238                         return
1239                 }
1240                 nextScrape := time.After(1 * time.Minute)
1241         getPeers:
1242                 for {
1243                         select {
1244                         case <-nextScrape:
1245                                 break getPeers
1246                         case cps, ok := <-ps.Values:
1247                                 if !ok {
1248                                         break getPeers
1249                                 }
1250                                 peersFoundByDHT.Add(int64(len(cps)))
1251                                 err = cl.AddPeers(t.InfoHash, func() (ret []Peer) {
1252                                         for _, cp := range cps {
1253                                                 ret = append(ret, Peer{
1254                                                         IP:     cp.IP[:],
1255                                                         Port:   int(cp.Port),
1256                                                         Source: peerSourceDHT,
1257                                                 })
1258                                         }
1259                                         return
1260                                 }())
1261                                 if err != nil {
1262                                         log.Printf("error adding peers from dht for torrent %q: %s", t, err)
1263                                         break getPeers
1264                                 }
1265                         case <-t.ceasingNetworking:
1266                                 ps.Close()
1267                                 return
1268                         }
1269                 }
1270                 ps.Close()
1271
1272                 // After a GetPeers, we can announce on the best nodes that gave us an
1273                 // announce token.
1274
1275                 port := cl.incomingPeerPort()
1276                 // If port is zero, then we're not listening, and there's nothing to
1277                 // announce.
1278                 if port != 0 {
1279                         // We can't allow the port to be implied as long as the UTP and
1280                         // DHT ports are different.
1281                         cl.dHT.AnnouncePeer(port, false, t.InfoHash.AsString())
1282                 }
1283         }
1284 }
1285
1286 func (cl *Client) announceTorrent(t *torrent) {
1287         req := tracker.AnnounceRequest{
1288                 Event:    tracker.Started,
1289                 NumWant:  -1,
1290                 Port:     int16(cl.incomingPeerPort()),
1291                 PeerId:   cl.peerID,
1292                 InfoHash: t.InfoHash,
1293         }
1294 newAnnounce:
1295         for {
1296                 select {
1297                 case <-t.ceasingNetworking:
1298                         return
1299                 default:
1300                 }
1301                 cl.mu.Lock()
1302                 req.Left = t.BytesLeft()
1303                 cl.mu.Unlock()
1304                 for _, tier := range t.Trackers {
1305                         for trIndex, tr := range tier {
1306                                 if err := tr.Connect(); err != nil {
1307                                         log.Print(err)
1308                                         continue
1309                                 }
1310                                 resp, err := tr.Announce(&req)
1311                                 if err != nil {
1312                                         log.Print(err)
1313                                         continue
1314                                 }
1315                                 var peers []Peer
1316                                 for _, peer := range resp.Peers {
1317                                         peers = append(peers, Peer{
1318                                                 IP:   peer.IP,
1319                                                 Port: peer.Port,
1320                                         })
1321                                 }
1322                                 err = cl.AddPeers(t.InfoHash, peers)
1323                                 if err != nil {
1324                                         log.Printf("error adding peers to torrent %s: %s", t, err)
1325                                 } else {
1326                                         log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1327                                 }
1328                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1329                                 time.Sleep(time.Second * time.Duration(resp.Interval))
1330                                 req.Event = tracker.None
1331                                 continue newAnnounce
1332                         }
1333                 }
1334                 time.Sleep(5 * time.Second)
1335         }
1336 }
1337
1338 func (cl *Client) allTorrentsCompleted() bool {
1339         for _, t := range cl.torrents {
1340                 if !t.haveInfo() {
1341                         return false
1342                 }
1343                 for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
1344                         i := e.Value.(int)
1345                         if t.Pieces[i].Complete() {
1346                                 continue
1347                         }
1348                         // If the piece isn't complete, make sure it's not because it's
1349                         // never been hashed.
1350                         cl.queueFirstHash(t, i)
1351                         return false
1352                 }
1353         }
1354         return true
1355 }
1356
1357 // Returns true when all torrents are completely downloaded and false if the
1358 // client is stopped before that.
1359 func (me *Client) WaitAll() bool {
1360         me.mu.Lock()
1361         defer me.mu.Unlock()
1362         for !me.allTorrentsCompleted() {
1363                 if me.stopped() {
1364                         return false
1365                 }
1366                 me.event.Wait()
1367         }
1368         return true
1369 }
1370
1371 func (cl *Client) assertRequestHeat() {
1372         dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy)
1373         if !ok {
1374                 return
1375         }
1376         for _, t := range cl.torrents {
1377                 m := make(map[request]int, 3000)
1378                 for _, cn := range t.Conns {
1379                         for r := range cn.Requests {
1380                                 m[r]++
1381                         }
1382                 }
1383                 for r, h := range dds.heat[t] {
1384                         if m[r] != h {
1385                                 panic(fmt.Sprintln(m[r], h))
1386                         }
1387                 }
1388         }
1389 }
1390
1391 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
1392         if !t.haveInfo() {
1393                 return
1394         }
1395         for _, p := range me.downloadStrategy.FillRequests(t, c) {
1396                 // Make sure the state of pieces that would have been requested is
1397                 // known.
1398                 me.queueFirstHash(t, p)
1399         }
1400         //me.assertRequestHeat()
1401         if len(c.Requests) == 0 && !c.PeerChoked {
1402                 c.SetInterested(false)
1403         }
1404 }
1405
1406 // Handle a received chunk from a peer.
1407 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
1408         chunksDownloadedCount.Add(1)
1409
1410         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1411
1412         // Request has been satisfied.
1413         me.connDeleteRequest(t, c, req)
1414
1415         defer me.replenishConnRequests(t, c)
1416
1417         // Do we actually want this chunk?
1418         if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
1419                 unusedDownloadedChunksCount.Add(1)
1420                 c.UnwantedChunksReceived++
1421                 return nil
1422         }
1423
1424         c.UsefulChunksReceived++
1425         c.lastUsefulChunkReceived = time.Now()
1426
1427         // Write the chunk out.
1428         err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1429         if err != nil {
1430                 return fmt.Errorf("error writing chunk: %s", err)
1431         }
1432
1433         // Record that we have the chunk.
1434         delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
1435         me.dataReady(t, req)
1436         if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
1437                 me.queuePieceCheck(t, req.Index)
1438         }
1439         t.PieceBytesLeftChanged(int(req.Index))
1440
1441         // Unprioritize the chunk.
1442         me.downloadStrategy.TorrentGotChunk(t, req)
1443
1444         // Cancel pending requests for this chunk.
1445         for _, c := range t.Conns {
1446                 if me.connCancel(t, c, req) {
1447                         me.replenishConnRequests(t, c)
1448                 }
1449         }
1450
1451         me.downloadStrategy.AssertNotRequested(t, req)
1452
1453         return nil
1454 }
1455
1456 func (cl *Client) dataReady(t *torrent, r request) {
1457         dws := cl.dataWaits[t]
1458         begin := t.requestOffset(r)
1459         end := begin + int64(r.Length)
1460         for i := 0; i < len(dws); {
1461                 dw := dws[i]
1462                 if begin <= dw.offset && dw.offset < end {
1463                         close(dw.ready)
1464                         dws[i] = dws[len(dws)-1]
1465                         dws = dws[:len(dws)-1]
1466                 } else {
1467                         i++
1468                 }
1469         }
1470         cl.dataWaits[t] = dws
1471 }
1472
1473 // Returns a channel that is closed when new data has become available in the
1474 // client.
1475 func (me *Client) DataWaiter(ih InfoHash, off int64) (ret <-chan struct{}) {
1476         me.mu.Lock()
1477         defer me.mu.Unlock()
1478         ch := make(chan struct{})
1479         ret = ch
1480         t := me.torrents[ih]
1481         if t == nil {
1482                 close(ch)
1483                 return
1484         }
1485         if r, ok := t.offsetRequest(off); !ok || t.haveChunk(r) {
1486                 close(ch)
1487                 return
1488         }
1489         me.dataWaits[t] = append(me.dataWaits[t], dataWait{
1490                 offset: off,
1491                 ready:  ch,
1492         })
1493         return
1494 }
1495
1496 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
1497         p := t.Pieces[piece]
1498         if p.EverHashed && !correct {
1499                 log.Printf("%s: piece %d failed hash", t, piece)
1500                 failedPieceHashes.Add(1)
1501         }
1502         p.EverHashed = true
1503         if correct {
1504                 p.PendingChunkSpecs = nil
1505                 me.downloadStrategy.TorrentGotPiece(t, int(piece))
1506                 me.dataReady(t, request{
1507                         pp.Integer(piece),
1508                         chunkSpec{0, pp.Integer(t.PieceLength(piece))},
1509                 })
1510         } else {
1511                 if len(p.PendingChunkSpecs) == 0 {
1512                         t.pendAllChunkSpecs(piece)
1513                 }
1514         }
1515         t.PieceBytesLeftChanged(int(piece))
1516         for _, conn := range t.Conns {
1517                 if correct {
1518                         conn.Post(pp.Message{
1519                                 Type:  pp.Have,
1520                                 Index: pp.Integer(piece),
1521                         })
1522                         // TODO: Cancel requests for this piece.
1523                         for r := range conn.Requests {
1524                                 if r.Index == piece {
1525                                         panic("wat")
1526                                 }
1527                         }
1528                 }
1529                 // Do this even if the piece is correct because new first-hashings may
1530                 // need to be scheduled.
1531                 if conn.PeerHasPiece(piece) {
1532                         me.replenishConnRequests(t, conn)
1533                 }
1534         }
1535         if t.haveAllPieces() && me.noUpload {
1536                 t.CeaseNetworking()
1537         }
1538         me.event.Broadcast()
1539 }
1540
1541 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
1542         cl.mu.Lock()
1543         defer cl.mu.Unlock()
1544         p := t.Pieces[index]
1545         for p.Hashing {
1546                 cl.event.Wait()
1547         }
1548         if t.isClosed() {
1549                 return
1550         }
1551         p.Hashing = true
1552         p.QueuedForHash = false
1553         cl.mu.Unlock()
1554         sum := t.HashPiece(index)
1555         cl.mu.Lock()
1556         select {
1557         case <-t.closing:
1558                 return
1559         default:
1560         }
1561         p.Hashing = false
1562         cl.pieceHashed(t, index, sum == p.Hash)
1563 }
1564
1565 func (me *Client) Torrents() (ret []*torrent) {
1566         me.mu.Lock()
1567         for _, t := range me.torrents {
1568                 ret = append(ret, t)
1569         }
1570         me.mu.Unlock()
1571         return
1572 }