]> Sergey Matveev's repositories - btrtrc.git/blob - connection.go
Use rogpeppe's sortimports to fix this goimports ordering madness
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "container/list"
6         "encoding"
7         "errors"
8         "expvar"
9         "fmt"
10         "io"
11         "net"
12         "sync"
13         "time"
14
15         "github.com/anacrolix/libtorgo/bencode"
16
17         "github.com/anacrolix/torrent/internal/pieceordering"
18         pp "github.com/anacrolix/torrent/peer_protocol"
19 )
20
// optimizedCancels is published through expvar under the name
// "optimizedCancels". writeOptimizer increments it each time a posted Cancel
// is satisfied by removing the matching, still-queued Request instead of
// queueing the Cancel itself.
var optimizedCancels = expvar.NewInt("optimizedCancels")
22
// peerSource records where a peer came from. It is stored in
// connection.Discovery and, when non-zero, written as a single character into
// the string built by statusFlags.
type peerSource byte

// Known peerSource values. Each one is the character that appears in status
// output.
// NOTE(review): these are untyped rune constants rather than typed peerSource
// values; they convert implicitly when assigned to a peerSource.
const (
	peerSourceIncoming = 'I' // Presumably: the peer connected to us.
	peerSourceDHT      = 'H' // Presumably: the peer was found via the DHT.
	peerSourcePEX      = 'X' // Presumably: the peer was found via peer exchange.
)
30
// Maintains the state of a connection with a peer.
type connection struct {
	// The underlying network connection. It supplies the local and remote
	// addresses and is closed (asynchronously) by Close.
	conn      net.Conn
	rw        io.ReadWriter // The real slim shady: the stream that writer() sends on.
	encrypted bool
	// How the peer was found; shown in statusFlags when non-zero.
	Discovery peerSource
	uTP       bool
	// Closed exactly once by Close. Post, writer and writeOptimizer all
	// select on it to stop work.
	closing   chan struct{}
	mu        sync.Mutex // Only for closing.
	// Messages handed over by Post, consumed by writeOptimizer.
	post      chan pp.Message
	// Marshaled messages passed from writeOptimizer to writer. Closed by
	// writeOptimizer when it returns.
	writeCh   chan []byte

	// The connection's preferred order to download pieces.
	piecePriorities []int
	// The piece request order based on piece priorities.
	pieceRequestOrder *pieceordering.Instance

	UnwantedChunksReceived int
	UsefulChunksReceived   int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time

	// Stuff controlled by the local peer.
	Interested       bool
	Choked           bool
	// Requests we have sent and not yet cancelled. Lazily allocated by
	// Request.
	Requests         map[request]struct{}
	// Set by Request to half the number of outstanding requests.
	requestsLowWater int

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// Whether the peer has the given piece. nil if they've not sent any
	// related messages yet.
	PeerPieces []bool
	// When set, PeerHasPiece reports true for every piece and PeerPieces is
	// not consulted.
	peerHasAll bool

	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]int64
	PeerClientName   string
}
76
77 func newConnection() (c *connection) {
78         c = &connection{
79                 Choked:          true,
80                 PeerChoked:      true,
81                 PeerMaxRequests: 250,
82
83                 closing: make(chan struct{}),
84                 writeCh: make(chan []byte),
85                 post:    make(chan pp.Message),
86         }
87         return
88 }
89
90 func (cn *connection) remoteAddr() net.Addr {
91         return cn.conn.RemoteAddr()
92 }
93
94 func (cn *connection) localAddr() net.Addr {
95         return cn.conn.LocalAddr()
96 }
97
98 // Adjust piece position in the request order for this connection based on the
99 // given piece priority.
100 func (cn *connection) pendPiece(piece int, priority piecePriority) {
101         if priority == piecePriorityNone {
102                 cn.pieceRequestOrder.DeletePiece(piece)
103                 return
104         }
105         pp := cn.piecePriorities[piece]
106         // Priority goes to Now, then Next in connection order. Then Readahead in
107         // by piece index. Then normal again by connection order.
108         key := func() int {
109                 switch priority {
110                 case piecePriorityNow:
111                         return -3*len(cn.piecePriorities) + 3*pp
112                 case piecePriorityNext:
113                         return -2*len(cn.piecePriorities) + 2*pp
114                 case piecePriorityReadahead:
115                         return -len(cn.piecePriorities) + pp
116                 case piecePriorityNormal:
117                         return pp
118                 default:
119                         panic(priority)
120                 }
121         }()
122         cn.pieceRequestOrder.SetPiece(piece, key)
123 }
124
125 func (cn *connection) supportsExtension(ext string) bool {
126         _, ok := cn.PeerExtensionIDs[ext]
127         return ok
128 }
129
130 func (cn *connection) completedString(t *torrent) string {
131         if cn.PeerPieces == nil && !cn.peerHasAll {
132                 return "?"
133         }
134         return fmt.Sprintf("%d/%d", func() int {
135                 if cn.peerHasAll {
136                         if t.haveInfo() {
137                                 return t.numPieces()
138                         }
139                         return -1
140                 }
141                 ret := 0
142                 for _, b := range cn.PeerPieces {
143                         if b {
144                                 ret++
145                         }
146                 }
147                 return ret
148         }(), func() int {
149                 if cn.peerHasAll || cn.PeerPieces == nil {
150                         if t.haveInfo() {
151                                 return t.numPieces()
152                         }
153                         return -1
154                 }
155                 return len(cn.PeerPieces)
156         }())
157 }
158
159 // Correct the PeerPieces slice length. Return false if the existing slice is
160 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
161 // messages.
162 func (cn *connection) setNumPieces(num int) error {
163         if cn.peerHasAll {
164                 return nil
165         }
166         if cn.PeerPieces == nil {
167                 return nil
168         }
169         if len(cn.PeerPieces) == num {
170         } else if len(cn.PeerPieces) < num {
171                 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
172         } else if len(cn.PeerPieces) <= (num+7)/8*8 {
173                 for _, have := range cn.PeerPieces[num:] {
174                         if have {
175                                 return errors.New("peer has invalid piece")
176                         }
177                 }
178                 cn.PeerPieces = cn.PeerPieces[:num]
179         } else {
180                 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
181         }
182         if len(cn.PeerPieces) != num {
183                 panic("wat")
184         }
185         return nil
186 }
187
// eventAgeString describes how long ago t happened, in seconds with two
// decimal places, e.g. "1.50s ago". The zero time is reported as "never".
func eventAgeString(t time.Time) string {
	if !t.IsZero() {
		elapsed := time.Since(t)
		return fmt.Sprintf("%.2fs ago", elapsed.Seconds())
	}
	return "never"
}
194
195 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
196 func (cn *connection) statusFlags() (ret string) {
197         c := func(b byte) {
198                 ret += string([]byte{b})
199         }
200         if cn.Interested {
201                 c('i')
202         }
203         if cn.Choked {
204                 c('c')
205         }
206         c('-')
207         if cn.encrypted {
208                 c('E')
209         }
210         if cn.Discovery != 0 {
211                 c(byte(cn.Discovery))
212         }
213         if cn.uTP {
214                 c('T')
215         }
216         c('-')
217         if cn.PeerInterested {
218                 c('i')
219         }
220         if cn.PeerChoked {
221                 c('c')
222         }
223         return
224 }
225
226 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
227         // \t isn't preserved in <pre> blocks?
228         fmt.Fprintf(w, "%q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
229         fmt.Fprintf(w, "    last msg: %s, connected: %s, last useful chunk: %s\n",
230                 eventAgeString(cn.lastMessageReceived),
231                 eventAgeString(cn.completedHandshake),
232                 eventAgeString(cn.lastUsefulChunkReceived))
233         fmt.Fprintf(w, "    %s completed, good chunks: %d/%d reqs: %d-%d, flags: %s\n",
234                 cn.completedString(t),
235                 cn.UsefulChunksReceived,
236                 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
237                 len(cn.Requests),
238                 len(cn.PeerRequests),
239                 cn.statusFlags())
240 }
241
242 func (c *connection) Close() {
243         c.mu.Lock()
244         defer c.mu.Unlock()
245         select {
246         case <-c.closing:
247                 return
248         default:
249         }
250         close(c.closing)
251         // TODO: This call blocks sometimes, why?
252         go c.conn.Close()
253 }
254
255 func (c *connection) PeerHasPiece(piece int) bool {
256         if c.peerHasAll {
257                 return true
258         }
259         if piece >= len(c.PeerPieces) {
260                 return false
261         }
262         return c.PeerPieces[piece]
263 }
264
265 func (c *connection) Post(msg pp.Message) {
266         select {
267         case c.post <- msg:
268         case <-c.closing:
269         }
270 }
271
272 func (c *connection) RequestPending(r request) bool {
273         _, ok := c.Requests[r]
274         return ok
275 }
276
277 // Returns true if more requests can be sent.
278 func (c *connection) Request(chunk request) bool {
279         if len(c.Requests) >= c.PeerMaxRequests {
280                 return false
281         }
282         if !c.PeerHasPiece(int(chunk.Index)) {
283                 return true
284         }
285         if c.RequestPending(chunk) {
286                 return true
287         }
288         c.SetInterested(true)
289         if c.PeerChoked {
290                 return false
291         }
292         if c.Requests == nil {
293                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
294         }
295         c.Requests[chunk] = struct{}{}
296         c.requestsLowWater = len(c.Requests) / 2
297         c.Post(pp.Message{
298                 Type:   pp.Request,
299                 Index:  chunk.Index,
300                 Begin:  chunk.Begin,
301                 Length: chunk.Length,
302         })
303         return true
304 }
305
306 // Returns true if an unsatisfied request was canceled.
307 func (c *connection) Cancel(r request) bool {
308         if c.Requests == nil {
309                 return false
310         }
311         if _, ok := c.Requests[r]; !ok {
312                 return false
313         }
314         delete(c.Requests, r)
315         c.Post(pp.Message{
316                 Type:   pp.Cancel,
317                 Index:  r.Index,
318                 Begin:  r.Begin,
319                 Length: r.Length,
320         })
321         return true
322 }
323
324 // Returns true if an unsatisfied request was canceled.
325 func (c *connection) PeerCancel(r request) bool {
326         if c.PeerRequests == nil {
327                 return false
328         }
329         if _, ok := c.PeerRequests[r]; !ok {
330                 return false
331         }
332         delete(c.PeerRequests, r)
333         return true
334 }
335
336 func (c *connection) Choke() {
337         if c.Choked {
338                 return
339         }
340         c.Post(pp.Message{
341                 Type: pp.Choke,
342         })
343         c.Choked = true
344 }
345
346 func (c *connection) Unchoke() {
347         if !c.Choked {
348                 return
349         }
350         c.Post(pp.Message{
351                 Type: pp.Unchoke,
352         })
353         c.Choked = false
354 }
355
356 func (c *connection) SetInterested(interested bool) {
357         if c.Interested == interested {
358                 return
359         }
360         c.Post(pp.Message{
361                 Type: func() pp.MessageType {
362                         if interested {
363                                 return pp.Interested
364                         } else {
365                                 return pp.NotInterested
366                         }
367                 }(),
368         })
369         c.Interested = interested
370 }
371
372 // Writes buffers to the socket from the write channel.
373 func (conn *connection) writer() {
374         // Reduce write syscalls.
375         buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB
376         // Receives when buf is not empty.
377         notEmpty := make(chan struct{}, 1)
378         for {
379                 if buf.Buffered() != 0 {
380                         // Make sure it's receivable.
381                         select {
382                         case notEmpty <- struct{}{}:
383                         default:
384                         }
385                 }
386                 select {
387                 case b, ok := <-conn.writeCh:
388                         if !ok {
389                                 return
390                         }
391                         _, err := buf.Write(b)
392                         if err != nil {
393                                 conn.Close()
394                                 return
395                         }
396                 case <-conn.closing:
397                         return
398                 case <-notEmpty:
399                         err := buf.Flush()
400                         if err != nil {
401                                 return
402                         }
403                 }
404         }
405 }
406
407 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
408         defer close(conn.writeCh) // Responsible for notifying downstream routines.
409         pending := list.New()     // Message queue.
410         var nextWrite []byte      // Set to nil if we need to need to marshal the next message.
411         timer := time.NewTimer(keepAliveDelay)
412         defer timer.Stop()
413         lastWrite := time.Now()
414         for {
415                 write := conn.writeCh // Set to nil if there's nothing to write.
416                 if pending.Len() == 0 {
417                         write = nil
418                 } else if nextWrite == nil {
419                         var err error
420                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
421                         if err != nil {
422                                 panic(err)
423                         }
424                 }
425         event:
426                 select {
427                 case <-timer.C:
428                         if pending.Len() != 0 {
429                                 break
430                         }
431                         keepAliveTime := lastWrite.Add(keepAliveDelay)
432                         if time.Now().Before(keepAliveTime) {
433                                 timer.Reset(keepAliveTime.Sub(time.Now()))
434                                 break
435                         }
436                         pending.PushBack(pp.Message{Keepalive: true})
437                 case msg, ok := <-conn.post:
438                         if !ok {
439                                 return
440                         }
441                         if msg.Type == pp.Cancel {
442                                 for e := pending.Back(); e != nil; e = e.Prev() {
443                                         elemMsg := e.Value.(pp.Message)
444                                         if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
445                                                 pending.Remove(e)
446                                                 optimizedCancels.Add(1)
447                                                 break event
448                                         }
449                                 }
450                         }
451                         pending.PushBack(msg)
452                 case write <- nextWrite:
453                         pending.Remove(pending.Front())
454                         nextWrite = nil
455                         lastWrite = time.Now()
456                         if pending.Len() == 0 {
457                                 timer.Reset(keepAliveDelay)
458                         }
459                 case <-conn.closing:
460                         return
461                 }
462         }
463 }