15 "github.com/anacrolix/torrent/bencode"
16 "github.com/anacrolix/torrent/internal/pieceordering"
17 pp "github.com/anacrolix/torrent/peer_protocol"
20 var optimizedCancels = expvar.NewInt("optimizedCancels")
25 peerSourceIncoming = 'I'
30 // Maintains the state of a connection with a peer.
31 type connection struct {
// The underlying stream. writer() wraps it in a bufio.Writer and sends all
// outgoing bytes through it.
33 rw io.ReadWriter // The real slim shady
38 mu sync.Mutex // Only for closing.
42 // The connection's preferred order to download pieces. The index is the
43 // piece, the value is its priority.
45 // The piece request order based on piece priorities.
46 pieceRequestOrder *pieceordering.Instance
// Chunk counters; WriteStatus reports them as useful/(unwanted+useful).
48 UnwantedChunksReceived int
49 UsefulChunksReceived int
// Event timestamps; WriteStatus reports them as "last msg", "connected" and
// "last useful chunk" respectively.
52 lastMessageReceived time.Time
53 completedHandshake time.Time
54 lastUsefulChunkReceived time.Time
56 // Stuff controlled by the local peer.
// Requests we have sent to the peer; entries are added by Request and
// removed by Cancel.
59 Requests map[request]struct{}
61 // Indexed by metadata piece, set to true if posted and pending a
63 metadataRequests []bool
65 // Stuff controlled by the remote peer.
69 PeerRequests map[request]struct{}
70 PeerExtensionBytes peerExtensionBytes
71 // Whether the peer has the given piece. nil if they've not sent any
72 // related messages yet.
76 PeerMaxRequests int // Maximum pending requests the peer allows.
// Extension name (e.g. "ut_metadata") to the message ID stored for it.
77 PeerExtensionIDs map[string]byte
81 func newConnection() (c *connection) {
87 closing: make(chan struct{}),
88 writeCh: make(chan []byte),
89 post: make(chan pp.Message),
// remoteAddr returns the remote address reported by the underlying cn.conn.
94 func (cn *connection) remoteAddr() net.Addr {
95 return cn.conn.RemoteAddr()
// localAddr returns the local address reported by the underlying cn.conn.
98 func (cn *connection) localAddr() net.Addr {
99 return cn.conn.LocalAddr()
102 // Adjust piece position in the request order for this connection based on the
103 // given piece priority.
104 func (cn *connection) pendPiece(piece int, priority piecePriority) {
// A piece with no priority is taken out of the request order.
105 if priority == PiecePriorityNone {
106 cn.pieceRequestOrder.DeletePiece(piece)
// NOTE(review): this local shadows the pp (peer_protocol) import alias for
// the rest of the function.
109 pp := cn.piecePriorities[piece]
110 // Priority regions not to scale. Within each region, piece is randomized
111 // according to connection.
113 // <-request first -- last->
// Each priority scales both the offset and the per-connection value by the
// same factor (3, 2, 1), so more urgent priorities produce more negative
// keys. Presumably smaller keys are requested first -- confirm against
// pieceordering.
120 case PiecePriorityNow:
121 return -3*len(cn.piecePriorities) + 3*pp
122 case PiecePriorityNext:
123 return -2*len(cn.piecePriorities) + 2*pp
124 case PiecePriorityReadahead:
125 return -len(cn.piecePriorities) + pp
126 case PiecePriorityNormal:
// Record the computed key as this piece's position in the request order.
132 cn.pieceRequestOrder.SetPiece(piece, key)
// supportsExtension checks whether the extension name ext has an entry in
// PeerExtensionIDs.
135 func (cn *connection) supportsExtension(ext string) bool {
136 _, ok := cn.PeerExtensionIDs[ext]
// completedString renders the peer's piece completion as a "%d/%d" pair. Both
// numbers are derived from PeerPieces and peerHasAll.
// NOTE(review): how the *torrent argument contributes is not visible in these
// lines -- confirm before relying on it.
140 func (cn *connection) completedString(t *torrent) string {
// Nothing is known about the peer's pieces yet.
141 if cn.PeerPieces == nil && !cn.peerHasAll {
144 return fmt.Sprintf("%d/%d", func() int {
152 for _, b := range cn.PeerPieces {
159 if cn.peerHasAll || cn.PeerPieces == nil {
165 return len(cn.PeerPieces)
169 // Correct the PeerPieces slice length. Returns an error if the existing slice is
170 // invalid, such as by receiving a badly sized BITFIELD, or invalid HAVE
172 func (cn *connection) setNumPieces(num int) error {
176 if cn.PeerPieces == nil {
179 if len(cn.PeerPieces) == num {
180 } else if len(cn.PeerPieces) < num {
// Too short: pad with false entries up to num.
181 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
// (num+7)/8*8 is num rounded up to a whole number of bytes, so this branch
// accepts a slice that is longer than num only by bitfield padding bits.
182 } else if len(cn.PeerPieces) <= (num+7)/8*8 {
// The padding entries are inspected before being truncated away.
183 for _, have := range cn.PeerPieces[num:] {
185 return errors.New("peer has invalid piece")
188 cn.PeerPieces = cn.PeerPieces[:num]
// Longer than any validly padded bitfield for num pieces.
190 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
192 if len(cn.PeerPieces) != num {
// eventAgeString describes how long ago t happened, formatted as seconds with
// two decimal places (for example "1.50s ago").
198 func eventAgeString(t time.Time) string {
202 return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
// statusFlags builds a compact string of single-byte flag characters
// describing the connection's state.
205 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
206 func (cn *connection) statusFlags() (ret string) {
// Appends one flag byte to the result.
208 ret += string([]byte{b})
// The discovery source is emitted as its own byte value when set.
220 if cn.Discovery != 0 {
221 c(byte(cn.Discovery))
227 if cn.PeerInterested {
// WriteStatus writes a human-readable, multi-line summary of the connection to
// w: the peer ID and addresses, the age of recent events, and completion,
// chunk and request statistics.
236 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
237 // \t isn't preserved in <pre> blocks?
238 fmt.Fprintf(w, "%q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
239 fmt.Fprintf(w, " last msg: %s, connected: %s, last useful chunk: %s\n",
240 eventAgeString(cn.lastMessageReceived),
241 eventAgeString(cn.completedHandshake),
242 eventAgeString(cn.lastUsefulChunkReceived))
244 " %s completed, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
245 cn.completedString(t),
// "good chunks" is reported as useful / (unwanted + useful).
246 cn.UsefulChunksReceived,
247 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
250 len(cn.PeerRequests),
255 func (c *connection) Close() {
264 // TODO: This call blocks sometimes, why?
// PeerHasPiece looks up piece in PeerPieces, guarding against indices that lie
// past the end of the slice.
268 func (c *connection) PeerHasPiece(piece int) bool {
272 if piece >= len(c.PeerPieces) {
275 return c.PeerPieces[piece]
278 func (c *connection) Post(msg pp.Message) {
// RequestPending checks whether r is present in c.Requests.
285 func (c *connection) RequestPending(r request) bool {
286 _, ok := c.Requests[r]
// requestMetadataPiece asks the peer for the metadata piece at index through
// the "ut_metadata" extension and marks it as requested in metadataRequests.
290 func (c *connection) requestMetadataPiece(index int) {
291 eID := c.PeerExtensionIDs["ut_metadata"]
// Already marked as requested.
295 if index < len(c.metadataRequests) && c.metadataRequests[index] {
301 ExtendedPayload: func() []byte {
302 b, err := bencode.Marshal(map[string]int{
303 "msg_type": pp.RequestMetadataExtensionMsgType,
// Grow metadataRequests until index is addressable, then mark it.
312 for index >= len(c.metadataRequests) {
313 c.metadataRequests = append(c.metadataRequests, false)
315 c.metadataRequests[index] = true
// requestedMetadataPiece reports whether the metadata piece at index is marked
// as requested. Indices beyond the end of metadataRequests yield false.
318 func (c *connection) requestedMetadataPiece(index int) bool {
319 return index < len(c.metadataRequests) && c.metadataRequests[index]
322 // Returns true if more requests can be sent.
323 func (c *connection) Request(chunk request) bool {
// The peer's advertised limit on pending requests has been reached.
324 if len(c.Requests) >= c.PeerMaxRequests {
// The peer does not have the piece this chunk belongs to.
327 if !c.PeerHasPiece(int(chunk.Index)) {
// This exact chunk has already been requested.
330 if c.RequestPending(chunk) {
333 c.SetInterested(true)
// Lazily create the map, sized for the peer's request limit.
337 if c.Requests == nil {
338 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
340 c.Requests[chunk] = struct{}{}
// The low-water mark tracks half of the current outstanding request count.
341 c.requestsLowWater = len(c.Requests) / 2
346 Length: chunk.Length,
// Cancel removes r from the requests we have outstanding with the peer
// (c.Requests).
351 // Returns true if an unsatisfied request was canceled.
352 func (c *connection) Cancel(r request) bool {
// No requests have ever been recorded.
353 if c.Requests == nil {
// r is not currently outstanding.
356 if _, ok := c.Requests[r]; !ok {
359 delete(c.Requests, r)
// PeerCancel removes r from the requests the peer has outstanding with us
// (c.PeerRequests).
369 // Returns true if an unsatisfied request was canceled.
370 func (c *connection) PeerCancel(r request) bool {
// No peer requests have ever been recorded.
371 if c.PeerRequests == nil {
// r is not currently outstanding.
374 if _, ok := c.PeerRequests[r]; !ok {
377 delete(c.PeerRequests, r)
381 func (c *connection) Choke() {
392 func (c *connection) Unchoke() {
// SetInterested updates our interest state towards the peer. The message type
// is chosen from the new state, and c.Interested is updated to match.
402 func (c *connection) SetInterested(interested bool) {
// State is unchanged.
403 if c.Interested == interested {
407 Type: func() pp.MessageType {
411 return pp.NotInterested
415 c.Interested = interested
418 // Writes buffers received on conn.writeCh to conn.rw through a buffered writer.
419 func (conn *connection) writer() {
420 // Reduce write syscalls.
421 buf := bufio.NewWriterSize(conn.rw, 0x8000) // 32 KiB
422 // Receives when buf is not empty.
// Capacity 1, so one signal can be left queued without a receiver waiting.
423 notEmpty := make(chan struct{}, 1)
425 if buf.Buffered() != 0 {
426 // Make sure it's receivable.
428 case notEmpty <- struct{}{}:
433 case b, ok := <-conn.writeCh:
437 _, err := buf.Write(b)
// writeOptimizer queues messages received on conn.post, marshals the message
// at the front of the queue, and offers the encoded bytes on conn.writeCh. A
// Cancel that matches a still-queued Request is counted in optimizedCancels
// (presumably so neither has to be sent -- confirm). A keepalive message is
// queued once keepAliveDelay has passed since the last write.
453 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
454 defer close(conn.writeCh) // Responsible for notifying downstream routines.
455 pending := list.New() // Message queue.
456 var nextWrite []byte // Set to nil if we need to marshal the next message.
457 timer := time.NewTimer(keepAliveDelay)
459 lastWrite := time.Now()
461 write := conn.writeCh // Set to nil if there's nothing to write.
462 if pending.Len() == 0 {
464 } else if nextWrite == nil {
// Encode the front message once and keep the bytes until they are sent.
466 nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
474 if pending.Len() != 0 {
// The timer may fire early relative to the most recent write; if so, re-arm
// it for the remaining time.
477 keepAliveTime := lastWrite.Add(keepAliveDelay)
478 if time.Now().Before(keepAliveTime) {
479 timer.Reset(keepAliveTime.Sub(time.Now()))
482 pending.PushBack(pp.Message{Keepalive: true})
483 case msg, ok := <-conn.post:
// Search the queue from newest to oldest for the Request this Cancel
// refers to.
487 if msg.Type == pp.Cancel {
488 for e := pending.Back(); e != nil; e = e.Prev() {
489 elemMsg := e.Value.(pp.Message)
490 if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
492 optimizedCancels.Add(1)
497 pending.PushBack(msg)
// The encoded front message was handed to the writer; drop it from the
// queue.
498 case write <- nextWrite:
499 pending.Remove(pending.Front())
501 lastWrite = time.Now()
// The queue has drained: start counting towards the next keepalive.
502 if pending.Len() == 0 {
503 timer.Reset(keepAliveDelay)