]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Move peerConnMsgWriter into its own file
authorMatt Joiner <anacrolix@gmail.com>
Thu, 20 May 2021 08:55:23 +0000 (18:55 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 7 Jun 2021 03:01:40 +0000 (13:01 +1000)
client.go
peerconn.go

index 31dbf4ce3d7803bde3dba96247004870d0564ddd..7ac6b264d65aaed0df3b70243fa14bac52419406 100644 (file)
--- a/client.go
+++ b/client.go
@@ -2,7 +2,6 @@ package torrent
 
 import (
        "bufio"
-       "bytes"
        "context"
        "crypto/rand"
        "encoding/binary"
@@ -21,13 +20,11 @@ import (
        "github.com/anacrolix/missinggo/perf"
        "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/missinggo/slices"
+       "github.com/anacrolix/missinggo/v2"
        "github.com/anacrolix/missinggo/v2/bitmap"
+       "github.com/anacrolix/missinggo/v2/conntrack"
        "github.com/anacrolix/missinggo/v2/pproffd"
        "github.com/anacrolix/sync"
-       "github.com/anacrolix/torrent/internal/limiter"
-       request_strategy "github.com/anacrolix/torrent/request-strategy"
-       "github.com/anacrolix/torrent/tracker"
-       "github.com/anacrolix/torrent/webtorrent"
        "github.com/davecgh/go-spew/spew"
        "github.com/dustin/go-humanize"
        "github.com/google/btree"
@@ -35,15 +32,16 @@ import (
        "golang.org/x/time/rate"
        "golang.org/x/xerrors"
 
-       "github.com/anacrolix/missinggo/v2"
-       "github.com/anacrolix/missinggo/v2/conntrack"
-
        "github.com/anacrolix/torrent/bencode"
+       "github.com/anacrolix/torrent/internal/limiter"
        "github.com/anacrolix/torrent/iplist"
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/mse"
        pp "github.com/anacrolix/torrent/peer_protocol"
+       request_strategy "github.com/anacrolix/torrent/request-strategy"
        "github.com/anacrolix/torrent/storage"
+       "github.com/anacrolix/torrent/tracker"
+       "github.com/anacrolix/torrent/webtorrent"
 )
 
 // Clients contain zero or more Torrents. A Client manages a blocklist, the
@@ -969,32 +967,6 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
        return nil
 }
 
-func (pc *PeerConn) startWriter() {
-       w := &pc.messageWriter
-       *w = peerConnWriter{
-               fillWriteBuffer: func() {
-                       pc.locker().Lock()
-                       defer pc.locker().Unlock()
-                       pc.fillWriteBuffer()
-               },
-               closed: &pc.closed,
-               logger: pc.logger,
-               w:      pc.w,
-               keepAlive: func() bool {
-                       pc.locker().Lock()
-                       defer pc.locker().Unlock()
-                       return pc.useful()
-               },
-               writeBuffer: new(bytes.Buffer),
-       }
-       go func() {
-               defer pc.locker().Unlock()
-               defer pc.close()
-               defer pc.locker().Lock()
-               pc.messageWriter.run(time.Minute)
-       }()
-}
-
 // Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
 // instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
 // (1<<19) cached for sending, for 16KiB (1<<14) chunks.
index 9cddf7d77590a4a0a07d995a6a0e750d176aa1e1..b2753d6a88d741e266d2e8ee2982df2c2f193822 100644 (file)
@@ -18,7 +18,6 @@ import (
        "github.com/anacrolix/missinggo/v2/bitmap"
        "github.com/anacrolix/missinggo/v2/prioritybitmap"
        "github.com/anacrolix/multiless"
-       "github.com/anacrolix/sync"
 
        "github.com/anacrolix/torrent/bencode"
        "github.com/anacrolix/torrent/internal/chansync"
@@ -148,7 +147,7 @@ type PeerConn struct {
        w io.Writer
        r io.Reader
 
-       messageWriter peerConnWriter
+       messageWriter peerConnMsgWriter
 
        uploadTimer *time.Timer
        pex         pexConnState
@@ -438,7 +437,7 @@ func (cn *PeerConn) write(msg pp.Message) bool {
        return notFull
 }
 
-func (cn *peerConnWriter) write(msg pp.Message) bool {
+func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
        cn.mu.Lock()
        defer cn.mu.Unlock()
        cn.writeBuffer.Write(msg.MustMarshalBinary())
@@ -446,7 +445,7 @@ func (cn *peerConnWriter) write(msg pp.Message) bool {
        return !cn.writeBufferFull()
 }
 
-func (cn *peerConnWriter) writeBufferFull() bool {
+func (cn *peerConnMsgWriter) writeBufferFull() bool {
        return cn.writeBuffer.Len() >= writeBufferHighWaterLen
 }
 
@@ -643,85 +642,6 @@ func (cn *PeerConn) fillWriteBuffer() {
        cn.upload(cn.write)
 }
 
-type peerConnWriter struct {
-       // Must not be called with the local mutex held, as it will call back into the write method.
-       fillWriteBuffer func()
-       closed          *chansync.SetOnce
-       logger          log.Logger
-       w               io.Writer
-       keepAlive       func() bool
-
-       mu        sync.Mutex
-       writeCond chansync.BroadcastCond
-       // Pointer so we can swap with the "front buffer".
-       writeBuffer *bytes.Buffer
-}
-
-// Routine that writes to the peer. Some of what to write is buffered by
-// activity elsewhere in the Client, and some is determined locally when the
-// connection is writable.
-func (cn *peerConnWriter) run(keepAliveTimeout time.Duration) {
-       var (
-               lastWrite      time.Time = time.Now()
-               keepAliveTimer *time.Timer
-       )
-       keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
-               cn.mu.Lock()
-               defer cn.mu.Unlock()
-               if time.Since(lastWrite) >= keepAliveTimeout {
-                       cn.writeCond.Broadcast()
-               }
-               keepAliveTimer.Reset(keepAliveTimeout)
-       })
-       cn.mu.Lock()
-       defer cn.mu.Unlock()
-       defer keepAliveTimer.Stop()
-       frontBuf := new(bytes.Buffer)
-       for {
-               if cn.closed.IsSet() {
-                       return
-               }
-               if cn.writeBuffer.Len() == 0 {
-                       func() {
-                               cn.mu.Unlock()
-                               defer cn.mu.Lock()
-                               cn.fillWriteBuffer()
-                       }()
-               }
-               if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() {
-                       cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
-                       torrent.Add("written keepalives", 1)
-               }
-               if cn.writeBuffer.Len() == 0 {
-                       writeCond := cn.writeCond.WaitChan()
-                       cn.mu.Unlock()
-                       select {
-                       case <-cn.closed.Chan():
-                       case <-writeCond:
-                       }
-                       cn.mu.Lock()
-                       continue
-               }
-               // Flip the buffers.
-               frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
-               cn.mu.Unlock()
-               n, err := cn.w.Write(frontBuf.Bytes())
-               cn.mu.Lock()
-               if n != 0 {
-                       lastWrite = time.Now()
-                       keepAliveTimer.Reset(keepAliveTimeout)
-               }
-               if err != nil {
-                       cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
-                       return
-               }
-               if n != frontBuf.Len() {
-                       panic("short write")
-               }
-               frontBuf.Reset()
-       }
-}
-
 func (cn *PeerConn) have(piece pieceIndex) {
        if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
                return