]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add requestStrategy 3, which duplicates requests only after a timeout
authorMatt Joiner <anacrolix@gmail.com>
Sun, 24 Jun 2018 10:04:31 +0000 (20:04 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 24 Jun 2018 10:04:31 +0000 (20:04 +1000)
Possible solution for #253.

client.go
connection.go
connection_test.go
torrent.go

index bc90f0cb7f33b39800f5b2d2359741309fc40e6b..6cc9597f8042000d7616b64dab18847dadac7eae 100644 (file)
--- a/client.go
+++ b/client.go
@@ -975,6 +975,8 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
                metadataChanged: sync.Cond{
                        L: &cl.mu,
                },
+               lastRequested:           make(map[request]time.Time),
+               duplicateRequestTimeout: 15 * time.Second,
        }
        t.logger = cl.logger.Clone().AddValue(t)
        t.setChunkSize(defaultChunkSize)
index 53bf211681b6b74abe463fdf2cd7db568c086562..85aaabdcf019fce9698ab7096ef2bf23d87a1dcb 100644 (file)
@@ -6,6 +6,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "math"
        "math/rand"
        "net"
        "strconv"
@@ -68,6 +69,10 @@ type connection struct {
        lastBecameInterested time.Time
        priorInterest        time.Duration
 
+       lastStartedExpectingToReceiveChunks time.Time
+       cumulativeExpectedToReceiveChunks   time.Duration
+       chunksReceivedWhileExpecting        int64
+
        Choked           bool
        requests         map[request]struct{}
        requestsLowWater int
@@ -107,6 +112,23 @@ type connection struct {
        writerCond  sync.Cond
 }
 
+func (cn *connection) updateExpectingChunks() {
+       if cn.expectingChunks() {
+               if cn.lastStartedExpectingToReceiveChunks.IsZero() {
+                       cn.lastStartedExpectingToReceiveChunks = time.Now()
+               }
+       } else {
+               if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
+                       cn.cumulativeExpectedToReceiveChunks += time.Since(cn.lastStartedExpectingToReceiveChunks)
+                       cn.lastStartedExpectingToReceiveChunks = time.Time{}
+               }
+       }
+}
+
+func (cn *connection) expectingChunks() bool {
+       return cn.Interested && !cn.PeerChoked
+}
+
 // Returns true if the connection is over IPv6.
 func (cn *connection) ipv6() bool {
        ip := missinggo.AddrIP(cn.remoteAddr())
@@ -256,11 +278,12 @@ func (cn *connection) downloadRate() float64 {
 func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
        // \t isn't preserved in <pre> blocks?
        fmt.Fprintf(w, "%+-55q %s %s-%s\n", cn.PeerID, cn.PeerExtensionBytes, cn.localAddr(), cn.remoteAddr())
-       fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s\n",
+       fmt.Fprintf(w, "    last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
                eventAgeString(cn.lastMessageReceived),
                eventAgeString(cn.completedHandshake),
                eventAgeString(cn.lastHelpful()),
                cn.cumInterest(),
+               cn.totalExpectingTime(),
        )
        fmt.Fprintf(w,
                "    %s completed, %d pieces touched, good chunks: %v/%v-%v reqq: (%d,%d,%d]-%d, flags: %s, dr: %.1f KiB/s\n",
@@ -351,9 +374,37 @@ func (cn *connection) requestedMetadataPiece(index int) bool {
 
 // The actual value to use as the maximum outbound requests.
 func (cn *connection) nominalMaxRequests() (ret int) {
+       if cn.t.requestStrategy == 3 {
+               expectingTime := int64(cn.totalExpectingTime())
+               if expectingTime == 0 {
+                       expectingTime = math.MaxInt64
+               }
+               return int(clamp(
+                       1,
+                       int64(cn.PeerMaxRequests),
+                       max(
+                               // It makes sense to always pipeline at least one connection,
+                               // since latency must be non-zero.
+                               2,
+                               // Request only as many as we expect to receive in the
+                               // dupliateRequestTimeout window. We are trying to avoid having to
+                               // duplicate requests.
+                               cn.chunksReceivedWhileExpecting*int64(cn.t.duplicateRequestTimeout)/expectingTime,
+                       ),
+               ))
+       }
        return int(clamp(1, int64(cn.PeerMaxRequests), max(64, cn.stats.ChunksReadUseful.Int64()-(cn.stats.ChunksRead.Int64()-cn.stats.ChunksReadUseful.Int64()))))
 }
 
+func (cn *connection) totalExpectingTime() (ret time.Duration) {
+       ret = cn.cumulativeExpectedToReceiveChunks
+       if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
+               ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
+       }
+       return
+
+}
+
 func (cn *connection) onPeerSentCancel(r request) {
        if _, ok := cn.PeerRequests[r]; !ok {
                torrent.Add("unexpected cancels received", 1)
@@ -405,6 +456,7 @@ func (cn *connection) SetInterested(interested bool, msg func(pp.Message) bool)
        } else if !cn.lastBecameInterested.IsZero() {
                cn.priorInterest += time.Since(cn.lastBecameInterested)
        }
+       cn.updateExpectingChunks()
        // log.Printf("%p: setting interest: %v", cn, interested)
        return msg(pp.Message{
                Type: func() pp.MessageType {
@@ -447,6 +499,8 @@ func (cn *connection) request(r request, mw messageWriter) bool {
        }
        cn.requests[r] = struct{}{}
        cn.t.pendingRequests[r]++
+       cn.t.lastRequested[r] = time.Now()
+       cn.updateExpectingChunks()
        return mw(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
@@ -689,6 +743,9 @@ func (cn *connection) shouldRequestWithoutBias() bool {
 }
 
 func (cn *connection) pieceRequestOrderIter() iter.Func {
+       if cn.t.requestStrategy == 3 {
+               return cn.unbiasedPieceRequestOrder()
+       }
        if cn.shouldRequestWithoutBias() {
                return cn.unbiasedPieceRequestOrder()
        } else {
@@ -701,10 +758,16 @@ func (cn *connection) iterPendingRequests(f func(request) bool) {
                piece := _piece.(int)
                return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
                        r := request{pp.Integer(piece), cs}
-                       // log.Println(r, cn.t.pendingRequests[r], cn.requests)
-                       // if _, ok := cn.requests[r]; !ok && cn.t.pendingRequests[r] != 0 {
-                       //      return true
-                       // }
+                       if cn.t.requestStrategy == 3 {
+                               lr := cn.t.lastRequested[r]
+                               if !lr.IsZero() {
+                                       if time.Since(lr) < cn.t.duplicateRequestTimeout {
+                                               return true
+                                       } else {
+                                               torrent.Add("requests duplicated due to timeout", 1)
+                                       }
+                               }
+                       }
                        return f(r)
                })
        })
@@ -1033,11 +1096,13 @@ func (c *connection) mainReadLoop() (err error) {
                        c.deleteAllRequests()
                        // We can then reset our interest.
                        c.updateRequests()
+                       c.updateExpectingChunks()
                case pp.Reject:
                        c.deleteRequest(newRequestFromMessage(&msg))
                case pp.Unchoke:
                        c.PeerChoked = false
                        c.tickleWriter()
+                       c.updateExpectingChunks()
                case pp.Interested:
                        c.PeerInterested = true
                        c.tickleWriter()
@@ -1215,6 +1280,9 @@ func (c *connection) receiveChunk(msg *pp.Message) {
 
        // Request has been satisfied.
        if c.deleteRequest(req) {
+               if c.expectingChunks() {
+                       c.chunksReceivedWhileExpecting++
+               }
                c.updateRequests()
        } else {
                torrent.Add("chunks received unexpected", 1)
@@ -1400,6 +1468,8 @@ func (c *connection) deleteRequest(r request) bool {
                return false
        }
        delete(c.requests, r)
+       c.updateExpectingChunks()
+       delete(c.t.lastRequested, r)
        pr := c.t.pendingRequests
        pr[r]--
        n := pr[r]
index 58a92a61a4da9c2c20388dbf9e328f6b9c4e03e6..f65338189540abbb64251797e813ede1fcbf75db 100644 (file)
@@ -9,7 +9,6 @@ import (
 
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/bradfitz/iter"
-       "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
        "github.com/anacrolix/torrent/metainfo"
@@ -139,14 +138,3 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
        require.NoError(b, <-mrlErr)
        require.EqualValues(b, b.N, cn.stats.ChunksReadUseful.Int64())
 }
-
-func TestConnectionReceiveBadChunkIndex(t *testing.T) {
-       cn := connection{
-               t: &Torrent{},
-       }
-       require.False(t, cn.t.haveInfo())
-       assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
-       cn.t.info = &metainfo.Info{}
-       require.True(t, cn.t.haveInfo())
-       assert.NotPanics(t, func() { cn.receiveChunk(&pp.Message{Type: pp.Piece}) })
-}
index ccddd57368f9500105712dffc2f07297095f6bd9..36fa77065f4a0eb36d782c24b0e2772dd3c868c9 100644 (file)
@@ -52,11 +52,16 @@ type Torrent struct {
        logger *log.Logger
 
        networkingEnabled bool
+
        // Determines what chunks to request from peers. 1: Favour higher priority
        // pieces with some fuzzing to reduce overlaps and wastage across
        // connections. 2: The fastest connection downloads strictly in order of
-       // priority, while all others adher to their piece inclications.
+       // priority, while all others adher to their piece inclications. 3:
+       // Requests are strictly by piece priority, and not duplicated until
+       // duplicateRequestTimeout is reached.
        requestStrategy int
+       // How long to avoid duplicating a pending request.
+       duplicateRequestTimeout time.Duration
 
        closed   missinggo.Event
        infoHash metainfo.Hash
@@ -140,6 +145,9 @@ type Torrent struct {
 
        // Count of each request across active connections.
        pendingRequests map[request]int
+       // The last time we requested a chunk. Deleting the request from any
+       // connection will clear this value.
+       lastRequested map[request]time.Time
 }
 
 func (t *Torrent) tickleReaders() {
@@ -399,6 +407,7 @@ func (t *Torrent) onSetInfo() {
        t.gotMetainfo.Set()
        t.updateWantPeersEvent()
        t.pendingRequests = make(map[request]int)
+       t.lastRequested = make(map[request]time.Time)
 }
 
 // Called when metadata for a torrent becomes available.