package torrent

import (
	"bufio"
	"bytes"
	"container/list"
	"encoding"
	"errors"
	"expvar"
	"fmt"
	"io"
	"net"
	"strconv"
	"time"

	"github.com/anacrolix/missinggo"
	"github.com/anacrolix/missinggo/bitmap"
	"github.com/anacrolix/missinggo/prioritybitmap"
	"github.com/bradfitz/iter"

	"github.com/anacrolix/torrent/bencode"
	pp "github.com/anacrolix/torrent/peer_protocol"
)

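// optimizedCancels counts Cancel messages that never reached the wire because
// the matching Request was still queued locally (see writeOptimizer).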
var optimizedCancels = expvar.NewInt("optimizedCancels")

type peerSource byte

const (
	peerSourceTracker  peerSource = '\x00' // It's the default.
	peerSourceIncoming peerSource = 'I'
	peerSourceDHT      peerSource = 'H'
	peerSourcePEX      peerSource = 'X'
)

// Maintains the state of a connection with a peer.
type connection struct {
	t         *torrent
	conn      net.Conn
	rw        io.ReadWriter // The stream actually read and written; may wrap conn, e.g. for encryption.
	encrypted bool
	Discovery peerSource
	uTP       bool
	closed    missinggo.Event
	post      chan pp.Message
	writeCh   chan []byte

	UnwantedChunksReceived int
	UsefulChunksReceived   int
	chunksSent             int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested       bool
	Choked           bool
	Requests         map[request]struct{}
	requestsLowWater int
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        []bool

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// The pieces the peer has claimed to have.
	peerPieces bitmap.Bitmap
	// The peer has everything. This can occur due to a special message, when
	// we may not even know the number of pieces in the torrent yet.
	peerHasAll bool
	// The highest possible number of pieces the torrent could have based on
	// communication with the peer. Generally only useful until we have the
	// torrent info.
	peerMinPieces int
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte
	PeerClientName   string

	pieceInclination  []int
	pieceRequestOrder prioritybitmap.PriorityBitmap
}

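// newConnection returns a connection with the protocol's conservative initial
// state: both sides start choked, and the peer's request queue limit is
// assumed to be 250 until it advertises otherwise.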
func newConnection() (c *connection) {
	c = &connection{
		Choked:          true,
		PeerChoked:      true,
		PeerMaxRequests: 250,

		writeCh: make(chan []byte),
		post:    make(chan pp.Message),
	}
	return
}

func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}

func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}

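// Whether the peer declared support for the named extension in its extended
// handshake.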
func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}

// The best guess at number of pieces in the torrent for this peer.
func (cn *connection) bestPeerNumPieces() int {
	if cn.t.haveInfo() {
		return cn.t.numPieces()
	}
	return cn.peerMinPieces
}

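// completedString renders the peer's completion as "have/total" pieces, using
// the best guess at the total.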
func (cn *connection) completedString() string {
	return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
}

// Truncate the peer's piece bitmap to the correct length now that the number
// of pieces is known. The error return is unused; this always succeeds.
func (cn *connection) setNumPieces(num int) error {
	cn.peerPieces.RemoveRange(num, -1)
	cn.peerPiecesChanged()
	return nil
}

func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}

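// connectionFlags encodes transport-level properties as single characters:
// E for encryption, the peer-source byte if known, and T for uTP.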
func (cn *connection) connectionFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.encrypted {
		c('E')
	}
	if cn.Discovery != 0 {
		c(byte(cn.Discovery))
	}
	if cn.uTP {
		c('T')
	}
	return
}

// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
func (cn *connection) statusFlags() (ret string) {
	c := func(b byte) {
		ret += string([]byte{b})
	}
	if cn.Interested {
		c('i')
	}
	if cn.Choked {
		c('c')
	}
	c('-')
	ret += cn.connectionFlags()
	c('-')
	if cn.PeerInterested {
		c('i')
	}
	if cn.PeerChoked {
		c('c')
	}
	return
}

func (cn *connection) String() string {
	var buf bytes.Buffer
	cn.WriteStatus(&buf, nil)
	return buf.String()
}

func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
	// Indent with spaces: tabs aren't reliably rendered in <pre> blocks.
	fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastUsefulChunkReceived))
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
		cn.completedString(),
		len(cn.peerTouchedPieces),
		cn.UsefulChunksReceived,
		cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
		cn.chunksSent,
		len(cn.Requests),
		len(cn.PeerRequests),
		cn.statusFlags(),
	)
}

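// Close flags the connection as closed, returns per-connection state to the
// torrent, and closes the underlying socket.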
func (c *connection) Close() {
	c.closed.Set()
	c.discardPieceInclination()
	c.pieceRequestOrder.Clear()
	// TODO: This call blocks sometimes, why?
	go c.conn.Close()
}

func (c *connection) PeerHasPiece(piece int) bool {
	return c.peerHasAll || c.peerPieces.Contains(piece)
}

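// Post hands a message to the write pipeline, blocking until it's accepted or
// the connection is closed.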
func (c *connection) Post(msg pp.Message) {
	select {
	case c.post <- msg:
		postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
	case <-c.closed.C():
	}
}

func (c *connection) RequestPending(r request) bool {
	_, ok := c.Requests[r]
	return ok
}

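// Request a metadata piece over the ut_metadata extension, unless the peer
// doesn't support it or the request is already pending.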
func (c *connection) requestMetadataPiece(index int) {
	eID := c.PeerExtensionIDs["ut_metadata"]
	if eID == 0 {
		return
	}
	if index < len(c.metadataRequests) && c.metadataRequests[index] {
		return
	}
	c.Post(pp.Message{
		Type:       pp.Extended,
		ExtendedID: eID,
		ExtendedPayload: func() []byte {
			b, err := bencode.Marshal(map[string]int{
				"msg_type": pp.RequestMetadataExtensionMsgType,
				"piece":    index,
			})
			if err != nil {
				panic(err)
			}
			return b
		}(),
	})
	for index >= len(c.metadataRequests) {
		c.metadataRequests = append(c.metadataRequests, false)
	}
	c.metadataRequests[index] = true
}

func (c *connection) requestedMetadataPiece(index int) bool {
	return index < len(c.metadataRequests) && c.metadataRequests[index]
}

// The actual value to use as the maximum outbound requests: the peer's
// advertised limit, capped at 64.
func (c *connection) nominalMaxRequests() (ret int) {
	ret = c.PeerMaxRequests
	if ret > 64 {
		ret = 64
	}
	return
}

// Returns true if more requests can be sent. The chunk is only actually
// requested if the peer has the piece, the request isn't already pending, and
// the peer isn't choking us.
func (c *connection) Request(chunk request) bool {
	if len(c.Requests) >= c.nominalMaxRequests() {
		return false
	}
	if !c.PeerHasPiece(int(chunk.Index)) {
		return true
	}
	if c.RequestPending(chunk) {
		return true
	}
	c.SetInterested(true)
	if c.PeerChoked {
		return false
	}
	if c.Requests == nil {
		c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
	}
	c.Requests[chunk] = struct{}{}
	c.requestsLowWater = len(c.Requests) / 2
	c.Post(pp.Message{
		Type:   pp.Request,
		Index:  chunk.Index,
		Begin:  chunk.Begin,
		Length: chunk.Length,
	})
	return true
}

// Returns true if an unsatisfied request was canceled.
func (c *connection) Cancel(r request) bool {
	if c.Requests == nil {
		return false
	}
	if _, ok := c.Requests[r]; !ok {
		return false
	}
	delete(c.Requests, r)
	c.Post(pp.Message{
		Type:   pp.Cancel,
		Index:  r.Index,
		Begin:  r.Begin,
		Length: r.Length,
	})
	return true
}

// Handles a Cancel from the peer: returns true if the peer's request was
// still pending and has been dropped.
func (c *connection) PeerCancel(r request) bool {
	if c.PeerRequests == nil {
		return false
	}
	if _, ok := c.PeerRequests[r]; !ok {
		return false
	}
	delete(c.PeerRequests, r)
	return true
}

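// Choke the peer, discarding any requests it has outstanding: after a choke,
// the peer must reissue whatever it still wants.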
func (c *connection) Choke() {
	if c.Choked {
		return
	}
	c.Post(pp.Message{
		Type: pp.Choke,
	})
	c.PeerRequests = nil
	c.Choked = true
}

func (c *connection) Unchoke() {
	if !c.Choked {
		return
	}
	c.Post(pp.Message{
		Type: pp.Unchoke,
	})
	c.Choked = false
}

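// SetInterested posts Interested or NotInterested to match the desired state,
// suppressing no-op transitions.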
func (c *connection) SetInterested(interested bool) {
	if c.Interested == interested {
		return
	}
	c.Post(pp.Message{
		Type: func() pp.MessageType {
			if interested {
				return pp.Interested
			} else {
				return pp.NotInterested
			}
		}(),
	})
	c.Interested = interested
}

var (
	// Track connection writer buffer writes and flushes, to determine its
	// efficiency.
	connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
	connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
)

// Writes buffers to the socket from the write channel.
func (conn *connection) writer() {
	defer func() {
		conn.t.cl.mu.Lock()
		defer conn.t.cl.mu.Unlock()
		conn.Close()
	}()
	// Reduce write syscalls.
	buf := bufio.NewWriter(conn.rw)
	for {
		if buf.Buffered() == 0 {
			// There's nothing to write, so block until we get something.
			select {
			case b, ok := <-conn.writeCh:
				if !ok {
					return
				}
				connectionWriterWrite.Add(1)
				_, err := buf.Write(b)
				if err != nil {
					return
				}
			case <-conn.closed.C():
				return
			}
		} else {
			// We already have something to write, so flush if there's nothing
			// more to write.
			select {
			case b, ok := <-conn.writeCh:
				if !ok {
					return
				}
				connectionWriterWrite.Add(1)
				_, err := buf.Write(b)
				if err != nil {
					return
				}
			case <-conn.closed.C():
				return
			default:
				connectionWriterFlush.Add(1)
				err := buf.Flush()
				if err != nil {
					return
				}
			}
		}
	}
}

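// writeOptimizer pumps messages posted to the connection into the write
// channel. It injects keepalives when the link has been idle for
// keepAliveDelay, and drops a queued Request when a matching Cancel arrives
// before it was written (counted by optimizedCancels).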
func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
	defer close(conn.writeCh) // Responsible for notifying downstream routines.
	pending := list.New()     // Message queue.
	var nextWrite []byte      // Set to nil when the next message needs to be marshalled.
	timer := time.NewTimer(keepAliveDelay)
	defer timer.Stop()
	lastWrite := time.Now()
	for {
		write := conn.writeCh // Set to nil if there's nothing to write.
		if pending.Len() == 0 {
			write = nil
		} else if nextWrite == nil {
			var err error
			nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
			if err != nil {
				panic(err)
			}
		}
	event:
		select {
		case <-timer.C:
			if pending.Len() != 0 {
				break
			}
			keepAliveTime := lastWrite.Add(keepAliveDelay)
			if time.Now().Before(keepAliveTime) {
				timer.Reset(time.Until(keepAliveTime))
				break
			}
			pending.PushBack(pp.Message{Keepalive: true})
			postedKeepalives.Add(1)
		case msg, ok := <-conn.post:
			if !ok {
				return
			}
			if msg.Type == pp.Cancel {
				for e := pending.Back(); e != nil; e = e.Prev() {
					elemMsg := e.Value.(pp.Message)
					if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
						pending.Remove(e)
						optimizedCancels.Add(1)
						break event
					}
				}
			}
			pending.PushBack(msg)
		case write <- nextWrite:
			pending.Remove(pending.Front())
			nextWrite = nil
			lastWrite = time.Now()
			if pending.Len() == 0 {
				timer.Reset(keepAliveDelay)
			}
		case <-conn.closed.C():
			return
		}
	}
}

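// Have posts a Have message for the piece, unless one was already sent.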
func (cn *connection) Have(piece int) {
	for piece >= len(cn.sentHaves) {
		cn.sentHaves = append(cn.sentHaves, false)
	}
	if cn.sentHaves[piece] {
		return
	}
	cn.Post(pp.Message{
		Type:  pp.Have,
		Index: pp.Integer(piece),
	})
	cn.sentHaves[piece] = true
}

func (cn *connection) Bitfield(haves []bool) {
	if cn.sentHaves != nil {
		panic("bitfield must be first have-related message sent")
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: haves,
	})
	cn.sentHaves = haves
}

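// Top up the outbound request queue. While interested and unchoked, nothing
// happens until in-flight requests drain below the low-water mark, so that
// request traffic is batched.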
func (c *connection) updateRequests() {
	if !c.t.haveInfo() {
		return
	}
	if c.Interested {
		if c.PeerChoked {
			return
		}
		if len(c.Requests) > c.requestsLowWater {
			return
		}
	}
	c.fillRequests()
	if len(c.Requests) == 0 && !c.PeerChoked {
		// So we're not choked, but we don't want anything right now. We may
		// have completed readahead, and the readahead window has not rolled
		// over to the next piece. Better to stay interested in case we're
		// going to want data in the near future.
		c.SetInterested(!c.t.haveAllPieces())
	}
}

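// Issue chunk requests, walking pieces in this connection's request order
// until no more requests can be sent.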
func (c *connection) fillRequests() {
	c.pieceRequestOrder.IterTyped(func(piece int) (more bool) {
		if c.t.cl.config.Debug && c.t.havePiece(piece) {
			panic(piece)
		}
		return c.requestPiecePendingChunks(piece)
	})
}

func (c *connection) requestPiecePendingChunks(piece int) (again bool) {
	return c.t.connRequestPiecePendingChunks(c, piece)
}

func (c *connection) stopRequestingPiece(piece int) {
	c.pieceRequestOrder.Remove(piece)
}

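// Recompute the piece's key in this connection's request order: start from
// the connection's inclination for the piece, then subtract multiples of the
// piece count so readahead and now/next pieces sort ahead of normal ones.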
func (c *connection) updatePiecePriority(piece int) {
	tpp := c.t.piecePriority(piece)
	if !c.PeerHasPiece(piece) {
		tpp = PiecePriorityNone
	}
	if tpp == PiecePriorityNone {
		c.stopRequestingPiece(piece)
		return
	}
	prio := c.getPieceInclination()[piece]
	switch tpp {
	case PiecePriorityNormal:
	case PiecePriorityReadahead:
		prio -= c.t.numPieces()
	case PiecePriorityNext, PiecePriorityNow:
		prio -= 2 * c.t.numPieces()
	default:
		panic(tpp)
	}
	prio += piece
	c.pieceRequestOrder.Set(piece, prio)
	c.updateRequests()
}

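// Lazily obtain this connection's piece inclination from the torrent's pool.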
func (c *connection) getPieceInclination() []int {
	if c.pieceInclination == nil {
		c.pieceInclination = c.t.getConnPieceInclination()
	}
	return c.pieceInclination
}

func (c *connection) discardPieceInclination() {
	if c.pieceInclination == nil {
		return
	}
	c.t.putPieceInclination(c.pieceInclination)
	c.pieceInclination = nil
}

func (c *connection) peerHasPieceChanged(piece int) {
	c.updatePiecePriority(piece)
}

func (c *connection) peerPiecesChanged() {
	if c.t.haveInfo() {
		for i := range iter.N(c.t.numPieces()) {
			c.peerHasPieceChanged(i)
		}
	}
}

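// Ratchet up the lower bound on the torrent's piece count implied by what the
// peer has told us.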
func (c *connection) raisePeerMinPieces(newMin int) {
	if newMin > c.peerMinPieces {
		c.peerMinPieces = newMin
	}
}

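// Handle a Have message: record the claimed piece and raise the implied
// minimum piece count.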
func (c *connection) peerSentHave(piece int) error {
	if c.t.haveInfo() && piece >= c.t.numPieces() {
		return errors.New("invalid piece")
	}
	if c.PeerHasPiece(piece) {
		return nil
	}
	c.raisePeerMinPieces(piece + 1)
	c.peerPieces.Set(piece, true)
	c.peerHasPieceChanged(piece)
	return nil
}

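// Handle a Bitfield message, replacing the peer's claimed pieces.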
func (c *connection) peerSentBitfield(bf []bool) error {
	c.peerHasAll = false
	if len(bf)%8 != 0 {
		panic("expected bitfield length divisible by 8")
	}
	// We know that the last byte means that at most the last 7 bits are
	// wasted.
	c.raisePeerMinPieces(len(bf) - 7)
	if c.t.haveInfo() {
		// Ignore known excess pieces.
		bf = bf[:c.t.numPieces()]
	}
	for i, have := range bf {
		if have {
			c.raisePeerMinPieces(i + 1)
		}
		c.peerPieces.Set(i, have)
	}
	c.peerPiecesChanged()
	return nil
}

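// Handle a HaveAll message (fast extension): the peer claims every piece,
// possibly before we even know how many pieces the torrent has.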
func (cn *connection) peerSentHaveAll() error {
	cn.peerHasAll = true
	cn.peerPieces.Clear()
	cn.peerPiecesChanged()
	return nil
}

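// Handle a HaveNone message (fast extension): the peer has no pieces.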
func (c *connection) peerSentHaveNone() error {
	c.peerPieces.Clear()
	c.peerHasAll = false
	c.peerPiecesChanged()
	return nil
}