package torrent

import (
        "bufio"
        "bytes"
        "errors"
        "fmt"
        "io"
        "log"
        "math/rand"
        "net"
        "strconv"
        "strings"
        "sync"
        "time"

        "github.com/anacrolix/missinggo"
        "github.com/anacrolix/missinggo/bitmap"
        "github.com/anacrolix/missinggo/iter"
        "github.com/anacrolix/missinggo/prioritybitmap"

        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
)

type peerSource string

const (
        peerSourceTracker         = "T" // It's the default.
        peerSourceIncoming        = "I"
        peerSourceDHTGetPeers     = "Hg"
        peerSourceDHTAnnouncePeer = "Ha"
        peerSourcePEX             = "X"
)

// Maintains the state of a connection with a peer.
type connection struct {
        t *Torrent
        // The actual Conn, used for closing, and setting socket options.
        conn net.Conn
        // The Reader and Writer for this Conn, with hooks installed for stats,
        // limiting, deadlines etc.
        w io.Writer
        r io.Reader
        // True if the connection is operating over MSE obfuscation.
        headerEncrypted bool
        cryptoMethod    uint32
        Discovery       peerSource
        uTP             bool
        closed          missinggo.Event

        stats ConnStats

        lastMessageReceived     time.Time
        completedHandshake      time.Time
        lastUsefulChunkReceived time.Time
        lastChunkSent           time.Time

        // Stuff controlled by the local peer.
        Interested       bool
        Choked           bool
        requests         map[request]struct{}
        requestsLowWater int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool
        sentHaves        []bool

        // Stuff controlled by the remote peer.
        PeerID             PeerID
        PeerInterested     bool
        PeerChoked         bool
        PeerRequests       map[request]struct{}
        PeerExtensionBytes peerExtensionBytes
        // The pieces the peer has claimed to have.
        peerPieces bitmap.Bitmap
        // The peer has everything. This can occur due to a special message, when
        // we may not even know the number of pieces in the torrent yet.
        peerSentHaveAll bool
        // The highest possible number of pieces the torrent could have based on
        // communication with the peer. Generally only useful until we have the
        // torrent info.
        peerMinPieces int
        // Pieces we've accepted chunks for from the peer.
        peerTouchedPieces map[int]struct{}

        PeerMaxRequests  int // Maximum pending requests the peer allows.
        PeerExtensionIDs map[string]byte
        PeerClientName   string

        pieceInclination  []int
        pieceRequestOrder prioritybitmap.PriorityBitmap

        postedBuffer bytes.Buffer
        uploadTimer  *time.Timer
        writerCond   sync.Cond
}

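// Returns whether the peer has all pieces, and whether that's even known yet
// (it isn't until we have the torrent info, unless the peer sent HaveAll).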
func (cn *connection) peerHasAllPieces() (all bool, known bool) {
        if cn.peerSentHaveAll {
                return true, true
        }
        if !cn.t.haveInfo() {
                return false, false
        }
        return bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()).IsEmpty(), true
}

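// The lock guarding this connection's state: the owning Client's mutex.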
func (cn *connection) mu() sync.Locker {
        return &cn.t.cl.mu
}

func (cn *connection) remoteAddr() net.Addr {
        return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
        return cn.conn.LocalAddr()
}

func (cn *connection) supportsExtension(ext string) bool {
        _, ok := cn.PeerExtensionIDs[ext]
        return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
        if cn.t.haveInfo() {
                return cn.t.numPieces()
        }
        return cn.peerMinPieces
}

func (cn *connection) completedString() string {
        return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Correct the peerPieces bitmap to the given number of pieces, dropping any
// pieces beyond it. Returns an error if the existing state is invalid, such
// as from a badly sized BITFIELD or invalid HAVE messages; currently it
// always succeeds.
func (cn *connection) setNumPieces(num int) error {
        cn.peerPieces.RemoveRange(num, -1)
        cn.peerPiecesChanged()
        return nil
}

func eventAgeString(t time.Time) string {
        if t.IsZero() {
                return "never"
        }
        return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

func (cn *connection) connectionFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.cryptoMethod == mse.CryptoMethodRC4 {
                c('E')
        } else if cn.headerEncrypted {
                c('e')
        }
        ret += string(cn.Discovery)
        if cn.uTP {
                c('T')
        }
        return
}

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
        if cn.Interested {
                c('i')
        }
        if cn.Choked {
                c('c')
        }
        c('-')
        ret += cn.connectionFlags()
        c('-')
        if cn.PeerInterested {
                c('i')
        }
        if cn.PeerChoked {
                c('c')
        }
        return
}

func (cn *connection) String() string {
        var buf bytes.Buffer
        cn.WriteStatus(&buf, nil)
        return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
        // \t isn't preserved in <pre> blocks?
        fmt.Fprintf(w, "%-40s: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
        fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
                eventAgeString(cn.lastHelpful()))
        fmt.Fprintf(w,
                "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
                cn.completedString(),
                len(cn.peerTouchedPieces),
                cn.stats.ChunksReadUseful,
                cn.stats.ChunksReadUnwanted+cn.stats.ChunksReadUseful,
                cn.stats.ChunksWritten,
                cn.numLocalRequests(),
                len(cn.PeerRequests),
                cn.statusFlags(),
        )
        roi := cn.pieceRequestOrderIter()
        fmt.Fprintf(w, "    next pieces: %v%s\n",
                iter.ToSlice(iter.Head(10, roi)),
                func() string {
                        if cn.shouldRequestWithoutBias() {
                                return " (fastest)"
                        } else {
                                return ""
                        }
                }())
}

func (cn *connection) Close() {
        if !cn.closed.Set() {
                return
        }
        cn.discardPieceInclination()
        cn.pieceRequestOrder.Clear()
        if cn.conn != nil {
                go cn.conn.Close()
        }
}

func (cn *connection) PeerHasPiece(piece int) bool {
        return cn.peerSentHaveAll || cn.peerPieces.Contains(piece)
}

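// Queues a message to be sent to the peer, and wakes the writer routine.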
func (cn *connection) Post(msg pp.Message) {
        messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
        cn.postedBuffer.Write(msg.MustMarshalBinary())
        cn.tickleWriter()
}

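// Requests the given metadata piece via the ut_metadata extension, unless
// the peer doesn't support it or the piece was already requested.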
func (cn *connection) requestMetadataPiece(index int) {
        eID := cn.PeerExtensionIDs["ut_metadata"]
        if eID == 0 {
                return
        }
        if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
                return
        }
        cn.Post(pp.Message{
                Type:       pp.Extended,
                ExtendedID: eID,
                ExtendedPayload: func() []byte {
                        b, err := bencode.Marshal(map[string]int{
                                "msg_type": pp.RequestMetadataExtensionMsgType,
                                "piece":    index,
                        })
                        if err != nil {
                                panic(err)
                        }
                        return b
                }(),
        })
        for index >= len(cn.metadataRequests) {
                cn.metadataRequests = append(cn.metadataRequests, false)
        }
        cn.metadataRequests[index] = true
}

func (cn *connection) requestedMetadataPiece(index int) bool {
        return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests.
func (cn *connection) nominalMaxRequests() (ret int) {
        ret = cn.PeerMaxRequests
        if ret > 64 {
                ret = 64
        }
        return
}

// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
        if cn.PeerRequests == nil {
                return false
        }
        if _, ok := cn.PeerRequests[r]; !ok {
                return false
        }
        delete(cn.PeerRequests, r)
        return true
}

func (cn *connection) Choke(msg func(pp.Message) bool) bool {
        if cn.Choked {
                return true
        }
        cn.PeerRequests = nil
        cn.Choked = true
        return msg(pp.Message{
                Type: pp.Choke,
        })
}

func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
        if !cn.Choked {
                return true
        }
        cn.Choked = false
        return msg(pp.Message{
                Type: pp.Unchoke,
        })
}

func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
        if cn.Interested == interested {
                return true
        }
        cn.Interested = interested
        // log.Printf("%p: setting interest: %v", cn, interested)
        return msg(pp.Message{
                Type: func() pp.MessageType {
                        if interested {
                                return pp.Interested
                        } else {
                                return pp.NotInterested
                        }
                }(),
        })
}

// The function takes a message to be sent, and returns true if more messages
// are okay.
type messageWriter func(pp.Message) bool

// Proxies the messageWriter's response.
func (cn *connection) request(r request, mw messageWriter) bool {
        if cn.requests == nil {
                cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
        }
        if _, ok := cn.requests[r]; ok {
                panic("chunk already requested")
        }
        if !cn.PeerHasPiece(r.Index.Int()) {
                panic("requesting piece peer doesn't have")
        }
        cn.requests[r] = struct{}{}
        if _, ok := cn.t.conns[cn]; !ok {
                panic("requesting but not in active conns")
        }
        cn.t.pendingRequests[r]++
        return mw(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
                Begin:  r.Begin,
                Length: r.Length,
        })
}

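// Fills the write buffer with protocol messages: updates our interest,
// cancels or tops up outstanding requests, then serves the peer's requests
// via upload.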
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
        numFillBuffers.Add(1)
        cancel, new, i := cn.desiredRequestState()
        if !cn.SetInterested(i, msg) {
                return
        }
        if cancel && len(cn.requests) != 0 {
                fillBufferSentCancels.Add(1)
                for r := range cn.requests {
                        cn.deleteRequest(r)
                        // log.Printf("%p: cancelling request: %v", cn, r)
                        if !msg(makeCancelMessage(r)) {
                                return
                        }
                }
        }
        if len(new) != 0 {
                fillBufferSentRequests.Add(1)
                for _, r := range new {
                        if !cn.request(r, msg) {
                                // If we didn't completely top up the requests, we shouldn't
                                // mark the low water, since we'll want to top up the requests
                                // as soon as we have more write buffer space.
                                return
                        }
                }
                cn.requestsLowWater = len(cn.requests) / 2
        }
        cn.upload(msg)
}

// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
        var (
                buf       bytes.Buffer
                lastWrite time.Time = time.Now()
        )
        var keepAliveTimer *time.Timer
        keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
                cn.mu().Lock()
                defer cn.mu().Unlock()
                if time.Since(lastWrite) >= keepAliveTimeout {
                        cn.tickleWriter()
                }
                keepAliveTimer.Reset(keepAliveTimeout)
        })
        cn.mu().Lock()
        defer cn.mu().Unlock()
        defer cn.Close()
        defer keepAliveTimer.Stop()
        for {
                if cn.closed.IsSet() {
                        return
                }
                buf.Write(cn.postedBuffer.Bytes())
                cn.postedBuffer.Reset()
                if buf.Len() == 0 {
                        cn.fillWriteBuffer(func(msg pp.Message) bool {
                                cn.wroteMsg(&msg)
                                buf.Write(msg.MustMarshalBinary())
                                return buf.Len() < 1<<16
                        })
                }
                if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
                        buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
                        postedKeepalives.Add(1)
                }
                if buf.Len() == 0 {
                        // TODO: Minimize wakeups....
                        cn.writerCond.Wait()
                        continue
                }
                cn.mu().Unlock()
                // log.Printf("writing %d bytes", buf.Len())
                n, err := cn.w.Write(buf.Bytes())
                cn.mu().Lock()
                if n != 0 {
                        lastWrite = time.Now()
                        keepAliveTimer.Reset(keepAliveTimeout)
                }
                if err != nil {
                        return
                }
                if n != buf.Len() {
                        panic("short write")
                }
                buf.Reset()
        }
}

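// Posts a HAVE message for the piece, at most once per piece.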
func (cn *connection) Have(piece int) {
        for piece >= len(cn.sentHaves) {
                cn.sentHaves = append(cn.sentHaves, false)
        }
        if cn.sentHaves[piece] {
                return
        }
        cn.Post(pp.Message{
                Type:  pp.Have,
                Index: pp.Integer(piece),
        })
        cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
        if cn.sentHaves != nil {
                panic("bitfield must be first have-related message sent")
        }
        cn.Post(pp.Message{
                Type:     pp.Bitfield,
                Bitfield: haves,
        })
        // Make a copy of haves, as that's read when the message is marshalled
        // without the lock. Also it obviously shouldn't change in the message due
        // to later changes to .sentHaves.
        cn.sentHaves = append([]bool(nil), haves...)
}

// Determines interest and requests to send to a connected peer.
func nextRequestState(
        networkingEnabled bool,
        currentRequests map[request]struct{},
        peerChoking bool,
        iterPendingRequests func(f func(request) bool),
        requestsLowWater int,
        requestsHighWater int,
) (
        cancelExisting bool, // Cancel all our pending requests
        newRequests []request, // Chunks to request that we currently aren't
        interested bool, // Whether we should indicate interest, even if we don't request anything
) {
        if !networkingEnabled {
                return true, nil, false
        }
        if len(currentRequests) > requestsLowWater {
                return false, nil, true
        }
        iterPendingRequests(func(r request) bool {
                interested = true
                if peerChoking {
                        return false
                }
                if _, ok := currentRequests[r]; !ok {
                        if newRequests == nil {
                                newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
                        }
                        newRequests = append(newRequests, r)
                }
                return len(currentRequests)+len(newRequests) < requestsHighWater
        })
        return
}

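// Signals that the set of pieces we want from this peer may have changed, so
// the writer should re-evaluate what to request.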
func (cn *connection) updateRequests() {
        // log.Print("update requests")
        cn.tickleWriter()
}

// Emits the indices in the Bitmaps bms in order, never repeating any index.
// skip is mutated during execution, and its initial values will never be
// emitted.
func iterBitmapsDistinct(skip bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
        return func(cb iter.Callback) {
                for _, bm := range bms {
                        if !iter.All(func(i interface{}) bool {
                                skip.Add(i.(int))
                                return cb(i)
                        }, bitmap.Sub(bm, skip).Iter) {
                                return
                        }
                }
        }
}

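// Iterates pieces in reader-priority order (immediate, then readahead, then
// other pending pieces), skipping pieces the peer doesn't have and pieces we
// already have.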
func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
        now, readahead := cn.t.readerPiecePriorities()
        // Pieces to skip include pieces the peer doesn't have
        skip := bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
        // And pieces that we already have.
        skip.Union(cn.t.completedPieces)
        // Return an iterator over the different priority classes, minus the skip
        // pieces.
        return iter.Chain(
                iterBitmapsDistinct(skip, now, readahead),
                func(cb iter.Callback) {
                        cn.t.pendingPieces.IterTyped(func(piece int) bool {
                                if skip.Contains(piece) {
                                        return true
                                }
                                more := cb(piece)
                                skip.Add(piece)
                                return more
                        })
                },
        )
}

// The connection should download highest priority pieces first, without any
// inclination toward avoiding wastage. Generally we might do this if there's
// a single connection, or this is the fastest connection, and we have active
// readers that signal an ordering preference. It's conceivable that the best
// connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this
// role would cause significant wasted bandwidth.
func (cn *connection) shouldRequestWithoutBias() bool {
        if cn.t.requestStrategy != 2 {
                return false
        }
        if len(cn.t.readers) == 0 {
                return false
        }
        if len(cn.t.conns) == 1 {
                return true
        }
        if cn == cn.t.fastestConn {
                return true
        }
        return false
}

func (cn *connection) pieceRequestOrderIter() iter.Func {
        if cn.shouldRequestWithoutBias() {
                return cn.unbiasedPieceRequestOrder()
        } else {
                return cn.pieceRequestOrder.Iter
        }
}

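// Iterates requests for undirtied chunks of pieces in request order, until f
// returns false.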
func (cn *connection) iterPendingRequests(f func(request) bool) {
        cn.pieceRequestOrderIter()(func(_piece interface{}) bool {
                piece := _piece.(int)
                return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
                        r := request{pp.Integer(piece), cs}
                        // log.Println(r, cn.t.pendingRequests[r], cn.requests)
                        // if _, ok := cn.requests[r]; !ok && cn.t.pendingRequests[r] != 0 {
                        //      return true
                        // }
                        return f(r)
                })
        })
}

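// Computes whether to cancel existing requests, which new requests to make,
// and whether we're interested, from the connection's current state.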
func (cn *connection) desiredRequestState() (bool, []request, bool) {
        return nextRequestState(
                cn.t.networkingEnabled,
                cn.requests,
                cn.PeerChoked,
                cn.iterPendingRequests,
                cn.requestsLowWater,
                cn.nominalMaxRequests(),
        )
}

func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
        chunkIndices := t.pieces[piece].undirtiedChunkIndices().ToSortedSlice()
        // TODO: Use "math/rand".Shuffle >= Go 1.10
        return iter.ForPerm(len(chunkIndices), func(i int) bool {
                return f(t.chunkIndexSpec(chunkIndices[i], piece))
        })
}

// TODO: Check that callers call updateRequests when this returns true.
func (cn *connection) stopRequestingPiece(piece int) bool {
        return cn.pieceRequestOrder.Remove(piece)
}

// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort.
func (cn *connection) updatePiecePriority(piece int) bool {
        tpp := cn.t.piecePriority(piece)
        if !cn.PeerHasPiece(piece) {
                tpp = PiecePriorityNone
        }
        if tpp == PiecePriorityNone {
                return cn.stopRequestingPiece(piece)
        }
        prio := cn.getPieceInclination()[piece]
        switch cn.t.requestStrategy {
        case 1:
                switch tpp {
                case PiecePriorityNormal:
                case PiecePriorityReadahead:
                        prio -= cn.t.numPieces()
                case PiecePriorityNext, PiecePriorityNow:
                        prio -= 2 * cn.t.numPieces()
                default:
                        panic(tpp)
                }
                prio += piece / 3
        default:
        }
        return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias()
}

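// Lazily fetches this connection's piece inclination from the torrent; see
// updatePiecePriority for how it's used.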
func (cn *connection) getPieceInclination() []int {
        if cn.pieceInclination == nil {
                cn.pieceInclination = cn.t.getConnPieceInclination()
        }
        return cn.pieceInclination
}

func (cn *connection) discardPieceInclination() {
        if cn.pieceInclination == nil {
                return
        }
        cn.t.putPieceInclination(cn.pieceInclination)
        cn.pieceInclination = nil
}

func (cn *connection) peerPiecesChanged() {
        if cn.t.haveInfo() {
                prioritiesChanged := false
                for i := range iter.N(cn.t.numPieces()) {
                        if cn.updatePiecePriority(i) {
                                prioritiesChanged = true
                        }
                }
                if prioritiesChanged {
                        cn.updateRequests()
                }
        }
}

func (cn *connection) raisePeerMinPieces(newMin int) {
        if newMin > cn.peerMinPieces {
                cn.peerMinPieces = newMin
        }
}

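// Handles a HAVE message from the peer, updating the piece bitmap and
// request priorities.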
func (cn *connection) peerSentHave(piece int) error {
        if cn.t.haveInfo() && piece >= cn.t.numPieces() || piece < 0 {
                return errors.New("invalid piece")
        }
        if cn.PeerHasPiece(piece) {
                return nil
        }
        cn.raisePeerMinPieces(piece + 1)
        cn.peerPieces.Set(piece, true)
        if cn.updatePiecePriority(piece) {
                cn.updateRequests()
        }
        return nil
}

func (cn *connection) peerSentBitfield(bf []bool) error {
        cn.peerSentHaveAll = false
        if len(bf)%8 != 0 {
                panic("expected bitfield length divisible by 8")
        }
        // We know that the last byte means that at most the last 7 bits are
        // wasted.
        cn.raisePeerMinPieces(len(bf) - 7)
        if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
                // Ignore known excess pieces.
                bf = bf[:cn.t.numPieces()]
        }
        for i, have := range bf {
                if have {
                        cn.raisePeerMinPieces(i + 1)
                }
                cn.peerPieces.Set(i, have)
        }
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) onPeerSentHaveAll() error {
        cn.peerSentHaveAll = true
        cn.peerPieces.Clear()
        cn.peerPiecesChanged()
        return nil
}

func (cn *connection) peerSentHaveNone() error {
        cn.peerPieces.Clear()
        cn.peerSentHaveAll = false
        cn.peerPiecesChanged()
        return nil
}

func (c *connection) requestPendingMetadata() {
        if c.t.haveInfo() {
                return
        }
        if c.PeerExtensionIDs["ut_metadata"] == 0 {
                // Peer doesn't support this.
                return
        }
        // Request metadata pieces that we don't have in a random order.
        var pending []int
        for index := 0; index < c.t.metadataPieceCount(); index++ {
                if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
                        pending = append(pending, index)
                }
        }
        for _, i := range rand.Perm(len(pending)) {
                c.requestMetadataPiece(pending[i])
        }
}

func (cn *connection) wroteMsg(msg *pp.Message) {
        messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
        cn.stats.wroteMsg(msg)
        cn.t.stats.wroteMsg(msg)
}

func (cn *connection) readMsg(msg *pp.Message) {
        cn.stats.readMsg(msg)
        cn.t.stats.readMsg(msg)
}

func (cn *connection) wroteBytes(n int64) {
        cn.stats.wroteBytes(n)
        if cn.t != nil {
                cn.t.stats.wroteBytes(n)
        }
}

func (cn *connection) readBytes(n int64) {
        cn.stats.readBytes(n)
        if cn.t != nil {
                cn.t.stats.readBytes(n)
        }
}

// Returns whether the connection could be useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
func (c *connection) useful() bool {
        t := c.t
        if c.closed.IsSet() {
                return false
        }
        if !t.haveInfo() {
                return c.supportsExtension("ut_metadata")
        }
        if t.seeding() && c.PeerInterested {
                return true
        }
        if c.peerHasWantedPieces() {
                return true
        }
        return false
}

func (c *connection) lastHelpful() (ret time.Time) {
        ret = c.lastUsefulChunkReceived
        if c.t.seeding() && c.lastChunkSent.After(ret) {
                ret = c.lastChunkSent
        }
        return
}

// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit. Returning will end the connection.
func (c *connection) mainReadLoop() error {
        t := c.t
        cl := t.cl

        decoder := pp.Decoder{
                R:         bufio.NewReaderSize(c.r, 1<<17),
                MaxLength: 256 * 1024,
                Pool:      t.chunkPool,
        }
        for {
                var (
                        msg pp.Message
                        err error
                )
                func() {
                        cl.mu.Unlock()
                        defer cl.mu.Lock()
                        err = decoder.Decode(&msg)
                }()
                if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
                        return nil
                }
                if err != nil {
                        return err
                }
                c.readMsg(&msg)
                c.lastMessageReceived = time.Now()
                if msg.Keepalive {
                        receivedKeepalives.Add(1)
                        continue
                }
                messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
                switch msg.Type {
                case pp.Choke:
                        c.PeerChoked = true
                        c.deleteAllRequests()
                        // We can then reset our interest.
                        c.updateRequests()
                case pp.Reject:
                        if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
                                c.updateRequests()
                        }
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.tickleWriter()
                case pp.Interested:
                        c.PeerInterested = true
                        c.tickleWriter()
                case pp.NotInterested:
                        c.PeerInterested = false
                        c.PeerRequests = nil
                case pp.Have:
                        err = c.peerSentHave(int(msg.Index))
                case pp.Request:
                        requestedChunkLengths.Add(strconv.FormatUint(msg.Length.Uint64(), 10), 1)
                        if c.Choked {
                                break
                        }
                        if len(c.PeerRequests) >= maxRequests {
                                // TODO: Should we drop them or Choke them instead?
                                break
                        }
                        if !t.havePiece(msg.Index.Int()) {
                                // This isn't necessarily them screwing up. We can drop pieces
                                // from our storage, and can't communicate this to peers
                                // except by reconnecting.
                                requestsReceivedForMissingPieces.Add(1)
                                err = fmt.Errorf("peer requested piece we don't have: %v", msg.Index.Int())
                                break
                        }
                        if c.PeerRequests == nil {
                                c.PeerRequests = make(map[request]struct{}, maxRequests)
                        }
                        c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
                        c.tickleWriter()
                case pp.Cancel:
                        req := newRequest(msg.Index, msg.Begin, msg.Length)
                        if !c.PeerCancel(req) {
                                unexpectedCancels.Add(1)
                        }
                case pp.Bitfield:
                        err = c.peerSentBitfield(msg.Bitfield)
                case pp.HaveAll:
                        err = c.onPeerSentHaveAll()
                case pp.HaveNone:
                        err = c.peerSentHaveNone()
                case pp.Piece:
                        c.receiveChunk(&msg)
                        if len(msg.Piece) == int(t.chunkSize) {
                                t.chunkPool.Put(&msg.Piece)
                        }
                case pp.Extended:
                        switch msg.ExtendedID {
                        case pp.HandshakeExtendedID:
                                // TODO: Create a bencode struct for this.
                                var d map[string]interface{}
                                err = bencode.Unmarshal(msg.ExtendedPayload, &d)
                                if err != nil {
                                        err = fmt.Errorf("error decoding extended message payload: %s", err)
                                        break
                                }
                                // log.Printf("got handshake from %q: %#v", c.Socket.RemoteAddr().String(), d)
                                if reqq, ok := d["reqq"]; ok {
                                        if i, ok := reqq.(int64); ok {
                                                c.PeerMaxRequests = int(i)
                                        }
                                }
                                if v, ok := d["v"]; ok {
                                        c.PeerClientName = v.(string)
                                }
                                if m, ok := d["m"]; ok {
                                        mTyped, ok := m.(map[string]interface{})
                                        if !ok {
                                                err = errors.New("handshake m value is not dict")
                                                break
                                        }
                                        if c.PeerExtensionIDs == nil {
                                                c.PeerExtensionIDs = make(map[string]byte, len(mTyped))
                                        }
                                        for name, v := range mTyped {
                                                id, ok := v.(int64)
                                                if !ok {
                                                        log.Printf("bad handshake m item extension ID type: %T", v)
                                                        continue
                                                }
                                                if id == 0 {
                                                        delete(c.PeerExtensionIDs, name)
                                                } else {
                                                        if c.PeerExtensionIDs[name] == 0 {
                                                                supportedExtensionMessages.Add(name, 1)
                                                        }
                                                        c.PeerExtensionIDs[name] = byte(id)
                                                }
                                        }
                                }
                                metadata_sizeUntyped, ok := d["metadata_size"]
                                if ok {
                                        metadata_size, ok := metadata_sizeUntyped.(int64)
                                        if !ok {
                                                log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
                                        } else {
                                                err = t.setMetadataSize(metadata_size)
                                                if err != nil {
                                                        err = fmt.Errorf("error setting metadata size to %d", metadata_size)
                                                        break
                                                }
                                        }
                                }
                                if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
                                        c.requestPendingMetadata()
                                }
                        case metadataExtendedId:
                                err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
                                if err != nil {
                                        err = fmt.Errorf("error handling metadata extension message: %s", err)
                                }
                        case pexExtendedId:
                                if cl.config.DisablePEX {
                                        break
                                }
                                var pexMsg peerExchangeMessage
                                err = bencode.Unmarshal(msg.ExtendedPayload, &pexMsg)
                                if err != nil {
                                        err = fmt.Errorf("error unmarshalling PEX message: %s", err)
                                        break
                                }
                                go func() {
                                        cl.mu.Lock()
                                        t.addPeers(func() (ret []Peer) {
                                                for i, cp := range pexMsg.Added {
                                                        p := Peer{
                                                                IP:     make([]byte, 4),
                                                                Port:   cp.Port,
                                                                Source: peerSourcePEX,
                                                        }
                                                        if i < len(pexMsg.AddedFlags) && pexMsg.AddedFlags[i]&0x01 != 0 {
                                                                p.SupportsEncryption = true
                                                        }
                                                        missinggo.CopyExact(p.IP, cp.IP[:])
                                                        ret = append(ret, p)
                                                }
                                                return
                                        }())
                                        cl.mu.Unlock()
                                }()
                        default:
                                err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
                        }
                        if err != nil {
                                // That client uses its own extension IDs for outgoing message
                                // types, which is incorrect.
                                if bytes.HasPrefix(c.PeerID[:], []byte("-SD0100-")) ||
                                        strings.HasPrefix(string(c.PeerID[:]), "-XL0012-") {
                                        return nil
                                }
                        }
                case pp.Port:
                        if cl.dHT == nil {
                                break
                        }
                        pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
                        if err != nil {
                                panic(err)
                        }
                        if msg.Port != 0 {
                                pingAddr.Port = int(msg.Port)
                        }
                        go cl.dHT.Ping(pingAddr, nil)
                default:
                        err = fmt.Errorf("received unknown message type: %#v", msg.Type)
                }
                if err != nil {
                        return err
                }
        }
}

// Set both the Reader and Writer for the connection from a single ReadWriter.
func (cn *connection) setRW(rw io.ReadWriter) {
        cn.r = rw
        cn.w = rw
}

// Returns the Reader and Writer as a combined ReadWriter.
func (cn *connection) rw() io.ReadWriter {
        return struct {
                io.Reader
                io.Writer
        }{cn.r, cn.w}
}

// Handle a received chunk from a peer.
func (c *connection) receiveChunk(msg *pp.Message) {
        t := c.t
        cl := t.cl
        chunksReceived.Add(1)

        req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))

        // Request has been satisfied.
        if c.deleteRequest(req) {
                c.updateRequests()
        } else {
                unexpectedChunksReceived.Add(1)
        }

        // Do we actually want this chunk?
        if !t.wantPiece(req) {
                unwantedChunksReceived.Add(1)
                c.stats.ChunksReadUnwanted++
                return
        }

        index := int(req.Index)
        piece := &t.pieces[index]

        c.stats.ChunksReadUseful++
        c.lastUsefulChunkReceived = time.Now()
        // if t.fastestConn != c {
        // log.Printf("setting fastest connection %p", c)
        // }
        t.fastestConn = c

        // Need to record that it hasn't been written yet, before we attempt to do
        // anything with it.
        piece.incrementPendingWrites()
        // Record that we have the chunk, so we aren't trying to download it while
        // waiting for it to be written to storage.
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))

        // Cancel pending requests for this chunk.
        for c := range t.conns {
                c.postCancel(req)
        }

        err := func() error {
                cl.mu.Unlock()
                defer cl.mu.Lock()
                // Write the chunk out. Note that the upper bound on chunk writing
                // concurrency will be the number of connections. We write inline with
                // receiving the chunk (with this lock dance), because we want to
                // handle errors synchronously and I haven't thought of a nice way to
                // defer any concurrency to the storage and have that notify the
                // client of errors. TODO: Do that instead.
                return t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        }()

        piece.decrementPendingWrites()

        if err != nil {
                log.Printf("%s (%s): error writing chunk %v: %s", t, t.infoHash, req, err)
                t.pendRequest(req)
                t.updatePieceCompletion(int(msg.Index))
                return
        }

        // It's important that the piece is potentially queued before we check if
        // the piece is still wanted, because if it is queued, it won't be wanted.
        if t.pieceAllDirty(index) {
                t.queuePieceCheck(int(req.Index))
                t.pendAllChunkSpecs(index)
        }

        if c.peerTouchedPieces == nil {
                c.peerTouchedPieces = make(map[int]struct{})
        }
        c.peerTouchedPieces[index] = struct{}{}

        cl.event.Broadcast()
        t.publishPieceChange(int(req.Index))
}

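// Whether we may upload to the peer: never with NoUpload, always when
// seeding, otherwise only if they want pieces we have and we haven't sent
// them more than 100 KiB beyond what we've downloaded from them.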
func (c *connection) uploadAllowed() bool {
        if c.t.cl.config.NoUpload {
                return false
        }
        if c.t.seeding() {
                return true
        }
        if !c.peerHasWantedPieces() {
                return false
        }
        // Don't upload more than 100 KiB more than we download.
        if c.stats.DataBytesWritten >= c.stats.DataBytesRead+100<<10 {
                return false
        }
        return true
}

func (c *connection) setRetryUploadTimer(delay time.Duration) {
        if c.uploadTimer == nil {
                c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
        } else {
                c.uploadTimer.Reset(delay)
        }
}

// Also handles choking and unchoking of the remote peer.
func (c *connection) upload(msg func(pp.Message) bool) bool {
        // Breaking or completing this loop means we don't want to upload to the
        // peer anymore, and we choke them.
another:
        for c.uploadAllowed() {
                // We want to upload to the peer.
                if !c.Unchoke(msg) {
                        return false
                }
                for r := range c.PeerRequests {
                        res := c.t.cl.uploadLimit.ReserveN(time.Now(), int(r.Length))
                        if !res.OK() {
                                panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
                        }
                        delay := res.Delay()
                        if delay > 0 {
                                res.Cancel()
                                c.setRetryUploadTimer(delay)
                                // Hard to say what to return here.
                                return true
                        }
                        more, err := c.sendChunk(r, msg)
                        if err != nil {
                                i := int(r.Index)
                                if c.t.pieceComplete(i) {
                                        c.t.updatePieceCompletion(i)
                                        if !c.t.pieceComplete(i) {
                                                // We had the piece, but not anymore.
                                                break another
                                        }
                                }
                                log.Printf("error sending chunk %+v to peer: %s", r, err)
                                // If we failed to send a chunk, choke the peer to ensure they
                                // flush all their requests. We've probably dropped a piece,
                                // but there's no way to communicate this to the peer. If they
                                // ask for it again, we'll kick them to allow us to send them
                                // an updated bitfield.
                                break another
                        }
                        delete(c.PeerRequests, r)
                        if !more {
                                return false
                        }
                        goto another
                }
                return true
        }
        return c.Choke(msg)
}

func (cn *connection) Drop() {
        cn.t.dropConnection(cn)
}

func (cn *connection) netGoodPiecesDirtied() int64 {
        return cn.stats.GoodPiecesDirtied - cn.stats.BadPiecesDirtied
}

func (c *connection) peerHasWantedPieces() bool {
        return !c.pieceRequestOrder.IsEmpty()
}

func (c *connection) numLocalRequests() int {
        return len(c.requests)
}

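// Removes an outstanding local request, updating the torrent-wide pending
// request count. Returns false if the request wasn't outstanding.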
func (c *connection) deleteRequest(r request) bool {
        if _, ok := c.requests[r]; !ok {
                return false
        }
        delete(c.requests, r)
        c.t.pendingRequests[r]--
        return true
}

func (c *connection) deleteAllRequests() {
        for r := range c.requests {
                c.deleteRequest(r)
        }
        // for c := range c.t.conns {
        //      c.tickleWriter()
        // }
}

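// Wakes the writer routine so it re-evaluates what to send.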
func (c *connection) tickleWriter() {
        c.writerCond.Broadcast()
}

func (c *connection) postCancel(r request) bool {
        if !c.deleteRequest(r) {
                return false
        }
        c.Post(makeCancelMessage(r))
        return true
}

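// Reads the requested chunk from storage and sends it to the peer as a PIECE
// message. Returns whether more messages may be written, and any error
// reading the chunk.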
func (c *connection) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
        // Count the chunk being sent, even if it isn't.
        b := make([]byte, r.Length)
        p := c.t.info.Piece(int(r.Index))
        n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
        if n != len(b) {
                if err == nil {
                        panic("expected error")
                }
                return
        } else if err == io.EOF {
                err = nil
        }
        more = msg(pp.Message{
                Type:  pp.Piece,
                Index: r.Index,
                Begin: r.Begin,
                Piece: b,
        })
        uploadChunksPosted.Add(1)
        c.lastChunkSent = time.Now()
        return
}