Sergey Matveev's repositories - btrtrc.git/blob - connection.go
Can now download from magnet links
[btrtrc.git] / connection.go
1 package torrent
2
3 import (
4         "container/list"
5         "encoding"
6         "fmt"
7         "io"
8         "log"
9         "net"
10         "sync"
11         "time"
12
13         "bitbucket.org/anacrolix/go.torrent/peer_protocol"
14 )
15
16 // Maintains the state of a connection with a peer.
17 type connection struct {
18         Socket net.Conn
19         closed bool
20         mu     sync.Mutex // Only for closing.
21         post   chan peer_protocol.Message
22         write  chan []byte
23
24         // Stuff controlled by the local peer.
25         Interested bool
26         Choked     bool
27         Requests   map[request]struct{}
28
29         // Stuff controlled by the remote peer.
30         PeerId           [20]byte
31         PeerInterested   bool
32         PeerChoked       bool
33         PeerRequests     map[request]struct{}
34         PeerExtensions   [8]byte
35         PeerPieces       []bool
36         PeerMaxRequests  int // Maximum pending requests the peer allows.
37         PeerExtensionIDs map[string]int64
38 }
39
40 func (cn *connection) completedString() string {
41         if cn.PeerPieces == nil {
42                 return "?"
43         }
44         f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
45         return fmt.Sprintf("%d%%", int(f*100))
46 }
47
48 func (cn *connection) totalPiecesCount() int {
49         return len(cn.PeerPieces)
50 }
51
52 func (cn *connection) piecesPeerHasCount() (count int) {
53         for _, has := range cn.PeerPieces {
54                 if has {
55                         count++
56                 }
57         }
58         return
59 }
60
61 func (cn *connection) WriteStatus(w io.Writer) {
62         fmt.Fprintf(w, "%q: %s-%s: %s completed: ", cn.PeerId, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr(), cn.completedString())
63         c := func(b byte) {
64                 fmt.Fprintf(w, "%c", b)
65         }
66         // https://trac.transmissionbt.com/wiki/PeerStatusText
67         if len(cn.Requests) != 0 {
68                 c('D')
69         } else if cn.Interested {
70                 c('d')
71         }
72         if !cn.PeerChoked && !cn.Interested {
73                 c('K')
74         }
75         if !cn.Choked && !cn.PeerInterested {
76                 c('?')
77         }
78         fmt.Fprintln(w)
79 }
80
81 func (c *connection) Close() {
82         c.mu.Lock()
83         if c.closed {
84                 return
85         }
86         c.Socket.Close()
87         close(c.post)
88         c.closed = true
89         c.mu.Unlock()
90 }
91
92 func (c *connection) getClosed() bool {
93         c.mu.Lock()
94         defer c.mu.Unlock()
95         return c.closed
96 }
97
98 func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool {
99         if c.PeerPieces == nil {
100                 return false
101         }
102         return c.PeerPieces[index]
103 }
104
105 func (c *connection) Post(msg peer_protocol.Message) {
106         c.post <- msg
107 }
108
109 func (c *connection) RequestPending(r request) bool {
110         _, ok := c.Requests[r]
111         return ok
112 }
113
114 // Returns true if more requests can be sent.
115 func (c *connection) Request(chunk request) bool {
116         if len(c.Requests) >= c.PeerMaxRequests {
117                 return false
118         }
119         if !c.PeerHasPiece(chunk.Index) {
120                 return true
121         }
122         if c.RequestPending(chunk) {
123                 return true
124         }
125         c.SetInterested(true)
126         if c.PeerChoked {
127                 return false
128         }
129         if c.Requests == nil {
130                 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
131         }
132         c.Requests[chunk] = struct{}{}
133         c.Post(peer_protocol.Message{
134                 Type:   peer_protocol.Request,
135                 Index:  chunk.Index,
136                 Begin:  chunk.Begin,
137                 Length: chunk.Length,
138         })
139         return true
140 }
141
142 // Returns true if an unsatisfied request was canceled.
143 func (c *connection) Cancel(r request) bool {
144         if c.Requests == nil {
145                 return false
146         }
147         if _, ok := c.Requests[r]; !ok {
148                 return false
149         }
150         delete(c.Requests, r)
151         c.Post(peer_protocol.Message{
152                 Type:   peer_protocol.Cancel,
153                 Index:  r.Index,
154                 Begin:  r.Begin,
155                 Length: r.Length,
156         })
157         return true
158 }
159
160 // Returns true if an unsatisfied request was canceled.
161 func (c *connection) PeerCancel(r request) bool {
162         if c.PeerRequests == nil {
163                 return false
164         }
165         if _, ok := c.PeerRequests[r]; !ok {
166                 return false
167         }
168         delete(c.PeerRequests, r)
169         return true
170 }
171
172 func (c *connection) Choke() {
173         if c.Choked {
174                 return
175         }
176         c.Post(peer_protocol.Message{
177                 Type: peer_protocol.Choke,
178         })
179         c.Choked = true
180 }
181
182 func (c *connection) Unchoke() {
183         if !c.Choked {
184                 return
185         }
186         c.Post(peer_protocol.Message{
187                 Type: peer_protocol.Unchoke,
188         })
189         c.Choked = false
190 }
191
192 func (c *connection) SetInterested(interested bool) {
193         if c.Interested == interested {
194                 return
195         }
196         c.Post(peer_protocol.Message{
197                 Type: func() peer_protocol.MessageType {
198                         if interested {
199                                 return peer_protocol.Interested
200                         } else {
201                                 return peer_protocol.NotInterested
202                         }
203                 }(),
204         })
205         c.Interested = interested
206 }
207
// keepAliveBytes holds the four consecutive zero bytes that comprise a keep
// alive on the wire.
var keepAliveBytes [4]byte
212
213 // Writes buffers to the socket from the write channel.
214 func (conn *connection) writer() {
215         for b := range conn.write {
216                 _, err := conn.Socket.Write(b)
217                 // log.Printf("wrote %q to %s", b, conn.Socket.RemoteAddr())
218                 if err != nil {
219                         if !conn.getClosed() {
220                                 log.Print(err)
221                         }
222                         break
223                 }
224         }
225 }
226
227 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
228         defer close(conn.write) // Responsible for notifying downstream routines.
229         pending := list.New()   // Message queue.
230         var nextWrite []byte    // Set to nil if we need to need to marshal the next message.
231         timer := time.NewTimer(keepAliveDelay)
232         defer timer.Stop()
233         lastWrite := time.Now()
234         for {
235                 write := conn.write // Set to nil if there's nothing to write.
236                 if pending.Len() == 0 {
237                         write = nil
238                 } else if nextWrite == nil {
239                         var err error
240                         nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
241                         if err != nil {
242                                 panic(err)
243                         }
244                 }
245         event:
246                 select {
247                 case <-timer.C:
248                         if pending.Len() != 0 {
249                                 break
250                         }
251                         keepAliveTime := lastWrite.Add(keepAliveDelay)
252                         if time.Now().Before(keepAliveTime) {
253                                 timer.Reset(keepAliveTime.Sub(time.Now()))
254                                 break
255                         }
256                         pending.PushBack(peer_protocol.Message{Keepalive: true})
257                 case msg, ok := <-conn.post:
258                         if !ok {
259                                 return
260                         }
261                         if msg.Type == peer_protocol.Cancel {
262                                 for e := pending.Back(); e != nil; e = e.Prev() {
263                                         elemMsg := e.Value.(peer_protocol.Message)
264                                         if elemMsg.Type == peer_protocol.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
265                                                 pending.Remove(e)
266                                                 log.Printf("optimized cancel! %v", msg)
267                                                 break event
268                                         }
269                                 }
270                         }
271                         pending.PushBack(msg)
272                 case write <- nextWrite:
273                         pending.Remove(pending.Front())
274                         nextWrite = nil
275                         lastWrite = time.Now()
276                         if pending.Len() == 0 {
277                                 timer.Reset(keepAliveDelay)
278                         }
279                 }
280         }
281 }