15 "github.com/anacrolix/libtorgo/bencode"
17 "github.com/anacrolix/torrent/internal/pieceordering"
18 pp "github.com/anacrolix/torrent/peer_protocol"
// Counter published via expvar; incremented when a Cancel message removes a
// still-queued Request before it is written to the wire (see writeOptimizer).
21 var optimizedCancels = expvar.NewInt("optimizedCancels")
// Single-byte tag recording how a peer was discovered ('I' = incoming).
// NOTE(review): appears to be one member of a const group whose other
// members are not visible here — confirm against the surrounding file.
26 peerSourceIncoming = 'I'
31 // Maintains the state of a connection with a peer.
32 type connection struct {
34 rw io.ReadWriter // The real slim shady
39 mu sync.Mutex // Only for closing.
43 // The connections preferred order to download pieces.
45 // The piece request order based on piece priorities.
46 pieceRequestOrder *pieceordering.Instance
// Chunks received that we did not have an outstanding request for.
48 UnwantedChunksReceived int
// Chunks received in response to our requests.
49 UsefulChunksReceived int
// Timestamps reported by WriteStatus for diagnostics.
51 lastMessageReceived time.Time
52 completedHandshake time.Time
53 lastUsefulChunkReceived time.Time
55 // Stuff controlled by the local peer.
// Outstanding chunk requests we have sent; populated/cleared by
// Request and Cancel below.
58 Requests map[request]struct{}
61 // Stuff controlled by the remote peer.
// Chunk requests the remote peer has sent us; cleared by PeerCancel.
65 PeerRequests map[request]struct{}
66 PeerExtensionBytes peerExtensionBytes
67 // Whether the peer has the given piece. nil if they've not sent any
68 // related messages yet.
72 PeerMaxRequests int // Maximum pending requests the peer allows.
// Extension name -> message ID, as negotiated in the extended handshake;
// read by supportsExtension.
73 PeerExtensionIDs map[string]int64
// newConnection returns a connection with its signalling channels
// initialized. NOTE(review): additional field initialization may occur on
// lines elided from this excerpt — confirm before relying on defaults.
77 func newConnection() (c *connection) {
83 closing: make(chan struct{}),
84 writeCh: make(chan []byte),
85 post: make(chan pp.Message),
// remoteAddr returns the remote address of the underlying network connection.
90 func (cn *connection) remoteAddr() net.Addr {
91 return cn.conn.RemoteAddr()
// localAddr returns the local address of the underlying network connection.
94 func (cn *connection) localAddr() net.Addr {
95 return cn.conn.LocalAddr()
98 // Adjust piece position in the request order for this connection based on the
99 // given piece priority.
100 func (cn *connection) pendPiece(piece int, priority piecePriority) {
// piecePriorityNone means the piece is no longer wanted at all: drop it
// from this connection's request order entirely.
101 if priority == piecePriorityNone {
102 cn.pieceRequestOrder.DeletePiece(piece)
// pp is this connection's per-piece tiebreaker value for the piece.
// NOTE(review): it shadows the pp package alias within this function.
105 pp := cn.piecePriorities[piece]
106 // Priority goes to Now, then Next in connection order. Then Readahead in
107 // by piece index. Then normal again by connection order.
// The multipliers keep the priority bands disjoint: each band's keys are
// offset by whole multiples of len(cn.piecePriorities) so a lower band's
// largest key is still smaller than a higher band's smallest key.
110 case piecePriorityNow:
111 return -3*len(cn.piecePriorities) + 3*pp
112 case piecePriorityNext:
113 return -2*len(cn.piecePriorities) + 2*pp
114 case piecePriorityReadahead:
115 return -len(cn.piecePriorities) + pp
116 case piecePriorityNormal:
// Store the computed sort key for this piece in the request order.
122 cn.pieceRequestOrder.SetPiece(piece, key)
// supportsExtension reports whether the peer advertised the named extension
// in its extended-handshake extension ID map.
125 func (cn *connection) supportsExtension(ext string) bool {
126 _, ok := cn.PeerExtensionIDs[ext]
// completedString renders how much of the torrent the peer has, as
// "have/total". Returns early (value elided in this excerpt) when we have no
// information yet, i.e. no bitfield and no have-all.
130 func (cn *connection) completedString(t *torrent) string {
131 if cn.PeerPieces == nil && !cn.peerHasAll {
134 return fmt.Sprintf("%d/%d", func() int {
// Count pieces the peer has flagged in its bitfield.
142 for _, b := range cn.PeerPieces {
// With have-all or no bitfield, the denominator comes from another
// source (elided); otherwise use the bitfield length.
149 if cn.peerHasAll || cn.PeerPieces == nil {
155 return len(cn.PeerPieces)
159 // Correct the PeerPieces slice length. Return false if the existing slice is
160 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
// messages. NOTE(review): despite the comment saying "return false", the
// signature returns error — the comment likely predates a refactor.
162 func (cn *connection) setNumPieces(num int) error {
166 if cn.PeerPieces == nil {
// Already the right length: nothing to do.
169 if len(cn.PeerPieces) == num {
// Too short: pad with false (peer hasn't claimed those pieces).
170 } else if len(cn.PeerPieces) < num {
171 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
// Longer, but only by bitfield byte-padding ((num+7)/8*8 is num rounded up
// to a whole byte): the spare trailing bits must all be zero.
172 } else if len(cn.PeerPieces) <= (num+7)/8*8 {
173 for _, have := range cn.PeerPieces[num:] {
175 return errors.New("peer has invalid piece")
// Trim the valid padding back to the true piece count.
178 cn.PeerPieces = cn.PeerPieces[:num]
// Longer than even byte-padding allows: the peer's bitfield was bogus.
180 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
// Sanity check that the slice now has exactly num entries.
182 if len(cn.PeerPieces) != num {
// eventAgeString formats how long ago t occurred, e.g. "3.14s ago".
// NOTE(review): time.Since(t) would be the idiomatic spelling of
// time.Now().Sub(t), but the code is left untouched here.
188 func eventAgeString(t time.Time) string {
192 return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
195 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
196 func (cn *connection) statusFlags() (ret string) {
// Helper (name elided) appends a single flag byte to the result string.
198 ret += string([]byte{b})
// Include the discovery-source byte (e.g. 'I' for incoming) when set.
210 if cn.Discovery != 0 {
211 c(byte(cn.Discovery))
// Flag byte emitted when the peer has expressed interest in us.
217 if cn.PeerInterested {
// WriteStatus writes a human-readable, multi-line summary of the connection
// (peer identity, message/handshake timestamps, chunk statistics, request
// counts, and status flags) to w for diagnostics.
226 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
227 // \t isn't preserved in <pre> blocks?
228 fmt.Fprintf(w, "%q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
229 fmt.Fprintf(w, " last msg: %s, connected: %s, last useful chunk: %s\n",
230 eventAgeString(cn.lastMessageReceived),
231 eventAgeString(cn.completedHandshake),
232 eventAgeString(cn.lastUsefulChunkReceived))
233 fmt.Fprintf(w, " %s completed, good chunks: %d/%d reqs: %d-%d, flags: %s\n",
234 cn.completedString(t),
235 cn.UsefulChunksReceived,
// Denominator is total chunks received, wanted or not.
236 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
238 len(cn.PeerRequests),
// Close shuts the connection down. NOTE(review): body largely elided in this
// excerpt; presumably closes c.closing and the underlying conn — confirm.
242 func (c *connection) Close() {
251 // TODO: This call blocks sometimes, why?
// PeerHasPiece reports whether the remote peer has claimed to have the given
// piece. Indexes beyond the known bitfield length report false.
255 func (c *connection) PeerHasPiece(piece int) bool {
259 if piece >= len(c.PeerPieces) {
262 return c.PeerPieces[piece]
// Post queues a protocol message for sending. NOTE(review): body elided in
// this excerpt; presumably sends msg on c.post for writeOptimizer — confirm.
265 func (c *connection) Post(msg pp.Message) {
// RequestPending reports whether we have an outstanding request for r.
272 func (c *connection) RequestPending(r request) bool {
273 _, ok := c.Requests[r]
277 // Returns true if more requests can be sent.
278 func (c *connection) Request(chunk request) bool {
// At the peer's advertised request limit: caller must stop requesting.
279 if len(c.Requests) >= c.PeerMaxRequests {
// Can't usefully request a piece the peer doesn't have.
282 if !c.PeerHasPiece(int(chunk.Index)) {
// Already requested: nothing to send, but more requests are still allowed.
285 if c.RequestPending(chunk) {
// We must be interested before the peer will honour requests.
288 c.SetInterested(true)
// Lazily allocate the request set, sized to the peer's limit.
292 if c.Requests == nil {
293 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
295 c.Requests[chunk] = struct{}{}
// Low-water mark at half the current outstanding count; presumably used
// elsewhere to decide when to top requests back up — confirm at callers.
296 c.requestsLowWater = len(c.Requests) / 2
// Post the REQUEST message (surrounding fields elided in this excerpt).
301 Length: chunk.Length,
306 // Returns true if an unsatisfied request was canceled.
307 func (c *connection) Cancel(r request) bool {
// No requests ever sent: nothing to cancel.
308 if c.Requests == nil {
// Not an outstanding request of ours.
311 if _, ok := c.Requests[r]; !ok {
314 delete(c.Requests, r)
324 // Returns true if an unsatisfied request was canceled.
// PeerCancel handles a CANCEL from the remote peer, removing r from the set
// of requests they have pending with us.
325 func (c *connection) PeerCancel(r request) bool {
326 if c.PeerRequests == nil {
329 if _, ok := c.PeerRequests[r]; !ok {
332 delete(c.PeerRequests, r)
// Choke chokes the remote peer. NOTE(review): body elided in this excerpt;
// presumably posts a CHOKE message and updates local state — confirm.
336 func (c *connection) Choke() {
// Unchoke unchokes the remote peer. NOTE(review): body elided in this
// excerpt; presumably posts an UNCHOKE message and updates state — confirm.
346 func (c *connection) Unchoke() {
// SetInterested posts INTERESTED or NOT_INTERESTED to match the desired
// state, doing nothing if the state is already correct.
356 func (c *connection) SetInterested(interested bool) {
// Already in the requested state: avoid sending a redundant message.
357 if c.Interested == interested {
// Choose the message type from the desired state.
361 Type: func() pp.MessageType {
365 return pp.NotInterested
// Record the new state only after the message has been posted.
369 c.Interested = interested
372 // Writes buffers to the socket from the write channel.
373 func (conn *connection) writer() {
374 // Reduce write syscalls.
375 buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB
376 // Receives when buf is not empty.
377 notEmpty := make(chan struct{}, 1)
// After each iteration, signal pending buffered data so a later select can
// flush it when no new writes arrive.
379 if buf.Buffered() != 0 {
380 // Make sure it's receivable.
// Non-blocking send: channel has capacity 1, so a signal may already be
// queued; elided default case presumably handles that — confirm.
382 case notEmpty <- struct{}{}:
// writeCh closed means upstream (writeOptimizer) has shut down.
387 case b, ok := <-conn.writeCh:
391 _, err := buf.Write(b)
// writeOptimizer sits between Post (conn.post) and writer (conn.writeCh).
// It queues outgoing messages, elides Request messages that are canceled
// before hitting the wire, and injects keepalives after keepAliveDelay of
// write inactivity.
407 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
408 defer close(conn.writeCh) // Responsible for notifying downstream routines.
409 pending := list.New() // Message queue.
410 var nextWrite []byte // Set to nil if we need to need to marshal the next message.
411 timer := time.NewTimer(keepAliveDelay)
413 lastWrite := time.Now()
// Loop invariant: write is nil (disabling that select case) when there is
// nothing marshalled to send.
415 write := conn.writeCh // Set to nil if there's nothing to write.
416 if pending.Len() == 0 {
// Lazily marshal the head of the queue only when it changes.
418 } else if nextWrite == nil {
420 nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
// Timer fired: only send a keepalive if the queue is idle and the last
// actual write really is older than keepAliveDelay.
428 if pending.Len() != 0 {
431 keepAliveTime := lastWrite.Add(keepAliveDelay)
// Spurious/early wakeup: re-arm the timer for the remaining interval.
432 if time.Now().Before(keepAliveTime) {
433 timer.Reset(keepAliveTime.Sub(time.Now()))
436 pending.PushBack(pp.Message{Keepalive: true})
// New message posted; ok=false means the posting side has closed.
437 case msg, ok := <-conn.post:
// A Cancel can annihilate a matching not-yet-sent Request: scan the queue
// back-to-front and drop the pair instead of sending both.
441 if msg.Type == pp.Cancel {
442 for e := pending.Back(); e != nil; e = e.Prev() {
443 elemMsg := e.Value.(pp.Message)
444 if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
446 optimizedCancels.Add(1)
451 pending.PushBack(msg)
// Head of queue handed to the writer goroutine.
452 case write <- nextWrite:
453 pending.Remove(pending.Front())
455 lastWrite = time.Now()
// Queue drained: arm the keepalive timer from now.
456 if pending.Len() == 0 {
457 timer.Reset(keepAliveDelay)