17 "github.com/anacrolix/missinggo"
18 "github.com/anacrolix/missinggo/bitmap"
19 "github.com/anacrolix/missinggo/prioritybitmap"
20 "github.com/anacrolix/missinggo/slices"
21 "github.com/bradfitz/iter"
23 "github.com/anacrolix/torrent/bencode"
24 pp "github.com/anacrolix/torrent/peer_protocol"
// optimizedCancels counts queued Request messages that Post removed from the
// outgoing message queue (see the matching loop in Post) rather than leaving
// them there to be written.
27 var optimizedCancels = expvar.NewInt("optimizedCancels")
// Single-character peer source markers.
// NOTE(review): presumably these are the values held in connection.Discovery,
// which connectionFlags emits as a byte when non-zero -- confirm.
32 peerSourceTracker = '\x00' // It's the default.
33 peerSourceIncoming = 'I'
38 // Maintains the state of a connection with a peer.
39 type connection struct {
// Destination wrapped by the buffered writer created in writer().
42 rw io.ReadWriter // The real slim shady
// Event consulted by useful() and waited on by writer().
46 closed missinggo.Event
// Chunk counters; both are reported by WriteStatus.
49 UnwantedChunksReceived int
50 UsefulChunksReceived int
// Timestamps. WriteStatus prints the age of the first three; lastHelpful
// works from lastUsefulChunkReceived and lastChunkSent.
55 lastMessageReceived time.Time
56 completedHandshake time.Time
57 lastUsefulChunkReceived time.Time
58 lastChunkSent time.Time
60 // Stuff controlled by the local peer.
// Set of our outstanding requests: entries are added by Request and removed
// by Cancel.
63 Requests map[request]struct{}
65 // Indexed by metadata piece; an entry is set to true by
// requestMetadataPiece once that piece has been requested.
67 metadataRequests []bool
70 // Stuff controlled by the remote peer.
// Set of requests from the peer: entries are removed by PeerCancel, and Choke
// resets the whole map to nil.
74 PeerRequests map[request]struct{}
75 PeerExtensionBytes peerExtensionBytes
76 // The pieces the peer has claimed to have.
77 peerPieces bitmap.Bitmap
78 // The peer has everything. This can occur due to a special message, when
79 // we may not even know the number of pieces in the torrent yet.
81 // The highest possible number of pieces the torrent could have based on
82 // communication with the peer. Generally only useful until we have the
85 // Pieces we've accepted chunks for from the peer.
86 peerTouchedPieces map[int]struct{}
88 PeerMaxRequests int // Maximum pending requests the peer allows.
// Keyed by extension name (e.g. "ut_metadata"); see supportsExtension and
// requestMetadataPiece.
89 PeerExtensionIDs map[string]byte
// Obtained lazily from the torrent by getPieceInclination and handed back by
// discardPieceInclination.
92 pieceInclination []int
// Pieces to request together with their priorities; maintained by
// updatePiecePriority and iterated by fillRequests.
93 pieceRequestOrder prioritybitmap.PriorityBitmap
// Queue of pp.Message values: created on first use and appended to by Post,
// drained from the front by writer().
95 outgoingUnbufferedMessages *list.List
// Set by Post after queueing a message; cleared by writer().
96 outgoingUnbufferedMessagesNotEmpty missinggo.Event
// mu returns a sync.Locker. Within this file, writer() defers
// cn.mu().Unlock() and passes cn.mu() to the LockedChan calls it waits on.
// NOTE(review): the body is not visible here -- confirm which lock is returned.
99 func (cn *connection) mu() sync.Locker {
// newConnection creates the connection state for nc, with PeerMaxRequests
// starting at 250.
// NOTE(review): the rest of the initialisation is not visible here.
103 func (cl *Client) newConnection(nc net.Conn) (c *connection) {
110 PeerMaxRequests: 250,
// remoteAddr returns the remote address reported by the underlying cn.conn.
115 func (cn *connection) remoteAddr() net.Addr {
116 return cn.conn.RemoteAddr()
// localAddr returns the local address reported by the underlying cn.conn.
119 func (cn *connection) localAddr() net.Addr {
120 return cn.conn.LocalAddr()
// supportsExtension looks ext up in cn.PeerExtensionIDs.
// NOTE(review): presumably the presence flag ok is what gets returned -- the
// return statement is not visible here.
123 func (cn *connection) supportsExtension(ext string) bool {
124 _, ok := cn.PeerExtensionIDs[ext]
128 // The best guess at number of pieces in the torrent for this peer.
// Yields either the torrent's own piece count or cn.peerMinPieces.
// NOTE(review): the condition that selects between the two is not visible here.
129 func (cn *connection) bestPeerNumPieces() int {
131 return cn.t.numPieces()
133 return cn.peerMinPieces
// completedString formats "<have>/<total>", where <have> is the number of
// entries in cn.peerPieces and <total> is bestPeerNumPieces().
136 func (cn *connection) completedString() string {
137 return fmt.Sprintf("%d/%d", cn.peerPieces.Len(), cn.bestPeerNumPieces())
140 // Trims cn.peerPieces with RemoveRange(num, -1) and then calls
141 // peerPiecesChanged. The result is an error, not a bool.
// NOTE(review): presumably the -1 end means "through the last piece", and the
// conditions (if any) that produce a non-nil error are not visible here.
143 func (cn *connection) setNumPieces(num int) error {
144 cn.peerPieces.RemoveRange(num, -1)
145 cn.peerPiecesChanged()
// eventAgeString formats the time elapsed since t in seconds with two
// decimals, e.g. "1.50s ago".
// NOTE(review): lines between the signature and this return are not visible
// here; an earlier special-case return may exist.
149 func eventAgeString(t time.Time) string {
153 return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
// connectionFlags builds a string of single-byte flags in ret. When
// cn.Discovery is non-zero, its byte value is added through the local helper c.
// NOTE(review): the helper's definition and any other flags are not visible here.
156 func (cn *connection) connectionFlags() (ret string) {
158 ret += string([]byte{b})
163 if cn.Discovery != 0 {
164 c(byte(cn.Discovery))
172 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
// Builds a flag string in ret one byte at a time; the output of
// connectionFlags() is appended to it, and cn.PeerInterested is one of the
// states inspected.
// NOTE(review): which characters are emitted for each state is not visible here.
173 func (cn *connection) statusFlags() (ret string) {
175 ret += string([]byte{b})
184 ret += cn.connectionFlags()
186 if cn.PeerInterested {
// String renders the connection by having WriteStatus write into buf with a
// nil torrent.
// NOTE(review): presumably buf's contents are returned -- the return statement
// is not visible here.
195 func (cn *connection) String() string {
197 cn.WriteStatus(&buf, nil)
// WriteStatus writes a human-readable summary of the connection to w: a line
// with the quoted peer ID and the local and remote addresses, a line with the
// ages of the last message, the handshake and the last useful chunk, and a
// line with completion, touched-piece count, chunk counters, request queue
// sizes and flags.
// NOTE(review): t is not used in the lines visible here.
201 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
202 // \t isn't preserved in <pre> blocks?
203 fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
204 fmt.Fprintf(w, " last msg: %s, connected: %s, last useful chunk: %s\n",
205 eventAgeString(cn.lastMessageReceived),
206 eventAgeString(cn.completedHandshake),
207 eventAgeString(cn.lastUsefulChunkReceived))
209 " %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
210 cn.completedString(),
211 len(cn.peerTouchedPieces),
212 cn.UsefulChunksReceived,
// Total chunks received: unwanted plus useful.
213 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
216 len(cn.PeerRequests),
// Close releases per-connection piece state: the piece inclination is
// discarded and pieceRequestOrder is cleared.
// NOTE(review): the remaining teardown steps, including the call the TODO
// below refers to, are not visible here.
221 func (cn *connection) Close() {
223 cn.discardPieceInclination()
224 cn.pieceRequestOrder.Clear()
226 // TODO: This call blocks sometimes, why?
// PeerHasPiece reports whether the peer has the given piece: true when
// cn.peerHasAll is set or cn.peerPieces contains piece.
231 func (cn *connection) PeerHasPiece(piece int) bool {
232 return cn.peerHasAll || cn.peerPieces.Contains(piece)
// Post queues msg on cn.outgoingUnbufferedMessages for the writer.
// NOTE(review): the condition guarding the scan below (presumably msg being a
// Cancel) and what happens after a removal are not visible here -- confirm.
235 func (cn *connection) Post(msg pp.Message) {
// Scan the queue from the back for a Request with the same Index, Begin and
// Length as msg; a match is removed from the queue and counted.
238 for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() {
239 elemMsg := e.Value.(pp.Message)
240 if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length {
241 cn.outgoingUnbufferedMessages.Remove(e)
242 optimizedCancels.Add(1)
// The queue is created on first use.
247 if cn.outgoingUnbufferedMessages == nil {
248 cn.outgoingUnbufferedMessages = list.New()
250 cn.outgoingUnbufferedMessages.PushBack(msg)
// Signal the writer that there is something to send.
251 cn.outgoingUnbufferedMessagesNotEmpty.Set()
// Count posted messages keyed by the decimal message type.
252 postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
// RequestPending looks r up in cn.Requests.
// NOTE(review): presumably the presence flag ok is what gets returned -- the
// return statement is not visible here.
255 func (cn *connection) RequestPending(r request) bool {
256 _, ok := cn.Requests[r]
// requestMetadataPiece issues a request for metadata piece index using the
// peer's "ut_metadata" extension ID, then records the request in
// cn.metadataRequests.
// NOTE(review): the handling of a zero eID and of an already-requested piece
// is not visible here.
260 func (cn *connection) requestMetadataPiece(index int) {
261 eID := cn.PeerExtensionIDs["ut_metadata"]
// True when this piece has already been marked as requested.
265 if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
// The extended payload is a bencoded dictionary whose msg_type is the
// request type.
271 ExtendedPayload: func() []byte {
272 b, err := bencode.Marshal(map[string]int{
273 "msg_type": pp.RequestMetadataExtensionMsgType,
// Grow the slice with false entries until index is addressable.
282 for index >= len(cn.metadataRequests) {
283 cn.metadataRequests = append(cn.metadataRequests, false)
285 cn.metadataRequests[index] = true
// requestedMetadataPiece reports whether metadata piece index is within
// cn.metadataRequests and marked true there.
288 func (cn *connection) requestedMetadataPiece(index int) bool {
289 return index < len(cn.metadataRequests) && cn.metadataRequests[index]
292 // The actual value to use as the maximum outbound requests.
// Starts from cn.PeerMaxRequests.
// NOTE(review): any further adjustment of ret is not visible here.
293 func (cn *connection) nominalMaxRequests() (ret int) {
294 ret = cn.PeerMaxRequests
301 // Returns true if more requests can be sent.
// Three guards are checked in order: the outstanding count has reached
// nominalMaxRequests(), the peer does not have the chunk's piece, and the
// chunk is already pending. Past the guards, the connection is marked
// interested, the chunk is recorded in cn.Requests, and a message carrying
// the chunk's Length is built.
// NOTE(review): the value returned from each guard is not visible here.
302 func (cn *connection) Request(chunk request) bool {
303 if len(cn.Requests) >= cn.nominalMaxRequests() {
306 if !cn.PeerHasPiece(int(chunk.Index)) {
309 if cn.RequestPending(chunk) {
312 cn.SetInterested(true)
// The map is created on first use, sized by PeerMaxRequests.
316 if cn.Requests == nil {
317 cn.Requests = make(map[request]struct{}, cn.PeerMaxRequests)
319 cn.Requests[chunk] = struct{}{}
// Low-water mark is half the current outstanding count; updateRequests
// compares against it.
320 cn.requestsLowWater = len(cn.Requests) / 2
325 Length: chunk.Length,
330 // Returns true if an unsatisfied request was canceled.
// A request that is not pending takes the early branch; otherwise r is
// deleted from cn.Requests.
// NOTE(review): the steps after the delete are not visible here.
331 func (cn *connection) Cancel(r request) bool {
332 if !cn.RequestPending(r) {
335 delete(cn.Requests, r)
345 // Returns true if an unsatisfied request was canceled.
// This is the peer-side counterpart of Cancel: it operates on
// cn.PeerRequests. A nil map and a missing entry each take an early branch;
// otherwise r is deleted from the map.
346 func (cn *connection) PeerCancel(r request) bool {
347 if cn.PeerRequests == nil {
350 if _, ok := cn.PeerRequests[r]; !ok {
353 delete(cn.PeerRequests, r)
// Choke drops all recorded peer requests by setting cn.PeerRequests to nil.
// NOTE(review): the other steps (presumably posting a choke message and
// updating choke state) are not visible here -- confirm.
357 func (cn *connection) Choke() {
364 cn.PeerRequests = nil
// NOTE(review): Unchoke's body is not visible here; presumably it is the
// inverse of Choke -- confirm before documenting further.
368 func (cn *connection) Unchoke() {
// SetInterested records our interest in the peer in cn.Interested. The
// current value is compared with the requested one first, and the message
// type is chosen by a closure, with pp.NotInterested as one outcome.
// NOTE(review): presumably an unchanged value returns early and the message
// is posted to the peer -- neither is visible here.
378 func (cn *connection) SetInterested(interested bool) {
379 if cn.Interested == interested {
383 Type: func() pp.MessageType {
387 return pp.NotInterested
391 cn.Interested = interested
395 // Track connection writer buffer writes and flushes. In writer(),
// connectionWriterWrite is incremented before each buf.Write and
// connectionWriterFlush before the check for buffered data to flush.
397 connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
398 connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
401 // Writes queued messages from cn.outgoingUnbufferedMessages to cn.rw
// through a buffered writer, flushing when data is buffered and posting a
// keepalive when keepAliveTimeout elapses.
// NOTE(review): error handling and the exact loop nesting are not visible here.
402 func (cn *connection) writer(keepAliveTimeout time.Duration) {
// The lock returned by cn.mu() is released when writer returns.
405 defer cn.mu().Unlock()
408 // Reduce write syscalls.
409 buf := bufio.NewWriter(cn.rw)
410 keepAliveTimer := time.NewTimer(keepAliveTimeout)
// Pop messages from the front of the queue, marshal each one and write it
// into buf.
413 for cn.outgoingUnbufferedMessages != nil && cn.outgoingUnbufferedMessages.Len() != 0 {
414 msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message)
416 b, err := msg.MarshalBinary()
420 connectionWriterWrite.Add(1)
421 n, err := buf.Write(b)
// Restart the keepalive countdown.
425 keepAliveTimer.Reset(keepAliveTimeout)
// Clear the event so the wait below blocks until Post sets it again.
433 cn.outgoingUnbufferedMessagesNotEmpty.Clear()
435 connectionWriterFlush.Add(1)
// Only flush when something is actually buffered.
436 if buf.Buffered() != 0 {
437 if buf.Flush() != nil {
440 keepAliveTimer.Reset(keepAliveTimeout)
// Wake on connection close, on newly posted messages, or when the keepalive
// timer fires.
443 case <-cn.closed.LockedChan(cn.mu()):
445 case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()):
446 case <-keepAliveTimer.C:
448 cn.Post(pp.Message{Keepalive: true})
450 postedKeepalives.Add(1)
// Have announces piece to the peer and records it in cn.sentHaves. The slice
// is first grown with false entries until piece is addressable, and a piece
// already marked as sent takes an early branch.
// NOTE(review): presumably that branch returns and the message is sent via
// Post -- neither is visible here.
455 func (cn *connection) Have(piece int) {
456 for piece >= len(cn.sentHaves) {
457 cn.sentHaves = append(cn.sentHaves, false)
459 if cn.sentHaves[piece] {
464 Index: pp.Integer(piece),
466 cn.sentHaves[piece] = true
// Bitfield records haves as the set of pieces announced to the peer. It
// panics if cn.sentHaves is already non-nil, i.e. if any have-related state
// was recorded earlier.
// NOTE(review): construction and posting of the bitfield message are not
// visible here.
469 func (cn *connection) Bitfield(haves []bool) {
470 if cn.sentHaves != nil {
471 panic("bitfield must be first have-related message sent")
477 // Make a copy of haves, as that's read when the message is marshalled
478 // without the lock. Also it obviously shouldn't change in the Msg due to
479 // changes in .sentHaves.
480 cn.sentHaves = append([]bool(nil), haves...)
// updateRequests re-evaluates what to request from the peer. It checks
// whether torrent info is available, then whether the outstanding request
// count is above cn.requestsLowWater. When nothing is outstanding and we are
// not choked, interest is set to whether the torrent still lacks pieces.
// NOTE(review): the outcome of the first two checks and the refill step
// between them are not visible here.
483 func (cn *connection) updateRequests() {
484 if !cn.t.haveInfo() {
491 if len(cn.Requests) > cn.requestsLowWater {
496 if len(cn.Requests) == 0 && !cn.PeerChoked {
497 // So we're not choked, but we don't want anything right now. We may
498 // have completed readahead, and the readahead window has not rolled
499 // over to the next piece. Better to stay interested in case we're
500 // going to want data in the near future.
501 cn.SetInterested(!cn.t.haveAllPieces())
// fillRequests iterates cn.pieceRequestOrder and calls
// requestPiecePendingChunks for each piece; that call's result is returned
// from the callback as more.
// NOTE(review): what happens in debug mode when we already have the piece is
// not visible here.
505 func (cn *connection) fillRequests() {
506 cn.pieceRequestOrder.IterTyped(func(piece int) (more bool) {
507 if cn.t.cl.config.Debug && cn.t.havePiece(piece) {
510 return cn.requestPiecePendingChunks(piece)
// requestPiecePendingChunks delegates to the torrent's
// connRequestPiecePendingChunks for this connection and piece, and returns
// its result.
514 func (cn *connection) requestPiecePendingChunks(piece int) (again bool) {
515 return cn.t.connRequestPiecePendingChunks(cn, piece)
// stopRequestingPiece removes piece from cn.pieceRequestOrder.
518 func (cn *connection) stopRequestingPiece(piece int) {
519 cn.pieceRequestOrder.Remove(piece)
// updatePiecePriority recomputes where piece sits in cn.pieceRequestOrder.
// The torrent's priority for the piece is used, forced to PiecePriorityNone
// when the peer does not have the piece. A None priority stops requesting the
// piece; otherwise the piece's inclination value is offset by priority class
// and stored.
// NOTE(review): presumably smaller values are requested first -- that depends
// on prioritybitmap semantics not shown here.
522 func (cn *connection) updatePiecePriority(piece int) {
523 tpp := cn.t.piecePriority(piece)
524 if !cn.PeerHasPiece(piece) {
525 tpp = PiecePriorityNone
527 if tpp == PiecePriorityNone {
528 cn.stopRequestingPiece(piece)
// Base value comes from this connection's piece inclination.
531 prio := cn.getPieceInclination()[piece]
533 case PiecePriorityNormal:
534 case PiecePriorityReadahead:
// Readahead pieces are shifted down by one full piece count.
535 prio -= cn.t.numPieces()
536 case PiecePriorityNext, PiecePriorityNow:
// Next/Now pieces are shifted down by two full piece counts.
537 prio -= 2 * cn.t.numPieces()
542 cn.pieceRequestOrder.Set(piece, prio)
// getPieceInclination returns cn.pieceInclination, first obtaining it from
// the torrent via getConnPieceInclination if it is nil.
546 func (cn *connection) getPieceInclination() []int {
547 if cn.pieceInclination == nil {
548 cn.pieceInclination = cn.t.getConnPieceInclination()
550 return cn.pieceInclination
// discardPieceInclination hands cn.pieceInclination back to the torrent via
// putPieceInclination and resets the field to nil. A nil inclination takes an
// early branch first.
// NOTE(review): presumably that branch returns -- not visible here.
553 func (cn *connection) discardPieceInclination() {
554 if cn.pieceInclination == nil {
557 cn.t.putPieceInclination(cn.pieceInclination)
558 cn.pieceInclination = nil
// peerHasPieceChanged recomputes the request priority of piece; it is called
// by peerSentHave and, for every piece, by peerPiecesChanged.
561 func (cn *connection) peerHasPieceChanged(piece int) {
562 cn.updatePiecePriority(piece)
// peerPiecesChanged calls peerHasPieceChanged for every piece index in
// [0, cn.t.numPieces()).
// NOTE(review): a guard before the loop may exist but is not visible here.
565 func (cn *connection) peerPiecesChanged() {
567 for i := range iter.N(cn.t.numPieces()) {
568 cn.peerHasPieceChanged(i)
// raisePeerMinPieces raises cn.peerMinPieces to newMin when newMin is larger;
// it never lowers the value.
573 func (cn *connection) raisePeerMinPieces(newMin int) {
574 if newMin > cn.peerMinPieces {
575 cn.peerMinPieces = newMin
// peerSentHave handles the peer announcing that it has piece. With torrent
// info available, an index at or beyond numPieces() is rejected with an
// "invalid piece" error. Otherwise the minimum piece count is raised to
// piece+1, the piece is set in cn.peerPieces and its priority is recomputed.
// NOTE(review): presumably a piece the peer is already known to have returns
// early -- that branch body is not visible here.
579 func (cn *connection) peerSentHave(piece int) error {
580 if cn.t.haveInfo() && piece >= cn.t.numPieces() {
581 return errors.New("invalid piece")
583 if cn.PeerHasPiece(piece) {
586 cn.raisePeerMinPieces(piece + 1)
587 cn.peerPieces.Set(piece, true)
588 cn.peerHasPieceChanged(piece)
// peerSentBitfield replaces what we know of the peer's pieces with bf.
// peerHasAll is reset, the minimum piece count is raised to len(bf)-7, bf is
// truncated to the torrent's piece count when info is available, every entry
// is copied into cn.peerPieces, and piece priorities are recomputed.
// NOTE(review): the check that triggers the panic below (presumably
// len(bf)%8 != 0) is not visible here.
592 func (cn *connection) peerSentBitfield(bf []bool) error {
593 cn.peerHasAll = false
595 panic("expected bitfield length divisible by 8")
597 // We know that the last byte means that at most the last 7 bits are
599 cn.raisePeerMinPieces(len(bf) - 7)
600 if cn.t.haveInfo() && len(bf) > cn.t.numPieces() {
601 // Ignore known excess pieces.
602 bf = bf[:cn.t.numPieces()]
604 for i, have := range bf {
// NOTE(review): presumably only reached for pieces the peer has -- the
// enclosing condition is not visible here.
606 cn.raisePeerMinPieces(i + 1)
608 cn.peerPieces.Set(i, have)
610 cn.peerPiecesChanged()
// peerSentHaveAll handles the peer declaring it has every piece: the explicit
// per-piece set is cleared and piece priorities are recomputed.
// NOTE(review): presumably cn.peerHasAll is also set to true here -- that
// line is not visible; confirm.
614 func (cn *connection) peerSentHaveAll() error {
616 cn.peerPieces.Clear()
617 cn.peerPiecesChanged()
// peerSentHaveNone handles the peer declaring it has no pieces:
// cn.peerPieces is cleared, cn.peerHasAll is reset to false, and piece
// priorities are recomputed.
621 func (cn *connection) peerSentHaveNone() error {
622 cn.peerPieces.Clear()
623 cn.peerHasAll = false
624 cn.peerPiecesChanged()
// requestPendingMetadata requests, in random order, every metadata piece that
// the torrent does not have and that has not already been requested on this
// connection. A zero "ut_metadata" extension ID takes an early branch.
// NOTE(review): any guard before the extension check is not visible here.
628 func (c *connection) requestPendingMetadata() {
632 if c.PeerExtensionIDs["ut_metadata"] == 0 {
633 // Peer doesn't support this.
636 // Request metadata pieces that we don't have in a random order.
// Collect the candidate piece indices first.
638 for index := 0; index < c.t.metadataPieceCount(); index++ {
639 if !c.t.haveMetadataPiece(index) && !c.requestedMetadataPiece(index) {
640 pending = append(pending, index)
// rand.Perm supplies a random visiting order over the candidates.
643 for _, i := range rand.Perm(len(pending)) {
644 c.requestMetadataPiece(pending[i])
// wroteMsg records msg in both the connection's stats and the torrent's stats.
648 func (cn *connection) wroteMsg(msg pp.Message) {
649 cn.stats.wroteMsg(msg)
650 cn.t.stats.wroteMsg(msg)
// wroteBytes records b in both the connection's stats and the torrent's stats.
653 func (cn *connection) wroteBytes(b []byte) {
654 cn.stats.wroteBytes(b)
655 cn.t.stats.wroteBytes(b)
658 // Returns whether the connection is currently useful to us. We're seeding and
659 // they want data, we don't have metainfo and they can provide it, etc.
// The visible outcomes are: "ut_metadata" support, the peer's interest
// (c.PeerInterested), and whether the torrent reports wanted pieces on this
// connection.
// NOTE(review): the conditions selecting each outcome, and the result for a
// closed connection, are not visible here.
660 func (c *connection) useful() bool {
662 if c.closed.IsSet() {
666 return c.supportsExtension("ut_metadata")
669 return c.PeerInterested
671 return t.connHasWantedPieces(c)
// lastHelpful returns the latest of the candidate timestamps, using
// missinggo.Max with time.Time.Before as the ordering. The candidates always
// include lastUsefulChunkReceived; lastChunkSent is appended as well.
// NOTE(review): the condition under which lastChunkSent is included is not
// visible here.
674 func (c *connection) lastHelpful() time.Time {
675 lasts := []time.Time{c.lastUsefulChunkReceived}
677 lasts = append(lasts, c.lastChunkSent)
679 return missinggo.Max(time.Time.Before, slices.ToEmptyInterface(lasts)...).(time.Time)