]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Read peer request data without Client lock
authorMatt Joiner <anacrolix@gmail.com>
Fri, 13 Nov 2020 04:50:08 +0000 (15:50 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 16 Nov 2020 05:37:11 +0000 (16:37 +1100)
client.go
peerconn.go

index 2ca1661db3f581310820ee3cf7f3cdc37aabcbfa..d0bd686923ea79979599934797a5d97d01e75c02 100644 (file)
--- a/client.go
+++ b/client.go
@@ -959,8 +959,11 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) {
                                        M: map[pp.ExtensionName]pp.ExtensionNumber{
                                                pp.ExtensionNameMetadata: metadataExtendedId,
                                        },
-                                       V:            cl.config.ExtendedHandshakeClientVersion,
-                                       Reqq:         64, // TODO: Really?
+                                       V: cl.config.ExtendedHandshakeClientVersion,
+                                       // If peer requests are buffered on read, this instructs the amount of memory
+                                       // that might be used to cache pending writes. Assuming 512KiB cached for
+                                       // sending, for 16KiB chunks.
+                                       Reqq:         1 << 5,
                                        YourIp:       pp.CompactIp(addrIpOrNil(conn.RemoteAddr)),
                                        Encryption:   cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred,
                                        Port:         cl.incomingPeerPort(),
index 50c86ad6ecfec0f8d866f4681c86080283b0eb30..633e54ce49a16658c7e16085dbf7fa7c43b04199 100644 (file)
@@ -38,6 +38,10 @@ const (
        PeerSourceDirect = "M"
 )
 
+type peerRequestState struct {
+       data []byte
+}
+
 type peer struct {
        // First to ensure 64-bit alignment for atomics. See #262.
        _stats ConnStats
@@ -88,7 +92,7 @@ type peer struct {
        // Stuff controlled by the remote peer.
        peerInterested        bool
        peerChoking           bool
-       peerRequests          map[request]struct{}
+       peerRequests          map[request]*peerRequestState
        PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake
        PeerListenPort        int
        // The pieces the peer has claimed to have.
@@ -365,17 +369,23 @@ func (cn *peer) peerHasPiece(piece pieceIndex) bool {
        return cn.peerSentHaveAll || cn._peerPieces.Contains(bitmap.BitIndex(piece))
 }
 
-// Writes a message into the write buffer.
-func (cn *PeerConn) post(msg pp.Message) {
+// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when
+// https://github.com/pion/datachannel/issues/59 is fixed.
+const writeBufferHighWaterLen = 1 << 15
+
+// Writes a message into the write buffer. Returns whether it's okay to keep writing. Posting is
+// done asynchronously, so it may be that we're not able to honour backpressure from this method. It
+// might be possible to merge this with PeerConn.write down the track? They seem to be very similar.
+func (cn *PeerConn) post(msg pp.Message) bool {
        torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1)
-       // We don't need to track bytes here because a connection.w Writer wrapper
-       // takes care of that (although there's some delay between us recording
-       // the message, and the connection writer flushing it out.).
+       // We don't need to track bytes here because a connection.w Writer wrapper takes care of that
+       // (although there's some delay between us recording the message, and the connection writer
+       // flushing it out.).
        cn.writeBuffer.Write(msg.MustMarshalBinary())
-       // Last I checked only Piece messages affect stats, and we don't post
-       // those.
+       // Last I checked only Piece messages affect stats, and we don't post those.
        cn.wroteMsg(&msg)
        cn.tickleWriter()
+       return cn.writeBuffer.Len() < writeBufferHighWaterLen
 }
 
 // Returns true if there's room to write more.
@@ -383,9 +393,7 @@ func (cn *PeerConn) write(msg pp.Message) bool {
        cn.wroteMsg(&msg)
        cn.writeBuffer.Write(msg.MustMarshalBinary())
        torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
-       // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update
-       // when https://github.com/pion/datachannel/issues/59 is fixed.
-       return cn.writeBuffer.Len() < 1<<15
+       return cn.writeBuffer.Len() < writeBufferHighWaterLen
 }
 
 func (cn *PeerConn) requestMetadataPiece(index int) {
@@ -1032,13 +1040,68 @@ func (c *PeerConn) onReadRequest(r request) error {
                return errors.New("bad request")
        }
        if c.peerRequests == nil {
-               c.peerRequests = make(map[request]struct{}, maxRequests)
+               c.peerRequests = make(map[request]*peerRequestState, maxRequests)
        }
-       c.peerRequests[r] = struct{}{}
-       c.tickleWriter()
+       value := &peerRequestState{}
+       c.peerRequests[r] = value
+       go c.peerRequestDataReader(r, value)
+       //c.tickleWriter()
        return nil
 }
 
+func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) {
+       b, err := readPeerRequestData(r, c)
+       c.locker().Lock()
+       defer c.locker().Unlock()
+       if err != nil {
+               c.peerRequestDataReadFailed(err, r)
+       } else {
+               if b == nil {
+                       panic("data must be non-nil to trigger send")
+               }
+               prs.data = b
+               c.tickleWriter()
+       }
+}
+
+// If this is maintained correctly, we might be able to support optional synchronous reading for
+// chunk sending, the way it used to work.
+func (c *PeerConn) peerRequestDataReadFailed(err error, r request) {
+       c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer request %v: %v", r, err)
+       i := pieceIndex(r.Index)
+       if c.t.pieceComplete(i) {
+               // There used to be more code here that just duplicated the following break. Piece
+               // completions are currently cached, so I'm not sure how helpful this update is, except to
+               // pull any completion changes pushed to the storage backend in failed reads that got us
+               // here.
+               c.t.updatePieceCompletion(i)
+       }
+       // If we failed to send a chunk, choke the peer to ensure they flush all their requests. We've
+       // probably dropped a piece from storage, but there's no way to communicate this to the peer. If
+       // they ask for it again, we'll kick them to allow us to send them an updated bitfield on the
+       // next connect. TODO: Support rejecting here too.
+       if c.choking {
+               c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
+       }
+       c.choke(c.post)
+}
+
+func readPeerRequestData(r request, c *PeerConn) ([]byte, error) {
+       b := make([]byte, r.Length)
+       p := c.t.info.Piece(int(r.Index))
+       n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
+       if n == len(b) {
+               if err == io.EOF {
+                       err = nil
+               }
+       } else {
+               if err == nil {
+                       panic("expected error")
+               }
+       }
+       return b, err
+}
+
 func runSafeExtraneous(f func()) {
        if true {
                go f()
@@ -1422,7 +1485,10 @@ another:
                if !c.unchoke(msg) {
                        return false
                }
-               for r := range c.peerRequests {
+               for r, state := range c.peerRequests {
+                       if state.data == nil {
+                               continue
+                       }
                        res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length))
                        if !res.OK() {
                                panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length))
@@ -1434,27 +1500,7 @@ another:
                                // Hard to say what to return here.
                                return true
                        }
-                       more, err := c.sendChunk(r, msg)
-                       if err != nil {
-                               c.logger.WithDefaultLevel(log.Warning).Printf("sending chunk to peer: %v", err)
-                               i := pieceIndex(r.Index)
-                               if c.t.pieceComplete(i) {
-                                       // There used to be more code here that just duplicated the following break.
-                                       // Piece completions are currently cached, so I'm not sure how helpful this
-                                       // update is, except to pull any completion changes pushed to the storage
-                                       // backend in failed reads that got us here.
-                                       c.t.updatePieceCompletion(i)
-                               }
-                               // If we failed to send a chunk, choke the peer by breaking out of the loop here to
-                               // ensure they flush all their requests. We've probably dropped a piece from
-                               // storage, but there's no way to communicate this to the peer. If they ask for it
-                               // again, we'll kick them to allow us to send them an updated bitfield on the next
-                               // connect.
-                               if c.choking {
-                                       c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly")
-                               }
-                               break another
-                       }
+                       more := c.sendChunk(r, msg, state)
                        delete(c.peerRequests, r)
                        if !more {
                                return false
@@ -1533,6 +1579,8 @@ func (c *peer) deleteAllRequests() {
        // }
 }
 
+// This is called when something has changed that should wake the writer, such as putting stuff into
+// the writeBuffer, or changing some state that the writer can act on.
 func (c *PeerConn) tickleWriter() {
        c.writerCond.Broadcast()
 }
@@ -1549,26 +1597,14 @@ func (c *PeerConn) _postCancel(r request) {
        c.post(makeCancelMessage(r))
 }
 
-func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
-       b := make([]byte, r.Length)
-       p := c.t.info.Piece(int(r.Index))
-       n, err := c.t.readAt(b, p.Offset()+int64(r.Begin))
-       if n != len(b) {
-               if err == nil {
-                       panic("expected error")
-               }
-               return
-       } else if err == io.EOF {
-               err = nil
-       }
-       more = msg(pp.Message{
+func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {
+       c.lastChunkSent = time.Now()
+       return msg(pp.Message{
                Type:  pp.Piece,
                Index: r.Index,
                Begin: r.Begin,
-               Piece: b,
+               Piece: state.data,
        })
-       c.lastChunkSent = time.Now()
-       return
 }
 
 func (c *PeerConn) setTorrent(t *Torrent) {