[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "errors"
7         "fmt"
8         "io"
9         "math/rand"
10         "net"
11         "strconv"
12         "strings"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/log"
17
18         "github.com/anacrolix/missinggo"
19         "github.com/anacrolix/missinggo/bitmap"
20         "github.com/anacrolix/missinggo/iter"
21         "github.com/anacrolix/missinggo/prioritybitmap"
22
23         "github.com/anacrolix/torrent/bencode"
24         "github.com/anacrolix/torrent/mse"
25         pp "github.com/anacrolix/torrent/peer_protocol"
26 )
27
28 type peerSource string
29
30 const (
31         peerSourceTracker         = "T" // It's the default.
32         peerSourceIncoming        = "I"
33         peerSourceDHTGetPeers     = "Hg"
34         peerSourceDHTAnnouncePeer = "Ha"
35         peerSourcePEX             = "X"
36 )
37
38 // Maintains the state of a connection with a peer.
39 type connection struct {
40         t *Torrent
41         // The actual Conn, used for closing and setting socket options.
42         conn net.Conn
43         // The Reader and Writer for this Conn, with hooks installed for stats,
44         // limiting, deadlines etc.
45         w io.Writer
46         r io.Reader
47         // True if the connection is operating over MSE obfuscation.
48         headerEncrypted bool
49         cryptoMethod    uint32
50         Discovery       peerSource
51         uTP             bool
52         closed          missinggo.Event
53
54         stats ConnStats
55
56         lastMessageReceived     time.Time
57         completedHandshake      time.Time
58         lastUsefulChunkReceived time.Time
59         lastChunkSent           time.Time
60
61         // Stuff controlled by the local peer.
62         Interested       bool
63         Choked           bool
64         requests         map[request]struct{}
65         requestsLowWater int
66         // Indexed by metadata piece, set to true if posted and pending a
67         // response.
68         metadataRequests []bool
69         sentHaves        bitmap.Bitmap
70
71         // Stuff controlled by the remote peer.
72         PeerID             PeerID
73         PeerInterested     bool
74         PeerChoked         bool
75         PeerRequests       map[request]struct{}
76         PeerExtensionBytes peerExtensionBytes
77         // The pieces the peer has claimed to have.
78         peerPieces bitmap.Bitmap
79         // The peer has everything. This can occur due to the HaveAll message,
80         // when we may not even know the number of pieces in the torrent yet.
81         peerSentHaveAll bool
82         // The highest possible number of pieces the torrent could have based on
83         // communication with the peer. Generally only useful until we have the
84         // torrent info.
85         peerMinPieces int
86         // Pieces we've accepted chunks for from the peer.
87         peerTouchedPieces map[int]struct{}
88
89         PeerMaxRequests  int // Maximum pending requests the peer allows.
90         PeerExtensionIDs map[string]byte
91         PeerClientName   string
92
93         pieceInclination  []int
94         pieceRequestOrder prioritybitmap.PriorityBitmap
95
96         writeBuffer *bytes.Buffer
97         uploadTimer *time.Timer
98         writerCond  sync.Cond
99 }
100
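// Whether the peer claims to have every piece, and whether that can be
// determined yet. Without the torrent info, the answer is only known if the
// peer sent HaveAll.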
101 func (cn *connection) peerHasAllPieces() (all bool, known bool) {
102         if cn.peerSentHaveAll {
103                 return true, true
104         }
105         if !cn.t.haveInfo() {
106                 return false, false
107         }
108         return bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()).IsEmpty(), true
109 }
110
111 func (cn *connection) mu() sync.Locker {
112         return &cn.t.cl.mu
113 }
114
115 func (cn *connection) remoteAddr() net.Addr {
116         return cn.conn.RemoteAddr()
117 }
118
119 func (cn *connection) localAddr() net.Addr {
120         return cn.conn.LocalAddr()
121 }
122
123 func (cn *connection) supportsExtension(ext string) bool {
124         _, ok := cn.PeerExtensionIDs[ext]
125         return ok
126 }
127
128 // The best guess at number of pieces in the torrent for this peer.
129 func (cn *connection) bestPeerNumPieces() int {
130         if cn.t.haveInfo() {
131                 return cn.t.numPieces()
132         }
133         return cn.peerMinPieces
134 }
135
136 func (cn *connection) completedString() string {
137         return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
138 }
139
140 // Clamp the peerPieces bitmap to the given number of pieces. Returns an
141 // error if the peer's existing claims are invalid, such as a badly sized
142 // BITFIELD or out-of-range HAVE messages (currently it never fails).
143 func (cn *connection) setNumPieces(num int) error {
144         cn.peerPieces.RemoveRange(num, bitmap.ToEnd)
145         cn.peerPiecesChanged()
146         return nil
147 }
148
149 func eventAgeString(t time.Time) string {
150         if t.IsZero() {
151                 return "never"
152         }
153         return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
154 }
155
156 func (cn *connection) connectionFlags() (ret string) {
157         c := func(b byte) {
158                 ret += string([]byte{b})
159         }
160         if cn.cryptoMethod == mse.CryptoMethodRC4 {
161                 c('E')
162         } else if cn.headerEncrypted {
163                 c('e')
164         }
165         ret += string(cn.Discovery)
166         if cn.uTP {
167                 c('T')
168         }
169         return
170 }
171
172 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
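// Produces strings like "ic-EHgT-ic": our interest and choke flags, then the
// connection flags (encryption, discovery source, uTP), then the peer's
// interest and choke flags.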
173 func (cn *connection) statusFlags() (ret string) {
174         c := func(b byte) {
175                 ret += string([]byte{b})
176         }
177         if cn.Interested {
178                 c('i')
179         }
180         if cn.Choked {
181                 c('c')
182         }
183         c('-')
184         ret += cn.connectionFlags()
185         c('-')
186         if cn.PeerInterested {
187                 c('i')
188         }
189         if cn.PeerChoked {
190                 c('c')
191         }
192         return
193 }
194
195 func (cn *connection) String() string {
196         var buf bytes.Buffer
197         cn.WriteStatus(&buf, nil)
198         return buf.String()
199 }
200
201 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
202         // \t isn't preserved in <pre> blocks?
203         fmt.Fprintf(w, "%-40s: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
204         fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s\n",
205                 eventAgeString(cn.lastMessageReceived),
206                 eventAgeString(cn.completedHandshake),
207                 eventAgeString(cn.lastHelpful()))
208         fmt.Fprintf(w,
209                 "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
210                 cn.completedString(),
211                 len(cn.peerTouchedPieces),
212                 cn.stats.ChunksReadUseful,
213                 // TODO: Use ChunksRead? Verify that value is the same as this sum?
214                 cn.stats.ChunksReadUnwanted+cn.stats.ChunksReadUseful,
215                 cn.stats.ChunksWritten,
216                 cn.numLocalRequests(),
217                 len(cn.PeerRequests),
218                 cn.statusFlags(),
219         )
220         roi := cn.pieceRequestOrderIter()
221         fmt.Fprintf(w, "    next pieces: %v%s\n",
222                 iter.ToSlice(iter.Head(10, roi)),
223                 func() string {
224                         if cn.shouldRequestWithoutBias() {
225                                 return " (fastest)"
226                         } else {
227                                 return ""
228                         }
229                 }())
230 }
231
232 func (cn *connection) Close() {
233         if !cn.closed.Set() {
234                 return
235         }
236         cn.tickleWriter()
237         cn.discardPieceInclination()
238         cn.pieceRequestOrder.Clear()
239         if cn.conn != nil {
240                 go cn.conn.Close()
241         }
242 }
243
244 func (cn *connection) PeerHasPiece(piece int) bool {
245         return cn.peerSentHaveAll || cn.peerPieces.Contains(piece)
246 }
247
248 // Writes a message into the write buffer.
249 func (cn *connection) Post(msg pp.Message) {
250         messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
251         // We don't need to track bytes here because a connection.w Writer wrapper
252         // takes care of that (although there's some delay between us recording
253         // the message and the connection writer flushing it out).
254         cn.writeBuffer.Write(msg.MustMarshalBinary())
255         // Last I checked only Piece messages affect stats, and we don't post
256         // those.
257         cn.wroteMsg(&msg)
258         cn.tickleWriter()
259 }
260
261 func (cn *connection) requestMetadataPiece(index int) {
262         eID := cn.PeerExtensionIDs["ut_metadata"]
263         if eID == 0 {
264                 return
265         }
266         if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
267                 return
268         }
269         cn.Post(pp.Message{
270                 Type:       pp.Extended,
271                 ExtendedID: eID,
272                 ExtendedPayload: func() []byte {
273                         b, err := bencode.Marshal(map[string]int{
274                                 "msg_type": pp.RequestMetadataExtensionMsgType,
275                                 "piece":    index,
276                         })
277                         if err != nil {
278                                 panic(err)
279                         }
280                         return b
281                 }(),
282         })
283         for index >= len(cn.metadataRequests) {
284                 cn.metadataRequests = append(cn.metadataRequests, false)
285         }
286         cn.metadataRequests[index] = true
287 }
288
289 func (cn *connection) requestedMetadataPiece(index int) bool {
290         return index < len(cn.metadataRequests) && cn.metadataRequests[index]
291 }
292
293 // The actual value to use as the maximum outbound requests: the peer's advertised limit, clamped to 64.
294 func (cn *connection) nominalMaxRequests() (ret int) {
295         ret = cn.PeerMaxRequests
296         if ret > 64 {
297                 ret = 64
298         }
299         return
300 }
301
302 // Returns true if an unsatisfied request was canceled.
303 func (cn *connection) PeerCancel(r request) bool {
304         if cn.PeerRequests == nil {
305                 return false
306         }
307         if _, ok := cn.PeerRequests[r]; !ok {
308                 return false
309         }
310         delete(cn.PeerRequests, r)
311         return true
312 }
313
314 func (cn *connection) Choke(msg func(pp.Message) bool) bool {
315         if cn.Choked {
316                 return true
317         }
318         cn.PeerRequests = nil
319         cn.Choked = true
320         return msg(pp.Message{
321                 Type: pp.Choke,
322         })
323 }
324
325 func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
326         if !cn.Choked {
327                 return true
328         }
329         cn.Choked = false
330         return msg(pp.Message{
331                 Type: pp.Unchoke,
332         })
333 }
334
335 func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
336         if cn.Interested == interested {
337                 return true
338         }
339         cn.Interested = interested
340         // log.Printf("%p: setting interest: %v", cn, interested)
341         return msg(pp.Message{
342                 Type: func() pp.MessageType {
343                         if interested {
344                                 return pp.Interested
345                         } else {
346                                 return pp.NotInterested
347                         }
348                 }(),
349         })
350 }
351
352 // The function takes a message to be sent, and returns true if more messages
353 // may follow.
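// The writer goroutine supplies one that appends the marshalled message to
// the write buffer and reports whether the buffer still has room (see writer
// and fillWriteBuffer).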
354 type messageWriter func(pp.Message) bool
355
356 // Records the request locally and passes it to mw, proxying the messageWriter's response.
357 func (cn *connection) request(r request, mw messageWriter) bool {
358         if cn.requests == nil {
359                 cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
360         }
361         if _, ok := cn.requests[r]; ok {
362                 panic("chunk already requested")
363         }
364         if !cn.PeerHasPiece(r.Index.Int()) {
365                 panic("requesting piece peer doesn't have")
366         }
367         cn.requests[r] = struct{}{}
368         if _, ok := cn.t.conns[cn]; !ok {
369                 panic("requesting but not in active conns")
370         }
371         cn.t.pendingRequests[r]++
372         return mw(pp.Message{
373                 Type:   pp.Request,
374                 Index:  r.Index,
375                 Begin:  r.Begin,
376                 Length: r.Length,
377         })
378 }
379
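// Fills the write buffer with protocol traffic we currently want to send:
// interest changes, cancels for requests we no longer want, new requests,
// and uploads. msg reports whether more may be written.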
380 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
381         numFillBuffers.Add(1)
382         cancel, new, i := cn.desiredRequestState()
383         if !cn.SetInterested(i, msg) {
384                 return
385         }
386         if cancel && len(cn.requests) != 0 {
387                 fillBufferSentCancels.Add(1)
388                 for r := range cn.requests {
389                         cn.deleteRequest(r)
390                         // log.Printf("%p: cancelling request: %v", cn, r)
391                         if !msg(makeCancelMessage(r)) {
392                                 return
393                         }
394                 }
395         }
396         if len(new) != 0 {
397                 fillBufferSentRequests.Add(1)
398                 for _, r := range new {
399                         if !cn.request(r, msg) {
400                                 // If we didn't completely top up the requests, we shouldn't
401                                 // update the low water mark, since we'll want to top up the
402                                 // requests again as soon as we have more write buffer space.
403                                 return
404                         }
405                 }
406                 cn.requestsLowWater = len(cn.requests) / 2
407         }
408         cn.upload(msg)
409 }
410
411 // Routine that writes to the peer. Some of what to write is buffered by
412 // activity elsewhere in the Client, and some is determined locally when the
413 // connection is writable.
414 func (cn *connection) writer(keepAliveTimeout time.Duration) {
415         var (
416                 lastWrite      time.Time = time.Now()
417                 keepAliveTimer *time.Timer
418         )
419         keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
420                 cn.mu().Lock()
421                 defer cn.mu().Unlock()
422                 if time.Since(lastWrite) >= keepAliveTimeout {
423                         cn.tickleWriter()
424                 }
425                 keepAliveTimer.Reset(keepAliveTimeout)
426         })
427         cn.mu().Lock()
428         defer cn.mu().Unlock()
429         defer cn.Close()
430         defer keepAliveTimer.Stop()
431         frontBuf := new(bytes.Buffer)
432         for {
433                 if cn.closed.IsSet() {
434                         return
435                 }
436                 if cn.writeBuffer.Len() == 0 {
437                         cn.fillWriteBuffer(func(msg pp.Message) bool {
438                                 cn.wroteMsg(&msg)
439                                 cn.writeBuffer.Write(msg.MustMarshalBinary())
440                                 return cn.writeBuffer.Len() < 1<<16
441                         })
442                 }
443                 if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
444                         cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
445                         postedKeepalives.Add(1)
446                 }
447                 if cn.writeBuffer.Len() == 0 {
448                         // TODO: Minimize wakeups....
449                         cn.writerCond.Wait()
450                         continue
451                 }
452                 // Flip the buffers, so the front buffer can be written out below without holding the client lock.
453                 frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
454                 cn.mu().Unlock()
455                 n, err := cn.w.Write(frontBuf.Bytes())
456                 cn.mu().Lock()
457                 if n != 0 {
458                         lastWrite = time.Now()
459                         keepAliveTimer.Reset(keepAliveTimeout)
460                 }
461                 if err != nil {
462                         return
463                 }
464                 if n != frontBuf.Len() {
465                         panic("short write")
466                 }
467                 frontBuf.Reset()
468         }
469 }
470
471 func (cn *connection) Have(piece int) {
472         if cn.sentHaves.Get(piece) {
473                 return
474         }
475         cn.Post(pp.Message{
476                 Type:  pp.Have,
477                 Index: pp.Integer(piece),
478         })
479         cn.sentHaves.Add(piece)
480 }
481
482 func (cn *connection) PostBitfield() {
483         if cn.sentHaves.Len() != 0 {
484                 panic("bitfield must be first have-related message sent")
485         }
486         if !cn.t.haveAnyPieces() {
487                 return
488         }
489         cn.Post(pp.Message{
490                 Type:     pp.Bitfield,
491                 Bitfield: cn.t.bitfield(),
492         })
493         cn.sentHaves = cn.t.completedPieces.Copy()
494 }
495
496 // Determines interest and requests to send to a connected peer.
497 func nextRequestState(
498         networkingEnabled bool,
499         currentRequests map[request]struct{},
500         peerChoking bool,
501         iterPendingRequests func(f func(request) bool),
502         requestsLowWater int,
503         requestsHighWater int,
504 ) (
505         cancelExisting bool, // Cancel all our pending requests
506         newRequests []request, // Chunks to request that we currently aren't
507         interested bool, // Whether we should indicate interest, even if we don't request anything
508 ) {
509         if !networkingEnabled {
510                 return true, nil, false
511         }
512         if len(currentRequests) > requestsLowWater {
513                 return false, nil, true
514         }
515         iterPendingRequests(func(r request) bool {
516                 interested = true
517                 if peerChoking {
518                         return false
519                 }
520                 if _, ok := currentRequests[r]; !ok {
521                         if newRequests == nil {
522                                 newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
523                         }
524                         newRequests = append(newRequests, r)
525                 }
526                 return len(currentRequests)+len(newRequests) < requestsHighWater
527         })
528         return
529 }
530
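// Outbound requests are computed lazily by the writer goroutine, so updating
// them just means waking it up.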
531 func (cn *connection) updateRequests() {
532         // log.Print("update requests")
533         cn.tickleWriter()
534 }
535
536 // Emits the indices in the Bitmaps bms in order, never repeating any index.
537 // skip is mutated during execution, and its initial values will never be
538 // emitted.
539 func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
540         return func(cb iter.Callback) {
541                 for _, bm := range bms {
542                         if !iter.All(func(i interface{}) bool {
543                                 skip.Add(i.(int))
544                                 return cb(i)
545                         }, bitmap.Sub(bm, skip).Iter) {
546                                 return
547                         }
548                 }
549         }
550 }
551
552 func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
553         now, readahead := cn.t.readerPiecePriorities()
554         var skip bitmap.Bitmap
555         if !cn.peerSentHaveAll {
556                 // Pieces to skip include pieces the peer doesn't have
557                 skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
558         }
559         // And pieces that we already have.
560         skip.Union(cn.t.completedPieces)
561         // Return an iterator over the different priority classes, minus the skip
562         // pieces.
563         return iter.Chain(
564                 iterBitmapsDistinct(skip, now, readahead),
565                 func(cb iter.Callback) {
566                         cn.t.pendingPieces.IterTyped(func(piece int) bool {
567                                 if skip.Contains(piece) {
568                                         return true
569                                 }
570                                 more := cb(piece)
571                                 skip.Add(piece)
572                                 return more
573                         })
574                 },
575         )
576 }
577
578 // The connection should download highest priority pieces first, without any
579 // inclination toward avoiding wastage. Generally we might do this if there's
580 // a single connection, or this is the fastest connection, and we have active
581 // readers that signal an ordering preference. It's conceivable that the best
582 // connection should do this, since it's least likely to waste our time if
583 // assigned to the highest priority pieces, and assigning more than one this
584 // role would cause significant wasted bandwidth.
585 func (cn *connection) shouldRequestWithoutBias() bool {
586         if cn.t.requestStrategy != 2 {
587                 return false
588         }
589         if len(cn.t.readers) == 0 {
590                 return false
591         }
592         if len(cn.t.conns) == 1 {
593                 return true
594         }
595         if cn == cn.t.fastestConn {
596                 return true
597         }
598         return false
599 }
600
601 func (cn *connection) pieceRequestOrderIter() iter.Func {
602         if cn.shouldRequestWithoutBias() {
603                 return cn.unbiasedPieceRequestOrder()
604         } else {
605                 return cn.pieceRequestOrder.Iter
606         }
607 }
608
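// Iterates over candidate requests: pieces in this connection's request
// order, and within each piece its undirtied chunks in random order.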
609 func (cn *connection) iterPendingRequests(f func(request) bool) {
610         cn.pieceRequestOrderIter()(func(_piece interface{}) bool {
611                 piece := _piece.(int)
612                 return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
613                         r := request{pp.Integer(piece), cs}
614                         // log.Println(r, cn.t.pendingRequests[r], cn.requests)
615                         // if _, ok := cn.requests[r]; !ok && cn.t.pendingRequests[r] != 0 {
616                         //      return true
617                         // }
618                         return f(r)
619                 })
620         })
621 }
622
623 func (cn *connection) desiredRequestState() (bool, []request, bool) {
624         return nextRequestState(
625                 cn.t.networkingEnabled,
626                 cn.requests,
627                 cn.PeerChoked,
628                 cn.iterPendingRequests,
629                 cn.requestsLowWater,
630                 cn.nominalMaxRequests(),
631         )
632 }
633
634 func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
635         chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
636         // TODO: Use "math/rand".Shuffle once the minimum supported Go version is 1.10.
637         return iter.ForPerm(len(chunkIndices), func(i int) bool {
638                 return f(t.chunkIndexSpec(chunkIndices[i], piece))
639         })
640 }
641
642 // TODO: Check that callers also call updateRequests.
643 func (cn *connection) stopRequestingPiece(piece int) bool {
644         return cn.pieceRequestOrder.Remove(piece)
645 }
646
647 // This is distinct from Torrent piece priority, which is the user's
648 // preference. Connection piece priority is specific to a connection and is
649 // used to pseudorandomly avoid connections always requesting the same pieces
650 // and thus wasting effort.
651 func (cn *connection) updatePiecePriority(piece int) bool {
652         tpp := cn.t.piecePriority(piece)
653         if !cn.PeerHasPiece(piece) {
654                 tpp = PiecePriorityNone
655         }
656         if tpp == PiecePriorityNone {
657                 return cn.stopRequestingPiece(piece)
658         }
659         prio := cn.getPieceInclination()[piece]
660         switch cn.t.requestStrategy {
661         case 1:
662                 switch tpp {
663                 case PiecePriorityNormal:
664                 case PiecePriorityReadahead:
665                         prio -= cn.t.numPieces()
666                 case PiecePriorityNext, PiecePriorityNow:
667                         prio -= 2 * cn.t.numPieces()
668                 default:
669                         panic(tpp)
670                 }
671                 prio += piece / 3
672         default:
673         }
674         return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias()
675 }
676
677 func (cn *connection) getPieceInclination() []int {
678         if cn.pieceInclination == nil {
679                 cn.pieceInclination = cn.t.getConnPieceInclination()
680         }
681         return cn.pieceInclination
682 }
683
684 func (cn *connection) discardPieceInclination() {
685         if cn.pieceInclination == nil {
686                 return
687         }
688         cn.t.putPieceInclination(cn.pieceInclination)
689         cn.pieceInclination = nil
690 }
691
692 func (cn *connection) peerPiecesChanged() {
693         if cn.t.haveInfo() {
694                 prioritiesChanged := false
695                 for i := range iter.N(cn.t.numPieces()) {
696                         if cn.updatePiecePriority(i) {
697                                 prioritiesChanged = true
698                         }
699                 }
700                 if prioritiesChanged {
701                         cn.updateRequests()
702                 }
703         }
704 }
705
706 func (cn *connection) raisePeerMinPieces(newMin int) {
707         if newMin > cn.peerMinPieces {
708                 cn.peerMinPieces = newMin
709         }
710 }
711
712 func (cn *connection) peerSentHave(piece int) error {
713         if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
714                 return errors.New("invalid piece")
715         }
716         if cn.PeerHasPiece(piece) {
717                 return nil
718         }
719         cn.raisePeerMinPieces(piece + 1)
720         cn.peerPieces.Set(piece, true)
721         if cn.updatePiecePriority(piece) {
722                 cn.updateRequests()
723         }
724         return nil
725 }
726
727 func (cn *connection) peerSentBitfield(bf []bool) error {
728         cn.peerSentHaveAll = false
729         if len(bf)%8 != 0 {
730                 panic("expected bitfield length divisible by 8")
731         }
732         // The bitfield is byte-aligned, so at most the last 7 bits of the final
733         // byte are padding: the peer has at least len(bf)-7 pieces.
734         cn.raisePeerMinPieces(len(bf) - 7)
735         if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
736                 // Ignore known excess pieces.
737                 bf = bf[:cn.t.numPieces()]
738         }
739         for i, have := range bf {
740                 if have {
741                         cn.raisePeerMinPieces(i + 1)
742                 }
743                 cn.peerPieces.Set(i, have)
744         }
745         cn.peerPiecesChanged()
746         return nil
747 }
748
749 func (cn *connection) onPeerSentHaveAll() error {
750         cn.peerSentHaveAll = true
751         cn.peerPieces.Clear()
752         cn.peerPiecesChanged()
753         return nil
754 }
755
756 func (cn *connection) peerSentHaveNone() error {
757         cn.peerPieces.Clear()
758         cn.peerSentHaveAll = false
759         cn.peerPiecesChanged()
760         return nil
761 }
762
763 func (c *connection) requestPendingMetadata() {
764         if c.t.haveInfo() {
765                 return
766         }
767         if c.PeerExtensionIDs["ut_metadata"] == 0 {
768                 // Peer doesn't support this.
769                 return
770         }
771         // Request, in random order, the metadata pieces that we don't have.
772         var pending []int
773         for index := 0; index < c.t.metadataPieceCount(); index++ {
774                 if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
775                         pending = append(pending, index)
776                 }
777         }
778         for _, i := range rand.Perm(len(pending)) {
779                 c.requestMetadataPiece(pending[i])
780         }
781 }
782
783 func (cn *connection) wroteMsg(msg *pp.Message) {
784         messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
785         cn.stats.wroteMsg(msg)
786         cn.t.stats.wroteMsg(msg)
787 }
788
789 func (cn *connection) readMsg(msg *pp.Message) {
790         cn.stats.readMsg(msg)
791         cn.t.stats.readMsg(msg)
792 }
793
794 func (cn *connection) wroteBytes(n int64) {
795         cn.stats.wroteBytes(n)
796         if cn.t != nil {
797                 cn.t.stats.wroteBytes(n)
798         }
799 }
800
801 func (cn *connection) readBytes(n int64) {
802         cn.stats.readBytes(n)
803         if cn.t != nil {
804                 cn.t.stats.readBytes(n)
805         }
806 }
807
808 // Returns whether the connection could be useful to us. We're seeding and
809 // they want data, we don't have metainfo and they can provide it, etc.
810 func (c *connection) useful() bool {
811         t := c.t
812         if c.closed.IsSet() {
813                 return false
814         }
815         if !t.haveInfo() {
816                 return c.supportsExtension("ut_metadata")
817         }
818         if t.seeding() && c.PeerInterested {
819                 return true
820         }
821         if c.peerHasWantedPieces() {
822                 return true
823         }
824         return false
825 }
826
827 func (c *connection) lastHelpful() (ret time.Time) {
828         ret = c.lastUsefulChunkReceived
829         if c.t.seeding() && c.lastChunkSent.After(ret) {
830                 ret = c.lastChunkSent
831         }
832         return
833 }
834
835 func (c *connection) fastEnabled() bool {
836         return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
837 }
838
839 // Returns true if we were able to reject the request.
840 func (c *connection) reject(r request) bool {
841         if !c.fastEnabled() {
842                 return false
843         }
844         c.deleteRequest(r)
845         c.Post(r.ToMsg(pp.Reject))
846         return true
847 }
848
849 func (c *connection) onReadRequest(r request) error {
850         requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
851         if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
852                 return errors.New("bad request")
853         }
854         if _, ok := c.PeerRequests[r]; ok {
855                 return nil
856         }
857         if c.Choked {
858                 c.reject(r)
859                 return nil
860         }
861         if len(c.PeerRequests) >= maxRequests {
862                 c.reject(r)
863                 return nil
864         }
865         if !c.t.havePiece(r.Index.Int()) {
866                 // This isn't necessarily them screwing up. We can drop pieces
867                 // from our storage, and can't communicate this to peers
868                 // except by reconnecting.
869                 requestsReceivedForMissingPieces.Add(1)
870                 return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
871         }
872         if c.PeerRequests == nil {
873                 c.PeerRequests = make(map[request]struct{}, maxRequests)
874         }
875         c.PeerRequests[r] = struct{}{}
876         c.tickleWriter()
877         return nil
878 }
879
880 // Processes incoming BitTorrent messages. The client lock is held upon entry
881 // and exit. Returning will end the connection.
882 func (c *connection) mainReadLoop() error {
883         t := c.t
884         cl := t.cl
885
886         decoder := pp.Decoder{
887                 R:         bufio.NewReaderSize(c.r, 1<<17),
888                 MaxLength: 256 * 1024,
889                 Pool:      t.chunkPool,
890         }
891         for {
892                 var (
893                         msg pp.Message
894                         err error
895                 )
896                 func() {
897                         cl.mu.Unlock()
898                         defer cl.mu.Lock()
899                         err = decoder.Decode(&msg)
900                 }()
901                 if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
902                         return nil
903                 }
904                 if err != nil {
905                         return err
906                 }
907                 c.readMsg(&msg)
908                 c.lastMessageReceived = time.Now()
909                 if msg.Keepalive {
910                         receivedKeepalives.Add(1)
911                         continue
912                 }
913                 messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
914                 switch msg.Type {
915                 case pp.Choke:
916                         c.PeerChoked = true
917                         c.deleteAllRequests()
918                         // We can then reset our interest.
919                         c.updateRequests()
920                 case pp.Reject:
921                         if c.deleteRequest(newRequestFromMessage(&msg)) {
922                                 c.updateRequests()
923                         }
924                 case pp.Unchoke:
925                         c.PeerChoked = false
926                         c.tickleWriter()
927                 case pp.Interested:
928                         c.PeerInterested = true
929                         c.tickleWriter()
930                 case pp.NotInterested:
931                         c.PeerInterested = false
932                         c.PeerRequests = nil
933                 case pp.Have:
934                         err = c.peerSentHave(int(msg.Index))
935                 case pp.Request:
936                         r := newRequestFromMessage(&msg)
937                         err = c.onReadRequest(r)
938                 case pp.Cancel:
939                         req := newRequestFromMessage(&msg)
940                         if !c.PeerCancel(req) {
941                                 unexpectedCancels.Add(1)
942                         }
943                 case pp.Bitfield:
944                         err = c.peerSentBitfield(msg.Bitfield)
945                 case pp.HaveAll:
946                         err = c.onPeerSentHaveAll()
947                 case pp.HaveNone:
948                         err = c.peerSentHaveNone()
949                 case pp.Piece:
950                         c.receiveChunk(&msg)
951                         if len(msg.Piece) == int(t.chunkSize) {
952                                 t.chunkPool.Put(&msg.Piece)
953                         }
954                 case pp.Extended:
955                         err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
956                 case pp.Port:
957                         if cl.dHT == nil {
958                                 break
959                         }
960                         pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
961                         if err != nil {
962                                 panic(err)
963                         }
964                         if msg.Port != 0 {
965                                 pingAddr.Port = int(msg.Port)
966                         }
967                         go cl.dHT.Ping(pingAddr, nil)
968                 default:
969                         err = fmt.Errorf("received unknown message type: %#v", msg.Type)
970                 }
971                 if err != nil {
972                         return err
973                 }
974         }
975 }
976
977 func (c *connection) onReadExtendedMsg(id byte, payload []byte) (err error) {
978         defer func() {
979                 // TODO: Should we still do this?
980                 if err != nil {
981                         // These clients use their own extension IDs for outgoing message
982                         // types, which is incorrect.
983                         if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
984                                 err = nil
985                         }
986                 }
987         }()
988         t := c.t
989         cl := t.cl
990         switch id {
991         case pp.HandshakeExtendedID:
992                 // TODO: Create a bencode struct for this.
993                 var d map[string]interface{}
994                 err := bencode.Unmarshal(payload, &d)
995                 if err != nil {
996                         return fmt.Errorf("error decoding extended message payload: %s", err)
997                 }
998                 // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
999                 if reqq, ok := d["reqq"]; ok {
1000                         if i, ok := reqq.(int64); ok {
1001                                 c.PeerMaxRequests = int(i)
1002                         }
1003                 }
1004                 if v, ok := d["v"]; ok {
1005                         c.PeerClientName = v.(string)
1006                 }
1007                 if m, ok := d["m"]; ok {
1008                         mTyped, ok := m.(map[string]interface{})
1009                         if !ok {
1010                                 return errors.New("handshake m value is not dict")
1011                         }
1012                         if c.PeerExtensionIDs == nil {
1013                                 c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
1014                         }
1015                         for name, v := range mTyped {
1016                                 id, ok := v.(int64)
1017                                 if !ok {
1018                                         log.Printf("bad handshake m item extension ID type: %T", v)
1019                                         continue
1020                                 }
1021                                 if id == 0 {
1022                                         delete(c.PeerExtensionIDs, name)
1023                                 } else {
1024                                         if c.PeerExtensionIDs[name] == 0 {
1025                                                 supportedExtensionMessages.Add(name, 1)
1026                                         }
1027                                         c.PeerExtensionIDs[name] = byte(id)
1028                                 }
1029                         }
1030                 }
1031                 metadata_sizeUntyped, ok := d["metadata_size"]
1032                 if ok {
1033                         metadata_size, ok := metadata_sizeUntyped.(int64)
1034                         if !ok {
1035                                 log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
1036                         } else {
1037                                 err = t.setMetadataSize(metadata_size)
1038                                 if err != nil {
1039                                         return fmt.Errorf("error setting metadata size to %d", metadata_size)
1040                                 }
1041                         }
1042                 }
1043                 if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
1044                         c.requestPendingMetadata()
1045                 }
1046                 return nil
1047         case metadataExtendedId:
1048                 err := cl.gotMetadataExtensionMsg(payload, t, c)
1049                 if err != nil {
1050                         return fmt.Errorf("error handling metadata extension message: %s", err)
1051                 }
1052                 return nil
1053         case pexExtendedId:
1054                 if cl.config.DisablePEX {
1055                         return nil
1056                 }
1057                 var pexMsg peerExchangeMessage
1058                 err := bencode.Unmarshal(payload, &pexMsg)
1059                 if err != nil {
1060                         return fmt.Errorf("error unmarshalling PEX message: %s", err)
1061                 }
1062                 go func() {
1063                         cl.mu.Lock()
1064                         t.addPeers(func() (ret []Peer) {
1065                                 for i, cp := range pexMsg.Added {
1066                                         p := Peer{
1067                                                 IP:     make([]byte, 4),
1068                                                 Port:   cp.Port,
1069                                                 Source: peerSourcePEX,
1070                                         }
1071                                         if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
1072                                                 p.SupportsEncryption = true
1073                                         }
1074                                         missinggo.CopyExact(p.IP, cp.IP[:])
1075                                         ret = append(ret, p)
1076                                 }
1077                                 return
1078                         }())
1079                         cl.mu.Unlock()
1080                 }()
1081                 return nil
1082         default:
1083                 return fmt.Errorf("unexpected extended message ID: %v", id)
1084         }
1085 }
1086
1087 // Set both the Reader and Writer for the connection from a single ReadWriter.
1088 func (cn *connection) setRW(rw io.ReadWriter) {
1089         cn.r = rw
1090         cn.w = rw
1091 }
1092
1093 // Returns the Reader and Writer as a combined ReadWriter.
1094 func (cn *connection) rw() io.ReadWriter {
1095         return struct {
1096                 io.Reader
1097                 io.Writer
1098         }{cn.r, cn.w}
1099 }
1100
1101 // Handle a received chunk from a peer.
1102 func (c *connection) receiveChunk(msg *pp.Message) {
1103         t := c.t
1104         cl := t.cl
1105         chunksReceived.Add(1)
1106
1107         req := newRequestFromMessage(msg)
1108
1109         // Request has been satisfied.
1110         if c.deleteRequest(req) {
1111                 c.updateRequests()
1112         } else {
1113                 unexpectedChunksReceived.Add(1)
1114         }
1115
1116         // Do we actually want this chunk?
1117         if !t.wantPiece(req) {
1118                 unwantedChunksReceived.Add(1)
1119                 c.stats.ChunksReadUnwanted++
1120                 c.t.stats.ChunksReadUnwanted++
1121                 return
1122         }
1123
1124         index := int(req.Index)
1125         piece := &t.pieces[index]
1126
1127         c.stats.ChunksReadUseful++
1128         c.t.stats.ChunksReadUseful++
1129         c.stats.BytesReadUsefulData += int64(len(msg.Piece))
1130         c.t.stats.BytesReadUsefulData += int64(len(msg.Piece))
1131         c.lastUsefulChunkReceived = time.Now()
1132         // if t.fastestConn != c {
1133         // log.Printf("setting fastest connection %p", c)
1134         // }
1135         t.fastestConn = c
1136
1137         // Need to record that it hasn't been written yet, before we attempt to do
1138         // anything with it.
1139         piece.incrementPendingWrites()
1140         // Record that we have the chunk, so we aren't trying to download it while
1141         // waiting for it to be written to storage.
1142         piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
1143
1144         // Cancel pending requests for this chunk.
1145         for c := range t.conns {
1146                 c.postCancel(req)
1147         }
1148
1149         err := func() error {
1150                 cl.mu.Unlock()
1151                 defer cl.mu.Lock()
1152                 // Write the chunk out. Note that the upper bound on chunk writing
1153                 // concurrency will be the number of connections. We write inline with
1154                 // receiving the chunk (with this lock dance), because we want to
1155                 // handle errors synchronously and I haven't thought of a nice way to
1156                 // defer any concurrency to the storage and have that notify the
1157                 // client of errors. TODO: Do that instead.
1158                 return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
1159         }()
1160
1161         piece.decrementPendingWrites()
1162
1163         if err != nil {
1164                 log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
1165                 t.pendRequest(req)
1166                 t.updatePieceCompletion(int(msg.Index))
1167                 return
1168         }
1169
1170         // It's important that the piece is potentially queued before we check if
1171         // the piece is still wanted, because if it is queued, it won't be wanted.
1172         if t.pieceAllDirty(index) {
1173                 t.queuePieceCheck(int(req.Index))
1174                 t.pendAllChunkSpecs(index)
1175         }
1176
1177         c.onDirtiedPiece(index)
1178
1179         cl.event.Broadcast()
1180         t.publishPieceChange(int(req.Index))
1181 }
1182
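// Records that this peer sent us data for the given piece, both on the
// connection and in the piece's dirtiers set.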
1183 func (c *connection) onDirtiedPiece(piece int) {
1184         if c.peerTouchedPieces == nil {
1185                 c.peerTouchedPieces = make(map[int]struct{})
1186         }
1187         c.peerTouchedPieces[piece] = struct{}{}
1188         ds := &c.t.pieces[piece].dirtiers
1189         if *ds == nil {
1190                 *ds = make(map[*connection]struct{})
1191         }
1192         (*ds)[c] = struct{}{}
1193 }
1194
1195 func (c *connection) uploadAllowed() bool {
1196         if c.t.cl.config.NoUpload {
1197                 return false
1198         }
1199         if c.t.seeding() {
1200                 return true
1201         }
1202         if !c.peerHasWantedPieces() {
1203                 return false
1204         }
1205         // Don't upload more than 100 KiB more than we download.
1206         if c.stats.BytesWrittenData >= c.stats.BytesReadData+100<<10 {
1207                 return false
1208         }
1209         return true
1210 }
1211
1212 func (c *connection) setRetryUploadTimer(delay time.Duration) {
1213         if c.uploadTimer == nil {
1214                 c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
1215         } else {
1216                 c.uploadTimer.Reset(delay)
1217         }
1218 }
1219
1220 // Also handles choking and unchoking of the remote peer.
1221 func (c *connection) upload(msg func(pp.Message) bool) bool {
1222         // Breaking or completing this loop means we don't want to upload to the
1223         // peer anymore, and we choke them.
1224 another:
1225         for c.uploadAllowed() {
1226                 // We want to upload to the peer.
1227                 if !c.Unchoke(msg) {
1228                         return false
1229                 }
1230                 for r := range c.PeerRequests {
1231                         res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
1232                         if !res.OK() {
1233                                 panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
1234                         }
1235                         delay := res.Delay()
1236                         if delay > 0 {
1237                                 res.Cancel()
1238                                 c.setRetryUploadTimer(delay)
1239                                 // Hard to say what to return here.
1240                                 return true
1241                         }
1242                         more, err := c.sendChunk(r, msg)
1243                         if err != nil {
1244                                 i := int(r.Index)
1245                                 if c.t.pieceComplete(i) {
1246                                         c.t.updatePieceCompletion(i)
1247                                         if !c.t.pieceComplete(i) {
1248                                                 // We had the piece, but not anymore.
1249                                                 break another
1250                                         }
1251                                 }
1252                                 log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
1253                                 // If we failed to send a chunk, choke the peer to ensure they
1254                                 // flush all their requests. We've probably dropped a piece,
1255                                 // but there's no way to communicate this to the peer. If they
1256                                 // ask for it again, we'll kick them to allow us to send them
1257                                 // an updated bitfield.
1258                                 break another
1259                         }
1260                         delete(c.PeerRequests, r)
1261                         if !more {
1262                                 return false
1263                         }
1264                         goto another
1265                 }
1266                 return true
1267         }
1268         return c.Choke(msg)
1269 }
1270
1271 func (cn *connection) Drop() {
1272         cn.t.dropConnection(cn)
1273 }
1274
1275 func (cn *connection) netGoodPiecesDirtied() int64 {
1276         return cn.stats.PiecesDirtiedGood - cn.stats.PiecesDirtiedBad
1277 }
1278
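// Whether the peer has pieces we still want: the per-connection request
// order only holds pieces the peer has and that still have a priority.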
1279 func (c *connection) peerHasWantedPieces() bool {
1280         return !c.pieceRequestOrder.IsEmpty()
1281 }
1282
1283 func (c *connection) numLocalRequests() int {
1284         return len(c.requests)
1285 }
1286
1287 func (c *connection) deleteRequest(r request) bool {
1288         if _, ok := c.requests[r]; !ok {
1289                 return false
1290         }
1291         delete(c.requests, r)
1292         c.t.pendingRequests[r]--
1293         c.updateRequests()
1294         return true
1295 }
1296
1297 func (c *connection) deleteAllRequests() {
1298         for r := range c.requests {
1299                 c.deleteRequest(r)
1300         }
1301         // for c := range c.t.conns {
1302         //      c.tickleWriter()
1303         // }
1304 }
1305
1306 func (c *connection) tickleWriter() {
1307         c.writerCond.Broadcast()
1308 }
1309
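// Cancels the request locally and posts a CANCEL message, but only if the
// request was actually outstanding.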
1310 func (c *connection) postCancel(r request) bool {
1311         if !c.deleteRequest(r) {
1312                 return false
1313         }
1314         c.Post(makeCancelMessage(r))
1315         return true
1316 }
1317
1318 func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
1319         // Count the chunk as sent in the stats, even if it ultimately isn't.
1320         b := make([]byte, r.Length)
1321         p := c.t.info.Piece(int(r.Index))
1322         n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
1323         if n != len(b) {
1324                 if err == nil {
1325                         panic("expected error")
1326                 }
1327                 return
1328         } else if err == io.EOF {
1329                 err = nil
1330         }
1331         more = msg(pp.Message{
1332                 Type:  pp.Piece,
1333                 Index: r.Index,
1334                 Begin: r.Begin,
1335                 Piece: b,
1336         })
1337         uploadChunksPosted.Add(1)
1338         c.lastChunkSent = time.Now()
1339         return
1340 }
1341
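// Associates the connection with a torrent and registers it in the torrent's
// connection set. A connection belongs to at most one torrent.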
1342 func (c *connection) setTorrent(t *Torrent) {
1343         if c.t != nil {
1344                 panic("connection already associated with a torrent")
1345         }
1346         c.t = t
1347         t.conns[c] = struct{}{}
1348 }