]> Sergey Matveev's repositories - btrtrc.git/blobdiff - peerconn.go
Apply next request state asynchronously
[btrtrc.git] / peerconn.go
index b2753d6a88d741e266d2e8ee2982df2c2f193822..40848e37e2c5e136991323cf9cefb5dc1bf35aaf 100644 (file)
@@ -24,6 +24,7 @@ import (
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
 )
 
 type PeerSource string
@@ -48,7 +49,10 @@ type PeerRemoteAddr interface {
 
 // Since we have to store all the requests in memory, we can't reasonably exceed what would be
 // indexable with the memory space available.
-type maxRequests = int
+type (
+       maxRequests  = int
+       requestState = request_strategy.PeerNextRequestState
+)
 
 type Peer struct {
        // First to ensure 64-bit alignment for atomics. See #262.
@@ -78,7 +82,8 @@ type Peer struct {
        lastChunkSent           time.Time
 
        // Stuff controlled by the local peer.
-       interested           bool
+       nextRequestState     requestState
+       actualRequestState   requestState
        lastBecameInterested time.Time
        priorInterest        time.Duration
 
@@ -87,7 +92,6 @@ type Peer struct {
        _chunksReceivedWhileExpecting       int64
 
        choking                                bool
-       requests                               map[Request]struct{}
        piecesReceivedSinceLastRequestUpdate   maxRequests
        maxPiecesReceivedBetweenRequestUpdates maxRequests
        // Chunks that we might reasonably expect to receive from the peer. Due to
@@ -171,13 +175,13 @@ func (cn *Peer) updateExpectingChunks() {
 }
 
 func (cn *Peer) expectingChunks() bool {
-       if len(cn.requests) == 0 {
+       if len(cn.actualRequestState.Requests) == 0 {
                return false
        }
-       if !cn.interested {
+       if !cn.actualRequestState.Interested {
                return false
        }
-       for r := range cn.requests {
+       for r := range cn.actualRequestState.Requests {
                if !cn.remoteChokingPiece(r.Index.Int()) {
                        return true
                }
@@ -217,7 +221,7 @@ func (l *PeerConn) hasPreferredNetworkOver(r *PeerConn) (left, ok bool) {
 
 func (cn *Peer) cumInterest() time.Duration {
        ret := cn.priorInterest
-       if cn.interested {
+       if cn.actualRequestState.Interested {
                ret += time.Since(cn.lastBecameInterested)
        }
        return ret
@@ -301,7 +305,7 @@ func (cn *Peer) statusFlags() (ret string) {
        c := func(b byte) {
                ret += string([]byte{b})
        }
-       if cn.interested {
+       if cn.actualRequestState.Interested {
                c('i')
        }
        if cn.choking {
@@ -329,7 +333,7 @@ func (cn *Peer) downloadRate() float64 {
 
 func (cn *Peer) numRequestsByPiece() (ret map[pieceIndex]int) {
        ret = make(map[pieceIndex]int)
-       for r := range cn.requests {
+       for r := range cn.actualRequestState.Requests {
                ret[pieceIndex(r.Index)]++
        }
        return
@@ -437,18 +441,6 @@ func (cn *PeerConn) write(msg pp.Message) bool {
        return notFull
 }
 
-func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
-       cn.mu.Lock()
-       defer cn.mu.Unlock()
-       cn.writeBuffer.Write(msg.MustMarshalBinary())
-       cn.writeCond.Broadcast()
-       return !cn.writeBufferFull()
-}
-
-func (cn *peerConnMsgWriter) writeBufferFull() bool {
-       return cn.writeBuffer.Len() >= writeBufferHighWaterLen
-}
-
 func (cn *PeerConn) requestMetadataPiece(index int) {
        eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
        if eID == 0 {
@@ -538,10 +530,10 @@ func (cn *PeerConn) unchoke(msg func(pp.Message) bool) bool {
 }
 
 func (cn *Peer) setInterested(interested bool) bool {
-       if cn.interested == interested {
+       if cn.actualRequestState.Interested == interested {
                return true
        }
-       cn.interested = interested
+       cn.actualRequestState.Interested = interested
        if interested {
                cn.lastBecameInterested = time.Now()
        } else if !cn.lastBecameInterested.IsZero() {
@@ -587,20 +579,20 @@ func (cn *Peer) shouldRequest(r Request) error {
        return nil
 }
 
-func (cn *Peer) request(r Request) error {
+func (cn *Peer) request(r Request) (more bool, err error) {
        if err := cn.shouldRequest(r); err != nil {
                panic(err)
        }
-       if _, ok := cn.requests[r]; ok {
-               return nil
+       if _, ok := cn.actualRequestState.Requests[r]; ok {
+               return true, nil
        }
        if cn.numLocalRequests() >= cn.nominalMaxRequests() {
-               return errors.New("too many outstanding requests")
+               return true, errors.New("too many outstanding requests")
        }
-       if cn.requests == nil {
-               cn.requests = make(map[Request]struct{})
+       if cn.actualRequestState.Requests == nil {
+               cn.actualRequestState.Requests = make(map[Request]struct{})
        }
-       cn.requests[r] = struct{}{}
+       cn.actualRequestState.Requests[r] = struct{}{}
        if cn.validReceiveChunks == nil {
                cn.validReceiveChunks = make(map[Request]int)
        }
@@ -610,12 +602,11 @@ func (cn *Peer) request(r Request) error {
        for _, f := range cn.callbacks.SentRequest {
                f(PeerRequestEvent{cn, r})
        }
-       cn.peerImpl._request(r)
-       return nil
+       return cn.peerImpl._request(r), nil
 }
 
-func (me *PeerConn) _request(r Request) {
-       me.write(pp.Message{
+func (me *PeerConn) _request(r Request) bool {
+       return me.write(pp.Message{
                Type:   pp.Request,
                Index:  r.Index,
                Begin:  r.Begin,
@@ -623,17 +614,21 @@ func (me *PeerConn) _request(r Request) {
        })
 }
 
-func (me *Peer) cancel(r Request) {
+func (me *Peer) cancel(r Request) bool {
        if me.deleteRequest(r) {
-               me.peerImpl._cancel(r)
+               return me.peerImpl._cancel(r)
        }
+       return true
 }
 
-func (me *PeerConn) _cancel(r Request) {
-       me.write(makeCancelMessage(r))
+func (me *PeerConn) _cancel(r Request) bool {
+       return me.write(makeCancelMessage(r))
 }
 
 func (cn *PeerConn) fillWriteBuffer() {
+       if !cn.applyNextRequestState() {
+               return
+       }
        if cn.pex.IsEnabled() {
                if flow := cn.pex.Share(cn.write); !flow {
                        return
@@ -668,8 +663,7 @@ func (cn *PeerConn) postBitfield() {
 }
 
 func (cn *PeerConn) updateRequests() {
-       // log.Print("update requests")
-       cn.tickleWriter()
+       cn.t.cl.tickleRequester()
 }
 
 // Emits the indices in the Bitmaps bms in order, never repeating any index.
@@ -1286,7 +1280,7 @@ func (c *Peer) receiveChunk(msg *pp.Message) error {
        // out.
        deletedRequest := false
        {
-               if _, ok := c.requests[req]; ok {
+               if _, ok := c.actualRequestState.Requests[req]; ok {
                        for _, f := range c.callbacks.ReceivedRequested {
                                f(PeerMessageEvent{c, msg})
                        }
@@ -1468,14 +1462,15 @@ func (c *Peer) peerHasWantedPieces() bool {
 }
 
 func (c *Peer) numLocalRequests() int {
-       return len(c.requests)
+       return len(c.actualRequestState.Requests)
 }
 
 func (c *Peer) deleteRequest(r Request) bool {
-       if _, ok := c.requests[r]; !ok {
+       delete(c.nextRequestState.Requests, r)
+       if _, ok := c.actualRequestState.Requests[r]; !ok {
                return false
        }
-       delete(c.requests, r)
+       delete(c.actualRequestState.Requests, r)
        for _, f := range c.callbacks.DeletedRequest {
                f(PeerRequestEvent{c, r})
        }
@@ -1493,12 +1488,13 @@ func (c *Peer) deleteRequest(r Request) bool {
 }
 
 func (c *Peer) deleteAllRequests() {
-       for r := range c.requests {
+       for r := range c.actualRequestState.Requests {
                c.deleteRequest(r)
        }
-       if len(c.requests) != 0 {
-               panic(len(c.requests))
+       if l := len(c.actualRequestState.Requests); l != 0 {
+               panic(l)
        }
+       c.nextRequestState.Requests = nil
        // for c := range c.t.conns {
        //      c.tickleWriter()
        // }
@@ -1635,3 +1631,7 @@ func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
        pc, ok := p.peerImpl.(*PeerConn)
        return pc, ok
 }
+
+func (p *PeerConn) onNextRequestStateChanged() {
+       p.tickleWriter()
+}