15 "github.com/anacrolix/missinggo"
16 "github.com/anacrolix/missinggo/itertools"
17 "github.com/anacrolix/missinggo/prioritybitmap"
19 "github.com/anacrolix/torrent/bencode"
20 pp "github.com/anacrolix/torrent/peer_protocol"
// Counts CANCEL messages that annihilated a queued-but-unsent REQUEST in
// writeOptimizer instead of going over the wire (exported via expvar).
23 var optimizedCancels = expvar.NewInt("optimizedCancels")
// Peer-source tags recording how we discovered the peer.
28 peerSourceTracker = '\x00' // It's the default.
29 peerSourceIncoming = 'I'
34 // Maintains the state of a connection with a peer.
35 type connection struct {
38 rw io.ReadWriter // Underlying peer byte stream (relation to conn elided here — TODO confirm).
// Set when the connection is torn down; goroutines select on closed.C()
// to stop (see writer and writeOptimizer).
42 closed missinggo.Event
// Chunk accounting, reported by WriteStatus.
46 UnwantedChunksReceived int
47 UsefulChunksReceived int
// Event timestamps rendered by eventAgeString in WriteStatus.
50 lastMessageReceived time.Time
51 completedHandshake time.Time
52 lastUsefulChunkReceived time.Time
53 lastChunkSent time.Time
55 // Stuff controlled by the local peer.
// Outstanding chunk requests we've sent to the peer (see Request/Cancel).
58 Requests map[request]struct{}
60 // Indexed by metadata piece, set to true if posted and pending a
// response (see requestMetadataPiece).
62 metadataRequests []bool
65 // Stuff controlled by the remote peer.
// Outstanding chunk requests the peer has sent to us.
69 PeerRequests map[request]struct{}
70 PeerExtensionBytes peerExtensionBytes
71 // Whether the peer has the given piece. nil if they've not sent any
72 // related messages yet.
75 // Pieces we've accepted chunks for from the peer.
76 peerTouchedPieces map[int]struct{}
78 PeerMaxRequests int // Maximum pending requests the peer allows.
79 PeerExtensionIDs map[string]byte
// Request-ordering state; inclination is borrowed from the torrent and
// returned via discardPieceInclination on Close.
82 pieceInclination []int
83 pieceRequestOrder prioritybitmap.PriorityBitmap
// newConnection allocates a connection with its writer/poster channels
// initialized (other field defaults are elided from this excerpt).
86 func newConnection() (c *connection) {
92 writeCh: make(chan []byte),
93 post: make(chan pp.Message),
// remoteAddr returns the peer's network address from the underlying conn.
98 func (cn *connection) remoteAddr() net.Addr {
99 return cn.conn.RemoteAddr()
// localAddr returns our side's network address from the underlying conn.
102 func (cn *connection) localAddr() net.Addr {
103 return cn.conn.LocalAddr()
// supportsExtension reports whether the peer advertised the named
// extended-protocol extension (present in PeerExtensionIDs).
106 func (cn *connection) supportsExtension(ext string) bool {
107 _, ok := cn.PeerExtensionIDs[ext]
// completedString renders the peer's completion as "have/total" pieces for
// status output; "?" semantics for an unknown bitfield are elided here.
111 func (cn *connection) completedString(t *torrent) string {
// No HAVE/BITFIELD/HAVE-ALL style message received yet.
112 if cn.PeerPieces == nil && !cn.peerHasAll {
115 return fmt.Sprintf("%d/%d", func() int {
// Count set entries in the peer's piece bitmap.
123 for _, b := range cn.PeerPieces {
130 if cn.peerHasAll || cn.PeerPieces == nil {
136 return len(cn.PeerPieces)
140 // Correct the PeerPieces slice length. Return false if the existing slice is
141 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
// messages. NOTE(review): the signature returns error, not bool — the
// comment above appears to predate the signature; confirm and update.
143 func (cn *connection) setNumPieces(num int) error {
147 if cn.PeerPieces == nil {
150 if len(cn.PeerPieces) == num {
// Already the right length; nothing to do.
151 } else if len(cn.PeerPieces) < num {
// Grow, with the new tail defaulting to "don't have".
152 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
153 } else if len(cn.PeerPieces) <= (num+7)/8*8 {
// Longer than num but within BITFIELD byte padding: spare bits must
// all be unset, then truncate to num.
154 for _, have := range cn.PeerPieces[num:] {
156 return errors.New("peer has invalid piece")
159 cn.PeerPieces = cn.PeerPieces[:num]
// Longer than any valid byte-padded bitfield could be.
161 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
163 if len(cn.PeerPieces) != num {
// eventAgeString formats how long ago t occurred, e.g. "1.23s ago".
// NOTE(review): time.Since(t) is the idiomatic spelling of Now().Sub(t).
169 func eventAgeString(t time.Time) string {
173 return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
// connectionFlags builds a short flag string describing the connection
// (e.g. the discovery-source byte); helper c appends one flag byte.
176 func (cn *connection) connectionFlags() (ret string) {
178 ret += string([]byte{b})
// Non-zero Discovery is itself a printable source tag (see peerSource*).
183 if cn.Discovery != 0 {
184 c(byte(cn.Discovery))
192 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
// statusFlags renders choke/interest state plus connectionFlags for
// WriteStatus; helper c appends one flag byte.
193 func (cn *connection) statusFlags() (ret string) {
195 ret += string([]byte{b})
204 ret += cn.connectionFlags()
206 if cn.PeerInterested {
// String implements fmt.Stringer by rendering WriteStatus into a buffer
// (with no torrent context).
215 func (cn *connection) String() string {
217 cn.WriteStatus(&buf, nil)
// WriteStatus writes a multi-line human-readable summary of the connection
// to w. t may be nil (see String); it is passed to completedString.
221 func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
222 // \t isn't preserved in <pre> blocks?
223 fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
224 fmt.Fprintf(w, " last msg: %s, connected: %s, last useful chunk: %s\n",
225 eventAgeString(cn.lastMessageReceived),
226 eventAgeString(cn.completedHandshake),
227 eventAgeString(cn.lastUsefulChunkReceived))
// Continuation of a Fprintf whose opening line is elided from this excerpt.
229 " %s completed, %d pieces touched, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
230 cn.completedString(t),
231 len(cn.peerTouchedPieces),
232 cn.UsefulChunksReceived,
233 cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
236 len(cn.PeerRequests),
// Close tears down the connection, returning the borrowed piece
// inclination to the torrent; further teardown is elided in this excerpt.
241 func (c *connection) Close() {
243 c.discardPieceInclination()
244 // TODO: This call blocks sometimes, why?
// PeerHasPiece reports whether the remote peer claims the given piece,
// per messages received so far. Out-of-range indices report false.
248 func (c *connection) PeerHasPiece(piece int) bool {
252 if piece >= len(c.PeerPieces) {
255 return c.PeerPieces[piece]
// Post hands msg to the writeOptimizer goroutine (via the post channel —
// body elided in this excerpt; TODO confirm).
258 func (c *connection) Post(msg pp.Message) {
// RequestPending reports whether we have an outstanding request for r.
265 func (c *connection) RequestPending(r request) bool {
266 _, ok := c.Requests[r]
// requestMetadataPiece posts a ut_metadata extended request for the given
// metadata piece, unless one is already pending, and records it in
// metadataRequests.
270 func (c *connection) requestMetadataPiece(index int) {
271 eID := c.PeerExtensionIDs["ut_metadata"]
// Already requested; don't post a duplicate.
275 if index < len(c.metadataRequests) && c.metadataRequests[index] {
// Bencoded ut_metadata request payload (BEP 9).
281 ExtendedPayload: func() []byte {
282 b, err := bencode.Marshal(map[string]int{
283 "msg_type": pp.RequestMetadataExtensionMsgType,
// Grow the pending-request slice to cover index, then mark it.
292 for index >= len(c.metadataRequests) {
293 c.metadataRequests = append(c.metadataRequests, false)
295 c.metadataRequests[index] = true
// requestedMetadataPiece reports whether a ut_metadata request for index
// has been posted and is still pending.
298 func (c *connection) requestedMetadataPiece(index int) bool {
299 return index < len(c.metadataRequests) && c.metadataRequests[index]
302 // Returns true if more requests can be sent.
303 func (c *connection) Request(chunk request) bool {
// At the peer's advertised pending-request limit; stop for now.
304 if len(c.Requests) >= c.PeerMaxRequests {
// Pointless to request a piece the peer doesn't claim to have.
307 if !c.PeerHasPiece(int(chunk.Index)) {
310 if c.RequestPending(chunk) {
// We must be interested before requests are legitimate.
313 c.SetInterested(true)
317 if c.Requests == nil {
// Lazily sized to the peer's request limit.
318 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
320 c.Requests[chunk] = struct{}{}
// Low-water mark used by updateRequests to decide when to refill.
321 c.requestsLowWater = len(c.Requests) / 2
326 Length: chunk.Length,
331 // Returns true if an unsatisfied request was canceled.
332 func (c *connection) Cancel(r request) bool {
// No requests at all, so nothing to cancel.
333 if c.Requests == nil {
336 if _, ok := c.Requests[r]; !ok {
339 delete(c.Requests, r)
349 // Returns true if an unsatisfied request was canceled.
// Mirrors Cancel, but for requests the remote peer made of us.
350 func (c *connection) PeerCancel(r request) bool {
351 if c.PeerRequests == nil {
354 if _, ok := c.PeerRequests[r]; !ok {
357 delete(c.PeerRequests, r)
// Choke chokes the peer (body elided in this excerpt — presumably posts
// pp.Choke and updates local state; TODO confirm).
361 func (c *connection) Choke() {
// Unchoke unchokes the peer (body elided in this excerpt — presumably
// posts pp.Unchoke and updates local state; TODO confirm).
372 func (c *connection) Unchoke() {
// SetInterested posts INTERESTED or NOT_INTERESTED to match the desired
// state, doing nothing if the state is unchanged.
382 func (c *connection) SetInterested(interested bool) {
383 if c.Interested == interested {
387 Type: func() pp.MessageType {
391 return pp.NotInterested
395 c.Interested = interested
399 // Track connection writer buffer writes and flushes, to determine its
// effectiveness. (expvar counters incremented in writer.)
401 connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
402 connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
405 // Writes buffers to the socket from the write channel.
// Buffers writes, flushing only when the channel is momentarily empty, so
// several queued messages go out in one syscall. Exits when writeCh closes
// or the connection's closed event fires.
406 func (conn *connection) writer() {
407 // Reduce write syscalls.
408 buf := bufio.NewWriter(conn.rw)
410 if buf.Buffered() == 0 {
411 // There's nothing to write, so block until we get something.
413 case b, ok := <-conn.writeCh:
417 connectionWriterWrite.Add(1)
418 _, err := buf.Write(b)
423 case <-conn.closed.C():
427 // We already have something to write, so flush if there's nothing
// more immediately available.
430 case b, ok := <-conn.writeCh:
434 connectionWriterWrite.Add(1)
435 _, err := buf.Write(b)
440 case <-conn.closed.C():
// Channel would have blocked: push buffered bytes to the socket now.
443 connectionWriterFlush.Add(1)
// writeOptimizer drains conn.post into a pending queue, coalescing CANCELs
// with queued-but-unsent REQUESTs, injecting keep-alives after
// keepAliveDelay of idleness, and feeding marshalled messages to the
// writer goroutine via conn.writeCh.
454 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
455 defer close(conn.writeCh) // Responsible for notifying downstream routines.
456 pending := list.New() // Message queue.
457 var nextWrite []byte // Set to nil if we need to marshal the next message.
458 timer := time.NewTimer(keepAliveDelay)
460 lastWrite := time.Now()
462 write := conn.writeCh // Set to nil if there's nothing to write.
463 if pending.Len() == 0 {
465 } else if nextWrite == nil {
// Lazily marshal only the head of the queue.
467 nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
// Timer fired: decide between re-arming and queueing a keep-alive.
475 if pending.Len() != 0 {
478 keepAliveTime := lastWrite.Add(keepAliveDelay)
479 if time.Now().Before(keepAliveTime) {
// Not idle long enough yet; re-arm for the remainder.
480 timer.Reset(keepAliveTime.Sub(time.Now()))
483 pending.PushBack(pp.Message{Keepalive: true})
484 case msg, ok := <-conn.post:
488 if msg.Type == pp.Cancel {
// A CANCEL for a still-queued REQUEST annihilates it locally
// instead of sending both messages (counted in optimizedCancels).
489 for e := pending.Back(); e != nil; e = e.Prev() {
490 elemMsg := e.Value.(pp.Message)
491 if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
493 optimizedCancels.Add(1)
498 pending.PushBack(msg)
499 case write <- nextWrite:
// Head of queue handed to the writer goroutine.
500 pending.Remove(pending.Front())
502 lastWrite = time.Now()
503 if pending.Len() == 0 {
504 timer.Reset(keepAliveDelay)
506 case <-conn.closed.C():
// Have posts a HAVE for piece unless one was already sent, tracking sent
// state in the sentHaves slice (grown on demand).
512 func (cn *connection) Have(piece int) {
513 for piece >= len(cn.sentHaves) {
514 cn.sentHaves = append(cn.sentHaves, false)
// Don't repeat a HAVE the peer has already seen.
516 if cn.sentHaves[piece] {
521 Index: pp.Integer(piece),
523 cn.sentHaves[piece] = true
// Bitfield posts our piece bitfield. Per the wire protocol it must precede
// any HAVEs, so sending it after sentHaves exists is a programmer error.
526 func (cn *connection) Bitfield(haves []bool) {
527 if cn.sentHaves != nil {
528 panic("bitfield must be first have-related message sent")
// updateRequests tops up outstanding requests when they fall to the
// low-water mark, and manages interest when nothing is wanted.
537 func (c *connection) updateRequests() {
// Still above the refill threshold set in Request.
545 if len(c.Requests) > c.requestsLowWater {
550 if len(c.Requests) == 0 && !c.PeerChoked {
551 // So we're not choked, but we don't want anything right now. We may
552 // have completed readahead, and the readahead window has not rolled
553 // over to the next piece. Better to stay interested in case we're
554 // going to want data in the near future.
555 c.SetInterested(!c.t.haveAllPieces())
// fillRequests walks pieces in request-priority order, issuing chunk
// requests until a piece's handler reports no more can be sent.
559 func (c *connection) fillRequests() {
560 itertools.ForIterable(&c.pieceRequestOrder, func(_piece interface{}) (more bool) {
561 return c.requestPiecePendingChunks(_piece.(int))
// requestPiecePendingChunks delegates to the torrent to request this
// piece's pending chunks on this connection.
565 func (c *connection) requestPiecePendingChunks(piece int) (again bool) {
566 return c.t.connRequestPiecePendingChunks(c, piece)
// stopRequestingPiece removes piece from this connection's request order.
569 func (c *connection) stopRequestingPiece(piece int) {
570 c.pieceRequestOrder.Remove(piece)
// updatePiecePriority recomputes piece's position in the request order
// from the torrent-level priority and this connection's inclination.
// Lower resulting prio sorts earlier (urgent tiers subtract offsets).
573 func (c *connection) updatePiecePriority(piece int) {
// Can't request what the peer doesn't have.
574 if !c.PeerHasPiece(piece) {
577 tpp := c.t.piecePriority(piece)
578 if tpp == PiecePriorityNone {
579 c.stopRequestingPiece(piece)
// Base prio is this connection's per-piece inclination.
582 prio := c.getPieceInclination()[piece]
584 case PiecePriorityNormal:
585 case PiecePriorityReadahead:
// Boost by one "band" of numPieces so readahead beats all normals.
586 prio -= c.t.numPieces()
587 case PiecePriorityNext, PiecePriorityNow:
// Two bands: most urgent tier beats readahead too.
588 prio -= 2 * c.t.numPieces()
592 c.pieceRequestOrder.Set(piece, prio)
// getPieceInclination lazily borrows the per-connection piece inclination
// from the torrent; released again in discardPieceInclination.
596 func (c *connection) getPieceInclination() []int {
597 if c.pieceInclination == nil {
598 c.pieceInclination = c.t.getConnPieceInclination()
600 return c.pieceInclination
// discardPieceInclination returns the borrowed inclination slice to the
// torrent (if any) and clears the local reference. Idempotent.
603 func (c *connection) discardPieceInclination() {
604 if c.pieceInclination == nil {
607 c.t.putPieceInclination(c.pieceInclination)
608 c.pieceInclination = nil