From: Matt Joiner Date: Sun, 4 Feb 2018 13:18:38 +0000 (+1100) Subject: Support AllowedFast and enable fast extension X-Git-Tag: v1.0.0~202 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=20316e5af4662d43071e22bc9821888bf37a71a8;p=btrtrc.git Support AllowedFast and enable fast extension --- diff --git a/connection.go b/connection.go index a49f4fa2..16888aa0 100644 --- a/connection.go +++ b/connection.go @@ -85,6 +85,7 @@ type connection struct { peerMinPieces int // Pieces we've accepted chunks for from the peer. peerTouchedPieces map[int]struct{} + peerAllowedFast bitmap.Bitmap PeerMaxRequests int // Maximum pending requests the peer allows. PeerExtensionIDs map[string]byte @@ -303,27 +304,35 @@ func (cn *connection) nominalMaxRequests() (ret int) { return } -// Returns true if an unsatisfied request was canceled. -func (cn *connection) PeerCancel(r request) bool { - if cn.PeerRequests == nil { - return false - } +func (cn *connection) onPeerSentCancel(r request) { if _, ok := cn.PeerRequests[r]; !ok { - return false + torrent.Add("unexpected cancels received", 1) + return + } + if cn.fastEnabled() { + cn.reject(r) + } else { + delete(cn.PeerRequests, r) } - delete(cn.PeerRequests, r) - return true } -func (cn *connection) Choke(msg func(pp.Message) bool) bool { +func (cn *connection) Choke(msg messageWriter) (more bool) { if cn.Choked { return true } - cn.PeerRequests = nil cn.Choked = true - return msg(pp.Message{ + more = msg(pp.Message{ Type: pp.Choke, }) + if cn.fastEnabled() { + for r := range cn.PeerRequests { + // TODO: Don't reject pieces in allowed fast set. + cn.reject(r) + } + } else { + cn.PeerRequests = nil + } + return } func (cn *connection) Unchoke(msg func(pp.Message) bool) bool { @@ -372,6 +381,13 @@ func (cn *connection) request(r request, mw messageWriter) bool { if _, ok := cn.t.conns[cn]; !ok { panic("requesting but not in active conns") } + if cn.PeerChoked { + if cn.peerAllowedFast.Get(int(r.Index)) { + torrent.Add("allowed fast requests sent", 1) + } else { + panic("requesting while choked and not allowed fast") + } + } cn.t.pendingRequests[r]++ return mw(pp.Message{ Type: pp.Request, @@ -505,6 +521,7 @@ func nextRequestState( iterPendingRequests func(f func(request) bool), requestsLowWater int, requestsHighWater int, + allowedFast bitmap.Bitmap, ) ( cancelExisting bool, // Cancel all our pending requests newRequests []request, // Chunks to request that we currently aren't @@ -519,7 +536,12 @@ func nextRequestState( iterPendingRequests(func(r request) bool { interested = true if peerChoking { - return false + if allowedFast.IsEmpty() { + return false + } + if !allowedFast.Get(int(r.Index)) { + return true + } } if _, ok := currentRequests[r]; !ok { if newRequests == nil { @@ -632,6 +654,7 @@ func (cn *connection) desiredRequestState() (bool, []request, bool) { cn.iterPendingRequests, cn.requestsLowWater, cn.nominalMaxRequests(), + cn.peerAllowedFast, ) } @@ -840,30 +863,38 @@ func (c *connection) fastEnabled() bool { return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast() } -// Returns true if we were able to reject the request. -func (c *connection) reject(r request) bool { +func (c *connection) reject(r request) { if !c.fastEnabled() { - return false + panic("fast not enabled") } - c.deleteRequest(r) c.Post(r.ToMsg(pp.Reject)) - return true + delete(c.PeerRequests, r) } func (c *connection) onReadRequest(r request) error { requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1) if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) { + torrent.Add("bad requests received", 1) return errors.New("bad request") } if _, ok := c.PeerRequests[r]; ok { + torrent.Add("duplicate requests received", 1) return nil } if c.Choked { - c.reject(r) + torrent.Add("requests received while choking", 1) + if c.fastEnabled() { + torrent.Add("requests rejected while choking", 1) + c.reject(r) + } return nil } if len(c.PeerRequests) >= maxRequests { - c.reject(r) + torrent.Add("requests received while queue full", 1) + if c.fastEnabled() { + c.reject(r) + } + // BEP 6 says we may close here if we choose. return nil } if !c.t.havePiece(r.Index.Int()) { @@ -915,6 +946,9 @@ func (c *connection) mainReadLoop() error { continue } messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1) + if msg.Type.FastExtension() && !c.fastEnabled() { + return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type) + } switch msg.Type { case pp.Choke: c.PeerChoked = true @@ -922,9 +956,7 @@ func (c *connection) mainReadLoop() error { // We can then reset our interest. c.updateRequests() case pp.Reject: - if c.deleteRequest(newRequestFromMessage(&msg)) { - c.updateRequests() - } + c.deleteRequest(newRequestFromMessage(&msg)) case pp.Unchoke: c.PeerChoked = false c.tickleWriter() @@ -933,6 +965,7 @@ func (c *connection) mainReadLoop() error { c.tickleWriter() case pp.NotInterested: c.PeerInterested = false + // TODO: Reject? c.PeerRequests = nil case pp.Have: err = c.peerSentHave(int(msg.Index)) @@ -941,9 +974,7 @@ func (c *connection) mainReadLoop() error { err = c.onReadRequest(r) case pp.Cancel: req := newRequestFromMessage(&msg) - if !c.PeerCancel(req) { - unexpectedCancels.Add(1) - } + c.onPeerSentCancel(req) case pp.Bitfield: err = c.peerSentBitfield(msg.Bitfield) case pp.HaveAll: @@ -969,6 +1000,15 @@ func (c *connection) mainReadLoop() error { pingAddr.Port = int(msg.Port) } go cl.dHT.Ping(pingAddr, nil) + case pp.AllowedFast: + torrent.Add("allowed fasts received", 1) + log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger) + c.peerAllowedFast.Add(int(msg.Index)) + c.updateRequests() + case pp.Suggest: + torrent.Add("suggests received", 1) + log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger) + c.updateRequests() default: err = fmt.Errorf("received unknown message type: %#v", msg.Type) } @@ -1117,6 +1157,13 @@ func (c *connection) receiveChunk(msg *pp.Message) { unexpectedChunksReceived.Add(1) } + if c.PeerChoked { + torrent.Add("chunks received while choked", 1) + if c.peerAllowedFast.Get(int(req.Index)) { + torrent.Add("chunks received due to allowed fast", 1) + } + } + // Do we actually want this chunk? if !t.wantPiece(req) { unwantedChunksReceived.Add(1) diff --git a/global.go b/global.go index 8012fe61..9855f8ac 100644 --- a/global.go +++ b/global.go @@ -17,7 +17,7 @@ const ( ) func defaultPeerExtensionBytes() peerExtensionBytes { - return newPeerExtensionBytes(ExtensionBitDHT, ExtensionBitExtended) + return newPeerExtensionBytes(ExtensionBitDHT, ExtensionBitExtended, ExtensionBitFast) } // I could move a lot of these counters to their own file, but I suspect they @@ -27,10 +27,11 @@ var ( unexpectedChunksReceived = expvar.NewInt("chunksReceivedUnexpected") chunksReceived = expvar.NewInt("chunksReceived") + torrent = expvar.NewMap("torrent") + peersAddedBySource = expvar.NewMap("peersAddedBySource") uploadChunksPosted = expvar.NewInt("uploadChunksPosted") - unexpectedCancels = expvar.NewInt("unexpectedCancels") pieceHashedCorrect = expvar.NewInt("pieceHashedCorrect") pieceHashedNotCorrect = expvar.NewInt("pieceHashedNotCorrect") diff --git a/handshake_test.go b/handshake_test.go index cc761399..8c2c6d24 100644 --- a/handshake_test.go +++ b/handshake_test.go @@ -3,14 +3,13 @@ package torrent import ( "testing" - "github.com/anacrolix/missinggo" "github.com/stretchr/testify/assert" ) func TestDefaultExtensionBytes(t *testing.T) { - var pex peerExtensionBytes - missinggo.CopyExact(&pex, defaultPeerExtensionBytes()) + pex := defaultPeerExtensionBytes() assert.True(t, pex.SupportsDHT()) assert.True(t, pex.SupportsExtended()) - assert.False(t, pex.SupportsFast()) + assert.False(t, pex.GetBit(63)) + assert.Panics(t, func() { pex.GetBit(64) }) } diff --git a/peer_protocol/decoder.go b/peer_protocol/decoder.go index 3f66cdc9..37758fd8 100644 --- a/peer_protocol/decoder.go +++ b/peer_protocol/decoder.go @@ -54,7 +54,7 @@ func (d *Decoder) Decode(msg *Message) (err error) { switch msg.Type { case Choke, Unchoke, Interested, NotInterested, HaveAll, HaveNone: return - case Have: + case Have, AllowedFast, Suggest: err = msg.Index.Read(r) case Request, Cancel, Reject: for _, data := range []*Integer{&msg.Index, &msg.Begin, &msg.Length} { diff --git a/peer_protocol/protocol.go b/peer_protocol/protocol.go index 13d76def..aa53ffdf 100644 --- a/peer_protocol/protocol.go +++ b/peer_protocol/protocol.go @@ -16,6 +16,10 @@ func (me MessageType) String() string { return strconv.FormatInt(int64(me), 10) } +func (mt MessageType) FastExtension() bool { + return mt >= Suggest && mt <= AllowedFast +} + const ( Choke MessageType = iota Unchoke // 1