]> Sergey Matveev's repositories - btrtrc.git/blob - connection.go
943e1a7cfcc29271dd99c499739678bedeedee54
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "container/list"
6         "encoding"
7         "errors"
8         "expvar"
9         "fmt"
10         "io"
11         "net"
12         "sync"
13         "time"
14
15         "github.com/anacrolix/torrent/bencode"
16         "github.com/anacrolix/torrent/internal/pieceordering"
17         pp "github.com/anacrolix/torrent/peer_protocol"
18 )
19
// Published expvar counter. It is incremented by writeOptimizer each time a
// Cancel is matched against a Request that is still queued, in which case the
// Request is removed from the queue and the Cancel is not queued.
var optimizedCancels = expvar.NewInt("optimizedCancels")

// Single-byte tag stored in connection.Discovery. statusFlags prints the byte
// as-is whenever it is non-zero.
type peerSource byte

// Tag values for peerSource. They are declared as untyped rune constants.
// NOTE(review): judging by the names these stand for incoming connections,
// DHT and peer exchange respectively -- confirm where they are assigned.
const (
        peerSourceIncoming = 'I'
        peerSourceDHT      = 'H'
        peerSourcePEX      = 'X'
)
29
// Maintains the state of a connection with a peer.
type connection struct {
        // Underlying network connection. It supplies the local and remote
        // addresses and is what Close shuts down.
        conn      net.Conn
        // The stream the writer goroutine wraps in a buffered writer and
        // writes to. NOTE(review): presumably a wrapper around conn (e.g. for
        // encryption) -- confirm where it is assigned.
        rw        io.ReadWriter // The real slim shady
        // Shown as 'E' in statusFlags.
        encrypted bool
        // Shown verbatim in statusFlags when non-zero.
        Discovery peerSource
        // Shown as 'T' in statusFlags.
        uTP       bool
        // Closed once by Close. Post, writer and writeOptimizer all select on
        // it so they stop when the connection is closing.
        closing   chan struct{}
        mu        sync.Mutex // Only for closing.
        // Messages handed over by Post and received by writeOptimizer.
        post      chan pp.Message
        // Marshaled messages sent by writeOptimizer and received by writer.
        // writeOptimizer closes it when it returns.
        writeCh   chan []byte

        // The connection's preferred order to download pieces. The index is the
        // piece, the value is its priority.
        piecePriorities []int
        // The piece request order based on piece priorities.
        pieceRequestOrder *pieceordering.Instance

        // Chunk counters, reported by WriteStatus.
        UnwantedChunksReceived int
        UsefulChunksReceived   int
        chunksSent             int

        // Event timestamps, reported by WriteStatus. A zero value is printed
        // as "never".
        lastMessageReceived     time.Time
        completedHandshake      time.Time
        lastUsefulChunkReceived time.Time

        // Stuff controlled by the local peer.
        // Interested and Choked record the last state posted to the peer by
        // SetInterested and Choke/Unchoke.
        Interested       bool
        Choked           bool
        // Requests posted to the peer: entries are added by Request and removed
        // by Cancel. May be nil until the first request is made.
        Requests         map[request]struct{}
        // Set to half of len(Requests) every time Request adds an entry.
        requestsLowWater int
        // Indexed by metadata piece, set to true if posted and pending a
        // response.
        metadataRequests []bool

        // Stuff controlled by the remote peer.
        PeerID             [20]byte
        PeerInterested     bool
        // While true, Request does not post any requests.
        PeerChoked         bool
        // Entries are removed by PeerCancel; the whole map is dropped by Choke.
        PeerRequests       map[request]struct{}
        PeerExtensionBytes peerExtensionBytes
        // Whether the peer has the given piece. nil if they've not sent any
        // related messages yet.
        PeerPieces []bool
        // When set, PeerHasPiece reports true for every piece and PeerPieces is
        // not consulted.
        peerHasAll bool

        PeerMaxRequests  int // Maximum pending requests the peer allows.
        // Extension name to extended-message ID, as looked up by
        // supportsExtension and requestMetadataPiece. An ID of 0 is treated as
        // "not available" by requestMetadataPiece.
        PeerExtensionIDs map[string]byte
        PeerClientName   string
}
80
81 func newConnection() (c *connection) {
82         c = &connection{
83                 Choked:          true,
84                 PeerChoked:      true,
85                 PeerMaxRequests: 250,
86
87                 closing: make(chan struct{}),
88                 writeCh: make(chan []byte),
89                 post:    make(chan pp.Message),
90         }
91         return
92 }
93
94 func (cn *connection) remoteAddr() net.Addr {
95         return cn.conn.RemoteAddr()
96 }
97
98 func (cn *connection) localAddr() net.Addr {
99         return cn.conn.LocalAddr()
100 }
101
102 // Adjust piece position in the request order for this connection based on the
103 // given piece priority.
104 func (cn *connection) pendPiece(piece int, priority piecePriority) {
105         if priority == PiecePriorityNone {
106                 cn.pieceRequestOrder.DeletePiece(piece)
107                 return
108         }
109         pp := cn.piecePriorities[piece]
110         // Priority regions not to scale. Within each region, piece is randomized
111         // according to connection.
112
113         // <-request first -- last->
114         // [ Now         ]
115         //  [ Next       ]
116         //   [ Readahead ]
117         //                [ Normal ]
118         key := func() int {
119                 switch priority {
120                 case PiecePriorityNow:
121                         return -3*len(cn.piecePriorities) + 3*pp
122                 case PiecePriorityNext:
123                         return -2*len(cn.piecePriorities) + 2*pp
124                 case PiecePriorityReadahead:
125                         return -len(cn.piecePriorities) + pp
126                 case PiecePriorityNormal:
127                         return pp
128                 default:
129                         panic(priority)
130                 }
131         }()
132         cn.pieceRequestOrder.SetPiece(piece, key)
133 }
134
135 func (cn *connection) supportsExtension(ext string) bool {
136         _, ok := cn.PeerExtensionIDs[ext]
137         return ok
138 }
139
140 func (cn *connection) completedString(t *torrent) string {
141         if cn.PeerPieces == nil && !cn.peerHasAll {
142                 return "?"
143         }
144         return fmt.Sprintf("%d/%d", func() int {
145                 if cn.peerHasAll {
146                         if t.haveInfo() {
147                                 return t.numPieces()
148                         }
149                         return -1
150                 }
151                 ret := 0
152                 for _, b := range cn.PeerPieces {
153                         if b {
154                                 ret++
155                         }
156                 }
157                 return ret
158         }(), func() int {
159                 if cn.peerHasAll || cn.PeerPieces == nil {
160                         if t.haveInfo() {
161                                 return t.numPieces()
162                         }
163                         return -1
164                 }
165                 return len(cn.PeerPieces)
166         }())
167 }
168
169 // Correct the PeerPieces slice length. Return false if the existing slice is
170 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
171 // messages.
172 func (cn *connection) setNumPieces(num int) error {
173         if cn.peerHasAll {
174                 return nil
175         }
176         if cn.PeerPieces == nil {
177                 return nil
178         }
179         if len(cn.PeerPieces) == num {
180         } else if len(cn.PeerPieces) < num {
181                 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
182         } else if len(cn.PeerPieces) <= (num+7)/8*8 {
183                 for _, have := range cn.PeerPieces[num:] {
184                         if have {
185                                 return errors.New("peer has invalid piece")
186                         }
187                 }
188                 cn.PeerPieces = cn.PeerPieces[:num]
189         } else {
190                 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
191         }
192         if len(cn.PeerPieces) != num {
193                 panic("wat")
194         }
195         return nil
196 }
197
// eventAgeString formats how long ago t happened as seconds with two decimal
// places followed by "s ago", e.g. "1.50s ago". The zero time is reported as
// "never".
func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}
204
205 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
206 func (cn *connection) statusFlags() (ret string) {
207         c := func(b byte) {
208                 ret += string([]byte{b})
209         }
210         if cn.Interested {
211                 c('i')
212         }
213         if cn.Choked {
214                 c('c')
215         }
216         c('-')
217         if cn.encrypted {
218                 c('E')
219         }
220         if cn.Discovery != 0 {
221                 c(byte(cn.Discovery))
222         }
223         if cn.uTP {
224                 c('T')
225         }
226         c('-')
227         if cn.PeerInterested {
228                 c('i')
229         }
230         if cn.PeerChoked {
231                 c('c')
232         }
233         return
234 }
235
236 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
237         // \t isn't preserved in <pre> blocks?
238         fmt.Fprintf(w, "%q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
239         fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
240                 eventAgeString(cn.lastMessageReceived),
241                 eventAgeString(cn.completedHandshake),
242                 eventAgeString(cn.lastUsefulChunkReceived))
243         fmt.Fprintf(w,
244                 "    %s completed, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
245                 cn.completedString(t),
246                 cn.UsefulChunksReceived,
247                 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
248                 cn.chunksSent,
249                 len(cn.Requests),
250                 len(cn.PeerRequests),
251                 cn.statusFlags(),
252         )
253 }
254
255 func (c *connection) Close() {
256         c.mu.Lock()
257         defer c.mu.Unlock()
258         select {
259         case <-c.closing:
260                 return
261         default:
262         }
263         close(c.closing)
264         // TODO: This call blocks sometimes, why?
265         go c.conn.Close()
266 }
267
268 func (c *connection) PeerHasPiece(piece int) bool {
269         if c.peerHasAll {
270                 return true
271         }
272         if piece >= len(c.PeerPieces) {
273                 return false
274         }
275         return c.PeerPieces[piece]
276 }
277
278 func (c *connection) Post(msg pp.Message) {
279         select {
280         case c.post <- msg:
281         case <-c.closing:
282         }
283 }
284
285 func (c *connection) RequestPending(r request) bool {
286         _, ok := c.Requests[r]
287         return ok
288 }
289
290 func (c *connection) requestMetadataPiece(index int) {
291         eID := c.PeerExtensionIDs["ut_metadata"]
292         if eID == 0 {
293                 return
294         }
295         if index < len(c.metadataRequests) && c.metadataRequests[index] {
296                 return
297         }
298         c.Post(pp.Message{
299                 Type:       pp.Extended,
300                 ExtendedID: eID,
301                 ExtendedPayload: func() []byte {
302                         b, err := bencode.Marshal(map[string]int{
303                                 "msg_type": pp.RequestMetadataExtensionMsgType,
304                                 "piece":    index,
305                         })
306                         if err != nil {
307                                 panic(err)
308                         }
309                         return b
310                 }(),
311         })
312         for index >= len(c.metadataRequests) {
313                 c.metadataRequests = append(c.metadataRequests, false)
314         }
315         c.metadataRequests[index] = true
316 }
317
318 func (c *connection) requestedMetadataPiece(index int) bool {
319         return index < len(c.metadataRequests) && c.metadataRequests[index]
320 }
321
322 // Returns true if more requests can be sent.
323 func (c *connection) Request(chunk request) bool {
324         if len(c.Requests) >= c.PeerMaxRequests {
325                 return false
326         }
327         if !c.PeerHasPiece(int(chunk.Index)) {
328                 return true
329         }
330         if c.RequestPending(chunk) {
331                 return true
332         }
333         c.SetInterested(true)
334         if c.PeerChoked {
335                 return false
336         }
337         if c.Requests == nil {
338                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
339         }
340         c.Requests[chunk] = struct{}{}
341         c.requestsLowWater = len(c.Requests) / 2
342         c.Post(pp.Message{
343                 Type:   pp.Request,
344                 Index:  chunk.Index,
345                 Begin:  chunk.Begin,
346                 Length: chunk.Length,
347         })
348         return true
349 }
350
351 // Returns true if an unsatisfied request was canceled.
352 func (c *connection) Cancel(r request) bool {
353         if c.Requests == nil {
354                 return false
355         }
356         if _, ok := c.Requests[r]; !ok {
357                 return false
358         }
359         delete(c.Requests, r)
360         c.Post(pp.Message{
361                 Type:   pp.Cancel,
362                 Index:  r.Index,
363                 Begin:  r.Begin,
364                 Length: r.Length,
365         })
366         return true
367 }
368
369 // Returns true if an unsatisfied request was canceled.
370 func (c *connection) PeerCancel(r request) bool {
371         if c.PeerRequests == nil {
372                 return false
373         }
374         if _, ok := c.PeerRequests[r]; !ok {
375                 return false
376         }
377         delete(c.PeerRequests, r)
378         return true
379 }
380
381 func (c *connection) Choke() {
382         if c.Choked {
383                 return
384         }
385         c.Post(pp.Message{
386                 Type: pp.Choke,
387         })
388         c.PeerRequests = nil
389         c.Choked = true
390 }
391
392 func (c *connection) Unchoke() {
393         if !c.Choked {
394                 return
395         }
396         c.Post(pp.Message{
397                 Type: pp.Unchoke,
398         })
399         c.Choked = false
400 }
401
402 func (c *connection) SetInterested(interested bool) {
403         if c.Interested == interested {
404                 return
405         }
406         c.Post(pp.Message{
407                 Type: func() pp.MessageType {
408                         if interested {
409                                 return pp.Interested
410                         } else {
411                                 return pp.NotInterested
412                         }
413                 }(),
414         })
415         c.Interested = interested
416 }
417
418 // Writes buffers to the socket from the write channel.
419 func (conn *connection) writer() {
420         // Reduce write syscalls.
421         buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB
422         // Receives when buf is not empty.
423         notEmpty := make(chan struct{}, 1)
424         for {
425                 if buf.Buffered() != 0 {
426                         // Make sure it's receivable.
427                         select {
428                         case notEmpty <- struct{}{}:
429                         default:
430                         }
431                 }
432                 select {
433                 case b, ok := <-conn.writeCh:
434                         if !ok {
435                                 return
436                         }
437                         _, err := buf.Write(b)
438                         if err != nil {
439                                 conn.Close()
440                                 return
441                         }
442                 case <-conn.closing:
443                         return
444                 case <-notEmpty:
445                         err := buf.Flush()
446                         if err != nil {
447                                 return
448                         }
449                 }
450         }
451 }
452
453 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
454         defer close(conn.writeCh) // Responsible for notifying downstream routines.
455         pending := list.New()     // Message queue.
456         var nextWrite []byte      // Set to nil if we need to need to marshal the next message.
457         timer := time.NewTimer(keepAliveDelay)
458         defer timer.Stop()
459         lastWrite := time.Now()
460         for {
461                 write := conn.writeCh // Set to nil if there's nothing to write.
462                 if pending.Len() == 0 {
463                         write = nil
464                 } else if nextWrite == nil {
465                         var err error
466                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
467                         if err != nil {
468                                 panic(err)
469                         }
470                 }
471         event:
472                 select {
473                 case <-timer.C:
474                         if pending.Len() != 0 {
475                                 break
476                         }
477                         keepAliveTime := lastWrite.Add(keepAliveDelay)
478                         if time.Now().Before(keepAliveTime) {
479                                 timer.Reset(keepAliveTime.Sub(time.Now()))
480                                 break
481                         }
482                         pending.PushBack(pp.Message{Keepalive: true})
483                 case msg, ok := <-conn.post:
484                         if !ok {
485                                 return
486                         }
487                         if msg.Type == pp.Cancel {
488                                 for e := pending.Back(); e != nil; e = e.Prev() {
489                                         elemMsg := e.Value.(pp.Message)
490                                         if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
491                                                 pending.Remove(e)
492                                                 optimizedCancels.Add(1)
493                                                 break event
494                                         }
495                                 }
496                         }
497                         pending.PushBack(msg)
498                 case write <- nextWrite:
499                         pending.Remove(pending.Front())
500                         nextWrite = nil
501                         lastWrite = time.Now()
502                         if pending.Len() == 0 {
503                                 timer.Reset(keepAliveDelay)
504                         }
505                 case <-conn.closing:
506                         return
507                 }
508         }
509 }