package torrent

import (
        "bufio"
        "bytes"
        "errors"
        "expvar"
        "fmt"
        "io"
        "log"
        "math/rand"
        "net"
        "strconv"
        "strings"
        "sync"
        "time"

        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/prioritybitmap"

        "github.com/anacrolix/torrent/bencode"
        pp "github.com/anacrolix/torrent/peer_protocol"
)

var optimizedCancels = expvar.NewInt("optimizedCancels")

type peerSource string

const (
        peerSourceTracker         = "T" // It's the default.
        peerSourceIncoming        = "I"
        peerSourceDHTGetPeers     = "Hg"
        peerSourceDHTAnnouncePeer = "Ha"
        peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
        t *Torrent
        // The actual Conn, used for closing, and setting socket options.
        conn net.Conn
        // The Reader and Writer for this Conn, with hooks installed for stats,
        // limiting, deadlines etc.
        w io.Writer
        r io.Reader
        // True if the connection is operating over MSE obfuscation.
        encrypted bool
        Discovery peerSource
        uTP       bool
        closed    missinggo.Event

        stats                  ConnStats
        UnwantedChunksReceived int
        UsefulChunksReceived   int
        chunksSent             int
        goodPiecesDirtied      int
        badPiecesDirtied       int

        lastMessageReceived     time.Time
        completedHandshake      time.Time
        lastUsefulChunkReceived time.Time
        lastChunkSent           time.Time

        // Stuff controlled by the local peer.
        Interested       bool
        Choked           bool
        requests         map[request]struct{}
        requestsLowWater int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
        sentHaves        []bool

        // Stuff controlled by the remote peer.
        PeerID             [20]byte
        PeerInterested     bool
        PeerChoked         bool
        PeerRequests       map[request]struct{}
        PeerExtensionBytes peerExtensionBytes
        // The pieces the peer has claimed to have.
        peerPieces bitmap.Bitmap
        // The peer has everything. This can occur due to a special message, when
        // we may not even know the number of pieces in the torrent yet.
        peerHasAll bool
        // The highest possible number of pieces the torrent could have based on
        // communication with the peer. Generally only useful until we have the
        // torrent info.
        peerMinPieces int
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[int]struct{}

        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[string]byte
        PeerClientName   string

        pieceInclination  []int
        pieceRequestOrder prioritybitmap.PriorityBitmap

        postedBuffer bytes.Buffer
        writerCond   sync.Cond
}

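// mu returns the lock guarding connection state, which is the Client-wide
// lock shared via the Torrent.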
func (cn *connection) mu() sync.Locker {
        return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
        return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
        return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
        _, ok := cn.PeerExtensionIDs[ext]
        return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
        if cn.t.haveInfo() {
                return cn.t.numPieces()
        }
        return cn.peerMinPieces
}

func (cn *connection) completedString() string {
        return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Correct the PeerPieces slice length. Returns an error if the existing
// state is invalid, such as from receiving a badly sized BITFIELD or
// invalid HAVE messages.
func (cn *connection) setNumPieces(num int) error {
        cn.peerPieces.RemoveRange(num, -1)
        cn.peerPiecesChanged()
        return nil
}

func eventAgeString(t time.Time) string {
        if t.IsZero() {
                return "never"
        }
        return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.encrypted {
                c('E')
        }
        ret += string(cn.Discovery)
        if cn.uTP {
                c('T')
        }
        return
}

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.Interested {
                c('i')
        }
        if cn.Choked {
                c('c')
        }
        c('-')
        ret += cn.connectionFlags()
        c('-')
        if cn.PeerInterested {
                c('i')
        }
        if cn.PeerChoked {
                c('c')
        }
        return
}

func (cn *connection) String() string {
        var buf bytes.Buffer
        cn.WriteStatus(&buf, nil)
        return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
        // \t isn't preserved in <pre> blocks?
        fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
                eventAgeString(cn.lastUsefulChunkReceived))
        fmt.Fprintf(w,
                "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                cn.UsefulChunksReceived,
                cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
                cn.chunksSent,
                cn.numLocalRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
        )
        fmt.Fprintf(w, "    next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
}

func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
        pb.IterTyped(func(i int) bool {
                if len(ret) >= n {
                        return false
                }
                ret = append(ret, i)
                return true
        })
        return
}

func (cn *connection) Close() {
        cn.closed.Set()
        cn.discardPieceInclination()
        cn.pieceRequestOrder.Clear()
        if cn.conn != nil {
                // TODO: This call blocks sometimes, why?
                go cn.conn.Close()
        }
}

func (cn *connection) PeerHasPiece(piece int) bool {
        return cn.peerHasAll || cn.peerPieces.Contains(piece)
}

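// Post queues a message to be written out by the writer goroutine, and wakes
// the writer. The caller is expected to hold the client lock.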
func (cn *connection) Post(msg pp.Message) {
        postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
        cn.postedBuffer.Write(msg.MustMarshalBinary())
        cn.writerCond.Broadcast()
}

func (cn *connection) RequestPending(r request) bool {
        _, ok := cn.requests[r]
        return ok
}

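// Posts a request for the given metadata piece over the ut_metadata
// extension, if the peer supports it and the piece isn't already pending.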
func (cn *connection) requestMetadataPiece(index int) {
        eID := cn.PeerExtensionIDs["ut_metadata"]
        if eID == 0 {
                return
        }
        if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
                return
        }
        cn.Post(pp.Message{
                Type:       pp.Extended,
                ExtendedID: eID,
                ExtendedPayload: func() []byte {
                        b, err := bencode.Marshal(map[string]int{
                                "msg_type": pp.RequestMetadataExtensionMsgType,
                                "piece":    index,
                        })
                        if err != nil {
                                panic(err)
                        }
                        return b
                }(),
        })
        for index >= len(cn.metadataRequests) {
                cn.metadataRequests = append(cn.metadataRequests, false)
        }
        cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
        return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
        ret = cn.PeerMaxRequests
        if ret > 64 {
                ret = 64
        }
        return
}

// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
        if cn.PeerRequests == nil {
                return false
        }
        if _, ok := cn.PeerRequests[r]; !ok {
                return false
        }
        delete(cn.PeerRequests, r)
        return true
}

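// Chokes the peer, discarding any requests they have outstanding.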
func (cn *connection) Choke() {
        if cn.Choked {
                return
        }
        cn.Post(pp.Message{
                Type: pp.Choke,
        })
        cn.PeerRequests = nil
        cn.Choked = true
}

func (cn *connection) Unchoke() {
        if !cn.Choked {
                return
        }
        cn.Post(pp.Message{
                Type: pp.Unchoke,
        })
        cn.Choked = false
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
        if cn.Interested == interested {
                return true
        }
        cn.Interested = interested
        // log.Printf("%p: setting interest: %v", cn, interested)
        return msg(pp.Message{
                Type: func() pp.MessageType {
                        if interested {
                                return pp.Interested
                        } else {
                                return pp.NotInterested
                        }
                }(),
        })
}

var (
        // Track connection writer buffer writes and flushes, to determine its
        // efficiency.
        connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
        connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
)

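// Reconciles the connection's actual request state with the desired one:
// updates interest, cancels requests no longer wanted, and issues new ones.
// Messages are emitted through msg until it reports the buffer is full.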
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
        rs, i := cn.desiredRequestState()
        if !cn.SetInterested(i, msg) {
                return
        }
        for r := range cn.requests {
                if _, ok := rs[r]; !ok {
                        delete(cn.requests, r)
                        // log.Printf("%p: cancelling request: %v", cn, r)
                        if !msg(pp.Message{
                                Type:   pp.Cancel,
                                Index:  r.Index,
                                Begin:  r.Begin,
                                Length: r.Length,
                        }) {
                                return
                        }
                }
        }
        for r := range rs {
                if _, ok := cn.requests[r]; !ok {
                        if cn.requests == nil {
                                cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
                        }
                        cn.requests[r] = struct{}{}
                        // log.Printf("%p: requesting %v", cn, r)
                        if !msg(pp.Message{
                                Type:   pp.Request,
                                Index:  r.Index,
                                Begin:  r.Begin,
                                Length: r.Length,
                        }) {
                                return
                        }
                }
        }
}

// Writes buffers to the socket. Flushes the posted buffer and any messages
// generated by fillWriteBuffer, and sends keepalives when the connection has
// been idle for keepAliveTimeout.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
        var (
                buf       bytes.Buffer
                lastWrite = time.Now()
        )
        var keepAliveTimer *time.Timer
        keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
                cn.mu().Lock()
                defer cn.mu().Unlock()
                if time.Since(lastWrite) >= keepAliveTimeout {
                        cn.writerCond.Broadcast()
                }
                keepAliveTimer.Reset(keepAliveTimeout)
        })
        cn.mu().Lock()
        defer cn.mu().Unlock()
        defer cn.Close()
        defer keepAliveTimer.Stop()
        for {
                buf.Write(cn.postedBuffer.Bytes())
                cn.postedBuffer.Reset()
                if buf.Len() == 0 {
                        cn.fillWriteBuffer(func(msg pp.Message) bool {
                                buf.Write(msg.MustMarshalBinary())
                                return buf.Len() < 1<<16
                        })
                }
                if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
                        buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
                        postedKeepalives.Add(1)
                }
                if buf.Len() == 0 {
                        cn.writerCond.Wait()
                        continue
                }
                cn.mu().Unlock()
                // log.Printf("writing %d bytes", buf.Len())
                n, err := cn.w.Write(buf.Bytes())
                cn.mu().Lock()
                if n != 0 {
                        lastWrite = time.Now()
                        keepAliveTimer.Reset(keepAliveTimeout)
                }
                if err != nil {
                        return
                }
                if n != buf.Len() {
                        panic("short write")
                }
                buf.Reset()
        }
}

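// Posts a HAVE for the given piece, unless one has already been sent on this
// connection.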
func (cn *connection) Have(piece int) {
        for piece >= len(cn.sentHaves) {
                cn.sentHaves = append(cn.sentHaves, false)
        }
        if cn.sentHaves[piece] {
                return
        }
        cn.Post(pp.Message{
                Type:  pp.Have,
                Index: pp.Integer(piece),
        })
        cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
        if cn.sentHaves != nil {
                panic("bitfield must be first have-related message sent")
        }
        cn.Post(pp.Message{
                Type:     pp.Bitfield,
                Bitfield: haves,
        })
        // Make a copy of haves: the message is marshalled without the lock
        // held, and the message's bitfield shouldn't change when .sentHaves
        // does.
        cn.sentHaves = append([]bool(nil), haves...)
}

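// Computes the requests a connection should have outstanding, and whether it
// should be interested, as a pure function of its inputs. Existing requests
// are kept as-is while above the low water mark; below it, the set is topped
// up to the high water mark from the prioritized pending chunks.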
func nextRequestState(
        networkingEnabled bool,
        currentRequests map[request]struct{},
        peerChoking bool,
        nextPieces *prioritybitmap.PriorityBitmap,
        pendingChunks func(piece int, f func(chunkSpec) bool) bool,
        requestsLowWater int,
        requestsHighWater int,
) (
        requests map[request]struct{},
        interested bool,
) {
        if !networkingEnabled || nextPieces.IsEmpty() {
                return nil, false
        }
        if peerChoking || len(currentRequests) > requestsLowWater {
                return currentRequests, true
        }
        requests = make(map[request]struct{}, requestsHighWater)
        for r := range currentRequests {
                requests[r] = struct{}{}
        }
        nextPieces.IterTyped(func(piece int) bool {
                return pendingChunks(piece, func(cs chunkSpec) bool {
                        if len(requests) >= requestsHighWater {
                                return false
                        }
                        r := request{pp.Integer(piece), cs}
                        requests[r] = struct{}{}
                        return true
                })
        })
        return requests, true
}

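// Wakes the writer, which will recompute the desired request state in
// fillWriteBuffer.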
func (cn *connection) updateRequests() {
        cn.writerCond.Broadcast()
}

func (cn *connection) desiredRequestState() (map[request]struct{}, bool) {
        return nextRequestState(
                cn.t.networkingEnabled,
                cn.requests,
                cn.PeerChoked,
                &cn.pieceRequestOrder,
                func(piece int, f func(chunkSpec) bool) bool {
                        return undirtiedChunks(piece, cn.t, f)
                },
                cn.requestsLowWater,
                cn.nominalMaxRequests(),
        )
}

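// Calls f with each undirtied chunk spec in the piece, in a random order,
// stopping early if f returns false.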
func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
        chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
        return iter.ForPerm(len(chunkIndices), func(i int) bool {
                return f(t.chunkIndexSpec(chunkIndices[i], piece))
        })
}

func (cn *connection) stopRequestingPiece(piece int) {
        cn.pieceRequestOrder.Remove(piece)
        cn.writerCond.Broadcast()
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// pseudorandomly skewed, so that connections don't all request the same
// pieces and thus waste effort.
func (cn *connection) updatePiecePriority(piece int) {
        tpp := cn.t.piecePriority(piece)
        if !cn.PeerHasPiece(piece) {
                tpp = PiecePriorityNone
        }
        if tpp == PiecePriorityNone {
                cn.stopRequestingPiece(piece)
                return
        }
        prio := cn.getPieceInclination()[piece]
        switch tpp {
        case PiecePriorityNormal:
        case PiecePriorityReadahead:
                prio -= cn.t.numPieces()
        case PiecePriorityNext, PiecePriorityNow:
                prio -= 2 * cn.t.numPieces()
        default:
                panic(tpp)
        }
        prio += piece / 3
        cn.pieceRequestOrder.Set(piece, prio)
        cn.updateRequests()
}

func (cn *connection) getPieceInclination() []int {
        if cn.pieceInclination == nil {
                cn.pieceInclination = cn.t.getConnPieceInclination()
        }
        return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
        if cn.pieceInclination == nil {
                return
        }
        cn.t.putPieceInclination(cn.pieceInclination)
        cn.pieceInclination = nil
}

func (cn *connection) peerHasPieceChanged(piece int) {
        cn.updatePiecePriority(piece)
}

func (cn *connection) peerPiecesChanged() {
        if cn.t.haveInfo() {
                for i := range iter.N(cn.t.numPieces()) {
                        cn.peerHasPieceChanged(i)
                }
        }
}

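// Raises the lower bound on the number of pieces in the torrent, as implied
// by what the peer has claimed to have.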
func (cn *connection) raisePeerMinPieces(newMin int) {
        if newMin > cn.peerMinPieces {
                cn.peerMinPieces = newMin
        }
}

func (cn *connection) peerSentHave(piece int) error {
        if cn.t.haveInfo() && piece >= cn.t.numPieces() {
                return errors.New("invalid piece")
        }
        if cn.PeerHasPiece(piece) {
                return nil
        }
        cn.raisePeerMinPieces(piece + 1)
        cn.peerPieces.Set(piece, true)
        cn.peerHasPieceChanged(piece)
        return nil
}

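// Records the pieces claimed in a BITFIELD message, ignoring any known
// excess, and updates piece priorities accordingly.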
func (cn *connection) peerSentBitfield(bf []bool) error {
        cn.peerHasAll = false
        if len(bf)%8 != 0 {
                panic("expected bitfield length divisible by 8")
        }
        // We know that the last byte means that at most the last 7 bits are
        // wasted.
        cn.raisePeerMinPieces(len(bf) - 7)
        if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
                // Ignore known excess pieces.
                bf = bf[:cn.t.numPieces()]
        }
        for i, have := range bf {
                if have {
                        cn.raisePeerMinPieces(i + 1)
                }
                cn.peerPieces.Set(i, have)
        }
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) peerSentHaveAll() error {
        cn.peerHasAll = true
        cn.peerPieces.Clear()
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) peerSentHaveNone() error {
        cn.peerPieces.Clear()
        cn.peerHasAll = false
        cn.peerPiecesChanged()
        return nil
}

func (c *connection) requestPendingMetadata() {
        if c.t.haveInfo() {
                return
        }
        if c.PeerExtensionIDs["ut_metadata"] == 0 {
                // Peer doesn't support this.
                return
        }
        // Request metadata pieces that we don't have in a random order.
        var pending []int
        for index := 0; index < c.t.metadataPieceCount(); index++ {
                if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
                        pending = append(pending, index)
                }
        }
        for _, i := range rand.Perm(len(pending)) {
                c.requestMetadataPiece(pending[i])
        }
}

func (cn *connection) wroteMsg(msg *pp.Message) {
        cn.stats.wroteMsg(msg)
        cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
        cn.stats.readMsg(msg)
        cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
        cn.stats.wroteBytes(n)
        if cn.t != nil {
                cn.t.stats.wroteBytes(n)
        }
}

func (cn *connection) readBytes(n int64) {
        cn.stats.readBytes(n)
        if cn.t != nil {
                cn.t.stats.readBytes(n)
        }
}

// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
        t := c.t
        if c.closed.IsSet() {
                return false
        }
        if !t.haveInfo() {
                return c.supportsExtension("ut_metadata")
        }
        if t.seeding() {
                return c.PeerInterested
        }
        return c.peerHasWantedPieces()
}

func (c *connection) lastHelpful() (ret time.Time) {
        ret = c.lastUsefulChunkReceived
        if c.t.seeding() && c.lastChunkSent.After(ret) {
                ret = c.lastChunkSent
        }
        return
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
        t := c.t
        cl := t.cl

        decoder := pp.Decoder{
                R:         bufio.NewReader(c.r),
                MaxLength: 256 * 1024,
                Pool:      t.chunkPool,
        }
        for {
                var (
                        msg pp.Message
                        err error
                )
                func() {
                        cl.mu.Unlock()
                        defer cl.mu.Lock()
                        err = decoder.Decode(&msg)
                }()
                if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
                        return nil
                }
                if err != nil {
                        return err
                }
                c.readMsg(&msg)
                c.lastMessageReceived = time.Now()
                if msg.Keepalive {
                        receivedKeepalives.Add(1)
                        continue
                }
                receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
                        c.requests = nil
                        // We can then reset our interest.
                        c.updateRequests()
                case pp.Reject:
                        cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
                        c.updateRequests()
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.writerCond.Broadcast()
                case pp.Interested:
                        c.PeerInterested = true
                        c.upload()
                case pp.NotInterested:
                        c.PeerInterested = false
                        c.Choke()
                case pp.Have:
                        err = c.peerSentHave(int(msg.Index))
                case pp.Request:
                        if c.Choked {
                                break
                        }
                        if !c.PeerInterested {
                                err = errors.New("peer sent request but isn't interested")
                                break
                        }
                        if !t.havePiece(msg.Index.Int()) {
                                // This isn't necessarily them screwing up. We can drop pieces
                                // from our storage, and can't communicate this to peers
                                // except by reconnecting.
                                requestsReceivedForMissingPieces.Add(1)
                                err = errors.New("peer requested piece we don't have")
                                break
                        }
                        if c.PeerRequests == nil {
                                c.PeerRequests = make(map[request]struct{}, maxRequests)
                        }
                        c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
                        c.upload()
                case pp.Cancel:
                        req := newRequest(msg.Index, msg.Begin, msg.Length)
                        if !c.PeerCancel(req) {
                                unexpectedCancels.Add(1)
                        }
                case pp.Bitfield:
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.HaveAll:
                        err = c.peerSentHaveAll()
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Piece:
                        c.receiveChunk(&msg)
                        if len(msg.Piece) == int(t.chunkSize) {
                                t.chunkPool.Put(msg.Piece)
                        }
                case pp.Extended:
                        switch msg.ExtendedID {
                        case pp.HandshakeExtendedID:
                                // TODO: Create a bencode struct for this.
                                var d map[string]interface{}
                                err = bencode.Unmarshal(msg.ExtendedPayload, &d)
                                if err != nil {
                                        err = fmt.Errorf("error decoding extended message payload: %s", err)
                                        break
                                }
                                // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
                                if reqq, ok := d["reqq"]; ok {
                                        if i, ok := reqq.(int64); ok {
                                                c.PeerMaxRequests = int(i)
                                        }
                                }
                                if v, ok := d["v"]; ok {
                                        c.PeerClientName = v.(string)
                                }
                                m, ok := d["m"]
                                if !ok {
                                        err = errors.New("handshake missing m item")
                                        break
                                }
                                mTyped, ok := m.(map[string]interface{})
                                if !ok {
                                        err = errors.New("handshake m value is not dict")
                                        break
                                }
                                if c.PeerExtensionIDs == nil {
                                        c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
                                }
                                for name, v := range mTyped {
                                        id, ok := v.(int64)
                                        if !ok {
                                                log.Printf("bad handshake m item extension ID type: %T", v)
                                                continue
                                        }
                                        if id == 0 {
                                                delete(c.PeerExtensionIDs, name)
                                        } else {
                                                if c.PeerExtensionIDs[name] == 0 {
                                                        supportedExtensionMessages.Add(name, 1)
                                                }
                                                c.PeerExtensionIDs[name] = byte(id)
                                        }
                                }
                                metadataSizeUntyped, ok := d["metadata_size"]
                                if ok {
                                        metadataSize, ok := metadataSizeUntyped.(int64)
                                        if !ok {
                                                log.Printf("bad metadata_size type: %T", metadataSizeUntyped)
                                        } else {
                                                err = t.setMetadataSize(metadataSize)
                                                if err != nil {
                                                        err = fmt.Errorf("error setting metadata size to %d: %s", metadataSize, err)
                                                        break
                                                }
                                        }
                                }
                                if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
                                        c.requestPendingMetadata()
                                }
                        case metadataExtendedId:
                                err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
                                if err != nil {
                                        err = fmt.Errorf("error handling metadata extension message: %s", err)
                                }
                        case pexExtendedId:
                                if cl.config.DisablePEX {
                                        break
                                }
                                var pexMsg peerExchangeMessage
                                err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
                                if err != nil {
                                        err = fmt.Errorf("error unmarshalling PEX message: %s", err)
                                        break
                                }
                                go func() {
                                        cl.mu.Lock()
                                        t.addPeers(func() (ret []Peer) {
                                                for i, cp := range pexMsg.Added {
                                                        p := Peer{
                                                                IP:     make([]byte, 4),
                                                                Port:   cp.Port,
                                                                Source: peerSourcePEX,
                                                        }
                                                        if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
                                                                p.SupportsEncryption = true
                                                        }
                                                        missinggo.CopyExact(p.IP, cp.IP[:])
                                                        ret = append(ret, p)
                                                }
                                                return
                                        }())
                                        cl.mu.Unlock()
                                }()
                        default:
                                err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
                        }
                        if err != nil {
                                // Clients with these peer ID prefixes use their own extension
                                // IDs for outgoing message types, which is incorrect; tolerate
                                // their errors by ending the connection cleanly.
                                if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
                                        strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
                                        return nil
                                }
                        }
                case pp.Port:
                        if cl.dHT == nil {
                                break
                        }
                        pingAddr, err := net.ResolveUDPAddr("udp", c.remoteAddr().String())
                        if err != nil {
                                panic(err)
                        }
                        if msg.Port != 0 {
                                pingAddr.Port = int(msg.Port)
                        }
                        go cl.dHT.Ping(pingAddr, nil)
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
                }
                if err != nil {
                        return err
                }
        }
}

// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
        cn.r = rw
        cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
        return struct {
                io.Reader
                io.Writer
        }{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
        t := c.t
        cl := t.cl
        chunksReceived.Add(1)

        req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

        // Request has been satisfied.
        if cl.connDeleteRequest(t, c, req) {
                defer c.updateRequests()
        } else {
                unexpectedChunksReceived.Add(1)
        }

        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)
                c.UnwantedChunksReceived++
                return
        }

        index := int(req.Index)
        piece := &t.pieces[index]

        c.UsefulChunksReceived++
        c.lastUsefulChunkReceived = time.Now()

        c.upload()

        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
        piece.incrementPendingWrites()
        // Record that we have the chunk.
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

        // Cancel pending requests for this chunk.
        for c := range t.conns {
                c.updateRequests()
        }

        cl.mu.Unlock()
        // Write the chunk out. Note that the upper bound on chunk writing
        // concurrency will be the number of connections.
        err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        cl.mu.Lock()

        piece.decrementPendingWrites()

        if err != nil {
                log.Printf("%s (%x): error writing chunk %v: %s", t, t.infoHash, req, err)
                t.pendRequest(req)
                t.updatePieceCompletion(int(msg.Index))
                return
        }

        // It's important that the piece is potentially queued before we check if
        // the piece is still wanted, because if it is queued, it won't be wanted.
        if t.pieceAllDirty(index) {
                t.queuePieceCheck(int(req.Index))
        }

        if c.peerTouchedPieces == nil {
                c.peerTouchedPieces = make(map[int]struct{})
        }
        c.peerTouchedPieces[index] = struct{}{}

        cl.event.Broadcast()
        t.publishPieceChange(int(req.Index))
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload() {
        t := c.t
        cl := t.cl
        if cl.config.NoUpload {
                return
        }
        if !c.PeerInterested {
                return
        }
        seeding := t.seeding()
        if !seeding && !c.peerHasWantedPieces() {
                // There's no reason to upload to this peer.
                return
        }
        // Breaking or completing this loop means we don't want to upload to the
        // peer anymore, and we choke them.
another:
        for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
                // We want to upload to the peer.
                c.Unchoke()
                for r := range c.PeerRequests {
                        res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
                        delay := res.Delay()
                        if delay > 0 {
                                res.Cancel()
                                go func() {
                                        time.Sleep(delay)
                                        cl.mu.Lock()
                                        defer cl.mu.Unlock()
                                        c.upload()
                                }()
                                return
                        }
                        err := cl.sendChunk(t, c, r)
                        if err != nil {
                                i := int(r.Index)
                                if t.pieceComplete(i) {
                                        t.updatePieceCompletion(i)
                                        if !t.pieceComplete(i) {
                                                // We had the piece, but not anymore.
                                                break another
                                        }
                                }
                                log.Printf("error sending chunk %+v to peer: %s", r, err)
                                // If we failed to send a chunk, choke the peer to ensure they
                                // flush all their requests. We've probably dropped a piece,
                                // but there's no way to communicate this to the peer. If they
                                // ask for it again, we'll kick them to allow us to send them
                                // an updated bitfield.
                                break another
                        }
                        delete(c.PeerRequests, r)
                        goto another
                }
                return
        }
        c.Choke()
}

func (cn *connection) Drop() {
        cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int {
        return cn.goodPiecesDirtied - cn.badPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
        return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
        return len(c.requests)
}