]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
76425ccc2c37eb91866848145f685d256df9c586
[btrtrc.git] / client.go
1 /*
2 Package torrent implements a torrent client.
3
4 Simple example:
5
6         c := &Client{}
7         c.Start()
8         defer c.Stop()
9         if err := c.AddTorrent(externalMetaInfoPackageSux); err != nil {
10                 return fmt.Errors("error adding torrent: %s", err)
11         }
12         c.WaitAll()
13         log.Print("erhmahgerd, torrent downloaded")
14
15 */
16 package torrent
17
18 import (
19         "bufio"
20         "container/list"
21         "crypto/rand"
22         "encoding"
23         "errors"
24         "fmt"
25         "io"
26         "log"
27         mathRand "math/rand"
28         "net"
29         "os"
30         "sync"
31         "syscall"
32         "time"
33
34         metainfo "github.com/nsf/libtorgo/torrent"
35
36         "bitbucket.org/anacrolix/go.torrent/peer_protocol"
37         "bitbucket.org/anacrolix/go.torrent/tracker"
38         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
39 )
40
41 // Currently doesn't really queue, but should in the future.
42 func (cl *Client) queuePieceCheck(t *torrent, pieceIndex peer_protocol.Integer) {
43         piece := t.Pieces[pieceIndex]
44         if piece.QueuedForHash {
45                 return
46         }
47         piece.QueuedForHash = true
48         go cl.verifyPiece(t, pieceIndex)
49 }
50
51 // Queues the torrent data for the given region for download. The beginning of
52 // the region is given highest priority to allow a subsequent read at the same
53 // offset to return data ASAP.
54 func (me *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
55         me.mu.Lock()
56         defer me.mu.Unlock()
57         t := me.torrent(ih)
58         if t == nil {
59                 return errors.New("no such active torrent")
60         }
61         newPriorities := make([]Request, 0, (len_+chunkSize-1)/chunkSize)
62         for len_ > 0 {
63                 req, ok := t.offsetRequest(off)
64                 if !ok {
65                         return errors.New("bad offset")
66                 }
67                 reqOff := t.requestOffset(req)
68                 // Gain the alignment adjustment.
69                 len_ += off - reqOff
70                 // Lose the length of this block.
71                 len_ -= int64(req.Length)
72                 off = reqOff + int64(req.Length)
73                 if !t.wantPiece(int(req.Index)) {
74                         continue
75                 }
76                 newPriorities = append(newPriorities, req)
77         }
78         if len(newPriorities) == 0 {
79                 return nil
80         }
81         if t.Priorities == nil {
82                 t.Priorities = list.New()
83         }
84         t.Priorities.PushFront(newPriorities[0])
85         for _, req := range newPriorities[1:] {
86                 t.Priorities.PushBack(req)
87         }
88         for _, cn := range t.Conns {
89                 me.replenishConnRequests(t, cn)
90         }
91         return nil
92 }
93
94 type dataSpec struct {
95         InfoHash
96         Request
97 }
98
99 type Client struct {
100         DataDir         string
101         HalfOpenLimit   int
102         PeerId          [20]byte
103         Listener        net.Listener
104         DisableTrackers bool
105
106         mu    sync.Mutex
107         event sync.Cond
108         quit  chan struct{}
109
110         halfOpen   int
111         torrents   map[InfoHash]*torrent
112         dataWaiter chan struct{}
113 }
114
115 // Read torrent data at the given offset. Returns ErrDataNotReady if the data
116 // isn't available.
117 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
118         cl.mu.Lock()
119         defer cl.mu.Unlock()
120         t := cl.torrent(ih)
121         if t == nil {
122                 err = errors.New("unknown torrent")
123                 return
124         }
125         index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
126         // Reading outside the bounds of a file is an error.
127         if index < 0 {
128                 err = os.ErrInvalid
129                 return
130         }
131         if int(index) >= len(t.Pieces) {
132                 err = io.EOF
133                 return
134         }
135         piece := t.Pieces[index]
136         if !piece.EverHashed {
137                 cl.queuePieceCheck(t, index)
138         }
139         if piece.Hashing {
140                 err = ErrDataNotReady
141                 return
142         }
143         pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
144         high := int(t.PieceLength(index) - pieceOff)
145         if high < len(p) {
146                 p = p[:high]
147         }
148         for cs, _ := range piece.PendingChunkSpecs {
149                 chunkOff := int64(pieceOff) - int64(cs.Begin)
150                 if chunkOff >= int64(t.PieceLength(index)) {
151                         panic(chunkOff)
152                 }
153                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
154                         // read begins in a pending chunk
155                         err = ErrDataNotReady
156                         return
157                 }
158                 // pending chunk caps available data
159                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
160                         p = p[:-chunkOff]
161                 }
162         }
163         return t.Data.ReadAt(p, off)
164 }
165
166 // Starts the client. Defaults are applied. The client will begin accepting connections and tracking.
167 func (c *Client) Start() {
168         c.event.L = &c.mu
169         c.torrents = make(map[InfoHash]*torrent)
170         if c.HalfOpenLimit == 0 {
171                 c.HalfOpenLimit = 10
172         }
173         o := copy(c.PeerId[:], BEP20)
174         _, err := rand.Read(c.PeerId[o:])
175         if err != nil {
176                 panic("error generating peer id")
177         }
178         c.quit = make(chan struct{})
179         if c.Listener != nil {
180                 go c.acceptConnections()
181         }
182 }
183
184 func (cl *Client) stopped() bool {
185         select {
186         case <-cl.quit:
187                 return true
188         default:
189                 return false
190         }
191 }
192
193 // Stops the client. All connections to peers are closed and all activity will
194 // come to a halt.
195 func (me *Client) Stop() {
196         me.mu.Lock()
197         close(me.quit)
198         me.event.Broadcast()
199         for _, t := range me.torrents {
200                 for _, c := range t.Conns {
201                         c.Close()
202                 }
203         }
204         me.mu.Unlock()
205 }
206
207 func (cl *Client) acceptConnections() {
208         for {
209                 conn, err := cl.Listener.Accept()
210                 select {
211                 case <-cl.quit:
212                         return
213                 default:
214                 }
215                 if err != nil {
216                         log.Print(err)
217                         return
218                 }
219                 go func() {
220                         if err := cl.runConnection(conn, nil); err != nil {
221                                 log.Print(err)
222                         }
223                 }()
224         }
225 }
226
227 func (me *Client) torrent(ih InfoHash) *torrent {
228         for _, t := range me.torrents {
229                 if t.InfoHash == ih {
230                         return t
231                 }
232         }
233         return nil
234 }
235
236 func (me *Client) initiateConn(peer Peer, torrent *torrent) {
237         if peer.Id == me.PeerId {
238                 return
239         }
240         me.halfOpen++
241         go func() {
242                 addr := &net.TCPAddr{
243                         IP:   peer.IP,
244                         Port: peer.Port,
245                 }
246                 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
247
248                 go func() {
249                         me.mu.Lock()
250                         defer me.mu.Unlock()
251                         if me.halfOpen == 0 {
252                                 panic("assert")
253                         }
254                         me.halfOpen--
255                         me.openNewConns()
256                 }()
257
258                 if netOpErr, ok := err.(*net.OpError); ok {
259                         if netOpErr.Timeout() {
260                                 return
261                         }
262                         switch netOpErr.Err {
263                         case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
264                                 return
265                         }
266                 }
267                 if err != nil {
268                         log.Printf("error connecting to peer: %s %#v", err, err)
269                         return
270                 }
271                 log.Printf("connected to %s", conn.RemoteAddr())
272                 err = me.runConnection(conn, torrent)
273                 if err != nil {
274                         log.Print(err)
275                 }
276         }()
277 }
278
279 func (me *Client) runConnection(sock net.Conn, torrent *torrent) (err error) {
280         conn := &connection{
281                 Socket:     sock,
282                 Choked:     true,
283                 PeerChoked: true,
284                 write:      make(chan []byte),
285                 post:       make(chan encoding.BinaryMarshaler),
286         }
287         defer func() {
288                 // There's a lock and deferred unlock later in this function. The
289                 // client will not be locked when this deferred is invoked.
290                 me.mu.Lock()
291                 defer me.mu.Unlock()
292                 conn.Close()
293         }()
294         go conn.writer()
295         go conn.writeOptimizer()
296         conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
297         conn.post <- peer_protocol.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
298         if torrent != nil {
299                 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
300                 conn.post <- peer_protocol.Bytes(me.PeerId[:])
301         }
302         var b [28]byte
303         _, err = io.ReadFull(conn.Socket, b[:])
304         if err == io.EOF {
305                 return nil
306         }
307         if err != nil {
308                 err = fmt.Errorf("when reading protocol and extensions: %s", err)
309                 return
310         }
311         if string(b[:20]) != peer_protocol.Protocol {
312                 err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
313                 return
314         }
315         if 8 != copy(conn.PeerExtensions[:], b[20:]) {
316                 panic("wtf")
317         }
318         // log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
319         var infoHash [20]byte
320         _, err = io.ReadFull(conn.Socket, infoHash[:])
321         if err != nil {
322                 return fmt.Errorf("reading peer info hash: %s", err)
323         }
324         _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
325         if err != nil {
326                 return fmt.Errorf("reading peer id: %s", err)
327         }
328         if torrent == nil {
329                 torrent = me.torrent(infoHash)
330                 if torrent == nil {
331                         return
332                 }
333                 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
334                 conn.post <- peer_protocol.Bytes(me.PeerId[:])
335         }
336         me.mu.Lock()
337         defer me.mu.Unlock()
338         if !me.addConnection(torrent, conn) {
339                 return
340         }
341         if torrent.haveAnyPieces() {
342                 conn.Post(peer_protocol.Message{
343                         Type:     peer_protocol.Bitfield,
344                         Bitfield: torrent.bitfield(),
345                 })
346         }
347         err = me.connectionLoop(torrent, conn)
348         if err != nil {
349                 err = fmt.Errorf("during Connection loop: %s", err)
350         }
351         me.dropConnection(torrent, conn)
352         return
353 }
354
355 func (me *Client) peerGotPiece(torrent *torrent, conn *connection, piece int) {
356         if conn.PeerPieces == nil {
357                 conn.PeerPieces = make([]bool, len(torrent.Pieces))
358         }
359         conn.PeerPieces[piece] = true
360         if torrent.wantPiece(piece) {
361                 me.replenishConnRequests(torrent, conn)
362         }
363 }
364
365 func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
366         me.replenishConnRequests(torrent, conn)
367 }
368
369 func (me *Client) connectionLoop(torrent *torrent, conn *connection) error {
370         decoder := peer_protocol.Decoder{
371                 R:         bufio.NewReader(conn.Socket),
372                 MaxLength: 256 * 1024,
373         }
374         for {
375                 me.mu.Unlock()
376                 // TODO: Can this be allocated on the stack?
377                 msg := new(peer_protocol.Message)
378                 err := decoder.Decode(msg)
379                 me.mu.Lock()
380                 if err != nil {
381                         if me.stopped() || err == io.EOF {
382                                 return nil
383                         }
384                         return err
385                 }
386                 if msg.Keepalive {
387                         continue
388                 }
389                 switch msg.Type {
390                 case peer_protocol.Choke:
391                         conn.PeerChoked = true
392                         conn.Requests = nil
393                 case peer_protocol.Unchoke:
394                         conn.PeerChoked = false
395                         me.peerUnchoked(torrent, conn)
396                 case peer_protocol.Interested:
397                         conn.PeerInterested = true
398                         // TODO: This should be done from a dedicated unchoking routine.
399                         conn.Unchoke()
400                 case peer_protocol.NotInterested:
401                         conn.PeerInterested = false
402                 case peer_protocol.Have:
403                         me.peerGotPiece(torrent, conn, int(msg.Index))
404                 case peer_protocol.Request:
405                         if conn.PeerRequests == nil {
406                                 conn.PeerRequests = make(map[Request]struct{}, maxRequests)
407                         }
408                         request := Request{
409                                 Index:     msg.Index,
410                                 chunkSpec: chunkSpec{msg.Begin, msg.Length},
411                         }
412                         conn.PeerRequests[request] = struct{}{}
413                         // TODO: Requests should be satisfied from a dedicated upload routine.
414                         p := make([]byte, msg.Length)
415                         n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
416                         if err != nil {
417                                 return fmt.Errorf("reading torrent data to serve request %s: %s", request, err)
418                         }
419                         if n != int(msg.Length) {
420                                 return fmt.Errorf("bad request: %s", msg)
421                         }
422                         conn.Post(peer_protocol.Message{
423                                 Type:  peer_protocol.Piece,
424                                 Index: msg.Index,
425                                 Begin: msg.Begin,
426                                 Piece: p,
427                         })
428                 case peer_protocol.Bitfield:
429                         if len(msg.Bitfield) < len(torrent.Pieces) {
430                                 err = errors.New("received invalid bitfield")
431                                 break
432                         }
433                         if conn.PeerPieces != nil {
434                                 err = errors.New("received unexpected bitfield")
435                                 break
436                         }
437                         conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
438                         for index, has := range conn.PeerPieces {
439                                 if has {
440                                         me.peerGotPiece(torrent, conn, index)
441                                 }
442                         }
443                 case peer_protocol.Piece:
444                         request_ := Request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
445                         if _, ok := conn.Requests[request_]; !ok {
446                                 err = fmt.Errorf("unexpected piece: %s", request_)
447                                 break
448                         }
449                         delete(conn.Requests, request_)
450                         err = me.downloadedChunk(torrent, msg)
451                 default:
452                         log.Printf("received unknown message type: %#v", msg.Type)
453                 }
454                 if err != nil {
455                         return err
456                 }
457                 me.replenishConnRequests(torrent, conn)
458         }
459 }
460
461 func (me *Client) dropConnection(torrent *torrent, conn *connection) {
462         conn.Socket.Close()
463         for i0, c := range torrent.Conns {
464                 if c != conn {
465                         continue
466                 }
467                 i1 := len(torrent.Conns) - 1
468                 if i0 != i1 {
469                         torrent.Conns[i0] = torrent.Conns[i1]
470                 }
471                 torrent.Conns = torrent.Conns[:i1]
472                 return
473         }
474         panic("no such Connection")
475 }
476
477 func (me *Client) addConnection(t *torrent, c *connection) bool {
478         for _, c0 := range t.Conns {
479                 if c.PeerId == c0.PeerId {
480                         log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId)
481                         return false
482                 }
483         }
484         t.Conns = append(t.Conns, c)
485         return true
486 }
487
488 func (me *Client) openNewConns() {
489         for _, t := range me.torrents {
490                 for len(t.Peers) != 0 {
491                         if me.halfOpen >= me.HalfOpenLimit {
492                                 return
493                         }
494                         p := t.Peers[0]
495                         t.Peers = t.Peers[1:]
496                         me.initiateConn(p, t)
497                 }
498         }
499 }
500
501 // Adds peers to the swarm for the torrent corresponding to infoHash.
502 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
503         me.mu.Lock()
504         t := me.torrent(infoHash)
505         if t == nil {
506                 return errors.New("no such torrent")
507         }
508         t.Peers = append(t.Peers, peers...)
509         me.openNewConns()
510         me.mu.Unlock()
511         return nil
512 }
513
514 // Prepare a Torrent without any attachment to a Client. That means we can
515 // initialize fields all fields that don't require the Client without locking
516 // it.
517 func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (t *torrent, err error) {
518         t = &torrent{
519                 InfoHash: BytesInfoHash(metaInfo.InfoHash),
520                 MetaInfo: metaInfo,
521         }
522         t.Data, err = mmapTorrentData(metaInfo, dataDir)
523         if err != nil {
524                 return
525         }
526         for offset := 0; offset < len(metaInfo.Pieces); offset += pieceHash.Size() {
527                 hash := metaInfo.Pieces[offset : offset+pieceHash.Size()]
528                 if len(hash) != pieceHash.Size() {
529                         err = errors.New("bad piece hash in metainfo")
530                         return
531                 }
532                 piece := &piece{}
533                 copyHashSum(piece.Hash[:], hash)
534                 t.Pieces = append(t.Pieces, piece)
535                 t.pendAllChunkSpecs(peer_protocol.Integer(len(t.Pieces) - 1))
536         }
537         t.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
538         for tierIndex := range metaInfo.AnnounceList {
539                 tier := t.Trackers[tierIndex]
540                 for _, url := range metaInfo.AnnounceList[tierIndex] {
541                         tr, err := tracker.New(url)
542                         if err != nil {
543                                 log.Print(err)
544                                 continue
545                         }
546                         tier = append(tier, tr)
547                 }
548                 // The trackers within each tier must be shuffled before use.
549                 // http://stackoverflow.com/a/12267471/149482
550                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
551                 for i := range tier {
552                         j := mathRand.Intn(i + 1)
553                         tier[i], tier[j] = tier[j], tier[i]
554                 }
555                 t.Trackers[tierIndex] = tier
556         }
557         return
558 }
559
560 // Adds the torrent to the client.
561 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
562         torrent, err := newTorrent(metaInfo, me.DataDir)
563         if err != nil {
564                 return err
565         }
566         me.mu.Lock()
567         defer me.mu.Unlock()
568         if _, ok := me.torrents[torrent.InfoHash]; ok {
569                 return torrent.Close()
570         }
571         me.torrents[torrent.InfoHash] = torrent
572         if !me.DisableTrackers {
573                 go me.announceTorrent(torrent)
574         }
575         for i := range torrent.Pieces {
576                 me.queuePieceCheck(torrent, peer_protocol.Integer(i))
577         }
578         return nil
579 }
580
581 func (cl *Client) listenerAnnouncePort() (port int16) {
582         l := cl.Listener
583         if l == nil {
584                 return
585         }
586         addr := l.Addr()
587         switch data := addr.(type) {
588         case *net.TCPAddr:
589                 return int16(data.Port)
590         case *net.UDPAddr:
591                 return int16(data.Port)
592         default:
593                 log.Printf("unknown listener addr type: %T", addr)
594         }
595         return
596 }
597
598 func (cl *Client) announceTorrent(t *torrent) {
599         req := tracker.AnnounceRequest{
600                 Event:   tracker.Started,
601                 NumWant: -1,
602                 Port:    cl.listenerAnnouncePort(),
603         }
604         req.PeerId = cl.PeerId
605         req.InfoHash = t.InfoHash
606 newAnnounce:
607         for {
608                 for _, tier := range t.Trackers {
609                         for trIndex, tr := range tier {
610                                 if err := tr.Connect(); err != nil {
611                                         log.Print(err)
612                                         continue
613                                 }
614                                 resp, err := tr.Announce(&req)
615                                 if err != nil {
616                                         log.Print(err)
617                                         continue
618                                 }
619                                 var peers []Peer
620                                 for _, peer := range resp.Peers {
621                                         peers = append(peers, Peer{
622                                                 IP:   peer.IP,
623                                                 Port: peer.Port,
624                                         })
625                                 }
626                                 if err := cl.AddPeers(t.InfoHash, peers); err != nil {
627                                         log.Print(err)
628                                         return
629                                 }
630                                 log.Printf("%d new peers from %s", len(peers), "TODO")
631                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
632                                 time.Sleep(time.Second * time.Duration(resp.Interval))
633                                 continue newAnnounce
634                         }
635                 }
636                 time.Sleep(time.Second)
637         }
638 }
639
640 func (cl *Client) allTorrentsCompleted() bool {
641         for _, t := range cl.torrents {
642                 if !t.haveAllPieces() {
643                         return false
644                 }
645         }
646         return true
647 }
648
649 // Returns true when all torrents are completely downloaded and false if the
650 // client is stopped.
651 func (me *Client) WaitAll() bool {
652         me.mu.Lock()
653         defer me.mu.Unlock()
654         for !me.allTorrentsCompleted() {
655                 if me.stopped() {
656                         return false
657                 }
658                 me.event.Wait()
659         }
660         return true
661 }
662
663 func (me *Client) replenishConnRequests(torrent *torrent, conn *connection) {
664         requestHeatMap := torrent.requestHeat()
665         addRequest := func(req Request) (again bool) {
666                 piece := torrent.Pieces[req.Index]
667                 if piece.Hashing {
668                         // We can't be sure we want this.
669                         return true
670                 }
671                 if piece.Complete() {
672                         // We already have this.
673                         return true
674                 }
675                 if requestHeatMap[req] > 0 {
676                         // We've already requested this.
677                         return true
678                 }
679                 return conn.Request(req)
680         }
681         // First request prioritized chunks.
682         if torrent.Priorities != nil {
683                 for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
684                         if !addRequest(e.Value.(Request)) {
685                                 return
686                         }
687                 }
688         }
689         // Then finish off incomplete pieces in order of bytes remaining.
690         for _, index := range torrent.piecesByPendingBytesDesc() {
691                 if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
692                         continue
693                 }
694                 for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
695                         if !addRequest(Request{index, chunkSpec}) {
696                                 return
697                         }
698                 }
699         }
700         if len(conn.Requests) == 0 {
701                 conn.SetInterested(false)
702         }
703 }
704
705 func (me *Client) downloadedChunk(torrent *torrent, msg *peer_protocol.Message) (err error) {
706         request := Request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
707         if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.chunkSpec]; !ok {
708                 log.Printf("got unnecessary chunk: %s", request)
709                 return
710         }
711         err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
712         if err != nil {
713                 return
714         }
715         delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.chunkSpec)
716         if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
717                 me.queuePieceCheck(torrent, request.Index)
718         }
719         var next *list.Element
720         for e := torrent.Priorities.Front(); e != nil; e = next {
721                 next = e.Next()
722                 if e.Value.(Request) == request {
723                         torrent.Priorities.Remove(e)
724                 }
725         }
726         me.dataReady(dataSpec{torrent.InfoHash, request})
727         return
728 }
729
730 func (cl *Client) dataReady(ds dataSpec) {
731         if cl.dataWaiter != nil {
732                 close(cl.dataWaiter)
733         }
734         cl.dataWaiter = nil
735 }
736
737 // Returns a channel that is closed when new data has become available in the
738 // client.
739 func (me *Client) DataWaiter() <-chan struct{} {
740         me.mu.Lock()
741         defer me.mu.Unlock()
742         if me.dataWaiter == nil {
743                 me.dataWaiter = make(chan struct{})
744         }
745         return me.dataWaiter
746 }
747
748 func (me *Client) pieceHashed(t *torrent, piece peer_protocol.Integer, correct bool) {
749         p := t.Pieces[piece]
750         p.EverHashed = true
751         if correct {
752                 p.PendingChunkSpecs = nil
753                 log.Printf("got piece %d, (%d/%d)", piece, t.NumPiecesCompleted(), t.NumPieces())
754                 var next *list.Element
755                 if t.Priorities != nil {
756                         for e := t.Priorities.Front(); e != nil; e = next {
757                                 next = e.Next()
758                                 if e.Value.(Request).Index == piece {
759                                         t.Priorities.Remove(e)
760                                 }
761                         }
762                 }
763                 me.dataReady(dataSpec{
764                         t.InfoHash,
765                         Request{
766                                 peer_protocol.Integer(piece),
767                                 chunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))},
768                         },
769                 })
770         } else {
771                 if len(p.PendingChunkSpecs) == 0 {
772                         t.pendAllChunkSpecs(piece)
773                 }
774         }
775         for _, conn := range t.Conns {
776                 if correct {
777                         conn.Post(peer_protocol.Message{
778                                 Type:  peer_protocol.Have,
779                                 Index: peer_protocol.Integer(piece),
780                         })
781                         // TODO: Cancel requests for this piece.
782                 } else {
783                         if conn.PeerHasPiece(piece) {
784                                 me.replenishConnRequests(t, conn)
785                         }
786                 }
787         }
788         me.event.Broadcast()
789 }
790
791 func (cl *Client) verifyPiece(t *torrent, index peer_protocol.Integer) {
792         cl.mu.Lock()
793         p := t.Pieces[index]
794         for p.Hashing {
795                 cl.event.Wait()
796         }
797         p.Hashing = true
798         p.QueuedForHash = false
799         cl.mu.Unlock()
800         sum := t.HashPiece(index)
801         cl.mu.Lock()
802         p.Hashing = false
803         cl.pieceHashed(t, index, sum == p.Hash)
804         cl.mu.Unlock()
805 }
806
807 func (me *Client) Torrents() (ret []*torrent) {
808         me.mu.Lock()
809         for _, t := range me.torrents {
810                 ret = append(ret, t)
811         }
812         me.mu.Unlock()
813         return
814 }