]> Sergey Matveev's repositories - btrtrc.git/blob - connection.go
75c26d46ce653bfe7b93f2e96236595a3ab8f7fd
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "container/list"
7         "encoding"
8         "errors"
9         "expvar"
10         "fmt"
11         "io"
12         "net"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/torrent/bencode"
17         pp "github.com/anacrolix/torrent/peer_protocol"
18 )
19
// optimizedCancels counts CANCEL messages that were satisfied by removing a
// not-yet-sent REQUEST from the outgoing queue (see writeOptimizer),
// avoiding sending both messages on the wire.
var optimizedCancels = expvar.NewInt("optimizedCancels")
21
// peerSource records how a peer was discovered, shown as a single character
// in connection status flags (see connectionFlags).
type peerSource byte

// Typed as peerSource so the constants can't be confused with arbitrary
// bytes; the values themselves are unchanged.
const (
	peerSourceTracker  peerSource = '\x00' // It's the default.
	peerSourceIncoming peerSource = 'I'
	peerSourceDHT      peerSource = 'H'
	peerSourcePEX      peerSource = 'X'
)
30
// Maintains the state of a connection with a peer.
type connection struct {
	t    *torrent
	conn net.Conn      // The underlying transport connection.
	rw   io.ReadWriter // The stream actually read/written; may wrap conn. The real slim shady.
	encrypted bool     // Whether the stream uses protocol encryption.
	Discovery peerSource // How this peer was found.
	uTP       bool       // Connection is over uTP rather than TCP.
	closing   chan struct{}   // Closed exactly once on shutdown; see Close.
	mu        sync.Mutex      // Only for closing.
	post      chan pp.Message // Outbound messages, consumed by writeOptimizer.
	writeCh   chan []byte     // Marshalled bytes, consumed by the writer goroutine.

	// Chunk transfer statistics.
	UnwantedChunksReceived int // Chunks received that we hadn't requested.
	UsefulChunksReceived   int // Chunks received that we had requested.
	chunksSent             int

	// Event timestamps, used for status display (see WriteStatus).
	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time
	lastChunkSent           time.Time

	// Stuff controlled by the local peer.
	Interested       bool                 // We've told the peer we want data.
	Choked           bool                 // We're refusing to serve the peer's requests.
	Requests         map[request]struct{} // Requests we've sent and not yet had satisfied.
	requestsLowWater int                  // Refill requests when the queue drains below this; see Request.
	// Indexed by metadata piece, set to true if posted and pending a
	// response.
	metadataRequests []bool
	sentHaves        []bool // Pieces we've already announced to the peer.

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{} // Requests the peer has sent us; cleared on Choke.
	PeerExtensionBytes peerExtensionBytes
	// Whether the peer has the given piece. nil if they've not sent any
	// related messages yet.
	PeerPieces []bool
	// The peer claims to have every piece; PeerPieces may then be nil.
	peerHasAll bool
	// Pieces we've accepted chunks for from the peer.
	peerTouchedPieces map[int]struct{}

	PeerMaxRequests  int             // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]byte // Extension name -> message ID, from the extended handshake.
	PeerClientName   string
}
80
81 func newConnection() (c *connection) {
82         c = &connection{
83                 Choked:          true,
84                 PeerChoked:      true,
85                 PeerMaxRequests: 250,
86
87                 closing: make(chan struct{}),
88                 writeCh: make(chan []byte),
89                 post:    make(chan pp.Message),
90         }
91         return
92 }
93
// remoteAddr returns the peer's address on the underlying transport.
func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}
97
// localAddr returns our address on the underlying transport.
func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}
101
102 func (cn *connection) supportsExtension(ext string) bool {
103         _, ok := cn.PeerExtensionIDs[ext]
104         return ok
105 }
106
107 func (cn *connection) completedString(t *torrent) string {
108         if cn.PeerPieces == nil && !cn.peerHasAll {
109                 return "?"
110         }
111         return fmt.Sprintf("%d/%d", func() int {
112                 if cn.peerHasAll {
113                         if t.haveInfo() {
114                                 return t.numPieces()
115                         }
116                         return -1
117                 }
118                 ret := 0
119                 for _, b := range cn.PeerPieces {
120                         if b {
121                                 ret++
122                         }
123                 }
124                 return ret
125         }(), func() int {
126                 if cn.peerHasAll || cn.PeerPieces == nil {
127                         if t.haveInfo() {
128                                 return t.numPieces()
129                         }
130                         return -1
131                 }
132                 return len(cn.PeerPieces)
133         }())
134 }
135
136 // Correct the PeerPieces slice length. Return false if the existing slice is
137 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
138 // messages.
139 func (cn *connection) setNumPieces(num int) error {
140         if cn.peerHasAll {
141                 return nil
142         }
143         if cn.PeerPieces == nil {
144                 return nil
145         }
146         if len(cn.PeerPieces) == num {
147         } else if len(cn.PeerPieces) < num {
148                 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
149         } else if len(cn.PeerPieces) <= (num+7)/8*8 {
150                 for _, have := range cn.PeerPieces[num:] {
151                         if have {
152                                 return errors.New("peer has invalid piece")
153                         }
154                 }
155                 cn.PeerPieces = cn.PeerPieces[:num]
156         } else {
157                 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
158         }
159         if len(cn.PeerPieces) != num {
160                 panic("wat")
161         }
162         return nil
163 }
164
165 func eventAgeString(t time.Time) string {
166         if t.IsZero() {
167                 return "never"
168         }
169         return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
170 }
171
172 func (cn *connection) connectionFlags() (ret string) {
173         c := func(b byte) {
174                 ret += string([]byte{b})
175         }
176         if cn.encrypted {
177                 c('E')
178         }
179         if cn.Discovery != 0 {
180                 c(byte(cn.Discovery))
181         }
182         if cn.uTP {
183                 c('T')
184         }
185         return
186 }
187
188 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
189 func (cn *connection) statusFlags() (ret string) {
190         c := func(b byte) {
191                 ret += string([]byte{b})
192         }
193         if cn.Interested {
194                 c('i')
195         }
196         if cn.Choked {
197                 c('c')
198         }
199         c('-')
200         ret += cn.connectionFlags()
201         c('-')
202         if cn.PeerInterested {
203                 c('i')
204         }
205         if cn.PeerChoked {
206                 c('c')
207         }
208         return
209 }
210
// String returns the connection's status text, without torrent-specific
// detail (WriteStatus is called with a nil torrent).
func (cn *connection) String() string {
	var buf bytes.Buffer
	cn.WriteStatus(&buf, nil)
	return buf.String()
}
216
// WriteStatus writes a multi-line, human-readable summary of the
// connection's state to w: addresses, event ages, transfer counters, queue
// depths and flags. t may be nil (see String).
func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
	// \t isn't preserved in <pre> blocks?
	fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
	fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
		eventAgeString(cn.lastMessageReceived),
		eventAgeString(cn.completedHandshake),
		eventAgeString(cn.lastUsefulChunkReceived))
	fmt.Fprintf(w,
		"    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
		cn.completedString(t),
		len(cn.peerTouchedPieces),
		cn.UsefulChunksReceived,
		cn.UnwantedChunksReceived+cn.UsefulChunksReceived, // total chunks received
		cn.chunksSent,
		len(cn.Requests),     // our outstanding requests
		len(cn.PeerRequests), // the peer's outstanding requests
		cn.statusFlags(),
	)
}
236
// Close shuts the connection down by closing the closing channel and the
// underlying transport. Safe to call multiple times and concurrently; only
// the first call has any effect.
func (c *connection) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	select {
	case <-c.closing:
		// Already closed.
		return
	default:
	}
	close(c.closing)
	// TODO: This call blocks sometimes, why? Done in a goroutine so a slow
	// transport close can't stall the caller while holding mu.
	go c.conn.Close()
}
249
250 func (c *connection) PeerHasPiece(piece int) bool {
251         if c.peerHasAll {
252                 return true
253         }
254         if piece >= len(c.PeerPieces) {
255                 return false
256         }
257         return c.PeerPieces[piece]
258 }
259
// Post queues msg for sending to the peer. Blocks until writeOptimizer
// accepts the message; if the connection closes first, the message is
// silently dropped.
func (c *connection) Post(msg pp.Message) {
	select {
	case c.post <- msg:
	case <-c.closing:
	}
}
266
267 func (c *connection) RequestPending(r request) bool {
268         _, ok := c.Requests[r]
269         return ok
270 }
271
272 func (c *connection) requestMetadataPiece(index int) {
273         eID := c.PeerExtensionIDs["ut_metadata"]
274         if eID == 0 {
275                 return
276         }
277         if index < len(c.metadataRequests) && c.metadataRequests[index] {
278                 return
279         }
280         c.Post(pp.Message{
281                 Type:       pp.Extended,
282                 ExtendedID: eID,
283                 ExtendedPayload: func() []byte {
284                         b, err := bencode.Marshal(map[string]int{
285                                 "msg_type": pp.RequestMetadataExtensionMsgType,
286                                 "piece":    index,
287                         })
288                         if err != nil {
289                                 panic(err)
290                         }
291                         return b
292                 }(),
293         })
294         for index >= len(c.metadataRequests) {
295                 c.metadataRequests = append(c.metadataRequests, false)
296         }
297         c.metadataRequests[index] = true
298 }
299
// requestedMetadataPiece reports whether a ut_metadata request for the
// given metadata piece has been posted and is awaiting a response.
func (c *connection) requestedMetadataPiece(index int) bool {
	return index < len(c.metadataRequests) && c.metadataRequests[index]
}
303
// Request asks the peer for the given chunk if possible, tracking it in
// c.Requests. Returns true if more requests can be sent afterwards; note
// that it also returns true when this particular chunk couldn't be
// requested (peer lacks the piece, or it's already pending).
func (c *connection) Request(chunk request) bool {
	if len(c.Requests) >= c.PeerMaxRequests {
		// The peer's request queue is full; stop requesting.
		return false
	}
	if !c.PeerHasPiece(int(chunk.Index)) {
		// Can't ask this peer for this chunk, but others may follow.
		return true
	}
	if c.RequestPending(chunk) {
		// Already in flight.
		return true
	}
	// Express interest even before checking choke state, so the peer knows
	// to unchoke us.
	c.SetInterested(true)
	if c.PeerChoked {
		return false
	}
	if c.Requests == nil {
		c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
	}
	c.Requests[chunk] = struct{}{}
	// Refill when the queue drains to half its current depth; see
	// updateRequests.
	c.requestsLowWater = len(c.Requests) / 2
	c.Post(pp.Message{
		Type:   pp.Request,
		Index:  chunk.Index,
		Begin:  chunk.Begin,
		Length: chunk.Length,
	})
	return true
}
332
333 // Returns true if an unsatisfied request was canceled.
334 func (c *connection) Cancel(r request) bool {
335         if c.Requests == nil {
336                 return false
337         }
338         if _, ok := c.Requests[r]; !ok {
339                 return false
340         }
341         delete(c.Requests, r)
342         c.Post(pp.Message{
343                 Type:   pp.Cancel,
344                 Index:  r.Index,
345                 Begin:  r.Begin,
346                 Length: r.Length,
347         })
348         return true
349 }
350
351 // Returns true if an unsatisfied request was canceled.
352 func (c *connection) PeerCancel(r request) bool {
353         if c.PeerRequests == nil {
354                 return false
355         }
356         if _, ok := c.PeerRequests[r]; !ok {
357                 return false
358         }
359         delete(c.PeerRequests, r)
360         return true
361 }
362
// Choke tells the peer we won't satisfy its requests, and discards whatever
// requests it had pending. No-op if already choked.
func (c *connection) Choke() {
	if c.Choked {
		return
	}
	c.Post(pp.Message{
		Type: pp.Choke,
	})
	// Outstanding peer requests are implicitly void once we choke.
	c.PeerRequests = nil
	c.Choked = true
}
373
// Unchoke tells the peer it may request data from us. No-op if already
// unchoked.
func (c *connection) Unchoke() {
	if !c.Choked {
		return
	}
	c.Post(pp.Message{
		Type: pp.Unchoke,
	})
	c.Choked = false
}
383
384 func (c *connection) SetInterested(interested bool) {
385         if c.Interested == interested {
386                 return
387         }
388         c.Post(pp.Message{
389                 Type: func() pp.MessageType {
390                         if interested {
391                                 return pp.Interested
392                         } else {
393                                 return pp.NotInterested
394                         }
395                 }(),
396         })
397         c.Interested = interested
398 }
399
var (
	// Track connection writer buffer writes and flushes, to determine its
	// efficiency: a high write-to-flush ratio means many small writes are
	// being coalesced into each syscall.
	connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
	connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
)
406
407 // Writes buffers to the socket from the write channel.
408 func (conn *connection) writer() {
409         // Reduce write syscalls.
410         buf := bufio.NewWriter(conn.rw)
411         for {
412                 if buf.Buffered() == 0 {
413                         // There's nothing to write, so block until we get something.
414                         select {
415                         case b, ok := <-conn.writeCh:
416                                 if !ok {
417                                         return
418                                 }
419                                 connectionWriterWrite.Add(1)
420                                 _, err := buf.Write(b)
421                                 if err != nil {
422                                         conn.Close()
423                                         return
424                                 }
425                         case <-conn.closing:
426                                 return
427                         }
428                 } else {
429                         // We already have something to write, so flush if there's nothing
430                         // more to write.
431                         select {
432                         case b, ok := <-conn.writeCh:
433                                 if !ok {
434                                         return
435                                 }
436                                 connectionWriterWrite.Add(1)
437                                 _, err := buf.Write(b)
438                                 if err != nil {
439                                         conn.Close()
440                                         return
441                                 }
442                         case <-conn.closing:
443                                 return
444                         default:
445                                 connectionWriterFlush.Add(1)
446                                 err := buf.Flush()
447                                 if err != nil {
448                                         conn.Close()
449                                         return
450                                 }
451                         }
452                 }
453         }
454 }
455
// writeOptimizer marshals messages from the post channel into writeCh for
// the writer goroutine. It optimizes CANCELs whose matching REQUEST is
// still queued (neither is sent), and queues a keepalive when nothing has
// been written for keepAliveDelay.
func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
	defer close(conn.writeCh) // Responsible for notifying downstream routines.
	pending := list.New()     // Message queue.
	var nextWrite []byte      // Set to nil if we need to marshal the next message.
	timer := time.NewTimer(keepAliveDelay)
	defer timer.Stop()
	lastWrite := time.Now()
	for {
		write := conn.writeCh // Set to nil if there's nothing to write.
		if pending.Len() == 0 {
			write = nil
		} else if nextWrite == nil {
			// Lazily marshal the head of the queue.
			var err error
			nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
			if err != nil {
				panic(err)
			}
		}
	event:
		select {
		case <-timer.C:
			if pending.Len() != 0 {
				// Traffic is already queued; no keepalive needed.
				break
			}
			keepAliveTime := lastWrite.Add(keepAliveDelay)
			if time.Now().Before(keepAliveTime) {
				// Not idle long enough yet; re-arm for the remainder.
				timer.Reset(keepAliveTime.Sub(time.Now()))
				break
			}
			pending.PushBack(pp.Message{Keepalive: true})
		case msg, ok := <-conn.post:
			if !ok {
				// The post channel was closed; shut down.
				return
			}
			if msg.Type == pp.Cancel {
				// If the matching REQUEST is still queued (searched newest
				// first), drop it rather than sending request+cancel.
				for e := pending.Back(); e != nil; e = e.Prev() {
					elemMsg := e.Value.(pp.Message)
					if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
						pending.Remove(e)
						optimizedCancels.Add(1)
						break event
					}
				}
			}
			pending.PushBack(msg)
		case write <- nextWrite:
			// Head of the queue handed off to the writer goroutine.
			pending.Remove(pending.Front())
			nextWrite = nil
			lastWrite = time.Now()
			if pending.Len() == 0 {
				// Queue drained; start the keepalive countdown.
				timer.Reset(keepAliveDelay)
			}
		case <-conn.closing:
			return
		}
	}
}
513
514 func (cn *connection) Have(piece int) {
515         for piece >= len(cn.sentHaves) {
516                 cn.sentHaves = append(cn.sentHaves, false)
517         }
518         if cn.sentHaves[piece] {
519                 return
520         }
521         cn.Post(pp.Message{
522                 Type:  pp.Have,
523                 Index: pp.Integer(piece),
524         })
525         cn.sentHaves[piece] = true
526 }
527
// Bitfield sends our initial piece availability to the peer. Must precede
// any Have call: BITFIELD has to be the first have-related message sent.
// NOTE(review): haves is retained as cn.sentHaves without copying, so the
// caller must not mutate it afterwards — confirm callers honor this.
func (cn *connection) Bitfield(haves []bool) {
	if cn.sentHaves != nil {
		panic("bitfield must be first have-related message sent")
	}
	cn.Post(pp.Message{
		Type:     pp.Bitfield,
		Bitfield: haves,
	})
	cn.sentHaves = haves
}
538
// updateRequests refills the request queue to this peer when warranted, and
// keeps our interest state sensible.
func (c *connection) updateRequests() {
	if !c.t.haveInfo() {
		// Can't compute chunk requests without the torrent info.
		return
	}
	if c.Interested {
		if c.PeerChoked {
			// Nothing we request would be served anyway.
			return
		}
		if len(c.Requests) > c.requestsLowWater {
			// Queue hasn't drained enough to be worth refilling yet.
			return
		}
	}
	c.fillRequests()
	if len(c.Requests) == 0 && !c.PeerChoked {
		// So we're not choked, but we don't want anything right now. We may
		// have completed readahead, and the readahead window has not rolled
		// over to the next piece. Better to stay interested in case we're
		// going to want data in the near future.
		c.SetInterested(!c.t.haveAllPieces())
	}
}
560
// fillRequests issues chunk requests to this peer in priority order: urgent
// pieces first, then pieces in readers' wanted regions, then the remaining
// pending pieces. Stops as soon as connRequestPiecePendingChunks reports no
// more requests should be made.
func (c *connection) fillRequests() {
	if !c.t.forUrgentPieces(func(piece int) (again bool) {
		return c.t.connRequestPiecePendingChunks(c, piece)
	}) {
		return
	}
	c.t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
		// Starts at begin+1 — presumably the region's first piece is
		// already covered by the urgent pass above; confirm.
		for i := begin + 1; i < end; i++ {
			if !c.t.connRequestPiecePendingChunks(c, i) {
				return false
			}
		}
		return true
	})
	for i := range c.t.pendingPieces {
		if !c.t.connRequestPiecePendingChunks(c, i) {
			return
		}
	}
}