package torrent

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/anacrolix/torrent/mse"

	"github.com/anacrolix/missinggo"
	"github.com/anacrolix/missinggo/bitmap"
	"github.com/anacrolix/missinggo/iter"
	"github.com/anacrolix/missinggo/prioritybitmap"

	"github.com/anacrolix/torrent/bencode"
	pp "github.com/anacrolix/torrent/peer_protocol"
)

type peerSource string

const (
	peerSourceTracker         = "T" // It's the default.
	peerSourceIncoming        = "I"
	peerSourceDHTGetPeers     = "Hg"
	peerSourceDHTAnnouncePeer = "Ha"
	peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
	t *Torrent
	// The actual Conn, used for closing, and setting socket options.
	conn net.Conn
	// The Reader and Writer for this Conn, with hooks installed for stats,
	// limiting, deadlines etc.
	w io.Writer
	r io.Reader
	// True if the connection is operating over MSE obfuscation.
	headerEncrypted bool
	cryptoMethod    uint32
	Discovery       peerSource
	uTP             bool
	closed          missinggo.Event

	stats                  ConnStats
	UnwantedChunksReceived int
	UsefulChunksReceived   int
	chunksSent             int
	goodPiecesDirtied      int
	badPiecesDirtied       int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested       bool
	Choked           bool
	requests         map[request]struct{}
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        []bool

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerHasAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// torrent info.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap

	postedBuffer bytes.Buffer
	writerCond   sync.Cond
}

func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

func (cn *connection) completedString() string {
	return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Correct the peerPieces bitmap length. The error return is reserved for
// invalid existing state, such as from a badly sized BITFIELD or invalid
// HAVE messages; with a bitmap this currently always succeeds.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, -1)
	cn.peerPiecesChanged()
	return nil
}

func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.cryptoMethod == mse.CryptoMethodRC4 {
		c('E')
	} else if cn.headerEncrypted {
		c('e')
	}
	ret += string(cn.Discovery)
	if cn.uTP {
		c('T')
	}
	return
}
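
// For example (hypothetical peer): a PEX-discovered peer on uTP with full
// RC4 stream encryption reads "EXT", while the same peer with only header
// obfuscation reads "eXT".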

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	if cn.Choked {
		c('c')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	if cn.PeerChoked {
		c('c')
	}
	return
}
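
// Local flags come before the first '-', connection flags between, and
// remote flags after. For example (hypothetical state), "i-EXT-ic" means
// we're interested but not choking the peer, and the peer is interested and
// choking us.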

func (cn *connection) String() string {
	var buf bytes.Buffer
	cn.WriteStatus(&buf, nil)
	return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastUsefulChunkReceived))
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		cn.UsefulChunksReceived,
		cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
		cn.chunksSent,
		cn.numLocalRequests(),
		len(cn.PeerRequests),
		cn.statusFlags(),
	)
	fmt.Fprintf(w, "    next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
}

func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
	pb.IterTyped(func(i int) bool {
		if len(ret) >= n {
			return false
		}
		ret = append(ret, i)
		return true
	})
	return
}

func (cn *connection) Close() {
	cn.closed.Set()
	cn.discardPieceInclination()
	cn.pieceRequestOrder.Clear()
	if cn.conn != nil {
		// TODO: This call blocks sometimes, why?
		go cn.conn.Close()
	}
}

func (cn *connection) PeerHasPiece(piece int) bool {
	return cn.peerHasAll || cn.peerPieces.Contains(piece)
}

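// Post queues a message to be sent by the writer goroutine. Callers are
// expected to hold the client lock. The actual socket write happens
// asynchronously, once tickleWriter wakes the writer.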
func (cn *connection) Post(msg pp.Message) {
	messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.postedBuffer.Write(msg.MustMarshalBinary())
	cn.tickleWriter()
}

func (cn *connection) requestMetadataPiece(index int) {
	eID := cn.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		return
	}
	if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
		return
	}
	cn.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(cn.metadataRequests) {
		cn.metadataRequests = append(cn.metadataRequests, false)
	}
	cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
	return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
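// Peers may advertise a larger queue depth via reqq in the extended
// handshake, but we clamp what we keep in flight to 64.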
func (cn *connection) nominalMaxRequests() (ret int) {
	ret = cn.PeerMaxRequests
	if ret > 64 {
		ret = 64
	}
	return
}

// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
	if cn.PeerRequests == nil {
		return false
	}
	if _, ok := cn.PeerRequests[r]; !ok {
		return false
	}
	delete(cn.PeerRequests, r)
	return true
}

func (cn *connection) Choke() {
	if cn.Choked {
		return
	}
	cn.Post(pp.Message{
		Type: pp.Choke,
	})
	cn.PeerRequests = nil
	cn.Choked = true
}

func (cn *connection) Unchoke() {
	if !cn.Choked {
		return
	}
	cn.Post(pp.Message{
		Type: pp.Unchoke,
	})
	cn.Choked = false
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
	if cn.Interested == interested {
		return true
	}
	cn.Interested = interested
	// log.Printf("%p: setting interest: %v", cn, interested)
	return msg(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			} else {
				return pp.NotInterested
			}
		}(),
	})
}

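// fillWriteBuffer emits interest changes, cancels, and new requests through
// the msg callback. The callback reports whether more messages may be
// written; the writer's callback, for instance, returns false once its
// buffer exceeds 64 KiB.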
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
	numFillBuffers.Add(1)
	cancel, newReqs, i := cn.desiredRequestState()
	if !cn.SetInterested(i, msg) {
		return
	}
	if cancel && len(cn.requests) != 0 {
		fillBufferSentCancels.Add(1)
		for r := range cn.requests {
			cn.deleteRequest(r)
			// log.Printf("%p: cancelling request: %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Cancel,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
	}
	if len(newReqs) != 0 {
		fillBufferSentRequests.Add(1)
		for _, r := range newReqs {
			if cn.requests == nil {
				cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
			}
			cn.requests[r] = struct{}{}
			// log.Printf("%p: requesting %v", cn, r)
			if !msg(pp.Message{
				Type:   pp.Request,
				Index:  r.Index,
				Begin:  r.Begin,
				Length: r.Length,
			}) {
				return
			}
		}
		// Note that the loop above returns, rather than breaks, when the
		// write buffer fills: if we didn't completely top up the requests, we
		// shouldn't mark the low water, since we'll want to top up again as
		// soon as we have more write buffer space.
		cn.requestsLowWater = len(cn.requests) / 2
	}
}

// Writes queued message buffers out to the socket.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
	var (
		buf       bytes.Buffer
		lastWrite = time.Now()
	)
	var keepAliveTimer *time.Timer
	keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
		cn.mu().Lock()
		defer cn.mu().Unlock()
		if time.Since(lastWrite) >= keepAliveTimeout {
			cn.tickleWriter()
		}
		keepAliveTimer.Reset(keepAliveTimeout)
	})
	cn.mu().Lock()
	defer cn.mu().Unlock()
	defer cn.Close()
	defer keepAliveTimer.Stop()
	for {
		buf.Write(cn.postedBuffer.Bytes())
		cn.postedBuffer.Reset()
		if buf.Len() == 0 {
			cn.fillWriteBuffer(func(msg pp.Message) bool {
				cn.wroteMsg(&msg)
				buf.Write(msg.MustMarshalBinary())
				return buf.Len() < 1<<16
			})
		}
		if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
			buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
			postedKeepalives.Add(1)
		}
		if buf.Len() == 0 {
			cn.writerCond.Wait()
			continue
		}
		cn.mu().Unlock()
		// log.Printf("writing %d bytes", buf.Len())
		n, err := cn.w.Write(buf.Bytes())
		cn.mu().Lock()
		if n != 0 {
			lastWrite = time.Now()
			keepAliveTimer.Reset(keepAliveTimeout)
		}
		if err != nil {
			return
		}
		if n != buf.Len() {
			panic("short write")
		}
		buf.Reset()
	}
}
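
// A sketch of the wake-up cycle, using names from this file: Post appends a
// marshalled message to postedBuffer and calls tickleWriter, which
// broadcasts on writerCond. The writer wakes from writerCond.Wait, drains
// postedBuffer into its local buf, and performs the socket write with the
// client lock released. If nothing is written for keepAliveTimeout, the
// timer tickles the writer so that it emits a keepalive instead.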

func (cn *connection) Have(piece int) {
	for piece >= len(cn.sentHaves) {
		cn.sentHaves = append(cn.sentHaves, false)
	}
	if cn.sentHaves[piece] {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
	if cn.sentHaves != nil {
		panic("bitfield must be first have-related message sent")
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: haves,
	})
	// Make a copy of haves: the message keeps a reference that's read
	// without the lock when it's marshalled, and its contents obviously
	// shouldn't change with subsequent updates to sentHaves.
	cn.sentHaves = append([]bool(nil), haves...)
}

func nextRequestState(
	networkingEnabled bool,
	currentRequests map[request]struct{},
	peerChoking bool,
	nextPieces *prioritybitmap.PriorityBitmap,
	pendingChunks func(piece int, f func(chunkSpec) bool) bool,
	requestsLowWater int,
	requestsHighWater int,
) (
	cancelExisting bool,
	newRequests []request,
	interested bool,
) {
	if !networkingEnabled || nextPieces.IsEmpty() {
		return true, nil, false
	}
	if peerChoking || len(currentRequests) > requestsLowWater {
		return false, nil, !nextPieces.IsEmpty()
	}
	nextPieces.IterTyped(func(piece int) bool {
		return pendingChunks(piece, func(cs chunkSpec) bool {
			r := request{pp.Integer(piece), cs}
			if _, ok := currentRequests[r]; !ok {
				if newRequests == nil {
					newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
				}
				newRequests = append(newRequests, r)
			}
			return len(currentRequests)+len(newRequests) < requestsHighWater
		})
	})
	return false, newRequests, true
}
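
// A minimal sketch of the pure function above, with hypothetical values
// (chunkSpec is {begin, length} in bytes within a piece):
//
//	var pb prioritybitmap.PriorityBitmap
//	pb.Set(0, 0) // piece 0 at the highest priority
//	cancel, reqs, interested := nextRequestState(
//		true,  // networking enabled
//		nil,   // no requests outstanding
//		false, // peer isn't choking us
//		&pb,
//		func(piece int, f func(chunkSpec) bool) bool {
//			return f(chunkSpec{0, 16 << 10}) // one undirtied 16 KiB chunk
//		},
//		16, 64,
//	)
//	// cancel == false, len(reqs) == 1, interested == true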

func (cn *connection) updateRequests() {
	cn.tickleWriter()
}

func (cn *connection) desiredRequestState() (bool, []request, bool) {
	return nextRequestState(
		cn.t.networkingEnabled,
		cn.requests,
		cn.PeerChoked,
		&cn.pieceRequestOrder,
		func(piece int, f func(chunkSpec) bool) bool {
			return undirtiedChunks(piece, cn.t, f)
		},
		cn.requestsLowWater,
		cn.nominalMaxRequests(),
	)
}

func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
	chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
	return iter.ForPerm(len(chunkIndices), func(i int) bool {
		return f(t.chunkIndexSpec(chunkIndices[i], piece))
	})
}

// Callers should check whether updateRequests needs to be called after this.
func (cn *connection) stopRequestingPiece(piece int) bool {
	return cn.pieceRequestOrder.Remove(piece)
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and
// pseudorandomly avoids different connections always requesting the same
// pieces and thus wasting effort.
func (cn *connection) updatePiecePriority(piece int) bool {
	tpp := cn.t.piecePriority(piece)
	if !cn.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		return cn.stopRequestingPiece(piece)
	}
	prio := cn.getPieceInclination()[piece]
	switch tpp {
	case PiecePriorityNormal:
	case PiecePriorityReadahead:
		prio -= cn.t.numPieces()
	case PiecePriorityNext, PiecePriorityNow:
		prio -= 2 * cn.t.numPieces()
	default:
		panic(tpp)
	}
	prio += piece / 3
	return cn.pieceRequestOrder.Set(piece, prio)
}
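
// Worked example (hypothetical numbers): with 100 pieces, an inclination of
// 5 for piece 30, and PiecePriorityReadahead, prio = 5 - 100 + 30/3 = -85.
// Lower values are iterated first in pieceRequestOrder, so readahead pieces
// jump ahead of normal-priority ones.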

func (cn *connection) getPieceInclination() []int {
	if cn.pieceInclination == nil {
		cn.pieceInclination = cn.t.getConnPieceInclination()
	}
	return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
	if cn.pieceInclination == nil {
		return
	}
	cn.t.putPieceInclination(cn.pieceInclination)
	cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
	if cn.t.haveInfo() {
		prioritiesChanged := false
		for i := range iter.N(cn.t.numPieces()) {
			if cn.updatePiecePriority(i) {
				prioritiesChanged = true
			}
		}
		if prioritiesChanged {
			cn.updateRequests()
		}
	}
}

func (cn *connection) raisePeerMinPieces(newMin int) {
	if newMin > cn.peerMinPieces {
		cn.peerMinPieces = newMin
	}
}

func (cn *connection) peerSentHave(piece int) error {
	if cn.t.haveInfo() && piece >= cn.t.numPieces() {
		return errors.New("invalid piece")
	}
	if cn.PeerHasPiece(piece) {
		return nil
	}
	cn.raisePeerMinPieces(piece + 1)
	cn.peerPieces.Set(piece, true)
	if cn.updatePiecePriority(piece) {
		cn.updateRequests()
	}
	return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
	cn.peerHasAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted.
	cn.raisePeerMinPieces(len(bf) - 7)
	if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
		// Ignore known excess pieces.
		bf = bf[:cn.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			cn.raisePeerMinPieces(i + 1)
		}
		cn.peerPieces.Set(i, have)
	}
	cn.peerPiecesChanged()
	return nil
}
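
// For instance (hypothetical message): a 2-byte bitfield decodes to
// len(bf) == 16, so the torrent has at least 16-7 = 9 pieces; if bit 11 is
// also set, the minimum rises to 12 via raisePeerMinPieces(11 + 1).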

func (cn *connection) peerSentHaveAll() error {
	cn.peerHasAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

func (cn *connection) peerSentHaveNone() error {
	cn.peerPieces.Clear()
	cn.peerHasAll = false
	cn.peerPiecesChanged()
	return nil
}

func (c *connection) requestPendingMetadata() {
	if c.t.haveInfo() {
		return
	}
	if c.PeerExtensionIDs["ut_metadata"] == 0 {
		// Peer doesn't support this.
		return
	}
	// Request metadata pieces that we don't have in a random order.
	var pending []int
	for index := 0; index < c.t.metadataPieceCount(); index++ {
		if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
			pending = append(pending, index)
		}
	}
	for _, i := range rand.Perm(len(pending)) {
		c.requestMetadataPiece(pending[i])
	}
}

func (cn *connection) wroteMsg(msg *pp.Message) {
	messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	cn.stats.wroteMsg(msg)
	cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
	cn.stats.readMsg(msg)
	cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
	cn.stats.wroteBytes(n)
	if cn.t != nil {
		cn.t.stats.wroteBytes(n)
	}
}

func (cn *connection) readBytes(n int64) {
	cn.stats.readBytes(n)
	if cn.t != nil {
		cn.t.stats.readBytes(n)
	}
}

// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
	t := c.t
	if c.closed.IsSet() {
		return false
	}
	if !t.haveInfo() {
		return c.supportsExtension("ut_metadata")
	}
	if t.seeding() {
		return c.PeerInterested
	}
	return c.peerHasWantedPieces()
}

func (c *connection) lastHelpful() (ret time.Time) {
	ret = c.lastUsefulChunkReceived
	if c.t.seeding() && c.lastChunkSent.After(ret) {
		ret = c.lastChunkSent
	}
	return
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
	t := c.t
	cl := t.cl

	decoder := pp.Decoder{
		R:         bufio.NewReader(c.r),
		MaxLength: 256 * 1024,
		Pool:      t.chunkPool,
	}
	for {
		var (
			msg pp.Message
			err error
		)
		func() {
			cl.mu.Unlock()
			defer cl.mu.Lock()
			err = decoder.Decode(&msg)
		}()
		if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		c.readMsg(&msg)
		c.lastMessageReceived = time.Now()
		if msg.Keepalive {
			receivedKeepalives.Add(1)
			continue
		}
		messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
		switch msg.Type {
		case pp.Choke:
			c.PeerChoked = true
			c.requests = nil
			// We can then reset our interest.
			c.updateRequests()
		case pp.Reject:
			if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
				c.updateRequests()
			}
		case pp.Unchoke:
			c.PeerChoked = false
			c.tickleWriter()
		case pp.Interested:
			c.PeerInterested = true
			c.upload()
		case pp.NotInterested:
			c.PeerInterested = false
			c.Choke()
		case pp.Have:
			err = c.peerSentHave(int(msg.Index))
		case pp.Request:
			if c.Choked {
				break
			}
			if !c.PeerInterested {
				err = errors.New("peer sent request but isn't interested")
				break
			}
			if !t.havePiece(msg.Index.Int()) {
				// This isn't necessarily them screwing up. We can drop pieces
				// from our storage, and can't communicate this to peers
				// except by reconnecting.
				requestsReceivedForMissingPieces.Add(1)
				err = errors.New("peer requested piece we don't have")
				break
			}
			if c.PeerRequests == nil {
				c.PeerRequests = make(map[request]struct{}, maxRequests)
			}
			c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
			c.upload()
		case pp.Cancel:
			req := newRequest(msg.Index, msg.Begin, msg.Length)
			if !c.PeerCancel(req) {
				unexpectedCancels.Add(1)
			}
		case pp.Bitfield:
			err = c.peerSentBitfield(msg.Bitfield)
		case pp.HaveAll:
			err = c.peerSentHaveAll()
		case pp.HaveNone:
			err = c.peerSentHaveNone()
		case pp.Piece:
			c.receiveChunk(&msg)
			if len(msg.Piece) == int(t.chunkSize) {
				t.chunkPool.Put(msg.Piece)
			}
		case pp.Extended:
			switch msg.ExtendedID {
			case pp.HandshakeExtendedID:
				// TODO: Create a bencode struct for this.
				var d map[string]interface{}
				err = bencode.Unmarshal(msg.ExtendedPayload, &d)
				if err != nil {
					err = fmt.Errorf("error decoding extended message payload: %s", err)
					break
				}
				// log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
				if reqq, ok := d["reqq"]; ok {
					if i, ok := reqq.(int64); ok {
						c.PeerMaxRequests = int(i)
					}
				}
				if v, ok := d["v"]; ok {
					// Don't let a malformed, non-string v value panic the client.
					if s, ok := v.(string); ok {
						c.PeerClientName = s
					}
				}
				m, ok := d["m"]
				if !ok {
					err = errors.New("handshake missing m item")
					break
				}
				mTyped, ok := m.(map[string]interface{})
				if !ok {
					err = errors.New("handshake m value is not dict")
					break
				}
				if c.PeerExtensionIDs == nil {
					c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
				}
				for name, v := range mTyped {
					id, ok := v.(int64)
					if !ok {
						log.Printf("bad handshake m item extension ID type: %T", v)
						continue
					}
					if id == 0 {
						delete(c.PeerExtensionIDs, name)
					} else {
						if c.PeerExtensionIDs[name] == 0 {
							supportedExtensionMessages.Add(name, 1)
						}
						c.PeerExtensionIDs[name] = byte(id)
					}
				}
				metadata_sizeUntyped, ok := d["metadata_size"]
				if ok {
					metadata_size, ok := metadata_sizeUntyped.(int64)
					if !ok {
						log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
					} else {
						err = t.setMetadataSize(metadata_size)
						if err != nil {
							// Wrap rather than discard the underlying error.
							err = fmt.Errorf("error setting metadata size to %d: %s", metadata_size, err)
							break
						}
					}
				}
				if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
					c.requestPendingMetadata()
				}
			case metadataExtendedId:
				err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
				if err != nil {
					err = fmt.Errorf("error handling metadata extension message: %s", err)
				}
			case pexExtendedId:
				if cl.config.DisablePEX {
					break
				}
				var pexMsg peerExchangeMessage
				err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
				if err != nil {
					err = fmt.Errorf("error unmarshalling PEX message: %s", err)
					break
				}
				go func() {
					cl.mu.Lock()
					t.addPeers(func() (ret []Peer) {
						for i, cp := range pexMsg.Added {
							p := Peer{
								IP:     make([]byte, 4),
								Port:   cp.Port,
								Source: peerSourcePEX,
							}
							if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
								p.SupportsEncryption = true
							}
							missinggo.CopyExact(p.IP, cp.IP[:])
							ret = append(ret, p)
						}
						return
					}())
					cl.mu.Unlock()
				}()
			default:
				err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
			}
			if err != nil {
				// These clients use their own extension IDs for outgoing
				// message types, which is incorrect, so drop the connection
				// without reporting an error.
				if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
					strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
					return nil
				}
			}
		case pp.Port:
			if cl.dHT == nil {
				break
			}
			pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
			if err != nil {
				panic(err)
			}
			if msg.Port != 0 {
				pingAddr.Port = int(msg.Port)
			}
			go cl.dHT.Ping(pingAddr, nil)
		default:
			err = fmt.Errorf("received unknown message type: %#v", msg.Type)
		}
		if err != nil {
			return err
		}
	}
}

// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
	cn.r = rw
	cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
	return struct {
		io.Reader
		io.Writer
	}{cn.r, cn.w}
}
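
// A sketch of how these pair up: to layer something over both directions at
// once, e.g. an obfuscation or accounting stream, wrap the combined
// ReadWriter and install the result:
//
//	cn.setRW(newHookedRW(cn.rw())) // newHookedRW is hypothetical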

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
	t := c.t
	cl := t.cl
	chunksReceived.Add(1)

	req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

	// Request has been satisfied.
	if c.deleteRequest(req) {
		c.updateRequests()
	} else {
		unexpectedChunksReceived.Add(1)
	}

	// Do we actually want this chunk?
	if !t.wantPiece(req) {
		unwantedChunksReceived.Add(1)
		c.UnwantedChunksReceived++
		return
	}

	index := int(req.Index)
	piece := &t.pieces[index]

	c.UsefulChunksReceived++
	c.lastUsefulChunkReceived = time.Now()

	c.upload()

	// Need to record that it hasn't been written yet, before we attempt to do
	// anything with it.
	piece.incrementPendingWrites()
	// Record that we have the chunk.
	piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

	// Cancel pending requests for this chunk.
	for c := range t.conns {
		c.updateRequests()
	}

	cl.mu.Unlock()
	// Write the chunk out. Note that the upper bound on chunk writing
	// concurrency will be the number of connections.
	err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
	cl.mu.Lock()

	piece.decrementPendingWrites()

	if err != nil {
		log.Printf("%s (%x): error writing chunk %v: %s", t, t.infoHash, req, err)
		t.pendRequest(req)
		t.updatePieceCompletion(int(msg.Index))
		return
	}

	// It's important that the piece is potentially queued before we check if
	// the piece is still wanted, because if it is queued, it won't be wanted.
	if t.pieceAllDirty(index) {
		t.queuePieceCheck(int(req.Index))
	}

	if c.peerTouchedPieces == nil {
		c.peerTouchedPieces = make(map[int]struct{})
	}
	c.peerTouchedPieces[index] = struct{}{}

	cl.event.Broadcast()
	t.publishPieceChange(int(req.Index))
}

// Also handles choking and unchoking of the remote peer.
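// While leeching, the peer gets roughly 6 chunks of credit beyond what
// they've sent us (c.chunksSent < c.UsefulChunksReceived+6) before we choke
// them again.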
func (c *connection) upload() {
	t := c.t
	cl := t.cl
	if cl.config.NoUpload {
		return
	}
	if !c.PeerInterested {
		return
	}
	seeding := t.seeding()
	if !seeding && !c.peerHasWantedPieces() {
		// There's no reason to upload to this peer.
		return
	}
	// Breaking or completing this loop means we don't want to upload to the
	// peer anymore, and we choke them.
another:
	for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
		// We want to upload to the peer.
		c.Unchoke()
		for r := range c.PeerRequests {
			res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
			delay := res.Delay()
			if delay > 0 {
				res.Cancel()
				go func() {
					time.Sleep(delay)
					cl.mu.Lock()
					defer cl.mu.Unlock()
					c.upload()
				}()
				return
			}
			err := cl.sendChunk(t, c, r)
			if err != nil {
				i := int(r.Index)
				if t.pieceComplete(i) {
					t.updatePieceCompletion(i)
					if !t.pieceComplete(i) {
						// We had the piece, but not anymore.
						break another
					}
				}
				log.Printf("error sending chunk %+v to peer: %s", r, err)
				// If we failed to send a chunk, choke the peer to ensure they
				// flush all their requests. We've probably dropped a piece,
				// but there's no way to communicate this to the peer. If they
				// ask for it again, we'll kick them to allow us to send them
				// an updated bitfield.
				break another
			}
			delete(c.PeerRequests, r)
			goto another
		}
		return
	}
	c.Choke()
}

func (cn *connection) Drop() {
	cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int {
	return cn.goodPiecesDirtied - cn.badPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
	return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
	return len(c.requests)
}

func (c *connection) deleteRequest(r request) bool {
	if _, ok := c.requests[r]; !ok {
		return false
	}
	delete(c.requests, r)
	return true
}

func (c *connection) tickleWriter() {
	c.writerCond.Broadcast()
}