From 0d40c4bac23ba0545ef47cb65741bcf877e8c6a0 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Fri, 13 Nov 2020 15:50:08 +1100 Subject: [PATCH] Read peer request data without Client lock --- client.go | 7 ++- peerconn.go | 140 +++++++++++++++++++++++++++++++++------------------- 2 files changed, 93 insertions(+), 54 deletions(-) diff --git a/client.go b/client.go index 2ca1661d..d0bd6869 100644 --- a/client.go +++ b/client.go @@ -959,8 +959,11 @@ func (cl *Client) sendInitialMessages(conn *PeerConn, torrent *Torrent) { M: map[pp.ExtensionName]pp.ExtensionNumber{ pp.ExtensionNameMetadata: metadataExtendedId, }, - V: cl.config.ExtendedHandshakeClientVersion, - Reqq: 64, // TODO: Really? + V: cl.config.ExtendedHandshakeClientVersion, + // If peer requests are buffered on read, this instructs the amount of memory + // that might be used to cache pending writes. Assuming 512KiB cached for + // sending, for 16KiB chunks. + Reqq: 1 << 5, YourIp: pp.CompactIp(addrIpOrNil(conn.RemoteAddr)), Encryption: cl.config.HeaderObfuscationPolicy.Preferred || !cl.config.HeaderObfuscationPolicy.RequirePreferred, Port: cl.incomingPeerPort(), diff --git a/peerconn.go b/peerconn.go index 50c86ad6..633e54ce 100644 --- a/peerconn.go +++ b/peerconn.go @@ -38,6 +38,10 @@ const ( PeerSourceDirect = "M" ) +type peerRequestState struct { + data []byte +} + type peer struct { // First to ensure 64-bit alignment for atomics. See #262. _stats ConnStats @@ -88,7 +92,7 @@ type peer struct { // Stuff controlled by the remote peer. peerInterested bool peerChoking bool - peerRequests map[request]struct{} + peerRequests map[request]*peerRequestState PeerPrefersEncryption bool // as indicated by 'e' field in extension handshake PeerListenPort int // The pieces the peer has claimed to have. @@ -365,17 +369,23 @@ func (cn *peer) peerHasPiece(piece pieceIndex) bool { return cn.peerSentHaveAll || cn._peerPieces.Contains(bitmap.BitIndex(piece)) } -// Writes a message into the write buffer. -func (cn *PeerConn) post(msg pp.Message) { +// 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update when +// https://github.com/pion/datachannel/issues/59 is fixed. +const writeBufferHighWaterLen = 1 << 15 + +// Writes a message into the write buffer. Returns whether it's okay to keep writing. Posting is +// done asynchronously, so it may be that we're not able to honour backpressure from this method. It +// might be possible to merge this with PeerConn.write down the track? They seem to be very similar. +func (cn *PeerConn) post(msg pp.Message) bool { torrent.Add(fmt.Sprintf("messages posted of type %s", msg.Type.String()), 1) - // We don't need to track bytes here because a connection.w Writer wrapper - // takes care of that (although there's some delay between us recording - // the message, and the connection writer flushing it out.). + // We don't need to track bytes here because a connection.w Writer wrapper takes care of that + // (although there's some delay between us recording the message, and the connection writer + // flushing it out.). cn.writeBuffer.Write(msg.MustMarshalBinary()) - // Last I checked only Piece messages affect stats, and we don't post - // those. + // Last I checked only Piece messages affect stats, and we don't post those. cn.wroteMsg(&msg) cn.tickleWriter() + return cn.writeBuffer.Len() < writeBufferHighWaterLen } // Returns true if there's room to write more. @@ -383,9 +393,7 @@ func (cn *PeerConn) write(msg pp.Message) bool { cn.wroteMsg(&msg) cn.writeBuffer.Write(msg.MustMarshalBinary()) torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1) - // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update - // when https://github.com/pion/datachannel/issues/59 is fixed. - return cn.writeBuffer.Len() < 1<<15 + return cn.writeBuffer.Len() < writeBufferHighWaterLen } func (cn *PeerConn) requestMetadataPiece(index int) { @@ -1032,13 +1040,68 @@ func (c *PeerConn) onReadRequest(r request) error { return errors.New("bad request") } if c.peerRequests == nil { - c.peerRequests = make(map[request]struct{}, maxRequests) + c.peerRequests = make(map[request]*peerRequestState, maxRequests) } - c.peerRequests[r] = struct{}{} - c.tickleWriter() + value := &peerRequestState{} + c.peerRequests[r] = value + go c.peerRequestDataReader(r, value) + //c.tickleWriter() return nil } +func (c *PeerConn) peerRequestDataReader(r request, prs *peerRequestState) { + b, err := readPeerRequestData(r, c) + c.locker().Lock() + defer c.locker().Unlock() + if err != nil { + c.peerRequestDataReadFailed(err, r) + } else { + if b == nil { + panic("data must be non-nil to trigger send") + } + prs.data = b + c.tickleWriter() + } +} + +// If this is maintained correctly, we might be able to support optional synchronous reading for +// chunk sending, the way it used to work. +func (c *PeerConn) peerRequestDataReadFailed(err error, r request) { + c.logger.WithDefaultLevel(log.Warning).Printf("error reading chunk for peer request %v: %v", r, err) + i := pieceIndex(r.Index) + if c.t.pieceComplete(i) { + // There used to be more code here that just duplicated the following break. Piece + // completions are currently cached, so I'm not sure how helpful this update is, except to + // pull any completion changes pushed to the storage backend in failed reads that got us + // here. + c.t.updatePieceCompletion(i) + } + // If we failed to send a chunk, choke the peer to ensure they flush all their requests. We've + // probably dropped a piece from storage, but there's no way to communicate this to the peer. If + // they ask for it again, we'll kick them to allow us to send them an updated bitfield on the + // next connect. TODO: Support rejecting here too. + if c.choking { + c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly") + } + c.choke(c.post) +} + +func readPeerRequestData(r request, c *PeerConn) ([]byte, error) { + b := make([]byte, r.Length) + p := c.t.info.Piece(int(r.Index)) + n, err := c.t.readAt(b, p.Offset()+int64(r.Begin)) + if n == len(b) { + if err == io.EOF { + err = nil + } + } else { + if err == nil { + panic("expected error") + } + } + return b, err +} + func runSafeExtraneous(f func()) { if true { go f() @@ -1422,7 +1485,10 @@ another: if !c.unchoke(msg) { return false } - for r := range c.peerRequests { + for r, state := range c.peerRequests { + if state.data == nil { + continue + } res := c.t.cl.config.UploadRateLimiter.ReserveN(time.Now(), int(r.Length)) if !res.OK() { panic(fmt.Sprintf("upload rate limiter burst size < %d", r.Length)) @@ -1434,27 +1500,7 @@ another: // Hard to say what to return here. return true } - more, err := c.sendChunk(r, msg) - if err != nil { - c.logger.WithDefaultLevel(log.Warning).Printf("sending chunk to peer: %v", err) - i := pieceIndex(r.Index) - if c.t.pieceComplete(i) { - // There used to be more code here that just duplicated the following break. - // Piece completions are currently cached, so I'm not sure how helpful this - // update is, except to pull any completion changes pushed to the storage - // backend in failed reads that got us here. - c.t.updatePieceCompletion(i) - } - // If we failed to send a chunk, choke the peer by breaking out of the loop here to - // ensure they flush all their requests. We've probably dropped a piece from - // storage, but there's no way to communicate this to the peer. If they ask for it - // again, we'll kick them to allow us to send them an updated bitfield on the next - // connect. - if c.choking { - c.logger.WithDefaultLevel(log.Warning).Printf("already choking peer, requests might not be rejected correctly") - } - break another - } + more := c.sendChunk(r, msg, state) delete(c.peerRequests, r) if !more { return false @@ -1533,6 +1579,8 @@ func (c *peer) deleteAllRequests() { // } } +// This is called when something has changed that should wake the writer, such as putting stuff into +// the writeBuffer, or changing some state that the writer can act on. func (c *PeerConn) tickleWriter() { c.writerCond.Broadcast() } @@ -1549,26 +1597,14 @@ func (c *PeerConn) _postCancel(r request) { c.post(makeCancelMessage(r)) } -func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) { - b := make([]byte, r.Length) - p := c.t.info.Piece(int(r.Index)) - n, err := c.t.readAt(b, p.Offset()+int64(r.Begin)) - if n != len(b) { - if err == nil { - panic("expected error") - } - return - } else if err == io.EOF { - err = nil - } - more = msg(pp.Message{ +func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool, state *peerRequestState) (more bool) { + c.lastChunkSent = time.Now() + return msg(pp.Message{ Type: pp.Piece, Index: r.Index, Begin: r.Begin, - Piece: b, + Piece: state.data, }) - c.lastChunkSent = time.Now() - return } func (c *PeerConn) setTorrent(t *Torrent) { -- 2.48.1