package torrent

import (
        "bufio"
        "bytes"
        "errors"
        "fmt"
        "io"
        "log"
        "math/rand"
        "net"
        "strconv"
        "strings"
        "sync"
        "time"

        "github.com/anacrolix/torrent/mse"

        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/prioritybitmap"

        "github.com/anacrolix/torrent/bencode"
        pp "github.com/anacrolix/torrent/peer_protocol"
)

type peerSource string

const (
        peerSourceTracker         = "T" // It's the default.
        peerSourceIncoming        = "I"
        peerSourceDHTGetPeers     = "Hg"
        peerSourceDHTAnnouncePeer = "Ha"
        peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
        t *Torrent
        // The actual Conn, used for closing, and setting socket options.
        conn net.Conn
        // The Reader and Writer for this Conn, with hooks installed for stats,
        // limiting, deadlines etc.
        w io.Writer
        r io.Reader
        // True if the connection is operating over MSE obfuscation.
        headerEncrypted bool
        cryptoMethod    uint32
        Discovery       peerSource
        uTP             bool
        closed          missinggo.Event

        stats                  ConnStats
        UnwantedChunksReceived int
        UsefulChunksReceived   int
        chunksSent             int
        goodPiecesDirtied      int
        badPiecesDirtied       int

        lastMessageReceived     time.Time
        completedHandshake      time.Time
        lastUsefulChunkReceived time.Time
        lastChunkSent           time.Time

        // Stuff controlled by the local peer.
        Interested       bool
        Choked           bool
        requests         map[request]struct{}
        requestsLowWater int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
        sentHaves        []bool

        // Stuff controlled by the remote peer.
        PeerID             [20]byte
        PeerInterested     bool
        PeerChoked         bool
        PeerRequests       map[request]struct{}
        PeerExtensionBytes peerExtensionBytes
        // The pieces the peer has claimed to have.
        peerPieces bitmap.Bitmap
        // The peer has everything. This can occur due to a special message, when
        // we may not even know the number of pieces in the torrent yet.
        peerHasAll bool
        // The highest possible number of pieces the torrent could have based on
        // communication with the peer. Generally only useful until we have the
        // torrent info.
        peerMinPieces int
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[int]struct{}

        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[string]byte
        PeerClientName   string

        pieceInclination  []int
        pieceRequestOrder prioritybitmap.PriorityBitmap

        postedBuffer bytes.Buffer
        uploadTimer  *time.Timer
        writerCond   sync.Cond
}

func (cn *connection) mu() sync.Locker {
        return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
        return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
        return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
        _, ok := cn.PeerExtensionIDs[ext]
        return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
        if cn.t.haveInfo() {
                return cn.t.numPieces()
        }
        return cn.peerMinPieces
}

func (cn *connection) completedString() string {
        return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Correct the peerPieces bitmap length. Returns an error if the existing
// state is invalid, such as from a badly sized BITFIELD or invalid HAVE
// messages; currently this always succeeds.
func (cn *connection) setNumPieces(num int) error {
        cn.peerPieces.RemoveRange(num, -1)
        cn.peerPiecesChanged()
        return nil
}

func eventAgeString(t time.Time) string {
        if t.IsZero() {
                return "never"
        }
        return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.cryptoMethod == mse.CryptoMethodRC4 {
                c('E')
        } else if cn.headerEncrypted {
                c('e')
        }
        ret += string(cn.Discovery)
        if cn.uTP {
                c('T')
        }
        return
}
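
// For illustration (not part of the original): an RC4-encrypted uTP
// connection discovered via DHT get_peers renders as "EHgT", while a
// plaintext TCP peer from a tracker is just "T".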

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.Interested {
                c('i')
        }
        if cn.Choked {
                c('c')
        }
        c('-')
        ret += cn.connectionFlags()
        c('-')
        if cn.PeerInterested {
                c('i')
        }
        if cn.PeerChoked {
                c('c')
        }
        return
}
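
// Example reading (illustrative): "ic-T-i" means we are interested in and
// choking this tracker-discovered peer, and the peer is interested in us but
// not choking us.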

func (cn *connection) String() string {
        var buf bytes.Buffer
        cn.WriteStatus(&buf, nil)
        return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
        // \t isn't preserved in <pre> blocks?
        fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
                eventAgeString(cn.lastUsefulChunkReceived))
        fmt.Fprintf(w,
                "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                cn.UsefulChunksReceived,
                cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
                cn.chunksSent,
                cn.numLocalRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
        )
        fmt.Fprintf(w, "    next pieces: %v\n", priorityBitmapHeadAsSlice(&cn.pieceRequestOrder, 10))
}

func priorityBitmapHeadAsSlice(pb *prioritybitmap.PriorityBitmap, n int) (ret []int) {
        pb.IterTyped(func(i int) bool {
                if len(ret) >= n {
                        return false
                }
                ret = append(ret, i)
                return true
        })
        return
}

func (cn *connection) Close() {
        if !cn.closed.Set() {
                return
        }
        cn.discardPieceInclination()
        cn.pieceRequestOrder.Clear()
        if cn.conn != nil {
                go func() {
                        // TODO: This call blocks sometimes, why? Maybe it was the Go utp
                        // implementation?
                        err := cn.conn.Close()
                        if err != nil {
                                log.Printf("error closing connection net.Conn: %s", err)
                        }
                }()
        }
}

func (cn *connection) PeerHasPiece(piece int) bool {
        return cn.peerHasAll || cn.peerPieces.Contains(piece)
}

// Post queues a message to be sent to the peer and wakes the writer.
func (cn *connection) Post(msg pp.Message) {
        messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
        cn.postedBuffer.Write(msg.MustMarshalBinary())
        cn.tickleWriter()
}

func (cn *connection) requestMetadataPiece(index int) {
        eID := cn.PeerExtensionIDs["ut_metadata"]
        if eID == 0 {
                return
        }
        if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
                return
        }
        cn.Post(pp.Message{
                Type:       pp.Extended,
                ExtendedID: eID,
                ExtendedPayload: func() []byte {
                        b, err := bencode.Marshal(map[string]int{
                                "msg_type": pp.RequestMetadataExtensionMsgType,
                                "piece":    index,
                        })
                        if err != nil {
                                panic(err)
                        }
                        return b
                }(),
        })
        for index >= len(cn.metadataRequests) {
                cn.metadataRequests = append(cn.metadataRequests, false)
        }
        cn.metadataRequests[index] = true
}
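
// For illustration (assuming msg_type 0 is the metadata request type, per
// BEP 9): requesting piece 1 marshals to "d8:msg_typei0e5:piecei1ee".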

func (cn *connection) requestedMetadataPiece(index int) bool {
        return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
        ret = cn.PeerMaxRequests
        if ret > 64 {
                ret = 64
        }
        return
}
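
// For example (illustrative): a peer advertising reqq=500 in its extended
// handshake is still limited to 64 outstanding requests from our side.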

// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
        if cn.PeerRequests == nil {
                return false
        }
        if _, ok := cn.PeerRequests[r]; !ok {
                return false
        }
        delete(cn.PeerRequests, r)
        return true
}

func (cn *connection) Choke(msg func(pp.Message) bool) bool {
        if cn.Choked {
                return true
        }
        cn.PeerRequests = nil
        cn.Choked = true
        return msg(pp.Message{
                Type: pp.Choke,
        })
}

func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
        if !cn.Choked {
                return true
        }
        cn.Choked = false
        return msg(pp.Message{
                Type: pp.Unchoke,
        })
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
        if cn.Interested == interested {
                return true
        }
        cn.Interested = interested
        // log.Printf("%p: setting interest: %v", cn, interested)
        return msg(pp.Message{
                Type: func() pp.MessageType {
                        if interested {
                                return pp.Interested
                        } else {
                                return pp.NotInterested
                        }
                }(),
        })
}

func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
        numFillBuffers.Add(1)
        cancel, new, i := cn.desiredRequestState()
        if !cn.SetInterested(i, msg) {
                return
        }
        if cancel && len(cn.requests) != 0 {
                fillBufferSentCancels.Add(1)
                for r := range cn.requests {
                        cn.deleteRequest(r)
                        // log.Printf("%p: cancelling request: %v", cn, r)
                        if !msg(pp.Message{
                                Type:   pp.Cancel,
                                Index:  r.Index,
                                Begin:  r.Begin,
                                Length: r.Length,
                        }) {
                                return
                        }
                }
        }
        if len(new) != 0 {
                fillBufferSentRequests.Add(1)
                for _, r := range new {
                        if cn.requests == nil {
                                cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
                        }
                        cn.requests[r] = struct{}{}
                        // log.Printf("%p: requesting %v", cn, r)
                        if !msg(pp.Message{
                                Type:   pp.Request,
                                Index:  r.Index,
                                Begin:  r.Begin,
                                Length: r.Length,
                        }) {
                                return
                        }
                }
                // If we didn't completely top up the requests, we shouldn't mark the
                // low water, since we'll want to top up the requests as soon as we
                // have more write buffer space.
                cn.requestsLowWater = len(cn.requests) / 2
        }
        cn.upload(msg)
}

// Writes buffered messages to the socket, filling the buffer on demand and
// sending keep-alives when the connection has been idle for keepAliveTimeout.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
        var (
                buf       bytes.Buffer
                lastWrite time.Time = time.Now()
        )
        var keepAliveTimer *time.Timer
        keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
                cn.mu().Lock()
                defer cn.mu().Unlock()
                if time.Since(lastWrite) >= keepAliveTimeout {
                        cn.tickleWriter()
                }
                keepAliveTimer.Reset(keepAliveTimeout)
        })
        cn.mu().Lock()
        defer cn.mu().Unlock()
        defer cn.Close()
        defer keepAliveTimer.Stop()
        for {
                buf.Write(cn.postedBuffer.Bytes())
                cn.postedBuffer.Reset()
                if buf.Len() == 0 {
                        cn.fillWriteBuffer(func(msg pp.Message) bool {
                                cn.wroteMsg(&msg)
                                buf.Write(msg.MustMarshalBinary())
                                return buf.Len() < 1<<16
                        })
                }
                if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
                        buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
                        postedKeepalives.Add(1)
                }
                if buf.Len() == 0 {
                        cn.writerCond.Wait()
                        continue
                }
                cn.mu().Unlock()
                // log.Printf("writing %d bytes", buf.Len())
                n, err := cn.w.Write(buf.Bytes())
                cn.mu().Lock()
                if n != 0 {
                        lastWrite = time.Now()
                        keepAliveTimer.Reset(keepAliveTimeout)
                }
                if err != nil {
                        return
                }
                if n != buf.Len() {
                        panic("short write")
                }
                buf.Reset()
        }
}
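
// The writer is expected to run in its own goroutine per connection. A sketch
// of the intended use, assuming a one-minute keep-alive (the actual timeout is
// the caller's choice):
//
//      go cn.writer(time.Minute)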

func (cn *connection) Have(piece int) {
        for piece >= len(cn.sentHaves) {
                cn.sentHaves = append(cn.sentHaves, false)
        }
        if cn.sentHaves[piece] {
                return
        }
        cn.Post(pp.Message{
                Type:  pp.Have,
                Index: pp.Integer(piece),
        })
        cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
        if cn.sentHaves != nil {
                panic("bitfield must be first have-related message sent")
        }
        cn.Post(pp.Message{
                Type:     pp.Bitfield,
                Bitfield: haves,
        })
        // Make a copy of haves, as that's read when the message is marshalled
        // without the lock. Also it obviously shouldn't change in the Msg due to
        // changes in .sentHaves.
        cn.sentHaves = append([]bool(nil), haves...)
}

func nextRequestState(
        networkingEnabled bool,
        currentRequests map[request]struct{},
        peerChoking bool,
        nextPieces *prioritybitmap.PriorityBitmap,
        pendingChunks func(piece int, f func(chunkSpec) bool) bool,
        requestsLowWater int,
        requestsHighWater int,
) (
        cancelExisting bool,
        newRequests []request,
        interested bool,
) {
        if !networkingEnabled || nextPieces.IsEmpty() {
                return true, nil, false
        }
        if peerChoking || len(currentRequests) > requestsLowWater {
                return false, nil, !nextPieces.IsEmpty()
        }
        nextPieces.IterTyped(func(piece int) bool {
                return pendingChunks(piece, func(cs chunkSpec) bool {
                        r := request{pp.Integer(piece), cs}
                        if _, ok := currentRequests[r]; !ok {
                                if newRequests == nil {
                                        newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
                                }
                                newRequests = append(newRequests, r)
                        }
                        return len(currentRequests)+len(newRequests) < requestsHighWater
                })
        })
        return false, newRequests, true
}
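
// A summary of nextRequestState's outcomes, for illustration:
//
//      networking disabled, or nothing wanted -> cancel all requests, not interested
//      peer choking, or above the low water   -> keep current requests, interested
//      otherwise                              -> new requests up to the high water, interested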

func (cn *connection) updateRequests() {
        cn.tickleWriter()
}

func (cn *connection) desiredRequestState() (bool, []request, bool) {
        return nextRequestState(
                cn.t.networkingEnabled,
                cn.requests,
                cn.PeerChoked,
                &cn.pieceRequestOrder,
                func(piece int, f func(chunkSpec) bool) bool {
                        return undirtiedChunks(piece, cn.t, f)
                },
                cn.requestsLowWater,
                cn.nominalMaxRequests(),
        )
}

func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
        chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
        return iter.ForPerm(len(chunkIndices), func(i int) bool {
                return f(t.chunkIndexSpec(chunkIndices[i], piece))
        })
}

// TODO: Check that callers call updateRequests when this returns true.
func (cn *connection) stopRequestingPiece(piece int) bool {
        return cn.pieceRequestOrder.Remove(piece)
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection, and
// pseudorandomly avoids all connections requesting the same pieces and thus
// wasting effort.
func (cn *connection) updatePiecePriority(piece int) bool {
        tpp := cn.t.piecePriority(piece)
        if !cn.PeerHasPiece(piece) {
                tpp = PiecePriorityNone
        }
        if tpp == PiecePriorityNone {
                return cn.stopRequestingPiece(piece)
        }
        prio := cn.getPieceInclination()[piece]
        switch tpp {
        case PiecePriorityNormal:
        case PiecePriorityReadahead:
                prio -= cn.t.numPieces()
        case PiecePriorityNext, PiecePriorityNow:
                prio -= 2 * cn.t.numPieces()
        default:
                panic(tpp)
        }
        prio += piece / 3
        return cn.pieceRequestOrder.Set(piece, prio)
}
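
// Worked example (illustrative values): in a 100-piece torrent, piece 30 with
// inclination 7 gets priority 7+10 = 17 when normal, 7-100+10 = -83 at
// readahead, and 7-200+10 = -183 at next/now, assuming lower values are
// iterated first by the priority bitmap.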

func (cn *connection) getPieceInclination() []int {
        if cn.pieceInclination == nil {
                cn.pieceInclination = cn.t.getConnPieceInclination()
        }
        return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
        if cn.pieceInclination == nil {
                return
        }
        cn.t.putPieceInclination(cn.pieceInclination)
        cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
        if cn.t.haveInfo() {
                prioritiesChanged := false
                for i := range iter.N(cn.t.numPieces()) {
                        if cn.updatePiecePriority(i) {
                                prioritiesChanged = true
                        }
                }
                if prioritiesChanged {
                        cn.updateRequests()
                }
        }
}

func (cn *connection) raisePeerMinPieces(newMin int) {
        if newMin > cn.peerMinPieces {
                cn.peerMinPieces = newMin
        }
}

func (cn *connection) peerSentHave(piece int) error {
        if cn.t.haveInfo() && piece >= cn.t.numPieces() {
                return errors.New("invalid piece")
        }
        if cn.PeerHasPiece(piece) {
                return nil
        }
        cn.raisePeerMinPieces(piece + 1)
        cn.peerPieces.Set(piece, true)
        if cn.updatePiecePriority(piece) {
                cn.updateRequests()
        }
        return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
        cn.peerHasAll = false
        if len(bf)%8 != 0 {
                panic("expected bitfield length divisible by 8")
        }
        // We know that the last byte means that at most the last 7 bits are
        // wasted.
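        // For example, a 48-bit bitfield guarantees at least 48-7 = 41 pieces.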
        cn.raisePeerMinPieces(len(bf) - 7)
        if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
                // Ignore known excess pieces.
                bf = bf[:cn.t.numPieces()]
        }
        for i, have := range bf {
                if have {
                        cn.raisePeerMinPieces(i + 1)
                }
                cn.peerPieces.Set(i, have)
        }
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) peerSentHaveAll() error {
        cn.peerHasAll = true
        cn.peerPieces.Clear()
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) peerSentHaveNone() error {
        cn.peerPieces.Clear()
        cn.peerHasAll = false
        cn.peerPiecesChanged()
        return nil
}

func (c *connection) requestPendingMetadata() {
        if c.t.haveInfo() {
                return
        }
        if c.PeerExtensionIDs["ut_metadata"] == 0 {
                // Peer doesn't support this.
                return
        }
        // Request metadata pieces that we don't have in a random order.
        var pending []int
        for index := 0; index < c.t.metadataPieceCount(); index++ {
                if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
                        pending = append(pending, index)
                }
        }
        for _, i := range rand.Perm(len(pending)) {
                c.requestMetadataPiece(pending[i])
        }
}

func (cn *connection) wroteMsg(msg *pp.Message) {
        messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
        cn.stats.wroteMsg(msg)
        cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
        cn.stats.readMsg(msg)
        cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
        cn.stats.wroteBytes(n)
        if cn.t != nil {
                cn.t.stats.wroteBytes(n)
        }
}

func (cn *connection) readBytes(n int64) {
        cn.stats.readBytes(n)
        if cn.t != nil {
                cn.t.stats.readBytes(n)
        }
}

// Returns whether the connection is currently useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
        t := c.t
        if c.closed.IsSet() {
                return false
        }
        if !t.haveInfo() {
                return c.supportsExtension("ut_metadata")
        }
        if t.seeding() {
                return c.PeerInterested
        }
        return c.peerHasWantedPieces()
}

func (c *connection) lastHelpful() (ret time.Time) {
        ret = c.lastUsefulChunkReceived
        if c.t.seeding() && c.lastChunkSent.After(ret) {
                ret = c.lastChunkSent
        }
        return
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
        t := c.t
        cl := t.cl

        decoder := pp.Decoder{
                R:         bufio.NewReader(c.r),
                MaxLength: 256 * 1024,
                Pool:      t.chunkPool,
        }
        for {
                var (
                        msg pp.Message
                        err error
                )
                func() {
                        cl.mu.Unlock()
                        defer cl.mu.Lock()
                        err = decoder.Decode(&msg)
                }()
                if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
                        return nil
                }
                if err != nil {
                        return err
                }
                c.readMsg(&msg)
                c.lastMessageReceived = time.Now()
                if msg.Keepalive {
                        receivedKeepalives.Add(1)
                        continue
                }
                messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
                        c.requests = nil
                        // We can then reset our interest.
                        c.updateRequests()
                case pp.Reject:
                        if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
                                c.updateRequests()
                        }
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.tickleWriter()
                case pp.Interested:
                        c.PeerInterested = true
                        c.tickleWriter()
                case pp.NotInterested:
                        c.PeerInterested = false
                        c.PeerRequests = nil
                case pp.Have:
                        err = c.peerSentHave(int(msg.Index))
                case pp.Request:
                        if c.Choked {
                                break
                        }
                        if len(c.PeerRequests) >= maxRequests {
                                break
                        }
                        if c.PeerRequests == nil {
                                c.PeerRequests = make(map[request]struct{}, maxRequests)
                        }
                        c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
                        c.tickleWriter()
                case pp.Cancel:
                        req := newRequest(msg.Index, msg.Begin, msg.Length)
                        if !c.PeerCancel(req) {
                                unexpectedCancels.Add(1)
                        }
                case pp.Bitfield:
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.HaveAll:
                        err = c.peerSentHaveAll()
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Piece:
                        c.receiveChunk(&msg)
                        if len(msg.Piece) == int(t.chunkSize) {
                                t.chunkPool.Put(msg.Piece)
                        }
                case pp.Extended:
                        switch msg.ExtendedID {
                        case pp.HandshakeExtendedID:
                                // TODO: Create a bencode struct for this.
                                var d map[string]interface{}
                                err = bencode.Unmarshal(msg.ExtendedPayload, &d)
                                if err != nil {
                                        err = fmt.Errorf("error decoding extended message payload: %s", err)
                                        break
                                }
                                // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
                                if reqq, ok := d["reqq"]; ok {
                                        if i, ok := reqq.(int64); ok {
                                                c.PeerMaxRequests = int(i)
                                        }
                                }
                                if v, ok := d["v"]; ok {
                                        // A peer could send a non-string v; don't panic on it.
                                        if vs, ok := v.(string); ok {
                                                c.PeerClientName = vs
                                        }
                                }
                                m, ok := d["m"]
                                if !ok {
                                        err = errors.New("handshake missing m item")
                                        break
                                }
                                mTyped, ok := m.(map[string]interface{})
                                if !ok {
                                        err = errors.New("handshake m value is not dict")
                                        break
                                }
                                if c.PeerExtensionIDs == nil {
                                        c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
                                }
                                for name, v := range mTyped {
                                        id, ok := v.(int64)
                                        if !ok {
                                                log.Printf("bad handshake m item extension ID type: %T", v)
                                                continue
                                        }
                                        if id == 0 {
                                                delete(c.PeerExtensionIDs, name)
                                        } else {
                                                if c.PeerExtensionIDs[name] == 0 {
                                                        supportedExtensionMessages.Add(name, 1)
                                                }
                                                c.PeerExtensionIDs[name] = byte(id)
                                        }
                                }
                                metadata_sizeUntyped, ok := d["metadata_size"]
                                if ok {
                                        metadata_size, ok := metadata_sizeUntyped.(int64)
                                        if !ok {
                                                log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
                                        } else {
                                                err = t.setMetadataSize(metadata_size)
                                                if err != nil {
                                                        err = fmt.Errorf("error setting metadata size to %d: %s", metadata_size, err)
                                                        break
                                                }
                                        }
                                }
                                if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
                                        c.requestPendingMetadata()
                                }
                        case metadataExtendedId:
                                err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
                                if err != nil {
                                        err = fmt.Errorf("error handling metadata extension message: %s", err)
                                }
                        case pexExtendedId:
                                if cl.config.DisablePEX {
                                        break
                                }
                                var pexMsg peerExchangeMessage
                                err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
                                if err != nil {
                                        err = fmt.Errorf("error unmarshalling PEX message: %s", err)
                                        break
                                }
                                go func() {
                                        cl.mu.Lock()
                                        t.addPeers(func() (ret []Peer) {
                                                for i, cp := range pexMsg.Added {
                                                        p := Peer{
                                                                IP:     make([]byte, 4),
                                                                Port:   cp.Port,
                                                                Source: peerSourcePEX,
                                                        }
                                                        if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
                                                                p.SupportsEncryption = true
                                                        }
                                                        missinggo.CopyExact(p.IP, cp.IP[:])
                                                        ret = append(ret, p)
                                                }
                                                return
                                        }())
                                        cl.mu.Unlock()
                                }()
                        default:
                                err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
                        }
                        if err != nil {
                                // These clients use their own extension IDs for outgoing
                                // message types, which is incorrect, so don't treat their
                                // garbled messages as a protocol error.
                                if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
                                        strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
                                        return nil
                                }
                        }
                case pp.Port:
                        if cl.dHT == nil {
                                break
                        }
                        pingAddr, err := net.ResolveUDPAddr("udp", c.remoteAddr().String())
                        if err != nil {
                                panic(err)
                        }
                        if msg.Port != 0 {
                                pingAddr.Port = int(msg.Port)
                        }
                        go cl.dHT.Ping(pingAddr, nil)
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
                }
                if err != nil {
                        return err
                }
        }
}

// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
        cn.r = rw
        cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
        return struct {
                io.Reader
                io.Writer
        }{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
        t := c.t
        cl := t.cl
        chunksReceived.Add(1)

        req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

        // Request has been satisfied.
        if c.deleteRequest(req) {
                c.updateRequests()
        } else {
                unexpectedChunksReceived.Add(1)
        }

        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)
                c.UnwantedChunksReceived++
                return
        }

        index := int(req.Index)
        piece := &t.pieces[index]

        c.UsefulChunksReceived++
        c.lastUsefulChunkReceived = time.Now()

        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
        piece.incrementPendingWrites()
        // Record that we have the chunk.
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

        // Cancel pending requests for this chunk.
        for c := range t.conns {
                c.updateRequests()
        }

        cl.mu.Unlock()
        // Write the chunk out. Note that the upper bound on chunk writing
        // concurrency will be the number of connections.
        err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        cl.mu.Lock()

        piece.decrementPendingWrites()

        if err != nil {
                log.Printf("%s (%x): error writing chunk %v: %s", t, t.infoHash, req, err)
                t.pendRequest(req)
                t.updatePieceCompletion(int(msg.Index))
                return
        }

        // It's important that the piece is potentially queued before we check if
        // the piece is still wanted, because if it is queued, it won't be wanted.
        if t.pieceAllDirty(index) {
                t.queuePieceCheck(int(req.Index))
        }

        if c.peerTouchedPieces == nil {
                c.peerTouchedPieces = make(map[int]struct{})
        }
        c.peerTouchedPieces[index] = struct{}{}

        cl.event.Broadcast()
        t.publishPieceChange(int(req.Index))
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
        t := c.t
        cl := t.cl
        if cl.config.NoUpload {
                return true
        }
        if !c.PeerInterested {
                return true
        }
        seeding := t.seeding()
        if !seeding && !c.peerHasWantedPieces() {
                // There's no reason to upload to this peer.
                return true
        }
        // Breaking or completing this loop means we don't want to upload to the
        // peer anymore, and we choke them.
another:
        for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
                // We want to upload to the peer.
                if !c.Unchoke(msg) {
                        return false
                }
                for r := range c.PeerRequests {
                        res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
                        if !res.OK() {
                                panic(res)
                        }
                        delay := res.Delay()
                        if delay > 0 {
                                res.Cancel()
                                if c.uploadTimer == nil {
                                        c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
                                } else {
                                        c.uploadTimer.Reset(delay)
                                }
                                // Hard to say what to return here.
                                return true
                        }
                        more, err := cl.sendChunk(t, c, r, msg)
                        if err != nil {
                                i := int(r.Index)
                                if t.pieceComplete(i) {
                                        t.updatePieceCompletion(i)
                                        if !t.pieceComplete(i) {
                                                // We had the piece, but not anymore.
                                                break another
                                        }
                                }
                                log.Printf("error sending chunk %+v to peer: %s", r, err)
                                // If we failed to send a chunk, choke the peer to ensure they
                                // flush all their requests. We've probably dropped a piece,
                                // but there's no way to communicate this to the peer. If they
                                // ask for it again, we'll kick them to allow us to send them
                                // an updated bitfield.
                                break another
                        }
                        delete(c.PeerRequests, r)
                        if !more {
                                return false
                        }
                        goto another
                }
                return true
        }
        return c.Choke(msg)
}
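
// The loop condition above is a crude tit-for-tat allowance (illustrative
// numbers): while not seeding, a peer that has delivered 10 useful chunks may
// be sent up to 16 (10+6) chunks before we choke them.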

func (cn *connection) Drop() {
        cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int {
        return cn.goodPiecesDirtied - cn.badPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
        return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
        return len(c.requests)
}

func (c *connection) deleteRequest(r request) bool {
        if _, ok := c.requests[r]; !ok {
                return false
        }
        delete(c.requests, r)
        return true
}

func (c *connection) tickleWriter() {
        c.writerCond.Broadcast()
}