]> Sergey Matveev's repositories - btrtrc.git/blob - connection.go
14c24c85bce03838b36bc3f1acac8b6483997c8e
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "container/list"
7         "encoding"
8         "errors"
9         "expvar"
10         "fmt"
11         "io"
12         "net"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/torrent/bencode"
17         "github.com/anacrolix/torrent/internal/pieceordering"
18         pp "github.com/anacrolix/torrent/peer_protocol"
19 )
20
21 var optimizedCancels = expvar.NewInt("optimizedCancels")
22
23 type peerSource byte
24
25 const (
26         peerSourceTracker  = '\x00' // It's the default.
27         peerSourceIncoming = 'I'
28         peerSourceDHT      = 'H'
29         peerSourcePEX      = 'X'
30 )
31
32 // Maintains the state of a connection with a peer.
33 type connection struct {
34         conn      net.Conn
35         rw        io.ReadWriter // The real slim shady
36         encrypted bool
37         Discovery peerSource
38         uTP       bool
39         closing   chan struct{}
40         mu        sync.Mutex // Only for closing.
41         post      chan pp.Message
42         writeCh   chan []byte
43
44         // The connection's preferred order to download pieces. The index is the
45         // piece, the value is its priority.
46         piecePriorities []int
47         // The piece request order based on piece priorities.
48         pieceRequestOrder *pieceordering.Instance
49
50         UnwantedChunksReceived int
51         UsefulChunksReceived   int
52         chunksSent             int
53
54         lastMessageReceived     time.Time
55         completedHandshake      time.Time
56         lastUsefulChunkReceived time.Time
57         lastChunkSent           time.Time
58
59         // Stuff controlled by the local peer.
60         Interested       bool
61         Choked           bool
62         Requests         map[request]struct{}
63         requestsLowWater int
64         // Indexed by metadata piece, set to true if posted and pending a
65         // response.
66         metadataRequests []bool
67         sentHaves        []bool
68
69         // Stuff controlled by the remote peer.
70         PeerID             [20]byte
71         PeerInterested     bool
72         PeerChoked         bool
73         PeerRequests       map[request]struct{}
74         PeerExtensionBytes peerExtensionBytes
75         // Whether the peer has the given piece. nil if they've not sent any
76         // related messages yet.
77         PeerPieces []bool
78         peerHasAll bool
79         // Pieces we've accepted chunks for from the peer.
80         peerTouchedPieces map[int]struct{}
81
82         PeerMaxRequests  int // Maximum pending requests the peer allows.
83         PeerExtensionIDs map[string]byte
84         PeerClientName   string
85 }
86
87 func newConnection() (c *connection) {
88         c = &connection{
89                 Choked:          true,
90                 PeerChoked:      true,
91                 PeerMaxRequests: 250,
92
93                 closing: make(chan struct{}),
94                 writeCh: make(chan []byte),
95                 post:    make(chan pp.Message),
96         }
97         return
98 }
99
100 func (cn *connection) remoteAddr() net.Addr {
101         return cn.conn.RemoteAddr()
102 }
103
104 func (cn *connection) localAddr() net.Addr {
105         return cn.conn.LocalAddr()
106 }
107
108 // Adjust piece position in the request order for this connection based on the
109 // given piece priority.
110 func (cn *connection) pendPiece(piece int, priority piecePriority, t *torrent) {
111         if priority == PiecePriorityNone {
112                 cn.pieceRequestOrder.DeletePiece(piece)
113                 return
114         }
115         if cn.piecePriorities == nil {
116                 cn.piecePriorities = t.newConnPiecePriorities()
117         }
118         pp := cn.piecePriorities[piece]
119         // Priority regions not to scale. Within each region, piece is randomized
120         // according to connection.
121
122         // <-request first -- last->
123         // [ Now         ]
124         //  [ Next       ]
125         //   [ Readahead ]
126         //                [ Normal ]
127         key := func() int {
128                 switch priority {
129                 case PiecePriorityNow:
130                         return -3*len(cn.piecePriorities) + 3*pp
131                 case PiecePriorityNext:
132                         return -2*len(cn.piecePriorities) + 2*pp
133                 case PiecePriorityReadahead:
134                         return -len(cn.piecePriorities) + pp
135                 case PiecePriorityNormal:
136                         return pp
137                 default:
138                         panic(priority)
139                 }
140         }()
141         cn.pieceRequestOrder.SetPiece(piece, key)
142 }
143
144 func (cn *connection) supportsExtension(ext string) bool {
145         _, ok := cn.PeerExtensionIDs[ext]
146         return ok
147 }
148
149 func (cn *connection) completedString(t *torrent) string {
150         if cn.PeerPieces == nil && !cn.peerHasAll {
151                 return "?"
152         }
153         return fmt.Sprintf("%d/%d", func() int {
154                 if cn.peerHasAll {
155                         if t.haveInfo() {
156                                 return t.numPieces()
157                         }
158                         return -1
159                 }
160                 ret := 0
161                 for _, b := range cn.PeerPieces {
162                         if b {
163                                 ret++
164                         }
165                 }
166                 return ret
167         }(), func() int {
168                 if cn.peerHasAll || cn.PeerPieces == nil {
169                         if t.haveInfo() {
170                                 return t.numPieces()
171                         }
172                         return -1
173                 }
174                 return len(cn.PeerPieces)
175         }())
176 }
177
178 // Correct the PeerPieces slice length. Return false if the existing slice is
179 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
180 // messages.
181 func (cn *connection) setNumPieces(num int) error {
182         if cn.peerHasAll {
183                 return nil
184         }
185         if cn.PeerPieces == nil {
186                 return nil
187         }
188         if len(cn.PeerPieces) == num {
189         } else if len(cn.PeerPieces) < num {
190                 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
191         } else if len(cn.PeerPieces) <= (num+7)/8*8 {
192                 for _, have := range cn.PeerPieces[num:] {
193                         if have {
194                                 return errors.New("peer has invalid piece")
195                         }
196                 }
197                 cn.PeerPieces = cn.PeerPieces[:num]
198         } else {
199                 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
200         }
201         if len(cn.PeerPieces) != num {
202                 panic("wat")
203         }
204         return nil
205 }
206
207 func eventAgeString(t time.Time) string {
208         if t.IsZero() {
209                 return "never"
210         }
211         return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
212 }
213
214 func (cn *connection) connectionFlags() (ret string) {
215         c := func(b byte) {
216                 ret += string([]byte{b})
217         }
218         if cn.encrypted {
219                 c('E')
220         }
221         if cn.Discovery != 0 {
222                 c(byte(cn.Discovery))
223         }
224         if cn.uTP {
225                 c('T')
226         }
227         return
228 }
229
230 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
231 func (cn *connection) statusFlags() (ret string) {
232         c := func(b byte) {
233                 ret += string([]byte{b})
234         }
235         if cn.Interested {
236                 c('i')
237         }
238         if cn.Choked {
239                 c('c')
240         }
241         c('-')
242         ret += cn.connectionFlags()
243         c('-')
244         if cn.PeerInterested {
245                 c('i')
246         }
247         if cn.PeerChoked {
248                 c('c')
249         }
250         return
251 }
252
253 func (cn *connection) String() string {
254         var buf bytes.Buffer
255         cn.WriteStatus(&buf, nil)
256         return buf.String()
257 }
258
259 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
260         // \t isn't preserved in <pre> blocks?
261         fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
262         fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
263                 eventAgeString(cn.lastMessageReceived),
264                 eventAgeString(cn.completedHandshake),
265                 eventAgeString(cn.lastUsefulChunkReceived))
266         fmt.Fprintf(w,
267                 "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
268                 cn.completedString(t),
269                 len(cn.peerTouchedPieces),
270                 cn.UsefulChunksReceived,
271                 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
272                 cn.chunksSent,
273                 len(cn.Requests),
274                 len(cn.PeerRequests),
275                 cn.statusFlags(),
276         )
277 }
278
279 func (c *connection) Close() {
280         c.mu.Lock()
281         defer c.mu.Unlock()
282         select {
283         case <-c.closing:
284                 return
285         default:
286         }
287         close(c.closing)
288         // TODO: This call blocks sometimes, why?
289         go c.conn.Close()
290 }
291
292 func (c *connection) PeerHasPiece(piece int) bool {
293         if c.peerHasAll {
294                 return true
295         }
296         if piece >= len(c.PeerPieces) {
297                 return false
298         }
299         return c.PeerPieces[piece]
300 }
301
302 func (c *connection) Post(msg pp.Message) {
303         select {
304         case c.post <- msg:
305         case <-c.closing:
306         }
307 }
308
309 func (c *connection) RequestPending(r request) bool {
310         _, ok := c.Requests[r]
311         return ok
312 }
313
314 func (c *connection) requestMetadataPiece(index int) {
315         eID := c.PeerExtensionIDs["ut_metadata"]
316         if eID == 0 {
317                 return
318         }
319         if index < len(c.metadataRequests) && c.metadataRequests[index] {
320                 return
321         }
322         c.Post(pp.Message{
323                 Type:       pp.Extended,
324                 ExtendedID: eID,
325                 ExtendedPayload: func() []byte {
326                         b, err := bencode.Marshal(map[string]int{
327                                 "msg_type": pp.RequestMetadataExtensionMsgType,
328                                 "piece":    index,
329                         })
330                         if err != nil {
331                                 panic(err)
332                         }
333                         return b
334                 }(),
335         })
336         for index >= len(c.metadataRequests) {
337                 c.metadataRequests = append(c.metadataRequests, false)
338         }
339         c.metadataRequests[index] = true
340 }
341
342 func (c *connection) requestedMetadataPiece(index int) bool {
343         return index < len(c.metadataRequests) && c.metadataRequests[index]
344 }
345
346 // Returns true if more requests can be sent.
347 func (c *connection) Request(chunk request) bool {
348         if len(c.Requests) >= c.PeerMaxRequests {
349                 return false
350         }
351         if !c.PeerHasPiece(int(chunk.Index)) {
352                 return true
353         }
354         if c.RequestPending(chunk) {
355                 return true
356         }
357         c.SetInterested(true)
358         if c.PeerChoked {
359                 return false
360         }
361         if c.Requests == nil {
362                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
363         }
364         c.Requests[chunk] = struct{}{}
365         c.requestsLowWater = len(c.Requests) / 2
366         c.Post(pp.Message{
367                 Type:   pp.Request,
368                 Index:  chunk.Index,
369                 Begin:  chunk.Begin,
370                 Length: chunk.Length,
371         })
372         return true
373 }
374
375 // Returns true if an unsatisfied request was canceled.
376 func (c *connection) Cancel(r request) bool {
377         if c.Requests == nil {
378                 return false
379         }
380         if _, ok := c.Requests[r]; !ok {
381                 return false
382         }
383         delete(c.Requests, r)
384         c.Post(pp.Message{
385                 Type:   pp.Cancel,
386                 Index:  r.Index,
387                 Begin:  r.Begin,
388                 Length: r.Length,
389         })
390         return true
391 }
392
393 // Returns true if an unsatisfied request was canceled.
394 func (c *connection) PeerCancel(r request) bool {
395         if c.PeerRequests == nil {
396                 return false
397         }
398         if _, ok := c.PeerRequests[r]; !ok {
399                 return false
400         }
401         delete(c.PeerRequests, r)
402         return true
403 }
404
405 func (c *connection) Choke() {
406         if c.Choked {
407                 return
408         }
409         c.Post(pp.Message{
410                 Type: pp.Choke,
411         })
412         c.PeerRequests = nil
413         c.Choked = true
414 }
415
416 func (c *connection) Unchoke() {
417         if !c.Choked {
418                 return
419         }
420         c.Post(pp.Message{
421                 Type: pp.Unchoke,
422         })
423         c.Choked = false
424 }
425
426 func (c *connection) SetInterested(interested bool) {
427         if c.Interested == interested {
428                 return
429         }
430         c.Post(pp.Message{
431                 Type: func() pp.MessageType {
432                         if interested {
433                                 return pp.Interested
434                         } else {
435                                 return pp.NotInterested
436                         }
437                 }(),
438         })
439         c.Interested = interested
440 }
441
442 var (
443         // Track connection writer buffer writes and flushes, to determine its
444         // efficiency.
445         connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
446         connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
447 )
448
449 // Writes buffers to the socket from the write channel.
450 func (conn *connection) writer() {
451         // Reduce write syscalls.
452         buf := bufio.NewWriter(conn.rw)
453         for {
454                 if buf.Buffered() == 0 {
455                         // There's nothing to write, so block until we get something.
456                         select {
457                         case b, ok := <-conn.writeCh:
458                                 if !ok {
459                                         return
460                                 }
461                                 connectionWriterWrite.Add(1)
462                                 _, err := buf.Write(b)
463                                 if err != nil {
464                                         conn.Close()
465                                         return
466                                 }
467                         case <-conn.closing:
468                                 return
469                         }
470                 } else {
471                         // We already have something to write, so flush if there's nothing
472                         // more to write.
473                         select {
474                         case b, ok := <-conn.writeCh:
475                                 if !ok {
476                                         return
477                                 }
478                                 connectionWriterWrite.Add(1)
479                                 _, err := buf.Write(b)
480                                 if err != nil {
481                                         conn.Close()
482                                         return
483                                 }
484                         case <-conn.closing:
485                                 return
486                         default:
487                                 connectionWriterFlush.Add(1)
488                                 err := buf.Flush()
489                                 if err != nil {
490                                         conn.Close()
491                                         return
492                                 }
493                         }
494                 }
495         }
496 }
497
498 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
499         defer close(conn.writeCh) // Responsible for notifying downstream routines.
500         pending := list.New()     // Message queue.
501         var nextWrite []byte      // Set to nil if we need to need to marshal the next message.
502         timer := time.NewTimer(keepAliveDelay)
503         defer timer.Stop()
504         lastWrite := time.Now()
505         for {
506                 write := conn.writeCh // Set to nil if there's nothing to write.
507                 if pending.Len() == 0 {
508                         write = nil
509                 } else if nextWrite == nil {
510                         var err error
511                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
512                         if err != nil {
513                                 panic(err)
514                         }
515                 }
516         event:
517                 select {
518                 case <-timer.C:
519                         if pending.Len() != 0 {
520                                 break
521                         }
522                         keepAliveTime := lastWrite.Add(keepAliveDelay)
523                         if time.Now().Before(keepAliveTime) {
524                                 timer.Reset(keepAliveTime.Sub(time.Now()))
525                                 break
526                         }
527                         pending.PushBack(pp.Message{Keepalive: true})
528                 case msg, ok := <-conn.post:
529                         if !ok {
530                                 return
531                         }
532                         if msg.Type == pp.Cancel {
533                                 for e := pending.Back(); e != nil; e = e.Prev() {
534                                         elemMsg := e.Value.(pp.Message)
535                                         if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
536                                                 pending.Remove(e)
537                                                 optimizedCancels.Add(1)
538                                                 break event
539                                         }
540                                 }
541                         }
542                         pending.PushBack(msg)
543                 case write <- nextWrite:
544                         pending.Remove(pending.Front())
545                         nextWrite = nil
546                         lastWrite = time.Now()
547                         if pending.Len() == 0 {
548                                 timer.Reset(keepAliveDelay)
549                         }
550                 case <-conn.closing:
551                         return
552                 }
553         }
554 }
555
556 func (cn *connection) Have(piece int) {
557         for piece >= len(cn.sentHaves) {
558                 cn.sentHaves = append(cn.sentHaves, false)
559         }
560         if cn.sentHaves[piece] {
561                 return
562         }
563         cn.Post(pp.Message{
564                 Type:  pp.Have,
565                 Index: pp.Integer(piece),
566         })
567         cn.sentHaves[piece] = true
568 }
569
570 func (cn *connection) Bitfield(haves []bool) {
571         if cn.sentHaves != nil {
572                 panic("bitfield must be first have-related message sent")
573         }
574         cn.Post(pp.Message{
575                 Type:     pp.Bitfield,
576                 Bitfield: haves,
577         })
578         cn.sentHaves = haves
579 }