13 "bitbucket.org/anacrolix/go.torrent/peer_protocol"
16 // Maintains the state of a connection with a peer.
17 type connection struct {
20 mu sync.Mutex // Only for closing.
21 post chan peer_protocol.Message
24 // Stuff controlled by the local peer.
27 Requests map[request]struct{}
29 // Stuff controlled by the remote peer.
33 PeerRequests map[request]struct{}
34 PeerExtensions [8]byte
36 PeerMaxRequests int // Maximum pending requests the peer allows.
37 PeerExtensionIDs map[string]int64
40 func (cn *connection) completedString() string {
41 if cn.PeerPieces == nil {
44 f := float32(cn.piecesPeerHasCount()) / float32(cn.totalPiecesCount())
45 return fmt.Sprintf("%d%%", int(f*100))
48 func (cn *connection) totalPiecesCount() int {
49 return len(cn.PeerPieces)
52 func (cn *connection) piecesPeerHasCount() (count int) {
53 for _, has := range cn.PeerPieces {
61 func (cn *connection) WriteStatus(w io.Writer) {
62 fmt.Fprintf(w, "%q: %s-%s: %s completed: ", cn.PeerId, cn.Socket.LocalAddr(), cn.Socket.RemoteAddr(), cn.completedString())
64 fmt.Fprintf(w, "%c", b)
66 // https://trac.transmissionbt.com/wiki/PeerStatusText
67 if len(cn.Requests) != 0 {
69 } else if cn.Interested {
72 if !cn.PeerChoked && !cn.Interested {
75 if !cn.Choked && !cn.PeerInterested {
81 func (c *connection) Close() {
92 func (c *connection) getClosed() bool {
98 func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool {
99 if c.PeerPieces == nil {
102 return c.PeerPieces[index]
105 func (c *connection) Post(msg peer_protocol.Message) {
109 func (c *connection) RequestPending(r request) bool {
110 _, ok := c.Requests[r]
114 // Returns true if more requests can be sent.
115 func (c *connection) Request(chunk request) bool {
116 if len(c.Requests) >= c.PeerMaxRequests {
119 if !c.PeerHasPiece(chunk.Index) {
122 if c.RequestPending(chunk) {
125 c.SetInterested(true)
129 if c.Requests == nil {
130 c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
132 c.Requests[chunk] = struct{}{}
133 c.Post(peer_protocol.Message{
134 Type: peer_protocol.Request,
137 Length: chunk.Length,
142 // Returns true if an unsatisfied request was canceled.
143 func (c *connection) Cancel(r request) bool {
144 if c.Requests == nil {
147 if _, ok := c.Requests[r]; !ok {
150 delete(c.Requests, r)
151 c.Post(peer_protocol.Message{
152 Type: peer_protocol.Cancel,
160 // Returns true if an unsatisfied request was canceled.
161 func (c *connection) PeerCancel(r request) bool {
162 if c.PeerRequests == nil {
165 if _, ok := c.PeerRequests[r]; !ok {
168 delete(c.PeerRequests, r)
172 func (c *connection) Choke() {
176 c.Post(peer_protocol.Message{
177 Type: peer_protocol.Choke,
182 func (c *connection) Unchoke() {
186 c.Post(peer_protocol.Message{
187 Type: peer_protocol.Unchoke,
192 func (c *connection) SetInterested(interested bool) {
193 if c.Interested == interested {
196 c.Post(peer_protocol.Message{
197 Type: func() peer_protocol.MessageType {
199 return peer_protocol.Interested
201 return peer_protocol.NotInterested
205 c.Interested = interested
209 // Four consecutive zero bytes that comprise a keep alive on the wire.
210 keepAliveBytes [4]byte
213 // Writes buffers to the socket from the write channel.
214 func (conn *connection) writer() {
215 for b := range conn.write {
216 _, err := conn.Socket.Write(b)
217 // log.Printf("wrote %q to %s", b, conn.Socket.RemoteAddr())
219 if !conn.getClosed() {
227 func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
228 defer close(conn.write) // Responsible for notifying downstream routines.
229 pending := list.New() // Message queue.
230 var nextWrite []byte // Set to nil if we need to need to marshal the next message.
231 timer := time.NewTimer(keepAliveDelay)
233 lastWrite := time.Now()
235 write := conn.write // Set to nil if there's nothing to write.
236 if pending.Len() == 0 {
238 } else if nextWrite == nil {
240 nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
248 if pending.Len() != 0 {
251 keepAliveTime := lastWrite.Add(keepAliveDelay)
252 if time.Now().Before(keepAliveTime) {
253 timer.Reset(keepAliveTime.Sub(time.Now()))
256 pending.PushBack(peer_protocol.Message{Keepalive: true})
257 case msg, ok := <-conn.post:
261 if msg.Type == peer_protocol.Cancel {
262 for e := pending.Back(); e != nil; e = e.Prev() {
263 elemMsg := e.Value.(peer_protocol.Message)
264 if elemMsg.Type == peer_protocol.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
266 log.Printf("optimized cancel! %v", msg)
271 pending.PushBack(msg)
272 case write <- nextWrite:
273 pending.Remove(pending.Front())
275 lastWrite = time.Now()
276 if pending.Len() == 0 {
277 timer.Reset(keepAliveDelay)