1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "fmt"
8         "io"
9         "log"
10         "math/rand"
11         "net"
12         "strconv"
13         "strings"
14         "sync"
15         "time"
16
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/bitmap"
19         "github.com/anacrolix/missinggo/iter"
20         "github.com/anacrolix/missinggo/prioritybitmap"
21
22         "github.com/anacrolix/torrent/bencode"
23         "github.com/anacrolix/torrent/mse"
24         pp "github.com/anacrolix/torrent/peer_protocol"
25 )
26
27 type peerSource string
28
29 const (
30         peerSourceTracker         = "T" // It's the default.
31         peerSourceIncoming        = "I"
32         peerSourceDHTGetPeers     = "Hg"
33         peerSourceDHTAnnouncePeer = "Ha"
34         peerSourcePEX             = "X"
35 )
36
37 // Maintains the state of a connection with a peer.
38 type connection struct {
39         t *Torrent
40         // The actual Conn, used for closing, and setting socket options.
41         conn net.Conn
42         // The Reader and Writer for this Conn, with hooks installed for stats,
43         // limiting, deadlines etc.
44         w io.Writer
45         r io.Reader
46         // True if the connection is operating over MSE obfuscation.
47         headerEncrypted bool
48         cryptoMethod    uint32
49         Discovery       peerSource
50         uTP             bool
51         closed          missinggo.Event
52
53         stats                  ConnStats
54         UnwantedChunksReceived int
55         UsefulChunksReceived   int
56         chunksSent             int
57         goodPiecesDirtied      int
58         badPiecesDirtied       int
59
60         lastMessageReceived     time.Time
61         completedHandshake      time.Time
62         lastUsefulChunkReceived time.Time
63         lastChunkSent           time.Time
64
65         // Stuff controlled by the local peer.
66         Interested       bool
67         Choked           bool
68         requests         map[request]struct{}
69         requestsLowWater int
70         // Indexed by metadata piece, set to true if posted and pending a
71         // response.
72         metadataRequests []bool
73         sentHaves        []bool
74
75         // Stuff controlled by the remote peer.
76         PeerID             PeerID
77         PeerInterested     bool
78         PeerChoked         bool
79         PeerRequests       map[request]struct{}
80         PeerExtensionBytes peerExtensionBytes
81         // The pieces the peer has claimed to have.
82         peerPieces bitmap.Bitmap
83         // The peer has everything. This can be set by the HaveAll fast-extension
84         // message, even before we know the number of pieces in the torrent.
85         peerHasAll bool
86         // The highest possible number of pieces the torrent could have based on
87         // communication with the peer. Generally only useful until we have the
88         // torrent info.
89         peerMinPieces int
90         // Pieces we've accepted chunks for from the peer.
91         peerTouchedPieces map[int]struct{}
92
93         PeerMaxRequests  int // Maximum pending requests the peer allows.
94         PeerExtensionIDs map[string]byte
95         PeerClientName   string
96
97         pieceInclination  []int
98         pieceRequestOrder prioritybitmap.PriorityBitmap
99
100         postedBuffer bytes.Buffer
101         uploadTimer  *time.Timer
102         writerCond   sync.Cond
103 }
104
105 func (cn *connection) mu() sync.Locker {
106         return &cn.t.cl.mu
107 }
108
109 func (cn *connection) remoteAddr() net.Addr {
110         return cn.conn.RemoteAddr()
111 }
112
113 func (cn *connection) localAddr() net.Addr {
114         return cn.conn.LocalAddr()
115 }
116
117 func (cn *connection) supportsExtension(ext string) bool {
118         _, ok := cn.PeerExtensionIDs[ext]
119         return ok
120 }
121
122 // The best guess at number of pieces in the torrent for this peer.
123 func (cn *connection) bestPeerNumPieces() int {
124         if cn.t.haveInfo() {
125                 return cn.t.numPieces()
126         }
127         return cn.peerMinPieces
128 }
129
130 func (cn *connection) completedString() string {
131         return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
132 }
133
134 // Correct the number of pieces the peer is considered to have, e.g. once the
135 // torrent info is known. Excess claims are discarded. The error return lets
136 // callers reject peers whose BITFIELD or HAVE messages prove invalid (currently always nil).
137 func (cn *connection) setNumPieces(num int) error {
138         cn.peerPieces.RemoveRange(num, -1)
139         cn.peerPiecesChanged()
140         return nil
141 }
142
143 func eventAgeString(t time.Time) string {
144         if t.IsZero() {
145                 return "never"
146         }
147         return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
148 }
149
150 func (cn *connection) connectionFlags() (ret string) {
151         c := func(b byte) {
152                 ret += string([]byte{b})
153         }
154         if cn.cryptoMethod == mse.CryptoMethodRC4 {
155                 c('E')
156         } else if cn.headerEncrypted {
157                 c('e')
158         }
159         ret += string(cn.Discovery)
160         if cn.uTP {
161                 c('T')
162         }
163         return
164 }
165
166 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
167 func (cn *connection) statusFlags() (ret string) {
168         c := func(b byte) {
169                 ret += string([]byte{b})
170         }
171         if cn.Interested {
172                 c('i')
173         }
174         if cn.Choked {
175                 c('c')
176         }
177         c('-')
178         ret += cn.connectionFlags()
179         c('-')
180         if cn.PeerInterested {
181                 c('i')
182         }
183         if cn.PeerChoked {
184                 c('c')
185         }
186         return
187 }
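
// Illustrative only, not part of the original file: the flag string produced
// by statusFlags for a hypothetical connection. With the local peer
// interested and not choking, MSE header obfuscation (but no RC4 payload
// encryption), a peer discovered via DHT get_peers over uTP, and the remote
// peer choking us without being interested, the result is "i-eHgT-c".
func exampleStatusFlags() string {
        var cn connection
        cn.Interested = true
        cn.headerEncrypted = true
        cn.Discovery = peerSourceDHTGetPeers
        cn.uTP = true
        cn.PeerChoked = true
        return cn.statusFlags() // "i-eHgT-c"
}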
188
189 func (cn *connection) String() string {
190         var buf bytes.Buffer
191         cn.WriteStatus(&buf, nil)
192         return buf.String()
193 }
194
195 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
196         // \t isn't preserved in <pre> blocks?
197         fmt.Fprintf(w, "%-40s: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
198         fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
199                 eventAgeString(cn.lastMessageReceived),
200                 eventAgeString(cn.completedHandshake),
201                 eventAgeString(cn.lastUsefulChunkReceived))
202         fmt.Fprintf(w,
203                 "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
204                 cn.completedString(),
205                 len(cn.peerTouchedPieces),
206                 cn.UsefulChunksReceived,
207                 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
208                 cn.chunksSent,
209                 cn.numLocalRequests(),
210                 len(cn.PeerRequests),
211                 cn.statusFlags(),
212         )
213         roi := cn.pieceRequestOrderIter()
214         fmt.Fprintf(w, "    next pieces: %v%s\n",
215                 iter.ToSlice(iter.Head(10, roi)),
216                 func() string {
217                         if cn.shouldRequestWithoutBias() {
218                                 return " (fastest)"
219                         } else {
220                                 return ""
221                         }
222                 }())
223 }
224
225 func (cn *connection) Close() {
226         if !cn.closed.Set() {
227                 return
228         }
229         cn.discardPieceInclination()
230         cn.pieceRequestOrder.Clear()
231         if cn.conn != nil {
232                 go cn.conn.Close()
233         }
234 }
235
236 func (cn *connection) PeerHasPiece(piece int) bool {
237         return cn.peerHasAll || cn.peerPieces.Contains(piece)
238 }
239
240 func (cn *connection) Post(msg pp.Message) {
241         messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
242         cn.postedBuffer.Write(msg.MustMarshalBinary())
243         cn.tickleWriter()
244 }
245
246 func (cn *connection) requestMetadataPiece(index int) {
247         eID := cn.PeerExtensionIDs["ut_metadata"]
248         if eID == 0 {
249                 return
250         }
251         if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
252                 return
253         }
254         cn.Post(pp.Message{
255                 Type:       pp.Extended,
256                 ExtendedID: eID,
257                 ExtendedPayload: func() []byte {
258                         b, err := bencode.Marshal(map[string]int{
259                                 "msg_type": pp.RequestMetadataExtensionMsgType,
260                                 "piece":    index,
261                         })
262                         if err != nil {
263                                 panic(err)
264                         }
265                         return b
266                 }(),
267         })
268         for index >= len(cn.metadataRequests) {
269                 cn.metadataRequests = append(cn.metadataRequests, false)
270         }
271         cn.metadataRequests[index] = true
272 }
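
// For reference (values per BEP 9, not taken from this file): assuming
// pp.RequestMetadataExtensionMsgType is the BEP 9 request type (0), the
// payload posted above for piece 1 bencodes as "d8:msg_typei0e5:piecei1ee".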
273
274 func (cn *connection) requestedMetadataPiece(index int) bool {
275         return index < len(cn.metadataRequests) && cn.metadataRequests[index]
276 }
277
278 // The actual value to use as the maximum outbound requests.
279 func (cn *connection) nominalMaxRequests() (ret int) {
280         ret = cn.PeerMaxRequests
281         if ret > 64 {
282                 ret = 64
283         }
284         return
285 }
286
287 // Returns true if an unsatisfied request was canceled.
288 func (cn *connection) PeerCancel(r request) bool {
289         if cn.PeerRequests == nil {
290                 return false
291         }
292         if _, ok := cn.PeerRequests[r]; !ok {
293                 return false
294         }
295         delete(cn.PeerRequests, r)
296         return true
297 }
298
299 func (cn *connection) Choke(msg func(pp.Message) bool) bool {
300         if cn.Choked {
301                 return true
302         }
303         cn.PeerRequests = nil
304         cn.Choked = true
305         return msg(pp.Message{
306                 Type: pp.Choke,
307         })
308 }
309
310 func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
311         if !cn.Choked {
312                 return true
313         }
314         cn.Choked = false
315         return msg(pp.Message{
316                 Type: pp.Unchoke,
317         })
318 }
319
320 func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
321         if cn.Interested == interested {
322                 return true
323         }
324         cn.Interested = interested
325         // log.Printf("%p: setting interest: %v", cn, interested)
326         return msg(pp.Message{
327                 Type: func() pp.MessageType {
328                         if interested {
329                                 return pp.Interested
330                         } else {
331                                 return pp.NotInterested
332                         }
333                 }(),
334         })
335 }
336
337 // The function takes a message to be sent, and returns true if more messages
338 // are okay.
339 type messageWriter func(pp.Message) bool
340
341 func (cn *connection) request(r request, mw messageWriter) bool {
342         if cn.requests == nil {
343                 cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
344         }
345         if _, ok := cn.requests[r]; ok {
346                 panic("chunk already requested")
347         }
348         if !cn.PeerHasPiece(r.Index.Int()) {
349                 panic("requesting piece peer doesn't have")
350         }
351         cn.requests[r] = struct{}{}
352         return mw(pp.Message{
353                 Type:   pp.Request,
354                 Index:  r.Index,
355                 Begin:  r.Begin,
356                 Length: r.Length,
357         })
358 }
359
360 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
361         numFillBuffers.Add(1)
362         cancel, new, i := cn.desiredRequestState()
363         if !cn.SetInterested(i, msg) {
364                 return
365         }
366         if cancel && len(cn.requests) != 0 {
367                 fillBufferSentCancels.Add(1)
368                 for r := range cn.requests {
369                         cn.deleteRequest(r)
370                         // log.Printf("%p: cancelling request: %v", cn, r)
371                         if !msg(makeCancelMessage(r)) {
372                                 return
373                         }
374                 }
375         }
376         if len(new) != 0 {
377                 fillBufferSentRequests.Add(1)
378                 for _, r := range new {
379                         if !cn.request(r, msg) {
380                                 // If we didn't completely top up the requests, we shouldn't
381                                 // mark the low water, since we'll want to top up the requests
382                                 // as soon as we have more write buffer space.
383                                 return
384                         }
385                 }
386                 cn.requestsLowWater = len(cn.requests) / 2
387         }
388         cn.upload(msg)
389 }
390
391 // Routine that writes to the peer. Some of what to write is buffered by
392 // activity elsewhere in the Client, and some is determined locally when the
393 // connection is writable.
394 func (cn *connection) writer(keepAliveTimeout time.Duration) {
395         var (
396                 buf       bytes.Buffer
397                 lastWrite time.Time = time.Now()
398         )
399         var keepAliveTimer *time.Timer
400         keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
401                 cn.mu().Lock()
402                 defer cn.mu().Unlock()
403                 if time.Since(lastWrite) >= keepAliveTimeout {
404                         cn.tickleWriter()
405                 }
406                 keepAliveTimer.Reset(keepAliveTimeout)
407         })
408         cn.mu().Lock()
409         defer cn.mu().Unlock()
410         defer cn.Close()
411         defer keepAliveTimer.Stop()
412         for {
413                 buf.Write(cn.postedBuffer.Bytes())
414                 cn.postedBuffer.Reset()
415                 if buf.Len() == 0 {
416                         cn.fillWriteBuffer(func(msg pp.Message) bool {
417                                 cn.wroteMsg(&msg)
418                                 buf.Write(msg.MustMarshalBinary())
419                                 return buf.Len() < 1<<16
420                         })
421                 }
422                 if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
423                         buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
424                         postedKeepalives.Add(1)
425                 }
426                 if buf.Len() == 0 {
427                         // TODO: Minimize wakeups....
428                         cn.writerCond.Wait()
429                         continue
430                 }
431                 cn.mu().Unlock()
432                 // log.Printf("writing %d bytes", buf.Len())
433                 n, err := cn.w.Write(buf.Bytes())
434                 cn.mu().Lock()
435                 if n != 0 {
436                         lastWrite = time.Now()
437                         keepAliveTimer.Reset(keepAliveTimeout)
438                 }
439                 if err != nil {
440                         return
441                 }
442                 if n != buf.Len() {
443                         panic("short write")
444                 }
445                 buf.Reset()
446         }
447 }
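
// An editorial note on the loop above: the buffer is only topped up from
// fillWriteBuffer while it stays under 64 KiB (the buf.Len() < 1<<16 check), a
// keep-alive is written once keepAliveTimeout passes without any other write,
// and the client lock is released around the blocking cn.w.Write call so a
// slow peer doesn't stall the rest of the client.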
448
449 func (cn *connection) Have(piece int) {
450         for piece >= len(cn.sentHaves) {
451                 cn.sentHaves = append(cn.sentHaves, false)
452         }
453         if cn.sentHaves[piece] {
454                 return
455         }
456         cn.Post(pp.Message{
457                 Type:  pp.Have,
458                 Index: pp.Integer(piece),
459         })
460         cn.sentHaves[piece] = true
461 }
462
463 func (cn *connection) Bitfield(haves []bool) {
464         if cn.sentHaves != nil {
465                 panic("bitfield must be first have-related message sent")
466         }
467         cn.Post(pp.Message{
468                 Type:     pp.Bitfield,
469                 Bitfield: haves,
470         })
471         // Make a copy of haves, as that's read when the message is marshalled
472         // without the lock. Also it obviously shouldn't change in the Msg due to
473         // changes in .sentHaves.
474         cn.sentHaves = append([]bool(nil), haves...)
475 }
476
477 func nextRequestState(
478         networkingEnabled bool,
479         currentRequests map[request]struct{},
480         peerChoking bool,
481         requestPieces iter.Func,
482         pendingChunks func(piece int, f func(chunkSpec) bool) bool,
483         requestsLowWater int,
484         requestsHighWater int,
485 ) (
486         cancelExisting bool, // Cancel all our pending requests
487         newRequests []request, // Chunks to request that we currently aren't
488         interested bool, // Whether we should indicate interest, even if we don't request anything
489 ) {
490         if !networkingEnabled {
491                 return true, nil, false
492         }
493         if len(currentRequests) > requestsLowWater {
494                 return false, nil, true
495         }
496         requestPieces(func(_piece interface{}) bool {
497                 interested = true
498                 if peerChoking {
499                         return false
500                 }
501                 piece := _piece.(int)
502                 return pendingChunks(piece, func(cs chunkSpec) bool {
503                         r := request{pp.Integer(piece), cs}
504                         if _, ok := currentRequests[r]; !ok {
505                                 if newRequests == nil {
506                                         newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
507                                 }
508                                 newRequests = append(newRequests, r)
509                         }
510                         return len(currentRequests)+len(newRequests) < requestsHighWater
511                 })
512         })
513         return
514 }
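
// A minimal sketch with hypothetical inputs (not from the original) of how the
// pure function above behaves: with networking enabled, no outstanding
// requests, an unchoked peer, and a single piece with one pending chunk, it
// requests that chunk and reports interest, cancelling nothing.
func exampleNextRequestState() (cancel bool, newRequests []request, interested bool) {
        requestPieces := func(cb iter.Callback) {
                cb(0) // offer piece 0 only
        }
        pendingChunks := func(piece int, f func(chunkSpec) bool) bool {
                return f(chunkSpec{Begin: 0, Length: 1 << 14})
        }
        return nextRequestState(
                true,  // networking enabled
                nil,   // no requests currently outstanding
                false, // peer isn't choking us
                requestPieces,
                pendingChunks,
                16, // low water
                64, // high water
        )
        // Yields cancel=false, one new request for piece 0's first chunk, interested=true.
}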
515
516 func (cn *connection) updateRequests() {
517         cn.tickleWriter()
518 }
519
520 // Emits the indices in the Bitmaps bms in order, never repeating any index.
521 // skip is mutated during execution, and its initial values will never be
522 // emitted.
523 func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
524         return func(cb iter.Callback) {
525                 for _, bm := range bms {
526                         if !iter.All(func(i interface{}) bool {
527                                 skip.Add(i.(int))
528                                 return cb(i)
529                         }, bitmap.Sub(bm, skip).Iter) {
530                                 return
531                         }
532                 }
533         }
534 }
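
// A small sketch with made-up values (not from the original) of the
// de-duplication above: with skip={3}, a first bitmap {1,2,3} and a second
// {2,3,4}, the indices are emitted as 1, 2, 4. 3 is excluded from the start,
// and 1 and 2 are added to skip as they're emitted, so the second bitmap only
// contributes 4.
func exampleIterBitmapsDistinct() (emitted []int) {
        var skip, first, second bitmap.Bitmap
        skip.Add(3)
        first.Add(1)
        first.Add(2)
        first.Add(3)
        second.Add(2)
        second.Add(3)
        second.Add(4)
        iterBitmapsDistinct(skip, first, second)(func(i interface{}) bool {
                emitted = append(emitted, i.(int))
                return true
        })
        return // [1 2 4]
}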
535
536 func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
537         now, readahead := cn.t.readerPiecePriorities()
538         // Pieces to skip include pieces the peer doesn't have
539         skip := bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
540         // And pieces that we already have.
541         skip.Union(cn.t.completedPieces)
542         // Return an iterator over the different priority classes, minus the skip
543         // pieces.
544         return iter.Chain(
545                 iterBitmapsDistinct(skip, now, readahead),
546                 func(cb iter.Callback) {
547                         cn.t.pendingPieces.IterTyped(func(piece int) bool {
548                                 if skip.Contains(piece) {
549                                         return true
550                                 }
551                                 more := cb(piece)
552                                 skip.Add(piece)
553                                 return more
554                         })
555                 },
556         )
557 }
558
559 // The connection should download highest priority pieces first, without any
560 // inclination toward avoiding wastage. Generally we might do this if there's
561 // a single connection, or this is the fastest connection, and we have active
562 // readers that signal an ordering preference. It's conceivable that the best
563 // connection should do this, since it's least likely to waste our time if
564 // assigned to the highest priority pieces, and assigning more than one this
565 // role would cause significant wasted bandwidth.
566 func (cn *connection) shouldRequestWithoutBias() bool {
567         if cn.t.requestStrategy != 2 {
568                 return false
569         }
570         if len(cn.t.readers) == 0 {
571                 return false
572         }
573         if len(cn.t.conns) == 1 {
574                 return true
575         }
576         if cn == cn.t.fastestConn {
577                 return true
578         }
579         return false
580 }
581
582 func (cn *connection) pieceRequestOrderIter() iter.Func {
583         if cn.shouldRequestWithoutBias() {
584                 return cn.unbiasedPieceRequestOrder()
585         } else {
586                 return cn.pieceRequestOrder.Iter
587         }
588 }
589
590 func (cn *connection) desiredRequestState() (bool, []request, bool) {
591         return nextRequestState(
592                 cn.t.networkingEnabled,
593                 cn.requests,
594                 cn.PeerChoked,
595                 cn.pieceRequestOrderIter(),
596                 func(piece int, f func(chunkSpec) bool) bool {
597                         return undirtiedChunks(piece, cn.t, f)
598                 },
599                 cn.requestsLowWater,
600                 cn.nominalMaxRequests(),
601         )
602 }
603
604 func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
605         chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
606         return iter.ForPerm(len(chunkIndices), func(i int) bool {
607                 return f(t.chunkIndexSpec(chunkIndices[i], piece))
608         })
609 }
610
611 // TODO: Check that callers also call updateRequests where needed.
612 func (cn *connection) stopRequestingPiece(piece int) bool {
613         return cn.pieceRequestOrder.Remove(piece)
614 }
615
616 // This is distinct from Torrent piece priority, which is the user's
617 // preference. Connection piece priority is specific to each connection and is
618 // pseudorandomly skewed, so that connections don't all request the same pieces
619 // and thereby duplicate effort.
620 func (cn *connection) updatePiecePriority(piece int) bool {
621         tpp := cn.t.piecePriority(piece)
622         if !cn.PeerHasPiece(piece) {
623                 tpp = PiecePriorityNone
624         }
625         if tpp == PiecePriorityNone {
626                 return cn.stopRequestingPiece(piece)
627         }
628         prio := cn.getPieceInclination()[piece]
629         switch cn.t.requestStrategy {
630         case 1:
631                 switch tpp {
632                 case PiecePriorityNormal:
633                 case PiecePriorityReadahead:
634                         prio -= cn.t.numPieces()
635                 case PiecePriorityNext, PiecePriorityNow:
636                         prio -= 2 * cn.t.numPieces()
637                 default:
638                         panic(tpp)
639                 }
640                 prio += piece / 3
641         default:
642         }
643         return cn.pieceRequestOrder.Set(piece, prio)
644 }
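
// A worked example with made-up numbers (requestStrategy 1): for a torrent of
// 1000 pieces and an inclination of 420 for piece 30, a readahead-priority
// piece gets 420 - 1000 + 30/3 = -570, while the same piece at normal priority
// stays at 420 + 10 = 430. Lower values are visited first by the priority
// bitmap, so readahead pieces are requested earlier.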
645
646 func (cn *connection) getPieceInclination() []int {
647         if cn.pieceInclination == nil {
648                 cn.pieceInclination = cn.t.getConnPieceInclination()
649         }
650         return cn.pieceInclination
651 }
652
653 func (cn *connection) discardPieceInclination() {
654         if cn.pieceInclination == nil {
655                 return
656         }
657         cn.t.putPieceInclination(cn.pieceInclination)
658         cn.pieceInclination = nil
659 }
660
661 func (cn *connection) peerPiecesChanged() {
662         if cn.t.haveInfo() {
663                 prioritiesChanged := false
664                 for i := range iter.N(cn.t.numPieces()) {
665                         if cn.updatePiecePriority(i) {
666                                 prioritiesChanged = true
667                         }
668                 }
669                 if prioritiesChanged {
670                         cn.updateRequests()
671                 }
672         }
673 }
674
675 func (cn *connection) raisePeerMinPieces(newMin int) {
676         if newMin > cn.peerMinPieces {
677                 cn.peerMinPieces = newMin
678         }
679 }
680
681 func (cn *connection) peerSentHave(piece int) error {
682         if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
683                 return errors.New("invalid piece")
684         }
685         if cn.PeerHasPiece(piece) {
686                 return nil
687         }
688         cn.raisePeerMinPieces(piece + 1)
689         cn.peerPieces.Set(piece, true)
690         if cn.updatePiecePriority(piece) {
691                 cn.updateRequests()
692         }
693         return nil
694 }
695
696 func (cn *connection) peerSentBitfield(bf []bool) error {
697         cn.peerHasAll = false
698         if len(bf)%8 != 0 {
699                 panic("expected bitfield length divisible by 8")
700         }
701         // The bitfield arrives in whole bytes, so at most the last 7 bits of the
702         // last byte are padding; the torrent has at least len(bf)-7 pieces.
703         cn.raisePeerMinPieces(len(bf) - 7)
704         if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
705                 // Ignore known excess pieces.
706                 bf = bf[:cn.t.numPieces()]
707         }
708         for i, have := range bf {
709                 if have {
710                         cn.raisePeerMinPieces(i + 1)
711                 }
712                 cn.peerPieces.Set(i, have)
713         }
714         cn.peerPiecesChanged()
715         return nil
716 }
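
// For example: a 6-byte BITFIELD arrives as 48 booleans, so the torrent has at
// least 48-7 = 41 pieces even if only the first bit is set; each set bit i
// then raises the minimum further to i+1.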
717
718 func (cn *connection) peerSentHaveAll() error {
719         cn.peerHasAll = true
720         cn.peerPieces.Clear()
721         cn.peerPiecesChanged()
722         return nil
723 }
724
725 func (cn *connection) peerSentHaveNone() error {
726         cn.peerPieces.Clear()
727         cn.peerHasAll = false
728         cn.peerPiecesChanged()
729         return nil
730 }
731
732 func (c *connection) requestPendingMetadata() {
733         if c.t.haveInfo() {
734                 return
735         }
736         if c.PeerExtensionIDs["ut_metadata"] == 0 {
737                 // Peer doesn't support this.
738                 return
739         }
740         // Request metadata pieces that we don't have in a random order.
741         var pending []int
742         for index := 0; index < c.t.metadataPieceCount(); index++ {
743                 if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
744                         pending = append(pending, index)
745                 }
746         }
747         for _, i := range rand.Perm(len(pending)) {
748                 c.requestMetadataPiece(pending[i])
749         }
750 }
751
752 func (cn *connection) wroteMsg(msg *pp.Message) {
753         messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
754         cn.stats.wroteMsg(msg)
755         cn.t.stats.wroteMsg(msg)
756 }
757
758 func (cn *connection) readMsg(msg *pp.Message) {
759         cn.stats.readMsg(msg)
760         cn.t.stats.readMsg(msg)
761 }
762
763 func (cn *connection) wroteBytes(n int64) {
764         cn.stats.wroteBytes(n)
765         if cn.t != nil {
766                 cn.t.stats.wroteBytes(n)
767         }
768 }
769
770 func (cn *connection) readBytes(n int64) {
771         cn.stats.readBytes(n)
772         if cn.t != nil {
773                 cn.t.stats.readBytes(n)
774         }
775 }
776
777 // Returns whether the connection is currently useful to us: for example, we're
778 // seeding and they want data, or we don't have the metainfo and they can provide it.
779 func (c *connection) useful() bool {
780         t := c.t
781         if c.closed.IsSet() {
782                 return false
783         }
784         if !t.haveInfo() {
785                 return c.supportsExtension("ut_metadata")
786         }
787         if t.seeding() {
788                 return c.PeerInterested
789         }
790         return c.peerHasWantedPieces()
791 }
792
793 func (c *connection) lastHelpful() (ret time.Time) {
794         ret = c.lastUsefulChunkReceived
795         if c.t.seeding() && c.lastChunkSent.After(ret) {
796                 ret = c.lastChunkSent
797         }
798         return
799 }
800
801 // Processes incoming bittorrent messages. The client lock is held upon entry
802 // and exit. Returning will end the connection.
803 func (c *connection) mainReadLoop() error {
804         t := c.t
805         cl := t.cl
806
807         decoder := pp.Decoder{
808                 R:         bufio.NewReaderSize(c.r, 1<<17),
809                 MaxLength: 256 * 1024,
810                 Pool:      t.chunkPool,
811         }
812         for {
813                 var (
814                         msg pp.Message
815                         err error
816                 )
817                 func() {
818                         cl.mu.Unlock()
819                         defer cl.mu.Lock()
820                         err = decoder.Decode(&msg)
821                 }()
822                 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
823                         return nil
824                 }
825                 if err != nil {
826                         return err
827                 }
828                 c.readMsg(&msg)
829                 c.lastMessageReceived = time.Now()
830                 if msg.Keepalive {
831                         receivedKeepalives.Add(1)
832                         continue
833                 }
834                 messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
835                 switch msg.Type {
836                 case pp.Choke:
837                         c.PeerChoked = true
838                         c.requests = nil
839                         // We can then reset our interest.
840                         c.updateRequests()
841                 case pp.Reject:
842                         if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
843                                 c.updateRequests()
844                         }
845                 case pp.Unchoke:
846                         c.PeerChoked = false
847                         c.tickleWriter()
848                 case pp.Interested:
849                         c.PeerInterested = true
850                         c.tickleWriter()
851                 case pp.NotInterested:
852                         c.PeerInterested = false
853                         c.PeerRequests = nil
854                 case pp.Have:
855                         err = c.peerSentHave(int(msg.Index))
856                 case pp.Request:
857                         if c.Choked {
858                                 break
859                         }
860                         if len(c.PeerRequests) >= maxRequests {
861                                 // TODO: Should we drop them or Choke them instead?
862                                 break
863                         }
864                         if !t.havePiece(msg.Index.Int()) {
865                                 // This isn't necessarily them screwing up. We can drop pieces
866                                 // from our storage, and can't communicate this to peers
867                                 // except by reconnecting.
868                                 requestsReceivedForMissingPieces.Add(1)
869                                 err = fmt.Errorf("peer requested piece we don't have: %v", msg.Index.Int())
870                                 break
871                         }
872                         if c.PeerRequests == nil {
873                                 c.PeerRequests = make(map[request]struct{}, maxRequests)
874                         }
875                         c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
876                         c.tickleWriter()
877                 case pp.Cancel:
878                         req := newRequest(msg.Index, msg.Begin, msg.Length)
879                         if !c.PeerCancel(req) {
880                                 unexpectedCancels.Add(1)
881                         }
882                 case pp.Bitfield:
883                         err = c.peerSentBitfield(msg.Bitfield)
884                 case pp.HaveAll:
885                         err = c.peerSentHaveAll()
886                 case pp.HaveNone:
887                         err = c.peerSentHaveNone()
888                 case pp.Piece:
889                         c.receiveChunk(&msg)
890                         if len(msg.Piece) == int(t.chunkSize) {
891                                 t.chunkPool.Put(&msg.Piece)
892                         }
893                 case pp.Extended:
894                         switch msg.ExtendedID {
895                         case pp.HandshakeExtendedID:
896                                 // TODO: Create a bencode struct for this.
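                                // A typical decoded payload, with hypothetical values:
                                //   {"m": {"ut_metadata": 3, "ut_pex": 1},
                                //    "metadata_size": 31235, "reqq": 250, "v": "SomeClient 1.0"}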
897                                 var d map[string]interface{}
898                                 err = bencode.Unmarshal(msg.ExtendedPayload, &d)
899                                 if err != nil {
900                                         err = fmt.Errorf("error decoding extended message payload: %s", err)
901                                         break
902                                 }
903                                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
904                                 if reqq, ok := d["reqq"]; ok {
905                                         if i, ok := reqq.(int64); ok {
906                                                 c.PeerMaxRequests = int(i)
907                                         }
908                                 }
909                                 if v, ok := d["v"]; ok {
910                                         c.PeerClientName = v.(string)
911                                 }
912                                 if m, ok := d["m"]; ok {
913                                         mTyped, ok := m.(map[string]interface{})
914                                         if !ok {
915                                                 err = errors.New("handshake m value is not dict")
916                                                 break
917                                         }
918                                         if c.PeerExtensionIDs == nil {
919                                                 c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
920                                         }
921                                         for name, v := range mTyped {
922                                                 id, ok := v.(int64)
923                                                 if !ok {
924                                                         log.Printf("bad handshake m item extension ID type: %T", v)
925                                                         continue
926                                                 }
927                                                 if id == 0 {
928                                                         delete(c.PeerExtensionIDs, name)
929                                                 } else {
930                                                         if c.PeerExtensionIDs[name] == 0 {
931                                                                 supportedExtensionMessages.Add(name, 1)
932                                                         }
933                                                         c.PeerExtensionIDs[name] = byte(id)
934                                                 }
935                                         }
936                                 }
937                                 metadata_sizeUntyped, ok := d["metadata_size"]
938                                 if ok {
939                                         metadata_size, ok := metadata_sizeUntyped.(int64)
940                                         if !ok {
941                                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
942                                         } else {
943                                                 err = t.setMetadataSize(metadata_size)
944                                                 if err != nil {
945                                                         err = fmt.Errorf("error setting metadata size to %d", metadata_size)
946                                                         break
947                                                 }
948                                         }
949                                 }
950                                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
951                                         c.requestPendingMetadata()
952                                 }
953                         case metadataExtendedId:
954                                 err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
955                                 if err != nil {
956                                         err = fmt.Errorf("error handling metadata extension message: %s", err)
957                                 }
958                         case pexExtendedId:
959                                 if cl.config.DisablePEX {
960                                         break
961                                 }
962                                 var pexMsg peerExchangeMessage
963                                 err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
964                                 if err != nil {
965                                         err = fmt.Errorf("error unmarshalling PEX message: %s", err)
966                                         break
967                                 }
968                                 go func() {
969                                         cl.mu.Lock()
970                                         t.addPeers(func() (ret []Peer) {
971                                                 for i, cp := range pexMsg.Added {
972                                                         p := Peer{
973                                                                 IP:     make([]byte, 4),
974                                                                 Port:   cp.Port,
975                                                                 Source: peerSourcePEX,
976                                                         }
977                                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
978                                                                 p.SupportsEncryption = true
979                                                         }
980                                                         missinggo.CopyExact(p.IP, cp.IP[:])
981                                                         ret = append(ret, p)
982                                                 }
983                                                 return
984                                         }())
985                                         cl.mu.Unlock()
986                                 }()
987                         default:
988                                 err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
989                         }
990                         if err != nil {
991                                 // Some clients (matched by peer ID prefix below) use their own extension
992                                 // IDs for outgoing message types, which is incorrect; for them, end the connection without reporting an error.
993                                 if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
994                                         strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
995                                         return nil
996                                 }
997                         }
998                 case pp.Port:
999                         if cl.dHT == nil {
1000                                 break
1001                         }
1002                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
1003                         if err != nil {
1004                                 panic(err)
1005                         }
1006                         if msg.Port != 0 {
1007                                 pingAddr.Port = int(msg.Port)
1008                         }
1009                         go cl.dHT.Ping(pingAddr, nil)
1010                 default:
1011                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
1012                 }
1013                 if err != nil {
1014                         return err
1015                 }
1016         }
1017 }
1018
1019 // Set both the Reader and Writer for the connection from a single ReadWriter.
1020 func (cn *connection) setRW(rw io.ReadWriter) {
1021         cn.r = rw
1022         cn.w = rw
1023 }
1024
1025 // Returns the Reader and Writer as a combined ReadWriter.
1026 func (cn *connection) rw() io.ReadWriter {
1027         return struct {
1028                 io.Reader
1029                 io.Writer
1030         }{cn.r, cn.w}
1031 }
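
// A hedged sketch, not part of the original: setRW makes it easy to wrap the
// transport, for example to feed the byte counters. countingRW is hypothetical;
// the real client installs its own stats, rate-limiting and deadline hooks.
type countingRW struct {
        cn *connection
        rw io.ReadWriter
}

func (me countingRW) Read(p []byte) (n int, err error) {
        n, err = me.rw.Read(p)
        me.cn.readBytes(int64(n))
        return
}

func (me countingRW) Write(p []byte) (n int, err error) {
        n, err = me.rw.Write(p)
        me.cn.wroteBytes(int64(n))
        return
}

// Usage: cn.setRW(countingRW{cn, cn.rw()})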
1032
1033 // Handle a received chunk from a peer.
1034 func (c *connection) receiveChunk(msg *pp.Message) {
1035         t := c.t
1036         cl := t.cl
1037         chunksReceived.Add(1)
1038
1039         req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
1040
1041         // Request has been satisfied.
1042         if c.deleteRequest(req) {
1043                 c.updateRequests()
1044         } else {
1045                 unexpectedChunksReceived.Add(1)
1046         }
1047
1048         // Do we actually want this chunk?
1049         if !t.wantPiece(req) {
1050                 unwantedChunksReceived.Add(1)
1051                 c.UnwantedChunksReceived++
1052                 return
1053         }
1054
1055         index := int(req.Index)
1056         piece := &t.pieces[index]
1057
1058         c.UsefulChunksReceived++
1059         c.lastUsefulChunkReceived = time.Now()
1060         // if t.fastestConn != c {
1061         // log.Printf("setting fastest connection %p", c)
1062         // }
1063         t.fastestConn = c
1064
1065         // Need to record that it hasn't been written yet, before we attempt to do
1066         // anything with it.
1067         piece.incrementPendingWrites()
1068         // Record that we have the chunk, so we aren't trying to download it while
1069         // waiting for it to be written to storage.
1070         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1071
1072         // Cancel pending requests for this chunk.
1073         for c := range t.conns {
1074                 c.postCancel(req)
1075         }
1076
1077         err := func() error {
1078                 cl.mu.Unlock()
1079                 defer cl.mu.Lock()
1080                 // Write the chunk out. Note that the upper bound on chunk writing
1081                 // concurrency will be the number of connections. We write inline with
1082                 // receiving the chunk (with this lock dance), because we want to
1083                 // handle errors synchronously and I haven't thought of a nice way to
1084                 // defer any concurrency to the storage and have that notify the
1085                 // client of errors. TODO: Do that instead.
1086                 return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1087         }()
1088
1089         piece.decrementPendingWrites()
1090
1091         if err != nil {
1092                 log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
1093                 t.pendRequest(req)
1094                 t.updatePieceCompletion(int(msg.Index))
1095                 return
1096         }
1097
1098         // It's important that the piece is potentially queued before we check if
1099         // the piece is still wanted, because if it is queued, it won't be wanted.
1100         if t.pieceAllDirty(index) {
1101                 t.queuePieceCheck(int(req.Index))
1102                 t.pendAllChunkSpecs(index)
1103         }
1104
1105         if c.peerTouchedPieces == nil {
1106                 c.peerTouchedPieces = make(map[int]struct{})
1107         }
1108         c.peerTouchedPieces[index] = struct{}{}
1109
1110         cl.event.Broadcast()
1111         t.publishPieceChange(int(req.Index))
1112 }
1113
1114 // Also handles choking and unchoking of the remote peer.
1115 func (c *connection) upload(msg func(pp.Message) bool) bool {
1116         t := c.t
1117         cl := t.cl
1118         if cl.config.NoUpload {
1119                 return true
1120         }
1121         if !c.PeerInterested {
1122                 return true
1123         }
1124         seeding := t.seeding()
1125         if !seeding && !c.peerHasWantedPieces() {
1126                 // There's no reason to upload to this peer.
1127                 return true
1128         }
1129         // Breaking or completing this loop means we don't want to upload to the
1130         // peer anymore, and we choke them.
1131 another:
1132         for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
1133                 // We want to upload to the peer.
1134                 if !c.Unchoke(msg) {
1135                         return false
1136                 }
1137                 for r := range c.PeerRequests {
1138                         res := cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
1139                         if !res.OK() {
1140                                 panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
1141                         }
1142                         delay := res.Delay()
1143                         if delay > 0 {
1144                                 res.Cancel()
1145                                 if c.uploadTimer == nil {
1146                                         c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
1147                                 } else {
1148                                         c.uploadTimer.Reset(delay)
1149                                 }
1150                                 // Hard to say what to return here.
1151                                 return true
1152                         }
1153                         more, err := cl.sendChunk(t, c, r, msg)
1154                         if err != nil {
1155                                 i := int(r.Index)
1156                                 if t.pieceComplete(i) {
1157                                         t.updatePieceCompletion(i)
1158                                         if !t.pieceComplete(i) {
1159                                                 // We had the piece, but not anymore.
1160                                                 break another
1161                                         }
1162                                 }
1163                                 log.Printf("error sending chunk %+v to peer: %s", r, err)
1164                                 // If we failed to send a chunk, choke the peer to ensure they
1165                                 // flush all their requests. We've probably dropped a piece,
1166                                 // but there's no way to communicate this to the peer. If they
1167                                 // ask for it again, we'll kick them to allow us to send them
1168                                 // an updated bitfield.
1169                                 break another
1170                         }
1171                         delete(c.PeerRequests, r)
1172                         if !more {
1173                                 return false
1174                         }
1175                         goto another
1176                 }
1177                 return true
1178         }
1179         return c.Choke(msg)
1180 }
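
// In concrete terms: when not seeding, the loop above lets at most
// UsefulChunksReceived+6 chunks go out before the peer is choked again, e.g. a
// peer that has given us 10 useful chunks can be sent up to 16 chunks before we
// stop reciprocating.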
1181
1182 func (cn *connection) Drop() {
1183         cn.t.dropConnection(cn)
1184 }
1185
1186 func (cn *connection) netGoodPiecesDirtied() int {
1187         return cn.goodPiecesDirtied - cn.badPiecesDirtied
1188 }
1189
1190 func (c *connection) peerHasWantedPieces() bool {
1191         return !c.pieceRequestOrder.IsEmpty()
1192 }
1193
1194 func (c *connection) numLocalRequests() int {
1195         return len(c.requests)
1196 }
1197
1198 func (c *connection) deleteRequest(r request) bool {
1199         if _, ok := c.requests[r]; !ok {
1200                 return false
1201         }
1202         delete(c.requests, r)
1203         return true
1204 }
1205
1206 func (c *connection) tickleWriter() {
1207         c.writerCond.Broadcast()
1208 }
1209
1210 func (c *connection) postCancel(r request) bool {
1211         if !c.deleteRequest(r) {
1212                 return false
1213         }
1214         c.Post(makeCancelMessage(r))
1215         return true
1216 }