]> Sergey Matveev's repositories - btrtrc.git/blob - connection.go
Rework the Torrent Reader interface, to allow reader options, and add "responsive...
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "container/list"
6         "encoding"
7         "errors"
8         "expvar"
9         "fmt"
10         "io"
11         "net"
12         "sync"
13         "time"
14
15         "github.com/anacrolix/libtorgo/bencode"
16
17         "github.com/anacrolix/torrent/internal/pieceordering"
18         pp "github.com/anacrolix/torrent/peer_protocol"
19 )
20
21 var optimizedCancels = expvar.NewInt("optimizedCancels")
22
23 type peerSource byte
24
25 const (
26         peerSourceIncoming = 'I'
27         peerSourceDHT      = 'H'
28         peerSourcePEX      = 'X'
29 )
30
31 // Maintains the state of a connection with a peer.
32 type connection struct {
33         conn      net.Conn
34         rw        io.ReadWriter // The real slim shady
35         encrypted bool
36         Discovery peerSource
37         uTP       bool
38         closing   chan struct{}
39         mu        sync.Mutex // Only for closing.
40         post      chan pp.Message
41         writeCh   chan []byte
42
43         // The connections preferred order to download pieces.
44         piecePriorities []int
45         // The piece request order based on piece priorities.
46         pieceRequestOrder *pieceordering.Instance
47
48         UnwantedChunksReceived int
49         UsefulChunksReceived   int
50
51         lastMessageReceived     time.Time
52         completedHandshake      time.Time
53         lastUsefulChunkReceived time.Time
54
55         // Stuff controlled by the local peer.
56         Interested       bool
57         Choked           bool
58         Requests         map[request]struct{}
59         requestsLowWater int
60         // Indexed by metadata piece, set to true if posted and pending a
61         // response.
62         metadataRequests []bool
63
64         // Stuff controlled by the remote peer.
65         PeerID             [20]byte
66         PeerInterested     bool
67         PeerChoked         bool
68         PeerRequests       map[request]struct{}
69         PeerExtensionBytes peerExtensionBytes
70         // Whether the peer has the given piece. nil if they've not sent any
71         // related messages yet.
72         PeerPieces []bool
73         peerHasAll bool
74
75         PeerMaxRequests  int // Maximum pending requests the peer allows.
76         PeerExtensionIDs map[string]byte
77         PeerClientName   string
78 }
79
80 func newConnection() (c *connection) {
81         c = &connection{
82                 Choked:          true,
83                 PeerChoked:      true,
84                 PeerMaxRequests: 250,
85
86                 closing: make(chan struct{}),
87                 writeCh: make(chan []byte),
88                 post:    make(chan pp.Message),
89         }
90         return
91 }
92
93 func (cn *connection) remoteAddr() net.Addr {
94         return cn.conn.RemoteAddr()
95 }
96
97 func (cn *connection) localAddr() net.Addr {
98         return cn.conn.LocalAddr()
99 }
100
101 // Adjust piece position in the request order for this connection based on the
102 // given piece priority.
103 func (cn *connection) pendPiece(piece int, priority piecePriority) {
104         if priority == piecePriorityNone {
105                 cn.pieceRequestOrder.DeletePiece(piece)
106                 return
107         }
108         pp := cn.piecePriorities[piece]
109         // Priority regions not to scale. Within each region, piece is randomized
110         // according to connection.
111
112         // [ Now         ]
113         //  [ Next       ]
114         //   [ Readahead ]
115         //                [ Normal ]
116         key := func() int {
117                 switch priority {
118                 case piecePriorityNow:
119                         return -3*len(cn.piecePriorities) + 3*pp
120                 case piecePriorityNext:
121                         return -2*len(cn.piecePriorities) + 2*pp
122                 case piecePriorityReadahead:
123                         return -len(cn.piecePriorities) + pp
124                 case piecePriorityNormal:
125                         return pp
126                 default:
127                         panic(priority)
128                 }
129         }()
130         cn.pieceRequestOrder.SetPiece(piece, key)
131 }
132
133 func (cn *connection) supportsExtension(ext string) bool {
134         _, ok := cn.PeerExtensionIDs[ext]
135         return ok
136 }
137
138 func (cn *connection) completedString(t *torrent) string {
139         if cn.PeerPieces == nil && !cn.peerHasAll {
140                 return "?"
141         }
142         return fmt.Sprintf("%d/%d", func() int {
143                 if cn.peerHasAll {
144                         if t.haveInfo() {
145                                 return t.numPieces()
146                         }
147                         return -1
148                 }
149                 ret := 0
150                 for _, b := range cn.PeerPieces {
151                         if b {
152                                 ret++
153                         }
154                 }
155                 return ret
156         }(), func() int {
157                 if cn.peerHasAll || cn.PeerPieces == nil {
158                         if t.haveInfo() {
159                                 return t.numPieces()
160                         }
161                         return -1
162                 }
163                 return len(cn.PeerPieces)
164         }())
165 }
166
167 // Correct the PeerPieces slice length. Return false if the existing slice is
168 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
169 // messages.
170 func (cn *connection) setNumPieces(num int) error {
171         if cn.peerHasAll {
172                 return nil
173         }
174         if cn.PeerPieces == nil {
175                 return nil
176         }
177         if len(cn.PeerPieces) == num {
178         } else if len(cn.PeerPieces) < num {
179                 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
180         } else if len(cn.PeerPieces) <= (num+7)/8*8 {
181                 for _, have := range cn.PeerPieces[num:] {
182                         if have {
183                                 return errors.New("peer has invalid piece")
184                         }
185                 }
186                 cn.PeerPieces = cn.PeerPieces[:num]
187         } else {
188                 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
189         }
190         if len(cn.PeerPieces) != num {
191                 panic("wat")
192         }
193         return nil
194 }
195
196 func eventAgeString(t time.Time) string {
197         if t.IsZero() {
198                 return "never"
199         }
200         return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
201 }
202
203 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
204 func (cn *connection) statusFlags() (ret string) {
205         c := func(b byte) {
206                 ret += string([]byte{b})
207         }
208         if cn.Interested {
209                 c('i')
210         }
211         if cn.Choked {
212                 c('c')
213         }
214         c('-')
215         if cn.encrypted {
216                 c('E')
217         }
218         if cn.Discovery != 0 {
219                 c(byte(cn.Discovery))
220         }
221         if cn.uTP {
222                 c('T')
223         }
224         c('-')
225         if cn.PeerInterested {
226                 c('i')
227         }
228         if cn.PeerChoked {
229                 c('c')
230         }
231         return
232 }
233
234 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
235         // \t isn't preserved in <pre> blocks?
236         fmt.Fprintf(w, "%q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
237         fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
238                 eventAgeString(cn.lastMessageReceived),
239                 eventAgeString(cn.completedHandshake),
240                 eventAgeString(cn.lastUsefulChunkReceived))
241         fmt.Fprintf(w, "    %s completed, good chunks: %d/%d reqs: %d-%d, flags: %s\n",
242                 cn.completedString(t),
243                 cn.UsefulChunksReceived,
244                 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
245                 len(cn.Requests),
246                 len(cn.PeerRequests),
247                 cn.statusFlags())
248 }
249
250 func (c *connection) Close() {
251         c.mu.Lock()
252         defer c.mu.Unlock()
253         select {
254         case <-c.closing:
255                 return
256         default:
257         }
258         close(c.closing)
259         // TODO: This call blocks sometimes, why?
260         go c.conn.Close()
261 }
262
263 func (c *connection) PeerHasPiece(piece int) bool {
264         if c.peerHasAll {
265                 return true
266         }
267         if piece >= len(c.PeerPieces) {
268                 return false
269         }
270         return c.PeerPieces[piece]
271 }
272
273 func (c *connection) Post(msg pp.Message) {
274         select {
275         case c.post <- msg:
276         case <-c.closing:
277         }
278 }
279
280 func (c *connection) RequestPending(r request) bool {
281         _, ok := c.Requests[r]
282         return ok
283 }
284
285 func (c *connection) requestMetadataPiece(index int) {
286         eID := c.PeerExtensionIDs["ut_metadata"]
287         if eID == 0 {
288                 return
289         }
290         if index < len(c.metadataRequests) && c.metadataRequests[index] {
291                 return
292         }
293         c.Post(pp.Message{
294                 Type:       pp.Extended,
295                 ExtendedID: eID,
296                 ExtendedPayload: func() []byte {
297                         b, err := bencode.Marshal(map[string]int{
298                                 "msg_type": pp.RequestMetadataExtensionMsgType,
299                                 "piece":    index,
300                         })
301                         if err != nil {
302                                 panic(err)
303                         }
304                         return b
305                 }(),
306         })
307         for index >= len(c.metadataRequests) {
308                 c.metadataRequests = append(c.metadataRequests, false)
309         }
310         c.metadataRequests[index] = true
311 }
312
313 func (c *connection) requestedMetadataPiece(index int) bool {
314         return index < len(c.metadataRequests) && c.metadataRequests[index]
315 }
316
317 // Returns true if more requests can be sent.
318 func (c *connection) Request(chunk request) bool {
319         if len(c.Requests) >= c.PeerMaxRequests {
320                 return false
321         }
322         if !c.PeerHasPiece(int(chunk.Index)) {
323                 return true
324         }
325         if c.RequestPending(chunk) {
326                 return true
327         }
328         c.SetInterested(true)
329         if c.PeerChoked {
330                 return false
331         }
332         if c.Requests == nil {
333                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
334         }
335         c.Requests[chunk] = struct{}{}
336         c.requestsLowWater = len(c.Requests) / 2
337         c.Post(pp.Message{
338                 Type:   pp.Request,
339                 Index:  chunk.Index,
340                 Begin:  chunk.Begin,
341                 Length: chunk.Length,
342         })
343         return true
344 }
345
346 // Returns true if an unsatisfied request was canceled.
347 func (c *connection) Cancel(r request) bool {
348         if c.Requests == nil {
349                 return false
350         }
351         if _, ok := c.Requests[r]; !ok {
352                 return false
353         }
354         delete(c.Requests, r)
355         c.Post(pp.Message{
356                 Type:   pp.Cancel,
357                 Index:  r.Index,
358                 Begin:  r.Begin,
359                 Length: r.Length,
360         })
361         return true
362 }
363
364 // Returns true if an unsatisfied request was canceled.
365 func (c *connection) PeerCancel(r request) bool {
366         if c.PeerRequests == nil {
367                 return false
368         }
369         if _, ok := c.PeerRequests[r]; !ok {
370                 return false
371         }
372         delete(c.PeerRequests, r)
373         return true
374 }
375
376 func (c *connection) Choke() {
377         if c.Choked {
378                 return
379         }
380         c.Post(pp.Message{
381                 Type: pp.Choke,
382         })
383         c.Choked = true
384 }
385
386 func (c *connection) Unchoke() {
387         if !c.Choked {
388                 return
389         }
390         c.Post(pp.Message{
391                 Type: pp.Unchoke,
392         })
393         c.Choked = false
394 }
395
396 func (c *connection) SetInterested(interested bool) {
397         if c.Interested == interested {
398                 return
399         }
400         c.Post(pp.Message{
401                 Type: func() pp.MessageType {
402                         if interested {
403                                 return pp.Interested
404                         } else {
405                                 return pp.NotInterested
406                         }
407                 }(),
408         })
409         c.Interested = interested
410 }
411
412 // Writes buffers to the socket from the write channel.
413 func (conn *connection) writer() {
414         // Reduce write syscalls.
415         buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB
416         // Receives when buf is not empty.
417         notEmpty := make(chan struct{}, 1)
418         for {
419                 if buf.Buffered() != 0 {
420                         // Make sure it's receivable.
421                         select {
422                         case notEmpty <- struct{}{}:
423                         default:
424                         }
425                 }
426                 select {
427                 case b, ok := <-conn.writeCh:
428                         if !ok {
429                                 return
430                         }
431                         _, err := buf.Write(b)
432                         if err != nil {
433                                 conn.Close()
434                                 return
435                         }
436                 case <-conn.closing:
437                         return
438                 case <-notEmpty:
439                         err := buf.Flush()
440                         if err != nil {
441                                 return
442                         }
443                 }
444         }
445 }
446
447 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
448         defer close(conn.writeCh) // Responsible for notifying downstream routines.
449         pending := list.New()     // Message queue.
450         var nextWrite []byte      // Set to nil if we need to need to marshal the next message.
451         timer := time.NewTimer(keepAliveDelay)
452         defer timer.Stop()
453         lastWrite := time.Now()
454         for {
455                 write := conn.writeCh // Set to nil if there's nothing to write.
456                 if pending.Len() == 0 {
457                         write = nil
458                 } else if nextWrite == nil {
459                         var err error
460                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
461                         if err != nil {
462                                 panic(err)
463                         }
464                 }
465         event:
466                 select {
467                 case <-timer.C:
468                         if pending.Len() != 0 {
469                                 break
470                         }
471                         keepAliveTime := lastWrite.Add(keepAliveDelay)
472                         if time.Now().Before(keepAliveTime) {
473                                 timer.Reset(keepAliveTime.Sub(time.Now()))
474                                 break
475                         }
476                         pending.PushBack(pp.Message{Keepalive: true})
477                 case msg, ok := <-conn.post:
478                         if !ok {
479                                 return
480                         }
481                         if msg.Type == pp.Cancel {
482                                 for e := pending.Back(); e != nil; e = e.Prev() {
483                                         elemMsg := e.Value.(pp.Message)
484                                         if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
485                                                 pending.Remove(e)
486                                                 optimizedCancels.Add(1)
487                                                 break event
488                                         }
489                                 }
490                         }
491                         pending.PushBack(msg)
492                 case write <- nextWrite:
493                         pending.Remove(pending.Front())
494                         nextWrite = nil
495                         lastWrite = time.Now()
496                         if pending.Len() == 0 {
497                                 timer.Reset(keepAliveDelay)
498                         }
499                 case <-conn.closing:
500                         return
501                 }
502         }
503 }