]> Sergey Matveev's repositories - btrtrc.git/blob - client.go
Break up client.go into several files and a few fixes
[btrtrc.git] / client.go
1 package torrent
2
3 import (
4         "bufio"
5         "container/list"
6         "crypto/rand"
7         "encoding"
8         "errors"
9         "fmt"
10         "io"
11         "log"
12         mathRand "math/rand"
13         "net"
14         "os"
15         "sync"
16         "syscall"
17         "time"
18
19         metainfo "github.com/nsf/libtorgo/torrent"
20
21         "bitbucket.org/anacrolix/go.torrent/peer_protocol"
22         "bitbucket.org/anacrolix/go.torrent/tracker"
23         _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
24 )
25
26 // Currently doesn't really queue, but should in the future.
27 func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
28         piece := t.Pieces[pieceIndex]
29         if piece.QueuedForHash {
30                 return
31         }
32         piece.QueuedForHash = true
33         go cl.verifyPiece(t, pieceIndex)
34 }
35
36 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
37         cl.mu.Lock()
38         defer cl.mu.Unlock()
39         t := cl.torrent(ih)
40         newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
41         for len_ > 0 {
42                 // TODO: Write a function to return the Request for a given offset.
43                 req, ok := t.offsetRequest(off)
44                 if !ok {
45                         break
46                 }
47                 off += int64(req.Length)
48                 len_ -= int64(req.Length)
49                 if _, ok = t.Pieces[req.Index].PendingChunkSpecs[req.ChunkSpec]; !ok {
50                         continue
51                 }
52                 newPriorities = append(newPriorities, req)
53         }
54         if len(newPriorities) == 0 {
55                 return
56         }
57         if t.Priorities == nil {
58                 t.Priorities = list.New()
59         }
60         t.Priorities.PushFront(newPriorities[0])
61         for _, req := range newPriorities[1:] {
62                 t.Priorities.PushBack(req)
63         }
64         for _, cn := range t.Conns {
65                 cl.replenishConnRequests(t, cn)
66         }
67 }
68
69 type DataSpec struct {
70         InfoHash
71         Request
72 }
73
74 type Client struct {
75         DataDir         string
76         HalfOpenLimit   int
77         PeerId          [20]byte
78         Listener        net.Listener
79         DisableTrackers bool
80
81         sync.Mutex
82         mu    *sync.Mutex
83         event sync.Cond
84         quit  chan struct{}
85
86         halfOpen   int
87         torrents   map[InfoHash]*Torrent
88         dataWaiter chan struct{}
89 }
90
91 func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
92         cl.mu.Lock()
93         defer cl.mu.Unlock()
94         t := cl.torrent(ih)
95         if t == nil {
96                 err = errors.New("unknown torrent")
97                 return
98         }
99         index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
100         // Reading outside the bounds of a file is an error.
101         if index < 0 {
102                 err = os.ErrInvalid
103                 return
104         }
105         if int(index) >= len(t.Pieces) {
106                 err = io.EOF
107                 return
108         }
109         piece := t.Pieces[index]
110         if !piece.EverHashed {
111                 cl.queuePieceCheck(t, index)
112         }
113         if piece.Hashing {
114                 err = ErrDataNotReady
115                 return
116         }
117         pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
118         high := int(t.PieceLength(index) - pieceOff)
119         if high < len(p) {
120                 p = p[:high]
121         }
122         for cs, _ := range piece.PendingChunkSpecs {
123                 chunkOff := int64(pieceOff) - int64(cs.Begin)
124                 if chunkOff >= int64(t.PieceLength(index)) {
125                         panic(chunkOff)
126                 }
127                 if 0 <= chunkOff && chunkOff < int64(cs.Length) {
128                         // read begins in a pending chunk
129                         err = ErrDataNotReady
130                         return
131                 }
132                 // pending chunk caps available data
133                 if chunkOff < 0 && int64(len(p)) > -chunkOff {
134                         p = p[:-chunkOff]
135                 }
136         }
137         return t.Data.ReadAt(p, off)
138 }
139
140 func (c *Client) Start() {
141         c.mu = &c.Mutex
142         c.event.L = c.mu
143         c.torrents = make(map[InfoHash]*Torrent)
144         if c.HalfOpenLimit == 0 {
145                 c.HalfOpenLimit = 10
146         }
147         o := copy(c.PeerId[:], BEP20)
148         _, err := rand.Read(c.PeerId[o:])
149         if err != nil {
150                 panic("error generating peer id")
151         }
152         c.quit = make(chan struct{})
153         if c.Listener != nil {
154                 go c.acceptConnections()
155         }
156 }
157
158 func (cl *Client) stopped() bool {
159         select {
160         case <-cl.quit:
161                 return true
162         default:
163                 return false
164         }
165 }
166
167 func (me *Client) Stop() {
168         me.Lock()
169         close(me.quit)
170         me.event.Broadcast()
171         for _, t := range me.torrents {
172                 for _, c := range t.Conns {
173                         c.Close()
174                 }
175         }
176         me.Unlock()
177 }
178
179 func (cl *Client) acceptConnections() {
180         for {
181                 conn, err := cl.Listener.Accept()
182                 select {
183                 case <-cl.quit:
184                         return
185                 default:
186                 }
187                 if err != nil {
188                         log.Print(err)
189                         return
190                 }
191                 go func() {
192                         if err := cl.runConnection(conn, nil); err != nil {
193                                 log.Print(err)
194                         }
195                 }()
196         }
197 }
198
199 func (me *Client) torrent(ih InfoHash) *Torrent {
200         for _, t := range me.torrents {
201                 if t.InfoHash == ih {
202                         return t
203                 }
204         }
205         return nil
206 }
207
208 func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
209         if peer.Id == me.PeerId {
210                 return
211         }
212         me.halfOpen++
213         go func() {
214                 addr := &net.TCPAddr{
215                         IP:   peer.IP,
216                         Port: peer.Port,
217                 }
218                 conn, err := net.DialTimeout(addr.Network(), addr.String(), dialTimeout)
219
220                 go func() {
221                         me.mu.Lock()
222                         defer me.mu.Unlock()
223                         if me.halfOpen == 0 {
224                                 panic("assert")
225                         }
226                         me.halfOpen--
227                         me.openNewConns()
228                 }()
229
230                 if netOpErr, ok := err.(*net.OpError); ok {
231                         if netOpErr.Timeout() {
232                                 return
233                         }
234                         switch netOpErr.Err {
235                         case syscall.ECONNREFUSED:
236                                 return
237                         }
238                 }
239                 if err != nil {
240                         log.Printf("error connecting to peer: %s %#v", err, err)
241                         return
242                 }
243                 log.Printf("connected to %s", conn.RemoteAddr())
244                 err = me.runConnection(conn, torrent)
245                 if err != nil {
246                         log.Print(err)
247                 }
248         }()
249 }
250
251 func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
252         conn := &Connection{
253                 Socket:     sock,
254                 Choked:     true,
255                 PeerChoked: true,
256                 write:      make(chan []byte),
257                 post:       make(chan encoding.BinaryMarshaler),
258         }
259         defer func() {
260                 // There's a lock and deferred unlock later in this function. The
261                 // client will not be locked when this deferred is invoked.
262                 me.mu.Lock()
263                 defer me.mu.Unlock()
264                 conn.Close()
265         }()
266         go conn.writer()
267         go conn.writeOptimizer()
268         conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
269         conn.post <- peer_protocol.Bytes("\x00\x00\x00\x00\x00\x00\x00\x00")
270         if torrent != nil {
271                 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
272                 conn.post <- peer_protocol.Bytes(me.PeerId[:])
273         }
274         var b [28]byte
275         _, err = io.ReadFull(conn.Socket, b[:])
276         if err == io.EOF {
277                 return nil
278         }
279         if err != nil {
280                 err = fmt.Errorf("when reading protocol and extensions: %s", err)
281                 return
282         }
283         if string(b[:20]) != peer_protocol.Protocol {
284                 err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
285                 return
286         }
287         if 8 != copy(conn.PeerExtensions[:], b[20:]) {
288                 panic("wtf")
289         }
290         // log.Printf("peer extensions: %#v", string(conn.PeerExtensions[:]))
291         var infoHash [20]byte
292         _, err = io.ReadFull(conn.Socket, infoHash[:])
293         if err != nil {
294                 return fmt.Errorf("reading peer info hash: %s", err)
295         }
296         _, err = io.ReadFull(conn.Socket, conn.PeerId[:])
297         if err != nil {
298                 return fmt.Errorf("reading peer id: %s", err)
299         }
300         if torrent == nil {
301                 torrent = me.torrent(infoHash)
302                 if torrent == nil {
303                         return
304                 }
305                 conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
306                 conn.post <- peer_protocol.Bytes(me.PeerId[:])
307         }
308         me.mu.Lock()
309         defer me.mu.Unlock()
310         if !me.addConnection(torrent, conn) {
311                 return
312         }
313         if torrent.haveAnyPieces() {
314                 conn.Post(peer_protocol.Message{
315                         Type:     peer_protocol.Bitfield,
316                         Bitfield: torrent.bitfield(),
317                 })
318         }
319         err = me.connectionLoop(torrent, conn)
320         if err != nil {
321                 err = fmt.Errorf("during Connection loop: %s", err)
322         }
323         me.dropConnection(torrent, conn)
324         return
325 }
326
327 func (me *Client) peerGotPiece(torrent *Torrent, conn *Connection, piece int) {
328         if conn.PeerPieces == nil {
329                 conn.PeerPieces = make([]bool, len(torrent.Pieces))
330         }
331         conn.PeerPieces[piece] = true
332         if torrent.wantPiece(piece) {
333                 me.replenishConnRequests(torrent, conn)
334         }
335 }
336
337 func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
338         me.replenishConnRequests(torrent, conn)
339 }
340
341 func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
342         decoder := peer_protocol.Decoder{
343                 R:         bufio.NewReader(conn.Socket),
344                 MaxLength: 256 * 1024,
345         }
346         for {
347                 me.mu.Unlock()
348                 // TODO: Can this be allocated on the stack?
349                 msg := new(peer_protocol.Message)
350                 err := decoder.Decode(msg)
351                 me.mu.Lock()
352                 if err != nil {
353                         if me.stopped() || err == io.EOF {
354                                 return nil
355                         }
356                         return err
357                 }
358                 if msg.Keepalive {
359                         continue
360                 }
361                 switch msg.Type {
362                 case peer_protocol.Choke:
363                         conn.PeerChoked = true
364                         conn.Requests = nil
365                 case peer_protocol.Unchoke:
366                         conn.PeerChoked = false
367                         me.peerUnchoked(torrent, conn)
368                 case peer_protocol.Interested:
369                         conn.PeerInterested = true
370                         // TODO: This should be done from a dedicated unchoking routine.
371                         conn.Unchoke()
372                 case peer_protocol.NotInterested:
373                         conn.PeerInterested = false
374                 case peer_protocol.Have:
375                         me.peerGotPiece(torrent, conn, int(msg.Index))
376                 case peer_protocol.Request:
377                         if conn.PeerRequests == nil {
378                                 conn.PeerRequests = make(map[Request]struct{}, maxRequests)
379                         }
380                         request := Request{
381                                 Index:     msg.Index,
382                                 ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
383                         }
384                         conn.PeerRequests[request] = struct{}{}
385                         // TODO: Requests should be satisfied from a dedicated upload routine.
386                         p := make([]byte, msg.Length)
387                         n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
388                         if err != nil {
389                                 return fmt.Errorf("reading torrent data to serve request %s: %s", request, err)
390                         }
391                         if n != int(msg.Length) {
392                                 return fmt.Errorf("bad request: %s", msg)
393                         }
394                         conn.Post(peer_protocol.Message{
395                                 Type:  peer_protocol.Piece,
396                                 Index: msg.Index,
397                                 Begin: msg.Begin,
398                                 Piece: p,
399                         })
400                 case peer_protocol.Bitfield:
401                         if len(msg.Bitfield) < len(torrent.Pieces) {
402                                 err = errors.New("received invalid bitfield")
403                                 break
404                         }
405                         if conn.PeerPieces != nil {
406                                 err = errors.New("received unexpected bitfield")
407                                 break
408                         }
409                         conn.PeerPieces = msg.Bitfield[:len(torrent.Pieces)]
410                         for index, has := range conn.PeerPieces {
411                                 if has {
412                                         me.peerGotPiece(torrent, conn, index)
413                                 }
414                         }
415                 case peer_protocol.Piece:
416                         request_ := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
417                         if _, ok := conn.Requests[request_]; !ok {
418                                 err = fmt.Errorf("unexpected piece: %s", request_)
419                                 break
420                         }
421                         delete(conn.Requests, request_)
422                         err = me.downloadedChunk(torrent, msg)
423                 default:
424                         log.Printf("received unknown message type: %#v", msg.Type)
425                 }
426                 if err != nil {
427                         return err
428                 }
429                 me.replenishConnRequests(torrent, conn)
430         }
431 }
432
433 func (me *Client) dropConnection(torrent *Torrent, conn *Connection) {
434         conn.Socket.Close()
435         for i0, c := range torrent.Conns {
436                 if c != conn {
437                         continue
438                 }
439                 i1 := len(torrent.Conns) - 1
440                 if i0 != i1 {
441                         torrent.Conns[i0] = torrent.Conns[i1]
442                 }
443                 torrent.Conns = torrent.Conns[:i1]
444                 return
445         }
446         panic("no such Connection")
447 }
448
449 func (me *Client) addConnection(t *Torrent, c *Connection) bool {
450         for _, c0 := range t.Conns {
451                 if c.PeerId == c0.PeerId {
452                         log.Printf("%s and %s have the same ID: %s", c.Socket.RemoteAddr(), c0.Socket.RemoteAddr(), c.PeerId)
453                         return false
454                 }
455         }
456         t.Conns = append(t.Conns, c)
457         return true
458 }
459
460 func (me *Client) openNewConns() {
461         for _, t := range me.torrents {
462                 for len(t.Peers) != 0 {
463                         if me.halfOpen >= me.HalfOpenLimit {
464                                 return
465                         }
466                         p := t.Peers[0]
467                         t.Peers = t.Peers[1:]
468                         me.initiateConn(p, t)
469                 }
470         }
471 }
472
473 func (me *Client) AddPeers(infoHash InfoHash, peers []Peer) error {
474         me.mu.Lock()
475         t := me.torrent(infoHash)
476         if t == nil {
477                 return errors.New("no such torrent")
478         }
479         t.Peers = append(t.Peers, peers...)
480         me.openNewConns()
481         me.mu.Unlock()
482         return nil
483 }
484
485 // Prepare a Torrent without any attachment to a Client. That means we can
486 // initialize fields all fields that don't require the Client without locking
487 // it.
488 func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent, err error) {
489         torrent = &Torrent{
490                 InfoHash: BytesInfoHash(metaInfo.InfoHash),
491                 MetaInfo: metaInfo,
492         }
493         torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
494         if err != nil {
495                 return
496         }
497         for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
498                 hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
499                 if len(hash) != PieceHash.Size() {
500                         err = errors.New("bad piece hash in metainfo")
501                         return
502                 }
503                 piece := &piece{}
504                 copyHashSum(piece.Hash[:], hash)
505                 torrent.Pieces = append(torrent.Pieces, piece)
506                 torrent.pendAllChunkSpecs(peer_protocol.Integer(len(torrent.Pieces) - 1))
507         }
508         torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
509         for tierIndex := range metaInfo.AnnounceList {
510                 tier := torrent.Trackers[tierIndex]
511                 for _, url := range metaInfo.AnnounceList[tierIndex] {
512                         tr, err := tracker.New(url)
513                         if err != nil {
514                                 log.Print(err)
515                                 continue
516                         }
517                         tier = append(tier, tr)
518                 }
519                 // The trackers within each tier must be shuffled before use.
520                 // http://stackoverflow.com/a/12267471/149482
521                 // http://www.bittorrent.org/beps/bep_0012.html#order-of-processing
522                 for i := range tier {
523                         j := mathRand.Intn(i + 1)
524                         tier[i], tier[j] = tier[j], tier[i]
525                 }
526                 torrent.Trackers[tierIndex] = tier
527         }
528         return
529 }
530
531 func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
532         torrent, err := newTorrent(metaInfo, me.DataDir)
533         if err != nil {
534                 return err
535         }
536         me.mu.Lock()
537         defer me.mu.Unlock()
538         if _, ok := me.torrents[torrent.InfoHash]; ok {
539                 return torrent.Close()
540         }
541         me.torrents[torrent.InfoHash] = torrent
542         if !me.DisableTrackers {
543                 go me.announceTorrent(torrent)
544         }
545         for i := range torrent.Pieces {
546                 me.queuePieceCheck(torrent, peer_protocol.Integer(i))
547         }
548         return nil
549 }
550
551 func (cl *Client) listenerAnnouncePort() (port int16) {
552         l := cl.Listener
553         if l == nil {
554                 return
555         }
556         addr := l.Addr()
557         switch data := addr.(type) {
558         case *net.TCPAddr:
559                 return int16(data.Port)
560         case *net.UDPAddr:
561                 return int16(data.Port)
562         default:
563                 log.Printf("unknown listener addr type: %T", addr)
564         }
565         return
566 }
567
568 func (cl *Client) announceTorrent(t *Torrent) {
569         req := tracker.AnnounceRequest{
570                 Event:   tracker.Started,
571                 NumWant: -1,
572                 Port:    cl.listenerAnnouncePort(),
573         }
574         req.PeerId = cl.PeerId
575         req.InfoHash = t.InfoHash
576 newAnnounce:
577         for {
578                 for _, tier := range t.Trackers {
579                         for trIndex, tr := range tier {
580                                 if err := tr.Connect(); err != nil {
581                                         log.Print(err)
582                                         continue
583                                 }
584                                 resp, err := tr.Announce(&req)
585                                 if err != nil {
586                                         log.Print(err)
587                                         continue
588                                 }
589                                 var peers []Peer
590                                 for _, peer := range resp.Peers {
591                                         peers = append(peers, Peer{
592                                                 IP:   peer.IP,
593                                                 Port: peer.Port,
594                                         })
595                                 }
596                                 if err := cl.AddPeers(t.InfoHash, peers); err != nil {
597                                         log.Print(err)
598                                         return
599                                 }
600                                 log.Printf("%d new peers from %s", len(peers), "TODO")
601                                 tier[0], tier[trIndex] = tier[trIndex], tier[0]
602                                 time.Sleep(time.Second * time.Duration(resp.Interval))
603                                 continue newAnnounce
604                         }
605                 }
606                 time.Sleep(time.Second)
607         }
608 }
609
610 func (cl *Client) allTorrentsCompleted() bool {
611         for _, t := range cl.torrents {
612                 if !t.haveAllPieces() {
613                         return false
614                 }
615         }
616         return true
617 }
618
619 func (me *Client) WaitAll() {
620         me.mu.Lock()
621         for !me.allTorrentsCompleted() {
622                 me.event.Wait()
623         }
624         me.mu.Unlock()
625 }
626
627 func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
628         requestHeatMap := torrent.requestHeat()
629         addRequest := func(req Request) (again bool) {
630                 piece := torrent.Pieces[req.Index]
631                 if piece.Hashing {
632                         // We can't be sure we want this.
633                         return true
634                 }
635                 if piece.Complete() {
636                         // We already have this.
637                         return true
638                 }
639                 if requestHeatMap[req] > 0 {
640                         // We've already requested this.
641                         return true
642                 }
643                 return conn.Request(req)
644         }
645         // First request prioritized chunks.
646         if torrent.Priorities != nil {
647                 for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
648                         if !addRequest(e.Value.(Request)) {
649                                 return
650                         }
651                 }
652         }
653         // Then finish off incomplete pieces in order of bytes remaining.
654         for _, index := range torrent.piecesByPendingBytesDesc() {
655                 if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
656                         continue
657                 }
658                 for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
659                         if !addRequest(Request{index, chunkSpec}) {
660                                 return
661                         }
662                 }
663         }
664         if len(conn.Requests) == 0 {
665                 conn.SetInterested(false)
666         }
667 }
668
669 func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
670         request := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
671         if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.ChunkSpec]; !ok {
672                 log.Printf("got unnecessary chunk: %s", request)
673                 return
674         }
675         err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
676         if err != nil {
677                 return
678         }
679         delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
680         if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
681                 me.queuePieceCheck(torrent, request.Index)
682         }
683         var next *list.Element
684         for e := torrent.Priorities.Front(); e != nil; e = next {
685                 next = e.Next()
686                 if e.Value.(Request) == request {
687                         torrent.Priorities.Remove(e)
688                 }
689         }
690         me.dataReady(DataSpec{torrent.InfoHash, request})
691         return
692 }
693
694 func (cl *Client) dataReady(ds DataSpec) {
695         if cl.dataWaiter != nil {
696                 close(cl.dataWaiter)
697         }
698         cl.dataWaiter = nil
699 }
700
701 func (cl *Client) DataWaiter() <-chan struct{} {
702         cl.Lock()
703         defer cl.Unlock()
704         if cl.dataWaiter == nil {
705                 cl.dataWaiter = make(chan struct{})
706         }
707         return cl.dataWaiter
708 }
709
710 func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) {
711         p := t.Pieces[piece]
712         p.EverHashed = true
713         if correct {
714                 p.PendingChunkSpecs = nil
715                 log.Printf("got piece %d, (%d/%d)", piece, t.NumPiecesCompleted(), t.NumPieces())
716                 var next *list.Element
717                 if t.Priorities != nil {
718                         for e := t.Priorities.Front(); e != nil; e = next {
719                                 next = e.Next()
720                                 if e.Value.(Request).Index == piece {
721                                         t.Priorities.Remove(e)
722                                 }
723                         }
724                 }
725                 me.dataReady(DataSpec{
726                         t.InfoHash,
727                         Request{
728                                 peer_protocol.Integer(piece),
729                                 ChunkSpec{0, peer_protocol.Integer(t.PieceLength(piece))},
730                         },
731                 })
732         } else {
733                 if len(p.PendingChunkSpecs) == 0 {
734                         t.pendAllChunkSpecs(piece)
735                 }
736         }
737         for _, conn := range t.Conns {
738                 if correct {
739                         conn.Post(peer_protocol.Message{
740                                 Type:  peer_protocol.Have,
741                                 Index: peer_protocol.Integer(piece),
742                         })
743                         // TODO: Cancel requests for this piece.
744                 } else {
745                         if conn.PeerHasPiece(piece) {
746                                 me.replenishConnRequests(t, conn)
747                         }
748                 }
749         }
750         me.event.Broadcast()
751 }
752
753 func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
754         cl.mu.Lock()
755         p := t.Pieces[index]
756         for p.Hashing {
757                 cl.event.Wait()
758         }
759         p.Hashing = true
760         p.QueuedForHash = false
761         cl.mu.Unlock()
762         sum := t.HashPiece(index)
763         cl.mu.Lock()
764         p.Hashing = false
765         cl.pieceHashed(t, index, sum == p.Hash)
766         cl.mu.Unlock()
767 }
768
769 func (me *Client) Torrents() (ret []*Torrent) {
770         me.mu.Lock()
771         for _, t := range me.torrents {
772                 ret = append(ret, t)
773         }
774         me.mu.Unlock()
775         return
776 }