]> Sergey Matveev's repositories - btrtrc.git/blob - connection.go
It's working and the tests are usually passing
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "bytes"
6         "container/list"
7         "encoding"
8         "errors"
9         "expvar"
10         "fmt"
11         "io"
12         "net"
13         "sync"
14         "time"
15
16         "github.com/anacrolix/torrent/bencode"
17         pp "github.com/anacrolix/torrent/peer_protocol"
18 )
19
20 var optimizedCancels = expvar.NewInt("optimizedCancels")
21
22 type peerSource byte
23
24 const (
25         peerSourceTracker  = '\x00' // It's the default.
26         peerSourceIncoming = 'I'
27         peerSourceDHT      = 'H'
28         peerSourcePEX      = 'X'
29 )
30
31 // Maintains the state of a connection with a peer.
32 type connection struct {
33         conn      net.Conn
34         rw        io.ReadWriter // The real slim shady
35         encrypted bool
36         Discovery peerSource
37         uTP       bool
38         closing   chan struct{}
39         mu        sync.Mutex // Only for closing.
40         post      chan pp.Message
41         writeCh   chan []byte
42
43         UnwantedChunksReceived int
44         UsefulChunksReceived   int
45         chunksSent             int
46
47         lastMessageReceived     time.Time
48         completedHandshake      time.Time
49         lastUsefulChunkReceived time.Time
50         lastChunkSent           time.Time
51
52         // Stuff controlled by the local peer.
53         Interested       bool
54         Choked           bool
55         Requests         map[request]struct{}
56         requestsLowWater int
57         // Indexed by metadata piece, set to true if posted and pending a
58         // response.
59         metadataRequests []bool
60         sentHaves        []bool
61
62         // Stuff controlled by the remote peer.
63         PeerID             [20]byte
64         PeerInterested     bool
65         PeerChoked         bool
66         PeerRequests       map[request]struct{}
67         PeerExtensionBytes peerExtensionBytes
68         // Whether the peer has the given piece. nil if they've not sent any
69         // related messages yet.
70         PeerPieces []bool
71         peerHasAll bool
72         // Pieces we've accepted chunks for from the peer.
73         peerTouchedPieces map[int]struct{}
74
75         PeerMaxRequests  int // Maximum pending requests the peer allows.
76         PeerExtensionIDs map[string]byte
77         PeerClientName   string
78 }
79
80 func newConnection() (c *connection) {
81         c = &connection{
82                 Choked:          true,
83                 PeerChoked:      true,
84                 PeerMaxRequests: 250,
85
86                 closing: make(chan struct{}),
87                 writeCh: make(chan []byte),
88                 post:    make(chan pp.Message),
89         }
90         return
91 }
92
93 func (cn *connection) remoteAddr() net.Addr {
94         return cn.conn.RemoteAddr()
95 }
96
97 func (cn *connection) localAddr() net.Addr {
98         return cn.conn.LocalAddr()
99 }
100
101 func (cn *connection) supportsExtension(ext string) bool {
102         _, ok := cn.PeerExtensionIDs[ext]
103         return ok
104 }
105
106 func (cn *connection) completedString(t *torrent) string {
107         if cn.PeerPieces == nil && !cn.peerHasAll {
108                 return "?"
109         }
110         return fmt.Sprintf("%d/%d", func() int {
111                 if cn.peerHasAll {
112                         if t.haveInfo() {
113                                 return t.numPieces()
114                         }
115                         return -1
116                 }
117                 ret := 0
118                 for _, b := range cn.PeerPieces {
119                         if b {
120                                 ret++
121                         }
122                 }
123                 return ret
124         }(), func() int {
125                 if cn.peerHasAll || cn.PeerPieces == nil {
126                         if t.haveInfo() {
127                                 return t.numPieces()
128                         }
129                         return -1
130                 }
131                 return len(cn.PeerPieces)
132         }())
133 }
134
135 // Correct the PeerPieces slice length. Return false if the existing slice is
136 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
137 // messages.
138 func (cn *connection) setNumPieces(num int) error {
139         if cn.peerHasAll {
140                 return nil
141         }
142         if cn.PeerPieces == nil {
143                 return nil
144         }
145         if len(cn.PeerPieces) == num {
146         } else if len(cn.PeerPieces) < num {
147                 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
148         } else if len(cn.PeerPieces) <= (num+7)/8*8 {
149                 for _, have := range cn.PeerPieces[num:] {
150                         if have {
151                                 return errors.New("peer has invalid piece")
152                         }
153                 }
154                 cn.PeerPieces = cn.PeerPieces[:num]
155         } else {
156                 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
157         }
158         if len(cn.PeerPieces) != num {
159                 panic("wat")
160         }
161         return nil
162 }
163
164 func eventAgeString(t time.Time) string {
165         if t.IsZero() {
166                 return "never"
167         }
168         return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
169 }
170
171 func (cn *connection) connectionFlags() (ret string) {
172         c := func(b byte) {
173                 ret += string([]byte{b})
174         }
175         if cn.encrypted {
176                 c('E')
177         }
178         if cn.Discovery != 0 {
179                 c(byte(cn.Discovery))
180         }
181         if cn.uTP {
182                 c('T')
183         }
184         return
185 }
186
187 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
188 func (cn *connection) statusFlags() (ret string) {
189         c := func(b byte) {
190                 ret += string([]byte{b})
191         }
192         if cn.Interested {
193                 c('i')
194         }
195         if cn.Choked {
196                 c('c')
197         }
198         c('-')
199         ret += cn.connectionFlags()
200         c('-')
201         if cn.PeerInterested {
202                 c('i')
203         }
204         if cn.PeerChoked {
205                 c('c')
206         }
207         return
208 }
209
210 func (cn *connection) String() string {
211         var buf bytes.Buffer
212         cn.WriteStatus(&buf, nil)
213         return buf.String()
214 }
215
216 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
217         // \t isn't preserved in <pre> blocks?
218         fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
219         fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
220                 eventAgeString(cn.lastMessageReceived),
221                 eventAgeString(cn.completedHandshake),
222                 eventAgeString(cn.lastUsefulChunkReceived))
223         fmt.Fprintf(w,
224                 "    %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
225                 cn.completedString(t),
226                 len(cn.peerTouchedPieces),
227                 cn.UsefulChunksReceived,
228                 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
229                 cn.chunksSent,
230                 len(cn.Requests),
231                 len(cn.PeerRequests),
232                 cn.statusFlags(),
233         )
234 }
235
236 func (c *connection) Close() {
237         c.mu.Lock()
238         defer c.mu.Unlock()
239         select {
240         case <-c.closing:
241                 return
242         default:
243         }
244         close(c.closing)
245         // TODO: This call blocks sometimes, why?
246         go c.conn.Close()
247 }
248
249 func (c *connection) PeerHasPiece(piece int) bool {
250         if c.peerHasAll {
251                 return true
252         }
253         if piece >= len(c.PeerPieces) {
254                 return false
255         }
256         return c.PeerPieces[piece]
257 }
258
259 func (c *connection) Post(msg pp.Message) {
260         select {
261         case c.post <- msg:
262         case <-c.closing:
263         }
264 }
265
266 func (c *connection) RequestPending(r request) bool {
267         _, ok := c.Requests[r]
268         return ok
269 }
270
271 func (c *connection) requestMetadataPiece(index int) {
272         eID := c.PeerExtensionIDs["ut_metadata"]
273         if eID == 0 {
274                 return
275         }
276         if index < len(c.metadataRequests) && c.metadataRequests[index] {
277                 return
278         }
279         c.Post(pp.Message{
280                 Type:       pp.Extended,
281                 ExtendedID: eID,
282                 ExtendedPayload: func() []byte {
283                         b, err := bencode.Marshal(map[string]int{
284                                 "msg_type": pp.RequestMetadataExtensionMsgType,
285                                 "piece":    index,
286                         })
287                         if err != nil {
288                                 panic(err)
289                         }
290                         return b
291                 }(),
292         })
293         for index >= len(c.metadataRequests) {
294                 c.metadataRequests = append(c.metadataRequests, false)
295         }
296         c.metadataRequests[index] = true
297 }
298
299 func (c *connection) requestedMetadataPiece(index int) bool {
300         return index < len(c.metadataRequests) && c.metadataRequests[index]
301 }
302
303 // Returns true if more requests can be sent.
304 func (c *connection) Request(chunk request) bool {
305         if len(c.Requests) >= c.PeerMaxRequests {
306                 return false
307         }
308         if !c.PeerHasPiece(int(chunk.Index)) {
309                 return true
310         }
311         if c.RequestPending(chunk) {
312                 return true
313         }
314         c.SetInterested(true)
315         if c.PeerChoked {
316                 return false
317         }
318         if c.Requests == nil {
319                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
320         }
321         c.Requests[chunk] = struct{}{}
322         c.requestsLowWater = len(c.Requests) / 2
323         c.Post(pp.Message{
324                 Type:   pp.Request,
325                 Index:  chunk.Index,
326                 Begin:  chunk.Begin,
327                 Length: chunk.Length,
328         })
329         return true
330 }
331
332 // Returns true if an unsatisfied request was canceled.
333 func (c *connection) Cancel(r request) bool {
334         if c.Requests == nil {
335                 return false
336         }
337         if _, ok := c.Requests[r]; !ok {
338                 return false
339         }
340         delete(c.Requests, r)
341         c.Post(pp.Message{
342                 Type:   pp.Cancel,
343                 Index:  r.Index,
344                 Begin:  r.Begin,
345                 Length: r.Length,
346         })
347         return true
348 }
349
350 // Returns true if an unsatisfied request was canceled.
351 func (c *connection) PeerCancel(r request) bool {
352         if c.PeerRequests == nil {
353                 return false
354         }
355         if _, ok := c.PeerRequests[r]; !ok {
356                 return false
357         }
358         delete(c.PeerRequests, r)
359         return true
360 }
361
362 func (c *connection) Choke() {
363         if c.Choked {
364                 return
365         }
366         c.Post(pp.Message{
367                 Type: pp.Choke,
368         })
369         c.PeerRequests = nil
370         c.Choked = true
371 }
372
373 func (c *connection) Unchoke() {
374         if !c.Choked {
375                 return
376         }
377         c.Post(pp.Message{
378                 Type: pp.Unchoke,
379         })
380         c.Choked = false
381 }
382
383 func (c *connection) SetInterested(interested bool) {
384         if c.Interested == interested {
385                 return
386         }
387         c.Post(pp.Message{
388                 Type: func() pp.MessageType {
389                         if interested {
390                                 return pp.Interested
391                         } else {
392                                 return pp.NotInterested
393                         }
394                 }(),
395         })
396         c.Interested = interested
397 }
398
399 var (
400         // Track connection writer buffer writes and flushes, to determine its
401         // efficiency.
402         connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
403         connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
404 )
405
406 // Writes buffers to the socket from the write channel.
407 func (conn *connection) writer() {
408         // Reduce write syscalls.
409         buf := bufio.NewWriter(conn.rw)
410         for {
411                 if buf.Buffered() == 0 {
412                         // There's nothing to write, so block until we get something.
413                         select {
414                         case b, ok := <-conn.writeCh:
415                                 if !ok {
416                                         return
417                                 }
418                                 connectionWriterWrite.Add(1)
419                                 _, err := buf.Write(b)
420                                 if err != nil {
421                                         conn.Close()
422                                         return
423                                 }
424                         case <-conn.closing:
425                                 return
426                         }
427                 } else {
428                         // We already have something to write, so flush if there's nothing
429                         // more to write.
430                         select {
431                         case b, ok := <-conn.writeCh:
432                                 if !ok {
433                                         return
434                                 }
435                                 connectionWriterWrite.Add(1)
436                                 _, err := buf.Write(b)
437                                 if err != nil {
438                                         conn.Close()
439                                         return
440                                 }
441                         case <-conn.closing:
442                                 return
443                         default:
444                                 connectionWriterFlush.Add(1)
445                                 err := buf.Flush()
446                                 if err != nil {
447                                         conn.Close()
448                                         return
449                                 }
450                         }
451                 }
452         }
453 }
454
455 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
456         defer close(conn.writeCh) // Responsible for notifying downstream routines.
457         pending := list.New()     // Message queue.
458         var nextWrite []byte      // Set to nil if we need to need to marshal the next message.
459         timer := time.NewTimer(keepAliveDelay)
460         defer timer.Stop()
461         lastWrite := time.Now()
462         for {
463                 write := conn.writeCh // Set to nil if there's nothing to write.
464                 if pending.Len() == 0 {
465                         write = nil
466                 } else if nextWrite == nil {
467                         var err error
468                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
469                         if err != nil {
470                                 panic(err)
471                         }
472                 }
473         event:
474                 select {
475                 case <-timer.C:
476                         if pending.Len() != 0 {
477                                 break
478                         }
479                         keepAliveTime := lastWrite.Add(keepAliveDelay)
480                         if time.Now().Before(keepAliveTime) {
481                                 timer.Reset(keepAliveTime.Sub(time.Now()))
482                                 break
483                         }
484                         pending.PushBack(pp.Message{Keepalive: true})
485                 case msg, ok := <-conn.post:
486                         if !ok {
487                                 return
488                         }
489                         if msg.Type == pp.Cancel {
490                                 for e := pending.Back(); e != nil; e = e.Prev() {
491                                         elemMsg := e.Value.(pp.Message)
492                                         if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
493                                                 pending.Remove(e)
494                                                 optimizedCancels.Add(1)
495                                                 break event
496                                         }
497                                 }
498                         }
499                         pending.PushBack(msg)
500                 case write <- nextWrite:
501                         pending.Remove(pending.Front())
502                         nextWrite = nil
503                         lastWrite = time.Now()
504                         if pending.Len() == 0 {
505                                 timer.Reset(keepAliveDelay)
506                         }
507                 case <-conn.closing:
508                         return
509                 }
510         }
511 }
512
513 func (cn *connection) Have(piece int) {
514         for piece >= len(cn.sentHaves) {
515                 cn.sentHaves = append(cn.sentHaves, false)
516         }
517         if cn.sentHaves[piece] {
518                 return
519         }
520         cn.Post(pp.Message{
521                 Type:  pp.Have,
522                 Index: pp.Integer(piece),
523         })
524         cn.sentHaves[piece] = true
525 }
526
527 func (cn *connection) Bitfield(haves []bool) {
528         if cn.sentHaves != nil {
529                 panic("bitfield must be first have-related message sent")
530         }
531         cn.Post(pp.Message{
532                 Type:     pp.Bitfield,
533                 Bitfield: haves,
534         })
535         cn.sentHaves = haves
536 }