]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Add peer upload order analysis
authorMatt Joiner <anacrolix@gmail.com>
Thu, 28 Jan 2021 05:31:13 +0000 (16:31 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 28 Jan 2021 05:31:13 +0000 (16:31 +1100)
analysis/peer-upload-order.go [new file with mode: 0644]
peerconn.go

diff --git a/analysis/peer-upload-order.go b/analysis/peer-upload-order.go
new file mode 100644 (file)
index 0000000..cef18ce
--- /dev/null
@@ -0,0 +1,102 @@
+package analysis
+
+import (
+       "fmt"
+       "log"
+       "sync"
+
+       "github.com/anacrolix/torrent"
+       pp "github.com/anacrolix/torrent/peer_protocol"
+       "github.com/elliotchance/orderedmap"
+)
+
+type peerData struct {
+       requested   *orderedmap.OrderedMap
+       haveDeleted map[torrent.Request]bool
+}
+
+// Tracks the order that peers upload requests that we've sent them.
+type PeerUploadOrder struct {
+       mu    sync.Mutex
+       peers map[*torrent.Peer]*peerData
+}
+
+func (me *PeerUploadOrder) Init() {
+       me.peers = make(map[*torrent.Peer]*peerData)
+}
+
+func (me *PeerUploadOrder) onNewPeer(p *torrent.Peer) {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       if _, ok := me.peers[p]; ok {
+               panic("already have peer")
+       }
+       me.peers[p] = &peerData{
+               requested:   orderedmap.NewOrderedMap(),
+               haveDeleted: make(map[torrent.Request]bool),
+       }
+}
+
+func (me *PeerUploadOrder) onSentRequest(event torrent.PeerRequestEvent) {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       if !me.peers[event.Peer].requested.Set(event.Request, nil) {
+               panic("duplicate request sent")
+       }
+}
+
+func (me *PeerUploadOrder) Install(cbs *torrent.Callbacks) {
+       cbs.NewPeer = append(cbs.NewPeer, me.onNewPeer)
+       cbs.SentRequest = append(cbs.SentRequest, me.onSentRequest)
+       cbs.ReceivedRequested = append(cbs.ReceivedRequested, me.onReceivedRequested)
+       cbs.DeletedRequest = append(cbs.DeletedRequest, me.deletedRequest)
+}
+
+func (me *PeerUploadOrder) report(desc string, req torrent.Request, peer *torrent.Peer) {
+       peerConn, ok := peer.TryAsPeerConn()
+       var peerId *torrent.PeerID
+       if ok {
+               peerId = &peerConn.PeerID
+       }
+       log.Printf("%s: %v, %v", desc, req, peerId)
+}
+
+func (me *PeerUploadOrder) onReceivedRequested(event torrent.PeerMessageEvent) {
+       req := torrent.Request{
+               event.Message.Index,
+               torrent.ChunkSpec{
+                       Begin:  event.Message.Begin,
+                       Length: pp.Integer(len(event.Message.Piece)),
+               },
+       }
+       makeLogMsg := func(desc string) string {
+               peerConn, ok := event.Peer.TryAsPeerConn()
+               var peerId *torrent.PeerID
+               if ok {
+                       peerId = &peerConn.PeerID
+               }
+               return fmt.Sprintf("%s: %q, %v", desc, peerId, req)
+       }
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       peerData := me.peers[event.Peer]
+       if peerData.requested.Front().Key.(torrent.Request) == req {
+               log.Print(makeLogMsg("got next requested piece"))
+       } else if _, ok := peerData.requested.Get(req); ok {
+               log.Print(makeLogMsg(fmt.Sprintf(
+                       "got requested piece but not next (previous delete=%v)",
+                       peerData.haveDeleted[req])))
+       } else {
+               panic(makeLogMsg("got unrequested piece"))
+       }
+}
+
+func (me *PeerUploadOrder) deletedRequest(event torrent.PeerRequestEvent) {
+       me.mu.Lock()
+       defer me.mu.Unlock()
+       peerData := me.peers[event.Peer]
+       if !peerData.requested.Delete(event.Request) {
+               panic("nothing to delete")
+       }
+       peerData.haveDeleted[event.Request] = true
+}
index b994f3be7b95263dabb8c5e6b01036fc73764ecb..1f9a3a531e6ab94512700ad908ffcdc99d8ebb52 100644 (file)
@@ -1747,3 +1747,8 @@ func (cn *Peer) stats() *ConnStats {
 func (cn *Peer) torrent() requestStrategyTorrent {
        return cn.t.requestStrategyTorrent()
 }
+
+func (p *Peer) TryAsPeerConn() (*PeerConn, bool) {
+       pc, ok := p.peerImpl.(*PeerConn)
+       return pc, ok
+}