package torrent

import (
        "bufio"
        "bytes"
        "errors"
        "fmt"
        "io"
        "math/rand"
        "net"
        "strconv"
        "strings"
        "sync"
        "time"

        "github.com/anacrolix/log"

        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/prioritybitmap"

        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
)

type peerSource string

const (
        peerSourceTracker         = "T" // It's the default.
        peerSourceIncoming        = "I"
        peerSourceDHTGetPeers     = "Hg"
        peerSourceDHTAnnouncePeer = "Ha"
        peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
        t *Torrent
        // The actual Conn, used for closing, and setting socket options.
        conn net.Conn
        // The Reader and Writer for this Conn, with hooks installed for stats,
        // limiting, deadlines etc.
        w io.Writer
        r io.Reader
        // True if the connection is operating over MSE obfuscation.
        headerEncrypted bool
        cryptoMethod    uint32
        Discovery       peerSource
        uTP             bool
        closed          missinggo.Event

        stats ConnStats

        lastMessageReceived     time.Time
        completedHandshake      time.Time
        lastUsefulChunkReceived time.Time
        lastChunkSent           time.Time

        // Stuff controlled by the local peer.
        Interested           bool
        lastBecameInterested time.Time
        priorInterest        time.Duration

        Choked           bool
        requests         map[request]struct{}
        requestsLowWater int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
        sentHaves        bitmap.Bitmap

        // Stuff controlled by the remote peer.
        PeerID             PeerID
        PeerInterested     bool
        PeerChoked         bool
        PeerRequests       map[request]struct{}
        PeerExtensionBytes peerExtensionBytes
        // The pieces the peer has claimed to have.
        peerPieces bitmap.Bitmap
        // The peer has everything. This can occur due to a special message, when
        // we may not even know the number of pieces in the torrent yet.
        peerSentHaveAll bool
        // The highest possible number of pieces the torrent could have based on
        // communication with the peer. Generally only useful until we have the
        // torrent info.
        peerMinPieces int
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[int]struct{}
        peerAllowedFast   bitmap.Bitmap

        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[string]byte
        PeerClientName   string

        pieceInclination  []int
        pieceRequestOrder prioritybitmap.PriorityBitmap

        writeBuffer *bytes.Buffer
        uploadTimer *time.Timer
        writerCond  sync.Cond
}
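
// A rough sketch of the lifecycle, pieced together from this file (the
// actual call sites live elsewhere in the Client): after the handshake the
// connection is bound to a torrent and its read and write loops are started,
// roughly:
//
//      cn.setTorrent(t)
//      go cn.writer(keepAliveTimeout)
//      cn.mainReadLoop() // returns on error or close, ending the connection
//      cn.Close()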

func (cn *connection) cumInterest() time.Duration {
        ret := cn.priorInterest
        if cn.Interested {
                ret += time.Since(cn.lastBecameInterested)
        }
        return ret
}
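
// For example, with 40s of accumulated priorInterest and a current interest
// that began 5s ago, cumInterest returns 45s; when not currently interested
// it's just the stored 40s.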

func (cn *connection) peerHasAllPieces() (all bool, known bool) {
        if cn.peerSentHaveAll {
                return true, true
        }
        if !cn.t.haveInfo() {
                return false, false
        }
        return bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()).IsEmpty(), true
}

func (cn *connection) mu() sync.Locker {
        return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
        return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
        return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
        _, ok := cn.PeerExtensionIDs[ext]
        return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
        if cn.t.haveInfo() {
                return cn.t.numPieces()
        }
        return cn.peerMinPieces
}

func (cn *connection) completedString() string {
        have := cn.peerPieces.Len()
        if cn.peerSentHaveAll {
                have = cn.bestPeerNumPieces()
        }
        return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
}

// Truncate the peerPieces bitmap to the correct length, discarding bits for
// pieces beyond num (such as from an oversized BITFIELD or bogus HAVE
// messages), and recompute piece priorities. The error return is currently
// always nil.
func (cn *connection) setNumPieces(num int) error {
        cn.peerPieces.RemoveRange(num, bitmap.ToEnd)
        cn.peerPiecesChanged()
        return nil
}

func eventAgeString(t time.Time) string {
        if t.IsZero() {
                return "never"
        }
        return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.cryptoMethod == mse.CryptoMethodRC4 {
                c('E')
        } else if cn.headerEncrypted {
                c('e')
        }
        ret += string(cn.Discovery)
        if cn.uTP {
                c('T')
        }
        return
}
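
// For example, a header-obfuscated (but not RC4) uTP connection to a peer
// discovered via DHT get_peers yields "eHgT": 'e' for header encryption,
// "Hg" for the discovery source, and 'T' for uTP.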

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.Interested {
                c('i')
        }
        if cn.Choked {
                c('c')
        }
        c('-')
        ret += cn.connectionFlags()
        c('-')
        if cn.PeerInterested {
                c('i')
        }
        if cn.PeerChoked {
                c('c')
        }
        return
}
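
// Reading the sections left to right: our flags, the connection flags, then
// the peer's. For example "i-ET-c" means we're interested and not choking
// the peer, the connection is RC4-encrypted TCP to a tracker-discovered
// peer, and the peer is choking us without being interested.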

// func (cn *connection) String() string {
//      var buf bytes.Buffer
//      cn.WriteStatus(&buf, nil)
//      return buf.String()
// }

func (cn *connection) downloadRate() float64 {
        return float64(cn.stats.BytesReadUsefulData) / cn.cumInterest().Seconds()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
        // \t isn't preserved in <pre> blocks?
        fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
                eventAgeString(cn.lastHelpful()),
                cn.cumInterest(),
        )
        fmt.Fprintf(w,
                "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                cn.stats.ChunksReadUseful,
                cn.stats.ChunksRead,
                cn.stats.ChunksWritten,
                cn.requestsLowWater,
                cn.numLocalRequests(),
                cn.nominalMaxRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
                cn.downloadRate()/(1<<10),
        )
        roi := cn.pieceRequestOrderIter()
        fmt.Fprintf(w, "    next pieces: %v%s\n",
                iter.ToSlice(iter.Head(10, roi)),
                func() string {
                        if cn.shouldRequestWithoutBias() {
                                return " (fastest)"
                        } else {
                                return ""
                        }
                }())
}

func (cn *connection) Close() {
        if !cn.closed.Set() {
                return
        }
        cn.tickleWriter()
        cn.discardPieceInclination()
        cn.pieceRequestOrder.Clear()
        if cn.conn != nil {
                go cn.conn.Close()
        }
}

func (cn *connection) PeerHasPiece(piece int) bool {
        return cn.peerSentHaveAll || cn.peerPieces.Contains(piece)
}

// Writes a message into the write buffer.
func (cn *connection) Post(msg pp.Message) {
        messageTypesPosted.Add(msg.Type.String(), 1)
        // We don't need to track bytes here because a connection.w Writer wrapper
        // takes care of that (although there's some delay between us recording
        // the message, and the connection writer flushing it out).
        cn.writeBuffer.Write(msg.MustMarshalBinary())
        // Last I checked only Piece messages affect stats, and we don't post
        // those.
        cn.wroteMsg(&msg)
        cn.tickleWriter()
}

func (cn *connection) requestMetadataPiece(index int) {
        eID := cn.PeerExtensionIDs["ut_metadata"]
        if eID == 0 {
                return
        }
        if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
                return
        }
        cn.Post(pp.Message{
                Type:       pp.Extended,
                ExtendedID: eID,
                ExtendedPayload: func() []byte {
                        b, err := bencode.Marshal(map[string]int{
                                "msg_type": pp.RequestMetadataExtensionMsgType,
                                "piece":    index,
                        })
                        if err != nil {
                                panic(err)
                        }
                        return b
                }(),
        })
        for index >= len(cn.metadataRequests) {
                cn.metadataRequests = append(cn.metadataRequests, false)
        }
        cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
        return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

func clamp(min, value, max int64) int64 {
        if min > max {
                panic("harumph")
        }
        if value < min {
                value = min
        }
        if value > max {
                value = max
        }
        return value
}
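
// e.g. clamp(1, 250, 900) == 250, clamp(1, 0, 900) == 1,
// clamp(1, 1000, 900) == 900.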

func max(as ...int64) int64 {
        ret := as[0]
        for _, a := range as[1:] {
                if a > ret {
                        ret = a
                }
        }
        return ret
}

// The actual maximum number of outstanding requests to allow: the peer's
// stated limit, clamped into a range derived from how many of the chunks
// they've sent us were actually useful.
func (cn *connection) nominalMaxRequests() (ret int) {
        return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful-(cn.stats.ChunksRead-cn.stats.ChunksReadUseful))))
}
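
// Worked example: with PeerMaxRequests 250, ChunksReadUseful 1000 and
// ChunksRead 1100, the wasted count is 100, the ceiling is
// max(64, 1000-100) = 900, and clamp(1, 250, 900) = 250: the peer's own
// limit wins. A connection whose wasted chunks outweigh its useful ones
// bottoms out at the 64-request ceiling instead.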

func (cn *connection) onPeerSentCancel(r request) {
        if _, ok := cn.PeerRequests[r]; !ok {
                torrent.Add("unexpected cancels received", 1)
                return
        }
        if cn.fastEnabled() {
                cn.reject(r)
        } else {
                delete(cn.PeerRequests, r)
        }
}

func (cn *connection) Choke(msg messageWriter) (more bool) {
        if cn.Choked {
                return true
        }
        cn.Choked = true
        more = msg(pp.Message{
                Type: pp.Choke,
        })
        if cn.fastEnabled() {
                for r := range cn.PeerRequests {
                        // TODO: Don't reject pieces in allowed fast set.
                        cn.reject(r)
                }
        } else {
                cn.PeerRequests = nil
        }
        return
}

func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
        if !cn.Choked {
                return true
        }
        cn.Choked = false
        return msg(pp.Message{
                Type: pp.Unchoke,
        })
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
        if cn.Interested == interested {
                return true
        }
        cn.Interested = interested
        if interested {
                cn.lastBecameInterested = time.Now()
        } else if !cn.lastBecameInterested.IsZero() {
                cn.priorInterest += time.Since(cn.lastBecameInterested)
        }
        // log.Printf("%p: setting interest: %v", cn, interested)
        return msg(pp.Message{
                Type: func() pp.MessageType {
                        if interested {
                                return pp.Interested
                        } else {
                                return pp.NotInterested
                        }
                }(),
        })
}

// A messageWriter takes a message to be sent, and returns true if more
// messages may be written.
type messageWriter func(pp.Message) bool
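
// The canonical messageWriter is the closure in writer below, which
// marshals into the connection's write buffer and reports whether there's
// room for more. A minimal stand-in, assuming some local buf, might be:
//
//      mw := messageWriter(func(msg pp.Message) bool {
//              buf.Write(msg.MustMarshalBinary())
//              return buf.Len() < 1<<16
//      })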

// Proxies the messageWriter's response.
func (cn *connection) request(r request, mw messageWriter) bool {
        if cn.requests == nil {
                cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
        }
        if _, ok := cn.requests[r]; ok {
                panic("chunk already requested")
        }
        if !cn.PeerHasPiece(r.Index.Int()) {
                panic("requesting piece peer doesn't have")
        }
        cn.requests[r] = struct{}{}
        if _, ok := cn.t.conns[cn]; !ok {
                panic("requesting but not in active conns")
        }
        if cn.PeerChoked {
                if cn.peerAllowedFast.Get(int(r.Index)) {
                        torrent.Add("allowed fast requests sent", 1)
                } else {
                        panic("requesting while choked and not allowed fast")
                }
        }
        cn.t.pendingRequests[r]++
        return mw(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
                Begin:  r.Begin,
                Length: r.Length,
        })
}

func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
        numFillBuffers.Add(1)
        cancel, new, i := cn.desiredRequestState()
        if !cn.SetInterested(i, msg) {
                return
        }
        if cancel && len(cn.requests) != 0 {
                fillBufferSentCancels.Add(1)
                for r := range cn.requests {
                        cn.deleteRequest(r)
                        // log.Printf("%p: cancelling request: %v", cn, r)
                        if !msg(makeCancelMessage(r)) {
                                return
                        }
                }
        }
        if len(new) != 0 {
                fillBufferSentRequests.Add(1)
                for _, r := range new {
                        if !cn.request(r, msg) {
                                // If we didn't completely top up the requests, we shouldn't
                                // mark the low water, since we'll want to top up the requests
                                // as soon as we have more write buffer space.
                                return
                        }
                }
                cn.requestsLowWater = len(cn.requests) / 2
        }
        cn.upload(msg)
}

// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
        var (
                lastWrite      time.Time = time.Now()
                keepAliveTimer *time.Timer
        )
        keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
                cn.mu().Lock()
                defer cn.mu().Unlock()
                if time.Since(lastWrite) >= keepAliveTimeout {
                        cn.tickleWriter()
                }
                keepAliveTimer.Reset(keepAliveTimeout)
        })
        cn.mu().Lock()
        defer cn.mu().Unlock()
        defer cn.Close()
        defer keepAliveTimer.Stop()
        frontBuf := new(bytes.Buffer)
        for {
                if cn.closed.IsSet() {
                        return
                }
                if cn.writeBuffer.Len() == 0 {
                        cn.fillWriteBuffer(func(msg pp.Message) bool {
                                cn.wroteMsg(&msg)
                                cn.writeBuffer.Write(msg.MustMarshalBinary())
                                return cn.writeBuffer.Len() < 1<<16
                        })
                }
                if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
                        cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
                        postedKeepalives.Add(1)
                }
                if cn.writeBuffer.Len() == 0 {
                        // TODO: Minimize wakeups....
                        cn.writerCond.Wait()
                        continue
                }
                // Flip the buffers.
                frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
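                // The client lock is released while the front buffer drains
                // to the peer, so Post and fillWriteBuffer can keep appending
                // to the (now empty) back buffer concurrently: simple
                // double-buffering.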
                cn.mu().Unlock()
                n, err := cn.w.Write(frontBuf.Bytes())
                cn.mu().Lock()
                if n != 0 {
                        lastWrite = time.Now()
                        keepAliveTimer.Reset(keepAliveTimeout)
                }
                if err != nil {
                        return
                }
                if n != frontBuf.Len() {
                        panic("short write")
                }
                frontBuf.Reset()
        }
}

func (cn *connection) Have(piece int) {
        if cn.sentHaves.Get(piece) {
                return
        }
        cn.Post(pp.Message{
                Type:  pp.Have,
                Index: pp.Integer(piece),
        })
        cn.sentHaves.Add(piece)
}

func (cn *connection) PostBitfield() {
        if cn.sentHaves.Len() != 0 {
                panic("bitfield must be first have-related message sent")
        }
        if !cn.t.haveAnyPieces() {
                return
        }
        cn.Post(pp.Message{
                Type:     pp.Bitfield,
                Bitfield: cn.t.bitfield(),
        })
        cn.sentHaves = cn.t.completedPieces.Copy()
}

// Determines interest and requests to send to a connected peer.
func nextRequestState(
        networkingEnabled bool,
        currentRequests map[request]struct{},
        peerChoking bool,
        iterPendingRequests func(f func(request) bool),
        requestsLowWater int,
        requestsHighWater int,
        allowedFast bitmap.Bitmap,
) (
        cancelExisting bool, // Cancel all our pending requests
        newRequests []request, // Chunks to request that we currently aren't
        interested bool, // Whether we should indicate interest, even if we don't request anything
) {
        if !networkingEnabled {
                return true, nil, false
        }
        if len(currentRequests) > requestsLowWater {
                return false, nil, true
        }
        iterPendingRequests(func(r request) bool {
                interested = true
                if peerChoking {
                        if allowedFast.IsEmpty() {
                                return false
                        }
                        if !allowedFast.Get(int(r.Index)) {
                                return true
                        }
                }
                if len(currentRequests)+len(newRequests) >= requestsHighWater {
                        return false
                }
                if _, ok := currentRequests[r]; !ok {
                        if newRequests == nil {
                                newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
                        }
                        newRequests = append(newRequests, r)
                }
                return true
        })
        return
}
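
// Some concrete cases of the above: with networking disabled, all requests
// are cancelled and interest is dropped. If the peer is choking us and the
// allowed-fast set is empty, iteration stops at the first pending request:
// we declare interest but request nothing. If the peer is choking us but
// some pieces are allowed fast, only requests in those pieces are collected.
// Otherwise new requests accumulate until the high water mark
// (nominalMaxRequests) is reached.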

func (cn *connection) updateRequests() {
        // log.Print("update requests")
        cn.tickleWriter()
}

// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
        return func(cb iter.Callback) {
                for _, bm := range bms {
                        if !iter.All(func(i interface{}) bool {
                                skip.Add(i.(int))
                                return cb(i)
                        }, bitmap.Sub(bm, *skip).Iter) {
                                return
                        }
                }
        }
}
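
// For example, with skip initially {4} and bms {1, 2} and {2, 3, 4}: the
// first bitmap emits 1 then 2 (adding each to skip), and the second emits
// only 3, since 2 was just added to skip and 4 was there from the start.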

func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
        now, readahead := cn.t.readerPiecePriorities()
        var skip bitmap.Bitmap
        if !cn.peerSentHaveAll {
                // Pieces to skip include pieces the peer doesn't have
                skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
        }
        // And pieces that we already have.
        skip.Union(cn.t.completedPieces)
        // Return an iterator over the different priority classes, minus the skip
        // pieces.
        return iter.Chain(
                iterBitmapsDistinct(&skip, now, readahead),
                func(cb iter.Callback) {
                        cn.t.pendingPieces.IterTyped(func(piece int) bool {
                                if skip.Contains(piece) {
                                        return true
                                }
                                more := cb(piece)
                                skip.Add(piece)
                                return more
                        })
                },
        )
}

// The connection should download highest priority pieces first, without any
// inclination toward avoiding wastage. Generally we might do this if there's
// a single connection, or this is the fastest connection, and we have active
// readers that signal an ordering preference. It's conceivable that the best
// connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this
// role would cause significant wasted bandwidth.
func (cn *connection) shouldRequestWithoutBias() bool {
        if cn.t.requestStrategy != 2 {
                return false
        }
        if len(cn.t.readers) == 0 {
                return false
        }
        if len(cn.t.conns) == 1 {
                return true
        }
        if cn == cn.t.fastestConn {
                return true
        }
        return false
}

func (cn *connection) pieceRequestOrderIter() iter.Func {
        if cn.shouldRequestWithoutBias() {
                return cn.unbiasedPieceRequestOrder()
        } else {
                return cn.pieceRequestOrder.Iter
        }
}

func (cn *connection) iterPendingRequests(f func(request) bool) {
        cn.pieceRequestOrderIter()(func(_piece interface{}) bool {
                piece := _piece.(int)
                return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
                        r := request{pp.Integer(piece), cs}
                        // log.Println(r, cn.t.pendingRequests[r], cn.requests)
                        // if _, ok := cn.requests[r]; !ok && cn.t.pendingRequests[r] != 0 {
                        //      return true
                        // }
                        return f(r)
                })
        })
}

func (cn *connection) desiredRequestState() (bool, []request, bool) {
        return nextRequestState(
                cn.t.networkingEnabled,
                cn.requests,
                cn.PeerChoked,
                cn.iterPendingRequests,
                cn.requestsLowWater,
                cn.nominalMaxRequests(),
                cn.peerAllowedFast,
        )
}

func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
        chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
        // TODO: Use "math/rand".Shuffle >= Go 1.10
        return iter.ForPerm(len(chunkIndices), func(i int) bool {
                return f(t.chunkIndexSpec(chunkIndices[i], piece))
        })
}

// Remove the piece from this connection's request order. Callers should
// trigger updateRequests if this returns true.
func (cn *connection) stopRequestingPiece(piece int) bool {
        return cn.pieceRequestOrder.Remove(piece)
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort.
func (cn *connection) updatePiecePriority(piece int) bool {
        tpp := cn.t.piecePriority(piece)
        if !cn.PeerHasPiece(piece) {
                tpp = PiecePriorityNone
        }
        if tpp == PiecePriorityNone {
                return cn.stopRequestingPiece(piece)
        }
        prio := cn.getPieceInclination()[piece]
        switch cn.t.requestStrategy {
        case 1:
                switch tpp {
                case PiecePriorityNormal:
                case PiecePriorityReadahead:
                        prio -= cn.t.numPieces()
                case PiecePriorityNext, PiecePriorityNow:
                        prio -= 2 * cn.t.numPieces()
                default:
                        panic(tpp)
                }
                prio += piece / 3
        default:
        }
        return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias()
}
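
// Worked example for request strategy 1: in a 300-piece torrent, a readahead
// piece 42 with inclination 100 gets priority 100 - 300 + 42/3 = -186,
// ordering it well before plain Normal-priority pieces (lower values are
// requested first), while the piece/3 term staggers connections so they
// don't all start on the same piece.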

func (cn *connection) getPieceInclination() []int {
        if cn.pieceInclination == nil {
                cn.pieceInclination = cn.t.getConnPieceInclination()
        }
        return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
        if cn.pieceInclination == nil {
                return
        }
        cn.t.putPieceInclination(cn.pieceInclination)
        cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
        if cn.t.haveInfo() {
                prioritiesChanged := false
                for i := range iter.N(cn.t.numPieces()) {
                        if cn.updatePiecePriority(i) {
                                prioritiesChanged = true
                        }
                }
                if prioritiesChanged {
                        cn.updateRequests()
                }
        }
}

func (cn *connection) raisePeerMinPieces(newMin int) {
        if newMin > cn.peerMinPieces {
                cn.peerMinPieces = newMin
        }
}

func (cn *connection) peerSentHave(piece int) error {
        if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
                return errors.New("invalid piece")
        }
        if cn.PeerHasPiece(piece) {
                return nil
        }
        cn.raisePeerMinPieces(piece + 1)
        cn.peerPieces.Set(piece, true)
        if cn.updatePiecePriority(piece) {
                cn.updateRequests()
        }
        return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
        cn.peerSentHaveAll = false
        if len(bf)%8 != 0 {
                panic("expected bitfield length divisible by 8")
        }
        // We know that the last byte means that at most the last 7 bits are
        // wasted.
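        // For example, a 16-bit bitfield implies the torrent has at least
        // 16 - 7 = 9 pieces.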
        cn.raisePeerMinPieces(len(bf) - 7)
        if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
                // Ignore known excess pieces.
                bf = bf[:cn.t.numPieces()]
        }
        for i, have := range bf {
                if have {
                        cn.raisePeerMinPieces(i + 1)
                }
                cn.peerPieces.Set(i, have)
        }
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) onPeerSentHaveAll() error {
        cn.peerSentHaveAll = true
        cn.peerPieces.Clear()
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) peerSentHaveNone() error {
        cn.peerPieces.Clear()
        cn.peerSentHaveAll = false
        cn.peerPiecesChanged()
        return nil
}

func (c *connection) requestPendingMetadata() {
        if c.t.haveInfo() {
                return
        }
        if c.PeerExtensionIDs["ut_metadata"] == 0 {
                // Peer doesn't support this.
                return
        }
        // Request metadata pieces that we don't have in a random order.
        var pending []int
        for index := 0; index < c.t.metadataPieceCount(); index++ {
                if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
                        pending = append(pending, index)
                }
        }
        for _, i := range rand.Perm(len(pending)) {
                c.requestMetadataPiece(pending[i])
        }
}

func (cn *connection) wroteMsg(msg *pp.Message) {
        messageTypesSent.Add(msg.Type.String(), 1)
        cn.stats.wroteMsg(msg)
        cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
        cn.stats.readMsg(msg)
        cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
        cn.stats.wroteBytes(n)
        if cn.t != nil {
                cn.t.stats.wroteBytes(n)
        }
}

func (cn *connection) readBytes(n int64) {
        cn.stats.readBytes(n)
        if cn.t != nil {
                cn.t.stats.readBytes(n)
        }
}

// Returns whether the connection could be useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
        t := c.t
        if c.closed.IsSet() {
                return false
        }
        if !t.haveInfo() {
                return c.supportsExtension("ut_metadata")
        }
        if t.seeding() && c.PeerInterested {
                return true
        }
        if c.peerHasWantedPieces() {
                return true
        }
        return false
}

func (c *connection) lastHelpful() (ret time.Time) {
        ret = c.lastUsefulChunkReceived
        if c.t.seeding() && c.lastChunkSent.After(ret) {
                ret = c.lastChunkSent
        }
        return
}

func (c *connection) fastEnabled() bool {
        return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
}

func (c *connection) reject(r request) {
        if !c.fastEnabled() {
                panic("fast not enabled")
        }
        c.Post(r.ToMsg(pp.Reject))
        delete(c.PeerRequests, r)
}

func (c *connection) onReadRequest(r request) error {
        requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
        if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
                torrent.Add("bad requests received", 1)
                return errors.New("bad request")
        }
        if _, ok := c.PeerRequests[r]; ok {
                torrent.Add("duplicate requests received", 1)
                return nil
        }
        if c.Choked {
                torrent.Add("requests received while choking", 1)
                if c.fastEnabled() {
                        torrent.Add("requests rejected while choking", 1)
                        c.reject(r)
                }
                return nil
        }
        if len(c.PeerRequests) >= maxRequests {
                torrent.Add("requests received while queue full", 1)
                if c.fastEnabled() {
                        c.reject(r)
                }
                // BEP 6 says we may close here if we choose.
                return nil
        }
        if !c.t.havePiece(r.Index.Int()) {
                // This isn't necessarily them screwing up. We can drop pieces
                // from our storage, and can't communicate this to peers
                // except by reconnecting.
                requestsReceivedForMissingPieces.Add(1)
                return fmt.Errorf("peer requested piece we don't have: %v", r.Index.Int())
        }
        if c.PeerRequests == nil {
                c.PeerRequests = make(map[request]struct{}, maxRequests)
        }
        c.PeerRequests[r] = struct{}{}
        c.tickleWriter()
        return nil
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
        t := c.t
        cl := t.cl

        decoder := pp.Decoder{
                R:         bufio.NewReaderSize(c.r, 1<<17),
                MaxLength: 256 * 1024,
                Pool:      t.chunkPool,
        }
        for {
                var (
                        msg pp.Message
                        err error
                )
                func() {
                        cl.mu.Unlock()
                        defer cl.mu.Lock()
                        err = decoder.Decode(&msg)
                }()
                if t.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
                        return nil
                }
                if err != nil {
                        return err
                }
                c.readMsg(&msg)
                c.lastMessageReceived = time.Now()
                if msg.Keepalive {
                        receivedKeepalives.Add(1)
                        continue
                }
                messageTypesReceived.Add(msg.Type.String(), 1)
                if msg.Type.FastExtension() && !c.fastEnabled() {
                        return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
                }
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
                        c.deleteAllRequests()
                        // We can then reset our interest.
                        c.updateRequests()
                case pp.Reject:
                        c.deleteRequest(newRequestFromMessage(&msg))
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.tickleWriter()
                case pp.Interested:
                        c.PeerInterested = true
                        c.tickleWriter()
                case pp.NotInterested:
                        c.PeerInterested = false
                        // TODO: Reject?
                        c.PeerRequests = nil
                case pp.Have:
                        err = c.peerSentHave(int(msg.Index))
                case pp.Request:
                        r := newRequestFromMessage(&msg)
                        err = c.onReadRequest(r)
                case pp.Cancel:
                        req := newRequestFromMessage(&msg)
                        c.onPeerSentCancel(req)
                case pp.Bitfield:
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.HaveAll:
                        err = c.onPeerSentHaveAll()
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Piece:
                        c.receiveChunk(&msg)
                        if len(msg.Piece) == int(t.chunkSize) {
                                t.chunkPool.Put(&msg.Piece)
                        }
                case pp.Extended:
                        err = c.onReadExtendedMsg(msg.ExtendedID, msg.ExtendedPayload)
                case pp.Port:
                        if cl.dHT == nil {
                                break
                        }
                        pingAddr, err := net.ResolveUDPAddr("udp", c.remoteAddr().String())
                        if err != nil {
                                panic(err)
                        }
                        if msg.Port != 0 {
                                pingAddr.Port = int(msg.Port)
                        }
                        go cl.dHT.Ping(pingAddr, nil)
                case pp.AllowedFast:
                        torrent.Add("allowed fasts received", 1)
                        log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
                        c.peerAllowedFast.Add(int(msg.Index))
                        c.updateRequests()
                case pp.Suggest:
                        torrent.Add("suggests received", 1)
                        log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
                        c.updateRequests()
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
                }
                if err != nil {
                        return err
                }
        }
}

func (c *connection) onReadExtendedMsg(id byte, payload []byte) (err error) {
        defer func() {
                // TODO: Should we still do this?
                if err != nil {
                        // These clients use their own extension IDs for outgoing message
                        // types, which is incorrect.
                        if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) || strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
                                err = nil
                        }
                }
        }()
        t := c.t
        cl := t.cl
        switch id {
        case pp.HandshakeExtendedID:
                // TODO: Create a bencode struct for this.
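                // A possible shape for it (a sketch only, not used below),
                // relying on the struct tags this package's bencode library
                // supports:
                //
                //      type extendedHandshakeMsg struct {
                //              M            map[string]int64 `bencode:"m"`
                //              V            string           `bencode:"v"`
                //              Reqq         int64            `bencode:"reqq"`
                //              MetadataSize int64            `bencode:"metadata_size"`
                //      }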
                var d map[string]interface{}
                err := bencode.Unmarshal(payload, &d)
                if err != nil {
                        return fmt.Errorf("error decoding extended message payload: %s", err)
                }
                // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
                if reqq, ok := d["reqq"]; ok {
                        if i, ok := reqq.(int64); ok {
                                c.PeerMaxRequests = int(i)
                        }
                }
                if v, ok := d["v"]; ok {
                        // Check the type assertion: a malformed handshake
                        // shouldn't be able to panic us.
                        if vs, ok := v.(string); ok {
                                c.PeerClientName = vs
                        }
                }
                if m, ok := d["m"]; ok {
                        mTyped, ok := m.(map[string]interface{})
                        if !ok {
                                return errors.New("handshake m value is not dict")
                        }
                        if c.PeerExtensionIDs == nil {
                                c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
                        }
                        for name, v := range mTyped {
                                id, ok := v.(int64)
                                if !ok {
                                        log.Printf("bad handshake m item extension ID type: %T", v)
                                        continue
                                }
                                if id == 0 {
                                        delete(c.PeerExtensionIDs, name)
                                } else {
                                        if c.PeerExtensionIDs[name] == 0 {
                                                supportedExtensionMessages.Add(name, 1)
                                        }
                                        c.PeerExtensionIDs[name] = byte(id)
                                }
                        }
                }
                metadata_sizeUntyped, ok := d["metadata_size"]
                if ok {
                        metadata_size, ok := metadata_sizeUntyped.(int64)
                        if !ok {
                                log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
                        } else {
                                err = t.setMetadataSize(metadata_size)
                                if err != nil {
                                        return fmt.Errorf("error setting metadata size to %d: %s", metadata_size, err)
                                }
                        }
                }
                if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
                        c.requestPendingMetadata()
                }
                return nil
        case metadataExtendedId:
                err := cl.gotMetadataExtensionMsg(payload, t, c)
                if err != nil {
                        return fmt.Errorf("error handling metadata extension message: %s", err)
                }
                return nil
        case pexExtendedId:
                if cl.config.DisablePEX {
                        // TODO: Maybe close the connection. Check that we're not
                        // advertising that we support PEX if it's disabled.
                        return nil
                }
                var pexMsg peerExchangeMessage
                err := bencode.Unmarshal(payload, &pexMsg)
                if err != nil {
                        return fmt.Errorf("error unmarshalling PEX message: %s", err)
                }
                go func() {
                        ps := pexMsg.AddedPeers()
                        cl.mu.Lock()
                        t.addPeers(ps)
                        cl.mu.Unlock()
                }()
                return nil
        default:
                return fmt.Errorf("unexpected extended message ID: %v", id)
        }
}

// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
        cn.r = rw
        cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
        return struct {
                io.Reader
                io.Writer
        }{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
        t := c.t
        cl := t.cl
        chunksReceived.Add(1)

        req := newRequestFromMessage(msg)

        // Request has been satisfied.
        if c.deleteRequest(req) {
                c.updateRequests()
        } else {
                unexpectedChunksReceived.Add(1)
        }

        if c.PeerChoked {
                torrent.Add("chunks received while choked", 1)
                if c.peerAllowedFast.Get(int(req.Index)) {
                        torrent.Add("chunks received due to allowed fast", 1)
                }
        }

        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)
                c.stats.ChunksReadUnwanted++
                c.t.stats.ChunksReadUnwanted++
                return
        }

        index := int(req.Index)
        piece := &t.pieces[index]

        c.stats.ChunksReadUseful++
        c.t.stats.ChunksReadUseful++
        c.stats.BytesReadUsefulData += int64(len(msg.Piece))
        c.t.stats.BytesReadUsefulData += int64(len(msg.Piece))
        c.lastUsefulChunkReceived = time.Now()
        // if t.fastestConn != c {
        // log.Printf("setting fastest connection %p", c)
        // }
        t.fastestConn = c

        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
        piece.incrementPendingWrites()
        // Record that we have the chunk, so we aren't trying to download it while
        // waiting for it to be written to storage.
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

        // Cancel pending requests for this chunk.
        for c := range t.conns {
                c.postCancel(req)
        }

        err := func() error {
                cl.mu.Unlock()
                defer cl.mu.Lock()
                // Write the chunk out. Note that the upper bound on chunk writing
                // concurrency will be the number of connections. We write inline with
                // receiving the chunk (with this lock dance), because we want to
                // handle errors synchronously and I haven't thought of a nice way to
                // defer any concurrency to the storage and have that notify the
                // client of errors. TODO: Do that instead.
                return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        }()

        piece.decrementPendingWrites()

        if err != nil {
                log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
                t.pendRequest(req)
                t.updatePieceCompletion(int(msg.Index))
                return
        }

        // It's important that the piece is potentially queued before we check if
        // the piece is still wanted, because if it is queued, it won't be wanted.
        if t.pieceAllDirty(index) {
                t.queuePieceCheck(int(req.Index))
                t.pendAllChunkSpecs(index)
        }

        c.onDirtiedPiece(index)

        cl.event.Broadcast()
        t.publishPieceChange(int(req.Index))
}

func (c *connection) onDirtiedPiece(piece int) {
        if c.peerTouchedPieces == nil {
                c.peerTouchedPieces = make(map[int]struct{})
        }
        c.peerTouchedPieces[piece] = struct{}{}
        ds := &c.t.pieces[piece].dirtiers
        if *ds == nil {
                *ds = make(map[*connection]struct{})
        }
        (*ds)[c] = struct{}{}
}

func (c *connection) uploadAllowed() bool {
        if c.t.cl.config.NoUpload {
                return false
        }
        if c.t.seeding() {
                return true
        }
        if !c.peerHasWantedPieces() {
                return false
        }
        // Don't upload more than 100 KiB more than we download.
        if c.stats.BytesWrittenData >= c.stats.BytesReadData+100<<10 {
                return false
        }
        return true
}

func (c *connection) setRetryUploadTimer(delay time.Duration) {
        if c.uploadTimer == nil {
                c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
        } else {
                c.uploadTimer.Reset(delay)
        }
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
        // Breaking or completing this loop means we don't want to upload to the
        // peer anymore, and we choke them.
another:
        for c.uploadAllowed() {
                // We want to upload to the peer.
                if !c.Unchoke(msg) {
                        return false
                }
                for r := range c.PeerRequests {
                        res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
                        if !res.OK() {
                                panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
                        }
                        delay := res.Delay()
                        if delay > 0 {
                                res.Cancel()
                                c.setRetryUploadTimer(delay)
                                // Hard to say what to return here.
                                return true
                        }
                        more, err := c.sendChunk(r, msg)
                        if err != nil {
                                i := int(r.Index)
                                if c.t.pieceComplete(i) {
                                        c.t.updatePieceCompletion(i)
                                        if !c.t.pieceComplete(i) {
                                                // We had the piece, but not anymore.
                                                break another
                                        }
                                }
                                log.Str("error sending chunk to peer").AddValues(c, r, err).Log(c.t.logger)
                                // If we failed to send a chunk, choke the peer to ensure they
                                // flush all their requests. We've probably dropped a piece,
                                // but there's no way to communicate this to the peer. If they
                                // ask for it again, we'll kick them to allow us to send them
                                // an updated bitfield.
                                break another
                        }
                        delete(c.PeerRequests, r)
                        if !more {
                                return false
                        }
                        goto another
                }
                return true
        }
        return c.Choke(msg)
}

func (cn *connection) Drop() {
        cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int64 {
        return cn.stats.PiecesDirtiedGood - cn.stats.PiecesDirtiedBad
}

func (c *connection) peerHasWantedPieces() bool {
        return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
        return len(c.requests)
}

func (c *connection) deleteRequest(r request) bool {
        if _, ok := c.requests[r]; !ok {
                return false
        }
        delete(c.requests, r)
        c.t.pendingRequests[r]--
        c.updateRequests()
        return true
}

func (c *connection) deleteAllRequests() {
        for r := range c.requests {
                c.deleteRequest(r)
        }
        // for c := range c.t.conns {
        //      c.tickleWriter()
        // }
}

func (c *connection) tickleWriter() {
        c.writerCond.Broadcast()
}

func (c *connection) postCancel(r request) bool {
        if !c.deleteRequest(r) {
                return false
        }
        c.Post(makeCancelMessage(r))
        return true
}

func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
        // Count the chunk being sent, even if it isn't.
        b := make([]byte, r.Length)
        p := c.t.info.Piece(int(r.Index))
        n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
        if n != len(b) {
                if err == nil {
                        panic("expected error")
                }
                return
        } else if err == io.EOF {
                err = nil
        }
        more = msg(pp.Message{
                Type:  pp.Piece,
                Index: r.Index,
                Begin: r.Begin,
                Piece: b,
        })
        c.lastChunkSent = time.Now()
        return
}

func (c *connection) setTorrent(t *Torrent) {
        if c.t != nil {
                panic("connection already associated with a torrent")
        }
        c.t = t
        t.conns[c] = struct{}{}
}