maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
networkingEnabled: true,
- requestStrategy: requestStrategyOne{},
+ requestStrategy: cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks()),
metadataChanged: sync.Cond{
L: cl.locker(),
},
- _duplicateRequestTimeout: 1 * time.Second,
}
t.logger = cl.logger.WithValues(t).WithText(func(m log.Msg) string {
return fmt.Sprintf("%v: %s", t, m.Text())
// OnQuery hook func
DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool)
+
+ DefaultRequestStrategy requestStrategyMaker
}
func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig {
CryptoProvides: mse.AllSupportedCrypto,
ListenPort: 42069,
Logger: log.Default,
+
+ DefaultRequestStrategy: requestStrategyThreeMaker(5 * time.Second),
}
//cc.ConnTracker.SetNoMaxEntries()
//cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 }
}
// The actual value to use as the maximum outbound requests.
-func (requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
+func (rs requestStrategyThree) nominalMaxRequests(cn requestStrategyConnection) (ret int) {
expectingTime := int64(cn.totalExpectingTime())
if expectingTime == 0 {
expectingTime = math.MaxInt64
2,
// Request only as many as we expect to receive in the duplicateRequestTimeout
// window. We are trying to avoid having to duplicate requests.
- cn.chunksReceivedWhileExpecting()*int64(cn.torrent().duplicateRequestTimeout())/expectingTime,
+ cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime,
),
))
}
}
cn.validReceiveChunks[r] = struct{}{}
cn.t.pendingRequests[r]++
- cn.t.lastRequested[r] = time.AfterFunc(cn.t._duplicateRequestTimeout, func() {
- torrent.Add("duplicate request timeouts", 1)
- cn.mu().Lock()
- defer cn.mu().Unlock()
- delete(cn.t.lastRequested, r)
- for cn := range cn.t.conns {
- if cn.PeerHasPiece(pieceIndex(r.Index)) {
- cn.updateRequests()
- }
- }
- })
+ cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
return mw(pp.Message{
Type: pp.Request,
})
}
+func (rs requestStrategyThree) onSentRequest(r request) {
+ rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() {
+ delete(rs.lastRequested, r)
+ rs.callbacks.requestTimedOut(r)
+ })
+}
+
func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
if !cn.t.networkingEnabled {
if !cn.SetInterested(false, msg) {
}
func (cn *connection) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
- return cn.t.requestStrategy.iterUndirtiedChunks(cn.t.piece(piece).requestStrategyPiece(),
+ return cn.t.requestStrategy.iterUndirtiedChunks(
+ cn.t.piece(piece).requestStrategyPiece(),
func(cs chunkSpec) bool {
return f(request{pp.Integer(piece), cs})
- })
+ },
+ )
}
-func (requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
+func (rs requestStrategyThree) iterUndirtiedChunks(p requestStrategyPiece, f func(chunkSpec) bool) bool {
for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ {
if p.dirtyChunks().Get(bitmap.BitIndex(i)) {
continue
}
- ci := p.chunkIndexSpec(i)
- if p.wouldDuplicateRecent(ci) {
+ r := p.chunkIndexRequest(i)
+ if rs.wouldDuplicateRecent(r) {
continue
}
- if !f(p.chunkIndexSpec(i)) {
+ if !f(r.chunkSpec) {
return false
}
}
if err != nil {
panic(err)
}
- return f(p.chunkIndexSpec(pp.Integer(ci)))
+ return f(p.chunkIndexRequest(pp.Integer(ci)).chunkSpec)
})
}
}
delete(c.requests, r)
c.updateExpectingChunks()
- if t, ok := c.t.lastRequested[r]; ok {
- t.Stop()
- delete(c.t.lastRequested, r)
- }
+ c.t.requestStrategy.hooks().deletedRequest(r)
pr := c.t.pendingRequests
pr[r]--
n := pr[r]
github.com/anacrolix/log v0.4.0
github.com/anacrolix/missinggo v1.2.1
github.com/anacrolix/missinggo/perf v1.0.0
- github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a
+ github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984
github.com/anacrolix/multiless v0.0.0-20191223025854-070b7994e841
github.com/anacrolix/sync v0.2.0
github.com/anacrolix/tagflag v1.0.1
github.com/anacrolix/missinggo/v2 v2.3.0/go.mod h1:ZzG3/cc3t+5zcYWAgYrJW0MBsSwNwOkTlNquBbP51Bc=
github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a h1:lYHFRvNiiBBGyreXVkcIztKyru+xAQJzg8AKaj/85TQ=
github.com/anacrolix/missinggo/v2 v2.3.2-0.20200109120848-ad7ce9a1247a/go.mod h1:sjPqWXxdr3jWcMO/tXhhshXAaiTkGIgJpN93clGzGr8=
+github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984 h1:/pEakYOx8jjk2HLIYEA/rTVXoU0Q/JA7Ojv/6X9iIkI=
+github.com/anacrolix/missinggo/v2 v2.3.2-0.20200110051601-fc3212fb3984/go.mod h1:sjPqWXxdr3jWcMO/tXhhshXAaiTkGIgJpN93clGzGr8=
github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb h1:2Or5ccMoY4Kfao+WdL2w6tpY6ZEe+2VTVbIPd7A/Ajk=
github.com/anacrolix/mmsg v0.0.0-20180515031531-a4a3ba1fc8bb/go.mod h1:x2/ErsYUmT77kezS63+wzZp8E3byYB0gzirM/WMBLfw=
github.com/anacrolix/mmsg v1.0.0 h1:btC7YLjOn29aTUAExJiVUhQOuf/8rhm+/nWCMAnL3Hg=
return chunkIndexSpec(chunk, p.length(), p.chunkSize())
}
+func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) request {
+ return request{
+ pp.Integer(p.index),
+ chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()),
+ }
+}
+
func (p *Piece) numDirtyBytes() (ret pp.Integer) {
// defer func() {
// if ret > p.length() {
return p._dirtyChunks
}
-func (p *Piece) wouldDuplicateRecent(cs chunkSpec) bool {
+func (rs requestStrategyThree) wouldDuplicateRecent(r request) bool {
// This piece has been requested on another connection, and the duplicate request timer is still
// running.
- _, ok := p.t.lastRequested[request{pp.Integer(p.index), cs}]
+ _, ok := rs.lastRequested[r]
return ok
}
type requestStrategyPiece interface {
numChunks() pp.Integer
dirtyChunks() bitmap.Bitmap
- chunkIndexSpec(i pp.Integer) chunkSpec
- wouldDuplicateRecent(chunkSpec) bool
+ chunkIndexRequest(i pp.Integer) request
}
type requestStrategyTorrent interface {
readerPiecePriorities() (now, readahead bitmap.Bitmap)
ignorePieces() bitmap.Bitmap
pendingPieces() *prioritybitmap.PriorityBitmap
- duplicateRequestTimeout() time.Duration
}
type requestStrategyConnection interface {
nominalMaxRequests(requestStrategyConnection) int
shouldRequestWithoutBias(requestStrategyConnection) bool
piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int
+ hooks() requestStrategyHooks
+}
+
+type requestStrategyHooks struct {
+ sentRequest func(request)
+ deletedRequest func(request)
+}
+
+type requestStrategyCallbacks interface {
+ requestTimedOut(request)
}
// Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across
// Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is
// reached.
type requestStrategyThree struct {
+ // How long to avoid duplicating a pending request.
+ duplicateRequestTimeout time.Duration
+ // The last time we requested a chunk. Deleting the request from any connection will clear this
+ // value.
+ lastRequested map[request]*time.Timer
+ callbacks requestStrategyCallbacks
+}
+
+type requestStrategyMaker func(callbacks requestStrategyCallbacks) requestStrategy
+
+func requestStrategyThreeMaker(duplicateRequestTimeout time.Duration) requestStrategyMaker {
+ return func(callbacks requestStrategyCallbacks) requestStrategy {
+ return requestStrategyThree{
+ duplicateRequestTimeout: duplicateRequestTimeout,
+ callbacks: callbacks,
+ lastRequested: make(map[request]*time.Timer),
+ }
+ }
+}
+
+func (rs requestStrategyThree) hooks() requestStrategyHooks {
+ return requestStrategyHooks{
+ deletedRequest: func(r request) {
+ if t, ok := rs.lastRequested[r]; ok {
+ t.Stop()
+ delete(rs.lastRequested, r)
+ }
+ },
+ sentRequest: rs.onSentRequest,
+ }
+
+}
+
+func (rs requestStrategyOne) hooks() requestStrategyHooks {
+ return requestStrategyHooks{}
+}
+
+func (rs requestStrategyTwo) hooks() requestStrategyHooks {
+ return requestStrategyHooks{}
}
// Determines what chunks to request from peers.
requestStrategy requestStrategy
- // How long to avoid duplicating a pending request.
- _duplicateRequestTimeout time.Duration
closed missinggo.Event
infoHash metainfo.Hash
// Count of each request across active connections.
pendingRequests map[request]int
- // The last time we requested a chunk. Deleting the request from any
- // connection will clear this value.
- lastRequested map[request]*time.Timer
}
func (t *Torrent) numConns() int {
t.gotMetainfo.Set()
t.updateWantPeersEvent()
t.pendingRequests = make(map[request]int)
- t.lastRequested = make(map[request]*time.Timer)
t.tryCreateMorePieceHashers()
}
if len(t.pendingRequests) != 0 {
panic(t.pendingRequests)
}
- if len(t.lastRequested) != 0 {
- panic(t.lastRequested)
- }
+ //if len(t.lastRequested) != 0 {
+ // panic(t.lastRequested)
+ //}
}
func (t *Torrent) dropConnection(c *connection) {
return t
}
-func (t *Torrent) duplicateRequestTimeout() time.Duration {
- return t._duplicateRequestTimeout
+type torrentRequestStrategyCallbacks struct {
+ t *Torrent
+}
+
+func (cb torrentRequestStrategyCallbacks) requestTimedOut(r request) {
+ torrent.Add("request timeouts", 1)
+ cb.t.cl.lock()
+ defer cb.t.cl.unlock()
+ for cn := range cb.t.conns {
+ if cn.PeerHasPiece(pieceIndex(r.Index)) {
+ cn.updateRequests()
+ }
+ }
+
+}
+
+func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks {
+ return torrentRequestStrategyCallbacks{t}
}