peerMinPieces int
// Pieces we've accepted chunks for from the peer.
peerTouchedPieces map[int]struct{}
+ peerAllowedFast bitmap.Bitmap
PeerMaxRequests int // Maximum pending requests the peer allows.
PeerExtensionIDs map[string]byte
return
}
-// Returns true if an unsatisfied request was canceled.
-func (cn *connection) PeerCancel(r request) bool {
- if cn.PeerRequests == nil {
- return false
- }
+func (cn *connection) onPeerSentCancel(r request) {
if _, ok := cn.PeerRequests[r]; !ok {
- return false
+ torrent.Add("unexpected cancels received", 1)
+ return
+ }
+ if cn.fastEnabled() {
+ cn.reject(r)
+ } else {
+ delete(cn.PeerRequests, r)
}
- delete(cn.PeerRequests, r)
- return true
}
-func (cn *connection) Choke(msg func(pp.Message) bool) bool {
+func (cn *connection) Choke(msg messageWriter) (more bool) {
if cn.Choked {
return true
}
- cn.PeerRequests = nil
cn.Choked = true
- return msg(pp.Message{
+ more = msg(pp.Message{
Type: pp.Choke,
})
+ if cn.fastEnabled() {
+ for r := range cn.PeerRequests {
+ // TODO: Don't reject pieces in allowed fast set.
+ cn.reject(r)
+ }
+ } else {
+ cn.PeerRequests = nil
+ }
+ return
}
func (cn *connection) Unchoke(msg func(pp.Message) bool) bool {
if _, ok := cn.t.conns[cn]; !ok {
panic("requesting but not in active conns")
}
+ if cn.PeerChoked {
+ if cn.peerAllowedFast.Get(int(r.Index)) {
+ torrent.Add("allowed fast requests sent", 1)
+ } else {
+ panic("requesting while choked and not allowed fast")
+ }
+ }
cn.t.pendingRequests[r]++
return mw(pp.Message{
Type: pp.Request,
iterPendingRequests func(f func(request) bool),
requestsLowWater int,
requestsHighWater int,
+ allowedFast bitmap.Bitmap,
) (
cancelExisting bool, // Cancel all our pending requests
newRequests []request, // Chunks to request that we currently aren't
iterPendingRequests(func(r request) bool {
interested = true
if peerChoking {
- return false
+ if allowedFast.IsEmpty() {
+ return false
+ }
+ if !allowedFast.Get(int(r.Index)) {
+ return true
+ }
}
if _, ok := currentRequests[r]; !ok {
if newRequests == nil {
cn.iterPendingRequests,
cn.requestsLowWater,
cn.nominalMaxRequests(),
+ cn.peerAllowedFast,
)
}
return c.PeerExtensionBytes.SupportsFast() && c.t.cl.extensionBytes.SupportsFast()
}
-// Returns true if we were able to reject the request.
-func (c *connection) reject(r request) bool {
+func (c *connection) reject(r request) {
if !c.fastEnabled() {
- return false
+ panic("fast not enabled")
}
- c.deleteRequest(r)
c.Post(r.ToMsg(pp.Reject))
- return true
+ delete(c.PeerRequests, r)
}
func (c *connection) onReadRequest(r request) error {
requestedChunkLengths.Add(strconv.FormatUint(r.Length.Uint64(), 10), 1)
if r.Begin+r.Length > c.t.pieceLength(int(r.Index)) {
+ torrent.Add("bad requests received", 1)
return errors.New("bad request")
}
if _, ok := c.PeerRequests[r]; ok {
+ torrent.Add("duplicate requests received", 1)
return nil
}
if c.Choked {
- c.reject(r)
+ torrent.Add("requests received while choking", 1)
+ if c.fastEnabled() {
+ torrent.Add("requests rejected while choking", 1)
+ c.reject(r)
+ }
return nil
}
if len(c.PeerRequests) >= maxRequests {
- c.reject(r)
+ torrent.Add("requests received while queue full", 1)
+ if c.fastEnabled() {
+ c.reject(r)
+ }
+ // BEP 6 says we may close here if we choose.
return nil
}
if !c.t.havePiece(r.Index.Int()) {
continue
}
messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+ if msg.Type.FastExtension() && !c.fastEnabled() {
+ return fmt.Errorf("received fast extension message (type=%v) but extension is disabled", msg.Type)
+ }
switch msg.Type {
case pp.Choke:
c.PeerChoked = true
// We can then reset our interest.
c.updateRequests()
case pp.Reject:
- if c.deleteRequest(newRequestFromMessage(&msg)) {
- c.updateRequests()
- }
+ c.deleteRequest(newRequestFromMessage(&msg))
case pp.Unchoke:
c.PeerChoked = false
c.tickleWriter()
c.tickleWriter()
case pp.NotInterested:
c.PeerInterested = false
+ // TODO: Reject?
c.PeerRequests = nil
case pp.Have:
err = c.peerSentHave(int(msg.Index))
err = c.onReadRequest(r)
case pp.Cancel:
req := newRequestFromMessage(&msg)
- if !c.PeerCancel(req) {
- unexpectedCancels.Add(1)
- }
+ c.onPeerSentCancel(req)
case pp.Bitfield:
err = c.peerSentBitfield(msg.Bitfield)
case pp.HaveAll:
pingAddr.Port = int(msg.Port)
}
go cl.dHT.Ping(pingAddr, nil)
+ case pp.AllowedFast:
+ torrent.Add("allowed fasts received", 1)
+ log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c, debugLogValue).Log(c.t.logger)
+ c.peerAllowedFast.Add(int(msg.Index))
+ c.updateRequests()
+ case pp.Suggest:
+ torrent.Add("suggests received", 1)
+ log.Fmsg("peer suggested piece %d", msg.Index).AddValues(c, msg.Index, debugLogValue).Log(c.t.logger)
+ c.updateRequests()
default:
err = fmt.Errorf("received unknown message type: %#v", msg.Type)
}
unexpectedChunksReceived.Add(1)
}
+ if c.PeerChoked {
+ torrent.Add("chunks received while choked", 1)
+ if c.peerAllowedFast.Get(int(req.Index)) {
+ torrent.Add("chunks received due to allowed fast", 1)
+ }
+ }
+
// Do we actually want this chunk?
if !t.wantPiece(req) {
unwantedChunksReceived.Add(1)