]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
e079881ab2b66d06cde800fc70cd49e21715dcee
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errors("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "container/list"
21         "crypto/rand"
22         "errors"
23         "fmt"
24         "io"
25         "log"
26         mathRand "math/rand"
27         "net"
28         "os"
29         "sync"
30         "syscall"
31         "time"
32
33         metainfo "github.com/nsf/libtorgo/torrent"
34
35         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
36         "bitbucket.org/anacrolix/go.torrent/tracker"
37         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
38 )
39
40 // Currently doesn't really queue, but should in the future.
41 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex pp.Integer) {
42         piece := t.Pieces[pieceIndex]
43         if piece.QueuedForHash {
44                 return
45         }
46         piece.QueuedForHash = true
47         go cl.verifyPiece(t, pieceIndex)
48 }
49
50 // Queues the torrent data for the given region for download. The beginning of
51 // the region is given highest priority to allow a subsequent read at the same
52 // offset to return data ASAP.
53 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
54         me.mu.Lock()
55         defer me.mu.Unlock()
56         t := me.torrent(ih)
57         if t == nil {
58                 return errors.New("no such active torrent")
59         }
60         newPriorities := make([]request, 0, (len_+chunkSize-1)/chunkSize)
61         for len_ > 0 {
62                 req, ok := t.offsetRequest(off)
63                 if !ok {
64                         return errors.New("bad offset")
65                 }
66                 reqOff := t.requestOffset(req)
67                 // Gain the alignment adjustment.
68                 len_ += off - reqOff
69                 // Lose the length of this block.
70                 len_ -= int64(req.Length)
71                 off = reqOff + int64(req.Length)
72                 if !t.wantPiece(int(req.Index)) {
73                         continue
74                 }
75                 newPriorities = append(newPriorities, req)
76         }
77         if len(newPriorities) == 0 {
78                 return nil
79         }
80         t.Priorities.PushFront(newPriorities[0])
81         for _, req := range newPriorities[1:] {
82                 t.Priorities.PushBack(req)
83         }
84         for _, cn := range t.Conns {
85                 me.replenishConnRequests(t, cn)
86         }
87         return nil
88 }
89
90 type dataSpec struct {
91         InfoHash
92         request
93 }
94
95 type Client struct {
96         DataDir          string
97         HalfOpenLimit    int
98         PeerId           [20]byte
99         Listener         net.Listener
100         DisableTrackers  bool
101         DownloadStrategy DownloadStrategy
102
103         mu    sync.Mutex
104         event sync.Cond
105         quit  chan struct{}
106
107         halfOpen   int
108         torrents   map[InfoHash]*torrent
109         dataWaiter chan struct{}
110 }
111
112 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
113 // isn't available.
114 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
115         cl.mu.Lock()
116         defer cl.mu.Unlock()
117         t := cl.torrent(ih)
118         if t == nil {
119                 err = errors.New("unknown torrent")
120                 return
121         }
122         index := pp.Integer(off / t.MetaInfo.PieceLength)
123         // Reading outside the bounds of a file is an error.
124         if index < 0 {
125                 err = os.ErrInvalid
126                 return
127         }
128         if int(index) >= len(t.Pieces) {
129                 err = io.EOF
130                 return
131         }
132         t.lastReadPiece = int(index)
133         piece := t.Pieces[index]
134         pieceOff := pp.Integer(off % int64(t.PieceLength(0)))
135         high := int(t.PieceLength(index) - pieceOff)
136         if high < len(p) {
137                 p = p[:high]
138         }
139         for cs, _ := range piece.PendingChunkSpecs {
140                 chunkOff := int64(pieceOff) - int64(cs.Begin)
141                 if chunkOff >= int64(t.PieceLength(index)) {
142                         panic(chunkOff)
143                 }
144                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
145                         // read begins in a pending chunk
146                         err = ErrDataNotReady
147                         return
148                 }
149                 // pending chunk caps available data
150                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
151                         p = p[:-chunkOff]
152                 }
153         }
154         return t.Data.ReadAt(p, off)
155 }
156
157 // Starts the client. Defaults are applied. The client will begin accepting
158 // connections and tracking.
159 func (c *Client) Start() {
160         c.event.L = &c.mu
161         c.torrents = make(map[InfoHash]*torrent)
162         if c.HalfOpenLimit == 0 {
163                 c.HalfOpenLimit = 10
164         }
165         o := copy(c.PeerId[:], BEP20)
166         _, err := rand.Read(c.PeerId[o:])
167         if err != nil {
168                 panic("error generating peer id")
169         }
170         c.quit = make(chan struct{})
171         if c.DownloadStrategy == nil {
172                 c.DownloadStrategy = &DefaultDownloadStrategy{}
173         }
174         if c.Listener != nil {
175                 go c.acceptConnections()
176         }
177 }
178
179 func (cl *Client) stopped() bool {
180         select {
181         case <-cl.quit:
182                 return true
183         default:
184                 return false
185         }
186 }
187
188 // Stops the client. All connections to peers are closed and all activity will
189 // come to a halt.
190 func (me *Client) Stop() {
191         me.mu.Lock()
192         close(me.quit)
193         me.event.Broadcast()
194         for _, t := range me.torrents {
195                 for _, c := range t.Conns {
196                         c.Close()
197                 }
198         }
199         me.mu.Unlock()
200 }
201
202 func (cl *Client) acceptConnections() {
203         for {
204                 conn, err := cl.Listener.Accept()
205                 select {
206                 case <-cl.quit:
207                         return
208                 default:
209                 }
210                 if err != nil {
211                         log.Print(err)
212                         return
213                 }
214                 go func() {
215                         if err := cl.runConnection(conn, nil); err != nil {
216                                 log.Print(err)
217                         }
218                 }()
219         }
220 }
221
222 func (me *Client) torrent(ih InfoHash) *torrent {
223         for _, t := range me.torrents {
224                 if t.InfoHash == ih {
225                         return t
226                 }
227         }
228         return nil
229 }
230
231 func (me *Client) initiateConn(peer Peer, torrent *torrent) {
232         if peer.Id == me.PeerId {
233                 return
234         }
235         me.halfOpen++
236         go func() {
237                 addr := &net.TCPAddr{
238                         IP:   peer.IP,
239                         Port: peer.Port,
240                 }
241                 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
242
243                 go func() {
244                         me.mu.Lock()
245                         defer me.mu.Unlock()
246                         if me.halfOpen == 0 {
247                                 panic("assert")
248                         }
249                         me.halfOpen--
250                         me.openNewConns()
251                 }()
252
253                 if netOpErr, ok := err.(*net.OpError); ok {
254                         if netOpErr.Timeout() {
255                                 return
256                         }
257                         switch netOpErr.Err {
258                         case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
259                                 return
260                         }
261                 }
262                 if err != nil {
263                         log.Printf("error connecting to peer: %s %#v", err, err)
264                         return
265                 }
266                 log.Printf("connected to %s", conn.RemoteAddr())
267                 err = me.runConnection(conn, torrent)
268                 if err != nil {
269                         log.Print(err)
270                 }
271         }()
272 }
273
274 func (me *Client) runConnection(sock net.Conn, torrent *torrent) (err error) {
275         conn := &connection{
276                 Socket:          sock,
277                 Choked:          true,
278                 PeerChoked:      true,
279                 write:           make(chan []byte),
280                 post:            make(chan pp.Message),
281                 PeerMaxRequests: 250,
282         }
283         defer func() {
284                 // There's a lock and deferred unlock later in this function. The
285                 // client will not be locked when this deferred is invoked.
286                 me.mu.Lock()
287                 defer me.mu.Unlock()
288                 conn.Close()
289         }()
290         go conn.writer()
291         // go conn.writeOptimizer()
292         conn.write <- pp.Bytes(pp.Protocol)
293         conn.write <- pp.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
294         if torrent != nil {
295                 conn.write <- pp.Bytes(torrent.InfoHash[:])
296                 conn.write <- pp.Bytes(me.PeerId[:])
297         }
298         var b [28]byte
299         _, err = io.ReadFull(conn.Socket, b[:])
300         if err == io.EOF {
301                 return nil
302         }
303         if err != nil {
304                 err = fmt.Errorf("when reading protocol and extensions: %s", err)
305                 return
306         }
307         if string(b[:20]) != pp.Protocol {
308                 err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
309                 return
310         }
311         if 8 != copy(conn.PeerExtensions[:], b[20:]) {
312                 panic("wtf")
313         }
314         // log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
315         var infoHash [20]byte
316         _, err = io.ReadFull(conn.Socket, infoHash[:])
317         if err != nil {
318                 return fmt.Errorf("reading peer info hash: %s", err)
319         }
320         _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
321         if err != nil {
322                 return fmt.Errorf("reading peer id: %s", err)
323         }
324         if torrent == nil {
325                 torrent = me.torrent(infoHash)
326                 if torrent == nil {
327                         return
328                 }
329                 conn.write <- pp.Bytes(torrent.InfoHash[:])
330                 conn.write <- pp.Bytes(me.PeerId[:])
331         }
332         me.mu.Lock()
333         defer me.mu.Unlock()
334         if !me.addConnection(torrent, conn) {
335                 return
336         }
337         go conn.writeOptimizer(time.Minute)
338         if torrent.haveAnyPieces() {
339                 conn.Post(pp.Message{
340                         Type:     pp.Bitfield,
341                         Bitfield: torrent.bitfield(),
342                 })
343         }
344         err = me.connectionLoop(torrent, conn)
345         if err != nil {
346                 err = fmt.Errorf("during Connection loop: %s", err)
347         }
348         me.dropConnection(torrent, conn)
349         return
350 }
351
352 func (me *Client) peerGotPiece(torrent *torrent, conn *connection, piece int) {
353         if conn.PeerPieces == nil {
354                 conn.PeerPieces = make([]bool, len(torrent.Pieces))
355         }
356         conn.PeerPieces[piece] = true
357         if torrent.wantPiece(piece) {
358                 me.replenishConnRequests(torrent, conn)
359         }
360 }
361
362 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
363         me.replenishConnRequests(torrent, conn)
364 }
365
366 func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
367         ok = cn.Cancel(r)
368         if ok {
369                 cl.DownloadStrategy.DeleteRequest(t, r)
370         }
371         return
372 }
373
374 func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) {
375         if !cn.RequestPending(r) {
376                 return
377         }
378         cl.DownloadStrategy.DeleteRequest(t, r)
379         delete(cn.Requests, r)
380 }
381
382 func (me *Client) connectionLoop(t *torrent, c *connection) error {
383         decoder := pp.Decoder{
384                 R:         bufio.NewReader(c.Socket),
385                 MaxLength: 256 * 1024,
386         }
387         for {
388                 me.mu.Unlock()
389                 var msg pp.Message
390                 err := decoder.Decode(&msg)
391                 me.mu.Lock()
392                 if err != nil {
393                         if me.stopped() || err == io.EOF {
394                                 return nil
395                         }
396                         return err
397                 }
398                 if msg.Keepalive {
399                         continue
400                 }
401                 switch msg.Type {
402                 case pp.Choke:
403                         c.PeerChoked = true
404                         for r := range c.Requests {
405                                 me.connDeleteRequest(t, c, r)
406                         }
407                 case pp.Unchoke:
408                         c.PeerChoked = false
409                         me.peerUnchoked(t, c)
410                 case pp.Interested:
411                         c.PeerInterested = true
412                         // TODO: This should be done from a dedicated unchoking routine.
413                         c.Unchoke()
414                 case pp.NotInterested:
415                         c.PeerInterested = false
416                         c.Choke()
417                 case pp.Have:
418                         me.peerGotPiece(t, c, int(msg.Index))
419                 case pp.Request:
420                         if c.PeerRequests == nil {
421                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
422                         }
423                         request := newRequest(msg.Index, msg.Begin, msg.Length)
424                         c.PeerRequests[request] = struct{}{}
425                         // TODO: Requests should be satisfied from a dedicated upload routine.
426                         p := make([]byte, msg.Length)
427                         n, err := t.Data.ReadAt(p, int64(t.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
428                         if err != nil {
429                                 return fmt.Errorf("reading t data to serve request %s: %s", request, err)
430                         }
431                         if n != int(msg.Length) {
432                                 return fmt.Errorf("bad request: %s", msg)
433                         }
434                         c.Post(pp.Message{
435                                 Type:  pp.Piece,
436                                 Index: msg.Index,
437                                 Begin: msg.Begin,
438                                 Piece: p,
439                         })
440                 case pp.Cancel:
441                         req := newRequest(msg.Index, msg.Begin, msg.Length)
442                         if !c.PeerCancel(req) {
443                                 log.Printf("received unexpected cancel: %v", req)
444                         }
445                 case pp.Bitfield:
446                         if len(msg.Bitfield) < len(t.Pieces) {
447                                 err = errors.New("received invalid bitfield")
448                                 break
449                         }
450                         if c.PeerPieces != nil {
451                                 err = errors.New("received unexpected bitfield")
452                                 break
453                         }
454                         c.PeerPieces = msg.Bitfield[:len(t.Pieces)]
455                         for index, has := range c.PeerPieces {
456                                 if has {
457                                         me.peerGotPiece(t, c, index)
458                                 }
459                         }
460                 case pp.Piece:
461                         err = me.downloadedChunk(t, c, &msg)
462                 default:
463                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
464                 }
465                 if err != nil {
466                         return err
467                 }
468         }
469 }
470
471 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
472         conn.Socket.Close()
473         for r := range conn.Requests {
474                 me.connDeleteRequest(torrent, conn, r)
475         }
476         for i0, c := range torrent.Conns {
477                 if c != conn {
478                         continue
479                 }
480                 i1 := len(torrent.Conns) - 1
481                 if i0 != i1 {
482                         torrent.Conns[i0] = torrent.Conns[i1]
483                 }
484                 torrent.Conns = torrent.Conns[:i1]
485                 return
486         }
487         panic("connection not found")
488 }
489
490 func (me *Client) addConnection(t *torrent, c *connection) bool {
491         for _, c0 := range t.Conns {
492                 if c.PeerId == c0.PeerId {
493                         // Already connected to a client with that ID.
494                         return false
495                 }
496         }
497         t.Conns = append(t.Conns, c)
498         return true
499 }
500
501 func (me *Client) openNewConns() {
502         for _, t := range me.torrents {
503                 for len(t.Peers) != 0 {
504                         if me.halfOpen >= me.HalfOpenLimit {
505                                 return
506                         }
507                         p := t.Peers[0]
508                         t.Peers = t.Peers[1:]
509                         me.initiateConn(p, t)
510                 }
511         }
512 }
513
514 // Adds peers to the swarm for the torrent corresponding to infoHash.
515 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
516         me.mu.Lock()
517         t := me.torrent(infoHash)
518         if t == nil {
519                 return errors.New("no such torrent")
520         }
521         t.Peers = append(t.Peers, peers...)
522         me.openNewConns()
523         me.mu.Unlock()
524         return nil
525 }
526
527 // Prepare a Torrent without any attachment to a Client. That means we can
528 // initialize fields all fields that don't require the Client without locking
529 // it.
530 func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (t *torrent, err error) {
531         t = &torrent{
532                 InfoHash: BytesInfoHash(metaInfo.InfoHash),
533                 MetaInfo: metaInfo,
534         }
535         t.Data, err = mmapTorrentData(metaInfo, dataDir)
536         if err != nil {
537                 return
538         }
539         for offset := 0; offset < len(metaInfo.Pieces); offset += pieceHash.Size() {
540                 hash := metaInfo.Pieces[offset : offset+pieceHash.Size()]
541                 if len(hash) != pieceHash.Size() {
542                         err = errors.New("bad piece hash in metainfo")
543                         return
544                 }
545                 piece := &piece{}
546                 copyHashSum(piece.Hash[:], hash)
547                 t.Pieces = append(t.Pieces, piece)
548                 t.pendAllChunkSpecs(pp.Integer(len(t.Pieces) - 1))
549         }
550         t.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
551         for tierIndex := range metaInfo.AnnounceList {
552                 tier := t.Trackers[tierIndex]
553                 for _, url := range metaInfo.AnnounceList[tierIndex] {
554                         tr, err := tracker.New(url)
555                         if err != nil {
556                                 log.Print(err)
557                                 continue
558                         }
559                         tier = append(tier, tr)
560                 }
561                 // The trackers within each tier must be shuffled before use.
562                 // http://stackoverflow.com/a/12267471/149482
563                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
564                 for i := range tier {
565                         j := mathRand.Intn(i + 1)
566                         tier[i], tier[j] = tier[j], tier[i]
567                 }
568                 t.Trackers[tierIndex] = tier
569         }
570         return
571 }
572
573 // Adds the torrent to the client.
574 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
575         torrent, err := newTorrent(metaInfo, me.DataDir)
576         if err != nil {
577                 return err
578         }
579         me.mu.Lock()
580         defer me.mu.Unlock()
581         if _, ok := me.torrents[torrent.InfoHash]; ok {
582                 return torrent.Close()
583         }
584         me.torrents[torrent.InfoHash] = torrent
585         me.DownloadStrategy.TorrentStarted(torrent)
586         if !me.DisableTrackers {
587                 go me.announceTorrent(torrent)
588         }
589         torrent.Priorities = list.New()
590
591         // Queue all pieces for hashing. This is done sequentially to avoid
592         // spamming goroutines.
593         for _, p := range torrent.Pieces {
594                 p.QueuedForHash = true
595         }
596         go func() {
597                 for i := range torrent.Pieces {
598                         me.verifyPiece(torrent, pp.Integer(i))
599                 }
600         }()
601
602         return nil
603 }
604
605 func (cl *Client) listenerAnnouncePort() (port int16) {
606         l := cl.Listener
607         if l == nil {
608                 return
609         }
610         addr := l.Addr()
611         switch data := addr.(type) {
612         case *net.TCPAddr:
613                 return int16(data.Port)
614         case *net.UDPAddr:
615                 return int16(data.Port)
616         default:
617                 log.Printf("unknown listener addr type: %T", addr)
618         }
619         return
620 }
621
622 func (cl *Client) announceTorrent(t *torrent) {
623         req := tracker.AnnounceRequest{
624                 Event:    tracker.Started,
625                 NumWant:  -1,
626                 Port:     cl.listenerAnnouncePort(),
627                 PeerId:   cl.PeerId,
628                 InfoHash: t.InfoHash,
629         }
630 newAnnounce:
631         for {
632                 cl.mu.Lock()
633                 req.Left = t.BytesLeft()
634                 cl.mu.Unlock()
635                 for _, tier := range t.Trackers {
636                         for trIndex, tr := range tier {
637                                 if err := tr.Connect(); err != nil {
638                                         log.Print(err)
639                                         continue
640                                 }
641                                 resp, err := tr.Announce(&req)
642                                 if err != nil {
643                                         log.Print(err)
644                                         continue
645                                 }
646                                 var peers []Peer
647                                 for _, peer := range resp.Peers {
648                                         peers = append(peers, Peer{
649                                                 IP:   peer.IP,
650                                                 Port: peer.Port,
651                                         })
652                                 }
653                                 err = cl.AddPeers(t.InfoHash, peers)
654                                 if err != nil {
655                                         log.Print(err)
656                                 } else {
657                                         log.Printf("%s: %d new peers from %s", t, len(peers), tr)
658                                 }
659                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
660                                 time.Sleep(time.Second * time.Duration(resp.Interval))
661                                 req.Event = tracker.None
662                                 continue newAnnounce
663                         }
664                 }
665                 time.Sleep(5 * time.Second)
666         }
667 }
668
669 func (cl *Client) allTorrentsCompleted() bool {
670         for _, t := range cl.torrents {
671                 if !t.haveAllPieces() {
672                         return false
673                 }
674         }
675         return true
676 }
677
678 // Returns true when all torrents are completely downloaded and false if the
679 // client is stopped.
680 func (me *Client) WaitAll() bool {
681         me.mu.Lock()
682         defer me.mu.Unlock()
683         for !me.allTorrentsCompleted() {
684                 if me.stopped() {
685                         return false
686                 }
687                 me.event.Wait()
688         }
689         return true
690 }
691
692 type DownloadStrategy interface {
693         FillRequests(t *torrent, c *connection)
694         TorrentStarted(t *torrent)
695         TorrentStopped(t *torrent)
696         DeleteRequest(t *torrent, r request)
697 }
698
699 type DefaultDownloadStrategy struct {
700         heat map[*torrent]map[request]int
701 }
702
703 func (cl *Client) assertRequestHeat() {
704         dds, ok := cl.DownloadStrategy.(*DefaultDownloadStrategy)
705         if !ok {
706                 return
707         }
708         for _, t := range cl.torrents {
709                 m := make(map[request]int, 3000)
710                 for _, cn := range t.Conns {
711                         for r := range cn.Requests {
712                                 m[r]++
713                         }
714                 }
715                 for r, h := range dds.heat[t] {
716                         if m[r] != h {
717                                 panic(fmt.Sprintln(m[r], h))
718                         }
719                 }
720         }
721 }
722
723 func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
724         th := s.heat[t]
725         addRequest := func(req request) (again bool) {
726                 piece := t.Pieces[req.Index]
727                 if piece.Hashing {
728                         // We can't be sure we want this.
729                         return true
730                 }
731                 if piece.Complete() {
732                         // We already have this.
733                         return true
734                 }
735                 if c.RequestPending(req) {
736                         return true
737                 }
738                 again = c.Request(req)
739                 if c.RequestPending(req) {
740                         th[req]++
741                 }
742                 return
743         }
744         // First request prioritized chunks.
745         for e := t.Priorities.Front(); e != nil; e = e.Next() {
746                 if !addRequest(e.Value.(request)) {
747                         return
748                 }
749         }
750         ppbs := t.piecesByPendingBytes()
751         // Then finish off incomplete pieces in order of bytes remaining.
752         for _, heatThreshold := range []int{0, 4, 100} {
753                 for _, pieceIndex := range ppbs {
754                         for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
755                                 r := request{pieceIndex, chunkSpec}
756                                 if th[r] > heatThreshold {
757                                         continue
758                                 }
759                                 if !addRequest(request{pieceIndex, chunkSpec}) {
760                                         return
761                                 }
762                         }
763                 }
764         }
765 }
766
767 func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
768         if s.heat[t] != nil {
769                 panic("torrent already started")
770         }
771         if s.heat == nil {
772                 s.heat = make(map[*torrent]map[request]int, 10)
773         }
774         s.heat[t] = make(map[request]int, t.ChunkCount())
775 }
776
777 func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
778         if _, ok := s.heat[t]; !ok {
779                 panic("torrent not yet started")
780         }
781         delete(s.heat, t)
782 }
783
784 func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
785         m := s.heat[t]
786         if m[r] <= 0 {
787                 panic("no pending requests")
788         }
789         m[r]--
790 }
791
792 type ResponsiveDownloadStrategy struct {
793         // How many bytes to preemptively download starting at the beginning of
794         // the last piece read for a given torrent.
795         Readahead int
796 }
797
798 func (ResponsiveDownloadStrategy) TorrentStarted(*torrent)         {}
799 func (ResponsiveDownloadStrategy) TorrentStopped(*torrent)         {}
800 func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
801
802 func (me *ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
803         for e := t.Priorities.Front(); e != nil; e = e.Next() {
804                 if !c.Request(e.Value.(request)) {
805                         return
806                 }
807         }
808         readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
809         for i := t.lastReadPiece; i < t.lastReadPiece+readaheadPieces && i < t.NumPieces(); i++ {
810                 for cs := range t.Pieces[i].PendingChunkSpecs {
811                         if !c.Request(request{pp.Integer(i), cs}) {
812                                 return
813                         }
814                 }
815         }
816         // Then finish off incomplete pieces in order of bytes remaining.
817         for _, index := range t.piecesByPendingBytes() {
818                 // Stop when we're onto untouched pieces.
819                 if t.PieceNumPendingBytes(index) == t.PieceLength(index) {
820                         break
821                 }
822                 for chunkSpec := range t.Pieces[index].PendingChunkSpecs {
823                         if !c.Request(request{index, chunkSpec}) {
824                                 return
825                         }
826                 }
827         }
828 }
829
830 func (me *Client) replenishConnRequests(t *torrent, c *connection) {
831         me.DownloadStrategy.FillRequests(t, c)
832         if len(c.Requests) == 0 {
833                 c.SetInterested(false)
834         }
835 }
836
837 func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
838         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
839
840         // Request has been satisfied.
841         me.connDeleteRequest(t, c, req)
842
843         defer me.replenishConnRequests(t, c)
844
845         // Do we actually want this chunk?
846         if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
847                 return nil
848         }
849
850         // Write the chunk out.
851         err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
852         if err != nil {
853                 return err
854         }
855         me.dataReady(dataSpec{t.InfoHash, req})
856
857         // Cancel pending requests for this chunk.
858         for _, c := range t.Conns {
859                 if me.connCancel(t, c, req) {
860                         me.replenishConnRequests(t, c)
861                 }
862         }
863
864         // Record that we have the chunk.
865         delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
866         if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
867                 me.queuePieceCheck(t, req.Index)
868         }
869
870         // Unprioritize the chunk.
871         var next *list.Element
872         for e := t.Priorities.Front(); e != nil; e = next {
873                 next = e.Next()
874                 if e.Value.(request) == req {
875                         t.Priorities.Remove(e)
876                 }
877         }
878
879         return nil
880 }
881
882 func (cl *Client) dataReady(ds dataSpec) {
883         if cl.dataWaiter != nil {
884                 close(cl.dataWaiter)
885         }
886         cl.dataWaiter = nil
887 }
888
889 // Returns a channel that is closed when new data has become available in the
890 // client.
891 func (me *Client) DataWaiter() <-chan struct{} {
892         me.mu.Lock()
893         defer me.mu.Unlock()
894         if me.dataWaiter == nil {
895                 me.dataWaiter = make(chan struct{})
896         }
897         return me.dataWaiter
898 }
899
900 func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
901         p := t.Pieces[piece]
902         p.EverHashed = true
903         if correct {
904                 p.PendingChunkSpecs = nil
905                 log.Printf("%s: got piece %d, (%d/%d)", t, piece, t.NumPiecesCompleted(), t.NumPieces())
906                 var next *list.Element
907                 for e := t.Priorities.Front(); e != nil; e = next {
908                         next = e.Next()
909                         if e.Value.(request).Index == piece {
910                                 t.Priorities.Remove(e)
911                         }
912                 }
913                 me.dataReady(dataSpec{
914                         t.InfoHash,
915                         request{
916                                 pp.Integer(piece),
917                                 chunkSpec{0, pp.Integer(t.PieceLength(piece))},
918                         },
919                 })
920         } else {
921                 if len(p.PendingChunkSpecs) == 0 {
922                         t.pendAllChunkSpecs(piece)
923                 }
924         }
925         for _, conn := range t.Conns {
926                 if correct {
927                         conn.Post(pp.Message{
928                                 Type:  pp.Have,
929                                 Index: pp.Integer(piece),
930                         })
931                         // TODO: Cancel requests for this piece.
932                 } else {
933                         if conn.PeerHasPiece(piece) {
934                                 me.replenishConnRequests(t, conn)
935                         }
936                 }
937         }
938         me.event.Broadcast()
939 }
940
941 func (cl *Client) verifyPiece(t *torrent, index pp.Integer) {
942         cl.mu.Lock()
943         p := t.Pieces[index]
944         for p.Hashing {
945                 cl.event.Wait()
946         }
947         p.Hashing = true
948         p.QueuedForHash = false
949         cl.mu.Unlock()
950         sum := t.HashPiece(index)
951         cl.mu.Lock()
952         p.Hashing = false
953         cl.pieceHashed(t, index, sum == p.Hash)
954         cl.mu.Unlock()
955 }
956
957 func (me *Client) Torrents() (ret []*torrent) {
958         me.mu.Lock()
959         for _, t := range me.torrents {
960                 ret = append(ret, t)
961         }
962         me.mu.Unlock()
963         return
964 }