Sergey Matveev's repositories - btrtrc.git/blob - connection.go
Support individual peer max requests
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "container/list"
5         "encoding"
6         "log"
7         "net"
8         "sync"
9         "time"
10
11         "bitbucket.org/anacrolix/go.torrent/peer_protocol"
12 )
13
14 // Maintains the state of a connection with a peer.
15 type connection struct {
16         Socket net.Conn
17         closed bool
18         mu     sync.Mutex // Only for closing.
19         post   chan encoding.BinaryMarshaler
20         write  chan []byte
21
22         // Stuff controlled by the local peer.
23         Interested bool
24         Choked     bool
25         Requests   map[request]struct{}
26
27         // Stuff controlled by the remote peer.
28         PeerId          [20]byte
29         PeerInterested  bool
30         PeerChoked      bool
31         PeerRequests    map[request]struct{}
32         PeerExtensions  [8]byte
33         PeerPieces      []bool
34         PeerMaxRequests int // Maximum pending requests the peer allows.
35 }
36
37 func (c *connection) Close() {
38         c.mu.Lock()
39         if c.closed {
40                 return
41         }
42         c.Socket.Close()
43         close(c.post)
44         c.closed = true
45         c.mu.Unlock()
46 }
47
48 func (c *connection) getClosed() bool {
49         c.mu.Lock()
50         defer c.mu.Unlock()
51         return c.closed
52 }
53
54 func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool {
55         if c.PeerPieces == nil {
56                 return false
57         }
58         return c.PeerPieces[index]
59 }
60
61 func (c *connection) Post(msg encoding.BinaryMarshaler) {
62         c.post <- msg
63 }
64
65 // Returns true if more requests can be sent.
66 func (c *connection) Request(chunk request) bool {
67         if len(c.Requests) >= c.PeerMaxRequests {
68                 return false
69         }
70         if !c.PeerHasPiece(chunk.Index) {
71                 return true
72         }
73         c.SetInterested(true)
74         if c.PeerChoked {
75                 return false
76         }
77         if _, ok := c.Requests[chunk]; !ok {
78                 c.Post(peer_protocol.Message{
79                         Type:   peer_protocol.Request,
80                         Index:  chunk.Index,
81                         Begin:  chunk.Begin,
82                         Length: chunk.Length,
83                 })
84         }
85         if c.Requests == nil {
86                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
87         }
88         c.Requests[chunk] = struct{}{}
89         return true
90 }
91
92 // Returns true if an unsatisfied request was canceled.
93 func (c *connection) PeerCancel(r request) bool {
94         if c.PeerRequests == nil {
95                 return false
96         }
97         if _, ok := c.PeerRequests[r]; !ok {
98                 return false
99         }
100         delete(c.PeerRequests, r)
101         return true
102 }
103
104 func (c *connection) Unchoke() {
105         if !c.Choked {
106                 return
107         }
108         c.Post(peer_protocol.Message{
109                 Type: peer_protocol.Unchoke,
110         })
111         c.Choked = false
112 }
113
114 func (c *connection) SetInterested(interested bool) {
115         if c.Interested == interested {
116                 return
117         }
118         c.Post(peer_protocol.Message{
119                 Type: func() peer_protocol.MessageType {
120                         if interested {
121                                 return peer_protocol.Interested
122                         } else {
123                                 return peer_protocol.NotInterested
124                         }
125                 }(),
126         })
127         c.Interested = interested
128 }
129
// keepAliveBytes is the wire form of a keep alive: four consecutive zero
// bytes. The array is deliberately left at its zero value.
var keepAliveBytes [4]byte
134
135 func (conn *connection) writer() {
136         timer := time.NewTimer(0)
137         defer timer.Stop()
138         for {
139                 if !timer.Reset(time.Minute) {
140                         <-timer.C
141                 }
142                 var b []byte
143                 select {
144                 case <-timer.C:
145                         b = keepAliveBytes[:]
146                 case b = <-conn.write:
147                         if b == nil {
148                                 return
149                         }
150                 }
151                 _, err := conn.Socket.Write(b)
152                 if conn.getClosed() {
153                         break
154                 }
155                 if err != nil {
156                         log.Print(err)
157                         break
158                 }
159         }
160 }
161
162 func (conn *connection) writeOptimizer() {
163         pending := list.New()
164         var nextWrite []byte
165         defer close(conn.write)
166         for {
167                 write := conn.write
168                 if pending.Len() == 0 {
169                         write = nil
170                 } else {
171                         var err error
172                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
173                         if err != nil {
174                                 panic(err)
175                         }
176                 }
177                 select {
178                 case msg, ok := <-conn.post:
179                         if !ok {
180                                 return
181                         }
182                         pending.PushBack(msg)
183                 case write <- nextWrite:
184                         pending.Remove(pending.Front())
185                 }
186         }
187 }