]> Sergey Matveev's repositories - btrtrc.git/blob - connection.go
Reintroduce connection piece inclinations, and begin caching piece priorities
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "container/list"
7         "encoding"
8         "errors"
9         "expvar"
10         "fmt"
11         "io"
12         "net"
13         "time"
14
15         "github.com/anacrolix/missinggo"
16         "github.com/anacrolix/missinggo/itertools"
17         "github.com/anacrolix/missinggo/prioritybitmap"
18
19         "github.com/anacrolix/torrent/bencode"
20         pp "github.com/anacrolix/torrent/peer_protocol"
21 )
22
// optimizedCancels counts the Cancel messages that writeOptimizer resolved
// by removing the matching, still-queued Request rather than sending both.
var optimizedCancels = expvar.NewInt("optimizedCancels")
24
// peerSource is a one-byte tag stored in connection.Discovery. A non-zero
// value is emitted verbatim as a flag character by connectionFlags.
// Presumably it records how the peer was found; confirm against the code that
// sets Discovery.
type peerSource byte

// Known peerSource values. Note these are untyped rune constants rather than
// typed peerSource constants.
const (
	peerSourceTracker  = '\x00' // It's the default.
	peerSourceIncoming = 'I'
	peerSourceDHT      = 'H'
	peerSourcePEX      = 'X'
)
33
34 // Maintains the state of a connection with a peer.
35 type connection struct {
36         t         *torrent
37         conn      net.Conn
38         rw        io.ReadWriter // The real slim shady
39         encrypted bool
40         Discovery peerSource
41         uTP       bool
42         closed    missinggo.Event
43         post      chan pp.Message
44         writeCh   chan []byte
45
46         UnwantedChunksReceived int
47         UsefulChunksReceived   int
48         chunksSent             int
49
50         lastMessageReceived     time.Time
51         completedHandshake      time.Time
52         lastUsefulChunkReceived time.Time
53         lastChunkSent           time.Time
54
55         // Stuff controlled by the local peer.
56         Interested       bool
57         Choked           bool
58         Requests         map[request]struct{}
59         requestsLowWater int
60         // Indexed by metadata piece, set to true if posted and pending a
61         // response.
62         metadataRequests []bool
63         sentHaves        []bool
64
65         // Stuff controlled by the remote peer.
66         PeerID             [20]byte
67         PeerInterested     bool
68         PeerChoked         bool
69         PeerRequests       map[request]struct{}
70         PeerExtensionBytes peerExtensionBytes
71         // Whether the peer has the given piece. nil if they've not sent any
72         // related messages yet.
73         PeerPieces []bool
74         peerHasAll bool
75         // Pieces we've accepted chunks for from the peer.
76         peerTouchedPieces map[int]struct{}
77
78         PeerMaxRequests  int // Maximum pending requests the peer allows.
79         PeerExtensionIDs map[string]byte
80         PeerClientName   string
81
82         pieceInclination  []int
83         pieceRequestOrder prioritybitmap.PriorityBitmap
84 }
85
86 func newConnection() (c *connection) {
87         c = &connection{
88                 Choked:          true,
89                 PeerChoked:      true,
90                 PeerMaxRequests: 250,
91
92                 writeCh: make(chan []byte),
93                 post:    make(chan pp.Message),
94         }
95         return
96 }
97
// remoteAddr reports the remote address of the underlying net.Conn.
func (cn *connection) remoteAddr() net.Addr {
	return cn.conn.RemoteAddr()
}
101
// localAddr reports the local address of the underlying net.Conn.
func (cn *connection) localAddr() net.Addr {
	return cn.conn.LocalAddr()
}
105
// supportsExtension reports whether ext has an entry in PeerExtensionIDs.
// Only presence is checked; the mapped ID value is not inspected.
func (cn *connection) supportsExtension(ext string) bool {
	_, ok := cn.PeerExtensionIDs[ext]
	return ok
}
110
111 func (cn *connection) completedString(t *torrent) string {
112         if cn.PeerPieces == nil && !cn.peerHasAll {
113                 return "?"
114         }
115         return fmt.Sprintf("%d/%d", func() int {
116                 if cn.peerHasAll {
117                         if t.haveInfo() {
118                                 return t.numPieces()
119                         }
120                         return -1
121                 }
122                 ret := 0
123                 for _, b := range cn.PeerPieces {
124                         if b {
125                                 ret++
126                         }
127                 }
128                 return ret
129         }(), func() int {
130                 if cn.peerHasAll || cn.PeerPieces == nil {
131                         if t.haveInfo() {
132                                 return t.numPieces()
133                         }
134                         return -1
135                 }
136                 return len(cn.PeerPieces)
137         }())
138 }
139
140 // Correct the PeerPieces slice length. Return false if the existing slice is
141 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
142 // messages.
143 func (cn *connection) setNumPieces(num int) error {
144         if cn.peerHasAll {
145                 return nil
146         }
147         if cn.PeerPieces == nil {
148                 return nil
149         }
150         if len(cn.PeerPieces) == num {
151         } else if len(cn.PeerPieces) < num {
152                 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
153         } else if len(cn.PeerPieces) <= (num+7)/8*8 {
154                 for _, have := range cn.PeerPieces[num:] {
155                         if have {
156                                 return errors.New("peer has invalid piece")
157                         }
158                 }
159                 cn.PeerPieces = cn.PeerPieces[:num]
160         } else {
161                 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
162         }
163         if len(cn.PeerPieces) != num {
164                 panic("wat")
165         }
166         return nil
167 }
168
// eventAgeString formats how long ago t happened, in seconds with two
// decimals, or "never" for the zero time.
func eventAgeString(t time.Time) string {
	if !t.IsZero() {
		return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
	}
	return "never"
}
175
176 func (cn *connection) connectionFlags() (ret string) {
177         c := func(b byte) {
178                 ret += string([]byte{b})
179         }
180         if cn.encrypted {
181                 c('E')
182         }
183         if cn.Discovery != 0 {
184                 c(byte(cn.Discovery))
185         }
186         if cn.uTP {
187                 c('T')
188         }
189         return
190 }
191
192 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
193 func (cn *connection) statusFlags() (ret string) {
194         c := func(b byte) {
195                 ret += string([]byte{b})
196         }
197         if cn.Interested {
198                 c('i')
199         }
200         if cn.Choked {
201                 c('c')
202         }
203         c('-')
204         ret += cn.connectionFlags()
205         c('-')
206         if cn.PeerInterested {
207                 c('i')
208         }
209         if cn.PeerChoked {
210                 c('c')
211         }
212         return
213 }
214
215 func (cn *connection) String() string {
216         var buf bytes.Buffer
217         cn.WriteStatus(&buf, nil)
218         return buf.String()
219 }
220
221 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
222         // \t isn't preserved in <pre> blocks?
223         fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
224         fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
225                 eventAgeString(cn.lastMessageReceived),
226                 eventAgeString(cn.completedHandshake),
227                 eventAgeString(cn.lastUsefulChunkReceived))
228         fmt.Fprintf(w,
229                 "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
230                 cn.completedString(t),
231                 len(cn.peerTouchedPieces),
232                 cn.UsefulChunksReceived,
233                 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
234                 cn.chunksSent,
235                 len(cn.Requests),
236                 len(cn.PeerRequests),
237                 cn.statusFlags(),
238         )
239 }
240
// Close sets the closed event, hands any held piece inclination back via
// discardPieceInclination, and closes the underlying net.Conn in its own
// goroutine so the caller is not blocked by it.
func (c *connection) Close() {
	c.closed.Set()
	c.discardPieceInclination()
	// TODO: This call blocks sometimes, why?
	go c.conn.Close()
}
247
248 func (c *connection) PeerHasPiece(piece int) bool {
249         if c.peerHasAll {
250                 return true
251         }
252         if piece >= len(c.PeerPieces) {
253                 return false
254         }
255         return c.PeerPieces[piece]
256 }
257
// Post submits msg on the post channel. It blocks until the message is
// received or the connection is closed, whichever happens first; in the
// latter case the message is dropped.
func (c *connection) Post(msg pp.Message) {
	select {
	case c.post <- msg:
	case <-c.closed.C():
	}
}
264
// RequestPending reports whether r is currently recorded in c.Requests.
func (c *connection) RequestPending(r request) bool {
	_, ok := c.Requests[r]
	return ok
}
269
270 func (c *connection) requestMetadataPiece(index int) {
271         eID := c.PeerExtensionIDs["ut_metadata"]
272         if eID == 0 {
273                 return
274         }
275         if index < len(c.metadataRequests) && c.metadataRequests[index] {
276                 return
277         }
278         c.Post(pp.Message{
279                 Type:       pp.Extended,
280                 ExtendedID: eID,
281                 ExtendedPayload: func() []byte {
282                         b, err := bencode.Marshal(map[string]int{
283                                 "msg_type": pp.RequestMetadataExtensionMsgType,
284                                 "piece":    index,
285                         })
286                         if err != nil {
287                                 panic(err)
288                         }
289                         return b
290                 }(),
291         })
292         for index >= len(c.metadataRequests) {
293                 c.metadataRequests = append(c.metadataRequests, false)
294         }
295         c.metadataRequests[index] = true
296 }
297
// requestedMetadataPiece reports whether the metadata piece at index is
// marked as requested. Indices past the end of metadataRequests count as not
// requested.
func (c *connection) requestedMetadataPiece(index int) bool {
	return index < len(c.metadataRequests) && c.metadataRequests[index]
}
301
302 // Returns true if more requests can be sent.
303 func (c *connection) Request(chunk request) bool {
304         if len(c.Requests) >= c.PeerMaxRequests {
305                 return false
306         }
307         if !c.PeerHasPiece(int(chunk.Index)) {
308                 return true
309         }
310         if c.RequestPending(chunk) {
311                 return true
312         }
313         c.SetInterested(true)
314         if c.PeerChoked {
315                 return false
316         }
317         if c.Requests == nil {
318                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
319         }
320         c.Requests[chunk] = struct{}{}
321         c.requestsLowWater = len(c.Requests) / 2
322         c.Post(pp.Message{
323                 Type:   pp.Request,
324                 Index:  chunk.Index,
325                 Begin:  chunk.Begin,
326                 Length: chunk.Length,
327         })
328         return true
329 }
330
331 // Returns true if an unsatisfied request was canceled.
332 func (c *connection) Cancel(r request) bool {
333         if c.Requests == nil {
334                 return false
335         }
336         if _, ok := c.Requests[r]; !ok {
337                 return false
338         }
339         delete(c.Requests, r)
340         c.Post(pp.Message{
341                 Type:   pp.Cancel,
342                 Index:  r.Index,
343                 Begin:  r.Begin,
344                 Length: r.Length,
345         })
346         return true
347 }
348
349 // Returns true if an unsatisfied request was canceled.
350 func (c *connection) PeerCancel(r request) bool {
351         if c.PeerRequests == nil {
352                 return false
353         }
354         if _, ok := c.PeerRequests[r]; !ok {
355                 return false
356         }
357         delete(c.PeerRequests, r)
358         return true
359 }
360
// Choke posts a Choke message, discards all of the peer's recorded requests,
// and marks the peer as choked by us. It does nothing if we are already
// choking the peer.
func (c *connection) Choke() {
	if c.Choked {
		return
	}
	c.Post(pp.Message{
		Type: pp.Choke,
	})
	c.PeerRequests = nil
	c.Choked = true
}
371
// Unchoke posts an Unchoke message and clears the Choked flag. It does
// nothing if the peer is not currently choked by us.
func (c *connection) Unchoke() {
	if !c.Choked {
		return
	}
	c.Post(pp.Message{
		Type: pp.Unchoke,
	})
	c.Choked = false
}
381
382 func (c *connection) SetInterested(interested bool) {
383         if c.Interested == interested {
384                 return
385         }
386         c.Post(pp.Message{
387                 Type: func() pp.MessageType {
388                         if interested {
389                                 return pp.Interested
390                         } else {
391                                 return pp.NotInterested
392                         }
393                 }(),
394         })
395         c.Interested = interested
396 }
397
var (
	// Track connection writer buffer writes and flushes, to determine its
	// efficiency. writer() adds one to connectionWriterWrite for every buffer
	// it copies into its bufio.Writer, and one to connectionWriterFlush for
	// every flush it performs.
	connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
	connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
)
404
405 // Writes buffers to the socket from the write channel.
406 func (conn *connection) writer() {
407         // Reduce write syscalls.
408         buf := bufio.NewWriter(conn.rw)
409         for {
410                 if buf.Buffered() == 0 {
411                         // There's nothing to write, so block until we get something.
412                         select {
413                         case b, ok := <-conn.writeCh:
414                                 if !ok {
415                                         return
416                                 }
417                                 connectionWriterWrite.Add(1)
418                                 _, err := buf.Write(b)
419                                 if err != nil {
420                                         conn.Close()
421                                         return
422                                 }
423                         case <-conn.closed.C():
424                                 return
425                         }
426                 } else {
427                         // We already have something to write, so flush if there's nothing
428                         // more to write.
429                         select {
430                         case b, ok := <-conn.writeCh:
431                                 if !ok {
432                                         return
433                                 }
434                                 connectionWriterWrite.Add(1)
435                                 _, err := buf.Write(b)
436                                 if err != nil {
437                                         conn.Close()
438                                         return
439                                 }
440                         case <-conn.closed.C():
441                                 return
442                         default:
443                                 connectionWriterFlush.Add(1)
444                                 err := buf.Flush()
445                                 if err != nil {
446                                         conn.Close()
447                                         return
448                                 }
449                         }
450                 }
451         }
452 }
453
454 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
455         defer close(conn.writeCh) // Responsible for notifying downstream routines.
456         pending := list.New()     // Message queue.
457         var nextWrite []byte      // Set to nil if we need to need to marshal the next message.
458         timer := time.NewTimer(keepAliveDelay)
459         defer timer.Stop()
460         lastWrite := time.Now()
461         for {
462                 write := conn.writeCh // Set to nil if there's nothing to write.
463                 if pending.Len() == 0 {
464                         write = nil
465                 } else if nextWrite == nil {
466                         var err error
467                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
468                         if err != nil {
469                                 panic(err)
470                         }
471                 }
472         event:
473                 select {
474                 case <-timer.C:
475                         if pending.Len() != 0 {
476                                 break
477                         }
478                         keepAliveTime := lastWrite.Add(keepAliveDelay)
479                         if time.Now().Before(keepAliveTime) {
480                                 timer.Reset(keepAliveTime.Sub(time.Now()))
481                                 break
482                         }
483                         pending.PushBack(pp.Message{Keepalive: true})
484                 case msg, ok := <-conn.post:
485                         if !ok {
486                                 return
487                         }
488                         if msg.Type == pp.Cancel {
489                                 for e := pending.Back(); e != nil; e = e.Prev() {
490                                         elemMsg := e.Value.(pp.Message)
491                                         if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
492                                                 pending.Remove(e)
493                                                 optimizedCancels.Add(1)
494                                                 break event
495                                         }
496                                 }
497                         }
498                         pending.PushBack(msg)
499                 case write <- nextWrite:
500                         pending.Remove(pending.Front())
501                         nextWrite = nil
502                         lastWrite = time.Now()
503                         if pending.Len() == 0 {
504                                 timer.Reset(keepAliveDelay)
505                         }
506                 case <-conn.closed.C():
507                         return
508                 }
509         }
510 }
511
512 func (cn *connection) Have(piece int) {
513         for piece >= len(cn.sentHaves) {
514                 cn.sentHaves = append(cn.sentHaves, false)
515         }
516         if cn.sentHaves[piece] {
517                 return
518         }
519         cn.Post(pp.Message{
520                 Type:  pp.Have,
521                 Index: pp.Integer(piece),
522         })
523         cn.sentHaves[piece] = true
524 }
525
526 func (cn *connection) Bitfield(haves []bool) {
527         if cn.sentHaves != nil {
528                 panic("bitfield must be first have-related message sent")
529         }
530         cn.Post(pp.Message{
531                 Type:     pp.Bitfield,
532                 Bitfield: haves,
533         })
534         cn.sentHaves = haves
535 }
536
537 func (c *connection) updateRequests() {
538         if !c.t.haveInfo() {
539                 return
540         }
541         if c.Interested {
542                 if c.PeerChoked {
543                         return
544                 }
545                 if len(c.Requests) > c.requestsLowWater {
546                         return
547                 }
548         }
549         c.fillRequests()
550         if len(c.Requests) == 0 && !c.PeerChoked {
551                 // So we're not choked, but we don't want anything right now. We may
552                 // have completed readahead, and the readahead window has not rolled
553                 // over to the next piece. Better to stay interested in case we're
554                 // going to want data in the near future.
555                 c.SetInterested(!c.t.haveAllPieces())
556         }
557 }
558
559 func (c *connection) fillRequests() {
560         itertools.ForIterable(&c.pieceRequestOrder, func(_piece interface{}) (more bool) {
561                 return c.requestPiecePendingChunks(_piece.(int))
562         })
563 }
564
// requestPiecePendingChunks delegates to the torrent's
// connRequestPiecePendingChunks for this connection and piece, passing its
// result through. fillRequests uses that result to decide whether to carry
// on with further pieces.
func (c *connection) requestPiecePendingChunks(piece int) (again bool) {
	return c.t.connRequestPiecePendingChunks(c, piece)
}
568
// stopRequestingPiece removes piece from pieceRequestOrder, so fillRequests
// no longer visits it.
func (c *connection) stopRequestingPiece(piece int) {
	c.pieceRequestOrder.Remove(piece)
}
572
573 func (c *connection) updatePiecePriority(piece int) {
574         if !c.PeerHasPiece(piece) {
575                 return
576         }
577         tpp := c.t.piecePriority(piece)
578         if tpp == PiecePriorityNone {
579                 c.stopRequestingPiece(piece)
580                 return
581         }
582         prio := c.getPieceInclination()[piece]
583         switch tpp {
584         case PiecePriorityNormal:
585         case PiecePriorityReadahead:
586                 prio -= c.t.numPieces()
587         case PiecePriorityNext, PiecePriorityNow:
588                 prio -= 2 * c.t.numPieces()
589         default:
590                 panic(tpp)
591         }
592         c.pieceRequestOrder.Set(piece, prio)
593         c.updateRequests()
594 }
595
596 func (c *connection) getPieceInclination() []int {
597         if c.pieceInclination == nil {
598                 c.pieceInclination = c.t.getConnPieceInclination()
599         }
600         return c.pieceInclination
601 }
602
603 func (c *connection) discardPieceInclination() {
604         if c.pieceInclination == nil {
605                 return
606         }
607         c.t.putPieceInclination(c.pieceInclination)
608         c.pieceInclination = nil
609 }