]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Make Request private
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errors("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "container/list"
21         "crypto/rand"
22         "encoding"
23         "errors"
24         "fmt"
25         "io"
26         "log"
27         mathRand "math/rand"
28         "net"
29         "os"
30         "sync"
31         "syscall"
32         "time"
33
34         metainfo "github.com/nsf/libtorgo/torrent"
35
36         "bitbucket.org/anacrolix/go.torrent/peer_protocol"
37         "bitbucket.org/anacrolix/go.torrent/tracker"
38         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
39 )
40
41 // Currently doesn't really queue, but should in the future.
42 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex peer_protocol.Integer) {
43         piece := t.Pieces[pieceIndex]
44         if piece.QueuedForHash {
45                 return
46         }
47         piece.QueuedForHash = true
48         go cl.verifyPiece(t, pieceIndex)
49 }
50
51 // Queues the torrent data for the given region for download. The beginning of
52 // the region is given highest priority to allow a subsequent read at the same
53 // offset to return data ASAP.
54 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
55         me.mu.Lock()
56         defer me.mu.Unlock()
57         t := me.torrent(ih)
58         if t == nil {
59                 return errors.New("no such active torrent")
60         }
61         newPriorities := make([]request, 0, (len_+chunkSize-1)/chunkSize)
62         for len_ > 0 {
63                 req, ok := t.offsetRequest(off)
64                 if !ok {
65                         return errors.New("bad offset")
66                 }
67                 reqOff := t.requestOffset(req)
68                 // Gain the alignment adjustment.
69                 len_ += off - reqOff
70                 // Lose the length of this block.
71                 len_ -= int64(req.Length)
72                 off = reqOff + int64(req.Length)
73                 if !t.wantPiece(int(req.Index)) {
74                         continue
75                 }
76                 newPriorities = append(newPriorities, req)
77         }
78         if len(newPriorities) == 0 {
79                 return nil
80         }
81         if t.Priorities == nil {
82                 t.Priorities = list.New()
83         }
84         t.Priorities.PushFront(newPriorities[0])
85         for _, req := range newPriorities[1:] {
86                 t.Priorities.PushBack(req)
87         }
88         for _, cn := range t.Conns {
89                 me.replenishConnRequests(t, cn)
90         }
91         return nil
92 }
93
94 type dataSpec struct {
95         InfoHash
96         request
97 }
98
99 type Client struct {
100         DataDir         string
101         HalfOpenLimit   int
102         PeerId          [20]byte
103         Listener        net.Listener
104         DisableTrackers bool
105
106         mu    sync.Mutex
107         event sync.Cond
108         quit  chan struct{}
109
110         halfOpen   int
111         torrents   map[InfoHash]*torrent
112         dataWaiter chan struct{}
113 }
114
115 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
116 // isn't available.
117 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
118         cl.mu.Lock()
119         defer cl.mu.Unlock()
120         t := cl.torrent(ih)
121         if t == nil {
122                 err = errors.New("unknown torrent")
123                 return
124         }
125         index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
126         // Reading outside the bounds of a file is an error.
127         if index < 0 {
128                 err = os.ErrInvalid
129                 return
130         }
131         if int(index) >= len(t.Pieces) {
132                 err = io.EOF
133                 return
134         }
135         piece := t.Pieces[index]
136         if !piece.EverHashed {
137                 cl.queuePieceCheck(t, index)
138         }
139         if piece.Hashing {
140                 err = ErrDataNotReady
141                 return
142         }
143         pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
144         high := int(t.PieceLength(index) - pieceOff)
145         if high < len(p) {
146                 p = p[:high]
147         }
148         for cs, _ := range piece.PendingChunkSpecs {
149                 chunkOff := int64(pieceOff) - int64(cs.Begin)
150                 if chunkOff >= int64(t.PieceLength(index)) {
151                         panic(chunkOff)
152                 }
153                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
154                         // read begins in a pending chunk
155                         err = ErrDataNotReady
156                         return
157                 }
158                 // pending chunk caps available data
159                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
160                         p = p[:-chunkOff]
161                 }
162         }
163         return t.Data.ReadAt(p, off)
164 }
165
166 // Starts the client. Defaults are applied. The client will begin accepting connections and tracking.
167 func (c *Client) Start() {
168         c.event.L = &c.mu
169         c.torrents = make(map[InfoHash]*torrent)
170         if c.HalfOpenLimit == 0 {
171                 c.HalfOpenLimit = 10
172         }
173         o := copy(c.PeerId[:], BEP20)
174         _, err := rand.Read(c.PeerId[o:])
175         if err != nil {
176                 panic("error generating peer id")
177         }
178         c.quit = make(chan struct{})
179         if c.Listener != nil {
180                 go c.acceptConnections()
181         }
182 }
183
184 func (cl *Client) stopped() bool {
185         select {
186         case <-cl.quit:
187                 return true
188         default:
189                 return false
190         }
191 }
192
193 // Stops the client. All connections to peers are closed and all activity will
194 // come to a halt.
195 func (me *Client) Stop() {
196         me.mu.Lock()
197         close(me.quit)
198         me.event.Broadcast()
199         for _, t := range me.torrents {
200                 for _, c := range t.Conns {
201                         c.Close()
202                 }
203         }
204         me.mu.Unlock()
205 }
206
207 func (cl *Client) acceptConnections() {
208         for {
209                 conn, err := cl.Listener.Accept()
210                 select {
211                 case <-cl.quit:
212                         return
213                 default:
214                 }
215                 if err != nil {
216                         log.Print(err)
217                         return
218                 }
219                 go func() {
220                         if err := cl.runConnection(conn, nil); err != nil {
221                                 log.Print(err)
222                         }
223                 }()
224         }
225 }
226
227 func (me *Client) torrent(ih InfoHash) *torrent {
228         for _, t := range me.torrents {
229                 if t.InfoHash == ih {
230                         return t
231                 }
232         }
233         return nil
234 }
235
236 func (me *Client) initiateConn(peer Peer, torrent *torrent) {
237         if peer.Id == me.PeerId {
238                 return
239         }
240         me.halfOpen++
241         go func() {
242                 addr := &net.TCPAddr{
243                         IP:   peer.IP,
244                         Port: peer.Port,
245                 }
246                 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
247
248                 go func() {
249                         me.mu.Lock()
250                         defer me.mu.Unlock()
251                         if me.halfOpen == 0 {
252                                 panic("assert")
253                         }
254                         me.halfOpen--
255                         me.openNewConns()
256                 }()
257
258                 if netOpErr, ok := err.(*net.OpError); ok {
259                         if netOpErr.Timeout() {
260                                 return
261                         }
262                         switch netOpErr.Err {
263                         case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
264                                 return
265                         }
266                 }
267                 if err != nil {
268                         log.Printf("error connecting to peer: %s %#v", err, err)
269                         return
270                 }
271                 log.Printf("connected to %s", conn.RemoteAddr())
272                 err = me.runConnection(conn, torrent)
273                 if err != nil {
274                         log.Print(err)
275                 }
276         }()
277 }
278
279 func (me *Client) runConnection(sock net.Conn, torrent *torrent) (err error) {
280         conn := &connection{
281                 Socket:     sock,
282                 Choked:     true,
283                 PeerChoked: true,
284                 write:      make(chan []byte),
285                 post:       make(chan encoding.BinaryMarshaler),
286         }
287         defer func() {
288                 // There's a lock and deferred unlock later in this function. The
289                 // client will not be locked when this deferred is invoked.
290                 me.mu.Lock()
291                 defer me.mu.Unlock()
292                 conn.Close()
293         }()
294         go conn.writer()
295         go conn.writeOptimizer()
296         conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
297         conn.post <- peer_protocol.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
298         if torrent != nil {
299                 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
300                 conn.post <- peer_protocol.Bytes(me.PeerId[:])
301         }
302         var b [28]byte
303         _, err = io.ReadFull(conn.Socket, b[:])
304         if err == io.EOF {
305                 return nil
306         }
307         if err != nil {
308                 err = fmt.Errorf("when reading protocol and extensions: %s", err)
309                 return
310         }
311         if string(b[:20]) != peer_protocol.Protocol {
312                 err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
313                 return
314         }
315         if 8 != copy(conn.PeerExtensions[:], b[20:]) {
316                 panic("wtf")
317         }
318         // log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
319         var infoHash [20]byte
320         _, err = io.ReadFull(conn.Socket, infoHash[:])
321         if err != nil {
322                 return fmt.Errorf("reading peer info hash: %s", err)
323         }
324         _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
325         if err != nil {
326                 return fmt.Errorf("reading peer id: %s", err)
327         }
328         if torrent == nil {
329                 torrent = me.torrent(infoHash)
330                 if torrent == nil {
331                         return
332                 }
333                 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
334                 conn.post <- peer_protocol.Bytes(me.PeerId[:])
335         }
336         me.mu.Lock()
337         defer me.mu.Unlock()
338         if !me.addConnection(torrent, conn) {
339                 return
340         }
341         if torrent.haveAnyPieces() {
342                 conn.Post(peer_protocol.Message{
343                         Type:     peer_protocol.Bitfield,
344                         Bitfield: torrent.bitfield(),
345                 })
346         }
347         err = me.connectionLoop(torrent, conn)
348         if err != nil {
349                 err = fmt.Errorf("during Connection loop: %s", err)
350         }
351         me.dropConnection(torrent, conn)
352         return
353 }
354
355 func (me *Client) peerGotPiece(torrent *torrent, conn *connection, piece int) {
356         if conn.PeerPieces == nil {
357                 conn.PeerPieces = make([]bool, len(torrent.Pieces))
358         }
359         conn.PeerPieces[piece] = true
360         if torrent.wantPiece(piece) {
361                 me.replenishConnRequests(torrent, conn)
362         }
363 }
364
365 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
366         me.replenishConnRequests(torrent, conn)
367 }
368
369 func (me *Client) connectionLoop(torrent *torrent, conn *connection) error {
370         decoder := peer_protocol.Decoder{
371                 R:         bufio.NewReader(conn.Socket),
372                 MaxLength: 256 * 1024,
373         }
374         for {
375                 me.mu.Unlock()
376                 // TODO: Can this be allocated on the stack?
377                 msg := new(peer_protocol.Message)
378                 err := decoder.Decode(msg)
379                 me.mu.Lock()
380                 if err != nil {
381                         if me.stopped() || err == io.EOF {
382                                 return nil
383                         }
384                         return err
385                 }
386                 if msg.Keepalive {
387                         continue
388                 }
389                 switch msg.Type {
390                 case peer_protocol.Choke:
391                         conn.PeerChoked = true
392                         conn.Requests = nil
393                 case peer_protocol.Unchoke:
394                         conn.PeerChoked = false
395                         me.peerUnchoked(torrent, conn)
396                 case peer_protocol.Interested:
397                         conn.PeerInterested = true
398                         // TODO: This should be done from a dedicated unchoking routine.
399                         conn.Unchoke()
400                 case peer_protocol.NotInterested:
401                         conn.PeerInterested = false
402                 case peer_protocol.Have:
403                         me.peerGotPiece(torrent, conn, int(msg.Index))
404                 case peer_protocol.Request:
405                         if conn.PeerRequests == nil {
406                                 conn.PeerRequests = make(map[request]struct{}, maxRequests)
407                         }
408                         request := request{
409                                 Index:     msg.Index,
410                                 chunkSpec: chunkSpec{msg.Begin, msg.Length},
411                         }
412                         conn.PeerRequests[request] = struct{}{}
413                         // TODO: Requests should be satisfied from a dedicated upload routine.
414                         p := make([]byte, msg.Length)
415                         n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
416                         if err != nil {
417                                 return fmt.Errorf("reading torrent data to serve request %s: %s", request, err)
418                         }
419                         if n != int(msg.Length) {
420                                 return fmt.Errorf("bad request: %s", msg)
421                         }
422                         conn.Post(peer_protocol.Message{
423                                 Type:  peer_protocol.Piece,
424                                 Index: msg.Index,
425                                 Begin: msg.Begin,
426                                 Piece: p,
427                         })
428                 case peer_protocol.Cancel:
429                         req := newRequest(msg.Index, msg.Begin, msg.Length)
430                         if !conn.PeerCancel(req) {
431                                 log.Printf("received unexpected cancel: %v", req)
432                         }
433                 case peer_protocol.Bitfield:
434                         if len(msg.Bitfield) < len(torrent.Pieces) {
435                                 err = errors.New("received invalid bitfield")
436                                 break
437                         }
438                         if conn.PeerPieces != nil {
439                                 err = errors.New("received unexpected bitfield")
440                                 break
441                         }
442                         conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
443                         for index, has := range conn.PeerPieces {
444                                 if has {
445                                         me.peerGotPiece(torrent, conn, index)
446                                 }
447                         }
448                 case peer_protocol.Piece:
449                         request_ := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
450                         if _, ok := conn.Requests[request_]; !ok {
451                                 err = fmt.Errorf("unexpected piece: %s", request_)
452                                 break
453                         }
454                         delete(conn.Requests, request_)
455                         err = me.downloadedChunk(torrent, msg)
456                 default:
457                         log.Printf("received unknown message type: %#v", msg.Type)
458                 }
459                 if err != nil {
460                         return err
461                 }
462                 me.replenishConnRequests(torrent, conn)
463         }
464 }
465
466 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
467         conn.Socket.Close()
468         for i0, c := range torrent.Conns {
469                 if c != conn {
470                         continue
471                 }
472                 i1 := len(torrent.Conns) - 1
473                 if i0 != i1 {
474                         torrent.Conns[i0] = torrent.Conns[i1]
475                 }
476                 torrent.Conns = torrent.Conns[:i1]
477                 return
478         }
479         panic("no such Connection")
480 }
481
482 func (me *Client) addConnection(t *torrent, c *connection) bool {
483         for _, c0 := range t.Conns {
484                 if c.PeerId == c0.PeerId {
485                         log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId)
486                         return false
487                 }
488         }
489         t.Conns = append(t.Conns, c)
490         return true
491 }
492
493 func (me *Client) openNewConns() {
494         for _, t := range me.torrents {
495                 for len(t.Peers) != 0 {
496                         if me.halfOpen >= me.HalfOpenLimit {
497                                 return
498                         }
499                         p := t.Peers[0]
500                         t.Peers = t.Peers[1:]
501                         me.initiateConn(p, t)
502                 }
503         }
504 }
505
506 // Adds peers to the swarm for the torrent corresponding to infoHash.
507 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
508         me.mu.Lock()
509         t := me.torrent(infoHash)
510         if t == nil {
511                 return errors.New("no such torrent")
512         }
513         t.Peers = append(t.Peers, peers...)
514         me.openNewConns()
515         me.mu.Unlock()
516         return nil
517 }
518
519 // Prepare a Torrent without any attachment to a Client. That means we can
520 // initialize fields all fields that don't require the Client without locking
521 // it.
522 func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (t *torrent, err error) {
523         t = &torrent{
524                 InfoHash: BytesInfoHash(metaInfo.InfoHash),
525                 MetaInfo: metaInfo,
526         }
527         t.Data, err = mmapTorrentData(metaInfo, dataDir)
528         if err != nil {
529                 return
530         }
531         for offset := 0; offset < len(metaInfo.Pieces); offset += pieceHash.Size() {
532                 hash := metaInfo.Pieces[offset : offset+pieceHash.Size()]
533                 if len(hash) != pieceHash.Size() {
534                         err = errors.New("bad piece hash in metainfo")
535                         return
536                 }
537                 piece := &piece{}
538                 copyHashSum(piece.Hash[:], hash)
539                 t.Pieces = append(t.Pieces, piece)
540                 t.pendAllChunkSpecs(peer_protocol.Integer(len(t.Pieces) - 1))
541         }
542         t.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
543         for tierIndex := range metaInfo.AnnounceList {
544                 tier := t.Trackers[tierIndex]
545                 for _, url := range metaInfo.AnnounceList[tierIndex] {
546                         tr, err := tracker.New(url)
547                         if err != nil {
548                                 log.Print(err)
549                                 continue
550                         }
551                         tier = append(tier, tr)
552                 }
553                 // The trackers within each tier must be shuffled before use.
554                 // http://stackoverflow.com/a/12267471/149482
555                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
556                 for i := range tier {
557                         j := mathRand.Intn(i + 1)
558                         tier[i], tier[j] = tier[j], tier[i]
559                 }
560                 t.Trackers[tierIndex] = tier
561         }
562         return
563 }
564
565 // Adds the torrent to the client.
566 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
567         torrent, err := newTorrent(metaInfo, me.DataDir)
568         if err != nil {
569                 return err
570         }
571         me.mu.Lock()
572         defer me.mu.Unlock()
573         if _, ok := me.torrents[torrent.InfoHash]; ok {
574                 return torrent.Close()
575         }
576         me.torrents[torrent.InfoHash] = torrent
577         if !me.DisableTrackers {
578                 go me.announceTorrent(torrent)
579         }
580         for i := range torrent.Pieces {
581                 me.queuePieceCheck(torrent, peer_protocol.Integer(i))
582         }
583         return nil
584 }
585
586 func (cl *Client) listenerAnnouncePort() (port int16) {
587         l := cl.Listener
588         if l == nil {
589                 return
590         }
591         addr := l.Addr()
592         switch data := addr.(type) {
593         case *net.TCPAddr:
594                 return int16(data.Port)
595         case *net.UDPAddr:
596                 return int16(data.Port)
597         default:
598                 log.Printf("unknown listener addr type: %T", addr)
599         }
600         return
601 }
602
603 func (cl *Client) announceTorrent(t *torrent) {
604         req := tracker.AnnounceRequest{
605                 Event:   tracker.Started,
606                 NumWant: -1,
607                 Port:    cl.listenerAnnouncePort(),
608         }
609         req.PeerId = cl.PeerId
610         req.InfoHash = t.InfoHash
611 newAnnounce:
612         for {
613                 for _, tier := range t.Trackers {
614                         for trIndex, tr := range tier {
615                                 if err := tr.Connect(); err != nil {
616                                         log.Print(err)
617                                         continue
618                                 }
619                                 resp, err := tr.Announce(&req)
620                                 if err != nil {
621                                         log.Print(err)
622                                         continue
623                                 }
624                                 var peers []Peer
625                                 for _, peer := range resp.Peers {
626                                         peers = append(peers, Peer{
627                                                 IP:   peer.IP,
628                                                 Port: peer.Port,
629                                         })
630                                 }
631                                 if err := cl.AddPeers(t.InfoHash, peers); err != nil {
632                                         log.Print(err)
633                                         return
634                                 }
635                                 log.Printf("%d new peers from %s", len(peers), "TODO")
636                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
637                                 time.Sleep(time.Second * time.Duration(resp.Interval))
638                                 continue newAnnounce
639                         }
640                 }
641                 time.Sleep(time.Second)
642         }
643 }
644
645 func (cl *Client) allTorrentsCompleted() bool {
646         for _, t := range cl.torrents {
647                 if !t.haveAllPieces() {
648                         return false
649                 }
650         }
651         return true
652 }
653
654 // Returns true when all torrents are completely downloaded and false if the
655 // client is stopped.
656 func (me *Client) WaitAll() bool {
657         me.mu.Lock()
658         defer me.mu.Unlock()
659         for !me.allTorrentsCompleted() {
660                 if me.stopped() {
661                         return false
662                 }
663                 me.event.Wait()
664         }
665         return true
666 }
667
668 func (me *Client) replenishConnRequests(torrent *torrent, conn *connection) {
669         requestHeatMap := torrent.requestHeat()
670         addRequest := func(req request) (again bool) {
671                 piece := torrent.Pieces[req.Index]
672                 if piece.Hashing {
673                         // We can't be sure we want this.
674                         return true
675                 }
676                 if piece.Complete() {
677                         // We already have this.
678                         return true
679                 }
680                 if requestHeatMap[req] > 0 {
681                         // We've already requested this.
682                         return true
683                 }
684                 return conn.Request(req)
685         }
686         // First request prioritized chunks.
687         if torrent.Priorities != nil {
688                 for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
689                         if !addRequest(e.Value.(request)) {
690                                 return
691                         }
692                 }
693         }
694         // Then finish off incomplete pieces in order of bytes remaining.
695         for _, index := range torrent.piecesByPendingBytesDesc() {
696                 if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
697                         continue
698                 }
699                 for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
700                         if !addRequest(request{index, chunkSpec}) {
701                                 return
702                         }
703                 }
704         }
705         if len(conn.Requests) == 0 {
706                 conn.SetInterested(false)
707         }
708 }
709
710 func (me *Client) downloadedChunk(torrent *torrent, msg *peer_protocol.Message) (err error) {
711         req := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
712         if _, ok := torrent.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
713                 log.Printf("got unnecessary chunk: %s", req)
714                 return
715         }
716         err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
717         if err != nil {
718                 return
719         }
720         delete(torrent.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
721         if len(torrent.Pieces[req.Index].PendingChunkSpecs) == 0 {
722                 me.queuePieceCheck(torrent, req.Index)
723         }
724         var next *list.Element
725         for e := torrent.Priorities.Front(); e != nil; e = next {
726                 next = e.Next()
727                 if e.Value.(request) == req {
728                         torrent.Priorities.Remove(e)
729                 }
730         }
731         me.dataReady(dataSpec{torrent.InfoHash, req})
732         return
733 }
734
735 func (cl *Client) dataReady(ds dataSpec) {
736         if cl.dataWaiter != nil {
737                 close(cl.dataWaiter)
738         }
739         cl.dataWaiter = nil
740 }
741
742 // Returns a channel that is closed when new data has become available in the
743 // client.
744 func (me *Client) DataWaiter() <-chan struct{} {
745         me.mu.Lock()
746         defer me.mu.Unlock()
747         if me.dataWaiter == nil {
748                 me.dataWaiter = make(chan struct{})
749         }
750         return me.dataWaiter
751 }
752
753 func (me *Client) pieceHashed(t *torrent, piece peer_protocol.Integer, correct bool) {
754         p := t.Pieces[piece]
755         p.EverHashed = true
756         if correct {
757                 p.PendingChunkSpecs = nil
758                 log.Printf("got piece %d, (%d/%d)", piece, t.NumPiecesCompleted(), t.NumPieces())
759                 var next *list.Element
760                 if t.Priorities != nil {
761                         for e := t.Priorities.Front(); e != nil; e = next {
762                                 next = e.Next()
763                                 if e.Value.(request).Index == piece {
764                                         t.Priorities.Remove(e)
765                                 }
766                         }
767                 }
768                 me.dataReady(dataSpec{
769                         t.InfoHash,
770                         request{
771                                 peer_protocol.Integer(piece),
772                                 chunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))},
773                         },
774                 })
775         } else {
776                 if len(p.PendingChunkSpecs) == 0 {
777                         t.pendAllChunkSpecs(piece)
778                 }
779         }
780         for _, conn := range t.Conns {
781                 if correct {
782                         conn.Post(peer_protocol.Message{
783                                 Type:  peer_protocol.Have,
784                                 Index: peer_protocol.Integer(piece),
785                         })
786                         // TODO: Cancel requests for this piece.
787                 } else {
788                         if conn.PeerHasPiece(piece) {
789                                 me.replenishConnRequests(t, conn)
790                         }
791                 }
792         }
793         me.event.Broadcast()
794 }
795
796 func (cl *Client) verifyPiece(t *torrent, index peer_protocol.Integer) {
797         cl.mu.Lock()
798         p := t.Pieces[index]
799         for p.Hashing {
800                 cl.event.Wait()
801         }
802         p.Hashing = true
803         p.QueuedForHash = false
804         cl.mu.Unlock()
805         sum := t.HashPiece(index)
806         cl.mu.Lock()
807         p.Hashing = false
808         cl.pieceHashed(t, index, sum == p.Hash)
809         cl.mu.Unlock()
810 }
811
812 func (me *Client) Torrents() (ret []*torrent) {
813         me.mu.Lock()
814         for _, t := range me.torrents {
815                 ret = append(ret, t)
816         }
817         me.mu.Unlock()
818         return
819 }