c.bannableAddr = Some(netipAddrPort.Addr())
}
}
+ c.legacyPeerImpl = c
c.peerImpl = c
c.logger = cl.logger.WithDefaultLevel(log.Warning).WithContextText(fmt.Sprintf("%T %p", c, c))
c.protocolLogger = c.logger.WithNames(protocolLoggingName)
callbacks: &cfg.Callbacks,
}}
tt.conns[cn] = struct{}{}
- cn.peerImpl = cn
+ cn.legacyPeerImpl = cn
cl.lock()
defer cl.unlock()
assert.NoError(t, cn.peerSentHave(0))
defer pc.locker().RUnlock()
return pc.useful()
},
- writeBuffer: new(bytes.Buffer),
+ writeBuffer: new(peerConnMsgWriterBuffer),
}
}
pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
}
+type peerConnMsgWriterBuffer struct {
+	// The number of bytes in the buffer that belong to piece messages. When the
+	// whole buffer is written, this many bytes can be attributed to piece data.
+ pieceDataBytes int
+ bytes.Buffer
+}
+
type peerConnMsgWriter struct {
// Must not be called with the local mutex held, as it will call back into the write method.
fillWriteBuffer func()
mu sync.Mutex
writeCond chansync.BroadcastCond
// Pointer so we can swap with the "front buffer".
- writeBuffer *bytes.Buffer
+ writeBuffer *peerConnMsgWriterBuffer
+
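+	// Cumulative write accounting, plus the rate observed on the most recent
+	// flush. Guarded by mu.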
+ totalWriteDuration time.Duration
+ totalBytesWritten int64
+ totalDataBytesWritten int64
+ dataUploadRate float64
}
// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
lastWrite := time.Now()
keepAliveTimer := time.NewTimer(keepAliveTimeout)
- frontBuf := new(bytes.Buffer)
+ frontBuf := new(peerConnMsgWriterBuffer)
for {
if cn.closed.IsSet() {
return
panic("expected non-empty front buffer")
}
var err error
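+		// Record when this flush started and how full the buffer was, for the
+		// rate accounting below.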
+ startedWriting := time.Now()
+ startingBufLen := frontBuf.Len()
for frontBuf.Len() != 0 {
next := frontBuf.Bytes()
var n int
cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
return
}
+ // Track what was sent and how long it took.
+ writeDuration := time.Since(startedWriting)
+ cn.mu.Lock()
+ cn.dataUploadRate = float64(frontBuf.pieceDataBytes) / writeDuration.Seconds()
+ cn.totalWriteDuration += writeDuration
+ cn.totalBytesWritten += int64(startingBufLen)
+ cn.totalDataBytesWritten += int64(frontBuf.pieceDataBytes)
+ cn.mu.Unlock()
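+		// Reset the count so the buffer starts fresh when it's swapped back in.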
+ frontBuf.pieceDataBytes = 0
lastWrite = time.Now()
keepAliveTimer.Reset(keepAliveTimeout)
}
cn.writeBuffer.Truncate(originalLen)
}
}()
- return msg.WriteTo(cn.writeBuffer)
+ err = msg.WriteTo(cn.writeBuffer)
+ if err == nil {
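+		// Only the piece payload counts as piece data; message framing is overhead.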
+ cn.writeBuffer.pieceDataBytes += len(msg.Piece)
+ }
+ return
}
func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
package torrent
import (
- "bytes"
"testing"
"github.com/dustin/go-humanize"
func runBenchmarkWriteToBuffer(b *testing.B, length int64) {
writer := &peerConnMsgWriter{
- writeBuffer: &bytes.Buffer{},
+ writeBuffer: new(peerConnMsgWriterBuffer),
}
msg := PieceMsg(length)
func runBenchmarkMarshalBinaryWrite(b *testing.B, length int64) {
writer := &peerConnMsgWriter{
- writeBuffer: &bytes.Buffer{},
+		writeBuffer: new(peerConnMsgWriterBuffer),
}
msg := PieceMsg(length)
)
// Contains implementation details that differ between peer types, like Webseeds and regular
-// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
-// legacy PeerConn methods.
-type peerImpl interface {
+// BitTorrent protocol connections. These methods remain here because legacy code expects to find
+// them on the types that embed Peer. Some method names are prefixed with an underscore to avoid
+// collisions with legacy PeerConn methods. New methods, and existing call sites as they're cleaned
+// up, should migrate to newHotPeerImpl.
+type legacyPeerImpl interface {
	// Trigger an update of the actual request state.
handleUpdateRequests()
writeInterested(interested bool) bool
peerHasAllPieces() (all, known bool)
peerPieces() *roaring.Bitmap
}
+
+// Abstract methods implemented by the types that embed Peer.
+type newHotPeerImpl interface {
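+	// The piece-data upload rate, in bytes per second, observed during the most
+	// recent write to the peer.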
+ lastWriteUploadRate() float64
+}
--- /dev/null
+package torrent
+
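+// PeerStats is a point-in-time snapshot of a peer's statistics, as returned by
+// Peer.Stats.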
+type PeerStats struct {
+ ConnStats
+
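+	// Useful data received per second, averaged over time spent expecting data.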
+ DownloadRate float64
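+	// Piece data sent per second during the most recent write to the peer. Zero
+	// for peer types that never upload.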
+ LastWriteUploadRate float64
+ // How many pieces the peer has.
+ RemotePieceCount int
+}
t *Torrent
- peerImpl
+ legacyPeerImpl
+ peerImpl newHotPeerImpl
callbacks *Callbacks
outgoing bool
return p.t
}
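+// Stats returns a point-in-time snapshot of the peer's statistics.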
+func (p *Peer) Stats() (ret PeerStats) {
+ p.locker().RLock()
+ defer p.locker().RUnlock()
+ ret.ConnStats = p._stats.Copy()
+ ret.DownloadRate = p.downloadRate()
+ ret.LastWriteUploadRate = p.peerImpl.lastWriteUploadRate()
+ ret.RemotePieceCount = p.remotePieceCount()
+ return
+}
+
func (p *Peer) initRequestState() {
p.requestState.Requests = &peerRequests{}
}
return cn.peerMinPieces
}
-func (cn *Peer) completedString() string {
+// How many pieces we think the peer has.
+func (cn *Peer) remotePieceCount() pieceIndex {
have := pieceIndex(cn.peerPieces().GetCardinality())
if all, _ := cn.peerHasAllPieces(); all {
have = cn.bestPeerNumPieces()
}
- return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
+ return have
+}
+
+func (cn *Peer) completedString() string {
+ return fmt.Sprintf("%d/%d", cn.remotePieceCount(), cn.bestPeerNumPieces())
}
func eventAgeString(t time.Time) string {
return float64(num) / cn.totalExpectingTime().Seconds()
}
+// Deprecated: Use Peer.Stats.
func (p *Peer) DownloadRate() float64 {
- p.locker().RLock()
- defer p.locker().RUnlock()
-
- return p.downloadRate()
+ return p.Stats().DownloadRate
}
func (cn *Peer) iterContiguousPieceRequests(f func(piece pieceIndex, count int)) {
for _, prs := range p.peerRequests {
prs.allocReservation.Drop()
}
- p.peerImpl.onClose()
+ p.legacyPeerImpl.onClose()
if p.t != nil {
p.t.decPeerPieceAvailability(p)
}
for _, f := range cn.callbacks.SentRequest {
f(PeerRequestEvent{cn, ppReq})
}
- return cn.peerImpl._request(ppReq), nil
+ return cn.legacyPeerImpl._request(ppReq), nil
}
func (me *Peer) cancel(r RequestIndex) {
}
func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
- pc, ok := p.peerImpl.(*PeerConn)
+ pc, ok := p.legacyPeerImpl.(*PeerConn)
return pc, ok
}
receivedHashPieces map[[32]byte][][32]byte
}
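+// Returns the piece-data upload rate observed during the most recent buffer flush.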
+func (cn *PeerConn) lastWriteUploadRate() float64 {
+ cn.messageWriter.mu.Lock()
+ defer cn.messageWriter.mu.Unlock()
+ return cn.messageWriter.dataUploadRate
+}
+
func (cn *PeerConn) pexStatus() string {
if !cn.bitExtensionEnabled(pp.ExtensionBitLtep) {
return "extended protocol disabled"
Peer: Peer{t: tt},
}
pc.initRequestState()
- pc.peerImpl = &pc
+ pc.legacyPeerImpl = &pc
tt.conns[&pc] = struct{}{}
c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
readerCond chansync.BroadcastCond
- numVerifies pieceVerifyCount
- numVerifiesCond chansync.BroadcastCond
- hashing bool
+ numVerifies pieceVerifyCount
+ numVerifiesCond chansync.BroadcastCond
+ hashing bool
+ // The piece state may have changed, and is being synchronized with storage.
marking bool
storageCompletionOk bool
}
// Tells the Client to refetch the completion status from storage, updating priority etc. if
-// necessary. Might be useful if you know the state of the piece data has changed externally.
+// necessary. Might be useful if you know the state of the piece data has
+// changed externally. TODO: Document why this is public; consider renaming to
+// SyncCompletion.
func (p *Piece) UpdateCompletion() {
p.t.cl.lock()
defer p.t.cl.unlock()
Completion() Completion
}
+// TODO: Document this type and its fields.
type Completion struct {
Complete bool
Ok bool
func (t *Torrent) hashPiece(piece pieceIndex) (
correct bool,
-// These are peers that sent us blocks that differ from what we hash here.
+ // These are peers that sent us blocks that differ from what we hash here.
differingPeers map[bannableAddr]struct{},
err error,
) {
}
func (t *Torrent) hashPieceWithSpecificHash(piece pieceIndex, h hash.Hash) (
-// These are peers that sent us blocks that differ from what we hash here.
+ // These are peers that sent us blocks that differ from what we hash here.
differingPeers map[bannableAddr]struct{},
err error,
) {
}
func (t *Torrent) maybeDropMutuallyCompletePeer(
-// I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
-// okay?
+ // I'm not sure about taking peer here, not all peer implementations actually drop. Maybe that's
+ // okay?
p *PeerConn,
) {
if !t.cl.config.DropMutuallyCompletePeers {
f(&ws.peer)
}
ws.peer.logger = t.logger.WithContextValue(&ws).WithNames("webseed")
+ // TODO: Abstract out a common struct initializer for this...
+ ws.peer.legacyPeerImpl = &ws
ws.peer.peerImpl = &ws
if t.haveInfo() {
ws.onGotInfo(t.info)
g.MakeMapIfNil(&tt.conns)
pc := PeerConn{}
pc.t = &tt
- pc.peerImpl = &pc
+ pc.legacyPeerImpl = &pc
pc.initRequestState()
g.InitNew(&pc.callbacks)
tt.conns[&pc] = struct{}{}
lastUnhandledErr time.Time
}
-var _ peerImpl = (*webseedPeer)(nil)
+func (me *webseedPeer) lastWriteUploadRate() float64 {
+ // We never upload to webseeds.
+ return 0
+}
+
+var _ legacyPeerImpl = (*webseedPeer)(nil)
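+
+// webseedPeer is also assigned to Peer.peerImpl, so assert the new interface too.
+var _ newHotPeerImpl = (*webseedPeer)(nil)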
func (me *webseedPeer) peerImplStatusLines() []string {
return []string{