15 "bitbucket.org/anacrolix/go.torrent/internal/pieceordering"
16 pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
19 var optimizedCancels = expvar.NewInt("optimizedCancels")
24 peerSourceIncoming = 'I'
29 // Maintains the state of a connection with a peer.
30 type connection struct {
35 mu sync.Mutex // Only for closing.
40 pieceRequestOrder *pieceordering.Instance
42 UnwantedChunksReceived int
43 UsefulChunksReceived int
45 lastMessageReceived time.Time
46 completedHandshake time.Time
47 lastUsefulChunkReceived time.Time
49 // Stuff controlled by the local peer.
52 Requests map[request]struct{}
54 // Stuff controlled by the remote peer.
58 PeerRequests map[request]struct{}
59 PeerExtensionBytes peerExtensionBytes
60 // Whether the peer has the given piece. nil if they've not sent any
61 // related messages yet.
63 PeerMaxRequests int // Maximum pending requests the peer allows.
64 PeerExtensionIDs map[string]int64
68 func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) {
76 PeerExtensionBytes: peb,
79 closing: make(chan struct{}),
80 writeCh: make(chan []byte),
81 post: make(chan pp.Message),
83 completedHandshake: time.Now(),
86 go c.writeOptimizer(time.Minute)
90 func (cn *connection) pendPiece(piece int, priority piecePriority) {
91 if priority == piecePriorityNone {
92 cn.pieceRequestOrder.DeletePiece(piece)
95 key := cn.piecePriorities[piece]
96 // TODO: Have some kind of overlap here, so there's some probabilistic
97 // favouring of higher priority pieces.
99 case piecePriorityReadahead:
100 key -= len(cn.piecePriorities)
101 case piecePriorityNext:
102 key -= 2 * len(cn.piecePriorities)
103 case piecePriorityNow:
104 key -= 3 * len(cn.piecePriorities)
106 cn.pieceRequestOrder.SetPiece(piece, key)
109 func (cn *connection) supportsExtension(ext string) bool {
110 _, ok := cn.PeerExtensionIDs[ext]
114 func (cn *connection) completedString() string {
115 if cn.PeerPieces == nil {
118 // f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
119 // return fmt.Sprintf("%d%%", int(f*100))
120 return fmt.Sprintf("%d/%d", cn.piecesPeerHasCount(), cn.totalPiecesCount())
123 func (cn *connection) totalPiecesCount() int {
124 return len(cn.PeerPieces)
127 func (cn *connection) piecesPeerHasCount() (count int) {
128 for _, has := range cn.PeerPieces {
136 // Correct the PeerPieces slice length. Return an error if the existing slice is
137 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
139 func (cn *connection) setNumPieces(num int) error {
140 if cn.PeerPieces == nil {
143 if len(cn.PeerPieces) == num {
144 } else if len(cn.PeerPieces) < num {
145 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
146 } else if len(cn.PeerPieces) <= (num+7)/8*8 {
147 for _, have := range cn.PeerPieces[num:] {
149 return errors.New("peer has invalid piece")
152 cn.PeerPieces = cn.PeerPieces[:num]
154 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
156 if len(cn.PeerPieces) != num {
162 func eventAgeString(t time.Time) string {
166 return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
169 func (cn *connection) WriteStatus(w io.Writer) {
170 fmt.Fprintf(w, "%-90s: %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
172 fmt.Fprintf(w, "%c", b)
174 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
175 if len(cn.Requests) != 0 {
178 if cn.PeerChoked && cn.Interested {
182 if cn.PeerInterested {
188 if cn.Discovery != 0 {
189 c(byte(cn.Discovery))
197 func (c *connection) Close() {
206 // TODO: This call blocks sometimes, why?
210 func (c *connection) PeerHasPiece(index pp.Integer) bool {
211 if c.PeerPieces == nil {
214 if int(index) >= len(c.PeerPieces) {
217 return c.PeerPieces[index]
220 func (c *connection) Post(msg pp.Message) {
227 func (c *connection) RequestPending(r request) bool {
228 _, ok := c.Requests[r]
232 // Returns true if more requests can be sent.
233 func (c *connection) Request(chunk request) bool {
234 if len(c.Requests) >= c.PeerMaxRequests {
237 if !c.PeerHasPiece(chunk.Index) {
240 if c.RequestPending(chunk) {
243 c.SetInterested(true)
247 if c.Requests == nil {
248 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
250 c.Requests[chunk] = struct{}{}
255 Length: chunk.Length,
260 // Returns true if an unsatisfied request was canceled.
261 func (c *connection) Cancel(r request) bool {
262 if c.Requests == nil {
265 if _, ok := c.Requests[r]; !ok {
268 delete(c.Requests, r)
278 // Returns true if an unsatisfied request was canceled.
279 func (c *connection) PeerCancel(r request) bool {
280 if c.PeerRequests == nil {
283 if _, ok := c.PeerRequests[r]; !ok {
286 delete(c.PeerRequests, r)
290 func (c *connection) Choke() {
300 func (c *connection) Unchoke() {
310 func (c *connection) SetInterested(interested bool) {
311 if c.Interested == interested {
315 Type: func() pp.MessageType {
319 return pp.NotInterested
323 c.Interested = interested
327 // Four consecutive zero bytes that comprise a keep alive on the wire.
328 keepAliveBytes [4]byte
331 // Writes buffers to the socket from the write channel.
332 func (conn *connection) writer() {
333 // Reduce write syscalls.
334 buf := bufio.NewWriterSize(conn.Socket, 0x8000) // 32 KiB
335 // Returns immediately if the buffer contains data.
336 notEmpty := make(chan struct{}, 1)
338 if buf.Buffered() != 0 {
340 case notEmpty <- struct{}{}:
345 case b, ok := <-conn.writeCh:
349 _, err := buf.Write(b)
365 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
366 defer close(conn.writeCh) // Responsible for notifying downstream routines.
367 pending := list.New() // Message queue.
368 var nextWrite []byte // Set to nil if we need to marshal the next message.
369 timer := time.NewTimer(keepAliveDelay)
371 lastWrite := time.Now()
373 write := conn.writeCh // Set to nil if there's nothing to write.
374 if pending.Len() == 0 {
376 } else if nextWrite == nil {
378 nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
386 if pending.Len() != 0 {
389 keepAliveTime := lastWrite.Add(keepAliveDelay)
390 if time.Now().Before(keepAliveTime) {
391 timer.Reset(keepAliveTime.Sub(time.Now()))
394 pending.PushBack(pp.Message{Keepalive: true})
395 case msg, ok := <-conn.post:
399 if msg.Type == pp.Cancel {
400 for e := pending.Back(); e != nil; e = e.Prev() {
401 elemMsg := e.Value.(pp.Message)
402 if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
404 optimizedCancels.Add(1)
409 pending.PushBack(msg)
410 case write <- nextWrite:
411 pending.Remove(pending.Front())
413 lastWrite = time.Now()
414 if pending.Len() == 0 {
415 timer.Reset(keepAliveDelay)