]> Sergey Matveev's repositories - btrtrc.git/blob - connection.go
464f14be3dea2c7fd8c25a8a17fde3f06dac608f
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "container/list"
6         "encoding"
7         "errors"
8         "expvar"
9         "fmt"
10         "io"
11         "math/rand"
12         "net"
13         "sync"
14         "time"
15
16         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
17 )
18
// optimizedCancels is an expvar counter published under the name
// "optimizedCancels". writeOptimizer increments it each time an outgoing
// Cancel is satisfied by dropping the matching, still-queued Request instead
// of putting either message on the wire.
var optimizedCancels = expvar.NewInt("optimizedCancels")
20
// peerSource is a one-byte tag stored in connection.Discovery. WriteStatus
// prints a non-zero value verbatim as one of the connection's flag
// characters.
type peerSource byte

// Known peer sources. These are untyped rune constants; they only take the
// peerSource type when assigned to a peerSource value.
// NOTE(review): the meanings below are inferred from the names — confirm
// against the code that sets connection.Discovery.
const (
	peerSourceIncoming = 'I' // Presumably: the peer connected to us.
	peerSourceDHT      = 'H' // Presumably: the peer was found via the DHT.
	peerSourcePEX      = 'X' // Presumably: the peer was found via peer exchange.
)
28
// Maintains the state of a connection with a peer.
//
// newConnection is the constructor in this file; it also starts the writer
// and writeOptimizer goroutines that service the post and writeCh channels.
type connection struct {
	// Transport to the peer. writer sends buffered output to it and Close
	// closes it.
	Socket net.Conn
	// Printed by WriteStatus as a flag character when non-zero.
	Discovery peerSource
	// Printed by WriteStatus as the 'T' flag when true.
	uTP bool
	// Closed exactly once by Close. Post, writer and writeOptimizer select
	// on it to stop.
	closing chan struct{}
	mu      sync.Mutex // Only for closing.
	// Messages handed over by Post and consumed by writeOptimizer.
	post chan pp.Message
	// Marshaled messages passed from writeOptimizer to writer. Closed by
	// writeOptimizer when it exits.
	writeCh chan []byte
	// Random permutation of piece indices, created once by initPieceOrder.
	pieceOrder []int

	// Chunk counters reported by WriteStatus as "good chunks". They are not
	// modified in this file.
	UnwantedChunksReceived int
	UsefulChunksReceived   int

	// Event timestamps reported by WriteStatus; a zero value prints as
	// "never". completedHandshake is set to time.Now() by newConnection.
	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time

	// Stuff controlled by the local peer.
	Interested bool // Last value announced to the peer via SetInterested.
	Choked     bool // Toggled by Choke/Unchoke; starts true.
	// Requests we have sent and not cancelled (see Request and Cancel).
	// Allocated lazily by Request.
	Requests map[request]struct{}

	// Stuff controlled by the remote peer.
	PeerID         [20]byte
	PeerInterested bool
	// Starts true. Request refuses to send while this is set.
	PeerChoked bool
	// Requests recorded for the peer; PeerCancel removes entries.
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// Whether the peer has the given piece. nil if they've not sent any
	// related messages yet.
	PeerPieces       []bool
	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]int64
	PeerClientName   string
}
65
66 func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) {
67         c = &connection{
68                 Socket: sock,
69                 uTP:    uTP,
70
71                 Choked:             true,
72                 PeerChoked:         true,
73                 PeerMaxRequests:    250,
74                 PeerExtensionBytes: peb,
75                 PeerID:             peerID,
76
77                 closing: make(chan struct{}),
78                 writeCh: make(chan []byte),
79                 post:    make(chan pp.Message),
80
81                 completedHandshake: time.Now(),
82         }
83         go c.writer()
84         go c.writeOptimizer(time.Minute)
85         return
86 }
87
88 func (cn *connection) completedString() string {
89         if cn.PeerPieces == nil {
90                 return "?"
91         }
92         // f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
93         // return fmt.Sprintf("%d%%", int(f*100))
94         return fmt.Sprintf("%d/%d", cn.piecesPeerHasCount(), cn.totalPiecesCount())
95 }
96
97 func (cn *connection) totalPiecesCount() int {
98         return len(cn.PeerPieces)
99 }
100
101 func (cn *connection) piecesPeerHasCount() (count int) {
102         for _, has := range cn.PeerPieces {
103                 if has {
104                         count++
105                 }
106         }
107         return
108 }
109
110 // Correct the PeerPieces slice length. Return false if the existing slice is
111 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
112 // messages.
113 func (cn *connection) setNumPieces(num int) error {
114         cn.initPieceOrder(num)
115         if cn.PeerPieces == nil {
116                 return nil
117         }
118         if len(cn.PeerPieces) == num {
119         } else if len(cn.PeerPieces) < num {
120                 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
121         } else if len(cn.PeerPieces) <= (num+7)/8*8 {
122                 for _, have := range cn.PeerPieces[num:] {
123                         if have {
124                                 return errors.New("peer has invalid piece")
125                         }
126                 }
127                 cn.PeerPieces = cn.PeerPieces[:num]
128         } else {
129                 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
130         }
131         if len(cn.PeerPieces) != num {
132                 panic("wat")
133         }
134         return nil
135 }
136
137 func (cn *connection) initPieceOrder(numPieces int) {
138         if cn.pieceOrder == nil {
139                 cn.pieceOrder = rand.Perm(numPieces)
140         }
141         if len(cn.pieceOrder) != numPieces {
142                 panic("piece order initialized with wrong length")
143         }
144 }
145
// eventAgeString formats how long ago t happened, in seconds with two
// decimals (for example "1.50s ago"). The zero time is reported as "never".
func eventAgeString(t time.Time) string {
	if t.IsZero() {
		return "never"
	}
	return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
}
152
153 func (cn *connection) WriteStatus(w io.Writer) {
154         fmt.Fprintf(w, "%-90s: %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
155         c := func(b byte) {
156                 fmt.Fprintf(w, "%c", b)
157         }
158         // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
159         if len(cn.Requests) != 0 {
160                 c('D')
161         }
162         if cn.PeerChoked && cn.Interested {
163                 c('d')
164         }
165         if !cn.Choked {
166                 if cn.PeerInterested {
167                         c('U')
168                 } else {
169                         c('u')
170                 }
171         }
172         if cn.Discovery != 0 {
173                 c(byte(cn.Discovery))
174         }
175         if cn.uTP {
176                 c('T')
177         }
178         fmt.Fprintln(w)
179 }
180
181 func (c *connection) Close() {
182         c.mu.Lock()
183         defer c.mu.Unlock()
184         select {
185         case <-c.closing:
186                 return
187         default:
188         }
189         close(c.closing)
190         // TODO: This call blocks sometimes, why?
191         go c.Socket.Close()
192 }
193
194 func (c *connection) PeerHasPiece(index pp.Integer) bool {
195         if c.PeerPieces == nil {
196                 return false
197         }
198         if int(index) >= len(c.PeerPieces) {
199                 return false
200         }
201         return c.PeerPieces[index]
202 }
203
204 func (c *connection) Post(msg pp.Message) {
205         select {
206         case c.post <- msg:
207         case <-c.closing:
208         }
209 }
210
211 func (c *connection) RequestPending(r request) bool {
212         _, ok := c.Requests[r]
213         return ok
214 }
215
216 // Returns true if more requests can be sent.
217 func (c *connection) Request(chunk request) bool {
218         if len(c.Requests) >= c.PeerMaxRequests {
219                 return false
220         }
221         if !c.PeerHasPiece(chunk.Index) {
222                 return true
223         }
224         if c.RequestPending(chunk) {
225                 return true
226         }
227         c.SetInterested(true)
228         if c.PeerChoked {
229                 return false
230         }
231         if c.Requests == nil {
232                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
233         }
234         c.Requests[chunk] = struct{}{}
235         c.Post(pp.Message{
236                 Type:   pp.Request,
237                 Index:  chunk.Index,
238                 Begin:  chunk.Begin,
239                 Length: chunk.Length,
240         })
241         return true
242 }
243
244 // Returns true if an unsatisfied request was canceled.
245 func (c *connection) Cancel(r request) bool {
246         if c.Requests == nil {
247                 return false
248         }
249         if _, ok := c.Requests[r]; !ok {
250                 return false
251         }
252         delete(c.Requests, r)
253         c.Post(pp.Message{
254                 Type:   pp.Cancel,
255                 Index:  r.Index,
256                 Begin:  r.Begin,
257                 Length: r.Length,
258         })
259         return true
260 }
261
262 // Returns true if an unsatisfied request was canceled.
263 func (c *connection) PeerCancel(r request) bool {
264         if c.PeerRequests == nil {
265                 return false
266         }
267         if _, ok := c.PeerRequests[r]; !ok {
268                 return false
269         }
270         delete(c.PeerRequests, r)
271         return true
272 }
273
274 func (c *connection) Choke() {
275         if c.Choked {
276                 return
277         }
278         c.Post(pp.Message{
279                 Type: pp.Choke,
280         })
281         c.Choked = true
282 }
283
284 func (c *connection) Unchoke() {
285         if !c.Choked {
286                 return
287         }
288         c.Post(pp.Message{
289                 Type: pp.Unchoke,
290         })
291         c.Choked = false
292 }
293
294 func (c *connection) SetInterested(interested bool) {
295         if c.Interested == interested {
296                 return
297         }
298         c.Post(pp.Message{
299                 Type: func() pp.MessageType {
300                         if interested {
301                                 return pp.Interested
302                         } else {
303                                 return pp.NotInterested
304                         }
305                 }(),
306         })
307         c.Interested = interested
308 }
309
var (
	// Four consecutive zero bytes that comprise a keep alive on the wire.
	// The array is never assigned here, so it holds Go's zero value.
	keepAliveBytes [4]byte
)
314
315 // Writes buffers to the socket from the write channel.
316 func (conn *connection) writer() {
317         // Reduce write syscalls.
318         buf := bufio.NewWriterSize(conn.Socket, 0x8000) // 32 KiB
319         // Returns immediately if the buffer contains data.
320         notEmpty := make(chan struct{}, 1)
321         for {
322                 if buf.Buffered() != 0 {
323                         select {
324                         case notEmpty <- struct{}{}:
325                         default:
326                         }
327                 }
328                 select {
329                 case b, ok := <-conn.writeCh:
330                         if !ok {
331                                 return
332                         }
333                         _, err := buf.Write(b)
334                         if err != nil {
335                                 conn.Close()
336                                 return
337                         }
338                 case <-conn.closing:
339                         return
340                 case <-notEmpty:
341                         err := buf.Flush()
342                         if err != nil {
343                                 return
344                         }
345                 }
346         }
347 }
348
349 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
350         defer close(conn.writeCh) // Responsible for notifying downstream routines.
351         pending := list.New()     // Message queue.
352         var nextWrite []byte      // Set to nil if we need to need to marshal the next message.
353         timer := time.NewTimer(keepAliveDelay)
354         defer timer.Stop()
355         lastWrite := time.Now()
356         for {
357                 write := conn.writeCh // Set to nil if there's nothing to write.
358                 if pending.Len() == 0 {
359                         write = nil
360                 } else if nextWrite == nil {
361                         var err error
362                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
363                         if err != nil {
364                                 panic(err)
365                         }
366                 }
367         event:
368                 select {
369                 case <-timer.C:
370                         if pending.Len() != 0 {
371                                 break
372                         }
373                         keepAliveTime := lastWrite.Add(keepAliveDelay)
374                         if time.Now().Before(keepAliveTime) {
375                                 timer.Reset(keepAliveTime.Sub(time.Now()))
376                                 break
377                         }
378                         pending.PushBack(pp.Message{Keepalive: true})
379                 case msg, ok := <-conn.post:
380                         if !ok {
381                                 return
382                         }
383                         if msg.Type == pp.Cancel {
384                                 for e := pending.Back(); e != nil; e = e.Prev() {
385                                         elemMsg := e.Value.(pp.Message)
386                                         if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
387                                                 pending.Remove(e)
388                                                 optimizedCancels.Add(1)
389                                                 break event
390                                         }
391                                 }
392                         }
393                         pending.PushBack(msg)
394                 case write <- nextWrite:
395                         pending.Remove(pending.Front())
396                         nextWrite = nil
397                         lastWrite = time.Now()
398                         if pending.Len() == 0 {
399                                 timer.Reset(keepAliveDelay)
400                         }
401                 case <-conn.closing:
402                         return
403                 }
404         }
405 }