From: Matt Joiner Date: Sun, 24 Jun 2018 10:04:31 +0000 (+1000) Subject: Add requestStrategy 3, which duplicates requests only after a timeout X-Git-Tag: v1.0.0~123 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=195695042d6e067b124f96debb89464721783c75;p=btrtrc.git Add requestStrategy 3, which duplicates requests only after a timeout Possible solution for #253. --- diff --git a/client.go b/client.go index bc90f0cb..6cc9597f 100644 --- a/client.go +++ b/client.go @@ -975,6 +975,8 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) ( metadataChanged: sync.Cond{ L: &cl.mu, }, + lastRequested: make(map[request]time.Time), + duplicateRequestTimeout: 15 * time.Second, } t.logger = cl.logger.Clone().AddValue(t) t.setChunkSize(defaultChunkSize) diff --git a/connection.go b/connection.go index 53bf2116..85aaabdc 100644 --- a/connection.go +++ b/connection.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "math" "math/rand" "net" "strconv" @@ -68,6 +69,10 @@ type connection struct { lastBecameInterested time.Time priorInterest time.Duration + lastStartedExpectingToReceiveChunks time.Time + cumulativeExpectedToReceiveChunks time.Duration + chunksReceivedWhileExpecting int64 + Choked bool requests map[request]struct{} requestsLowWater int @@ -107,6 +112,23 @@ type connection struct { writerCond sync.Cond } +func (cn *connection) updateExpectingChunks() { + if cn.expectingChunks() { + if cn.lastStartedExpectingToReceiveChunks.IsZero() { + cn.lastStartedExpectingToReceiveChunks = time.Now() + } + } else { + if !cn.lastStartedExpectingToReceiveChunks.IsZero() { + cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks) + cn.lastStartedExpectingToReceiveChunks = time.Time{} + } + } +} + +func (cn *connection) expectingChunks() bool { + return cn.Interested && !cn.PeerChoked +} + // Returns true if the connection is over IPv6. func (cn *connection) ipv6() bool { ip := missinggo.AddrIP(cn.remoteAddr()) @@ -256,11 +278,12 @@ func (cn *connection) downloadRate() float64 { func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { // \t isn't preserved in
 blocks?
 	fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
-	fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s\n",
+	fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
 		eventAgeString(cn.lastMessageReceived),
 		eventAgeString(cn.completedHandshake),
 		eventAgeString(cn.lastHelpful()),
 		cn.cumInterest(),
+		cn.totalExpectingTime(),
 	)
 	fmt.Fprintf(w,
 		"    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
@@ -351,9 +374,37 @@ func (cn *connection) requestedMetadataPiece(index int) bool {
 
 // The actual value to use as the maximum outbound requests.
 func (cn *connection) nominalMaxRequests() (ret int) {
+	if cn.t.requestStrategy == 3 {
+		expectingTime := int64(cn.totalExpectingTime())
+		if expectingTime == 0 {
+			expectingTime = math.MaxInt64
+		}
+		return int(clamp(
+			1,
+			int64(cn.PeerMaxRequests),
+			max(
+				// It makes sense to always pipeline at least one connection,
+				// since latency must be non-zero.
+				2,
+				// Request only as many as we expect to receive in the
+				// dupliateRequestTimeout window. We are trying to avoid having to
+				// duplicate requests.
+				cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
+			),
+		))
+	}
 	return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
 }
 
+func (cn *connection) totalExpectingTime() (ret time.Duration) {
+	ret = cn.cumulativeExpectedToReceiveChunks
+	if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
+		ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
+	}
+	return
+
+}
+
 func (cn *connection) onPeerSentCancel(r request) {
 	if _, ok := cn.PeerRequests[r]; !ok {
 		torrent.Add("unexpected cancels received", 1)
@@ -405,6 +456,7 @@ func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool)
 	} else if !cn.lastBecameInterested.IsZero() {
 		cn.priorInterest += time.Since(cn.lastBecameInterested)
 	}
+	cn.updateExpectingChunks()
 	// log.Printf("%p: setting interest: %v", cn, interested)
 	return msg(pp.Message{
 		Type: func() pp.MessageType {
@@ -447,6 +499,8 @@ func (cn *connection) request(r request, mw messageWriter) bool {
 	}
 	cn.requests[r] = struct{}{}
 	cn.t.pendingRequests[r]++
+	cn.t.lastRequested[r] = time.Now()
+	cn.updateExpectingChunks()
 	return mw(pp.Message{
 		Type:   pp.Request,
 		Index:  r.Index,
@@ -689,6 +743,9 @@ func (cn *connection) shouldRequestWithoutBias() bool {
 }
 
 func (cn *connection) pieceRequestOrderIter() iter.Func {
+	if cn.t.requestStrategy == 3 {
+		return cn.unbiasedPieceRequestOrder()
+	}
 	if cn.shouldRequestWithoutBias() {
 		return cn.unbiasedPieceRequestOrder()
 	} else {
@@ -701,10 +758,16 @@ func (cn *connection) iterPendingRequests(f func(request) bool) {
 		piece := _piece.(int)
 		return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
 			r := request{pp.Integer(piece), cs}
-			// log.Println(r, cn.t.pendingRequests[r], cn.requests)
-			// if _, ok := cn.requests[r]; !ok && cn.t.pendingRequests[r] != 0 {
-			// 	return true
-			// }
+			if cn.t.requestStrategy == 3 {
+				lr := cn.t.lastRequested[r]
+				if !lr.IsZero() {
+					if time.Since(lr) < cn.t.duplicateRequestTimeout {
+						return true
+					} else {
+						torrent.Add("requests duplicated due to timeout", 1)
+					}
+				}
+			}
 			return f(r)
 		})
 	})
@@ -1033,11 +1096,13 @@ func (c *connection) mainReadLoop() (err error) {
 			c.deleteAllRequests()
 			// We can then reset our interest.
 			c.updateRequests()
+			c.updateExpectingChunks()
 		case pp.Reject:
 			c.deleteRequest(newRequestFromMessage(&msg))
 		case pp.Unchoke:
 			c.PeerChoked = false
 			c.tickleWriter()
+			c.updateExpectingChunks()
 		case pp.Interested:
 			c.PeerInterested = true
 			c.tickleWriter()
@@ -1215,6 +1280,9 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 
 	// Request has been satisfied.
 	if c.deleteRequest(req) {
+		if c.expectingChunks() {
+			c.chunksReceivedWhileExpecting++
+		}
 		c.updateRequests()
 	} else {
 		torrent.Add("chunks received unexpected", 1)
@@ -1400,6 +1468,8 @@ func (c *connection) deleteRequest(r request) bool {
 		return false
 	}
 	delete(c.requests, r)
+	c.updateExpectingChunks()
+	delete(c.t.lastRequested, r)
 	pr := c.t.pendingRequests
 	pr[r]--
 	n := pr[r]
diff --git a/connection_test.go b/connection_test.go
index 58a92a61..f6533818 100644
--- a/connection_test.go
+++ b/connection_test.go
@@ -9,7 +9,6 @@ import (
 
 	"github.com/anacrolix/missinggo/pubsub"
 	"github.com/bradfitz/iter"
-	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
 	"github.com/anacrolix/torrent/metainfo"
@@ -139,14 +138,3 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
 	require.NoError(b, <-mrlErr)
 	require.EqualValues(b, b.N, cn.stats.ChunksReadUseful.Int64())
 }
-
-func TestConnectionReceiveBadChunkIndex(t *testing.T) {
-	cn := connection{
-		t: &Torrent{},
-	}
-	require.False(t, cn.t.haveInfo())
-	assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
-	cn.t.info = &metainfo.Info{}
-	require.True(t, cn.t.haveInfo())
-	assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
-}
diff --git a/torrent.go b/torrent.go
index ccddd573..36fa7706 100644
--- a/torrent.go
+++ b/torrent.go
@@ -52,11 +52,16 @@ type Torrent struct {
 	logger *log.Logger
 
 	networkingEnabled bool
+
 	// Determines what chunks to request from peers. 1: Favour higher priority
 	// pieces with some fuzzing to reduce overlaps and wastage across
 	// connections. 2: The fastest connection downloads strictly in order of
-	// priority, while all others adher to their piece inclications.
+	// priority, while all others adher to their piece inclications. 3:
+	// Requests are strictly by piece priority, and not duplicated until
+	// duplicateRequestTimeout is reached.
 	requestStrategy int
+	// How long to avoid duplicating a pending request.
+	duplicateRequestTimeout time.Duration
 
 	closed   missinggo.Event
 	infoHash metainfo.Hash
@@ -140,6 +145,9 @@ type Torrent struct {
 
 	// Count of each request across active connections.
 	pendingRequests map[request]int
+	// The last time we requested a chunk. Deleting the request from any
+	// connection will clear this value.
+	lastRequested map[request]time.Time
 }
 
 func (t *Torrent) tickleReaders() {
@@ -399,6 +407,7 @@ func (t *Torrent) onSetInfo() {
 	t.gotMetainfo.Set()
 	t.updateWantPeersEvent()
 	t.pendingRequests = make(map[request]int)
+	t.lastRequested = make(map[request]time.Time)
 }
 
 // Called when metadata for a torrent becomes available.