]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
68d89965a34558a52d4edea30ec605b4595292d9
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errors("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "crypto/rand"
21         "crypto/sha1"
22         "errors"
23         "fmt"
24         "io"
25         "log"
26         mathRand "math/rand"
27         "net"
28         "os"
29         "sync"
30         "syscall"
31         "time"
32
33         "bitbucket.org/anacrolix/go.torrent/dht"
34         . "bitbucket.org/anacrolix/go.torrent/util"
35
36         "github.com/anacrolix/libtorgo/metainfo"
37         "github.com/nsf/libtorgo/bencode"
38
39         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
40         "bitbucket.org/anacrolix/go.torrent/tracker"
41         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
42 )
43
44 const extensionBytes = "\x00\x00\x00\x00\x00\x10\x00\x00"
45
46 // Currently doesn't really queue, but should in the future.
47 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
48         piece := t.Pieces[pieceIndex]
49         if piece.QueuedForHash {
50                 return
51         }
52         piece.QueuedForHash = true
53         go cl.verifyPiece(t, pieceIndex)
54 }
55
56 // Queues the torrent data for the given region for download. The beginning of
57 // the region is given highest priority to allow a subsequent read at the same
58 // offset to return data ASAP.
59 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
60         me.mu.Lock()
61         defer me.mu.Unlock()
62         t := me.torrent(ih)
63         if t == nil {
64                 return errors.New("no such active torrent")
65         }
66         if !t.haveInfo() {
67                 return errors.New("missing metadata")
68         }
69         me.downloadStrategy.TorrentPrioritize(t, off, len_)
70         for _, cn := range t.Conns {
71                 me.replenishConnRequests(t, cn)
72         }
73         return nil
74 }
75
76 type dataSpec struct {
77         InfoHash
78         request
79 }
80
81 type Client struct {
82         dataDir          string
83         halfOpenLimit    int
84         peerID           [20]byte
85         listener         net.Listener
86         disableTrackers  bool
87         downloadStrategy DownloadStrategy
88         dHT              *dht.Server
89
90         mu    sync.Mutex
91         event sync.Cond
92         quit  chan struct{}
93
94         halfOpen   int
95         torrents   map[InfoHash]*torrent
96         dataWaiter chan struct{}
97 }
98
99 func (me *Client) ListenAddr() net.Addr {
100         return me.listener.Addr()
101 }
102
103 func (cl *Client) WriteStatus(w io.Writer) {
104         cl.mu.Lock()
105         defer cl.mu.Unlock()
106         if cl.listener != nil {
107                 fmt.Fprintf(w, "Listening on %s\n", cl.listener.Addr())
108         } else {
109                 fmt.Fprintf(w, "No listening torrent port!\n")
110         }
111         fmt.Fprintf(w, "Peer ID: %q\n", cl.peerID)
112         fmt.Fprintf(w, "Half open outgoing connections: %d\n", cl.halfOpen)
113         if cl.dHT != nil {
114                 fmt.Fprintf(w, "DHT nodes: %d\n", cl.dHT.NumNodes())
115                 fmt.Fprintf(w, "DHT Server ID: %x\n", cl.dHT.IDString())
116         }
117         fmt.Fprintln(w)
118         for _, t := range cl.torrents {
119                 fmt.Fprintf(w, "%s: %f%%\n", t.Name(), func() float32 {
120                         if !t.haveInfo() {
121                                 return 0
122                         } else {
123                                 return 100 * (1 - float32(t.BytesLeft())/float32(t.Length()))
124                         }
125                 }())
126                 t.WriteStatus(w)
127                 fmt.Fprintln(w)
128         }
129 }
130
131 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
132 // isn't available.
133 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
134         cl.mu.Lock()
135         defer cl.mu.Unlock()
136         t := cl.torrent(ih)
137         if t == nil {
138                 err = errors.New("unknown torrent")
139                 return
140         }
141         index := pp.Integer(off / int64(t.UsualPieceSize()))
142         // Reading outside the bounds of a file is an error.
143         if index < 0 {
144                 err = os.ErrInvalid
145                 return
146         }
147         if int(index) >= len(t.Pieces) {
148                 err = io.EOF
149                 return
150         }
151         piece := t.Pieces[index]
152         pieceOff := pp.Integer(off % int64(t.PieceLength(0)))
153         high := int(t.PieceLength(index) - pieceOff)
154         if high < len(p) {
155                 p = p[:high]
156         }
157         for cs, _ := range piece.PendingChunkSpecs {
158                 chunkOff := int64(pieceOff) - int64(cs.Begin)
159                 if chunkOff >= int64(t.PieceLength(index)) {
160                         panic(chunkOff)
161                 }
162                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
163                         // read begins in a pending chunk
164                         err = ErrDataNotReady
165                         return
166                 }
167                 // pending chunk caps available data
168                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
169                         p = p[:-chunkOff]
170                 }
171         }
172         return t.Data.ReadAt(p, off)
173 }
174
175 func NewClient(cfg *Config) (cl *Client, err error) {
176         if cfg == nil {
177                 cfg = &Config{}
178         }
179
180         cl = &Client{
181                 disableTrackers:  cfg.DisableTrackers,
182                 downloadStrategy: cfg.DownloadStrategy,
183                 halfOpenLimit:    100,
184                 dataDir:          cfg.DataDir,
185
186                 quit:     make(chan struct{}),
187                 torrents: make(map[InfoHash]*torrent),
188         }
189         cl.event.L = &cl.mu
190
191         o := copy(cl.peerID[:], BEP20)
192         _, err = rand.Read(cl.peerID[o:])
193         if err != nil {
194                 panic("error generating peer id")
195         }
196
197         if cl.downloadStrategy == nil {
198                 cl.downloadStrategy = &DefaultDownloadStrategy{}
199         }
200
201         cl.listener, err = net.Listen("tcp", cfg.ListenAddr)
202         if err != nil {
203                 return
204         }
205         if cl.listener != nil {
206                 go cl.acceptConnections()
207         }
208
209         if !cfg.NoDHT {
210                 cl.dHT, err = dht.NewServer(&dht.ServerConfig{
211                         Addr: cfg.ListenAddr,
212                 })
213                 if err != nil {
214                         return
215                 }
216         }
217
218         return
219 }
220
221 func (cl *Client) stopped() bool {
222         select {
223         case <-cl.quit:
224                 return true
225         default:
226                 return false
227         }
228 }
229
230 // Stops the client. All connections to peers are closed and all activity will
231 // come to a halt.
232 func (me *Client) Stop() {
233         me.mu.Lock()
234         close(me.quit)
235         me.event.Broadcast()
236         for _, t := range me.torrents {
237                 for _, c := range t.Conns {
238                         c.Close()
239                 }
240         }
241         me.mu.Unlock()
242 }
243
244 func (cl *Client) acceptConnections() {
245         for {
246                 conn, err := cl.listener.Accept()
247                 select {
248                 case <-cl.quit:
249                         if conn != nil {
250                                 conn.Close()
251                         }
252                         return
253                 default:
254                 }
255                 if err != nil {
256                         log.Print(err)
257                         return
258                 }
259                 // log.Printf("accepted connection from %s", conn.RemoteAddr())
260                 go func() {
261                         if err := cl.runConnection(conn, nil, peerSourceIncoming); err != nil {
262                                 log.Print(err)
263                         }
264                 }()
265         }
266 }
267
268 func (me *Client) torrent(ih InfoHash) *torrent {
269         for _, t := range me.torrents {
270                 if t.InfoHash == ih {
271                         return t
272                 }
273         }
274         return nil
275 }
276
277 // Start the process of connecting to the given peer for the given torrent if
278 // appropriate.
279 func (me *Client) initiateConn(peer Peer, torrent *torrent) {
280         if peer.Id == me.peerID {
281                 return
282         }
283         me.halfOpen++
284         go func() {
285                 addr := &net.TCPAddr{
286                         IP:   peer.IP,
287                         Port: peer.Port,
288                 }
289                 // Binding to the listener address and dialing via net.Dialer gives
290                 // "address in use" error. It seems it's not possible to dial out from
291                 // this address so that peers associate our local address with our
292                 // listen address.
293                 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
294
295                 // Whether or not the connection attempt succeeds, the half open
296                 // counter should be decremented, and new connection attempts made.
297                 go func() {
298                         me.mu.Lock()
299                         defer me.mu.Unlock()
300                         if me.halfOpen == 0 {
301                                 panic("assert")
302                         }
303                         me.halfOpen--
304                         me.openNewConns()
305                 }()
306
307                 if netOpErr, ok := err.(*net.OpError); ok {
308                         if netOpErr.Timeout() {
309                                 return
310                         }
311                         switch netOpErr.Err {
312                         case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
313                                 return
314                         }
315                 }
316                 if err != nil {
317                         log.Printf("error connecting to peer: %s %#v", err, err)
318                         return
319                 }
320                 // log.Printf("connected to %s", conn.RemoteAddr())
321                 err = me.runConnection(conn, torrent, peer.Source)
322                 if err != nil {
323                         log.Print(err)
324                 }
325         }()
326 }
327
328 func (cl *Client) incomingPeerPort() int {
329         if cl.listener == nil {
330                 return 0
331         }
332         _, p, err := net.SplitHostPort(cl.listener.Addr().String())
333         if err != nil {
334                 panic(err)
335         }
336         var i int
337         _, err = fmt.Sscanf(p, "%d", &i)
338         if err != nil {
339                 panic(err)
340         }
341         return i
342 }
343
344 // Convert a net.Addr to its compact IP representation. Either 4 or 16 bytes per "yourip" field of http://www.bittorrent.org/beps/bep_0010.html.
345 func addrCompactIP(addr net.Addr) (string, error) {
346         switch typed := addr.(type) {
347         case *net.TCPAddr:
348                 if v4 := typed.IP.To4(); v4 != nil {
349                         if len(v4) != 4 {
350                                 panic(v4)
351                         }
352                         return string(v4), nil
353                 }
354                 return string(typed.IP.To16()), nil
355         default:
356                 return "", fmt.Errorf("unhandled type: %T", addr)
357         }
358 }
359
360 func handshakeWriter(w io.WriteCloser, bb <-chan []byte, done chan<- error) {
361         var err error
362         for b := range bb {
363                 _, err = w.Write(b)
364                 if err != nil {
365                         w.Close()
366                         break
367                 }
368         }
369         done <- err
370 }
371
372 type peerExtensionBytes [8]byte
373 type peerID [20]byte
374
375 type handshakeResult struct {
376         peerExtensionBytes
377         peerID
378         InfoHash
379 }
380
381 func handshake(sock io.ReadWriteCloser, ih *InfoHash, peerID [20]byte) (res handshakeResult, ok bool, err error) {
382         // Bytes to be sent to the peer. Should never block the sender.
383         postCh := make(chan []byte, 4)
384         // A single error value sent when the writer completes.
385         writeDone := make(chan error, 1)
386         // Performs writes to the socket and ensures posts don't block.
387         go handshakeWriter(sock, postCh, writeDone)
388
389         defer func() {
390                 close(postCh) // Done writing.
391                 if !ok {
392                         return
393                 }
394                 if err != nil {
395                         panic(err)
396                 }
397                 // Wait until writes complete before returning from handshake.
398                 err = <-writeDone
399                 if err != nil {
400                         err = fmt.Errorf("error writing during handshake: %s", err)
401                 }
402         }()
403
404         post := func(bb []byte) {
405                 select {
406                 case postCh <- bb:
407                 default:
408                         panic("mustn't block while posting")
409                 }
410         }
411
412         post([]byte(pp.Protocol))
413         post([]byte(extensionBytes))
414         if ih != nil { // We already know what we want.
415                 post(ih[:])
416                 post(peerID[:])
417         }
418         var b [68]byte
419         _, err = io.ReadFull(sock, b[:68])
420         if err != nil {
421                 err = nil
422                 return
423         }
424         if string(b[:20]) != pp.Protocol {
425                 return
426         }
427         copy(res.peerExtensionBytes[:], b[20:28])
428         copy(res.InfoHash[:], b[28:48])
429         copy(res.peerID[:], b[48:68])
430
431         if ih == nil { // We were waiting for the peer to tell us what they wanted.
432                 post(res.InfoHash[:])
433                 post(peerID[:])
434         }
435
436         ok = true
437         return
438 }
439
440 func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerSource) (err error) {
441         defer sock.Close()
442         hsRes, ok, err := handshake(sock, func() *InfoHash {
443                 if torrent == nil {
444                         return nil
445                 } else {
446                         return &torrent.InfoHash
447                 }
448         }(), me.peerID)
449         if err != nil {
450                 err = fmt.Errorf("error during handshake: %s", err)
451                 return
452         }
453         if !ok {
454                 return
455         }
456         me.mu.Lock()
457         defer me.mu.Unlock()
458         torrent = me.torrent(hsRes.InfoHash)
459         if torrent == nil {
460                 return
461         }
462         conn := newConnection(sock, hsRes.peerExtensionBytes, hsRes.peerID)
463         defer conn.Close()
464         conn.Discovery = discovery
465         if !me.addConnection(torrent, conn) {
466                 return
467         }
468         if conn.PeerExtensionBytes[5]&0x10 != 0 {
469                 conn.Post(pp.Message{
470                         Type:       pp.Extended,
471                         ExtendedID: pp.HandshakeExtendedID,
472                         ExtendedPayload: func() []byte {
473                                 d := map[string]interface{}{
474                                         "m": map[string]int{
475                                                 "ut_metadata": 1,
476                                                 "ut_pex":      2,
477                                         },
478                                         "v":    "go.torrent dev",
479                                         "reqq": 1,
480                                 }
481                                 if torrent.metadataSizeKnown() {
482                                         d["metadata_size"] = torrent.metadataSize()
483                                 }
484                                 if p := me.incomingPeerPort(); p != 0 {
485                                         d["p"] = p
486                                 }
487                                 yourip, err := addrCompactIP(conn.Socket.RemoteAddr())
488                                 if err != nil {
489                                         log.Printf("error calculating yourip field value in extension handshake: %s", err)
490                                 } else {
491                                         d["yourip"] = yourip
492                                 }
493                                 // log.Printf("sending %v", d)
494                                 b, err := bencode.Marshal(d)
495                                 if err != nil {
496                                         panic(err)
497                                 }
498                                 return b
499                         }(),
500                 })
501         }
502         if torrent.haveAnyPieces() {
503                 conn.Post(pp.Message{
504                         Type:     pp.Bitfield,
505                         Bitfield: torrent.bitfield(),
506                 })
507         }
508         err = me.connectionLoop(torrent, conn)
509         if err != nil {
510                 err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
511         }
512         me.dropConnection(torrent, conn)
513         return
514 }
515
516 func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) {
517         for piece >= len(c.PeerPieces) {
518                 c.PeerPieces = append(c.PeerPieces, false)
519         }
520         c.PeerPieces[piece] = true
521         if t.wantPiece(piece) {
522                 me.replenishConnRequests(t, c)
523         }
524 }
525
526 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
527         me.replenishConnRequests(torrent, conn)
528 }
529
530 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
531         ok = cn.Cancel(r)
532         if ok {
533                 cl.downloadStrategy.DeleteRequest(t, r)
534         }
535         return
536 }
537
538 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
539         if !cn.RequestPending(r) {
540                 return
541         }
542         cl.downloadStrategy.DeleteRequest(t, r)
543         delete(cn.Requests, r)
544 }
545
546 func (cl *Client) requestPendingMetadata(t *torrent, c *connection) {
547         if t.haveInfo() {
548                 return
549         }
550         var pending []int
551         for index := 0; index < t.MetadataPieceCount(); index++ {
552                 if !t.HaveMetadataPiece(index) {
553                         pending = append(pending, index)
554                 }
555         }
556         for _, i := range mathRand.Perm(len(pending)) {
557                 c.Post(pp.Message{
558                         Type:       pp.Extended,
559                         ExtendedID: byte(c.PeerExtensionIDs["ut_metadata"]),
560                         ExtendedPayload: func() []byte {
561                                 b, err := bencode.Marshal(map[string]int{
562                                         "msg_type": 0,
563                                         "piece":    pending[i],
564                                 })
565                                 if err != nil {
566                                         panic(err)
567                                 }
568                                 return b
569                         }(),
570                 })
571         }
572 }
573
574 func (cl *Client) completedMetadata(t *torrent) {
575         h := sha1.New()
576         h.Write(t.MetaData)
577         var ih InfoHash
578         CopyExact(&ih, h.Sum(nil))
579         if ih != t.InfoHash {
580                 log.Print("bad metadata")
581                 t.InvalidateMetadata()
582                 return
583         }
584         var info metainfo.Info
585         err := bencode.Unmarshal(t.MetaData, &info)
586         if err != nil {
587                 log.Printf("error unmarshalling metadata: %s", err)
588                 t.InvalidateMetadata()
589                 return
590         }
591         // TODO(anacrolix): If this fails, I think something harsher should be
592         // done.
593         err = cl.setMetaData(t, info, t.MetaData)
594         if err != nil {
595                 log.Printf("error setting metadata: %s", err)
596                 t.InvalidateMetadata()
597                 return
598         }
599         log.Printf("%s: got metadata from peers", t)
600 }
601
602 // Process incoming ut_metadata message.
603 func (cl *Client) gotMetadataExtensionMsg(payload []byte, t *torrent, c *connection) (err error) {
604         var d map[string]int
605         err = bencode.Unmarshal(payload, &d)
606         if err != nil {
607                 err = fmt.Errorf("error unmarshalling payload: %s: %q", err, payload)
608                 return
609         }
610         msgType, ok := d["msg_type"]
611         if !ok {
612                 err = errors.New("missing msg_type field")
613                 return
614         }
615         piece := d["piece"]
616         switch msgType {
617         case pp.DataMetadataExtensionMsgType:
618                 if t.haveInfo() {
619                         break
620                 }
621                 t.SaveMetadataPiece(piece, payload[len(payload)-metadataPieceSize(d["total_size"], piece):])
622                 if !t.HaveAllMetadataPieces() {
623                         break
624                 }
625                 cl.completedMetadata(t)
626         case pp.RequestMetadataExtensionMsgType:
627                 if !t.HaveMetadataPiece(piece) {
628                         c.Post(t.NewMetadataExtensionMessage(c, pp.RejectMetadataExtensionMsgType, d["piece"], nil))
629                         break
630                 }
631                 start := (1 << 14) * piece
632                 c.Post(t.NewMetadataExtensionMessage(c, pp.DataMetadataExtensionMsgType, piece, t.MetaData[start:start+t.metadataPieceSize(piece)]))
633         case pp.RejectMetadataExtensionMsgType:
634         default:
635                 err = errors.New("unknown msg_type value")
636         }
637         return
638 }
639
640 type peerExchangeMessage struct {
641         Added      CompactPeers   `bencode:"added"`
642         AddedFlags []byte         `bencode:"added.f"`
643         Dropped    []tracker.Peer `bencode:"dropped"`
644 }
645
646 // Processes incoming bittorrent messages. The client lock is held upon entry
647 // and exit.
648 func (me *Client) connectionLoop(t *torrent, c *connection) error {
649         decoder := pp.Decoder{
650                 R:         bufio.NewReader(c.Socket),
651                 MaxLength: 256 * 1024,
652         }
653         for {
654                 me.mu.Unlock()
655                 var msg pp.Message
656                 err := decoder.Decode(&msg)
657                 me.mu.Lock()
658                 if c.getClosed() {
659                         return nil
660                 }
661                 if err != nil {
662                         if me.stopped() || err == io.EOF {
663                                 return nil
664                         }
665                         return err
666                 }
667                 if msg.Keepalive {
668                         continue
669                 }
670                 switch msg.Type {
671                 case pp.Choke:
672                         c.PeerChoked = true
673                         for r := range c.Requests {
674                                 me.connDeleteRequest(t, c, r)
675                         }
676                 case pp.Unchoke:
677                         c.PeerChoked = false
678                         me.peerUnchoked(t, c)
679                 case pp.Interested:
680                         c.PeerInterested = true
681                         // TODO: This should be done from a dedicated unchoking routine.
682                         c.Unchoke()
683                 case pp.NotInterested:
684                         c.PeerInterested = false
685                         c.Choke()
686                 case pp.Have:
687                         me.peerGotPiece(t, c, int(msg.Index))
688                 case pp.Request:
689                         if c.PeerRequests == nil {
690                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
691                         }
692                         request := newRequest(msg.Index, msg.Begin, msg.Length)
693                         // TODO: Requests should be satisfied from a dedicated upload routine.
694                         // c.PeerRequests[request] = struct{}{}
695                         p := make([]byte, msg.Length)
696                         n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
697                         if err != nil {
698                                 return fmt.Errorf("reading t data to serve request %q: %s", request, err)
699                         }
700                         if n != int(msg.Length) {
701                                 return fmt.Errorf("bad request: %v", msg)
702                         }
703                         c.Post(pp.Message{
704                                 Type:  pp.Piece,
705                                 Index: msg.Index,
706                                 Begin: msg.Begin,
707                                 Piece: p,
708                         })
709                 case pp.Cancel:
710                         req := newRequest(msg.Index, msg.Begin, msg.Length)
711                         if !c.PeerCancel(req) {
712                                 log.Printf("received unexpected cancel: %v", req)
713                         }
714                 case pp.Bitfield:
715                         if c.PeerPieces != nil {
716                                 err = errors.New("received unexpected bitfield")
717                                 break
718                         }
719                         if t.haveInfo() {
720                                 if len(msg.Bitfield) < t.NumPieces() {
721                                         err = errors.New("received invalid bitfield")
722                                         break
723                                 }
724                                 msg.Bitfield = msg.Bitfield[:t.NumPieces()]
725                         }
726                         c.PeerPieces = msg.Bitfield
727                         for index, has := range c.PeerPieces {
728                                 if has {
729                                         me.peerGotPiece(t, c, index)
730                                 }
731                         }
732                 case pp.Piece:
733                         err = me.downloadedChunk(t, c, &msg)
734                 case pp.Extended:
735                         switch msg.ExtendedID {
736                         case pp.HandshakeExtendedID:
737                                 // TODO: Create a bencode struct for this.
738                                 var d map[string]interface{}
739                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
740                                 if err != nil {
741                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
742                                         break
743                                 }
744                                 // log.Printf("got handshake: %v", d)
745                                 if reqq, ok := d["reqq"]; ok {
746                                         if i, ok := reqq.(int64); ok {
747                                                 c.PeerMaxRequests = int(i)
748                                         }
749                                 }
750                                 if v, ok := d["v"]; ok {
751                                         c.PeerClientName = v.(string)
752                                 }
753                                 m, ok := d["m"]
754                                 if !ok {
755                                         err = errors.New("handshake missing m item")
756                                         break
757                                 }
758                                 mTyped, ok := m.(map[string]interface{})
759                                 if !ok {
760                                         err = errors.New("handshake m value is not dict")
761                                         break
762                                 }
763                                 if c.PeerExtensionIDs == nil {
764                                         c.PeerExtensionIDs = make(map[string]int64, len(mTyped))
765                                 }
766                                 for name, v := range mTyped {
767                                         id, ok := v.(int64)
768                                         if !ok {
769                                                 log.Printf("bad handshake m item extension ID type: %T", v)
770                                                 continue
771                                         }
772                                         if id == 0 {
773                                                 delete(c.PeerExtensionIDs, name)
774                                         } else {
775                                                 c.PeerExtensionIDs[name] = id
776                                         }
777                                 }
778                                 metadata_sizeUntyped, ok := d["metadata_size"]
779                                 if ok {
780                                         metadata_size, ok := metadata_sizeUntyped.(int64)
781                                         if !ok {
782                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
783                                         } else {
784                                                 t.SetMetadataSize(metadata_size)
785                                         }
786                                 }
787                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
788                                         me.requestPendingMetadata(t, c)
789                                 }
790                         case 1:
791                                 err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
792                                 if err != nil {
793                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
794                                 }
795                         case 2:
796                                 var pexMsg peerExchangeMessage
797                                 err := bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
798                                 if err != nil {
799                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
800                                         break
801                                 }
802                                 go func() {
803                                         err := me.AddPeers(t.InfoHash, func() (ret []Peer) {
804                                                 for _, cp := range pexMsg.Added {
805                                                         p := Peer{
806                                                                 IP:     make([]byte, 4),
807                                                                 Port:   int(cp.Port),
808                                                                 Source: peerSourcePEX,
809                                                         }
810                                                         if n := copy(p.IP, cp.IP[:]); n != 4 {
811                                                                 panic(n)
812                                                         }
813                                                         ret = append(ret, p)
814                                                 }
815                                                 return
816                                         }())
817                                         if err != nil {
818                                                 log.Printf("error adding PEX peers: %s", err)
819                                                 return
820                                         }
821                                         log.Printf("added %d peers from PEX", len(pexMsg.Added))
822                                 }()
823                         default:
824                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
825                         }
826                         if err != nil {
827                                 log.Printf("peer extension map: %#v", c.PeerExtensionIDs)
828                         }
829                 default:
830                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
831                 }
832                 if err != nil {
833                         return err
834                 }
835         }
836 }
837
838 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
839         conn.Socket.Close()
840         for r := range conn.Requests {
841                 me.connDeleteRequest(torrent, conn, r)
842         }
843         for i0, c := range torrent.Conns {
844                 if c != conn {
845                         continue
846                 }
847                 i1 := len(torrent.Conns) - 1
848                 if i0 != i1 {
849                         torrent.Conns[i0] = torrent.Conns[i1]
850                 }
851                 torrent.Conns = torrent.Conns[:i1]
852                 return
853         }
854         panic("connection not found")
855 }
856
857 func (me *Client) addConnection(t *torrent, c *connection) bool {
858         if me.stopped() {
859                 return false
860         }
861         for _, c0 := range t.Conns {
862                 if c.PeerID == c0.PeerID {
863                         // Already connected to a client with that ID.
864                         return false
865                 }
866         }
867         t.Conns = append(t.Conns, c)
868         return true
869 }
870
871 func (me *Client) openNewConns() {
872         for _, t := range me.torrents {
873                 for len(t.Peers) != 0 {
874                         if me.halfOpen >= me.halfOpenLimit {
875                                 return
876                         }
877                         p := t.Peers[0]
878                         t.Peers = t.Peers[1:]
879                         me.initiateConn(p, t)
880                 }
881         }
882 }
883
884 // Adds peers to the swarm for the torrent corresponding to infoHash.
885 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
886         me.mu.Lock()
887         defer me.mu.Unlock()
888         t := me.torrent(infoHash)
889         if t == nil {
890                 return errors.New("no such torrent")
891         }
892         t.Peers = append(t.Peers, peers...)
893         me.openNewConns()
894         return nil
895 }
896
897 func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
898         err = t.setMetadata(md, cl.dataDir, bytes)
899         if err != nil {
900                 return
901         }
902         // Queue all pieces for hashing. This is done sequentially to avoid
903         // spamming goroutines.
904         for _, p := range t.Pieces {
905                 p.QueuedForHash = true
906         }
907         go func() {
908                 for i := range t.Pieces {
909                         cl.verifyPiece(t, pp.Integer(i))
910                 }
911         }()
912
913         cl.downloadStrategy.TorrentStarted(t)
914         return
915 }
916
917 // Prepare a Torrent without any attachment to a Client. That means we can
918 // initialize fields all fields that don't require the Client without locking
919 // it.
920 func newTorrent(ih InfoHash, announceList [][]string) (t *torrent, err error) {
921         t = &torrent{
922                 InfoHash: ih,
923         }
924         t.Trackers = make([][]tracker.Client, len(announceList))
925         for tierIndex := range announceList {
926                 tier := t.Trackers[tierIndex]
927                 for _, url := range announceList[tierIndex] {
928                         tr, err := tracker.New(url)
929                         if err != nil {
930                                 log.Print(err)
931                                 continue
932                         }
933                         tier = append(tier, tr)
934                 }
935                 // The trackers within each tier must be shuffled before use.
936                 // http://stackoverflow.com/a/12267471/149482
937                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
938                 for i := range tier {
939                         j := mathRand.Intn(i + 1)
940                         tier[i], tier[j] = tier[j], tier[i]
941                 }
942                 t.Trackers[tierIndex] = tier
943         }
944         return
945 }
946
947 func (cl *Client) AddMagnet(uri string) (err error) {
948         m, err := ParseMagnetURI(uri)
949         if err != nil {
950                 return
951         }
952         t, err := newTorrent(m.InfoHash, [][]string{m.Trackers})
953         if err != nil {
954                 return
955         }
956         t.DisplayName = m.DisplayName
957         cl.mu.Lock()
958         defer cl.mu.Unlock()
959         err = cl.addTorrent(t)
960         if err != nil {
961                 t.Close()
962         }
963         return
964 }
965
966 func (me *Client) DropTorrent(infoHash InfoHash) (err error) {
967         me.mu.Lock()
968         defer me.mu.Unlock()
969         t, ok := me.torrents[infoHash]
970         if !ok {
971                 err = fmt.Errorf("no such torrent")
972                 return
973         }
974         err = t.Close()
975         if err != nil {
976                 panic(err)
977         }
978         delete(me.torrents, infoHash)
979         return
980 }
981
982 func (me *Client) addTorrent(t *torrent) (err error) {
983         if _, ok := me.torrents[t.InfoHash]; ok {
984                 err = fmt.Errorf("torrent infohash collision")
985                 return
986         }
987         me.torrents[t.InfoHash] = t
988         if !me.disableTrackers {
989                 go me.announceTorrent(t)
990         }
991         if me.dHT != nil {
992                 go me.announceTorrentDHT(t)
993         }
994         return
995 }
996
997 // Adds the torrent to the client.
998 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) (err error) {
999         t, err := newTorrent(BytesInfoHash(metaInfo.Info.Hash), metaInfo.AnnounceList)
1000         if err != nil {
1001                 return
1002         }
1003         me.mu.Lock()
1004         defer me.mu.Unlock()
1005         err = me.addTorrent(t)
1006         if err != nil {
1007                 return
1008         }
1009         err = me.setMetaData(t, metaInfo.Info.Info, metaInfo.Info.Bytes)
1010         if err != nil {
1011                 return
1012         }
1013         return
1014 }
1015
1016 func (me *Client) AddTorrentFromFile(name string) (err error) {
1017         mi, err := metainfo.LoadFromFile(name)
1018         if err != nil {
1019                 err = fmt.Errorf("error loading metainfo from file: %s", err)
1020                 return
1021         }
1022         return me.AddTorrent(mi)
1023 }
1024
1025 func (cl *Client) listenerAnnouncePort() (port int16) {
1026         l := cl.listener
1027         if l == nil {
1028                 return
1029         }
1030         addr := l.Addr()
1031         switch data := addr.(type) {
1032         case *net.TCPAddr:
1033                 return int16(data.Port)
1034         case *net.UDPAddr:
1035                 return int16(data.Port)
1036         default:
1037                 log.Printf("unknown listener addr type: %T", addr)
1038         }
1039         return
1040 }
1041
1042 func (cl *Client) announceTorrentDHT(t *torrent) {
1043         for {
1044                 ps, err := cl.dHT.GetPeers(string(t.InfoHash[:]))
1045                 if err != nil {
1046                         log.Printf("error getting peers from dht: %s", err)
1047                         return
1048                 }
1049                 nextScrape := time.After(1 * time.Minute)
1050         getPeers:
1051                 for {
1052                         select {
1053                         case <-nextScrape:
1054                                 break getPeers
1055                         case cps, ok := <-ps.Values:
1056                                 if !ok {
1057                                         break getPeers
1058                                 }
1059                                 err = cl.AddPeers(t.InfoHash, func() (ret []Peer) {
1060                                         for _, cp := range cps {
1061                                                 ret = append(ret, Peer{
1062                                                         IP:     cp.IP[:],
1063                                                         Port:   int(cp.Port),
1064                                                         Source: peerSourceDHT,
1065                                                 })
1066                                                 // log.Printf("peer from dht: %s", &net.UDPAddr{
1067                                                 //      IP:   cp.IP[:],
1068                                                 //      Port: int(cp.Port),
1069                                                 // })
1070                                         }
1071                                         return
1072                                 }())
1073                                 if err != nil {
1074                                         log.Printf("error adding peers from dht for torrent %q: %s", t, err)
1075                                         break getPeers
1076                                 }
1077                                 // log.Printf("got %d peers from dht for torrent %q", len(cps), t)
1078                         }
1079                 }
1080                 ps.Close()
1081         }
1082 }
1083
1084 func (cl *Client) announceTorrent(t *torrent) {
1085         req := tracker.AnnounceRequest{
1086                 Event:    tracker.Started,
1087                 NumWant:  -1,
1088                 Port:     cl.listenerAnnouncePort(),
1089                 PeerId:   cl.peerID,
1090                 InfoHash: t.InfoHash,
1091         }
1092 newAnnounce:
1093         for {
1094                 cl.mu.Lock()
1095                 if t.isClosed() {
1096                         return
1097                 }
1098                 req.Left = t.BytesLeft()
1099                 cl.mu.Unlock()
1100                 for _, tier := range t.Trackers {
1101                         for trIndex, tr := range tier {
1102                                 if err := tr.Connect(); err != nil {
1103                                         log.Print(err)
1104                                         continue
1105                                 }
1106                                 resp, err := tr.Announce(&req)
1107                                 if err != nil {
1108                                         log.Print(err)
1109                                         continue
1110                                 }
1111                                 var peers []Peer
1112                                 for _, peer := range resp.Peers {
1113                                         peers = append(peers, Peer{
1114                                                 IP:   peer.IP,
1115                                                 Port: peer.Port,
1116                                         })
1117                                 }
1118                                 err = cl.AddPeers(t.InfoHash, peers)
1119                                 if err != nil {
1120                                         log.Printf("error adding peers to torrent %s: %s", t, err)
1121                                 } else {
1122                                         log.Printf("%s: %d new peers from %s", t, len(peers), tr)
1123                                 }
1124                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
1125                                 time.Sleep(time.Second * time.Duration(resp.Interval))
1126                                 req.Event = tracker.None
1127                                 continue newAnnounce
1128                         }
1129                 }
1130                 time.Sleep(5 * time.Second)
1131         }
1132 }
1133
1134 func (cl *Client) allTorrentsCompleted() bool {
1135         for _, t := range cl.torrents {
1136                 if !t.haveAllPieces() {
1137                         return false
1138                 }
1139         }
1140         return true
1141 }
1142
1143 // Returns true when all torrents are completely downloaded and false if the
1144 // client is stopped before that.
1145 func (me *Client) WaitAll() bool {
1146         me.mu.Lock()
1147         defer me.mu.Unlock()
1148         for !me.allTorrentsCompleted() {
1149                 if me.stopped() {
1150                         return false
1151                 }
1152                 me.event.Wait()
1153         }
1154         return true
1155 }
1156
1157 func (cl *Client) assertRequestHeat() {
1158         dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy)
1159         if !ok {
1160                 return
1161         }
1162         for _, t := range cl.torrents {
1163                 m := make(map[request]int, 3000)
1164                 for _, cn := range t.Conns {
1165                         for r := range cn.Requests {
1166                                 m[r]++
1167                         }
1168                 }
1169                 for r, h := range dds.heat[t] {
1170                         if m[r] != h {
1171                                 panic(fmt.Sprintln(m[r], h))
1172                         }
1173                 }
1174         }
1175 }
1176
1177 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
1178         if !t.haveInfo() {
1179                 return
1180         }
1181         me.downloadStrategy.FillRequests(t, c)
1182         //me.assertRequestHeat()
1183         if len(c.Requests) == 0 && !c.PeerChoked {
1184                 c.SetInterested(false)
1185         }
1186 }
1187
1188 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
1189         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1190
1191         // Request has been satisfied.
1192         me.connDeleteRequest(t, c, req)
1193
1194         defer me.replenishConnRequests(t, c)
1195
1196         // Do we actually want this chunk?
1197         if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
1198                 log.Printf("got unnecessary chunk from %v: %q", req, string(c.PeerId[:]))
1199                 return nil
1200         }
1201
1202         // Write the chunk out.
1203         err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1204         if err != nil {
1205                 return err
1206         }
1207
1208         // Record that we have the chunk.
1209         delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
1210         t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
1211         if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
1212                 me.queuePieceCheck(t, req.Index)
1213         }
1214
1215         // Unprioritize the chunk.
1216         me.downloadStrategy.TorrentGotChunk(t, req)
1217
1218         // Cancel pending requests for this chunk.
1219         cancelled := false
1220         for _, c := range t.Conns {
1221                 if me.connCancel(t, c, req) {
1222                         cancelled = true
1223                         me.replenishConnRequests(t, c)
1224                 }
1225         }
1226         if cancelled {
1227                 log.Printf("cancelled concurrent requests for %v", req)
1228         }
1229
1230         me.dataReady(dataSpec{t.InfoHash, req})
1231         return nil
1232 }
1233
1234 func (cl *Client) dataReady(ds dataSpec) {
1235         if cl.dataWaiter != nil {
1236                 close(cl.dataWaiter)
1237         }
1238         cl.dataWaiter = nil
1239 }
1240
1241 // Returns a channel that is closed when new data has become available in the
1242 // client.
1243 func (me *Client) DataWaiter() <-chan struct{} {
1244         me.mu.Lock()
1245         defer me.mu.Unlock()
1246         if me.dataWaiter == nil {
1247                 me.dataWaiter = make(chan struct{})
1248         }
1249         return me.dataWaiter
1250 }
1251
1252 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
1253         p := t.Pieces[piece]
1254         p.EverHashed = true
1255         if correct {
1256                 p.PendingChunkSpecs = nil
1257                 me.downloadStrategy.TorrentGotPiece(t, int(piece))
1258                 me.dataReady(dataSpec{
1259                         t.InfoHash,
1260                         request{
1261                                 pp.Integer(piece),
1262                                 chunkSpec{0, pp.Integer(t.PieceLength(piece))},
1263                         },
1264                 })
1265         } else {
1266                 if len(p.PendingChunkSpecs) == 0 {
1267                         t.pendAllChunkSpecs(piece)
1268                 }
1269         }
1270         for _, conn := range t.Conns {
1271                 if correct {
1272                         conn.Post(pp.Message{
1273                                 Type:  pp.Have,
1274                                 Index: pp.Integer(piece),
1275                         })
1276                         // TODO: Cancel requests for this piece.
1277                 } else {
1278                         if conn.PeerHasPiece(piece) {
1279                                 me.replenishConnRequests(t, conn)
1280                         }
1281                 }
1282         }
1283         me.event.Broadcast()
1284 }
1285
1286 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
1287         cl.mu.Lock()
1288         p := t.Pieces[index]
1289         for p.Hashing {
1290                 cl.event.Wait()
1291         }
1292         if t.isClosed() {
1293                 cl.mu.Unlock()
1294                 return
1295         }
1296         p.Hashing = true
1297         p.QueuedForHash = false
1298         cl.mu.Unlock()
1299         sum := t.HashPiece(index)
1300         cl.mu.Lock()
1301         p.Hashing = false
1302         cl.pieceHashed(t, index, sum == p.Hash)
1303         cl.mu.Unlock()
1304 }
1305
1306 func (me *Client) Torrents() (ret []*torrent) {
1307         me.mu.Lock()
1308         for _, t := range me.torrents {
1309                 ret = append(ret, t)
1310         }
1311         me.mu.Unlock()
1312         return
1313 }