From: Matt Joiner <anacrolix@gmail.com>
Date: Thu, 28 Jan 2021 05:31:13 +0000 (+1100)
Subject: Add peer upload order analysis
X-Git-Tag: v1.22.0~6
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=ca1497ad58769b4ed6f7f164fb405e80cf59f847;p=btrtrc.git

Add peer upload order analysis
---

diff --git a/analysis/peer-upload-order.go b/analysis/peer-upload-order.go
new file mode 100644
index 00000000..cef18ce4
--- /dev/null
+++ b/analysis/peer-upload-order.go
@@ -0,0 +1,102 @@
+package analysis
+
+import (
+	"fmt"
+	"log"
+	"sync"
+
+	"github.com/anacrolix/torrent"
+	pp "github.com/anacrolix/torrent/peer_protocol"
+	"github.com/elliotchance/orderedmap"
+)
+
+type peerData struct {
+	requested   *orderedmap.OrderedMap
+	haveDeleted map[torrent.Request]bool
+}
+
+// Tracks the order that peers upload requests that we've sent them.
+type PeerUploadOrder struct {
+	mu    sync.Mutex
+	peers map[*torrent.Peer]*peerData
+}
+
+func (me *PeerUploadOrder) Init() {
+	me.peers = make(map[*torrent.Peer]*peerData)
+}
+
+func (me *PeerUploadOrder) onNewPeer(p *torrent.Peer) {
+	me.mu.Lock()
+	defer me.mu.Unlock()
+	if _, ok := me.peers[p]; ok {
+		panic("already have peer")
+	}
+	me.peers[p] = &peerData{
+		requested:   orderedmap.NewOrderedMap(),
+		haveDeleted: make(map[torrent.Request]bool),
+	}
+}
+
+func (me *PeerUploadOrder) onSentRequest(event torrent.PeerRequestEvent) {
+	me.mu.Lock()
+	defer me.mu.Unlock()
+	if !me.peers[event.Peer].requested.Set(event.Request, nil) {
+		panic("duplicate request sent")
+	}
+}
+
+func (me *PeerUploadOrder) Install(cbs *torrent.Callbacks) {
+	cbs.NewPeer = append(cbs.NewPeer, me.onNewPeer)
+	cbs.SentRequest = append(cbs.SentRequest, me.onSentRequest)
+	cbs.ReceivedRequested = append(cbs.ReceivedRequested, me.onReceivedRequested)
+	cbs.DeletedRequest = append(cbs.DeletedRequest, me.deletedRequest)
+}
+
+func (me *PeerUploadOrder) report(desc string, req torrent.Request, peer *torrent.Peer) {
+	peerConn, ok := peer.TryAsPeerConn()
+	var peerId *torrent.PeerID
+	if ok {
+		peerId = &peerConn.PeerID
+	}
+	log.Printf("%s: %v, %v", desc, req, peerId)
+}
+
+func (me *PeerUploadOrder) onReceivedRequested(event torrent.PeerMessageEvent) {
+	req := torrent.Request{
+		event.Message.Index,
+		torrent.ChunkSpec{
+			Begin:  event.Message.Begin,
+			Length: pp.Integer(len(event.Message.Piece)),
+		},
+	}
+	makeLogMsg := func(desc string) string {
+		peerConn, ok := event.Peer.TryAsPeerConn()
+		var peerId *torrent.PeerID
+		if ok {
+			peerId = &peerConn.PeerID
+		}
+		return fmt.Sprintf("%s: %q, %v", desc, peerId, req)
+	}
+	me.mu.Lock()
+	defer me.mu.Unlock()
+	peerData := me.peers[event.Peer]
+	if peerData.requested.Front().Key.(torrent.Request) == req {
+		log.Print(makeLogMsg("got next requested piece"))
+	} else if _, ok := peerData.requested.Get(req); ok {
+		log.Print(makeLogMsg(fmt.Sprintf(
+			"got requested piece but not next (previous delete=%v)",
+			peerData.haveDeleted[req])))
+	} else {
+		panic(makeLogMsg("got unrequested piece"))
+	}
+}
+
+func (me *PeerUploadOrder) deletedRequest(event torrent.PeerRequestEvent) {
+	me.mu.Lock()
+	defer me.mu.Unlock()
+	peerData := me.peers[event.Peer]
+	if !peerData.requested.Delete(event.Request) {
+		panic("nothing to delete")
+	}
+	peerData.haveDeleted[event.Request] = true
+}
diff --git a/peerconn.go b/peerconn.go
index b994f3be..1f9a3a53 100644
--- a/peerconn.go
+++ b/peerconn.go
@@ -1747,3 +1747,8 @@ func (cn *Peer) stats() *ConnStats {
 func (cn *Peer) torrent() requestStrategyTorrent {
 	return cn.t.requestStrategyTorrent()
 }
+
+func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
+	pc, ok := p.peerImpl.(*PeerConn)
+	return pc, ok
+}