package torrent

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/anacrolix/missinggo"
	"github.com/anacrolix/missinggo/bitmap"
	"github.com/anacrolix/missinggo/iter"
	"github.com/anacrolix/missinggo/prioritybitmap"

	"github.com/anacrolix/torrent/bencode"
	pp "github.com/anacrolix/torrent/peer_protocol"
)

type peerSource string

const (
	peerSourceTracker         = "T" // It's the default.
	peerSourceIncoming        = "I"
	peerSourceDHTGetPeers     = "Hg"
	peerSourceDHTAnnouncePeer = "Ha"
	peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn net.Conn
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	encrypted bool
	Discovery peerSource
	uTP       bool
	closed    missinggo.Event

	stats                  ConnStats
	UnwantedChunksReceived int
	UsefulChunksReceived   int
	chunksSent             int
	goodPiecesDirtied      int
	badPiecesDirtied       int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested       bool
	Choked           bool
	requests         map[request]struct{}
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        []bool

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerHasAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// torrent info.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	postedBuffer bytes.Buffer
	writerCond   sync.Cond
}

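// mu returns the locker guarding this connection's state: everything here is
// protected by the owning Client's mutex.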
func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

func (cn *connection) completedString() string {
	return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Correct the PeerPieces slice length. Returns an error if the existing
// state is invalid, such as from a badly sized BITFIELD or an invalid HAVE
// message.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, -1)
	cn.peerPiecesChanged()
	return nil
}

func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.encrypted {
		c('E')
	}
	ret += string(cn.Discovery)
	if cn.uTP {
		c('T')
	}
	return
}

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	if cn.Choked {
		c('c')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	if cn.PeerChoked {
		c('c')
	}
	return
}

func (cn *connection) String() string {
	var buf bytes.Buffer
	cn.WriteStatus(&buf, nil)
	return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastUsefulChunkReceived))
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		cn.UsefulChunksReceived,
		cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
		cn.chunksSent,
		cn.numLocalRequests(),
		len(cn.PeerRequests),
		cn.statusFlags(),
	)
	fmt.Fprintf(w, "    next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
}

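// Returns up to the first n set bits of pb, in priority order.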
func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
	pb.IterTyped(func(i int) bool {
		if len(ret) >= n {
			return false
		}
		ret = append(ret, i)
		return true
	})
	return
}

func (cn *connection) Close() {
	cn.closed.Set()
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	if cn.conn != nil {
		// TODO: This call blocks sometimes, why?
		go cn.conn.Close()
	}
}

func (cn *connection) PeerHasPiece(piece int) bool {
	return cn.peerHasAll || cn.peerPieces.Contains(piece)
}

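// Post buffers a message for the writer goroutine and wakes it. Callers are
// expected to hold the client lock, which guards postedBuffer.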
func (cn *connection) Post(msg pp.Message) {
	messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.postedBuffer.Write(msg.MustMarshalBinary())
	cn.writerCond.Broadcast()
}

func (cn *connection) RequestPending(r request) bool {
	_, ok := cn.requests[r]
	return ok
}

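// Requests the given metadata piece via the ut_metadata extension, if the
// peer supports it and the piece isn't already pending a response.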
func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
	ret = cn.PeerMaxRequests
	if ret > 64 {
		ret = 64
	}
	return
}

// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
	if cn.PeerRequests == nil {
		return false
	}
	if _, ok := cn.PeerRequests[r]; !ok {
		return false
	}
	delete(cn.PeerRequests, r)
	return true
}

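// Choke posts a CHOKE message and discards the peer's outstanding requests.
// No-op if we're already choking them.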
func (cn *connection) Choke() {
	if cn.Choked {
		return
	}
	cn.Post(pp.Message{
		Type: pp.Choke,
	})
	cn.PeerRequests = nil
	cn.Choked = true
}

func (cn *connection) Unchoke() {
	if !cn.Choked {
		return
	}
	cn.Post(pp.Message{
		Type: pp.Unchoke,
	})
	cn.Choked = false
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
	if cn.Interested == interested {
		return true
	}
	cn.Interested = interested
	// log.Printf("%p: setting interest: %v", cn, interested)
	return msg(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			} else {
				return pp.NotInterested
			}
		}(),
	})
}

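// Computes the desired request state and emits the interest, cancel and
// request messages needed to reach it. msg reports whether more messages may
// be written.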
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	numFillBuffers.Add(1)
	rs, i := cn.desiredRequestState()
	if !cn.SetInterested(i, msg) {
		return
	}
	sentCancels := false
	for r := range cn.requests {
		if _, ok := rs[r]; !ok {
			sentCancels = true
			delete(cn.requests, r)
			// log.Printf("%p: cancelling request: %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Cancel,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
	}
	if sentCancels {
		fillBufferSentCancels.Add(1)
	}
	sentRequests := false
	for r := range rs {
		if _, ok := cn.requests[r]; !ok {
			if cn.requests == nil {
				cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
			}
			cn.requests[r] = struct{}{}
			sentRequests = true
			// log.Printf("%p: requesting %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Request,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
	}
	if sentRequests {
		fillBufferSentRequests.Add(1)
	}
}

// Writes buffered messages to the socket. Some messages are posted by other
// parts of the Client, and the rest are generated locally while the
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		buf       bytes.Buffer
		lastWrite time.Time = time.Now()
	)
	var keepAliveTimer *time.Timer
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.writerCond.Broadcast()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer cn.Close()
	defer keepAliveTimer.Stop()
	for {
		buf.Write(cn.postedBuffer.Bytes())
		cn.postedBuffer.Reset()
		if buf.Len() == 0 {
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				cn.wroteMsg(&msg)
				buf.Write(msg.MustMarshalBinary())
				return buf.Len() < 1<<16
			})
		}
		if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if buf.Len() == 0 {
			cn.writerCond.Wait()
			continue
		}
		cn.mu().Unlock()
		// log.Printf("writing %d bytes", buf.Len())
		n, err := cn.w.Write(buf.Bytes())
		cn.mu().Lock()
		if n != 0 {
			lastWrite = time.Now()
			keepAliveTimer.Reset(keepAliveTimeout)
		}
		if err != nil {
			return
		}
		if n != buf.Len() {
			panic("short write")
		}
		buf.Reset()
	}
}

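// Have posts a HAVE for the given piece, at most once per piece.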
func (cn *connection) Have(piece int) {
	for piece >= len(cn.sentHaves) {
		cn.sentHaves = append(cn.sentHaves, false)
	}
	if cn.sentHaves[piece] {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
	if cn.sentHaves != nil {
		panic("bitfield must be first have-related message sent")
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: haves,
	})
	// Make a copy of haves, as that's read when the message is marshalled
	// without the lock. Also it obviously shouldn't change in the Msg due to
	// changes in .sentHaves.
	cn.sentHaves = append([]bool(nil), haves...)
}

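// nextRequestState is a pure function from the current request state to the
// desired one. It returns no requests and no interest when networking is
// disabled or there's nothing to request, keeps the existing requests while
// choked or above the low water mark, and otherwise tops up to the high
// water mark from the pending chunks of the highest-priority pieces.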
func nextRequestState(
	networkingEnabled bool,
	currentRequests map[request]struct{},
	peerChoking bool,
	nextPieces *prioritybitmap.PriorityBitmap,
	pendingChunks func(piece int, f func(chunkSpec) bool) bool,
	requestsLowWater int,
	requestsHighWater int,
) (
	requests map[request]struct{},
	interested bool,
) {
	if !networkingEnabled || nextPieces.IsEmpty() {
		return nil, false
	}
	if peerChoking || len(currentRequests) > requestsLowWater {
		return currentRequests, true
	}
	requests = make(map[request]struct{}, requestsHighWater)
	for r := range currentRequests {
		requests[r] = struct{}{}
	}
	nextPieces.IterTyped(func(piece int) bool {
		return pendingChunks(piece, func(cs chunkSpec) bool {
			if len(requests) >= requestsHighWater {
				return false
			}
			r := request{pp.Integer(piece), cs}
			requests[r] = struct{}{}
			return true
		})
	})
	return requests, true
}

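// Wakes the writer so that fillWriteBuffer recomputes the desired request
// state.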
func (cn *connection) updateRequests() {
	cn.writerCond.Broadcast()
}

func (cn *connection) desiredRequestState() (map[request]struct{}, bool) {
	return nextRequestState(
		cn.t.networkingEnabled,
		cn.requests,
		cn.PeerChoked,
		&cn.pieceRequestOrder,
		func(piece int, f func(chunkSpec) bool) bool {
			return undirtiedChunks(piece, cn.t, f)
		},
		cn.requestsLowWater,
		cn.nominalMaxRequests(),
	)
}

func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(chunkIndices[i], piece))
	})
}

func (cn *connection) stopRequestingPiece(piece int) {
	cn.pieceRequestOrder.Remove(piece)
	cn.writerCond.Broadcast()
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection, and
// pseudorandomly offsets the piece order so that connections don't all
// request the same pieces and thus waste effort.
func (cn *connection) updatePiecePriority(piece int) {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		cn.stopRequestingPiece(piece)
		return
	}
	prio := cn.getPieceInclination()[piece]
	switch tpp {
	case PiecePriorityNormal:
	case PiecePriorityReadahead:
		prio -= cn.t.numPieces()
	case PiecePriorityNext, PiecePriorityNow:
		prio -= 2 * cn.t.numPieces()
	default:
		panic(tpp)
	}
	prio += piece / 3
	cn.pieceRequestOrder.Set(piece, prio)
	cn.updateRequests()
}

func (cn *connection) getPieceInclination() []int {
	if cn.pieceInclination == nil {
		cn.pieceInclination = cn.t.getConnPieceInclination()
	}
	return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
	if cn.pieceInclination == nil {
		return
	}
	cn.t.putPieceInclination(cn.pieceInclination)
	cn.pieceInclination = nil
}

func (cn *connection) peerHasPieceChanged(piece int) {
	cn.updatePiecePriority(piece)
}

func (cn *connection) peerPiecesChanged() {
	if cn.t.haveInfo() {
		for i := range iter.N(cn.t.numPieces()) {
			cn.peerHasPieceChanged(i)
		}
	}
}

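// Records a new lower bound on the number of pieces the torrent could have;
// peerMinPieces never decreases.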
func (cn *connection) raisePeerMinPieces(newMin int) {
	if newMin > cn.peerMinPieces {
		cn.peerMinPieces = newMin
	}
}

func (cn *connection) peerSentHave(piece int) error {
	if cn.t.haveInfo() && piece >= cn.t.numPieces() {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		return nil
	}
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(piece, true)
	cn.peerHasPieceChanged(piece)
	return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
	cn.peerHasAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted.
	cn.raisePeerMinPieces(len(bf) - 7)
	if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
		// Ignore known excess pieces.
		bf = bf[:cn.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			cn.raisePeerMinPieces(i + 1)
		}
		cn.peerPieces.Set(i, have)
	}
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveAll() error {
	cn.peerHasAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveNone() error {
	cn.peerPieces.Clear()
	cn.peerHasAll = false
	cn.peerPiecesChanged()
	return nil
}

func (c *connection) requestPendingMetadata() {
	if c.t.haveInfo() {
		return
	}
	if c.PeerExtensionIDs["ut_metadata"] == 0 {
		// Peer doesn't support this.
		return
	}
	// Request metadata pieces that we don't have in a random order.
	var pending []int
	for index := 0; index < c.t.metadataPieceCount(); index++ {
		if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
			pending = append(pending, index)
		}
	}
	for _, i := range rand.Perm(len(pending)) {
		c.requestMetadataPiece(pending[i])
	}
}

func (cn *connection) wroteMsg(msg *pp.Message) {
	messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.stats.wroteMsg(msg)
	cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
	cn.stats.readMsg(msg)
	cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
	cn.stats.wroteBytes(n)
	if cn.t != nil {
		cn.t.stats.wroteBytes(n)
	}
}

func (cn *connection) readBytes(n int64) {
	cn.stats.readBytes(n)
	if cn.t != nil {
		cn.t.stats.readBytes(n)
	}
}

// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
	t := c.t
	if c.closed.IsSet() {
		return false
	}
	if !t.haveInfo() {
		return c.supportsExtension("ut_metadata")
	}
	if t.seeding() {
		return c.PeerInterested
	}
	return c.peerHasWantedPieces()
}

func (c *connection) lastHelpful() (ret time.Time) {
	ret = c.lastUsefulChunkReceived
	if c.t.seeding() && c.lastChunkSent.After(ret) {
		ret = c.lastChunkSent
	}
	return
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		R:         bufio.NewReader(c.r),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	for {
		var (
			msg pp.Message
			err error
		)
		func() {
			cl.mu.Unlock()
			defer cl.mu.Lock()
			err = decoder.Decode(&msg)
		}()
		if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.readMsg(&msg)
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.requests = nil
			// We can then reset our interest.
			c.updateRequests()
		case pp.Reject:
			cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
			c.updateRequests()
		case pp.Unchoke:
			c.PeerChoked = false
			c.writerCond.Broadcast()
		case pp.Interested:
			c.PeerInterested = true
			c.upload()
		case pp.NotInterested:
			c.PeerInterested = false
			c.Choke()
		case pp.Have:
			err = c.peerSentHave(int(msg.Index))
		case pp.Request:
			if c.Choked {
				break
			}
			if !c.PeerInterested {
				err = errors.New("peer sent request but isn't interested")
				break
			}
			if !t.havePiece(msg.Index.Int()) {
				// This isn't necessarily them screwing up. We can drop pieces
				// from our storage, and can't communicate this to peers
				// except by reconnecting.
				requestsReceivedForMissingPieces.Add(1)
				err = errors.New("peer requested piece we don't have")
				break
			}
			if c.PeerRequests == nil {
				c.PeerRequests = make(map[request]struct{}, maxRequests)
			}
			c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
			c.upload()
		case pp.Cancel:
			req := newRequest(msg.Index, msg.Begin, msg.Length)
			if !c.PeerCancel(req) {
				unexpectedCancels.Add(1)
			}
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.peerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			c.receiveChunk(&msg)
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(msg.Piece)
			}
		case pp.Extended:
			switch msg.ExtendedID {
			case pp.HandshakeExtendedID:
				// TODO: Create a bencode struct for this.
				var d map[string]interface{}
				err = bencode.Unmarshal(msg.ExtendedPayload, &d)
				if err != nil {
					err = fmt.Errorf("error decoding extended message payload: %s", err)
					break
				}
				// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
				if reqq, ok := d["reqq"]; ok {
					if i, ok := reqq.(int64); ok {
						c.PeerMaxRequests = int(i)
					}
				}
				// Ignore a non-string v rather than panicking on a malformed
				// handshake.
				if v, ok := d["v"].(string); ok {
					c.PeerClientName = v
				}
				m, ok := d["m"]
				if !ok {
					err = errors.New("handshake missing m item")
					break
				}
				mTyped, ok := m.(map[string]interface{})
				if !ok {
					err = errors.New("handshake m value is not dict")
					break
				}
				if c.PeerExtensionIDs == nil {
					c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
				}
				for name, v := range mTyped {
					id, ok := v.(int64)
					if !ok {
						log.Printf("bad handshake m item extension ID type: %T", v)
						continue
					}
					if id == 0 {
						delete(c.PeerExtensionIDs, name)
					} else {
						if c.PeerExtensionIDs[name] == 0 {
							supportedExtensionMessages.Add(name, 1)
						}
						c.PeerExtensionIDs[name] = byte(id)
					}
				}
				metadata_sizeUntyped, ok := d["metadata_size"]
				if ok {
					metadata_size, ok := metadata_sizeUntyped.(int64)
					if !ok {
						log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
					} else {
						err = t.setMetadataSize(metadata_size)
						if err != nil {
							err = fmt.Errorf("error setting metadata size to %d: %s", metadata_size, err)
							break
						}
					}
				}
				if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
					c.requestPendingMetadata()
				}
			case metadataExtendedId:
				err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
				if err != nil {
					err = fmt.Errorf("error handling metadata extension message: %s", err)
				}
			case pexExtendedId:
				if cl.config.DisablePEX {
					break
				}
				var pexMsg peerExchangeMessage
				err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
				if err != nil {
					err = fmt.Errorf("error unmarshalling PEX message: %s", err)
					break
				}
				go func() {
					cl.mu.Lock()
					t.addPeers(func() (ret []Peer) {
						for i, cp := range pexMsg.Added {
							p := Peer{
								IP:     make([]byte, 4),
								Port:   cp.Port,
								Source: peerSourcePEX,
							}
							if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
								p.SupportsEncryption = true
							}
							missinggo.CopyExact(p.IP, cp.IP[:])
							ret = append(ret, p)
						}
						return
					}())
					cl.mu.Unlock()
				}()
			default:
				err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
			}
			if err != nil {
				// These clients use their own extension IDs for outgoing
				// message types, which is incorrect.
				if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
					strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
					return nil
				}
			}
		case pp.Port:
			if cl.dHT == nil {
				break
			}
			pingAddr, err := net.ResolveUDPAddr("udp", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			go cl.dHT.Ping(pingAddr, nil)
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}

// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
	return struct {
		io.Reader
		io.Writer
	}{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
	t := c.t
	cl := t.cl
	chunksReceived.Add(1)

	req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

	// Request has been satisfied.
	if cl.connDeleteRequest(t, c, req) {
		defer c.updateRequests()
	} else {
		unexpectedChunksReceived.Add(1)
	}

	// Do we actually want this chunk?
	if !t.wantPiece(req) {
		unwantedChunksReceived.Add(1)
		c.UnwantedChunksReceived++
		return
	}

	index := int(req.Index)
	piece := &t.pieces[index]

	c.UsefulChunksReceived++
	c.lastUsefulChunkReceived = time.Now()

	c.upload()

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk.
	for c := range t.conns {
		c.updateRequests()
	}

	cl.mu.Unlock()
	// Write the chunk out. Note that the upper bound on chunk writing
	// concurrency will be the number of connections.
	err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	cl.mu.Lock()

	piece.decrementPendingWrites()

	if err != nil {
		log.Printf("%s (%x): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.pendRequest(req)
		t.updatePieceCompletion(int(msg.Index))
		return
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(index) {
		t.queuePieceCheck(int(req.Index))
	}

	if c.peerTouchedPieces == nil {
		c.peerTouchedPieces = make(map[int]struct{})
	}
	c.peerTouchedPieces[index] = struct{}{}

	cl.event.Broadcast()
	t.publishPieceChange(int(req.Index))
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload() {
	t := c.t
	cl := t.cl
	if cl.config.NoUpload {
		return
	}
	if !c.PeerInterested {
		return
	}
	seeding := t.seeding()
	if !seeding && !c.peerHasWantedPieces() {
		// There's no reason to upload to this peer.
		return
	}
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
		// We want to upload to the peer.
		c.Unchoke()
		for r := range c.PeerRequests {
			res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
			delay := res.Delay()
			if delay > 0 {
				res.Cancel()
				go func() {
					time.Sleep(delay)
					cl.mu.Lock()
					defer cl.mu.Unlock()
					c.upload()
				}()
				return
			}
			err := cl.sendChunk(t, c, r)
			if err != nil {
				i := int(r.Index)
				if t.pieceComplete(i) {
					t.updatePieceCompletion(i)
					if !t.pieceComplete(i) {
						// We had the piece, but not anymore.
						break another
					}
				}
				log.Printf("error sending chunk %+v to peer: %s", r, err)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			goto another
		}
		return
	}
	c.Choke()
}

func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}

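// Good pieces dirtied by this peer minus bad ones; a rough running score of
// the quality of the data they've sent us.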
func (cn *connection) netGoodPiecesDirtied() int {
	return cn.goodPiecesDirtied - cn.badPiecesDirtied
}

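// Whether the peer has pieces we still want, i.e. the connection's piece
// request order is non-empty.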
func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
	return len(c.requests)
}