]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
6858f953b168893d50730cc6d9599077c13339e6
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "container/list"
6         "crypto/rand"
7         "encoding"
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         mathRand "math/rand"
13         "net"
14         "os"
15         "sync"
16         "syscall"
17         "time"
18
19         metainfo "github.com/nsf/libtorgo/torrent"
20
21         "bitbucket.org/anacrolix/go.torrent/peer_protocol"
22         "bitbucket.org/anacrolix/go.torrent/tracker"
23         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
24 )
25
26 // Currently doesn't really queue, but should in the future.
27 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
28         piece := t.Pieces[pieceIndex]
29         if piece.QueuedForHash {
30                 return
31         }
32         piece.QueuedForHash = true
33         go cl.verifyPiece(t, pieceIndex)
34 }
35
36 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) error {
37         cl.mu.Lock()
38         defer cl.mu.Unlock()
39         t := cl.torrent(ih)
40         if t == nil {
41                 return errors.New("no such active torrent")
42         }
43         newPriorities := make([]Request, 0, (len_+chunkSize-1)/chunkSize)
44         for len_ > 0 {
45                 req, ok := t.offsetRequest(off)
46                 if !ok {
47                         return errors.New("bad offset")
48                 }
49                 reqOff := t.requestOffset(req)
50                 // Gain the alignment adjustment.
51                 len_ += off - reqOff
52                 // Lose the length of this block.
53                 len_ -= int64(req.Length)
54                 off = reqOff + int64(req.Length)
55                 if !t.wantPiece(int(req.Index)) {
56                         continue
57                 }
58                 newPriorities = append(newPriorities, req)
59         }
60         if len(newPriorities) == 0 {
61                 return nil
62         }
63         if t.Priorities == nil {
64                 t.Priorities = list.New()
65         }
66         t.Priorities.PushFront(newPriorities[0])
67         for _, req := range newPriorities[1:] {
68                 t.Priorities.PushBack(req)
69         }
70         for _, cn := range t.Conns {
71                 cl.replenishConnRequests(t, cn)
72         }
73         return nil
74 }
75
76 type DataSpec struct {
77         InfoHash
78         Request
79 }
80
81 type Client struct {
82         DataDir         string
83         HalfOpenLimit   int
84         PeerId          [20]byte
85         Listener        net.Listener
86         DisableTrackers bool
87
88         sync.Mutex
89         mu    *sync.Mutex
90         event sync.Cond
91         quit  chan struct{}
92
93         halfOpen   int
94         torrents   map[InfoHash]*Torrent
95         dataWaiter chan struct{}
96 }
97
98 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
99         cl.mu.Lock()
100         defer cl.mu.Unlock()
101         t := cl.torrent(ih)
102         if t == nil {
103                 err = errors.New("unknown torrent")
104                 return
105         }
106         index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
107         // Reading outside the bounds of a file is an error.
108         if index < 0 {
109                 err = os.ErrInvalid
110                 return
111         }
112         if int(index) >= len(t.Pieces) {
113                 err = io.EOF
114                 return
115         }
116         piece := t.Pieces[index]
117         if !piece.EverHashed {
118                 cl.queuePieceCheck(t, index)
119         }
120         if piece.Hashing {
121                 err = ErrDataNotReady
122                 return
123         }
124         pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
125         high := int(t.PieceLength(index) - pieceOff)
126         if high < len(p) {
127                 p = p[:high]
128         }
129         for cs, _ := range piece.PendingChunkSpecs {
130                 chunkOff := int64(pieceOff) - int64(cs.Begin)
131                 if chunkOff >= int64(t.PieceLength(index)) {
132                         panic(chunkOff)
133                 }
134                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
135                         // read begins in a pending chunk
136                         err = ErrDataNotReady
137                         return
138                 }
139                 // pending chunk caps available data
140                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
141                         p = p[:-chunkOff]
142                 }
143         }
144         return t.Data.ReadAt(p, off)
145 }
146
147 func (c *Client) Start() {
148         c.mu = &c.Mutex
149         c.event.L = c.mu
150         c.torrents = make(map[InfoHash]*Torrent)
151         if c.HalfOpenLimit == 0 {
152                 c.HalfOpenLimit = 10
153         }
154         o := copy(c.PeerId[:], BEP20)
155         _, err := rand.Read(c.PeerId[o:])
156         if err != nil {
157                 panic("error generating peer id")
158         }
159         c.quit = make(chan struct{})
160         if c.Listener != nil {
161                 go c.acceptConnections()
162         }
163 }
164
165 func (cl *Client) stopped() bool {
166         select {
167         case <-cl.quit:
168                 return true
169         default:
170                 return false
171         }
172 }
173
174 func (me *Client) Stop() {
175         me.Lock()
176         close(me.quit)
177         me.event.Broadcast()
178         for _, t := range me.torrents {
179                 for _, c := range t.Conns {
180                         c.Close()
181                 }
182         }
183         me.Unlock()
184 }
185
186 func (cl *Client) acceptConnections() {
187         for {
188                 conn, err := cl.Listener.Accept()
189                 select {
190                 case <-cl.quit:
191                         return
192                 default:
193                 }
194                 if err != nil {
195                         log.Print(err)
196                         return
197                 }
198                 go func() {
199                         if err := cl.runConnection(conn, nil); err != nil {
200                                 log.Print(err)
201                         }
202                 }()
203         }
204 }
205
206 func (me *Client) torrent(ih InfoHash) *Torrent {
207         for _, t := range me.torrents {
208                 if t.InfoHash == ih {
209                         return t
210                 }
211         }
212         return nil
213 }
214
215 func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
216         if peer.Id == me.PeerId {
217                 return
218         }
219         me.halfOpen++
220         go func() {
221                 addr := &net.TCPAddr{
222                         IP:   peer.IP,
223                         Port: peer.Port,
224                 }
225                 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
226
227                 go func() {
228                         me.mu.Lock()
229                         defer me.mu.Unlock()
230                         if me.halfOpen == 0 {
231                                 panic("assert")
232                         }
233                         me.halfOpen--
234                         me.openNewConns()
235                 }()
236
237                 if netOpErr, ok := err.(*net.OpError); ok {
238                         if netOpErr.Timeout() {
239                                 return
240                         }
241                         switch netOpErr.Err {
242                         case syscall.ECONNREFUSED, syscall.EHOSTUNREACH:
243                                 return
244                         }
245                 }
246                 if err != nil {
247                         log.Printf("error connecting to peer: %s %#v", err, err)
248                         return
249                 }
250                 log.Printf("connected to %s", conn.RemoteAddr())
251                 err = me.runConnection(conn, torrent)
252                 if err != nil {
253                         log.Print(err)
254                 }
255         }()
256 }
257
258 func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
259         conn := &Connection{
260                 Socket:     sock,
261                 Choked:     true,
262                 PeerChoked: true,
263                 write:      make(chan []byte),
264                 post:       make(chan encoding.BinaryMarshaler),
265         }
266         defer func() {
267                 // There's a lock and deferred unlock later in this function. The
268                 // client will not be locked when this deferred is invoked.
269                 me.mu.Lock()
270                 defer me.mu.Unlock()
271                 conn.Close()
272         }()
273         go conn.writer()
274         go conn.writeOptimizer()
275         conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
276         conn.post <- peer_protocol.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
277         if torrent != nil {
278                 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
279                 conn.post <- peer_protocol.Bytes(me.PeerId[:])
280         }
281         var b [28]byte
282         _, err = io.ReadFull(conn.Socket, b[:])
283         if err == io.EOF {
284                 return nil
285         }
286         if err != nil {
287                 err = fmt.Errorf("when reading protocol and extensions: %s", err)
288                 return
289         }
290         if string(b[:20]) != peer_protocol.Protocol {
291                 err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
292                 return
293         }
294         if 8 != copy(conn.PeerExtensions[:], b[20:]) {
295                 panic("wtf")
296         }
297         // log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
298         var infoHash [20]byte
299         _, err = io.ReadFull(conn.Socket, infoHash[:])
300         if err != nil {
301                 return fmt.Errorf("reading peer info hash: %s", err)
302         }
303         _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
304         if err != nil {
305                 return fmt.Errorf("reading peer id: %s", err)
306         }
307         if torrent == nil {
308                 torrent = me.torrent(infoHash)
309                 if torrent == nil {
310                         return
311                 }
312                 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
313                 conn.post <- peer_protocol.Bytes(me.PeerId[:])
314         }
315         me.mu.Lock()
316         defer me.mu.Unlock()
317         if !me.addConnection(torrent, conn) {
318                 return
319         }
320         if torrent.haveAnyPieces() {
321                 conn.Post(peer_protocol.Message{
322                         Type:     peer_protocol.Bitfield,
323                         Bitfield: torrent.bitfield(),
324                 })
325         }
326         err = me.connectionLoop(torrent, conn)
327         if err != nil {
328                 err = fmt.Errorf("during Connection loop: %s", err)
329         }
330         me.dropConnection(torrent, conn)
331         return
332 }
333
334 func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) {
335         if conn.PeerPieces == nil {
336                 conn.PeerPieces = make([]bool, len(torrent.Pieces))
337         }
338         conn.PeerPieces[piece] = true
339         if torrent.wantPiece(piece) {
340                 me.replenishConnRequests(torrent, conn)
341         }
342 }
343
344 func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
345         me.replenishConnRequests(torrent, conn)
346 }
347
348 func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
349         decoder := peer_protocol.Decoder{
350                 R:         bufio.NewReader(conn.Socket),
351                 MaxLength: 256 * 1024,
352         }
353         for {
354                 me.mu.Unlock()
355                 // TODO: Can this be allocated on the stack?
356                 msg := new(peer_protocol.Message)
357                 err := decoder.Decode(msg)
358                 me.mu.Lock()
359                 if err != nil {
360                         if me.stopped() || err == io.EOF {
361                                 return nil
362                         }
363                         return err
364                 }
365                 if msg.Keepalive {
366                         continue
367                 }
368                 switch msg.Type {
369                 case peer_protocol.Choke:
370                         conn.PeerChoked = true
371                         conn.Requests = nil
372                 case peer_protocol.Unchoke:
373                         conn.PeerChoked = false
374                         me.peerUnchoked(torrent, conn)
375                 case peer_protocol.Interested:
376                         conn.PeerInterested = true
377                         // TODO: This should be done from a dedicated unchoking routine.
378                         conn.Unchoke()
379                 case peer_protocol.NotInterested:
380                         conn.PeerInterested = false
381                 case peer_protocol.Have:
382                         me.peerGotPiece(torrent, conn, int(msg.Index))
383                 case peer_protocol.Request:
384                         if conn.PeerRequests == nil {
385                                 conn.PeerRequests = make(map[Request]struct{}, maxRequests)
386                         }
387                         request := Request{
388                                 Index:     msg.Index,
389                                 ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
390                         }
391                         conn.PeerRequests[request] = struct{}{}
392                         // TODO: Requests should be satisfied from a dedicated upload routine.
393                         p := make([]byte, msg.Length)
394                         n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
395                         if err != nil {
396                                 return fmt.Errorf("reading torrent data to serve request %s: %s", request, err)
397                         }
398                         if n != int(msg.Length) {
399                                 return fmt.Errorf("bad request: %s", msg)
400                         }
401                         conn.Post(peer_protocol.Message{
402                                 Type:  peer_protocol.Piece,
403                                 Index: msg.Index,
404                                 Begin: msg.Begin,
405                                 Piece: p,
406                         })
407                 case peer_protocol.Bitfield:
408                         if len(msg.Bitfield) < len(torrent.Pieces) {
409                                 err = errors.New("received invalid bitfield")
410                                 break
411                         }
412                         if conn.PeerPieces != nil {
413                                 err = errors.New("received unexpected bitfield")
414                                 break
415                         }
416                         conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
417                         for index, has := range conn.PeerPieces {
418                                 if has {
419                                         me.peerGotPiece(torrent, conn, index)
420                                 }
421                         }
422                 case peer_protocol.Piece:
423                         request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
424                         if _, ok := conn.Requests[request_]; !ok {
425                                 err = fmt.Errorf("unexpected piece: %s", request_)
426                                 break
427                         }
428                         delete(conn.Requests, request_)
429                         err = me.downloadedChunk(torrent, msg)
430                 default:
431                         log.Printf("received unknown message type: %#v", msg.Type)
432                 }
433                 if err != nil {
434                         return err
435                 }
436                 me.replenishConnRequests(torrent, conn)
437         }
438 }
439
440 func (me *Client) dropConnection(torrent *Torrent, conn *Connection) {
441         conn.Socket.Close()
442         for i0, c := range torrent.Conns {
443                 if c != conn {
444                         continue
445                 }
446                 i1 := len(torrent.Conns) - 1
447                 if i0 != i1 {
448                         torrent.Conns[i0] = torrent.Conns[i1]
449                 }
450                 torrent.Conns = torrent.Conns[:i1]
451                 return
452         }
453         panic("no such Connection")
454 }
455
456 func (me *Client) addConnection(t *Torrent, c *Connection) bool {
457         for _, c0 := range t.Conns {
458                 if c.PeerId == c0.PeerId {
459                         log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId)
460                         return false
461                 }
462         }
463         t.Conns = append(t.Conns, c)
464         return true
465 }
466
467 func (me *Client) openNewConns() {
468         for _, t := range me.torrents {
469                 for len(t.Peers) != 0 {
470                         if me.halfOpen >= me.HalfOpenLimit {
471                                 return
472                         }
473                         p := t.Peers[0]
474                         t.Peers = t.Peers[1:]
475                         me.initiateConn(p, t)
476                 }
477         }
478 }
479
480 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
481         me.mu.Lock()
482         t := me.torrent(infoHash)
483         if t == nil {
484                 return errors.New("no such torrent")
485         }
486         t.Peers = append(t.Peers, peers...)
487         me.openNewConns()
488         me.mu.Unlock()
489         return nil
490 }
491
492 // Prepare a Torrent without any attachment to a Client. That means we can
493 // initialize fields all fields that don't require the Client without locking
494 // it.
495 func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, err error) {
496         torrent = &Torrent{
497                 InfoHash: BytesInfoHash(metaInfo.InfoHash),
498                 MetaInfo: metaInfo,
499         }
500         torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
501         if err != nil {
502                 return
503         }
504         for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
505                 hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
506                 if len(hash) != PieceHash.Size() {
507                         err = errors.New("bad piece hash in metainfo")
508                         return
509                 }
510                 piece := &piece{}
511                 copyHashSum(piece.Hash[:], hash)
512                 torrent.Pieces = append(torrent.Pieces, piece)
513                 torrent.pendAllChunkSpecs(peer_protocol.Integer(len(torrent.Pieces) - 1))
514         }
515         torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
516         for tierIndex := range metaInfo.AnnounceList {
517                 tier := torrent.Trackers[tierIndex]
518                 for _, url := range metaInfo.AnnounceList[tierIndex] {
519                         tr, err := tracker.New(url)
520                         if err != nil {
521                                 log.Print(err)
522                                 continue
523                         }
524                         tier = append(tier, tr)
525                 }
526                 // The trackers within each tier must be shuffled before use.
527                 // http://stackoverflow.com/a/12267471/149482
528                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
529                 for i := range tier {
530                         j := mathRand.Intn(i + 1)
531                         tier[i], tier[j] = tier[j], tier[i]
532                 }
533                 torrent.Trackers[tierIndex] = tier
534         }
535         return
536 }
537
538 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
539         torrent, err := newTorrent(metaInfo, me.DataDir)
540         if err != nil {
541                 return err
542         }
543         me.mu.Lock()
544         defer me.mu.Unlock()
545         if _, ok := me.torrents[torrent.InfoHash]; ok {
546                 return torrent.Close()
547         }
548         me.torrents[torrent.InfoHash] = torrent
549         if !me.DisableTrackers {
550                 go me.announceTorrent(torrent)
551         }
552         for i := range torrent.Pieces {
553                 me.queuePieceCheck(torrent, peer_protocol.Integer(i))
554         }
555         return nil
556 }
557
558 func (cl *Client) listenerAnnouncePort() (port int16) {
559         l := cl.Listener
560         if l == nil {
561                 return
562         }
563         addr := l.Addr()
564         switch data := addr.(type) {
565         case *net.TCPAddr:
566                 return int16(data.Port)
567         case *net.UDPAddr:
568                 return int16(data.Port)
569         default:
570                 log.Printf("unknown listener addr type: %T", addr)
571         }
572         return
573 }
574
575 func (cl *Client) announceTorrent(t *Torrent) {
576         req := tracker.AnnounceRequest{
577                 Event:   tracker.Started,
578                 NumWant: -1,
579                 Port:    cl.listenerAnnouncePort(),
580         }
581         req.PeerId = cl.PeerId
582         req.InfoHash = t.InfoHash
583 newAnnounce:
584         for {
585                 for _, tier := range t.Trackers {
586                         for trIndex, tr := range tier {
587                                 if err := tr.Connect(); err != nil {
588                                         log.Print(err)
589                                         continue
590                                 }
591                                 resp, err := tr.Announce(&req)
592                                 if err != nil {
593                                         log.Print(err)
594                                         continue
595                                 }
596                                 var peers []Peer
597                                 for _, peer := range resp.Peers {
598                                         peers = append(peers, Peer{
599                                                 IP:   peer.IP,
600                                                 Port: peer.Port,
601                                         })
602                                 }
603                                 if err := cl.AddPeers(t.InfoHash, peers); err != nil {
604                                         log.Print(err)
605                                         return
606                                 }
607                                 log.Printf("%d new peers from %s", len(peers), "TODO")
608                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
609                                 time.Sleep(time.Second * time.Duration(resp.Interval))
610                                 continue newAnnounce
611                         }
612                 }
613                 time.Sleep(time.Second)
614         }
615 }
616
617 func (cl *Client) allTorrentsCompleted() bool {
618         for _, t := range cl.torrents {
619                 if !t.haveAllPieces() {
620                         return false
621                 }
622         }
623         return true
624 }
625
626 func (me *Client) WaitAll() {
627         me.mu.Lock()
628         for !me.allTorrentsCompleted() {
629                 me.event.Wait()
630         }
631         me.mu.Unlock()
632 }
633
634 func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
635         requestHeatMap := torrent.requestHeat()
636         addRequest := func(req Request) (again bool) {
637                 piece := torrent.Pieces[req.Index]
638                 if piece.Hashing {
639                         // We can't be sure we want this.
640                         return true
641                 }
642                 if piece.Complete() {
643                         // We already have this.
644                         return true
645                 }
646                 if requestHeatMap[req] > 0 {
647                         // We've already requested this.
648                         return true
649                 }
650                 return conn.Request(req)
651         }
652         // First request prioritized chunks.
653         if torrent.Priorities != nil {
654                 for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
655                         if !addRequest(e.Value.(Request)) {
656                                 return
657                         }
658                 }
659         }
660         // Then finish off incomplete pieces in order of bytes remaining.
661         for _, index := range torrent.piecesByPendingBytesDesc() {
662                 if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
663                         continue
664                 }
665                 for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
666                         if !addRequest(Request{index, chunkSpec}) {
667                                 return
668                         }
669                 }
670         }
671         if len(conn.Requests) == 0 {
672                 conn.SetInterested(false)
673         }
674 }
675
676 func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
677         request := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
678         if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.ChunkSpec]; !ok {
679                 log.Printf("got unnecessary chunk: %s", request)
680                 return
681         }
682         err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
683         if err != nil {
684                 return
685         }
686         delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
687         if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
688                 me.queuePieceCheck(torrent, request.Index)
689         }
690         var next *list.Element
691         for e := torrent.Priorities.Front(); e != nil; e = next {
692                 next = e.Next()
693                 if e.Value.(Request) == request {
694                         torrent.Priorities.Remove(e)
695                 }
696         }
697         me.dataReady(DataSpec{torrent.InfoHash, request})
698         return
699 }
700
701 func (cl *Client) dataReady(ds DataSpec) {
702         if cl.dataWaiter != nil {
703                 close(cl.dataWaiter)
704         }
705         cl.dataWaiter = nil
706 }
707
708 func (cl *Client) DataWaiter() <-chan struct{} {
709         cl.Lock()
710         defer cl.Unlock()
711         if cl.dataWaiter == nil {
712                 cl.dataWaiter = make(chan struct{})
713         }
714         return cl.dataWaiter
715 }
716
717 func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) {
718         p := t.Pieces[piece]
719         p.EverHashed = true
720         if correct {
721                 p.PendingChunkSpecs = nil
722                 log.Printf("got piece %d, (%d/%d)", piece, t.NumPiecesCompleted(), t.NumPieces())
723                 var next *list.Element
724                 if t.Priorities != nil {
725                         for e := t.Priorities.Front(); e != nil; e = next {
726                                 next = e.Next()
727                                 if e.Value.(Request).Index == piece {
728                                         t.Priorities.Remove(e)
729                                 }
730                         }
731                 }
732                 me.dataReady(DataSpec{
733                         t.InfoHash,
734                         Request{
735                                 peer_protocol.Integer(piece),
736                                 ChunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))},
737                         },
738                 })
739         } else {
740                 if len(p.PendingChunkSpecs) == 0 {
741                         t.pendAllChunkSpecs(piece)
742                 }
743         }
744         for _, conn := range t.Conns {
745                 if correct {
746                         conn.Post(peer_protocol.Message{
747                                 Type:  peer_protocol.Have,
748                                 Index: peer_protocol.Integer(piece),
749                         })
750                         // TODO: Cancel requests for this piece.
751                 } else {
752                         if conn.PeerHasPiece(piece) {
753                                 me.replenishConnRequests(t, conn)
754                         }
755                 }
756         }
757         me.event.Broadcast()
758 }
759
760 func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
761         cl.mu.Lock()
762         p := t.Pieces[index]
763         for p.Hashing {
764                 cl.event.Wait()
765         }
766         p.Hashing = true
767         p.QueuedForHash = false
768         cl.mu.Unlock()
769         sum := t.HashPiece(index)
770         cl.mu.Lock()
771         p.Hashing = false
772         cl.pieceHashed(t, index, sum == p.Hash)
773         cl.mu.Unlock()
774 }
775
776 func (me *Client) Torrents() (ret []*Torrent) {
777         me.mu.Lock()
778         for _, t := range me.torrents {
779                 ret = append(ret, t)
780         }
781         me.mu.Unlock()
782         return
783 }