import (
"bufio"
"bytes"
- "container/list"
"errors"
- "expvar"
"fmt"
"io"
"log"
pp "github.com/anacrolix/torrent/peer_protocol"
)
-var optimizedCancels = expvar.NewInt("optimizedCancels")
-
type peerSource string
const (
// Stuff controlled by the local peer.
Interested bool
Choked bool
- Requests map[request]struct{}
+ requests map[request]struct{}
requestsLowWater int
// Indexed by metadata piece, set to true if posted and pending a
// response.
pieceInclination []int
pieceRequestOrder prioritybitmap.PriorityBitmap
- outgoingUnbufferedMessages *list.List
- outgoingUnbufferedMessagesNotEmpty missinggo.Event
+ postedBuffer bytes.Buffer
+ writerCond sync.Cond
}
func (cn *connection) mu() sync.Locker {
cn.UsefulChunksReceived,
cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
cn.chunksSent,
- len(cn.Requests),
+ cn.numLocalRequests(),
len(cn.PeerRequests),
cn.statusFlags(),
)
}
func (cn *connection) Post(msg pp.Message) {
- switch msg.Type {
- case pp.Cancel:
- for e := cn.outgoingUnbufferedMessages.Back(); e != nil; e = e.Prev() {
- elemMsg := e.Value.(pp.Message)
- if elemMsg.Type == pp.Request && elemMsg.Index == msg.Index && elemMsg.Begin == msg.Begin && elemMsg.Length == msg.Length {
- cn.outgoingUnbufferedMessages.Remove(e)
- optimizedCancels.Add(1)
- return
- }
- }
- }
- if cn.outgoingUnbufferedMessages == nil {
- cn.outgoingUnbufferedMessages = list.New()
- }
- cn.outgoingUnbufferedMessages.PushBack(msg)
- cn.outgoingUnbufferedMessagesNotEmpty.Set()
- postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
-}
-
-func (cn *connection) RequestPending(r request) bool {
- _, ok := cn.Requests[r]
- return ok
+ // Per-type posted-message metric (renamed from postedMessageTypes).
+ messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+ // Serialize immediately into postedBuffer. MustMarshalBinary panics on
+ // marshal failure, and bytes.Buffer writes never return an error, so no
+ // error handling is needed here.
+ cn.postedBuffer.Write(msg.MustMarshalBinary())
+ // Wake the writer goroutine so it drains postedBuffer to the socket.
+ cn.tickleWriter()
}
func (cn *connection) requestMetadataPiece(index int) {
return
}
-// Returns true if more requests can be sent.
-func (cn *connection) Request(chunk request) bool {
- if len(cn.Requests) >= cn.nominalMaxRequests() {
- return false
- }
- if !cn.PeerHasPiece(int(chunk.Index)) {
- return true
- }
- if cn.RequestPending(chunk) {
- return true
- }
- cn.SetInterested(true)
- if cn.PeerChoked {
- return false
- }
- if cn.Requests == nil {
- cn.Requests = make(map[request]struct{}, cn.PeerMaxRequests)
- }
- cn.Requests[chunk] = struct{}{}
- cn.requestsLowWater = len(cn.Requests) / 2
- cn.Post(pp.Message{
- Type: pp.Request,
- Index: chunk.Index,
- Begin: chunk.Begin,
- Length: chunk.Length,
- })
- return true
-}
-
-// Returns true if an unsatisfied request was canceled.
-func (cn *connection) Cancel(r request) bool {
- if !cn.RequestPending(r) {
- return false
- }
- delete(cn.Requests, r)
- cn.Post(pp.Message{
- Type: pp.Cancel,
- Index: r.Index,
- Begin: r.Begin,
- Length: r.Length,
- })
- return true
-}
-
// Returns true if an unsatisfied request was canceled.
func (cn *connection) PeerCancel(r request) bool {
if cn.PeerRequests == nil {
cn.Choked = false
}
-func (cn *connection) SetInterested(interested bool) {
+// SetInterested records the new interest state and, when it changes,
+// writes an Interested/NotInterested message through the msg callback.
+// Returns true when no message needed sending, otherwise msg's result
+// (which, as implemented by the writer's callback, reports whether more
+// messages may still be buffered — the message itself is still queued).
+func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool) bool {
 if cn.Interested == interested {
- return
+ return true
 }
- cn.Post(pp.Message{
+ // Record state before emitting the message; the message is buffered by
+ // msg even when it returns false.
+ cn.Interested = interested
+ // log.Printf("%p: setting interest: %v", cn, interested)
+ return msg(pp.Message{
 Type: func() pp.MessageType {
 if interested {
 return pp.Interested
 }
 }(),
 })
- cn.Interested = interested
}
-var (
- // Track connection writer buffer writes and flushes, to determine its
- // efficiency.
- connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
- connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
-)
+
+// fillWriteBuffer emits protocol messages reflecting the desired request
+// state: interest first, then either cancels for every outstanding
+// request, or new requests. The msg callback reports whether more
+// messages may be buffered; when it returns false we stop filling.
+func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
+ numFillBuffers.Add(1)
+ // NOTE(review): "new" shadows the builtin identifier; consider renaming.
+ cancel, new, i := cn.desiredRequestState()
+ if !cn.SetInterested(i, msg) {
+ return
+ }
+ if cancel && len(cn.requests) != 0 {
+ fillBufferSentCancels.Add(1)
+ // Cancel everything outstanding; requests are deleted locally even if
+ // the buffer fills before every Cancel message is emitted.
+ for r := range cn.requests {
+ cn.deleteRequest(r)
+ // log.Printf("%p: cancelling request: %v", cn, r)
+ if !msg(pp.Message{
+ Type: pp.Cancel,
+ Index: r.Index,
+ Begin: r.Begin,
+ Length: r.Length,
+ }) {
+ return
+ }
+ }
+ }
+ if len(new) != 0 {
+ fillBufferSentRequests.Add(1)
+ for _, r := range new {
+ if cn.requests == nil {
+ cn.requests = make(map[request]struct{}, cn.nominalMaxRequests())
+ }
+ cn.requests[r] = struct{}{}
+ // log.Printf("%p: requesting %v", cn, r)
+ if !msg(pp.Message{
+ Type: pp.Request,
+ Index: r.Index,
+ Begin: r.Begin,
+ Length: r.Length,
+ }) {
+ return
+ }
+ }
+ // If we didn't completely top up the requests, we shouldn't mark the
+ // low water, since we'll want to top up the requests as soon as we
+ // have more write buffer space.
+ cn.requestsLowWater = len(cn.requests) / 2
+ }
+}
// Writes buffers to the socket from the write channel.
+// NOTE(review): header comment above is stale — there is no write channel
+// anymore; the writer drains postedBuffer and fillWriteBuffer output.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
- defer func() {
+ // The client lock is held for the whole loop except across the blocking
+ // socket write below.
+ var (
+ buf bytes.Buffer
+ lastWrite time.Time = time.Now()
+ )
+ // Self-rescheduling timer that wakes the writer whenever a keepalive
+ // may be due.
+ var keepAliveTimer *time.Timer
+ keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
 cn.mu().Lock()
 defer cn.mu().Unlock()
- cn.Close()
- }()
- // Reduce write syscalls.
- buf := bufio.NewWriter(cn.w)
- keepAliveTimer := time.NewTimer(keepAliveTimeout)
+ if time.Since(lastWrite) >= keepAliveTimeout {
+ cn.tickleWriter()
+ }
+ keepAliveTimer.Reset(keepAliveTimeout)
+ })
+ cn.mu().Lock()
+ defer cn.mu().Unlock()
+ defer cn.Close()
+ defer keepAliveTimer.Stop()
 for {
- cn.mu().Lock()
- for cn.outgoingUnbufferedMessages != nil && cn.outgoingUnbufferedMessages.Len() != 0 {
- msg := cn.outgoingUnbufferedMessages.Remove(cn.outgoingUnbufferedMessages.Front()).(pp.Message)
- cn.mu().Unlock()
- b, err := msg.MarshalBinary()
- if err != nil {
- panic(err)
- }
- connectionWriterWrite.Add(1)
- n, err := buf.Write(b)
- if err != nil {
- return
- }
- keepAliveTimer.Reset(keepAliveTimeout)
- if n != len(b) {
- panic("short write")
- }
- cn.mu().Lock()
- cn.wroteMsg(&msg)
+ // Drain any directly Post()ed bytes first.
+ buf.Write(cn.postedBuffer.Bytes())
+ cn.postedBuffer.Reset()
+ if buf.Len() == 0 {
+ // Generate interest/request/cancel messages, capping the buffer
+ // at roughly 64 KiB per round.
+ cn.fillWriteBuffer(func(msg pp.Message) bool {
+ cn.wroteMsg(&msg)
+ buf.Write(msg.MustMarshalBinary())
+ return buf.Len() < 1<<16
+ })
+ }
+ // Still nothing to say and we've been idle long enough: keepalive.
+ if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
+ buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
+ postedKeepalives.Add(1)
+ }
+ if buf.Len() == 0 {
+ // Sleep until tickleWriter broadcasts on writerCond.
+ cn.writerCond.Wait()
+ continue
 }
- cn.outgoingUnbufferedMessagesNotEmpty.Clear()
 cn.mu().Unlock()
- connectionWriterFlush.Add(1)
- if buf.Buffered() != 0 {
- if buf.Flush() != nil {
- return
- }
+ // log.Printf("writing %d bytes", buf.Len())
+ // Socket write happens without the lock so posters aren't blocked.
+ n, err := cn.w.Write(buf.Bytes())
+ cn.mu().Lock()
+ if n != 0 {
+ lastWrite = time.Now()
 keepAliveTimer.Reset(keepAliveTimeout)
 }
- select {
- case <-cn.closed.LockedChan(cn.mu()):
+ if err != nil {
 return
- case <-cn.outgoingUnbufferedMessagesNotEmpty.LockedChan(cn.mu()):
- case <-keepAliveTimer.C:
- cn.mu().Lock()
- cn.Post(pp.Message{Keepalive: true})
- cn.mu().Unlock()
- postedKeepalives.Add(1)
 }
+ if n != buf.Len() {
+ panic("short write")
+ }
+ buf.Reset()
 }
}
requestsLowWater int,
requestsHighWater int,
) (
- requests map[request]struct{},
+ cancelExisting bool,
+ newRequests []request,
interested bool,
) {
if !networkingEnabled || nextPieces.IsEmpty() {
- return nil, false
+ return true, nil, false
}
if peerChoking || len(currentRequests) > requestsLowWater {
- return currentRequests, true
- }
- requests = make(map[request]struct{}, requestsHighWater)
- for r := range currentRequests {
- requests[r] = struct{}{}
+ return false, nil, !nextPieces.IsEmpty()
}
nextPieces.IterTyped(func(piece int) bool {
return pendingChunks(piece, func(cs chunkSpec) bool {
- if len(requests) >= requestsHighWater {
- return false
- }
r := request{pp.Integer(piece), cs}
- requests[r] = struct{}{}
- return true
+ if _, ok := currentRequests[r]; !ok {
+ if newRequests == nil {
+ newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
+ }
+ newRequests = append(newRequests, r)
+ }
+ return len(currentRequests)+len(newRequests) < requestsHighWater
})
})
- return requests, true
+ return false, newRequests, true
}
func (cn *connection) updateRequests() {
- rs, i := nextRequestState(
+ // Request reconciliation now happens lazily in the writer (via
+ // fillWriteBuffer/desiredRequestState); just wake it.
+ cn.tickleWriter()
+}
+
+func (cn *connection) desiredRequestState() (bool, []request, bool) {
+ // Results are (cancelExisting, newRequests, interested), computed from
+ // the connection's current request state by nextRequestState.
+ return nextRequestState(
 cn.t.networkingEnabled,
- cn.Requests,
+ cn.requests,
 cn.PeerChoked,
 &cn.pieceRequestOrder,
 func(piece int, f func(chunkSpec) bool) bool {
 return undirtiedChunks(piece, cn.t, f)
 },
 cn.requestsLowWater,
- cn.nominalMaxRequests())
- for r := range cn.Requests {
- if _, ok := rs[r]; !ok {
- if !cn.Cancel(r) {
- panic("wat")
- }
- }
- }
- for r := range rs {
- if _, ok := cn.Requests[r]; !ok {
- if !cn.Request(r) {
- panic("how")
- }
- }
- }
- cn.SetInterested(i)
+ cn.nominalMaxRequests(),
+ )
}
func undirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {
})
}
-func (cn *connection) stopRequestingPiece(piece int) {
- cn.pieceRequestOrder.Remove(piece)
- cn.updateRequests()
+// stopRequestingPiece drops the piece from the request order, reporting
+// whether anything changed. It no longer calls updateRequests itself;
+// callers must do that when this returns true.
+func (cn *connection) stopRequestingPiece(piece int) bool {
+ return cn.pieceRequestOrder.Remove(piece)
}
// This is distinct from Torrent piece priority, which is the user's
// preference. Connection piece priority is specific to a connection,
// pseudorandomly avoids connections always requesting the same pieces and
// thus wasting effort.
-func (cn *connection) updatePiecePriority(piece int) {
+// updatePiecePriority now reports whether the connection's request order
+// changed; callers trigger updateRequests themselves when it did.
+func (cn *connection) updatePiecePriority(piece int) bool {
 tpp := cn.t.piecePriority(piece)
 if !cn.PeerHasPiece(piece) {
 tpp = PiecePriorityNone
 }
 if tpp == PiecePriorityNone {
+ // Not wanted from this peer: remove from the request order and
+ // propagate whether that changed anything.
- cn.stopRequestingPiece(piece)
- return
+ return cn.stopRequestingPiece(piece)
 }
 prio := cn.getPieceInclination()[piece]
 switch tpp {
 panic(tpp)
 }
 prio += piece / 3
+ // Set's boolean presumably reports a priority change — confirm against
+ // prioritybitmap.PriorityBitmap.Set before relying on it.
- cn.pieceRequestOrder.Set(piece, prio)
- cn.updateRequests()
+ return cn.pieceRequestOrder.Set(piece, prio)
}
func (cn *connection) getPieceInclination() []int {
cn.pieceInclination = nil
}
-func (cn *connection) peerHasPieceChanged(piece int) {
- cn.updatePiecePriority(piece)
-}
-
func (cn *connection) peerPiecesChanged() {
 if cn.t.haveInfo() {
+ // Batch all priority updates so the writer is only woken once.
+ prioritiesChanged := false
 for i := range iter.N(cn.t.numPieces()) {
- cn.peerHasPieceChanged(i)
+ if cn.updatePiecePriority(i) {
+ prioritiesChanged = true
+ }
+ }
+ if prioritiesChanged {
+ cn.updateRequests()
+ }
 }
}
}
cn.raisePeerMinPieces(piece + 1)
cn.peerPieces.Set(piece, true)
- cn.peerHasPieceChanged(piece)
+ if cn.updatePiecePriority(piece) {
+ cn.updateRequests()
+ }
return nil
}
}
func (cn *connection) wroteMsg(msg *pp.Message) {
+ // Per-type sent-message metric, keyed by the numeric message type.
+ messageTypesSent.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
 cn.stats.wroteMsg(msg)
 cn.t.stats.wroteMsg(msg)
}
Pool: t.chunkPool,
}
for {
- cl.mu.Unlock()
- var msg pp.Message
- err := decoder.Decode(&msg)
- cl.mu.Lock()
+ var (
+ msg pp.Message
+ err error
+ )
+ func() {
+ cl.mu.Unlock()
+ defer cl.mu.Lock()
+ err = decoder.Decode(&msg)
+ }()
if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
return nil
}
receivedKeepalives.Add(1)
continue
}
- receivedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
+ messageTypesReceived.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
switch msg.Type {
case pp.Choke:
c.PeerChoked = true
- c.Requests = nil
+ c.requests = nil
// We can then reset our interest.
c.updateRequests()
case pp.Reject:
- cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
- c.updateRequests()
+ if c.deleteRequest(newRequest(msg.Index, msg.Begin, msg.Length)) {
+ c.updateRequests()
+ }
case pp.Unchoke:
c.PeerChoked = false
- cl.peerUnchoked(t, c)
+ c.tickleWriter()
case pp.Interested:
c.PeerInterested = true
c.upload()
req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
// Request has been satisfied.
- if cl.connDeleteRequest(t, c, req) {
- defer c.updateRequests()
+ if c.deleteRequest(req) {
+ c.updateRequests()
} else {
unexpectedChunksReceived.Add(1)
}
// Cancel pending requests for this chunk.
for c := range t.conns {
- if cl.connCancel(t, c, req) {
- c.updateRequests()
- }
+ c.updateRequests()
}
cl.mu.Unlock()
func (c *connection) peerHasWantedPieces() bool {
return !c.pieceRequestOrder.IsEmpty()
}
+
+// numLocalRequests returns how many requests we currently have
+// outstanding to this peer.
+func (c *connection) numLocalRequests() int {
+ return len(c.requests)
+}
+
+// deleteRequest removes r from the set of outstanding local requests,
+// reporting whether it was present.
+func (c *connection) deleteRequest(r request) bool {
+ if _, ok := c.requests[r]; !ok {
+ return false
+ }
+ delete(c.requests, r)
+ return true
+}
+func (c *connection) tickleWriter() {
+ // Wake the writer goroutine, which waits on writerCond when it has
+ // nothing to write.
+ c.writerCond.Broadcast()
+}
package torrent
import (
- "container/list"
"io"
- "io/ioutil"
- "net"
"sync"
"testing"
"time"
- "github.com/anacrolix/missinggo/bitmap"
"github.com/anacrolix/missinggo/pubsub"
"github.com/bradfitz/iter"
"github.com/stretchr/testify/assert"
"github.com/anacrolix/torrent/storage"
)
-func TestCancelRequestOptimized(t *testing.T) {
- r, w := io.Pipe()
- c := &connection{
- PeerMaxRequests: 1,
- peerPieces: func() bitmap.Bitmap {
- var bm bitmap.Bitmap
- bm.Set(1, true)
- return bm
- }(),
- w: w,
- conn: new(net.TCPConn),
- // For the locks
- t: &Torrent{cl: &Client{}},
- }
- assert.Len(t, c.Requests, 0)
- c.Request(newRequest(1, 2, 3))
- require.Len(t, c.Requests, 1)
- // Posting this message should removing the pending Request.
- require.True(t, c.Cancel(newRequest(1, 2, 3)))
- assert.Len(t, c.Requests, 0)
- // Check that write optimization filters out the Request, due to the
- // Cancel. We should have received an Interested, due to the initial
- // request, and then keep-alives until we close the connection.
- go c.writer(0)
- b := make([]byte, 9)
- n, err := io.ReadFull(r, b)
- require.NoError(t, err)
- require.EqualValues(t, len(b), n)
- require.EqualValues(t, "\x00\x00\x00\x01\x02"+"\x00\x00\x00\x00", string(b))
- time.Sleep(time.Millisecond)
- c.mu().Lock()
- c.Close()
- c.mu().Unlock()
- w.Close()
- b, err = ioutil.ReadAll(r)
- require.NoError(t, err)
- // A single keep-alive may have gone through, as writer would be stuck
- // trying to flush it, and then promptly close.
- if s := string(b); s != "\x00\x00\x00\x00" && s != "" {
- t.Logf("expected zero or one keepalives, got %q", s)
- }
-}
-
// Ensure that no race exists between sending a bitfield, and a subsequent
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {
},
r: r,
w: w,
- outgoingUnbufferedMessages: list.New(),
}
+ c.writerCond.L = &c.t.cl.mu
go c.writer(time.Minute)
c.mu().Lock()
c.Bitfield([]bool{false, true, false})