import (
"bufio"
- "bytes"
"context"
"crypto/rand"
"encoding/binary"
"github.com/anacrolix/missinggo/perf"
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/missinggo/slices"
+ "github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
+ "github.com/anacrolix/missinggo/v2/conntrack"
"github.com/anacrolix/missinggo/v2/pproffd"
"github.com/anacrolix/sync"
- "github.com/anacrolix/torrent/internal/limiter"
- request_strategy "github.com/anacrolix/torrent/request-strategy"
- "github.com/anacrolix/torrent/tracker"
- "github.com/anacrolix/torrent/webtorrent"
"github.com/davecgh/go-spew/spew"
"github.com/dustin/go-humanize"
"github.com/google/btree"
"golang.org/x/time/rate"
"golang.org/x/xerrors"
- "github.com/anacrolix/missinggo/v2"
- "github.com/anacrolix/missinggo/v2/conntrack"
-
"github.com/anacrolix/torrent/bencode"
+ "github.com/anacrolix/torrent/internal/limiter"
"github.com/anacrolix/torrent/iplist"
"github.com/anacrolix/torrent/metainfo"
"github.com/anacrolix/torrent/mse"
pp "github.com/anacrolix/torrent/peer_protocol"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/storage"
+ "github.com/anacrolix/torrent/tracker"
+ "github.com/anacrolix/torrent/webtorrent"
)
// Clients contain zero or more Torrents. A Client manages a blocklist, the
return nil
}
-func (pc *PeerConn) startWriter() {
- w := &pc.messageWriter
- *w = peerConnWriter{
- fillWriteBuffer: func() {
- pc.locker().Lock()
- defer pc.locker().Unlock()
- pc.fillWriteBuffer()
- },
- closed: &pc.closed,
- logger: pc.logger,
- w: pc.w,
- keepAlive: func() bool {
- pc.locker().Lock()
- defer pc.locker().Unlock()
- return pc.useful()
- },
- writeBuffer: new(bytes.Buffer),
- }
- go func() {
- defer pc.locker().Unlock()
- defer pc.close()
- defer pc.locker().Lock()
- pc.messageWriter.run(time.Minute)
- }()
-}
-
// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
// instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
// (1<<19) cached for sending, for 16KiB (1<<14) chunks.
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
"github.com/anacrolix/multiless"
- "github.com/anacrolix/sync"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/internal/chansync"
w io.Writer
r io.Reader
- messageWriter peerConnWriter
+ messageWriter peerConnMsgWriter
uploadTimer *time.Timer
pex pexConnState
return notFull
}
-func (cn *peerConnWriter) write(msg pp.Message) bool {
+func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
cn.mu.Lock()
defer cn.mu.Unlock()
cn.writeBuffer.Write(msg.MustMarshalBinary())
return !cn.writeBufferFull()
}
-func (cn *peerConnWriter) writeBufferFull() bool {
+func (cn *peerConnMsgWriter) writeBufferFull() bool {
return cn.writeBuffer.Len() >= writeBufferHighWaterLen
}
cn.upload(cn.write)
}
-type peerConnWriter struct {
- // Must not be called with the local mutex held, as it will call back into the write method.
- fillWriteBuffer func()
- closed *chansync.SetOnce
- logger log.Logger
- w io.Writer
- keepAlive func() bool
-
- mu sync.Mutex
- writeCond chansync.BroadcastCond
- // Pointer so we can swap with the "front buffer".
- writeBuffer *bytes.Buffer
-}
-
-// Routine that writes to the peer. Some of what to write is buffered by
-// activity elsewhere in the Client, and some is determined locally when the
-// connection is writable.
-func (cn *peerConnWriter) run(keepAliveTimeout time.Duration) {
- var (
- lastWrite time.Time = time.Now()
- keepAliveTimer *time.Timer
- )
- keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
- cn.mu.Lock()
- defer cn.mu.Unlock()
- if time.Since(lastWrite) >= keepAliveTimeout {
- cn.writeCond.Broadcast()
- }
- keepAliveTimer.Reset(keepAliveTimeout)
- })
- cn.mu.Lock()
- defer cn.mu.Unlock()
- defer keepAliveTimer.Stop()
- frontBuf := new(bytes.Buffer)
- for {
- if cn.closed.IsSet() {
- return
- }
- if cn.writeBuffer.Len() == 0 {
- func() {
- cn.mu.Unlock()
- defer cn.mu.Lock()
- cn.fillWriteBuffer()
- }()
- }
- if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() {
- cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
- torrent.Add("written keepalives", 1)
- }
- if cn.writeBuffer.Len() == 0 {
- writeCond := cn.writeCond.WaitChan()
- cn.mu.Unlock()
- select {
- case <-cn.closed.Chan():
- case <-writeCond:
- }
- cn.mu.Lock()
- continue
- }
- // Flip the buffers.
- frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
- cn.mu.Unlock()
- n, err := cn.w.Write(frontBuf.Bytes())
- cn.mu.Lock()
- if n != 0 {
- lastWrite = time.Now()
- keepAliveTimer.Reset(keepAliveTimeout)
- }
- if err != nil {
- cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
- return
- }
- if n != frontBuf.Len() {
- panic("short write")
- }
- frontBuf.Reset()
- }
-}
-
func (cn *PeerConn) have(piece pieceIndex) {
if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
return