Sergey Matveev's repositories - btrtrc.git/blob - connection.go
New slices package
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "container/list"
7         "errors"
8         "expvar"
9         "fmt"
10         "io"
11         "math/rand"
12         "net"
13         "strconv"
14         "sync"
15         "time"
16
17         "github.com/anacrolix/missinggo"
18         "github.com/anacrolix/missinggo/bitmap"
19         "github.com/anacrolix/missinggo/prioritybitmap"
20         "github.com/anacrolix/missinggo/slices"
21         "github.com/bradfitz/iter"
22
23         "github.com/anacrolix/torrent/bencode"
24         pp "github.com/anacrolix/torrent/peer_protocol"
25 )
26
// optimizedCancels counts Cancel messages that never had to be sent because
// the matching Request was still sitting in the outgoing queue and could
// simply be removed (see connection.Post).
var optimizedCancels = expvar.NewInt("optimizedCancels")

// peerSource records how a peer was discovered. Non-zero values double as the
// flag character shown by connection.connectionFlags.
type peerSource byte

// Known peer sources. Note that these are untyped character constants rather
// than typed peerSource values.
const (
	peerSourceTracker  = '\x00' // It's the default.
	peerSourceIncoming = 'I'
	peerSourceDHT      = 'H'
	peerSourcePEX      = 'X'
)
37
// Maintains the state of a connection with a peer.
type connection struct {
	// The torrent this connection is attached to. newConnection does not set
	// it; most methods dereference it, so it must be assigned before use.
	t *Torrent
	// The underlying network connection; source of the local and remote
	// addresses, and what Close shuts down.
	conn net.Conn
	// The real slim shady: the stream the writer goroutine actually writes
	// to. newConnection initialises it to conn.
	rw        io.ReadWriter
	encrypted bool            // Shown as 'E' by connectionFlags.
	Discovery peerSource      // Shown by connectionFlags when non-zero.
	uTP       bool            // Shown as 'T' by connectionFlags.
	closed    missinggo.Event // Set by Close; wakes the writer goroutine.

	// Updated by wroteMsg and wroteBytes for everything the writer sends.
	stats ConnStats
	// Chunk counters reported by WriteStatus.
	UnwantedChunksReceived int
	UsefulChunksReceived   int
	chunksSent             int
	goodPiecesDirtied      int
	badPiecesDirtied       int

	// Event timestamps; the zero value is rendered as "never" by
	// eventAgeString.
	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested bool
	// True while we are choking the peer (see Choke and Unchoke).
	// newConnection starts it as true.
	Choked bool
	// Requests we have posted to the peer and not yet cancelled.
	Requests map[request]struct{}
	// Set to half the number of outstanding requests each time a request is
	// added; updateRequests does not refill while more than this many are
	// outstanding.
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	// Indexed by piece, true once a have for that piece has been posted
	// (individually via Have or as part of the initial Bitfield).
	sentHaves []bool

	// Stuff controlled by the remote peer.
	PeerID         [20]byte
	PeerInterested bool
	// While true, Request will not post new requests. newConnection starts it
	// as true.
	PeerChoked bool
	// Requests the peer has made of us; discarded when we choke the peer.
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerHasAll bool
	// The lowest number of pieces the torrent could have, based on
	// communication with the peer (it is only ever raised). Generally only
	// useful until we have the torrent info.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	// Maximum pending requests the peer allows. newConnection defaults it to
	// 250; nominalMaxRequests caps the value actually used at 64.
	PeerMaxRequests int
	// Extension message IDs keyed by extension name (e.g. "ut_metadata"). An
	// ID of 0 is treated as "not supported" by the metadata request code.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	// Per-connection piece ordering obtained from the torrent on first use and
	// handed back by discardPieceInclination.
	pieceInclination []int
	// Pieces we want from this peer, keyed by the priority computed in
	// updatePiecePriority; iterated by fillRequests.
	pieceRequestOrder prioritybitmap.PriorityBitmap

	// Queue of pp.Message values appended by Post and drained by writer.
	// Created lazily by Post, so it may be nil.
	outgoingUnbufferedMessages *list.List
	// Set by Post and cleared by writer once the queue has been drained.
	outgoingUnbufferedMessagesNotEmpty missinggo.Event
}
98
// mu returns the mutex of the Client that owns this connection's torrent
// (cn.t.cl.mu) as a sync.Locker. The writer goroutine holds it while it
// touches the outgoing message queue. cn.t must already be set.
func (cn *connection) mu() sync.Locker {
	return &cn.t.cl.mu
}
102
103 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
104         c = &connection{
105                 conn: nc,
106                 rw:   nc,
107
108                 Choked:          true,
109                 PeerChoked:      true,
110                 PeerMaxRequests: 250,
111         }
112         return
113 }
114
// remoteAddr returns the peer's address as reported by the underlying
// net.Conn.
func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}
118
// localAddr returns our end's address as reported by the underlying net.Conn.
func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}
122
123 func (cn *connection) supportsExtension(ext string) bool {
124         _, ok := cn.PeerExtensionIDs[ext]
125         return ok
126 }
127
128 // The best guess at number of pieces in the torrent for this peer.
129 func (cn *connection) bestPeerNumPieces() int {
130         if cn.t.haveInfo() {
131                 return cn.t.numPieces()
132         }
133         return cn.peerMinPieces
134 }
135
136 func (cn *connection) completedString() string {
137         return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
138 }
139
140 // Correct the PeerPieces slice length. Return false if the existing slice is
141 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
142 // messages.
143 func (cn *connection) setNumPieces(num int) error {
144         cn.peerPieces.RemoveRange(num, -1)
145         cn.peerPiecesChanged()
146         return nil
147 }
148
// eventAgeString describes how long ago t happened, for status output. The
// zero time yields "never"; any other time yields the elapsed seconds with
// two decimals, e.g. "12.34s ago".
func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}
155
156 func (cn *connection) connectionFlags() (ret string) {
157         c := func(b byte) {
158                 ret += string([]byte{b})
159         }
160         if cn.encrypted {
161                 c('E')
162         }
163         if cn.Discovery != 0 {
164                 c(byte(cn.Discovery))
165         }
166         if cn.uTP {
167                 c('T')
168         }
169         return
170 }
171
172 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
173 func (cn *connection) statusFlags() (ret string) {
174         c := func(b byte) {
175                 ret += string([]byte{b})
176         }
177         if cn.Interested {
178                 c('i')
179         }
180         if cn.Choked {
181                 c('c')
182         }
183         c('-')
184         ret += cn.connectionFlags()
185         c('-')
186         if cn.PeerInterested {
187                 c('i')
188         }
189         if cn.PeerChoked {
190                 c('c')
191         }
192         return
193 }
194
195 func (cn *connection) String() string {
196         var buf bytes.Buffer
197         cn.WriteStatus(&buf, nil)
198         return buf.String()
199 }
200
201 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
202         // \t isn't preserved in <pre> blocks?
203         fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
204         fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
205                 eventAgeString(cn.lastMessageReceived),
206                 eventAgeString(cn.completedHandshake),
207                 eventAgeString(cn.lastUsefulChunkReceived))
208         fmt.Fprintf(w,
209                 "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
210                 cn.completedString(),
211                 len(cn.peerTouchedPieces),
212                 cn.UsefulChunksReceived,
213                 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
214                 cn.chunksSent,
215                 len(cn.Requests),
216                 len(cn.PeerRequests),
217                 cn.statusFlags(),
218         )
219 }
220
221 func (cn *connection) Close() {
222         cn.closed.Set()
223         cn.discardPieceInclination()
224         cn.pieceRequestOrder.Clear()
225         if cn.conn != nil {
226                 // TODO: This call blocks sometimes, why?
227                 go cn.conn.Close()
228         }
229 }
230
231 func (cn *connection) PeerHasPiece(piece int) bool {
232         return cn.peerHasAll || cn.peerPieces.Contains(piece)
233 }
234
235 func (cn *connection) Post(msg pp.Message) {
236         switch msg.Type {
237         case pp.Cancel:
238                 for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() {
239                         elemMsg := e.Value.(pp.Message)
240                         if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length {
241                                 cn.outgoingUnbufferedMessages.Remove(e)
242                                 optimizedCancels.Add(1)
243                                 return
244                         }
245                 }
246         }
247         if cn.outgoingUnbufferedMessages == nil {
248                 cn.outgoingUnbufferedMessages = list.New()
249         }
250         cn.outgoingUnbufferedMessages.PushBack(msg)
251         cn.outgoingUnbufferedMessagesNotEmpty.Set()
252         postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
253 }
254
255 func (cn *connection) RequestPending(r request) bool {
256         _, ok := cn.Requests[r]
257         return ok
258 }
259
260 func (cn *connection) requestMetadataPiece(index int) {
261         eID := cn.PeerExtensionIDs["ut_metadata"]
262         if eID == 0 {
263                 return
264         }
265         if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
266                 return
267         }
268         cn.Post(pp.Message{
269                 Type:       pp.Extended,
270                 ExtendedID: eID,
271                 ExtendedPayload: func() []byte {
272                         b, err := bencode.Marshal(map[string]int{
273                                 "msg_type": pp.RequestMetadataExtensionMsgType,
274                                 "piece":    index,
275                         })
276                         if err != nil {
277                                 panic(err)
278                         }
279                         return b
280                 }(),
281         })
282         for index >= len(cn.metadataRequests) {
283                 cn.metadataRequests = append(cn.metadataRequests, false)
284         }
285         cn.metadataRequests[index] = true
286 }
287
288 func (cn *connection) requestedMetadataPiece(index int) bool {
289         return index < len(cn.metadataRequests) && cn.metadataRequests[index]
290 }
291
292 // The actual value to use as the maximum outbound requests.
293 func (cn *connection) nominalMaxRequests() (ret int) {
294         ret = cn.PeerMaxRequests
295         if ret > 64 {
296                 ret = 64
297         }
298         return
299 }
300
301 // Returns true if more requests can be sent.
302 func (cn *connection) Request(chunk request) bool {
303         if len(cn.Requests) >= cn.nominalMaxRequests() {
304                 return false
305         }
306         if !cn.PeerHasPiece(int(chunk.Index)) {
307                 return true
308         }
309         if cn.RequestPending(chunk) {
310                 return true
311         }
312         cn.SetInterested(true)
313         if cn.PeerChoked {
314                 return false
315         }
316         if cn.Requests == nil {
317                 cn.Requests = make(map[request]struct{}, cn.PeerMaxRequests)
318         }
319         cn.Requests[chunk] = struct{}{}
320         cn.requestsLowWater = len(cn.Requests) / 2
321         cn.Post(pp.Message{
322                 Type:   pp.Request,
323                 Index:  chunk.Index,
324                 Begin:  chunk.Begin,
325                 Length: chunk.Length,
326         })
327         return true
328 }
329
330 // Returns true if an unsatisfied request was canceled.
331 func (cn *connection) Cancel(r request) bool {
332         if !cn.RequestPending(r) {
333                 return false
334         }
335         delete(cn.Requests, r)
336         cn.Post(pp.Message{
337                 Type:   pp.Cancel,
338                 Index:  r.Index,
339                 Begin:  r.Begin,
340                 Length: r.Length,
341         })
342         return true
343 }
344
345 // Returns true if an unsatisfied request was canceled.
346 func (cn *connection) PeerCancel(r request) bool {
347         if cn.PeerRequests == nil {
348                 return false
349         }
350         if _, ok := cn.PeerRequests[r]; !ok {
351                 return false
352         }
353         delete(cn.PeerRequests, r)
354         return true
355 }
356
357 func (cn *connection) Choke() {
358         if cn.Choked {
359                 return
360         }
361         cn.Post(pp.Message{
362                 Type: pp.Choke,
363         })
364         cn.PeerRequests = nil
365         cn.Choked = true
366 }
367
368 func (cn *connection) Unchoke() {
369         if !cn.Choked {
370                 return
371         }
372         cn.Post(pp.Message{
373                 Type: pp.Unchoke,
374         })
375         cn.Choked = false
376 }
377
378 func (cn *connection) SetInterested(interested bool) {
379         if cn.Interested == interested {
380                 return
381         }
382         cn.Post(pp.Message{
383                 Type: func() pp.MessageType {
384                         if interested {
385                                 return pp.Interested
386                         } else {
387                                 return pp.NotInterested
388                         }
389                 }(),
390         })
391         cn.Interested = interested
392 }
393
var (
	// Track connection writer buffer writes and flushes, to determine its
	// efficiency. connectionWriterWrite is bumped once per message written
	// into the writer's buffer; connectionWriterFlush is bumped once per pass
	// of the writer loop, whether or not there was anything to flush.
	connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
	connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
)
400
401 // Writes buffers to the socket from the write channel.
402 func (cn *connection) writer(keepAliveTimeout time.Duration) {
403         defer func() {
404                 cn.mu().Lock()
405                 defer cn.mu().Unlock()
406                 cn.Close()
407         }()
408         // Reduce write syscalls.
409         buf := bufio.NewWriter(cn.rw)
410         keepAliveTimer := time.NewTimer(keepAliveTimeout)
411         for {
412                 cn.mu().Lock()
413                 for cn.outgoingUnbufferedMessages != nil && cn.outgoingUnbufferedMessages.Len() != 0 {
414                         msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message)
415                         cn.mu().Unlock()
416                         b, err := msg.MarshalBinary()
417                         if err != nil {
418                                 panic(err)
419                         }
420                         connectionWriterWrite.Add(1)
421                         n, err := buf.Write(b)
422                         if err != nil {
423                                 return
424                         }
425                         keepAliveTimer.Reset(keepAliveTimeout)
426                         if n != len(b) {
427                                 panic("short write")
428                         }
429                         cn.mu().Lock()
430                         cn.wroteMsg(msg)
431                         cn.wroteBytes(b)
432                 }
433                 cn.outgoingUnbufferedMessagesNotEmpty.Clear()
434                 cn.mu().Unlock()
435                 connectionWriterFlush.Add(1)
436                 if buf.Buffered() != 0 {
437                         if buf.Flush() != nil {
438                                 return
439                         }
440                         keepAliveTimer.Reset(keepAliveTimeout)
441                 }
442                 select {
443                 case <-cn.closed.LockedChan(cn.mu()):
444                         return
445                 case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()):
446                 case <-keepAliveTimer.C:
447                         cn.mu().Lock()
448                         cn.Post(pp.Message{Keepalive: true})
449                         cn.mu().Unlock()
450                         postedKeepalives.Add(1)
451                 }
452         }
453 }
454
455 func (cn *connection) Have(piece int) {
456         for piece >= len(cn.sentHaves) {
457                 cn.sentHaves = append(cn.sentHaves, false)
458         }
459         if cn.sentHaves[piece] {
460                 return
461         }
462         cn.Post(pp.Message{
463                 Type:  pp.Have,
464                 Index: pp.Integer(piece),
465         })
466         cn.sentHaves[piece] = true
467 }
468
469 func (cn *connection) Bitfield(haves []bool) {
470         if cn.sentHaves != nil {
471                 panic("bitfield must be first have-related message sent")
472         }
473         cn.Post(pp.Message{
474                 Type:     pp.Bitfield,
475                 Bitfield: haves,
476         })
477         // Make a copy of haves, as that's read when the message is marshalled
478         // without the lock. Also it obviously shouldn't change in the Msg due to
479         // changes in .sentHaves.
480         cn.sentHaves = append([]bool(nil), haves...)
481 }
482
483 func (cn *connection) updateRequests() {
484         if !cn.t.haveInfo() {
485                 return
486         }
487         if cn.Interested {
488                 if cn.PeerChoked {
489                         return
490                 }
491                 if len(cn.Requests) > cn.requestsLowWater {
492                         return
493                 }
494         }
495         cn.fillRequests()
496         if len(cn.Requests) == 0 && !cn.PeerChoked {
497                 // So we're not choked, but we don't want anything right now. We may
498                 // have completed readahead, and the readahead window has not rolled
499                 // over to the next piece. Better to stay interested in case we're
500                 // going to want data in the near future.
501                 cn.SetInterested(!cn.t.haveAllPieces())
502         }
503 }
504
505 func (cn *connection) fillRequests() {
506         cn.pieceRequestOrder.IterTyped(func(piece int) (more bool) {
507                 if cn.t.cl.config.Debug && cn.t.havePiece(piece) {
508                         panic(piece)
509                 }
510                 return cn.requestPiecePendingChunks(piece)
511         })
512 }
513
// requestPiecePendingChunks delegates to the torrent to request the pending
// chunks of the given piece on this connection. fillRequests uses the result
// to decide whether to carry on with the next piece.
func (cn *connection) requestPiecePendingChunks(piece int) (again bool) {
	return cn.t.connRequestPiecePendingChunks(cn, piece)
}
517
// stopRequestingPiece removes the piece from this connection's request order,
// so fillRequests no longer visits it.
func (cn *connection) stopRequestingPiece(piece int) {
	cn.pieceRequestOrder.Remove(piece)
}
521
522 func (cn *connection) updatePiecePriority(piece int) {
523         tpp := cn.t.piecePriority(piece)
524         if !cn.PeerHasPiece(piece) {
525                 tpp = PiecePriorityNone
526         }
527         if tpp == PiecePriorityNone {
528                 cn.stopRequestingPiece(piece)
529                 return
530         }
531         prio := cn.getPieceInclination()[piece]
532         switch tpp {
533         case PiecePriorityNormal:
534         case PiecePriorityReadahead:
535                 prio -= cn.t.numPieces()
536         case PiecePriorityNext, PiecePriorityNow:
537                 prio -= 2 * cn.t.numPieces()
538         default:
539                 panic(tpp)
540         }
541         prio += piece / 2
542         cn.pieceRequestOrder.Set(piece, prio)
543         cn.updateRequests()
544 }
545
546 func (cn *connection) getPieceInclination() []int {
547         if cn.pieceInclination == nil {
548                 cn.pieceInclination = cn.t.getConnPieceInclination()
549         }
550         return cn.pieceInclination
551 }
552
553 func (cn *connection) discardPieceInclination() {
554         if cn.pieceInclination == nil {
555                 return
556         }
557         cn.t.putPieceInclination(cn.pieceInclination)
558         cn.pieceInclination = nil
559 }
560
// peerHasPieceChanged is called when the peer's claim about a single piece
// may have changed; it recomputes that piece's request priority.
func (cn *connection) peerHasPieceChanged(piece int) {
	cn.updatePiecePriority(piece)
}
564
565 func (cn *connection) peerPiecesChanged() {
566         if cn.t.haveInfo() {
567                 for i := range iter.N(cn.t.numPieces()) {
568                         cn.peerHasPieceChanged(i)
569                 }
570         }
571 }
572
573 func (cn *connection) raisePeerMinPieces(newMin int) {
574         if newMin > cn.peerMinPieces {
575                 cn.peerMinPieces = newMin
576         }
577 }
578
579 func (cn *connection) peerSentHave(piece int) error {
580         if cn.t.haveInfo() && piece >= cn.t.numPieces() {
581                 return errors.New("invalid piece")
582         }
583         if cn.PeerHasPiece(piece) {
584                 return nil
585         }
586         cn.raisePeerMinPieces(piece + 1)
587         cn.peerPieces.Set(piece, true)
588         cn.peerHasPieceChanged(piece)
589         return nil
590 }
591
592 func (cn *connection) peerSentBitfield(bf []bool) error {
593         cn.peerHasAll = false
594         if len(bf)%8 != 0 {
595                 panic("expected bitfield length divisible by 8")
596         }
597         // We know that the last byte means that at most the last 7 bits are
598         // wasted.
599         cn.raisePeerMinPieces(len(bf) - 7)
600         if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
601                 // Ignore known excess pieces.
602                 bf = bf[:cn.t.numPieces()]
603         }
604         for i, have := range bf {
605                 if have {
606                         cn.raisePeerMinPieces(i + 1)
607                 }
608                 cn.peerPieces.Set(i, have)
609         }
610         cn.peerPiecesChanged()
611         return nil
612 }
613
614 func (cn *connection) peerSentHaveAll() error {
615         cn.peerHasAll = true
616         cn.peerPieces.Clear()
617         cn.peerPiecesChanged()
618         return nil
619 }
620
621 func (cn *connection) peerSentHaveNone() error {
622         cn.peerPieces.Clear()
623         cn.peerHasAll = false
624         cn.peerPiecesChanged()
625         return nil
626 }
627
628 func (c *connection) requestPendingMetadata() {
629         if c.t.haveInfo() {
630                 return
631         }
632         if c.PeerExtensionIDs["ut_metadata"] == 0 {
633                 // Peer doesn't support this.
634                 return
635         }
636         // Request metadata pieces that we don't have in a random order.
637         var pending []int
638         for index := 0; index < c.t.metadataPieceCount(); index++ {
639                 if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
640                         pending = append(pending, index)
641                 }
642         }
643         for _, i := range rand.Perm(len(pending)) {
644                 c.requestMetadataPiece(pending[i])
645         }
646 }
647
// wroteMsg records a message written by the writer goroutine in both the
// connection's stats and the torrent's stats.
func (cn *connection) wroteMsg(msg pp.Message) {
	cn.stats.wroteMsg(msg)
	cn.t.stats.wroteMsg(msg)
}
652
// wroteBytes records the marshalled bytes of a written message in both the
// connection's stats and the torrent's stats.
func (cn *connection) wroteBytes(b []byte) {
	cn.stats.wroteBytes(b)
	cn.t.stats.wroteBytes(b)
}
657
658 // Returns whether the connection is currently useful to us. We're seeding and
659 // they want data, we don't have metainfo and they can provide it, etc.
660 func (c *connection) useful() bool {
661         t := c.t
662         if c.closed.IsSet() {
663                 return false
664         }
665         if !t.haveInfo() {
666                 return c.supportsExtension("ut_metadata")
667         }
668         if t.seeding() {
669                 return c.PeerInterested
670         }
671         return t.connHasWantedPieces(c)
672 }
673
674 func (c *connection) lastHelpful() time.Time {
675         lasts := []time.Time{c.lastUsefulChunkReceived}
676         if c.t.seeding() {
677                 lasts = append(lasts, c.lastChunkSent)
678         }
679         return missinggo.Max(time.Time.Before, slices.ToEmptyInterface(lasts)...).(time.Time)
680 }