16 "github.com/anacrolix/torrent/bencode"
17 pp "github.com/anacrolix/torrent/peer_protocol"
20 var optimizedCancels = expvar.NewInt("optimizedCancels")
25 peerSourceTracker = '\x00' // It's the default.
26 peerSourceIncoming = 'I'
31 // Maintains the state of a connection with a peer.
32 type connection struct {
35 rw io.ReadWriter // The real slim shady
40 mu sync.Mutex // Only for closing.
44 UnwantedChunksReceived int
45 UsefulChunksReceived int
48 lastMessageReceived time.Time
49 completedHandshake time.Time
50 lastUsefulChunkReceived time.Time
51 lastChunkSent time.Time
53 // Stuff controlled by the local peer.
56 Requests map[request]struct{}
58 // Indexed by metadata piece, set to true if posted and pending a
60 metadataRequests []bool
63 // Stuff controlled by the remote peer.
67 PeerRequests map[request]struct{}
68 PeerExtensionBytes peerExtensionBytes
69 // Whether the peer has the given piece. nil if they've not sent any
70 // related messages yet.
73 // Pieces we've accepted chunks for from the peer.
74 peerTouchedPieces map[int]struct{}
76 PeerMaxRequests int // Maximum pending requests the peer allows.
77 PeerExtensionIDs map[string]byte
81 func newConnection() (c *connection) {
87 closing: make(chan struct{}),
88 writeCh: make(chan []byte),
89 post: make(chan pp.Message),
94 func (cn *connection) remoteAddr() net.Addr {
95 return cn.conn.RemoteAddr()
98 func (cn *connection) localAddr() net.Addr {
99 return cn.conn.LocalAddr()
102 func (cn *connection) supportsExtension(ext string) bool {
103 _, ok := cn.PeerExtensionIDs[ext]
107 func (cn *connection) completedString(t *torrent) string {
108 if cn.PeerPieces == nil && !cn.peerHasAll {
111 return fmt.Sprintf("%d/%d", func() int {
119 for _, b := range cn.PeerPieces {
126 if cn.peerHasAll || cn.PeerPieces == nil {
132 return len(cn.PeerPieces)
136 // Correct the PeerPieces slice length. Return false if the existing slice is
137 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
139 func (cn *connection) setNumPieces(num int) error {
143 if cn.PeerPieces == nil {
146 if len(cn.PeerPieces) == num {
147 } else if len(cn.PeerPieces) < num {
148 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
149 } else if len(cn.PeerPieces) <= (num+7)/8*8 {
150 for _, have := range cn.PeerPieces[num:] {
152 return errors.New("peer has invalid piece")
155 cn.PeerPieces = cn.PeerPieces[:num]
157 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
159 if len(cn.PeerPieces) != num {
165 func eventAgeString(t time.Time) string {
169 return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
172 func (cn *connection) connectionFlags() (ret string) {
174 ret += string([]byte{b})
179 if cn.Discovery != 0 {
180 c(byte(cn.Discovery))
188 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
189 func (cn *connection) statusFlags() (ret string) {
191 ret += string([]byte{b})
200 ret += cn.connectionFlags()
202 if cn.PeerInterested {
211 func (cn *connection) String() string {
213 cn.WriteStatus(&buf, nil)
217 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
218 // \t isn't preserved in <pre> blocks?
219 fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
220 fmt.Fprintf(w, " last msg: %s, connected: %s, last useful chunk: %s\n",
221 eventAgeString(cn.lastMessageReceived),
222 eventAgeString(cn.completedHandshake),
223 eventAgeString(cn.lastUsefulChunkReceived))
225 " %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
226 cn.completedString(t),
227 len(cn.peerTouchedPieces),
228 cn.UsefulChunksReceived,
229 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
232 len(cn.PeerRequests),
237 func (c *connection) Close() {
246 // TODO: This call blocks sometimes, why?
250 func (c *connection) PeerHasPiece(piece int) bool {
254 if piece >= len(c.PeerPieces) {
257 return c.PeerPieces[piece]
260 func (c *connection) Post(msg pp.Message) {
267 func (c *connection) RequestPending(r request) bool {
268 _, ok := c.Requests[r]
272 func (c *connection) requestMetadataPiece(index int) {
273 eID := c.PeerExtensionIDs["ut_metadata"]
277 if index < len(c.metadataRequests) && c.metadataRequests[index] {
283 ExtendedPayload: func() []byte {
284 b, err := bencode.Marshal(map[string]int{
285 "msg_type": pp.RequestMetadataExtensionMsgType,
294 for index >= len(c.metadataRequests) {
295 c.metadataRequests = append(c.metadataRequests, false)
297 c.metadataRequests[index] = true
300 func (c *connection) requestedMetadataPiece(index int) bool {
301 return index < len(c.metadataRequests) && c.metadataRequests[index]
304 // Returns true if more requests can be sent.
305 func (c *connection) Request(chunk request) bool {
306 if len(c.Requests) >= c.PeerMaxRequests {
309 if !c.PeerHasPiece(int(chunk.Index)) {
312 if c.RequestPending(chunk) {
315 c.SetInterested(true)
319 if c.Requests == nil {
320 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
322 c.Requests[chunk] = struct{}{}
323 c.requestsLowWater = len(c.Requests) / 2
328 Length: chunk.Length,
333 // Returns true if an unsatisfied request was canceled.
334 func (c *connection) Cancel(r request) bool {
335 if c.Requests == nil {
338 if _, ok := c.Requests[r]; !ok {
341 delete(c.Requests, r)
351 // Returns true if an unsatisfied request was canceled.
352 func (c *connection) PeerCancel(r request) bool {
353 if c.PeerRequests == nil {
356 if _, ok := c.PeerRequests[r]; !ok {
359 delete(c.PeerRequests, r)
363 func (c *connection) Choke() {
374 func (c *connection) Unchoke() {
384 func (c *connection) SetInterested(interested bool) {
385 if c.Interested == interested {
389 Type: func() pp.MessageType {
393 return pp.NotInterested
397 c.Interested = interested
401 // Track connection writer buffer writes and flushes, to determine its
403 connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
404 connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
407 // Writes buffers to the socket from the write channel.
408 func (conn *connection) writer() {
409 // Reduce write syscalls.
410 buf := bufio.NewWriter(conn.rw)
412 if buf.Buffered() == 0 {
413 // There's nothing to write, so block until we get something.
415 case b, ok := <-conn.writeCh:
419 connectionWriterWrite.Add(1)
420 _, err := buf.Write(b)
429 // We already have something to write, so flush if there's nothing
432 case b, ok := <-conn.writeCh:
436 connectionWriterWrite.Add(1)
437 _, err := buf.Write(b)
445 connectionWriterFlush.Add(1)
456 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
457 defer close(conn.writeCh) // Responsible for notifying downstream routines.
458 pending := list.New() // Message queue.
459 var nextWrite []byte // Set to nil if we need to need to marshal the next message.
460 timer := time.NewTimer(keepAliveDelay)
462 lastWrite := time.Now()
464 write := conn.writeCh // Set to nil if there's nothing to write.
465 if pending.Len() == 0 {
467 } else if nextWrite == nil {
469 nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
477 if pending.Len() != 0 {
480 keepAliveTime := lastWrite.Add(keepAliveDelay)
481 if time.Now().Before(keepAliveTime) {
482 timer.Reset(keepAliveTime.Sub(time.Now()))
485 pending.PushBack(pp.Message{Keepalive: true})
486 case msg, ok := <-conn.post:
490 if msg.Type == pp.Cancel {
491 for e := pending.Back(); e != nil; e = e.Prev() {
492 elemMsg := e.Value.(pp.Message)
493 if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
495 optimizedCancels.Add(1)
500 pending.PushBack(msg)
501 case write <- nextWrite:
502 pending.Remove(pending.Front())
504 lastWrite = time.Now()
505 if pending.Len() == 0 {
506 timer.Reset(keepAliveDelay)
514 func (cn *connection) Have(piece int) {
515 for piece >= len(cn.sentHaves) {
516 cn.sentHaves = append(cn.sentHaves, false)
518 if cn.sentHaves[piece] {
523 Index: pp.Integer(piece),
525 cn.sentHaves[piece] = true
528 func (cn *connection) Bitfield(haves []bool) {
529 if cn.sentHaves != nil {
530 panic("bitfield must be first have-related message sent")
539 func (c *connection) updateRequests() {
547 if len(c.Requests) > c.requestsLowWater {
552 if len(c.Requests) == 0 && !c.PeerChoked {
553 // So we're not choked, but we don't want anything right now. We may
554 // have completed readahead, and the readahead window has not rolled
555 // over to the next piece. Better to stay interested in case we're
556 // going to want data in the near future.
557 c.SetInterested(!c.t.haveAllPieces())
561 func (c *connection) fillRequests() {
562 if !c.t.forUrgentPieces(func(piece int) (again bool) {
563 return c.t.connRequestPiecePendingChunks(c, piece)
567 c.t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
568 for i := begin + 1; i < end; i++ {
569 if !c.t.connRequestPiecePendingChunks(c, i) {
575 for i := range c.t.pendingPieces {
576 if !c.t.connRequestPiecePendingChunks(c, i) {