Sergey Matveev's repositories - btrtrc.git/blob - connection.go
pieceordering: Rename RemovePiece->DeletePiece and complete tests
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "bufio"
5         "container/list"
6         "encoding"
7         "errors"
8         "expvar"
9         "fmt"
10         "io"
11         "net"
12         "sync"
13         "time"
14
15         "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
16         pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
17 )
18
// optimizedCancels counts Cancel messages that writeOptimizer never put on the
// wire because the matching Request was still sitting in its queue and was
// dropped instead. It is published through expvar as "optimizedCancels".
var optimizedCancels = expvar.NewInt("optimizedCancels")
20
// peerSource is a one-byte tag kept in connection.Discovery. WriteStatus
// prints it verbatim as one of the status flag characters when it is non-zero.
type peerSource byte

// Known peerSource tags. The constants are untyped character constants.
// NOTE(review): judging by the names these mark incoming connections and
// peers learned through DHT or PEX -- confirm where Discovery is assigned.
const (
	peerSourceIncoming = 'I'
	peerSourceDHT      = 'H'
	peerSourcePEX      = 'X'
)
28
// Maintains the state of a connection with a peer.
//
// Field notes, as far as this file shows:
//   - closing is closed once by Close (guarded by mu); Post, writer and
//     writeOptimizer all select on it in order to stop.
//   - post carries messages from Post to writeOptimizer. writeCh carries the
//     marshalled bytes from writeOptimizer to writer, which writes them to
//     Socket through a buffer.
//   - piecePriorities supplies the per-piece base key that pendPiece adjusts
//     before storing it in pieceRequestOrder.
//   - completedHandshake is stamped with the construction time by
//     newConnection.
//   - Requests is allocated lazily by Request and may be nil until then.
//   - Discovery and uTP are only read here by WriteStatus, which turns them
//     into status flag characters.
type connection struct {
	Socket    net.Conn
	Discovery peerSource
	uTP       bool
	closing   chan struct{}
	mu        sync.Mutex // Only for closing.
	post      chan pp.Message
	writeCh   chan []byte

	piecePriorities   []int
	pieceRequestOrder *pieceordering.Instance

	UnwantedChunksReceived int
	UsefulChunksReceived   int

	lastMessageReceived     time.Time
	completedHandshake      time.Time
	lastUsefulChunkReceived time.Time

	// Stuff controlled by the local peer.
	Interested bool
	Choked     bool
	Requests   map[request]struct{}

	// Stuff controlled by the remote peer.
	PeerID             [20]byte
	PeerInterested     bool
	PeerChoked         bool
	PeerRequests       map[request]struct{}
	PeerExtensionBytes peerExtensionBytes
	// Whether the peer has the given piece. nil if they've not sent any
	// related messages yet.
	PeerPieces       []bool
	PeerMaxRequests  int // Maximum pending requests the peer allows.
	PeerExtensionIDs map[string]int64
	PeerClientName   string
}
67
68 func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) {
69         c = &connection{
70                 Socket: sock,
71                 uTP:    uTP,
72
73                 Choked:             true,
74                 PeerChoked:         true,
75                 PeerMaxRequests:    250,
76                 PeerExtensionBytes: peb,
77                 PeerID:             peerID,
78
79                 closing: make(chan struct{}),
80                 writeCh: make(chan []byte),
81                 post:    make(chan pp.Message),
82
83                 completedHandshake: time.Now(),
84         }
85         go c.writer()
86         go c.writeOptimizer(time.Minute)
87         return
88 }
89
90 func (cn *connection) pendPiece(piece int, priority piecePriority) {
91         if priority == piecePriorityNone {
92                 cn.pieceRequestOrder.DeletePiece(piece)
93                 return
94         }
95         key := cn.piecePriorities[piece]
96         // TODO: Have some kind of overlap here, so there's some probabilistic
97         // favouring of higher priority pieces.
98         switch priority {
99         case piecePriorityReadahead:
100                 key -= len(cn.piecePriorities)
101         case piecePriorityNext:
102                 key -= 2 * len(cn.piecePriorities)
103         case piecePriorityNow:
104                 key -= 3 * len(cn.piecePriorities)
105         }
106         cn.pieceRequestOrder.SetPiece(piece, key)
107 }
108
109 func (cn *connection) supportsExtension(ext string) bool {
110         _, ok := cn.PeerExtensionIDs[ext]
111         return ok
112 }
113
114 func (cn *connection) completedString() string {
115         if cn.PeerPieces == nil {
116                 return "?"
117         }
118         // f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
119         // return fmt.Sprintf("%d%%", int(f*100))
120         return fmt.Sprintf("%d/%d", cn.piecesPeerHasCount(), cn.totalPiecesCount())
121 }
122
123 func (cn *connection) totalPiecesCount() int {
124         return len(cn.PeerPieces)
125 }
126
127 func (cn *connection) piecesPeerHasCount() (count int) {
128         for _, has := range cn.PeerPieces {
129                 if has {
130                         count++
131                 }
132         }
133         return
134 }
135
136 // Correct the PeerPieces slice length. Return false if the existing slice is
137 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
138 // messages.
139 func (cn *connection) setNumPieces(num int) error {
140         if cn.PeerPieces == nil {
141                 return nil
142         }
143         if len(cn.PeerPieces) == num {
144         } else if len(cn.PeerPieces) < num {
145                 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
146         } else if len(cn.PeerPieces) <= (num+7)/8*8 {
147                 for _, have := range cn.PeerPieces[num:] {
148                         if have {
149                                 return errors.New("peer has invalid piece")
150                         }
151                 }
152                 cn.PeerPieces = cn.PeerPieces[:num]
153         } else {
154                 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
155         }
156         if len(cn.PeerPieces) != num {
157                 panic("wat")
158         }
159         return nil
160 }
161
// eventAgeString formats how long ago t happened as seconds with two
// decimals, e.g. "1.50s ago". The zero time is rendered as "never".
func eventAgeString(t time.Time) string {
	if !t.IsZero() {
		return fmt.Sprintf("%.2fs ago", time.Since(t).Seconds())
	}
	return "never"
}
168
169 func (cn *connection) WriteStatus(w io.Writer) {
170         fmt.Fprintf(w, "%-90s: %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
171         c := func(b byte) {
172                 fmt.Fprintf(w, "%c", b)
173         }
174         // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
175         if len(cn.Requests) != 0 {
176                 c('D')
177         }
178         if cn.PeerChoked && cn.Interested {
179                 c('d')
180         }
181         if !cn.Choked {
182                 if cn.PeerInterested {
183                         c('U')
184                 } else {
185                         c('u')
186                 }
187         }
188         if cn.Discovery != 0 {
189                 c(byte(cn.Discovery))
190         }
191         if cn.uTP {
192                 c('T')
193         }
194         fmt.Fprintln(w)
195 }
196
197 func (c *connection) Close() {
198         c.mu.Lock()
199         defer c.mu.Unlock()
200         select {
201         case <-c.closing:
202                 return
203         default:
204         }
205         close(c.closing)
206         // TODO: This call blocks sometimes, why?
207         go c.Socket.Close()
208 }
209
210 func (c *connection) PeerHasPiece(index pp.Integer) bool {
211         if c.PeerPieces == nil {
212                 return false
213         }
214         if int(index) >= len(c.PeerPieces) {
215                 return false
216         }
217         return c.PeerPieces[index]
218 }
219
220 func (c *connection) Post(msg pp.Message) {
221         select {
222         case c.post <- msg:
223         case <-c.closing:
224         }
225 }
226
227 func (c *connection) RequestPending(r request) bool {
228         _, ok := c.Requests[r]
229         return ok
230 }
231
232 // Returns true if more requests can be sent.
233 func (c *connection) Request(chunk request) bool {
234         if len(c.Requests) >= c.PeerMaxRequests {
235                 return false
236         }
237         if !c.PeerHasPiece(chunk.Index) {
238                 return true
239         }
240         if c.RequestPending(chunk) {
241                 return true
242         }
243         c.SetInterested(true)
244         if c.PeerChoked {
245                 return false
246         }
247         if c.Requests == nil {
248                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
249         }
250         c.Requests[chunk] = struct{}{}
251         c.Post(pp.Message{
252                 Type:   pp.Request,
253                 Index:  chunk.Index,
254                 Begin:  chunk.Begin,
255                 Length: chunk.Length,
256         })
257         return true
258 }
259
260 // Returns true if an unsatisfied request was canceled.
261 func (c *connection) Cancel(r request) bool {
262         if c.Requests == nil {
263                 return false
264         }
265         if _, ok := c.Requests[r]; !ok {
266                 return false
267         }
268         delete(c.Requests, r)
269         c.Post(pp.Message{
270                 Type:   pp.Cancel,
271                 Index:  r.Index,
272                 Begin:  r.Begin,
273                 Length: r.Length,
274         })
275         return true
276 }
277
278 // Returns true if an unsatisfied request was canceled.
279 func (c *connection) PeerCancel(r request) bool {
280         if c.PeerRequests == nil {
281                 return false
282         }
283         if _, ok := c.PeerRequests[r]; !ok {
284                 return false
285         }
286         delete(c.PeerRequests, r)
287         return true
288 }
289
290 func (c *connection) Choke() {
291         if c.Choked {
292                 return
293         }
294         c.Post(pp.Message{
295                 Type: pp.Choke,
296         })
297         c.Choked = true
298 }
299
300 func (c *connection) Unchoke() {
301         if !c.Choked {
302                 return
303         }
304         c.Post(pp.Message{
305                 Type: pp.Unchoke,
306         })
307         c.Choked = false
308 }
309
310 func (c *connection) SetInterested(interested bool) {
311         if c.Interested == interested {
312                 return
313         }
314         c.Post(pp.Message{
315                 Type: func() pp.MessageType {
316                         if interested {
317                                 return pp.Interested
318                         } else {
319                                 return pp.NotInterested
320                         }
321                 }(),
322         })
323         c.Interested = interested
324 }
325
var (
	// Four consecutive zero bytes that comprise a keep alive on the wire.
	// NOTE(review): nothing in the visible part of this file reads this
	// value; writeOptimizer queues pp.Message{Keepalive: true} instead.
	// Confirm whether it is still used elsewhere.
	keepAliveBytes [4]byte
)
330
331 // Writes buffers to the socket from the write channel.
332 func (conn *connection) writer() {
333         // Reduce write syscalls.
334         buf := bufio.NewWriterSize(conn.Socket, 0x8000) // 32 KiB
335         // Returns immediately if the buffer contains data.
336         notEmpty := make(chan struct{}, 1)
337         for {
338                 if buf.Buffered() != 0 {
339                         select {
340                         case notEmpty <- struct{}{}:
341                         default:
342                         }
343                 }
344                 select {
345                 case b, ok := <-conn.writeCh:
346                         if !ok {
347                                 return
348                         }
349                         _, err := buf.Write(b)
350                         if err != nil {
351                                 conn.Close()
352                                 return
353                         }
354                 case <-conn.closing:
355                         return
356                 case <-notEmpty:
357                         err := buf.Flush()
358                         if err != nil {
359                                 return
360                         }
361                 }
362         }
363 }
364
365 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
366         defer close(conn.writeCh) // Responsible for notifying downstream routines.
367         pending := list.New()     // Message queue.
368         var nextWrite []byte      // Set to nil if we need to need to marshal the next message.
369         timer := time.NewTimer(keepAliveDelay)
370         defer timer.Stop()
371         lastWrite := time.Now()
372         for {
373                 write := conn.writeCh // Set to nil if there's nothing to write.
374                 if pending.Len() == 0 {
375                         write = nil
376                 } else if nextWrite == nil {
377                         var err error
378                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
379                         if err != nil {
380                                 panic(err)
381                         }
382                 }
383         event:
384                 select {
385                 case <-timer.C:
386                         if pending.Len() != 0 {
387                                 break
388                         }
389                         keepAliveTime := lastWrite.Add(keepAliveDelay)
390                         if time.Now().Before(keepAliveTime) {
391                                 timer.Reset(keepAliveTime.Sub(time.Now()))
392                                 break
393                         }
394                         pending.PushBack(pp.Message{Keepalive: true})
395                 case msg, ok := <-conn.post:
396                         if !ok {
397                                 return
398                         }
399                         if msg.Type == pp.Cancel {
400                                 for e := pending.Back(); e != nil; e = e.Prev() {
401                                         elemMsg := e.Value.(pp.Message)
402                                         if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
403                                                 pending.Remove(e)
404                                                 optimizedCancels.Add(1)
405                                                 break event
406                                         }
407                                 }
408                         }
409                         pending.PushBack(msg)
410                 case write <- nextWrite:
411                         pending.Remove(pending.Front())
412                         nextWrite = nil
413                         lastWrite = time.Now()
414                         if pending.Len() == 0 {
415                                 timer.Reset(keepAliveDelay)
416                         }
417                 case <-conn.closing:
418                         return
419                 }
420         }
421 }