"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
)
type PeerSource string
// Since we have to store all the requests in memory, we can't reasonably exceed what would be
// indexable with the memory space available.
-type maxRequests = int
+type (
+ maxRequests = int
+ requestState = request_strategy.PeerNextRequestState
+)
type Peer struct {
// First to ensure 64-bit alignment for atomics. See #262.
lastChunkSent time.Time
// Stuff controlled by the local peer.
- interested bool
+ nextRequestState requestState
+ actualRequestState requestState
lastBecameInterested time.Time
priorInterest time.Duration
_chunksReceivedWhileExpecting int64
choking bool
- requests map[Request]struct{}
piecesReceivedSinceLastRequestUpdate maxRequests
maxPiecesReceivedBetweenRequestUpdates maxRequests
// Chunks that we might reasonably expect to receive from the peer. Due to
}
func (cn *Peer) expectingChunks() bool {
- if len(cn.requests) == 0 {
+ if len(cn.actualRequestState.Requests) == 0 {
return false
}
- if !cn.interested {
+ if !cn.actualRequestState.Interested {
return false
}
- for r := range cn.requests {
+ for r := range cn.actualRequestState.Requests {
if !cn.remoteChokingPiece(r.Index.Int()) {
return true
}
func (cn *Peer) cumInterest() time.Duration {
ret := cn.priorInterest
- if cn.interested {
+ if cn.actualRequestState.Interested {
ret += time.Since(cn.lastBecameInterested)
}
return ret
c := func(b byte) {
ret += string([]byte{b})
}
- if cn.interested {
+ if cn.actualRequestState.Interested {
c('i')
}
if cn.choking {
func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
ret = make(map[pieceIndex]int)
- for r := range cn.requests {
+ for r := range cn.actualRequestState.Requests {
ret[pieceIndex(r.Index)]++
}
return
return notFull
}
-func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
- cn.mu.Lock()
- defer cn.mu.Unlock()
- cn.writeBuffer.Write(msg.MustMarshalBinary())
- cn.writeCond.Broadcast()
- return !cn.writeBufferFull()
-}
-
-func (cn *peerConnMsgWriter) writeBufferFull() bool {
- return cn.writeBuffer.Len() >= writeBufferHighWaterLen
-}
-
func (cn *PeerConn) requestMetadataPiece(index int) {
eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
if eID == 0 {
}
func (cn *Peer) setInterested(interested bool) bool {
- if cn.interested == interested {
+ if cn.actualRequestState.Interested == interested {
return true
}
- cn.interested = interested
+ cn.actualRequestState.Interested = interested
if interested {
cn.lastBecameInterested = time.Now()
} else if !cn.lastBecameInterested.IsZero() {
return nil
}
-func (cn *Peer) request(r Request) error {
+func (cn *Peer) request(r Request) (more bool, err error) {
if err := cn.shouldRequest(r); err != nil {
panic(err)
}
- if _, ok := cn.requests[r]; ok {
- return nil
+ if _, ok := cn.actualRequestState.Requests[r]; ok {
+ return true, nil
}
if cn.numLocalRequests() >= cn.nominalMaxRequests() {
- return errors.New("too many outstanding requests")
+ return true, errors.New("too many outstanding requests")
}
- if cn.requests == nil {
- cn.requests = make(map[Request]struct{})
+ if cn.actualRequestState.Requests == nil {
+ cn.actualRequestState.Requests = make(map[Request]struct{})
}
- cn.requests[r] = struct{}{}
+ cn.actualRequestState.Requests[r] = struct{}{}
if cn.validReceiveChunks == nil {
cn.validReceiveChunks = make(map[Request]int)
}
for _, f := range cn.callbacks.SentRequest {
f(PeerRequestEvent{cn, r})
}
- cn.peerImpl._request(r)
- return nil
+ return cn.peerImpl._request(r), nil
}
-func (me *PeerConn) _request(r Request) {
- me.write(pp.Message{
+func (me *PeerConn) _request(r Request) bool {
+ return me.write(pp.Message{
Type: pp.Request,
Index: r.Index,
Begin: r.Begin,
})
}
-func (me *Peer) cancel(r Request) {
+func (me *Peer) cancel(r Request) bool {
if me.deleteRequest(r) {
- me.peerImpl._cancel(r)
+ return me.peerImpl._cancel(r)
}
+ return true
}
-func (me *PeerConn) _cancel(r Request) {
- me.write(makeCancelMessage(r))
+func (me *PeerConn) _cancel(r Request) bool {
+ return me.write(makeCancelMessage(r))
}
func (cn *PeerConn) fillWriteBuffer() {
+ if !cn.applyNextRequestState() {
+ return
+ }
if cn.pex.IsEnabled() {
if flow := cn.pex.Share(cn.write); !flow {
return
}
func (cn *PeerConn) updateRequests() {
- // log.Print("update requests")
- cn.tickleWriter()
+ cn.t.cl.tickleRequester()
}
// Emits the indices in the Bitmaps bms in order, never repeating any index.
// out.
deletedRequest := false
{
- if _, ok := c.requests[req]; ok {
+ if _, ok := c.actualRequestState.Requests[req]; ok {
for _, f := range c.callbacks.ReceivedRequested {
f(PeerMessageEvent{c, msg})
}
}
func (c *Peer) numLocalRequests() int {
- return len(c.requests)
+ return len(c.actualRequestState.Requests)
}
func (c *Peer) deleteRequest(r Request) bool {
- if _, ok := c.requests[r]; !ok {
+ delete(c.nextRequestState.Requests, r)
+ if _, ok := c.actualRequestState.Requests[r]; !ok {
return false
}
- delete(c.requests, r)
+ delete(c.actualRequestState.Requests, r)
for _, f := range c.callbacks.DeletedRequest {
f(PeerRequestEvent{c, r})
}
}
func (c *Peer) deleteAllRequests() {
- for r := range c.requests {
+ for r := range c.actualRequestState.Requests {
c.deleteRequest(r)
}
- if len(c.requests) != 0 {
- panic(len(c.requests))
+ if l := len(c.actualRequestState.Requests); l != 0 {
+ panic(l)
}
+ c.nextRequestState.Requests = nil
// for c := range c.t.conns {
// c.tickleWriter()
// }
pc, ok := p.peerImpl.(*PeerConn)
return pc, ok
}
+
+func (p *PeerConn) onNextRequestStateChanged() {
+ p.tickleWriter()
+}
"unsafe"
"github.com/anacrolix/missinggo/v2/bitmap"
+
+ "github.com/anacrolix/torrent/internal/chansync"
request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/types"
)
func (cl *Client) requester() {
for {
- func() {
+ update := func() chansync.Signaled {
cl.lock()
defer cl.unlock()
cl.doRequests()
+ return cl.updateRequests.Signaled()
}()
+ // We can probably tune how often to heed this signal. TODO: Currently disabled to retain
+ // existing behaviour, while the signalling is worked out.
+ update = nil
select {
case <-cl.closed.LockedChan(cl.locker()):
return
+ case <-update:
case <-time.After(100 * time.Millisecond):
}
}
}
+func (cl *Client) tickleRequester() {
+ cl.updateRequests.Broadcast()
+}
+
func (cl *Client) doRequests() {
ts := make([]request_strategy.Torrent, 0, len(cl.torrents))
for _, t := range cl.torrents {
HasPiece: p.peerHasPiece,
MaxRequests: p.nominalMaxRequests(),
HasExistingRequest: func(r request_strategy.Request) bool {
- _, ok := p.requests[r]
+ _, ok := p.actualRequestState.Requests[r]
return ok
},
Choking: p.peerChoking,
MaxUnverifiedBytes: cl.config.MaxUnverifiedBytes,
})
for p, state := range nextPeerStates {
- applyPeerNextRequestState(p, state)
+ setPeerNextRequestState(p, state)
}
}
return uintptr(unsafe.Pointer(p))
}
-func applyPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
+func setPeerNextRequestState(_p request_strategy.PeerId, rp request_strategy.PeerNextRequestState) {
p := (*Peer)(_p.(*peerId))
- p.setInterested(rp.Interested)
- for req := range p.requests {
- if _, ok := rp.Requests[req]; !ok {
- p.cancel(req)
+ p.nextRequestState = rp
+ p.onNextRequestStateChanged()
+}
+
+func (p *Peer) applyNextRequestState() bool {
+ next := p.nextRequestState
+ current := p.actualRequestState
+ if !p.setInterested(next.Interested) {
+ return false
+ }
+ for req := range current.Requests {
+ if _, ok := next.Requests[req]; !ok {
+ if !p.cancel(req) {
+ return false
+ }
}
}
- for req := range rp.Requests {
- err := p.request(req)
+ for req := range next.Requests {
+ more, err := p.request(req)
if err != nil {
panic(err)
} else {
//log.Print(req)
}
+ if !more {
+ return false
+ }
}
+ return true
}