return fmt.Errorf("adding connection: %w", err)
}
defer t.dropConnection(c)
- go c.writer(time.Minute)
+ c.startWriter()
cl.sendInitialMessages(c, t)
err := c.mainReadLoop()
if err != nil {
return nil
}
+func (pc *PeerConn) startWriter() {
+ w := &pc.messageWriter
+ *w = peerConnWriter{
+ fillWriteBuffer: func() {
+ pc.locker().Lock()
+ defer pc.locker().Unlock()
+ pc.fillWriteBuffer()
+ },
+ closed: &pc.closed,
+ logger: pc.logger,
+ w: pc.w,
+ keepAlive: func() bool {
+ pc.locker().Lock()
+ defer pc.locker().Unlock()
+ return pc.useful()
+ },
+ writeBuffer: new(bytes.Buffer),
+ }
+ go func() {
+ defer pc.locker().Unlock()
+ defer pc.close()
+ defer pc.locker().Lock()
+ pc.messageWriter.run(time.Minute)
+ }()
+}
+
// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
// instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
// (1<<19) cached for sending, for 16KiB (1<<14) chunks.
Network: network,
callbacks: &cl.config.Callbacks,
},
- connString: connString,
- conn: nc,
- writeBuffer: new(bytes.Buffer),
+ connString: connString,
+ conn: nc,
}
c.peerImpl = c
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextValue(c)
- c.writerCond.L = cl.locker()
c.setRW(connStatsReadWriter{nc, c})
c.r = &rateLimitedReader{
l: cl.config.DownloadRateLimiter,
github.com/anacrolix/sync v0.0.0-20180808010631-44578de4e778/go.mod h1:s735Etp3joe/voe2sdaXLcqDdJSay1O0OPnM0ystjqk=
github.com/anacrolix/sync v0.2.0 h1:oRe22/ZB+v7v/5Mbc4d2zE0AXEZy0trKyKLjqYOt6tY=
github.com/anacrolix/sync v0.2.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
+github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f h1:7KqmZoEOIXa0UbR2WQ/YPF4H+MPV6rhWk4E4tcv5eDg=
+github.com/anacrolix/sync v0.2.1-0.20210520084835-26aa6614542f/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
+github.com/anacrolix/sync v0.3.0 h1:ZPjTrkqQWEfnYVGTQHh5qNjokWaXnjsyXTJSMsKY0TA=
+github.com/anacrolix/sync v0.3.0/go.mod h1:BbecHL6jDSExojhNtgTFSBcdGerzNc64tz3DCOj/I0g=
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v0.0.0-20180605133421-f477c8c2f14c/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v0.0.0-20180803105420-3a8ff5428f76/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI65Y=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
"sort"
"strconv"
"strings"
- "sync"
"time"
"github.com/anacrolix/log"
- "github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
"github.com/anacrolix/multiless"
- "github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/sync"
"github.com/anacrolix/torrent/bencode"
+ "github.com/anacrolix/torrent/internal/chansync"
+ "github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
)
cryptoMethod mse.CryptoMethod
Discovery PeerSource
trusted bool
- closed missinggo.Event
+ closed chansync.SetOnce
// Set true after we've added our ConnStats generated during handshake to
// other ConnStat instances as determined when the *Torrent became known.
reconciledHandshakeStats bool
w io.Writer
r io.Reader
- writeBuffer *bytes.Buffer
- uploadTimer *time.Timer
- writerCond sync.Cond
+ messageWriter peerConnWriter
- pex pexConnState
+ uploadTimer *time.Timer
+ pex pexConnState
}
func (cn *PeerConn) connStatusString() string {
// done asynchronously, so it may be that we're not able to honour backpressure from this method.
func (cn *PeerConn) write(msg pp.Message) bool {
torrent.Add(fmt.Sprintf("messages written of type %s", msg.Type.String()), 1)
- // We don't need to track bytes here because a connection.w Writer wrapper takes care of that
- // (although there's some delay between us recording the message, and the connection writer
+ // We don't need to track bytes here because the connection's Writer has that behaviour injected
+ // (although there's some delay between us buffering the message, and the connection writer
// flushing it out.).
- cn.writeBuffer.Write(msg.MustMarshalBinary())
+ notFull := cn.messageWriter.write(msg)
// Last I checked only Piece messages affect stats, and we don't write those.
cn.wroteMsg(&msg)
cn.tickleWriter()
+ return notFull
+}
+
+func (cn *peerConnWriter) write(msg pp.Message) bool {
+ cn.mu.Lock()
+ defer cn.mu.Unlock()
+ cn.writeBuffer.Write(msg.MustMarshalBinary())
+ cn.writeCond.Broadcast()
return !cn.writeBufferFull()
}
-func (cn *PeerConn) writeBufferFull() bool {
+func (cn *peerConnWriter) writeBufferFull() bool {
return cn.writeBuffer.Len() >= writeBufferHighWaterLen
}
cn.upload(cn.write)
}
+type peerConnWriter struct {
+ // Must not be called with the local mutex held, as it will call back into the write method.
+ fillWriteBuffer func()
+ closed *chansync.SetOnce
+ logger log.Logger
+ w io.Writer
+ keepAlive func() bool
+
+ mu sync.Mutex
+ writeCond chansync.BroadcastCond
+ // Pointer so we can swap with the "front buffer".
+ writeBuffer *bytes.Buffer
+}
+
// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
-func (cn *PeerConn) writer(keepAliveTimeout time.Duration) {
+func (cn *peerConnWriter) run(keepAliveTimeout time.Duration) {
var (
lastWrite time.Time = time.Now()
keepAliveTimer *time.Timer
)
keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
- cn.locker().Lock()
- defer cn.locker().Unlock()
+ cn.mu.Lock()
+ defer cn.mu.Unlock()
if time.Since(lastWrite) >= keepAliveTimeout {
- cn.tickleWriter()
+ cn.writeCond.Broadcast()
}
keepAliveTimer.Reset(keepAliveTimeout)
})
- cn.locker().Lock()
- defer cn.locker().Unlock()
- defer cn.close()
+ cn.mu.Lock()
+ defer cn.mu.Unlock()
defer keepAliveTimer.Stop()
frontBuf := new(bytes.Buffer)
for {
return
}
if cn.writeBuffer.Len() == 0 {
- cn.fillWriteBuffer()
+ func() {
+ cn.mu.Unlock()
+ defer cn.mu.Lock()
+ cn.fillWriteBuffer()
+ }()
}
- if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.useful() {
+ if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() {
cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
torrent.Add("written keepalives", 1)
}
if cn.writeBuffer.Len() == 0 {
- // TODO: Minimize wakeups....
- cn.writerCond.Wait()
+ writeCond := cn.writeCond.WaitChan()
+ cn.mu.Unlock()
+ select {
+ case <-cn.closed.Chan():
+ case <-writeCond:
+ }
+ cn.mu.Lock()
continue
}
// Flip the buffers.
frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
- cn.locker().Unlock()
+ cn.mu.Unlock()
n, err := cn.w.Write(frontBuf.Bytes())
- cn.locker().Lock()
+ cn.mu.Lock()
if n != 0 {
lastWrite = time.Now()
keepAliveTimer.Reset(keepAliveTimeout)
func (c *PeerConn) setRetryUploadTimer(delay time.Duration) {
if c.uploadTimer == nil {
- c.uploadTimer = time.AfterFunc(delay, c.writerCond.Broadcast)
+ c.uploadTimer = time.AfterFunc(delay, c.tickleWriter)
} else {
c.uploadTimer.Reset(delay)
}
// This is called when something has changed that should wake the writer, such as putting stuff into
// the writeBuffer, or changing some state that the writer can act on.
func (c *PeerConn) tickleWriter() {
- c.writerCond.Broadcast()
+ c.messageWriter.writeCond.Broadcast()
}
func (c *PeerConn) sendChunk(r Request, msg func(pp.Message) bool, state *peerRequestState) (more bool) {