]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Extract the request timeout stuff into requestStrategyThree
authorMatt Joiner <anacrolix@gmail.com>
Fri, 10 Jan 2020 05:18:55 +0000 (16:18 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 13 Jan 2020 23:51:09 +0000 (10:51 +1100)
client.go
config.go
connection.go
go.mod
go.sum
piece.go
request_strategy.go
torrent.go

index 0dab418c212df07a609ea537869914f2a87fdaa6..fddddf4814964c6257a4111329d97e4c9960f108 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1039,11 +1039,10 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
                maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
 
                networkingEnabled: true,
-               requestStrategy:   requestStrategyOne{},
+               requestStrategy:   cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks()),
                metadataChanged: sync.Cond{
                        L: cl.locker(),
                },
-               _duplicateRequestTimeout: 1 * time.Second,
        }
        t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
                return fmt.Sprintf("%v: %s", t, m.Text())
index 0c23ec46e3861c0ce0e8295a1867c1198988e33b..3fd7461ad30e7d4f7d9cc178145d263df6d00452 100644 (file)
--- a/config.go
+++ b/config.go
@@ -137,6 +137,8 @@ type ClientConfig struct {
 
        // OnQuery hook func
        DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
+
+       DefaultRequestStrategy requestStrategyMaker
 }
 
 func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig {
@@ -174,6 +176,8 @@ func NewDefaultClientConfig() *ClientConfig {
                CryptoProvides: mse.AllSupportedCrypto,
                ListenPort:     42069,
                Logger:         log.Default,
+
+               DefaultRequestStrategy: requestStrategyThreeMaker(5 * time.Second),
        }
        //cc.ConnTracker.SetNoMaxEntries()
        //cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 }
index 24c15b924958eae77d87cd93c1158a122faa6302..0c25e3432a7a52d45b94532428908f14509b11bb 100644 (file)
@@ -388,7 +388,7 @@ func (cn *connection) nominalMaxRequests() (ret int) {
 }
 
 // The actual value to use as the maximum outbound requests.
-func (requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
+func (rs requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
        expectingTime := int64(cn.totalExpectingTime())
        if expectingTime == 0 {
                expectingTime = math.MaxInt64
@@ -404,7 +404,7 @@ func (requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (re
                        2,
                        // Request only as many as we expect to receive in the duplicateRequestTimeout
                        // window. We are trying to avoid having to duplicate requests.
-                       cn.chunksReceivedWhileExpecting()*int64(cn.torrent().duplicateRequestTimeout())/expectingTime,
+                       cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
                ),
        ))
 }
@@ -533,17 +533,7 @@ func (cn *connection) request(r request, mw messageWriter) bool {
        }
        cn.validReceiveChunks[r] = struct{}{}
        cn.t.pendingRequests[r]++
-       cn.t.lastRequested[r] = time.AfterFunc(cn.t._duplicateRequestTimeout, func() {
-               torrent.Add("duplicate request timeouts", 1)
-               cn.mu().Lock()
-               defer cn.mu().Unlock()
-               delete(cn.t.lastRequested, r)
-               for cn := range cn.t.conns {
-                       if cn.PeerHasPiece(pieceIndex(r.Index)) {
-                               cn.updateRequests()
-                       }
-               }
-       })
+       cn.t.requestStrategy.hooks().sentRequest(r)
        cn.updateExpectingChunks()
        return mw(pp.Message{
                Type:   pp.Request,
@@ -553,6 +543,13 @@ func (cn *connection) request(r request, mw messageWriter) bool {
        })
 }
 
+func (rs requestStrategyThree) onSentRequest(r request) {
+       rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
+               delete(rs.lastRequested, r)
+               rs.callbacks.requestTimedOut(r)
+       })
+}
+
 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
        if !cn.t.networkingEnabled {
                if !cn.SetInterested(false, msg) {
@@ -805,22 +802,24 @@ func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
 }
 
 func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
-       return cn.t.requestStrategy.iterUndirtiedChunks(cn.t.piece(piece).requestStrategyPiece(),
+       return cn.t.requestStrategy.iterUndirtiedChunks(
+               cn.t.piece(piece).requestStrategyPiece(),
                func(cs chunkSpec) bool {
                        return f(request{pp.Integer(piece), cs})
-               })
+               },
+       )
 }
 
-func (requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+func (rs requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
        for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
                if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
                        continue
                }
-               ci := p.chunkIndexSpec(i)
-               if p.wouldDuplicateRecent(ci) {
+               r := p.chunkIndexRequest(i)
+               if rs.wouldDuplicateRecent(r) {
                        continue
                }
-               if !f(p.chunkIndexSpec(i)) {
+               if !f(r.chunkSpec) {
                        return false
                }
        }
@@ -835,7 +834,7 @@ func defaultIterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool)
                if err != nil {
                        panic(err)
                }
-               return f(p.chunkIndexSpec(pp.Integer(ci)))
+               return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
        })
 }
 
@@ -1524,10 +1523,7 @@ func (c *connection) deleteRequest(r request) bool {
        }
        delete(c.requests, r)
        c.updateExpectingChunks()
-       if t, ok := c.t.lastRequested[r]; ok {
-               t.Stop()
-               delete(c.t.lastRequested, r)
-       }
+       c.t.requestStrategy.hooks().deletedRequest(r)
        pr := c.t.pendingRequests
        pr[r]--
        n := pr[r]
diff --git a/go.mod b/go.mod
index 3c480385b3edbbc10a4e8bdf3b9c59d86fbbb957..5f88dc104916d93e9021dce166851dd99f4e4547 100644 (file)
--- a/go.mod
+++ b/go.mod
@@ -10,7 +10,7 @@ require (
        github.com/anacrolix/log v0.4.0
        github.com/anacrolix/missinggo v1.2.1
        github.com/anacrolix/missinggo/perf v1.0.0
-       github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a
+       github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984
        github.com/anacrolix/multiless v0.0.0-20191223025854-070b7994e841
        github.com/anacrolix/sync v0.2.0
        github.com/anacrolix/tagflag v1.0.1
diff --git a/go.sum b/go.sum
index 5786142ae5cf8de40118b97184eeb4f8f8e9e18f..29f47b5b6d5d451d4ee393f4b76c6decc4c5c3b8 100644 (file)
--- a/go.sum
+++ b/go.sum
@@ -65,6 +65,8 @@ github.com/anacrolix/missinggo/v2 v2.2.1-0.20191103010835-12360f38ced0/go.mod h1
 github.com/anacrolix/missinggo/v2 v2.3.0/go.mod h1:ZzG3/cc3t+5zcYWAgYrJW0MBsSwNwOkTlNquBbP51Bc=
 github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a h1:lYHFRvNiiBBGyreXVkcIztKyru+xAQJzg8AKaj/85TQ=
 github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a/go.mod h1:sjPqWXxdr3jWcMO/tXhhshXAaiTkGIgJpN93clGzGr8=
+github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984 h1:/pEakYOx8jjk2HLIYEA/rTVXoU0Q/JA7Ojv/6X9iIkI=
+github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984/go.mod h1:sjPqWXxdr3jWcMO/tXhhshXAaiTkGIgJpN93clGzGr8=
 github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb h1:2Or5ccMoY4Kfao+WdL2w6tpY6ZEe+2VTVbIPd7A/Ajk=
 github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb/go.mod h1:x2/ErsYUmT77kezS63+wzZp8E3byYB0gzirM/WMBLfw=
 github.com/anacrolix/mmsg v1.0.0 h1:btC7YLjOn29aTUAExJiVUhQOuf/8rhm+/nWCMAnL3Hg=
index af4385c3ec27a0895f68e144bae0280f70438e1c..d2f72fd8cda27de8a94fb82bffe71019e3d15f2b 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -145,6 +145,13 @@ func (p *Piece) chunkIndexSpec(chunk pp.Integer) chunkSpec {
        return chunkIndexSpec(chunk, p.length(), p.chunkSize())
 }
 
+func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) request {
+       return request{
+               pp.Integer(p.index),
+               chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()),
+       }
+}
+
 func (p *Piece) numDirtyBytes() (ret pp.Integer) {
        // defer func() {
        //      if ret > p.length() {
@@ -256,9 +263,9 @@ func (p *Piece) dirtyChunks() bitmap.Bitmap {
        return p._dirtyChunks
 }
 
-func (p *Piece) wouldDuplicateRecent(cs chunkSpec) bool {
+func (rs requestStrategyThree) wouldDuplicateRecent(r request) bool {
        // This piece has been requested on another connection, and the duplicate request timer is still
        // running.
-       _, ok := p.t.lastRequested[request{pp.Integer(p.index), cs}]
+       _, ok := rs.lastRequested[r]
        return ok
 }
index a09932a3b8491a9a1088e39df704bfa159858025..e2e2c1289b628267a0a974721af9f1dfd0c017aa 100644 (file)
@@ -12,8 +12,7 @@ import (
 type requestStrategyPiece interface {
        numChunks() pp.Integer
        dirtyChunks() bitmap.Bitmap
-       chunkIndexSpec(i pp.Integer) chunkSpec
-       wouldDuplicateRecent(chunkSpec) bool
+       chunkIndexRequest(i pp.Integer) request
 }
 
 type requestStrategyTorrent interface {
@@ -23,7 +22,6 @@ type requestStrategyTorrent interface {
        readerPiecePriorities() (now, readahead bitmap.Bitmap)
        ignorePieces() bitmap.Bitmap
        pendingPieces() *prioritybitmap.PriorityBitmap
-       duplicateRequestTimeout() time.Duration
 }
 
 type requestStrategyConnection interface {
@@ -43,6 +41,16 @@ type requestStrategy interface {
        nominalMaxRequests(requestStrategyConnection) int
        shouldRequestWithoutBias(requestStrategyConnection) bool
        piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
+       hooks() requestStrategyHooks
+}
+
+type requestStrategyHooks struct {
+       sentRequest    func(request)
+       deletedRequest func(request)
+}
+
+type requestStrategyCallbacks interface {
+       requestTimedOut(request)
 }
 
 // Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
@@ -69,4 +77,43 @@ func (requestStrategyTwo) ShouldRequestWithoutBias(cn requestStrategyConnection)
 // Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
 // reached.
 type requestStrategyThree struct {
+       // How long to avoid duplicating a pending request.
+       duplicateRequestTimeout time.Duration
+       // The last time we requested a chunk. Deleting the request from any connection will clear this
+       // value.
+       lastRequested map[request]*time.Timer
+       callbacks     requestStrategyCallbacks
+}
+
+type requestStrategyMaker func(callbacks requestStrategyCallbacks) requestStrategy
+
+func requestStrategyThreeMaker(duplicateRequestTimeout time.Duration) requestStrategyMaker {
+       return func(callbacks requestStrategyCallbacks) requestStrategy {
+               return requestStrategyThree{
+                       duplicateRequestTimeout: duplicateRequestTimeout,
+                       callbacks:               callbacks,
+                       lastRequested:           make(map[request]*time.Timer),
+               }
+       }
+}
+
+func (rs requestStrategyThree) hooks() requestStrategyHooks {
+       return requestStrategyHooks{
+               deletedRequest: func(r request) {
+                       if t, ok := rs.lastRequested[r]; ok {
+                               t.Stop()
+                               delete(rs.lastRequested, r)
+                       }
+               },
+               sentRequest: rs.onSentRequest,
+       }
+
+}
+
+func (rs requestStrategyOne) hooks() requestStrategyHooks {
+       return requestStrategyHooks{}
+}
+
+func (rs requestStrategyTwo) hooks() requestStrategyHooks {
+       return requestStrategyHooks{}
 }
index b29fd9f8ac22385fa4260e1e407aab7831920d42..5bb013bce9ab99191b74471e70b45b07ecc4abf3 100644 (file)
@@ -48,8 +48,6 @@ type Torrent struct {
 
        // Determines what chunks to request from peers.
        requestStrategy requestStrategy
-       // How long to avoid duplicating a pending request.
-       _duplicateRequestTimeout time.Duration
 
        closed   missinggo.Event
        infoHash metainfo.Hash
@@ -135,9 +133,6 @@ type Torrent struct {
 
        // Count of each request across active connections.
        pendingRequests map[request]int
-       // The last time we requested a chunk. Deleting the request from any
-       // connection will clear this value.
-       lastRequested map[request]*time.Timer
 }
 
 func (t *Torrent) numConns() int {
@@ -408,7 +403,6 @@ func (t *Torrent) onSetInfo() {
        t.gotMetainfo.Set()
        t.updateWantPeersEvent()
        t.pendingRequests = make(map[request]int)
-       t.lastRequested = make(map[request]*time.Timer)
        t.tryCreateMorePieceHashers()
 }
 
@@ -1228,9 +1222,9 @@ func (t *Torrent) assertNoPendingRequests() {
        if len(t.pendingRequests) != 0 {
                panic(t.pendingRequests)
        }
-       if len(t.lastRequested) != 0 {
-               panic(t.lastRequested)
-       }
+       //if len(t.lastRequested) != 0 {
+       //      panic(t.lastRequested)
+       //}
 }
 
 func (t *Torrent) dropConnection(c *connection) {
@@ -1802,6 +1796,22 @@ func (t *Torrent) requestStrategyTorrent() requestStrategyTorrent {
        return t
 }
 
-func (t *Torrent) duplicateRequestTimeout() time.Duration {
-       return t._duplicateRequestTimeout
+type torrentRequestStrategyCallbacks struct {
+       t *Torrent
+}
+
+func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
+       torrent.Add("request timeouts", 1)
+       cb.t.cl.lock()
+       defer cb.t.cl.unlock()
+       for cn := range cb.t.conns {
+               if cn.PeerHasPiece(pieceIndex(r.Index)) {
+                       cn.updateRequests()
+               }
+       }
+
+}
+
+func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
+       return torrentRequestStrategyCallbacks{t}
 }