]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
ecbbcc4ce55ceb5d0b8415a68e4cf9b87ca2fb6a
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errors("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "crypto/rand"
21         "crypto/sha1"
22         "errors"
23         "expvar"
24         "fmt"
25         "io"
26         "log"
27         mathRand "math/rand"
28         "net"
29         "os"
30         "sync"
31         "syscall"
32         "time"
33
34         "bitbucket.org/anacrolix/go.torrent/util/levelmu"
35
36         "bitbucket.org/anacrolix/go.torrent/dht"
37         . "bitbucket.org/anacrolix/go.torrent/util"
38
39         "github.com/anacrolix/libtorgo/metainfo"
40         "github.com/nsf/libtorgo/bencode"
41
42         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
43         "bitbucket.org/anacrolix/go.torrent/tracker"
44         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
45 )
46
47 var (
48         unusedDownloadedChunksCount = expvar.NewInt("unusedDownloadedChunksCount")
49         chunksDownloadedCount       = expvar.NewInt("chunksDownloadedCount")
50         peersFoundByDHT             = expvar.NewInt("peersFoundByDHT")
51         peersFoundByPEX             = expvar.NewInt("peersFoundByPEX")
52         uploadChunksPosted          = expvar.NewInt("uploadChunksPosted")
53         unexpectedCancels           = expvar.NewInt("unexpectedCancels")
54         postedCancels               = expvar.NewInt("postedCancels")
55 )
56
57 const extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x00"
58
59 // Currently doesn't really queue, but should in the future.
60 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
61         piece := t.Pieces[pieceIndex]
62         if piece.QueuedForHash {
63                 return
64         }
65         piece.QueuedForHash = true
66         go cl.verifyPiece(t, pieceIndex)
67 }
68
69 // Queues the torrent data for the given region for download. The beginning of
70 // the region is given highest priority to allow a subsequent read at the same
71 // offset to return data ASAP.
72 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
73         me.mu.Lock()
74         defer me.mu.Unlock()
75         t := me.torrent(ih)
76         if t == nil {
77                 return errors.New("no such active torrent")
78         }
79         if !t.haveInfo() {
80                 return errors.New("missing metadata")
81         }
82         me.downloadStrategy.TorrentPrioritize(t, off, len_)
83         for _, cn := range t.Conns {
84                 me.replenishConnRequests(t, cn)
85         }
86         return nil
87 }
88
89 type dataSpec struct {
90         InfoHash
91         request
92 }
93
94 type Client struct {
95         noUpload         bool
96         dataDir          string
97         halfOpenLimit    int
98         peerID           [20]byte
99         listener         net.Listener
100         disableTrackers  bool
101         downloadStrategy DownloadStrategy
102         dHT              *dht.Server
103
104         mu    levelmu.LevelMutex
105         event sync.Cond
106         quit  chan struct{}
107
108         halfOpen int
109         torrents map[InfoHash]*torrent
110
111         dataWaiterMutex sync.Mutex
112         dataWaiter      chan struct{}
113 }
114
115 func (me *Client) ListenAddr() net.Addr {
116         return me.listener.Addr()
117 }
118
119 func (cl *Client) WriteStatus(w io.Writer) {
120         cl.mu.Lock()
121         defer cl.mu.Unlock()
122         if cl.listener != nil {
123                 fmt.Fprintf(w, "Listening on %s\n", cl.listener.Addr())
124         } else {
125                 fmt.Fprintf(w, "No listening torrent port!\n")
126         }
127         fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
128         fmt.Fprintf(w, "Half open outgoing connections: %d\n", cl.halfOpen)
129         if cl.dHT != nil {
130                 fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
131                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
132         }
133         cl.downloadStrategy.WriteStatus(w)
134         fmt.Fprintln(w)
135         for _, t := range cl.torrents {
136                 fmt.Fprintf(w, "%s: %f%%\n", t.Name(), func() float32 {
137                         if !t.haveInfo() {
138                                 return 0
139                         } else {
140                                 return 100 * (1 - float32(t.BytesLeft())/float32(t.Length()))
141                         }
142                 }())
143                 t.WriteStatus(w)
144                 fmt.Fprintln(w)
145         }
146 }
147
148 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
149 // isn't available.
150 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
151         cl.mu.LevelLock(1)
152         defer cl.mu.Unlock()
153         t := cl.torrent(ih)
154         if t == nil {
155                 err = errors.New("unknown torrent")
156                 return
157         }
158         index := pp.Integer(off / int64(t.UsualPieceSize()))
159         // Reading outside the bounds of a file is an error.
160         if index < 0 {
161                 err = os.ErrInvalid
162                 return
163         }
164         if int(index) >= len(t.Pieces) {
165                 err = io.EOF
166                 return
167         }
168         piece := t.Pieces[index]
169         pieceOff := pp.Integer(off % int64(t.UsualPieceSize()))
170         pieceLeft := int(t.PieceLength(index) - pieceOff)
171         if pieceLeft <= 0 {
172                 err = io.EOF
173                 return
174         }
175         if len(p) > pieceLeft {
176                 p = p[:pieceLeft]
177         }
178         for cs, _ := range piece.PendingChunkSpecs {
179                 chunkOff := int64(pieceOff) - int64(cs.Begin)
180                 if chunkOff >= int64(t.PieceLength(index)) {
181                         panic(chunkOff)
182                 }
183                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
184                         // read begins in a pending chunk
185                         err = ErrDataNotReady
186                         return
187                 }
188                 // pending chunk caps available data
189                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
190                         p = p[:-chunkOff]
191                 }
192         }
193         if len(p) == 0 {
194                 panic(len(p))
195         }
196         return t.Data.ReadAt(p, off)
197 }
198
199 func NewClient(cfg *Config) (cl *Client, err error) {
200         if cfg == nil {
201                 cfg = &Config{}
202         }
203
204         cl = &Client{
205                 noUpload:         cfg.NoUpload,
206                 disableTrackers:  cfg.DisableTrackers,
207                 downloadStrategy: cfg.DownloadStrategy,
208                 halfOpenLimit:    100,
209                 dataDir:          cfg.DataDir,
210
211                 quit:     make(chan struct{}),
212                 torrents: make(map[InfoHash]*torrent),
213         }
214         cl.event.L = &cl.mu
215         cl.mu.Init(2)
216
217         o := copy(cl.peerID[:], BEP20)
218         _, err = rand.Read(cl.peerID[o:])
219         if err != nil {
220                 panic("error generating peer id")
221         }
222
223         if cl.downloadStrategy == nil {
224                 cl.downloadStrategy = &DefaultDownloadStrategy{}
225         }
226
227         cl.listener, err = net.Listen("tcp", cfg.ListenAddr)
228         if err != nil {
229                 return
230         }
231         if cl.listener != nil {
232                 go cl.acceptConnections()
233         }
234
235         if !cfg.NoDHT {
236                 cl.dHT, err = dht.NewServer(&dht.ServerConfig{
237                         Addr: cfg.ListenAddr,
238                 })
239                 if err != nil {
240                         return
241                 }
242         }
243
244         return
245 }
246
247 func (cl *Client) stopped() bool {
248         select {
249         case <-cl.quit:
250                 return true
251         default:
252                 return false
253         }
254 }
255
256 // Stops the client. All connections to peers are closed and all activity will
257 // come to a halt.
258 func (me *Client) Stop() {
259         me.mu.Lock()
260         close(me.quit)
261         me.event.Broadcast()
262         for _, t := range me.torrents {
263                 for _, c := range t.Conns {
264                         c.Close()
265                 }
266         }
267         me.mu.Unlock()
268 }
269
270 func (cl *Client) acceptConnections() {
271         for {
272                 conn, err := cl.listener.Accept()
273                 select {
274                 case <-cl.quit:
275                         if conn != nil {
276                                 conn.Close()
277                         }
278                         return
279                 default:
280                 }
281                 if err != nil {
282                         log.Print(err)
283                         return
284                 }
285                 // log.Printf("accepted connection from %s", conn.RemoteAddr())
286                 go func() {
287                         if err := cl.runConnection(conn, nil, peerSourceIncoming); err != nil {
288                                 log.Print(err)
289                         }
290                 }()
291         }
292 }
293
294 func (me *Client) torrent(ih InfoHash) *torrent {
295         for _, t := range me.torrents {
296                 if t.InfoHash == ih {
297                         return t
298                 }
299         }
300         return nil
301 }
302
303 // Start the process of connecting to the given peer for the given torrent if
304 // appropriate.
305 func (me *Client) initiateConn(peer Peer, torrent *torrent) {
306         if peer.Id == me.peerID {
307                 return
308         }
309         me.halfOpen++
310         go func() {
311                 addr := &net.TCPAddr{
312                         IP:   peer.IP,
313                         Port: peer.Port,
314                 }
315                 // Binding to the listener address and dialing via net.Dialer gives
316                 // "address in use" error. It seems it's not possible to dial out from
317                 // this address so that peers associate our local address with our
318                 // listen address.
319                 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
320
321                 // Whether or not the connection attempt succeeds, the half open
322                 // counter should be decremented, and new connection attempts made.
323                 go func() {
324                         me.mu.Lock()
325                         defer me.mu.Unlock()
326                         if me.halfOpen == 0 {
327                                 panic("assert")
328                         }
329                         me.halfOpen--
330                         me.openNewConns()
331                 }()
332
333                 if netOpErr, ok := err.(*net.OpError); ok {
334                         if netOpErr.Timeout() {
335                                 return
336                         }
337                         switch netOpErr.Err {
338                         case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
339                                 return
340                         }
341                 }
342                 if err != nil {
343                         log.Printf("error connecting to peer: %s %#v", err, err)
344                         return
345                 }
346                 // log.Printf("connected to %s", conn.RemoteAddr())
347                 err = me.runConnection(conn, torrent, peer.Source)
348                 if err != nil {
349                         log.Print(err)
350                 }
351         }()
352 }
353
354 func (cl *Client) incomingPeerPort() int {
355         if cl.listener == nil {
356                 return 0
357         }
358         _, p, err := net.SplitHostPort(cl.listener.Addr().String())
359         if err != nil {
360                 panic(err)
361         }
362         var i int
363         _, err = fmt.Sscanf(p, "%d", &i)
364         if err != nil {
365                 panic(err)
366         }
367         return i
368 }
369
370 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
371 func addrCompactIP(addr net.Addr) (string, error) {
372         switch typed := addr.(type) {
373         case *net.TCPAddr:
374                 if v4 := typed.IP.To4(); v4 != nil {
375                         if len(v4) != 4 {
376                                 panic(v4)
377                         }
378                         return string(v4), nil
379                 }
380                 return string(typed.IP.To16()), nil
381         default:
382                 return "", fmt.Errorf("unhandled type: %T", addr)
383         }
384 }
385
386 func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
387         var err error
388         for b := range bb {
389                 _, err = w.Write(b)
390                 if err != nil {
391                         w.Close()
392                         break
393                 }
394         }
395         done <- err
396 }
397
398 type peerExtensionBytes [8]byte
399 type peerID [20]byte
400
401 type handshakeResult struct {
402         peerExtensionBytes
403         peerID
404         InfoHash
405 }
406
407 func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
408         // Bytes to be sent to the peer. Should never block the sender.
409         postCh := make(chan []byte, 4)
410         // A single error value sent when the writer completes.
411         writeDone := make(chan error, 1)
412         // Performs writes to the socket and ensures posts don't block.
413         go handshakeWriter(sock, postCh, writeDone)
414
415         defer func() {
416                 close(postCh) // Done writing.
417                 if !ok {
418                         return
419                 }
420                 if err != nil {
421                         panic(err)
422                 }
423                 // Wait until writes complete before returning from handshake.
424                 err = <-writeDone
425                 if err != nil {
426                         err = fmt.Errorf("error writing during handshake: %s", err)
427                 }
428         }()
429
430         post := func(bb []byte) {
431                 select {
432                 case postCh <- bb:
433                 default:
434                         panic("mustn't block while posting")
435                 }
436         }
437
438         post([]byte(pp.Protocol))
439         post([]byte(extensionBytes))
440         if ih != nil { // We already know what we want.
441                 post(ih[:])
442                 post(peerID[:])
443         }
444         var b [68]byte
445         _, err = io.ReadFull(sock, b[:68])
446         if err != nil {
447                 err = nil
448                 return
449         }
450         if string(b[:20]) != pp.Protocol {
451                 return
452         }
453         CopyExact(&res.peerExtensionBytes, b[20:28])
454         CopyExact(&res.InfoHash, b[28:48])
455         CopyExact(&res.peerID, b[48:68])
456
457         if ih == nil { // We were waiting for the peer to tell us what they wanted.
458                 post(res.InfoHash[:])
459                 post(peerID[:])
460         }
461
462         ok = true
463         return
464 }
465
466 func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource) (err error) {
467         defer sock.Close()
468         hsRes, ok, err := handshake(sock, func() *InfoHash {
469                 if torrent == nil {
470                         return nil
471                 } else {
472                         return &torrent.InfoHash
473                 }
474         }(), me.peerID)
475         if err != nil {
476                 err = fmt.Errorf("error during handshake: %s", err)
477                 return
478         }
479         if !ok {
480                 return
481         }
482         me.mu.Lock()
483         defer me.mu.Unlock()
484         torrent = me.torrent(hsRes.InfoHash)
485         if torrent == nil {
486                 return
487         }
488         conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID)
489         defer conn.Close()
490         conn.Discovery = discovery
491         if !me.addConnection(torrent, conn) {
492                 return
493         }
494         if conn.PeerExtensionBytes[5]&0x10 != 0 {
495                 conn.Post(pp.Message{
496                         Type:       pp.Extended,
497                         ExtendedID: pp.HandshakeExtendedID,
498                         ExtendedPayload: func() []byte {
499                                 d := map[string]interface{}{
500                                         "m": map[string]int{
501                                                 "ut_metadata": 1,
502                                                 "ut_pex":      2,
503                                         },
504                                         "v":    "go.torrent dev",
505                                         "reqq": 1,
506                                 }
507                                 if torrent.metadataSizeKnown() {
508                                         d["metadata_size"] = torrent.metadataSize()
509                                 }
510                                 if p := me.incomingPeerPort(); p != 0 {
511                                         d["p"] = p
512                                 }
513                                 yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
514                                 if err != nil {
515                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
516                                 } else {
517                                         d["yourip"] = yourip
518                                 }
519                                 // log.Printf("sending %v", d)
520                                 b, err := bencode.Marshal(d)
521                                 if err != nil {
522                                         panic(err)
523                                 }
524                                 return b
525                         }(),
526                 })
527         }
528         if torrent.haveAnyPieces() {
529                 conn.Post(pp.Message{
530                         Type:     pp.Bitfield,
531                         Bitfield: torrent.bitfield(),
532                 })
533         }
534         err = me.connectionLoop(torrent, conn)
535         if err != nil {
536                 err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
537         }
538         me.dropConnection(torrent, conn)
539         return
540 }
541
542 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
543         for piece >= len(c.PeerPieces) {
544                 c.PeerPieces = append(c.PeerPieces, false)
545         }
546         c.PeerPieces[piece] = true
547         if t.wantPiece(piece) {
548                 me.replenishConnRequests(t, c)
549         }
550 }
551
552 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
553         me.replenishConnRequests(torrent, conn)
554 }
555
556 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
557         ok = cn.Cancel(r)
558         if ok {
559                 postedCancels.Add(1)
560                 cl.downloadStrategy.DeleteRequest(t, r)
561         }
562         return
563 }
564
565 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
566         if !cn.RequestPending(r) {
567                 return
568         }
569         cl.downloadStrategy.DeleteRequest(t, r)
570         delete(cn.Requests, r)
571 }
572
573 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
574         if t.haveInfo() {
575                 return
576         }
577         var pending []int
578         for index := 0; index < t.MetadataPieceCount(); index++ {
579                 if !t.HaveMetadataPiece(index) {
580                         pending = append(pending, index)
581                 }
582         }
583         for _, i := range mathRand.Perm(len(pending)) {
584                 c.Post(pp.Message{
585                         Type:       pp.Extended,
586                         ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
587                         ExtendedPayload: func() []byte {
588                                 b, err := bencode.Marshal(map[string]int{
589                                         "msg_type": 0,
590                                         "piece":    pending[i],
591                                 })
592                                 if err != nil {
593                                         panic(err)
594                                 }
595                                 return b
596                         }(),
597                 })
598         }
599 }
600
601 func (cl *Client) completedMetadata(t *torrent) {
602         h := sha1.New()
603         h.Write(t.MetaData)
604         var ih InfoHash
605         CopyExact(&ih, h.Sum(nil))
606         if ih != t.InfoHash {
607                 log.Print("bad metadata")
608                 t.InvalidateMetadata()
609                 return
610         }
611         var info metainfo.Info
612         err := bencode.Unmarshal(t.MetaData, &info)
613         if err != nil {
614                 log.Printf("error unmarshalling metadata: %s", err)
615                 t.InvalidateMetadata()
616                 return
617         }
618         // TODO(anacrolix): If this fails, I think something harsher should be
619         // done.
620         err = cl.setMetaData(t, info, t.MetaData)
621         if err != nil {
622                 log.Printf("error setting metadata: %s", err)
623                 t.InvalidateMetadata()
624                 return
625         }
626         log.Printf("%s: got metadata from peers", t)
627 }
628
629 // Process incoming ut_metadata message.
630 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
631         var d map[string]int
632         err = bencode.Unmarshal(payload, &d)
633         if err != nil {
634                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
635                 return
636         }
637         msgType, ok := d["msg_type"]
638         if !ok {
639                 err = errors.New("missing msg_type field")
640                 return
641         }
642         piece := d["piece"]
643         switch msgType {
644         case pp.DataMetadataExtensionMsgType:
645                 if t.haveInfo() {
646                         break
647                 }
648                 t.SaveMetadataPiece(piece, payload[len(payload)-metadataPieceSize(d["total_size"], piece):])
649                 if !t.HaveAllMetadataPieces() {
650                         break
651                 }
652                 cl.completedMetadata(t)
653         case pp.RequestMetadataExtensionMsgType:
654                 if !t.HaveMetadataPiece(piece) {
655                         c.Post(t.NewMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
656                         break
657                 }
658                 start := (1 << 14) * piece
659                 c.Post(t.NewMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[start:start+t.metadataPieceSize(piece)]))
660         case pp.RejectMetadataExtensionMsgType:
661         default:
662                 err = errors.New("unknown msg_type value")
663         }
664         return
665 }
666
667 type peerExchangeMessage struct {
668         Added      CompactPeers   `bencode:"added"`
669         AddedFlags []byte         `bencode:"added.f"`
670         Dropped    []tracker.Peer `bencode:"dropped"`
671 }
672
673 // Processes incoming bittorrent messages. The client lock is held upon entry
674 // and exit.
675 func (me *Client) connectionLoop(t *torrent, c *connection) error {
676         decoder := pp.Decoder{
677                 R:         bufio.NewReader(c.Socket),
678                 MaxLength: 256 * 1024,
679         }
680         for {
681                 me.mu.Unlock()
682                 var msg pp.Message
683                 err := decoder.Decode(&msg)
684                 me.mu.Lock()
685                 if c.getClosed() {
686                         return nil
687                 }
688                 if err != nil {
689                         if me.stopped() || err == io.EOF {
690                                 return nil
691                         }
692                         return err
693                 }
694                 if msg.Keepalive {
695                         continue
696                 }
697                 switch msg.Type {
698                 case pp.Choke:
699                         c.PeerChoked = true
700                         for r := range c.Requests {
701                                 me.connDeleteRequest(t, c, r)
702                         }
703                 case pp.Unchoke:
704                         c.PeerChoked = false
705                         me.peerUnchoked(t, c)
706                 case pp.Interested:
707                         c.PeerInterested = true
708                         // TODO: This should be done from a dedicated unchoking routine.
709                         if me.noUpload {
710                                 break
711                         }
712                         c.Unchoke()
713                 case pp.NotInterested:
714                         c.PeerInterested = false
715                         c.Choke()
716                 case pp.Have:
717                         me.peerGotPiece(t, c, int(msg.Index))
718                 case pp.Request:
719                         if me.noUpload {
720                                 break
721                         }
722                         if c.PeerRequests == nil {
723                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
724                         }
725                         request := newRequest(msg.Index, msg.Begin, msg.Length)
726                         // TODO: Requests should be satisfied from a dedicated upload routine.
727                         // c.PeerRequests[request] = struct{}{}
728                         p := make([]byte, msg.Length)
729                         n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
730                         if err != nil {
731                                 return fmt.Errorf("reading t data to serve request %q: %s", request, err)
732                         }
733                         if n != int(msg.Length) {
734                                 return fmt.Errorf("bad request: %v", msg)
735                         }
736                         c.Post(pp.Message{
737                                 Type:  pp.Piece,
738                                 Index: msg.Index,
739                                 Begin: msg.Begin,
740                                 Piece: p,
741                         })
742                         uploadChunksPosted.Add(1)
743                 case pp.Cancel:
744                         req := newRequest(msg.Index, msg.Begin, msg.Length)
745                         if !c.PeerCancel(req) {
746                                 unexpectedCancels.Add(1)
747                         }
748                 case pp.Bitfield:
749                         if c.PeerPieces != nil {
750                                 err = errors.New("received unexpected bitfield")
751                                 break
752                         }
753                         if t.haveInfo() {
754                                 if len(msg.Bitfield) < t.NumPieces() {
755                                         err = errors.New("received invalid bitfield")
756                                         break
757                                 }
758                                 msg.Bitfield = msg.Bitfield[:t.NumPieces()]
759                         }
760                         c.PeerPieces = msg.Bitfield
761                         for index, has := range c.PeerPieces {
762                                 if has {
763                                         me.peerGotPiece(t, c, index)
764                                 }
765                         }
766                 case pp.Piece:
767                         err = me.downloadedChunk(t, c, &msg)
768                 case pp.Extended:
769                         switch msg.ExtendedID {
770                         case pp.HandshakeExtendedID:
771                                 // TODO: Create a bencode struct for this.
772                                 var d map[string]interface{}
773                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
774                                 if err != nil {
775                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
776                                         break
777                                 }
778                                 // log.Printf("got handshake: %v", d)
779                                 if reqq, ok := d["reqq"]; ok {
780                                         if i, ok := reqq.(int64); ok {
781                                                 c.PeerMaxRequests = int(i)
782                                         }
783                                 }
784                                 if v, ok := d["v"]; ok {
785                                         c.PeerClientName = v.(string)
786                                 }
787                                 m, ok := d["m"]
788                                 if !ok {
789                                         err = errors.New("handshake missing m item")
790                                         break
791                                 }
792                                 mTyped, ok := m.(map[string]interface{})
793                                 if !ok {
794                                         err = errors.New("handshake m value is not dict")
795                                         break
796                                 }
797                                 if c.PeerExtensionIDs == nil {
798                                         c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
799                                 }
800                                 for name, v := range mTyped {
801                                         id, ok := v.(int64)
802                                         if !ok {
803                                                 log.Printf("bad handshake m item extension ID type: %T", v)
804                                                 continue
805                                         }
806                                         if id == 0 {
807                                                 delete(c.PeerExtensionIDs, name)
808                                         } else {
809                                                 c.PeerExtensionIDs[name] = id
810                                         }
811                                 }
812                                 metadata_sizeUntyped, ok := d["metadata_size"]
813                                 if ok {
814                                         metadata_size, ok := metadata_sizeUntyped.(int64)
815                                         if !ok {
816                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
817                                         } else {
818                                                 t.SetMetadataSize(metadata_size)
819                                         }
820                                 }
821                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
822                                         me.requestPendingMetadata(t, c)
823                                 }
824                         case 1:
825                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
826                                 if err != nil {
827                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
828                                 }
829                         case 2:
830                                 var pexMsg peerExchangeMessage
831                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
832                                 if err != nil {
833                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
834                                         break
835                                 }
836                                 go func() {
837                                         err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
838                                                 for _, cp := range pexMsg.Added {
839                                                         p := Peer{
840                                                                 IP:     make([]byte, 4),
841                                                                 Port:   int(cp.Port),
842                                                                 Source: peerSourcePEX,
843                                                         }
844                                                         if n := copy(p.IP, cp.IP[:]); n != 4 {
845                                                                 panic(n)
846                                                         }
847                                                         ret = append(ret, p)
848                                                 }
849                                                 return
850                                         }())
851                                         if err != nil {
852                                                 log.Printf("error adding PEX peers: %s", err)
853                                                 return
854                                         }
855                                         peersFoundByPEX.Add(int64(len(pexMsg.Added)))
856                                 }()
857                         default:
858                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
859                         }
860                         if err != nil {
861                                 log.Printf("peer extension map: %#v", c.PeerExtensionIDs)
862                         }
863                 default:
864                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
865                 }
866                 if err != nil {
867                         return err
868                 }
869         }
870 }
871
872 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
873         conn.Socket.Close()
874         for r := range conn.Requests {
875                 me.connDeleteRequest(torrent, conn, r)
876         }
877         for i0, c := range torrent.Conns {
878                 if c != conn {
879                         continue
880                 }
881                 i1 := len(torrent.Conns) - 1
882                 if i0 != i1 {
883                         torrent.Conns[i0] = torrent.Conns[i1]
884                 }
885                 torrent.Conns = torrent.Conns[:i1]
886                 return
887         }
888         panic("connection not found")
889 }
890
891 func (me *Client) addConnection(t *torrent, c *connection) bool {
892         if me.stopped() {
893                 return false
894         }
895         for _, c0 := range t.Conns {
896                 if c.PeerID == c0.PeerID {
897                         // Already connected to a client with that ID.
898                         return false
899                 }
900         }
901         t.Conns = append(t.Conns, c)
902         return true
903 }
904
905 func (me *Client) openNewConns() {
906         for _, t := range me.torrents {
907                 for len(t.Peers) != 0 {
908                         if me.halfOpen >= me.halfOpenLimit {
909                                 return
910                         }
911                         var (
912                                 k peersKey
913                                 p Peer
914                         )
915                         for k, p = range t.Peers {
916                                 break
917                         }
918                         delete(t.Peers, k)
919                         me.initiateConn(p, t)
920                 }
921         }
922 }
923
924 // Adds peers to the swarm for the torrent corresponding to infoHash.
925 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
926         me.mu.Lock()
927         defer me.mu.Unlock()
928         t := me.torrent(infoHash)
929         if t == nil {
930                 return errors.New("no such torrent")
931         }
932         t.AddPeers(peers)
933         me.openNewConns()
934         return nil
935 }
936
937 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
938         err = t.setMetadata(md, cl.dataDir, bytes)
939         if err != nil {
940                 return
941         }
942         // Queue all pieces for hashing. This is done sequentially to avoid
943         // spamming goroutines.
944         for _, p := range t.Pieces {
945                 p.QueuedForHash = true
946         }
947         go func() {
948                 for i := range t.Pieces {
949                         cl.verifyPiece(t, pp.Integer(i))
950                 }
951         }()
952
953         cl.downloadStrategy.TorrentStarted(t)
954         return
955 }
956
957 // Prepare a Torrent without any attachment to a Client. That means we can
958 // initialize fields all fields that don't require the Client without locking
959 // it.
960 func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
961         t = &torrent{
962                 InfoHash: ih,
963                 Peers:    make(map[peersKey]Peer, 2000),
964
965                 closing: make(chan struct{}),
966         }
967         t.Trackers = make([][]tracker.Client, len(announceList))
968         for tierIndex := range announceList {
969                 tier := t.Trackers[tierIndex]
970                 for _, url := range announceList[tierIndex] {
971                         tr, err := tracker.New(url)
972                         if err != nil {
973                                 log.Print(err)
974                                 continue
975                         }
976                         tier = append(tier, tr)
977                 }
978                 // The trackers within each tier must be shuffled before use.
979                 // http://stackoverflow.com/a/12267471/149482
980                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
981                 for i := range tier {
982                         j := mathRand.Intn(i + 1)
983                         tier[i], tier[j] = tier[j], tier[i]
984                 }
985                 t.Trackers[tierIndex] = tier
986         }
987         return
988 }
989
990 func (cl *Client) AddMagnet(uri string) (err error) {
991         m, err := ParseMagnetURI(uri)
992         if err != nil {
993                 return
994         }
995         t, err := newTorrent(m.InfoHash, [][]string{m.Trackers})
996         if err != nil {
997                 return
998         }
999         t.DisplayName = m.DisplayName
1000         cl.mu.Lock()
1001         defer cl.mu.Unlock()
1002         err = cl.addTorrent(t)
1003         if err != nil {
1004                 t.Close()
1005         }
1006         return
1007 }
1008
1009 func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
1010         me.mu.Lock()
1011         defer me.mu.Unlock()
1012         t, ok := me.torrents[infoHash]
1013         if !ok {
1014                 err = fmt.Errorf("no such torrent")
1015                 return
1016         }
1017         err = t.Close()
1018         if err != nil {
1019                 panic(err)
1020         }
1021         delete(me.torrents, infoHash)
1022         me.downloadStrategy.TorrentStopped(t)
1023         return
1024 }
1025
1026 func (me *Client) addTorrent(t *torrent) (err error) {
1027         if _, ok := me.torrents[t.InfoHash]; ok {
1028                 err = fmt.Errorf("torrent infohash collision")
1029                 return
1030         }
1031         me.torrents[t.InfoHash] = t
1032         if !me.disableTrackers {
1033                 go me.announceTorrent(t)
1034         }
1035         if me.dHT != nil {
1036                 go me.announceTorrentDHT(t)
1037         }
1038         return
1039 }
1040
1041 // Adds the torrent to the client.
1042 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
1043         var ih InfoHash
1044         CopyExact(&ih, metaInfo.Info.Hash)
1045         t, err := newTorrent(ih, metaInfo.AnnounceList)
1046         if err != nil {
1047                 return
1048         }
1049         me.mu.Lock()
1050         defer me.mu.Unlock()
1051         err = me.addTorrent(t)
1052         if err != nil {
1053                 return
1054         }
1055         err = me.setMetaData(t, metaInfo.Info.Info, metaInfo.Info.Bytes)
1056         if err != nil {
1057                 return
1058         }
1059         return
1060 }
1061
1062 func (me *Client) AddTorrentFromFile(name string) (err error) {
1063         mi, err := metainfo.LoadFromFile(name)
1064         if err != nil {
1065                 err = fmt.Errorf("error loading metainfo from file: %s", err)
1066                 return
1067         }
1068         return me.AddTorrent(mi)
1069 }
1070
1071 func (cl *Client) listenerAnnouncePort() (port int16) {
1072         l := cl.listener
1073         if l == nil {
1074                 return
1075         }
1076         addr := l.Addr()
1077         switch data := addr.(type) {
1078         case *net.TCPAddr:
1079                 return int16(data.Port)
1080         case *net.UDPAddr:
1081                 return int16(data.Port)
1082         default:
1083                 log.Printf("unknown listener addr type: %T", addr)
1084         }
1085         return
1086 }
1087
1088 func (cl *Client) announceTorrentDHT(t *torrent) {
1089         for {
1090                 ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
1091                 if err != nil {
1092                         log.Printf("error getting peers from dht: %s", err)
1093                         return
1094                 }
1095                 nextScrape := time.After(1 * time.Minute)
1096         getPeers:
1097                 for {
1098                         select {
1099                         case <-nextScrape:
1100                                 break getPeers
1101                         case cps, ok := <-ps.Values:
1102                                 if !ok {
1103                                         break getPeers
1104                                 }
1105                                 peersFoundByDHT.Add(int64(len(cps)))
1106                                 err = cl.AddPeers(t.InfoHash, func() (ret []Peer) {
1107                                         for _, cp := range cps {
1108                                                 ret = append(ret, Peer{
1109                                                         IP:     cp.IP[:],
1110                                                         Port:   int(cp.Port),
1111                                                         Source: peerSourceDHT,
1112                                                 })
1113                                         }
1114                                         return
1115                                 }())
1116                                 if err != nil {
1117                                         log.Printf("error adding peers from dht for torrent %q: %s", t, err)
1118                                         break getPeers
1119                                 }
1120                         case <-t.closing:
1121                                 ps.Close()
1122                                 return
1123                         }
1124                 }
1125                 ps.Close()
1126         }
1127 }
1128
1129 func (cl *Client) announceTorrent(t *torrent) {
1130         req := tracker.AnnounceRequest{
1131                 Event:    tracker.Started,
1132                 NumWant:  -1,
1133                 Port:     cl.listenerAnnouncePort(),
1134                 PeerId:   cl.peerID,
1135                 InfoHash: t.InfoHash,
1136         }
1137 newAnnounce:
1138         for {
1139                 cl.mu.Lock()
1140                 if t.isClosed() {
1141                         return
1142                 }
1143                 req.Left = t.BytesLeft()
1144                 cl.mu.Unlock()
1145                 for _, tier := range t.Trackers {
1146                         for trIndex, tr := range tier {
1147                                 if err := tr.Connect(); err != nil {
1148                                         log.Print(err)
1149                                         continue
1150                                 }
1151                                 resp, err := tr.Announce(&req)
1152                                 if err != nil {
1153                                         log.Print(err)
1154                                         continue
1155                                 }
1156                                 var peers []Peer
1157                                 for _, peer := range resp.Peers {
1158                                         peers = append(peers, Peer{
1159                                                 IP:   peer.IP,
1160                                                 Port: peer.Port,
1161                                         })
1162                                 }
1163                                 err = cl.AddPeers(t.InfoHash, peers)
1164                                 if err != nil {
1165                                         log.Printf("error adding peers to torrent %s: %s", t, err)
1166                                 } else {
1167                                         log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1168                                 }
1169                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1170                                 time.Sleep(time.Second * time.Duration(resp.Interval))
1171                                 req.Event = tracker.None
1172                                 continue newAnnounce
1173                         }
1174                 }
1175                 time.Sleep(5 * time.Second)
1176         }
1177 }
1178
1179 func (cl *Client) allTorrentsCompleted() bool {
1180         for _, t := range cl.torrents {
1181                 if !t.haveAllPieces() {
1182                         return false
1183                 }
1184         }
1185         return true
1186 }
1187
1188 // Returns true when all torrents are completely downloaded and false if the
1189 // client is stopped before that.
1190 func (me *Client) WaitAll() bool {
1191         me.mu.Lock()
1192         defer me.mu.Unlock()
1193         for !me.allTorrentsCompleted() {
1194                 if me.stopped() {
1195                         return false
1196                 }
1197                 me.event.Wait()
1198         }
1199         return true
1200 }
1201
1202 func (cl *Client) assertRequestHeat() {
1203         dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy)
1204         if !ok {
1205                 return
1206         }
1207         for _, t := range cl.torrents {
1208                 m := make(map[request]int, 3000)
1209                 for _, cn := range t.Conns {
1210                         for r := range cn.Requests {
1211                                 m[r]++
1212                         }
1213                 }
1214                 for r, h := range dds.heat[t] {
1215                         if m[r] != h {
1216                                 panic(fmt.Sprintln(m[r], h))
1217                         }
1218                 }
1219         }
1220 }
1221
1222 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
1223         if !t.haveInfo() {
1224                 return
1225         }
1226         me.downloadStrategy.FillRequests(t, c)
1227         //me.assertRequestHeat()
1228         if len(c.Requests) == 0 && !c.PeerChoked {
1229                 c.SetInterested(false)
1230         }
1231 }
1232
1233 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
1234         chunksDownloadedCount.Add(1)
1235
1236         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1237
1238         // Request has been satisfied.
1239         me.connDeleteRequest(t, c, req)
1240
1241         defer me.replenishConnRequests(t, c)
1242
1243         // Do we actually want this chunk?
1244         if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
1245                 unusedDownloadedChunksCount.Add(1)
1246                 return nil
1247         }
1248
1249         // Write the chunk out.
1250         err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1251         if err != nil {
1252                 return err
1253         }
1254
1255         // Record that we have the chunk.
1256         delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
1257         me.dataReady(dataSpec{t.InfoHash, req})
1258         if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
1259                 me.queuePieceCheck(t, req.Index)
1260         }
1261         t.PieceBytesLeftChanged(int(req.Index))
1262
1263         // Unprioritize the chunk.
1264         me.downloadStrategy.TorrentGotChunk(t, req)
1265
1266         // Cancel pending requests for this chunk.
1267         for _, c := range t.Conns {
1268                 if me.connCancel(t, c, req) {
1269                         me.replenishConnRequests(t, c)
1270                 }
1271         }
1272
1273         me.downloadStrategy.AssertNotRequested(t, req)
1274
1275         return nil
1276 }
1277
1278 func (cl *Client) dataReady(ds dataSpec) {
1279         cl.dataWaiterMutex.Lock()
1280         if cl.dataWaiter != nil {
1281                 close(cl.dataWaiter)
1282         }
1283         cl.dataWaiter = nil
1284         cl.dataWaiterMutex.Unlock()
1285 }
1286
1287 // Returns a channel that is closed when new data has become available in the
1288 // client.
1289 func (me *Client) DataWaiter() (ret <-chan struct{}) {
1290         me.dataWaiterMutex.Lock()
1291         if me.dataWaiter == nil {
1292                 me.dataWaiter = make(chan struct{})
1293         }
1294         ret = me.dataWaiter
1295         me.dataWaiterMutex.Unlock()
1296         return
1297 }
1298
1299 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
1300         p := t.Pieces[piece]
1301         p.EverHashed = true
1302         if correct {
1303                 p.PendingChunkSpecs = nil
1304                 me.downloadStrategy.TorrentGotPiece(t, int(piece))
1305                 me.dataReady(dataSpec{
1306                         t.InfoHash,
1307                         request{
1308                                 pp.Integer(piece),
1309                                 chunkSpec{0, pp.Integer(t.PieceLength(piece))},
1310                         },
1311                 })
1312         } else {
1313                 if len(p.PendingChunkSpecs) == 0 {
1314                         t.pendAllChunkSpecs(piece)
1315                 }
1316         }
1317         t.PieceBytesLeftChanged(int(piece))
1318         for _, conn := range t.Conns {
1319                 if correct {
1320                         conn.Post(pp.Message{
1321                                 Type:  pp.Have,
1322                                 Index: pp.Integer(piece),
1323                         })
1324                         // TODO: Cancel requests for this piece.
1325                 } else {
1326                         if conn.PeerHasPiece(piece) {
1327                                 me.replenishConnRequests(t, conn)
1328                         }
1329                 }
1330         }
1331         me.event.Broadcast()
1332 }
1333
1334 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
1335         cl.mu.Lock()
1336         p := t.Pieces[index]
1337         for p.Hashing {
1338                 cl.event.Wait()
1339         }
1340         if t.isClosed() {
1341                 cl.mu.Unlock()
1342                 return
1343         }
1344         p.Hashing = true
1345         p.QueuedForHash = false
1346         cl.mu.Unlock()
1347         sum := t.HashPiece(index)
1348         cl.mu.Lock()
1349         p.Hashing = false
1350         cl.pieceHashed(t, index, sum == p.Hash)
1351         cl.mu.Unlock()
1352 }
1353
1354 func (me *Client) Torrents() (ret []*torrent) {
1355         me.mu.Lock()
1356         for _, t := range me.torrents {
1357                 ret = append(ret, t)
1358         }
1359         me.mu.Unlock()
1360         return
1361 }