From ca1497ad58769b4ed6f7f164fb405e80cf59f847 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Thu, 28 Jan 2021 16:31:13 +1100 Subject: [PATCH] Add peer upload order analysis --- analysis/peer-upload-order.go | 102 ++++++++++++++++++++++++++++++++++ peerconn.go | 5 ++ 2 files changed, 107 insertions(+) create mode 100644 analysis/peer-upload-order.go diff --git a/analysis/peer-upload-order.go b/analysis/peer-upload-order.go new file mode 100644 index 00000000..cef18ce4 --- /dev/null +++ b/analysis/peer-upload-order.go @@ -0,0 +1,102 @@ +package analysis + +import ( + "fmt" + "log" + "sync" + + "github.com/anacrolix/torrent" + pp "github.com/anacrolix/torrent/peer_protocol" + "github.com/elliotchance/orderedmap" +) + +type peerData struct { + requested *orderedmap.OrderedMap + haveDeleted map[torrent.Request]bool +} + +// Tracks the order that peers upload requests that we've sent them. +type PeerUploadOrder struct { + mu sync.Mutex + peers map[*torrent.Peer]*peerData +} + +func (me *PeerUploadOrder) Init() { + me.peers = make(map[*torrent.Peer]*peerData) +} + +func (me *PeerUploadOrder) onNewPeer(p *torrent.Peer) { + me.mu.Lock() + defer me.mu.Unlock() + if _, ok := me.peers[p]; ok { + panic("already have peer") + } + me.peers[p] = &peerData{ + requested: orderedmap.NewOrderedMap(), + haveDeleted: make(map[torrent.Request]bool), + } +} + +func (me *PeerUploadOrder) onSentRequest(event torrent.PeerRequestEvent) { + me.mu.Lock() + defer me.mu.Unlock() + if !me.peers[event.Peer].requested.Set(event.Request, nil) { + panic("duplicate request sent") + } +} + +func (me *PeerUploadOrder) Install(cbs *torrent.Callbacks) { + cbs.NewPeer = append(cbs.NewPeer, me.onNewPeer) + cbs.SentRequest = append(cbs.SentRequest, me.onSentRequest) + cbs.ReceivedRequested = append(cbs.ReceivedRequested, me.onReceivedRequested) + cbs.DeletedRequest = append(cbs.DeletedRequest, me.deletedRequest) +} + +func (me *PeerUploadOrder) report(desc string, req torrent.Request, peer *torrent.Peer) { + peerConn, ok := peer.TryAsPeerConn() + var peerId *torrent.PeerID + if ok { + peerId = &peerConn.PeerID + } + log.Printf("%s: %v, %v", desc, req, peerId) +} + +func (me *PeerUploadOrder) onReceivedRequested(event torrent.PeerMessageEvent) { + req := torrent.Request{ + event.Message.Index, + torrent.ChunkSpec{ + Begin: event.Message.Begin, + Length: pp.Integer(len(event.Message.Piece)), + }, + } + makeLogMsg := func(desc string) string { + peerConn, ok := event.Peer.TryAsPeerConn() + var peerId *torrent.PeerID + if ok { + peerId = &peerConn.PeerID + } + return fmt.Sprintf("%s: %q, %v", desc, peerId, req) + } + me.mu.Lock() + defer me.mu.Unlock() + peerData := me.peers[event.Peer] + if peerData.requested.Front().Key.(torrent.Request) == req { + log.Print(makeLogMsg("got next requested piece")) + } else if _, ok := peerData.requested.Get(req); ok { + log.Print(makeLogMsg(fmt.Sprintf( + "got requested piece but not next (previous delete=%v)", + peerData.haveDeleted[req]))) + } else { + panic(makeLogMsg("got unrequested piece")) + } +} + +func (me *PeerUploadOrder) deletedRequest(event torrent.PeerRequestEvent) { + me.mu.Lock() + defer me.mu.Unlock() + peerData := me.peers[event.Peer] + if !peerData.requested.Delete(event.Request) { + panic("nothing to delete") + } + peerData.haveDeleted[event.Request] = true +} diff --git a/peerconn.go b/peerconn.go index b994f3be..1f9a3a53 100644 --- a/peerconn.go +++ b/peerconn.go @@ -1747,3 +1747,8 @@ func (cn *Peer) stats() *ConnStats { func (cn *Peer) torrent() requestStrategyTorrent { return cn.t.requestStrategyTorrent() } + +func (p *Peer) TryAsPeerConn() (*PeerConn, bool) { + pc, ok := p.peerImpl.(*PeerConn) + return pc, ok +} -- 2.48.1