16 pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
// optimizedCancels is an expvar counter that is incremented in
// writeOptimizer when a Cancel message matches a Request that is still
// sitting in the pending write queue.
19 var optimizedCancels = expvar.NewInt("optimizedCancels")
24 peerSourceIncoming = 'I'
29 // Maintains the state of a connection with a peer.
30 type connection struct {
35 mu sync.Mutex // Only for closing.
// Chunk counters. WriteStatus reports them as
// useful/(unwanted+useful) under "good chunks".
40 UnwantedChunksReceived int
41 UsefulChunksReceived int
// Event timestamps. WriteStatus renders them as ages labelled "last msg",
// "connected" and "last useful chunk" respectively.
43 lastMessageReceived time.Time
44 completedHandshake time.Time
45 lastUsefulChunkReceived time.Time
47 // Stuff controlled by the local peer.
// Set of requests recorded by Request and removed by Cancel. It is
// allocated lazily by Request, so it may be nil.
50 Requests map[request]struct{}
52 // Stuff controlled by the remote peer.
// Set of requests from which PeerCancel removes entries. May be nil.
56 PeerRequests map[request]struct{}
// Populated from the peb argument of newConnection.
57 PeerExtensionBytes peerExtensionBytes
58 // Whether the peer has the given piece. nil if they've not sent any
59 // related messages yet.
61 PeerMaxRequests int // Maximum pending requests the peer allows.
// NOTE(review): presumably maps an extension name to the ID the peer
// assigned to it; confirm against the code that fills it in.
62 PeerExtensionIDs map[string]int64
// newConnection builds the connection state for a peer from the given
// socket, extension bytes and peer ID. The closing, writeCh and post
// channels are created unbuffered, completedHandshake is stamped with the
// construction time, and the writeOptimizer goroutine is started with a
// keep-alive delay of one minute.
66 func newConnection(sock net.Conn, peb peerExtensionBytes, peerID [20]byte, uTP bool) (c *connection) {
74 PeerExtensionBytes: peb,
77 closing: make(chan struct{}),
78 writeCh: make(chan []byte),
79 post: make(chan pp.Message),
81 completedHandshake: time.Now(),
84 go c.writeOptimizer(time.Minute)
// completedString describes how much of the torrent the peer has, formatted
// as "<pieces peer has>/<total pieces>". The case where PeerPieces is nil
// (no piece information received yet) is handled separately before that.
88 func (cn *connection) completedString() string {
89 if cn.PeerPieces == nil {
// Disabled alternative rendering as a percentage:
92 // f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
93 // return fmt.Sprintf("%d%%", int(f*100))
94 return fmt.Sprintf("%d/%d", cn.piecesPeerHasCount(), cn.totalPiecesCount())
// totalPiecesCount returns the length of PeerPieces, which is 0 while
// PeerPieces is still nil.
97 func (cn *connection) totalPiecesCount() int {
98 return len(cn.PeerPieces)
// piecesPeerHasCount iterates over PeerPieces to compute the named result
// count.
// NOTE(review): presumably count is the number of true entries, i.e. the
// pieces the peer has; confirm against the loop body.
101 func (cn *connection) piecesPeerHasCount() (count int) {
102 for _, has := range cn.PeerPieces {
110 // Correct the PeerPieces slice length. Returns an error if the existing slice is
111 // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
113 func (cn *connection) setNumPieces(num int) error {
// Create the piece order on first use, or check that its length matches num.
114 cn.initPieceOrder(num)
115 if cn.PeerPieces == nil {
// Already the right length.
118 if len(cn.PeerPieces) == num {
// Too short: extend with false entries up to num.
119 } else if len(cn.PeerPieces) < num {
120 cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
// Too long, but no longer than num rounded up to a multiple of 8.
// NOTE(review): presumably this tolerates the spare bits of a byte-padded
// BITFIELD; confirm. The surplus entries are inspected before truncating.
121 } else if len(cn.PeerPieces) <= (num+7)/8*8 {
122 for _, have := range cn.PeerPieces[num:] {
124 return errors.New("peer has invalid piece")
127 cn.PeerPieces = cn.PeerPieces[:num]
// Longer than any padding could explain.
129 return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
// Final check that the branches above left the slice at exactly num entries.
131 if len(cn.PeerPieces) != num {
// initPieceOrder sets pieceOrder to a random permutation of the integers
// [0, numPieces) if it has not been set yet. It panics if pieceOrder does
// not have exactly numPieces entries, which happens when an earlier call
// used a different count.
137 func (cn *connection) initPieceOrder(numPieces int) {
138 if cn.pieceOrder == nil {
139 cn.pieceOrder = rand.Perm(numPieces)
141 if len(cn.pieceOrder) != numPieces {
142 panic("piece order initialized with wrong length")
// eventAgeString renders how long ago t occurred. Its main return path
// formats the elapsed time in seconds with two decimal places, followed by
// "s ago" (for example "1.50s ago").
146 func eventAgeString(t time.Time) string {
150 return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
// WriteStatus writes a human-readable status summary for the connection to
// w. The fixed part holds the quoted peer ID with the local and remote socket
// addresses (left-aligned and padded to 90 columns), piece completion, useful
// versus total chunks received, the sizes of Requests and PeerRequests, and
// the ages of the last message, the handshake and the last useful chunk. It
// is followed by a series of single-character flags.
153 func (cn *connection) WriteStatus(w io.Writer) {
154 fmt.Fprintf(w, "%-90s: %s completed, good chunks: %d/%d reqs: %d-%d, last msg: %s, connected: %s, last useful chunk: %s, flags: ", fmt.Sprintf("%q: %s-%s", cn.PeerID, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr()), cn.completedString(), cn.UsefulChunksReceived, cn.UnwantedChunksReceived+cn.UsefulChunksReceived, len(cn.Requests), len(cn.PeerRequests), eventAgeString(cn.lastMessageReceived), eventAgeString(cn.completedHandshake), eventAgeString(cn.lastUsefulChunkReceived))
// Each flag is written to w as a single character.
156 fmt.Fprintf(w, "%c", b)
158 // Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
// Flag condition: we have requests recorded for this peer.
159 if len(cn.Requests) != 0 {
// Flag condition: PeerChoked is set while we are interested.
162 if cn.PeerChoked && cn.Interested {
// Flag condition: the peer is interested in us.
166 if cn.PeerInterested {
// A non-zero discovery source is emitted as its own byte value.
172 if cn.Discovery != 0 {
173 c(byte(cn.Discovery))
181 func (c *connection) Close() {
190 // TODO: This call blocks sometimes, why?
// PeerHasPiece reports the PeerPieces entry for the piece at index. A nil
// PeerPieces and an index at or beyond its length are both handled before
// the slice is indexed, so the lookup itself cannot go out of range.
194 func (c *connection) PeerHasPiece(index pp.Integer) bool {
195 if c.PeerPieces == nil {
198 if int(index) >= len(c.PeerPieces) {
201 return c.PeerPieces[index]
204 func (c *connection) Post(msg pp.Message) {
// RequestPending looks r up in c.Requests. Reading a nil map is safe in Go,
// so this works before Request has allocated the set.
211 func (c *connection) RequestPending(r request) bool {
212 _, ok := c.Requests[r]
216 // Returns true if more requests can be sent.
217 func (c *connection) Request(chunk request) bool {
// Guard: the peer's limit on pending requests has been reached.
218 if len(c.Requests) >= c.PeerMaxRequests {
// Guard: the peer has not told us it has this piece.
221 if !c.PeerHasPiece(chunk.Index) {
// Guard: this exact chunk is already recorded in Requests.
224 if c.RequestPending(chunk) {
// Mark ourselves as interested before recording the request.
227 c.SetInterested(true)
// Allocate the request set on first use, sized for the peer's limit.
231 if c.Requests == nil {
232 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
234 c.Requests[chunk] = struct{}{}
239 Length: chunk.Length,
244 // Returns true if an unsatisfied request was canceled.
245 func (c *connection) Cancel(r request) bool {
// Guard: no request set has been allocated yet.
246 if c.Requests == nil {
// Guard: r is not recorded in Requests.
249 if _, ok := c.Requests[r]; !ok {
// Forget the request locally.
252 delete(c.Requests, r)
262 // Returns true if an unsatisfied request was canceled.
263 func (c *connection) PeerCancel(r request) bool {
// Guard: no peer request set exists.
264 if c.PeerRequests == nil {
// Guard: r is not recorded in PeerRequests.
267 if _, ok := c.PeerRequests[r]; !ok {
// Drop the peer's request.
270 delete(c.PeerRequests, r)
274 func (c *connection) Choke() {
284 func (c *connection) Unchoke() {
// SetInterested records whether we are interested in the peer. The current
// state is compared with the requested one first. A message type is then
// selected by an inline function (pp.NotInterested is one of its results),
// and finally c.Interested is updated.
// NOTE(review): presumably returns early when the state is unchanged and
// posts the selected message to the peer; confirm.
294 func (c *connection) SetInterested(interested bool) {
295 if c.Interested == interested {
299 Type: func() pp.MessageType {
303 return pp.NotInterested
307 c.Interested = interested
311 // Four consecutive zero bytes that comprise a keep alive on the wire.
312 keepAliveBytes [4]byte
315 // Writes buffers to the socket from the write channel.
316 func (conn *connection) writer() {
317 // Reduce write syscalls.
318 buf := bufio.NewWriterSize(conn.Socket, 0x8000) // 32 KiB
319 // Returns immediately if the buffer contains data.
320 notEmpty := make(chan struct{}, 1)
// When the buffered writer holds unflushed bytes, a token is offered on
// notEmpty. The channel has capacity 1, so one token can be queued without
// a receiver.
322 if buf.Buffered() != 0 {
324 case notEmpty <- struct{}{}:
// Take the next buffer from the write channel; ok is false once the
// channel has been closed.
329 case b, ok := <-conn.writeCh:
// Append to the buffered writer rather than writing to the socket directly.
333 _, err := buf.Write(b)
// writeOptimizer sits between conn.post and conn.writeCh. Messages received
// on conn.post are appended to a queue; the message at the front of the queue
// is marshalled to bytes and offered on conn.writeCh. When a Cancel arrives,
// the queue is searched for a Request with the same index, begin and length,
// and a match is counted in optimizedCancels. When keepAliveDelay has elapsed
// since the last write, a keep-alive message is queued. conn.writeCh is
// closed when this function returns.
349 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
350 defer close(conn.writeCh) // Responsible for notifying downstream routines.
351 pending := list.New() // Message queue.
352 var nextWrite []byte // Set to nil if we need to marshal the next message.
353 timer := time.NewTimer(keepAliveDelay)
355 lastWrite := time.Now()
// A send on a nil channel never proceeds, so setting write to nil disables
// the send case of the select below.
357 write := conn.writeCh // Set to nil if there's nothing to write.
358 if pending.Len() == 0 {
360 } else if nextWrite == nil {
// Marshal the message at the front of the queue.
362 nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
// Keep-alive handling: check whether messages are already queued.
370 if pending.Len() != 0 {
// The keep-alive is due keepAliveDelay after the most recent write.
373 keepAliveTime := lastWrite.Add(keepAliveDelay)
// Not due yet: re-arm the timer for the time remaining.
374 if time.Now().Before(keepAliveTime) {
375 timer.Reset(keepAliveTime.Sub(time.Now()))
// Queue a keep-alive message.
378 pending.PushBack(pp.Message{Keepalive: true})
379 case msg, ok := <-conn.post:
383 if msg.Type == pp.Cancel {
// Search from newest to oldest for a queued Request that this Cancel
// matches.
384 for e := pending.Back(); e != nil; e = e.Prev() {
385 elemMsg := e.Value.(pp.Message)
386 if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
388 optimizedCancels.Add(1)
393 pending.PushBack(msg)
394 case write <- nextWrite:
// The front message has been handed over; remove it from the queue.
395 pending.Remove(pending.Front())
397 lastWrite = time.Now()
// Queue drained: restart the keep-alive countdown from now.
398 if pending.Len() == 0 {
399 timer.Reset(keepAliveDelay)