16 "github.com/anacrolix/torrent/bencode"
17 "github.com/anacrolix/torrent/internal/pieceordering"
18 pp "github.com/anacrolix/torrent/peer_protocol"
// Counts CANCELs that were satisfied by deleting the matching, still-queued
// REQUEST from the outgoing message queue instead of sending both messages.
21 var optimizedCancels = expvar.NewInt("optimizedCancels")
// Peer-source tags recorded per connection (interior of a const block;
// surrounding lines are not visible in this view).
26 peerSourceTracker = '\x00' // It's the default.
27 peerSourceIncoming = 'I'
32 // Maintains the state of a connection with a peer.
33 type connection struct {
// NOTE(review): several field declarations between the visible lines are not
// shown in this view.
35 rw io.ReadWriter // The real slim shady
40 mu sync.Mutex // Only for closing.
44 // The connection's preferred order to download pieces. The index is the
45 // piece, the value is its priority.
47 // The piece request order based on piece priorities.
48 pieceRequestOrder *pieceordering.Instance
// Chunk accounting: chunks we never asked for vs. chunks that were useful.
50 UnwantedChunksReceived int
51 UsefulChunksReceived int
// Event timestamps; WriteStatus reports their ages.
54 lastMessageReceived time.Time
55 completedHandshake time.Time
56 lastUsefulChunkReceived time.Time
57 lastChunkSent time.Time
59 // Stuff controlled by the local peer.
// Outstanding piece requests we've sent to the peer.
62 Requests map[request]struct{}
64 // Indexed by metadata piece, set to true if posted and pending a
66 metadataRequests []bool
69 // Stuff controlled by the remote peer.
// Requests the peer has sent us and that we haven't satisfied yet.
73 PeerRequests map[request]struct{}
74 PeerExtensionBytes peerExtensionBytes
75 // Whether the peer has the given piece. nil if they've not sent any
76 // related messages yet.
79 // Pieces we've accepted chunks for from the peer.
80 peerTouchedPieces map[int]struct{}
82 PeerMaxRequests int // Maximum pending requests the peer allows.
83 PeerExtensionIDs map[string]byte
// newConnection allocates a connection with its control channels initialized.
// (The return statement is not visible in this view.)
87 func newConnection() (c *connection) {
93 closing: make(chan struct{}),
94 writeCh: make(chan []byte),
95 post: make(chan pp.Message),
// remoteAddr returns the remote address of the underlying connection.
100 func (cn *connection) remoteAddr() net.Addr {
101 return cn.conn.RemoteAddr()
// localAddr returns the local address of the underlying connection.
104 func (cn *connection) localAddr() net.Addr {
105 return cn.conn.LocalAddr()
108 // Adjust piece position in the request order for this connection based on the
109 // given piece priority.
110 func (cn *connection) pendPiece(piece int, priority piecePriority, t *torrent) {
// A piece with no priority is simply removed from the request order.
111 if priority == PiecePriorityNone {
112 cn.pieceRequestOrder.DeletePiece(piece)
// Lazily obtain this connection's per-piece randomized tie-breakers.
115 if cn.piecePriorities == nil {
116 cn.piecePriorities = t.newConnPiecePriorities()
118 pp := cn.piecePriorities[piece]
119 // Priority regions not to scale. Within each region, piece is randomized
120 // according to connection.
122 // <-request first -- last->
// Higher priorities get more-negative base keys; pp shuffles pieces within a
// region per-connection. Presumably smaller keys are requested first — see
// pieceordering.Instance.
129 case PiecePriorityNow:
130 return -3*len(cn.piecePriorities) + 3*pp
131 case PiecePriorityNext:
132 return -2*len(cn.piecePriorities) + 2*pp
133 case PiecePriorityReadahead:
134 return -len(cn.piecePriorities) + pp
135 case PiecePriorityNormal:
141 cn.pieceRequestOrder.SetPiece(piece, key)
// supportsExtension reports whether ext is present in PeerExtensionIDs
// (presumably populated from the peer's extended handshake).
144 func (cn *connection) supportsExtension(ext string) bool {
145 _, ok := cn.PeerExtensionIDs[ext]
// completedString formats the peer's piece completion as "have/total" for
// status output. Numerator and denominator come from inline closures that are
// only partially visible here.
149 func (cn *connection) completedString(t *torrent) string {
// No bitfield received and no have-all: completion is unknown.
150 if cn.PeerPieces == nil && !cn.peerHasAll {
153 return fmt.Sprintf("%d/%d", func() int {
// Count set entries in the peer's piece bitfield.
161 for _, b := range cn.PeerPieces {
168 if cn.peerHasAll || cn.PeerPieces == nil {
174 return len(cn.PeerPieces)
178 // Correct the PeerPieces slice length. Returns an error if the existing slice
179 // is invalid, such as from receiving a badly sized BITFIELD, or an invalid HAVE
181 func (cn *connection) setNumPieces(num int) error {
185 if cn.PeerPieces == nil {
// Already the right length: nothing to correct.
188 if len(cn.PeerPieces) == num {
// Too short: pad out with false up to num.
189 } else if len(cn.PeerPieces) < num {
190 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
// Bitfield byte-padding (up to the next multiple of 8 bits) is tolerated, but
// any spare bit that is set means the peer claimed a nonexistent piece.
191 } else if len(cn.PeerPieces) <= (num+7)/8*8 {
192 for _, have := range cn.PeerPieces[num:] {
194 return errors.New("peer has invalid piece")
197 cn.PeerPieces = cn.PeerPieces[:num]
199 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
201 if len(cn.PeerPieces) != num {
// eventAgeString renders how long ago t occurred, e.g. "1.23s ago".
207 func eventAgeString(t time.Time) string {
// NOTE(review): time.Since(t) is the idiomatic form of time.Now().Sub(t).
211 return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
// connectionFlags builds a compact single-character-flag string describing how
// the connection was established, for status output.
214 func (cn *connection) connectionFlags() (ret string) {
// Presumably the body of a local helper that appends one flag byte to ret.
216 ret += string([]byte{b})
// Emit the discovery source (tracker, incoming, etc.) as its byte tag.
221 if cn.Discovery != 0 {
222 c(byte(cn.Discovery))
230 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
231 func (cn *connection) statusFlags() (ret string) {
// Presumably the body of a local helper that appends one flag byte to ret.
233 ret += string([]byte{b})
// Connection-establishment flags are appended after the choke/interest flags.
242 ret += cn.connectionFlags()
244 if cn.PeerInterested {
// String renders the connection via WriteStatus with no torrent context.
253 func (cn *connection) String() string {
255 cn.WriteStatus(&buf, nil)
// WriteStatus writes a human-readable multi-line summary of the connection to
// w. t supplies torrent context and may be nil (see String).
259 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
260 // \t isn't preserved in <pre> blocks?
261 fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
// Ages of the last activity events, via eventAgeString.
262 fmt.Fprintf(w, " last msg: %s, connected: %s, last useful chunk: %s\n",
263 eventAgeString(cn.lastMessageReceived),
264 eventAgeString(cn.completedHandshake),
265 eventAgeString(cn.lastUsefulChunkReceived))
// Completion, chunk accounting, request-queue sizes and flags.
267 " %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
268 cn.completedString(t),
269 len(cn.peerTouchedPieces),
270 cn.UsefulChunksReceived,
271 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
274 len(cn.PeerRequests),
// Close shuts the connection down. The body is mostly hidden in this view;
// per the struct, mu exists "Only for closing".
279 func (c *connection) Close() {
288 // TODO: This call blocks sometimes, why?
// PeerHasPiece reports whether the peer claims to have the given piece.
292 func (c *connection) PeerHasPiece(piece int) bool {
// An index beyond the known bitfield is treated as not-had.
296 if piece >= len(c.PeerPieces) {
299 return c.PeerPieces[piece]
// Post queues a message for sending to the peer (body not visible here;
// presumably feeds the post channel consumed by writeOptimizer).
302 func (c *connection) Post(msg pp.Message) {
// RequestPending reports whether we have an outstanding request for r.
309 func (c *connection) RequestPending(r request) bool {
310 _, ok := c.Requests[r]
// requestMetadataPiece posts a ut_metadata extended request for the given
// metadata piece, at most once while it is pending.
314 func (c *connection) requestMetadataPiece(index int) {
315 eID := c.PeerExtensionIDs["ut_metadata"]
// Already requested and still pending: don't ask again.
319 if index < len(c.metadataRequests) && c.metadataRequests[index] {
// Payload is the bencoded ut_metadata request dictionary.
325 ExtendedPayload: func() []byte {
326 b, err := bencode.Marshal(map[string]int{
327 "msg_type": pp.RequestMetadataExtensionMsgType,
// Grow the pending-flags slice to cover index, then mark it requested.
336 for index >= len(c.metadataRequests) {
337 c.metadataRequests = append(c.metadataRequests, false)
339 c.metadataRequests[index] = true
// requestedMetadataPiece reports whether a ut_metadata request for index has
// been posted and is still pending.
342 func (c *connection) requestedMetadataPiece(index int) bool {
343 return index < len(c.metadataRequests) && c.metadataRequests[index]
346 // Returns true if more requests can be sent.
347 func (c *connection) Request(chunk request) bool {
// At the peer's pending-request limit: no more requests for now.
348 if len(c.Requests) >= c.PeerMaxRequests {
// Don't request pieces the peer hasn't claimed to have.
351 if !c.PeerHasPiece(int(chunk.Index)) {
// Already requested; room remains for more.
354 if c.RequestPending(chunk) {
357 c.SetInterested(true)
// Lazily allocate the request set, sized to the peer's limit.
361 if c.Requests == nil {
362 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
364 c.Requests[chunk] = struct{}{}
// Low-water mark set to half the current queue — presumably the trigger for
// refilling requests; confirm at the caller.
365 c.requestsLowWater = len(c.Requests) / 2
370 Length: chunk.Length,
375 // Returns true if an unsatisfied request was canceled.
376 func (c *connection) Cancel(r request) bool {
377 if c.Requests == nil {
// Not an outstanding request of ours: nothing to cancel.
380 if _, ok := c.Requests[r]; !ok {
383 delete(c.Requests, r)
393 // Returns true if an unsatisfied request was canceled.
394 func (c *connection) PeerCancel(r request) bool {
395 if c.PeerRequests == nil {
// The peer never made (or we already served) this request.
398 if _, ok := c.PeerRequests[r]; !ok {
401 delete(c.PeerRequests, r)
// Choke marks the peer as choked on our side (body not visible here).
405 func (c *connection) Choke() {
// Unchoke clears our choke of the peer (body not visible here).
416 func (c *connection) Unchoke() {
// SetInterested posts an Interested/NotInterested message when the desired
// state differs from the current one, and records the new state.
426 func (c *connection) SetInterested(interested bool) {
// No change: avoid sending a redundant message.
427 if c.Interested == interested {
// Message type chosen by the flag being set.
431 Type: func() pp.MessageType {
435 return pp.NotInterested
439 c.Interested = interested
443 // Track connection writer buffer writes and flushes, to determine its
// effectiveness (interior of a var block).
445 connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
446 connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
449 // Writes buffers to the socket from the write channel.
450 func (conn *connection) writer() {
451 // Reduce write syscalls.
452 buf := bufio.NewWriter(conn.rw)
// Nothing buffered yet: block until a write arrives (or the channel closes).
454 if buf.Buffered() == 0 {
455 // There's nothing to write, so block until we get something.
457 case b, ok := <-conn.writeCh:
461 connectionWriterWrite.Add(1)
462 _, err := buf.Write(b)
471 // We already have something to write, so flush if there's nothing
// Same receive as above, but now a non-arrival flushes the buffer instead of
// blocking indefinitely.
474 case b, ok := <-conn.writeCh:
478 connectionWriterWrite.Add(1)
479 _, err := buf.Write(b)
487 connectionWriterFlush.Add(1)
// writeOptimizer drains posted messages into the write channel, marshalling
// lazily, injecting keep-alives after keepAliveDelay of inactivity, and
// dropping queued REQUESTs that a later CANCEL makes moot.
498 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
499 defer close(conn.writeCh) // Responsible for notifying downstream routines.
500 pending := list.New() // Message queue.
501 var nextWrite []byte // Set to nil when the next message still needs to be marshalled.
502 timer := time.NewTimer(keepAliveDelay)
504 lastWrite := time.Now()
// Disable the send case (nil channel) when there's nothing marshalled to send.
506 write := conn.writeCh // Set to nil if there's nothing to write.
507 if pending.Len() == 0 {
// Marshal the head of the queue lazily, once per message.
509 } else if nextWrite == nil {
511 nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
519 if pending.Len() != 0 {
// Keep-alive: if the delay since the last write hasn't elapsed yet, re-arm
// the timer for the remainder; otherwise enqueue a keep-alive message.
522 keepAliveTime := lastWrite.Add(keepAliveDelay)
523 if time.Now().Before(keepAliveTime) {
524 timer.Reset(keepAliveTime.Sub(time.Now()))
527 pending.PushBack(pp.Message{Keepalive: true})
528 case msg, ok := <-conn.post:
// A posted CANCEL is "optimized" by removing the matching, still-queued
// REQUEST instead of sending both messages (see optimizedCancels).
532 if msg.Type == pp.Cancel {
533 for e := pending.Back(); e != nil; e = e.Prev() {
534 elemMsg := e.Value.(pp.Message)
535 if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
537 optimizedCancels.Add(1)
542 pending.PushBack(msg)
// Head of queue handed downstream; reset keep-alive bookkeeping.
543 case write <- nextWrite:
544 pending.Remove(pending.Front())
546 lastWrite = time.Now()
547 if pending.Len() == 0 {
548 timer.Reset(keepAliveDelay)
// Have sends the peer a HAVE for the given piece, at most once per piece on
// this connection (deduplicated via sentHaves).
556 func (cn *connection) Have(piece int) {
// Grow sentHaves so it covers this piece index.
557 for piece >= len(cn.sentHaves) {
558 cn.sentHaves = append(cn.sentHaves, false)
// Already announced: don't re-send.
560 if cn.sentHaves[piece] {
565 Index: pp.Integer(piece),
567 cn.sentHaves[piece] = true
// Bitfield sends the initial piece bitfield; the panic enforces that it
// precedes any HAVE messages. (Rest of the body is not visible in this view.)
570 func (cn *connection) Bitfield(haves []bool) {
571 if cn.sentHaves != nil {
572 panic("bitfield must be first have-related message sent")