// Stuff controlled by the local peer.
Interested bool
Choked bool
- Requests map[request]struct{}
+ requests map[request]struct{}
requestsLowWater int
// Indexed by metadata piece, set to true if posted and pending a
// response.
cn.UsefulChunksReceived,
cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
cn.chunksSent,
- len(cn.Requests),
+ cn.numLocalRequests(),
len(cn.PeerRequests),
cn.statusFlags(),
)
}
+// RequestPending reports whether the local peer currently has an
+// outstanding request for chunk r.
func (cn *connection) RequestPending(r request) bool {
- _, ok := cn.Requests[r]
+ _, ok := cn.requests[r]
return ok
}
return
}
-// Returns true if more requests can be sent.
-func (cn *connection) Request(chunk request) bool {
- if len(cn.Requests) >= cn.nominalMaxRequests() {
- return false
- }
- if !cn.PeerHasPiece(int(chunk.Index)) {
- return true
- }
- if cn.RequestPending(chunk) {
- return true
- }
- cn.SetInterested(true)
- if cn.PeerChoked {
- return false
- }
- if cn.Requests == nil {
- cn.Requests = make(map[request]struct{}, cn.PeerMaxRequests)
- }
- cn.Requests[chunk] = struct{}{}
- cn.requestsLowWater = len(cn.Requests) / 2
- cn.Post(pp.Message{
- Type: pp.Request,
- Index: chunk.Index,
- Begin: chunk.Begin,
- Length: chunk.Length,
- })
- return true
-}
-
-// Returns true if an unsatisfied request was canceled.
-func (cn *connection) Cancel(r request) bool {
- if !cn.RequestPending(r) {
- return false
- }
- delete(cn.Requests, r)
- cn.Post(pp.Message{
- Type: pp.Cancel,
- Index: r.Index,
- Begin: r.Begin,
- Length: r.Length,
- })
- return true
-}
-
// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
if cn.PeerRequests == nil {
cn.Choked = false
}
-func (cn *connection) SetInterested(interested bool) {
+// SetInterested updates the local interest state and, when it changes,
+// emits the corresponding protocol message through msg. It returns
+// msg's result, or true when the state was already as requested and no
+// message needed to be sent.
+func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
if cn.Interested == interested {
- return
+ return true
}
- cn.Post(pp.Message{
+ cn.Interested = interested
+ // log.Printf("%p: setting interest: %v", cn, interested)
+ return msg(pp.Message{
Type: func() pp.MessageType {
if interested {
return pp.Interested
}
}(),
})
- cn.Interested = interested
}
var (
connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
)
+// fillWriteBuffer reconciles the connection's actual request state with
+// desiredRequestState, emitting each required protocol message through
+// msg: it first adjusts interest, then cancels outstanding requests not
+// in the desired set, then issues requests for desired chunks not yet
+// requested. It stops early as soon as msg reports the buffer is full
+// (returns false).
+func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
+ rs, i := cn.desiredRequestState()
+ if !cn.SetInterested(i, msg) {
+ return
+ }
+ // Cancel outstanding requests that are no longer desired.
+ for r := range cn.requests {
+ if _, ok := rs[r]; !ok {
+ delete(cn.requests, r)
+ // log.Printf("%p: cancelling request: %v", cn, r)
+ if !msg(pp.Message{
+ Type: pp.Cancel,
+ Index: r.Index,
+ Begin: r.Begin,
+ Length: r.Length,
+ }) {
+ return
+ }
+ }
+ }
+ // Issue requests for desired chunks not already outstanding.
+ for r := range rs {
+ if _, ok := cn.requests[r]; !ok {
+ if cn.requests == nil {
+ cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
+ }
+ cn.requests[r] = struct{}{}
+ // log.Printf("%p: requesting %v", cn, r)
+ if !msg(pp.Message{
+ Type: pp.Request,
+ Index: r.Index,
+ Begin: r.Begin,
+ Length: r.Length,
+ }) {
+ return
+ }
+ }
+ }
+}
+
// Writes buffers to the socket from the write channel.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
var (
for {
buf.Write(cn.postedBuffer.Bytes())
cn.postedBuffer.Reset()
+ if buf.Len() == 0 {
+ cn.fillWriteBuffer(func(msg pp.Message) bool {
+ buf.Write(msg.MustMarshalBinary())
+ return buf.Len() < 1<<16
+ })
+ }
if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
postedKeepalives.Add(1)
}
func (cn *connection) updateRequests() {
- rs, i := nextRequestState(
+ // Wake the writer; it calls fillWriteBuffer, which reconciles the
+ // actual request set against desiredRequestState.
+ cn.writerCond.Broadcast()
+}
+
+// desiredRequestState computes the set of chunk requests this
+// connection should have outstanding, and whether the local peer
+// should be interested, by delegating to nextRequestState.
+func (cn *connection) desiredRequestState() (map[request]struct{}, bool) {
+ return nextRequestState(
cn.t.networkingEnabled,
- cn.Requests,
+ cn.requests,
cn.PeerChoked,
&cn.pieceRequestOrder,
func(piece int, f func(chunkSpec) bool) bool {
return undirtiedChunks(piece, cn.t, f)
},
cn.requestsLowWater,
- cn.nominalMaxRequests())
- for r := range cn.Requests {
- if _, ok := rs[r]; !ok {
- if !cn.Cancel(r) {
- panic("wat")
- }
- }
- }
- for r := range rs {
- if _, ok := cn.Requests[r]; !ok {
- if !cn.Request(r) {
- panic("how")
- }
- }
- }
- cn.SetInterested(i)
+ cn.nominalMaxRequests(),
+ )
}
func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
func (cn *connection) stopRequestingPiece(piece int) {
cn.pieceRequestOrder.Remove(piece)
- cn.updateRequests()
+ cn.writerCond.Broadcast()
}
// This is distinct from Torrent piece priority, which is the user's
switch msg.Type {
case pp.Choke:
c.PeerChoked = true
- c.Requests = nil
+ c.requests = nil
// We can then reset our interest.
c.updateRequests()
case pp.Reject:
c.updateRequests()
case pp.Unchoke:
c.PeerChoked = false
- cl.peerUnchoked(t, c)
+ c.writerCond.Broadcast()
case pp.Interested:
c.PeerInterested = true
c.upload()
// Cancel pending requests for this chunk.
for c := range t.conns {
- if cl.connCancel(t, c, req) {
- c.updateRequests()
- }
+ c.updateRequests()
}
cl.mu.Unlock()
func (c *connection) peerHasWantedPieces() bool {
return !c.pieceRequestOrder.IsEmpty()
}
+
+// numLocalRequests returns the number of requests the local peer
+// currently has outstanding on this connection.
+func (c *connection) numLocalRequests() int {
+ return len(c.requests)
+}
import (
"io"
- "io/ioutil"
- "net"
"sync"
"testing"
"time"
- "github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/pubsub"
"github.com/bradfitz/iter"
"github.com/stretchr/testify/assert"
"github.com/anacrolix/torrent/storage"
)
-func TestCancelRequestOptimized(t *testing.T) {
- r, w := io.Pipe()
- c := &connection{
- PeerMaxRequests: 1,
- peerPieces: func() bitmap.Bitmap {
- var bm bitmap.Bitmap
- bm.Set(1, true)
- return bm
- }(),
- w: w,
- conn: new(net.TCPConn),
- // For the locks
- t: &Torrent{cl: &Client{}},
- }
- assert.Len(t, c.Requests, 0)
- c.Request(newRequest(1, 2, 3))
- require.Len(t, c.Requests, 1)
- // Posting this message should removing the pending Request.
- require.True(t, c.Cancel(newRequest(1, 2, 3)))
- assert.Len(t, c.Requests, 0)
- // Check that write optimization filters out the Request, due to the
- // Cancel. We should have received an Interested, due to the initial
- // request, and then keep-alives until we close the connection.
- go c.writer(0)
- b := make([]byte, 9)
- n, err := io.ReadFull(r, b)
- require.NoError(t, err)
- require.EqualValues(t, len(b), n)
- require.EqualValues(t, "\x00\x00\x00\x01\x02"+"\x00\x00\x00\x00", string(b))
- time.Sleep(time.Millisecond)
- c.mu().Lock()
- c.Close()
- c.mu().Unlock()
- w.Close()
- b, err = ioutil.ReadAll(r)
- require.NoError(t, err)
- // A single keep-alive may have gone through, as writer would be stuck
- // trying to flush it, and then promptly close.
- if s := string(b); s != "\x00\x00\x00\x00" && s != "" {
- t.Logf("expected zero or one keepalives, got %q", s)
- }
-}
-
// Ensure that no race exists between sending a bitfield, and a subsequent
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {