From: Matt Joiner Date: Sun, 9 May 2021 04:14:11 +0000 (+1000) Subject: Pass tests with new full-client request strategy implementation X-Git-Tag: v1.29.0~31^2~58 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=0830589b0adb09d4c671d22674810107a5c92ba4;p=btrtrc.git Pass tests with new full-client request strategy implementation --- diff --git a/client.go b/client.go index cab4c234..b0f537ca 100644 --- a/client.go +++ b/client.go @@ -81,6 +81,8 @@ type Client struct { websocketTrackers websocketTrackers activeAnnounceLimiter limiter.Instance + + clientPieceRequestOrder } type ipStr string @@ -293,6 +295,8 @@ func NewClient(cfg *ClientConfig) (cl *Client, err error) { }, } + go cl.requester() + return } @@ -1139,7 +1143,6 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) ( webSeeds: make(map[string]*Peer), } t._pendingPieces.NewSet = priorityBitmapStableNewSet - t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu) t.logger = cl.logger.WithContextValue(t) t.setChunkSize(defaultChunkSize) return diff --git a/config.go b/config.go index 373ce6fe..5aeef38b 100644 --- a/config.go +++ b/config.go @@ -137,8 +137,6 @@ type ClientConfig struct { // OnQuery hook func DHTOnQuery func(query *krpc.Msg, source net.Addr) (propagate bool) - DefaultRequestStrategy requestStrategyMaker - Extensions PeerExtensionBits DisableWebtorrent bool @@ -185,10 +183,7 @@ func NewDefaultClientConfig() *ClientConfig { CryptoSelector: mse.DefaultCryptoSelector, CryptoProvides: mse.AllSupportedCrypto, ListenPort: 42069, - - DefaultRequestStrategy: RequestStrategyDuplicateRequestTimeout(5 * time.Second), - - Extensions: defaultPeerExtensionBytes(), + Extensions: defaultPeerExtensionBytes(), } //cc.ConnTracker.SetNoMaxEntries() //cc.ConnTracker.Timeout = func(conntrack.Entry) time.Duration { return 0 } diff --git a/peer-impl.go b/peer-impl.go index a04a160b..f4ad12a2 100644 --- a/peer-impl.go +++ b/peer-impl.go @@ -20,4 +20,5 @@ type peerImpl interface { drop() String() string connStatusString() string + writeBufferFull() bool } diff --git a/peerconn.go b/peerconn.go index b26f4398..9522adf3 100644 --- a/peerconn.go +++ b/peerconn.go @@ -337,16 +337,16 @@ func (cn *Peer) writeStatus(w io.Writer, t *Torrent) { cn.statusFlags(), cn.downloadRate()/(1<<10), ) - fmt.Fprintf(w, " next pieces: %v%s\n", - iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)), - func() string { - if cn == t.fastestPeer { - return " (fastest)" - } else { - return "" - } - }(), - ) + //fmt.Fprintf(w, " next pieces: %v%s\n", + // iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)), + // func() string { + // if cn == t.fastestPeer { + // return " (fastest)" + // } else { + // return "" + // } + // }(), + //) } func (cn *Peer) close() { @@ -402,7 +402,12 @@ func (cn *PeerConn) write(msg pp.Message) bool { cn.wroteMsg(&msg) cn.writeBuffer.Write(msg.MustMarshalBinary()) torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1) - return cn.writeBuffer.Len() < writeBufferHighWaterLen + cn.tickleWriter() + return !cn.writeBufferFull() +} + +func (cn *PeerConn) writeBufferFull() bool { + return cn.writeBuffer.Len() >= writeBufferHighWaterLen } func (cn *PeerConn) requestMetadataPiece(index int) { @@ -440,11 +445,7 @@ func (cn *PeerConn) requestedMetadataPiece(index int) bool { // The actual value to use as the maximum outbound requests. func (cn *Peer) nominalMaxRequests() (ret int) { - return int(clamp( - 1, - int64(cn.PeerMaxRequests), - int64(cn.t.requestStrategy.nominalMaxRequests(cn.requestStrategyConnection())), - )) + return cn.PeerMaxRequests } func (cn *Peer) totalExpectingTime() (ret time.Duration) { @@ -528,12 +529,12 @@ func (pc *PeerConn) writeInterested(interested bool) bool { // are okay. type messageWriter func(pp.Message) bool -func (cn *Peer) request(r Request) bool { +func (cn *Peer) request(r Request) (more bool, err error) { if _, ok := cn.requests[r]; ok { - panic("chunk already requested") + return true, nil } if !cn.peerHasPiece(pieceIndex(r.Index)) { - panic("requesting piece peer doesn't have") + return true, errors.New("requesting piece peer doesn't have") } if !cn.t.peerIsActive(cn) { panic("requesting but not in active conns") @@ -545,7 +546,7 @@ func (cn *Peer) request(r Request) bool { if cn.peerAllowedFast.Get(int(r.Index)) { torrent.Add("allowed fast requests sent", 1) } else { - panic("requesting while choking and not allowed fast") + return cn.setInterested(true), errors.New("requesting while choked and not allowed fast") } } if cn.t.hashingPiece(pieceIndex(r.Index)) { @@ -563,12 +564,11 @@ func (cn *Peer) request(r Request) bool { } cn.validReceiveChunks[r]++ cn.t.pendingRequests[r]++ - cn.t.requestStrategy.hooks().sentRequest(r) cn.updateExpectingChunks() for _, f := range cn.callbacks.SentRequest { f(PeerRequestEvent{cn, r}) } - return cn.peerImpl.request(r) + return cn.peerImpl.request(r), nil } func (me *PeerConn) request(r Request) bool { @@ -584,64 +584,7 @@ func (me *PeerConn) cancel(r Request) bool { return me.write(makeCancelMessage(r)) } -func (cn *Peer) doRequestState() bool { - if !cn.t.networkingEnabled || cn.t.dataDownloadDisallowed { - if !cn.setInterested(false) { - return false - } - if len(cn.requests) != 0 { - for r := range cn.requests { - cn.deleteRequest(r) - // log.Printf("%p: cancelling request: %v", cn, r) - if !cn.peerImpl.cancel(r) { - return false - } - } - } - } else if len(cn.requests) <= cn.requestsLowWater { - filledBuffer := false - cn.iterPendingPieces(func(pieceIndex pieceIndex) bool { - cn.iterPendingRequests(pieceIndex, func(r Request) bool { - if !cn.setInterested(true) { - filledBuffer = true - return false - } - if len(cn.requests) >= cn.nominalMaxRequests() { - return false - } - // Choking is looked at here because our interest is dependent - // on whether we'd make requests in its absence. - if cn.peerChoking { - if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) { - return false - } - } - if _, ok := cn.requests[r]; ok { - return true - } - filledBuffer = !cn.request(r) - return !filledBuffer - }) - return !filledBuffer - }) - if filledBuffer { - // If we didn't completely top up the requests, we shouldn't mark - // the low water, since we'll want to top up the requests as soon - // as we have more write buffer space. - return false - } - cn.requestsLowWater = len(cn.requests) / 2 - if len(cn.requests) == 0 { - return cn.setInterested(false) - } - } - return true -} - func (cn *PeerConn) fillWriteBuffer() { - if !cn.doRequestState() { - return - } if cn.pex.IsEnabled() { if flow := cn.pex.Share(cn.write); !flow { return @@ -743,10 +686,13 @@ func (cn *PeerConn) updateRequests() { func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { return func(cb iter.Callback) { for _, bm := range bms { - bm.Sub(*skip) if !iter.All( - func(i interface{}) bool { - skip.Add(i.(int)) + func(_i interface{}) bool { + i := _i.(int) + if skip.Contains(i) { + return true + } + skip.Add(i) return cb(i) }, bm.Iter, @@ -757,62 +703,6 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { } } -func iterUnbiasedPieceRequestOrder(cn requestStrategyConnection, f func(piece pieceIndex) bool) bool { - now, readahead := cn.torrent().readerPiecePriorities() - skip := bitmap.Flip(cn.peerPieces(), 0, cn.torrent().numPieces()) - skip.Union(cn.torrent().ignorePieces()) - // Return an iterator over the different priority classes, minus the skip pieces. - return iter.All( - func(_piece interface{}) bool { - return f(pieceIndex(_piece.(bitmap.BitIndex))) - }, - iterBitmapsDistinct(&skip, now, readahead), - // We have to iterate _pendingPieces separately because it isn't a Bitmap. - func(cb iter.Callback) { - cn.torrent().pendingPieces().IterTyped(func(piece int) bool { - if skip.Contains(piece) { - return true - } - more := cb(piece) - skip.Add(piece) - return more - }) - }, - ) -} - -// The connection should download highest priority pieces first, without any inclination toward -// avoiding wastage. Generally we might do this if there's a single connection, or this is the -// fastest connection, and we have active readers that signal an ordering preference. It's -// conceivable that the best connection should do this, since it's least likely to waste our time if -// assigned to the highest priority pieces, and assigning more than one this role would cause -// significant wasted bandwidth. -func (cn *Peer) shouldRequestWithoutBias() bool { - return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection()) -} - -func (cn *Peer) iterPendingPieces(f func(pieceIndex) bool) { - if !cn.t.haveInfo() { - return - } - if cn.closed.IsSet() { - return - } - cn.t.requestStrategy.iterPendingPieces(cn, f) -} -func (cn *Peer) iterPendingPiecesUntyped(f iter.Callback) { - cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) }) -} - -func (cn *Peer) iterPendingRequests(piece pieceIndex, f func(Request) bool) bool { - return cn.t.requestStrategy.iterUndirtiedChunks( - cn.t.piece(piece).requestStrategyPiece(), - func(cs ChunkSpec) bool { - return f(Request{pp.Integer(piece), cs}) - }, - ) -} - // check callers updaterequests func (cn *Peer) stopRequestingPiece(piece pieceIndex) bool { return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece)) @@ -831,8 +721,7 @@ func (cn *Peer) updatePiecePriority(piece pieceIndex) bool { return cn.stopRequestingPiece(piece) } prio := cn.getPieceInclination()[piece] - prio = cn.t.requestStrategy.piecePriority(cn, piece, tpp, prio) - return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias() + return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) } func (cn *Peer) getPieceInclination() []int { @@ -1571,7 +1460,6 @@ func (c *Peer) deleteRequest(r Request) bool { f(PeerRequestEvent{c, r}) } c.updateExpectingChunks() - c.t.requestStrategy.hooks().deletedRequest(r) pr := c.t.pendingRequests pr[r]-- n := pr[r] @@ -1722,10 +1610,6 @@ func (l connectionTrust) Less(r connectionTrust) bool { return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less() } -func (cn *Peer) requestStrategyConnection() requestStrategyConnection { - return cn -} - func (cn *Peer) chunksReceivedWhileExpecting() int64 { return cn._chunksReceivedWhileExpecting } @@ -1761,10 +1645,6 @@ func (cn *Peer) stats() *ConnStats { return &cn._stats } -func (cn *Peer) torrent() requestStrategyTorrent { - return cn.t.requestStrategyTorrent() -} - func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { pc, ok := p.peerImpl.(*PeerConn) return pc, ok diff --git a/piece.go b/piece.go index fb9c8056..248832d0 100644 --- a/piece.go +++ b/piece.go @@ -144,7 +144,7 @@ func (p *Piece) chunkIndexSpec(chunk pp.Integer) ChunkSpec { func (p *Piece) chunkIndexRequest(chunkIndex pp.Integer) Request { return Request{ pp.Integer(p.index), - chunkIndexSpec(chunkIndex, p.length(), p.chunkSize()), + p.chunkIndexSpec(chunkIndex), } } @@ -259,10 +259,6 @@ func (p *Piece) allChunksDirty() bool { return p._dirtyChunks.Len() == int(p.numChunks()) } -func (p *Piece) requestStrategyPiece() requestStrategyPiece { - return p -} - func (p *Piece) dirtyChunks() bitmap.Bitmap { return p._dirtyChunks } @@ -270,3 +266,15 @@ func (p *Piece) dirtyChunks() bitmap.Bitmap { func (p *Piece) State() PieceState { return p.t.PieceState(p.index) } + +func (p *Piece) iterUndirtiedChunks(f func(ChunkSpec) bool) bool { + for i := pp.Integer(0); i < p.numChunks(); i++ { + if p.chunkIndexDirty(i) { + continue + } + if !f(p.chunkIndexSpec(i)) { + return false + } + } + return true +} diff --git a/request-strategy-defaults.go b/request-strategy-defaults.go index 1ece5fc6..10cbafc7 100644 --- a/request-strategy-defaults.go +++ b/request-strategy-defaults.go @@ -1,45 +1 @@ package torrent - -import ( - "github.com/anacrolix/missinggo/iter" - "github.com/anacrolix/missinggo/v2/bitmap" - pp "github.com/anacrolix/torrent/peer_protocol" -) - -// Provides default implementations for requestStrategy methods. Could be embedded, or delegated to. -type requestStrategyDefaults struct{} - -func (requestStrategyDefaults) hooks() requestStrategyHooks { - return requestStrategyHooks{ - sentRequest: func(Request) {}, - deletedRequest: func(Request) {}, - } -} - -func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool { - chunkIndices := p.dirtyChunks().Copy() - chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks())) - return iter.ForPerm(chunkIndices.Len(), func(i int) bool { - ci, err := chunkIndices.RB.Select(uint32(i)) - if err != nil { - panic(err) - } - return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec) - }) -} - -func (requestStrategyDefaults) nominalMaxRequests(cn requestStrategyConnection) int { - return int( - max( - 64, - cn.stats().ChunksReadUseful.Int64()- - (cn.stats().ChunksRead.Int64()-cn.stats().ChunksReadUseful.Int64()))) -} - -func (requestStrategyDefaults) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { - return prio -} - -func (requestStrategyDefaults) shouldRequestWithoutBias(cn requestStrategyConnection) bool { - return false -} diff --git a/request-strategy.go b/request-strategy.go index 75cff963..62e7174f 100644 --- a/request-strategy.go +++ b/request-strategy.go @@ -1,223 +1,162 @@ package torrent import ( - "math" - "sync" + "sort" "time" - "github.com/anacrolix/missinggo/v2/bitmap" - "github.com/anacrolix/missinggo/v2/prioritybitmap" - + "github.com/anacrolix/log" + "github.com/anacrolix/multiless" pp "github.com/anacrolix/torrent/peer_protocol" + "github.com/bradfitz/iter" ) -type requestStrategyPiece interface { - numChunks() pp.Integer - dirtyChunks() bitmap.Bitmap - chunkIndexRequest(i pp.Integer) Request -} - -type requestStrategyTorrent interface { - numConns() int - numReaders() int - numPieces() int - readerPiecePriorities() (now, readahead bitmap.Bitmap) - ignorePieces() bitmap.Bitmap - pendingPieces() *prioritybitmap.PriorityBitmap -} - -type requestStrategyConnection interface { - torrent() requestStrategyTorrent - peerPieces() bitmap.Bitmap - pieceRequestOrder() *prioritybitmap.PriorityBitmap - fastest() bool - stats() *ConnStats - totalExpectingTime() time.Duration - peerMaxRequests() int - chunksReceivedWhileExpecting() int64 -} - -type requestStrategy interface { - iterPendingPieces(requestStrategyConnection, func(pieceIndex) bool) bool - iterUndirtiedChunks(requestStrategyPiece, func(ChunkSpec) bool) bool - nominalMaxRequests(requestStrategyConnection) int - shouldRequestWithoutBias(requestStrategyConnection) bool - piecePriority(requestStrategyConnection, pieceIndex, piecePriority, int) int - hooks() requestStrategyHooks -} - -type requestStrategyHooks struct { - sentRequest func(Request) - deletedRequest func(Request) -} - -type requestStrategyCallbacks interface { - requestTimedOut(Request) -} - -type requestStrategyFuzzing struct { - requestStrategyDefaults +type clientPieceRequestOrder struct { + pieces []pieceRequestOrderPiece } -type requestStrategyFastest struct { - requestStrategyDefaults +type pieceRequestOrderPiece struct { + t *Torrent + index pieceIndex + prio piecePriority + partial bool + availability int } -func newRequestStrategyMaker(rs requestStrategy) requestStrategyMaker { - return func(requestStrategyCallbacks, sync.Locker) requestStrategy { - return rs +func (me *clientPieceRequestOrder) addPieces(t *Torrent, numPieces pieceIndex) { + for i := range iter.N(numPieces) { + me.pieces = append(me.pieces, pieceRequestOrderPiece{ + t: t, + index: i, + }) } } -// The fastest connection downloads strictly in order of priority, while all others adhere to their -// piece inclinations. -func RequestStrategyFastest() requestStrategyMaker { - return newRequestStrategyMaker(requestStrategyFastest{}) -} - -// Favour higher priority pieces with some fuzzing to reduce overlaps and wastage across -// connections. -func RequestStrategyFuzzing() requestStrategyMaker { - return newRequestStrategyMaker(requestStrategyFuzzing{}) -} - -func (requestStrategyFastest) shouldRequestWithoutBias(cn requestStrategyConnection) bool { - if cn.torrent().numReaders() == 0 { - return false - } - if cn.torrent().numConns() == 1 { - return true - } - if cn.fastest() { - return true +func (me *clientPieceRequestOrder) removePieces(t *Torrent) { + newPieces := make([]pieceRequestOrderPiece, 0, len(me.pieces)-t.numPieces()) + for _, p := range me.pieces { + if p.t != t { + newPieces = append(newPieces, p) + } } - return false + me.pieces = newPieces } -type requestStrategyDuplicateRequestTimeout struct { - requestStrategyDefaults - // How long to avoid duplicating a pending request. - duplicateRequestTimeout time.Duration - - callbacks requestStrategyCallbacks - - // The last time we requested a chunk. Deleting the request from any connection will clear this - // value. - lastRequested map[Request]*time.Timer - // The lock to take when running a request timeout handler. - timeoutLocker sync.Locker +func (me clientPieceRequestOrder) sort() { + sort.SliceStable(me.pieces, me.less) } -// Generates a request strategy instance for a given torrent. callbacks are probably specific to the torrent. -type requestStrategyMaker func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy - -// Requests are strictly by piece priority, and not duplicated until duplicateRequestTimeout is -// reached. -func RequestStrategyDuplicateRequestTimeout(duplicateRequestTimeout time.Duration) requestStrategyMaker { - return func(callbacks requestStrategyCallbacks, clientLocker sync.Locker) requestStrategy { - return requestStrategyDuplicateRequestTimeout{ - duplicateRequestTimeout: duplicateRequestTimeout, - callbacks: callbacks, - lastRequested: make(map[Request]*time.Timer), - timeoutLocker: clientLocker, - } +func (me clientPieceRequestOrder) update() { + for i := range me.pieces { + p := &me.pieces[i] + p.prio = p.t.piece(p.index).uncachedPriority() + p.partial = p.t.piecePartiallyDownloaded(p.index) + p.availability = p.t.pieceAvailability(p.index) } } -func (rs requestStrategyDuplicateRequestTimeout) hooks() requestStrategyHooks { - return requestStrategyHooks{ - deletedRequest: func(r Request) { - if t, ok := rs.lastRequested[r]; ok { - t.Stop() - delete(rs.lastRequested, r) - } - }, - sentRequest: rs.onSentRequest, +func (me clientPieceRequestOrder) less(_i, _j int) bool { + i := me.pieces[_i] + j := me.pieces[_j] + ml := multiless.New() + ml.Int(int(j.prio), int(i.prio)) + ml.Bool(j.partial, i.partial) + ml.Int(i.availability, j.availability) + return ml.Less() +} + +func (cl *Client) requester() { + for { + func() { + cl.lock() + defer cl.unlock() + cl.doRequests() + }() + select { + case <-cl.closed.LockedChan(cl.locker()): + return + case <-time.After(10 * time.Millisecond): + } } } -func (rs requestStrategyDuplicateRequestTimeout) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool { - for i := pp.Integer(0); i < pp.Integer(p.numChunks()); i++ { - if p.dirtyChunks().Get(bitmap.BitIndex(i)) { - continue +func (cl *Client) doRequests() { + requestOrder := clientPieceRequestOrder{} + allPeers := make(map[*Torrent][]*Peer) + storageCapacity := make(map[*Torrent]*int64) + for _, t := range cl.torrents { + // TODO: We could do metainfo requests here. + if t.haveInfo() { + value := int64(t.usualPieceSize()) + storageCapacity[t] = &value + requestOrder.addPieces(t, t.numPieces()) } - r := p.chunkIndexRequest(i) - if rs.wouldDuplicateRecent(r) { + var peers []*Peer + t.iterPeers(func(p *Peer) { + peers = append(peers, p) + }) + allPeers[t] = peers + } + requestOrder.update() + requestOrder.sort() + for _, p := range requestOrder.pieces { + if p.t.ignorePieceForRequests(p.index) { continue } - if !f(r.ChunkSpec) { - return false + peers := allPeers[p.t] + torrentPiece := p.t.piece(p.index) + if left := storageCapacity[p.t]; left != nil { + if *left < int64(torrentPiece.length()) { + continue + } + *left -= int64(torrentPiece.length()) } + p.t.piece(p.index).iterUndirtiedChunks(func(chunk ChunkSpec) bool { + for _, peer := range peers { + req := Request{pp.Integer(p.index), chunk} + _, err := peer.request(req) + if err == nil { + log.Printf("requested %v", req) + break + } + } + return true + }) } - return true -} - -func (requestStrategyFuzzing) piecePriority(cn requestStrategyConnection, piece pieceIndex, tpp piecePriority, prio int) int { - switch tpp { - case PiecePriorityNormal: - case PiecePriorityReadahead: - prio -= int(cn.torrent().numPieces()) - case PiecePriorityNext, PiecePriorityNow: - prio -= 2 * int(cn.torrent().numPieces()) - default: - panic(tpp) - } - prio += int(piece / 3) - return prio -} - -func (requestStrategyDuplicateRequestTimeout) iterPendingPieces(cn requestStrategyConnection, f func(pieceIndex) bool) bool { - return iterUnbiasedPieceRequestOrder(cn, f) -} -func defaultIterPendingPieces(rs requestStrategy, cn requestStrategyConnection, f func(pieceIndex) bool) bool { - if rs.shouldRequestWithoutBias(cn) { - return iterUnbiasedPieceRequestOrder(cn, f) - } else { - return cn.pieceRequestOrder().IterTyped(func(i int) bool { - return f(pieceIndex(i)) + for _, t := range cl.torrents { + t.iterPeers(func(p *Peer) { + if !p.peerChoking && p.numLocalRequests() == 0 && !p.writeBufferFull() { + p.setInterested(false) + } }) } } -func (rs requestStrategyFuzzing) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { - return defaultIterPendingPieces(rs, cn, cb) -} -func (rs requestStrategyFastest) iterPendingPieces(cn requestStrategyConnection, cb func(pieceIndex) bool) bool { - return defaultIterPendingPieces(rs, cn, cb) -} -func (rs requestStrategyDuplicateRequestTimeout) onSentRequest(r Request) { - rs.lastRequested[r] = time.AfterFunc(rs.duplicateRequestTimeout, func() { - rs.timeoutLocker.Lock() - delete(rs.lastRequested, r) - rs.timeoutLocker.Unlock() - rs.callbacks.requestTimedOut(r) - }) -} - -// The actual value to use as the maximum outbound requests. -func (rs requestStrategyDuplicateRequestTimeout) nominalMaxRequests(cn requestStrategyConnection) (ret int) { - expectingTime := int64(cn.totalExpectingTime()) - if expectingTime == 0 { - expectingTime = math.MaxInt64 - } else { - expectingTime *= 2 - } - return int(clamp( - 1, - int64(cn.peerMaxRequests()), - max( - // It makes sense to always pipeline at least one connection, since latency must be - // non-zero. - 2, - // Request only as many as we expect to receive in the duplicateRequestTimeout - // window. We are trying to avoid having to duplicate requests. - cn.chunksReceivedWhileExpecting()*int64(rs.duplicateRequestTimeout)/expectingTime, - ), - )) -} -func (rs requestStrategyDuplicateRequestTimeout) wouldDuplicateRecent(r Request) bool { - // This piece has been requested on another connection, and the duplicate request timer is still - // running. - _, ok := rs.lastRequested[r] - return ok -} +//func (requestStrategyDefaults) iterUndirtiedChunks(p requestStrategyPiece, f func(ChunkSpec) bool) bool { +// chunkIndices := p.dirtyChunks().Copy() +// chunkIndices.FlipRange(0, bitmap.BitIndex(p.numChunks())) +// return iter.ForPerm(chunkIndices.Len(), func(i int) bool { +// ci, err := chunkIndices.RB.Select(uint32(i)) +// if err != nil { +// panic(err) +// } +// return f(p.chunkIndexRequest(pp.Integer(ci)).ChunkSpec) +// }) +//} + +// +//func iterUnbiasedPieceRequestOrder( +// cn requestStrategyConnection, +// f func(piece pieceIndex) bool, +// pieceRequestOrder []pieceIndex, +//) bool { +// cn.torrent().sortPieceRequestOrder(pieceRequestOrder) +// for _, i := range pieceRequestOrder { +// if !cn.peerHasPiece(i) || cn.torrent().ignorePieceForRequests(i) { +// continue +// } +// if !f(i) { +// return false +// } +// } +// return true +//} diff --git a/torrent.go b/torrent.go index e3876474..e8cf36e4 100644 --- a/torrent.go +++ b/torrent.go @@ -55,9 +55,6 @@ type Torrent struct { dataUploadDisallowed bool userOnWriteChunkErr func(error) - // Determines what chunks to request from peers. - requestStrategy requestStrategy - closed missinggo.Event infoHash metainfo.Hash pieces []Piece @@ -150,6 +147,29 @@ type Torrent struct { pex pexState } +func (t *Torrent) pieceAvailability(i pieceIndex) (count int) { + t.iterPeers(func(peer *Peer) { + if peer.peerHasPiece(i) { + count++ + } + }) + return +} + +func (t *Torrent) sortPieceRequestOrder(sl []pieceIndex) { + if len(sl) != t.numPieces() { + panic(len(sl)) + } + availability := make([]int, len(sl)) + t.iterPeers(func(peer *Peer) { + for i := range availability { + if peer.peerHasPiece(i) { + availability[i]++ + } + } + }) +} + func (t *Torrent) numConns() int { return len(t.conns) } @@ -166,15 +186,8 @@ func (t *Torrent) readerReadaheadPieces() bitmap.Bitmap { return t._readerReadaheadPieces } -func (t *Torrent) ignorePieces() bitmap.Bitmap { - ret := t._completedPieces.Copy() - ret.Union(t.piecesQueuedForHash) - for i := 0; i < t.numPieces(); i++ { - if t.piece(i).hashing { - ret.Set(i, true) - } - } - return ret +func (t *Torrent) ignorePieceForRequests(i pieceIndex) bool { + return !t.wantPieceIndex(i) } func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap { @@ -413,6 +426,7 @@ func (t *Torrent) setInfo(info *metainfo.Info) error { // This seems to be all the follow-up tasks after info is set, that can't fail. func (t *Torrent) onSetInfo() { + t.cl.clientPieceRequestOrder.addPieces(t, t.numPieces()) t.iterPeers(func(p *Peer) { p.onGotInfo(t.info) }) @@ -2026,30 +2040,6 @@ func (t *Torrent) piece(i int) *Piece { return &t.pieces[i] } -func (t *Torrent) requestStrategyTorrent() requestStrategyTorrent { - return t -} - -type torrentRequestStrategyCallbacks struct { - t *Torrent -} - -func (cb torrentRequestStrategyCallbacks) requestTimedOut(r Request) { - torrent.Add("Request timeouts", 1) - cb.t.cl.lock() - defer cb.t.cl.unlock() - cb.t.iterPeers(func(cn *Peer) { - if cn.peerHasPiece(pieceIndex(r.Index)) { - cn.updateRequests() - } - }) - -} - -func (t *Torrent) requestStrategyCallbacks() requestStrategyCallbacks { - return torrentRequestStrategyCallbacks{t} -} - func (t *Torrent) onWriteChunkErr(err error) { if t.userOnWriteChunkErr != nil { go t.userOnWriteChunkErr(err) @@ -2111,7 +2101,7 @@ func (t *Torrent) SetOnWriteChunkError(f func(error)) { t.userOnWriteChunkErr = f } -func (t *Torrent) iterPeers(f func(*Peer)) { +func (t *Torrent) iterPeers(f func(p *Peer)) { for pc := range t.conns { f(&pc.Peer) } diff --git a/webseed-peer.go b/webseed-peer.go index e2df582e..9fa77a28 100644 --- a/webseed-peer.go +++ b/webseed-peer.go @@ -24,6 +24,10 @@ type webseedPeer struct { var _ peerImpl = (*webseedPeer)(nil) +func (me *webseedPeer) writeBufferFull() bool { + return false +} + func (me *webseedPeer) connStatusString() string { return me.client.Url } @@ -99,7 +103,6 @@ func (ws *webseedPeer) connectionFlags() string { func (ws *webseedPeer) drop() {} func (ws *webseedPeer) updateRequests() { - ws.peer.doRequestState() } func (ws *webseedPeer) onClose() {