Sergey Matveev's repositories - btrtrc.git/blob - analysis/peer-upload-order.go
Add peer upload order analysis
[btrtrc.git] / analysis / peer-upload-order.go
1 package analysis
2
3 import (
4         "fmt"
5         "log"
6         "sync"
7
8         "github.com/anacrolix/torrent"
9         pp "github.com/anacrolix/torrent/peer_protocol"
10         "github.com/elliotchance/orderedmap"
11 )
12
// peerData holds the per-peer tracking state used by PeerUploadOrder.
type peerData struct {
	// requested is an ordered map keyed by torrent.Request. Entries are added
	// when a request is sent and removed when the request is deleted.
	requested   *orderedmap.OrderedMap
	// haveDeleted marks requests that have been deleted for this peer at
	// least once; entries are never cleared.
	haveDeleted map[torrent.Request]bool
}
17
// PeerUploadOrder tracks the order in which peers upload the chunks for
// requests that we've sent them. Call Init to allocate the peer table and
// Install to hook the tracker into a set of torrent callbacks.
type PeerUploadOrder struct {
	// mu is held by every callback handler while it touches peers.
	mu    sync.Mutex
	// peers maps each peer seen by onNewPeer to its tracking state.
	peers map[*torrent.Peer]*peerData
}
23
24 func (me *PeerUploadOrder) Init() {
25         me.peers = make(map[*torrent.Peer]*peerData)
26 }
27
28 func (me *PeerUploadOrder) onNewPeer(p *torrent.Peer) {
29         me.mu.Lock()
30         defer me.mu.Unlock()
31         if _, ok := me.peers[p]; ok {
32                 panic("already have peer")
33         }
34         me.peers[p] = &peerData{
35                 requested:   orderedmap.NewOrderedMap(),
36                 haveDeleted: make(map[torrent.Request]bool),
37         }
38 }
39
40 func (me *PeerUploadOrder) onSentRequest(event torrent.PeerRequestEvent) {
41         me.mu.Lock()
42         defer me.mu.Unlock()
43         if !me.peers[event.Peer].requested.Set(event.Request, nil) {
44                 panic("duplicate request sent")
45         }
46 }
47
48 func (me *PeerUploadOrder) Install(cbs *torrent.Callbacks) {
49         cbs.NewPeer = append(cbs.NewPeer, me.onNewPeer)
50         cbs.SentRequest = append(cbs.SentRequest, me.onSentRequest)
51         cbs.ReceivedRequested = append(cbs.ReceivedRequested, me.onReceivedRequested)
52         cbs.DeletedRequest = append(cbs.DeletedRequest, me.deletedRequest)
53 }
54
55 func (me *PeerUploadOrder) report(desc string, req torrent.Request, peer *torrent.Peer) {
56         peerConn, ok := peer.TryAsPeerConn()
57         var peerId *torrent.PeerID
58         if ok {
59                 peerId = &peerConn.PeerID
60         }
61         log.Printf("%s: %v, %v", desc, req, peerId)
62 }
63
64 func (me *PeerUploadOrder) onReceivedRequested(event torrent.PeerMessageEvent) {
65         req := torrent.Request{
66                 event.Message.Index,
67                 torrent.ChunkSpec{
68                         Begin:  event.Message.Begin,
69                         Length: pp.Integer(len(event.Message.Piece)),
70                 },
71         }
72         makeLogMsg := func(desc string) string {
73                 peerConn, ok := event.Peer.TryAsPeerConn()
74                 var peerId *torrent.PeerID
75                 if ok {
76                         peerId = &peerConn.PeerID
77                 }
78                 return fmt.Sprintf("%s: %q, %v", desc, peerId, req)
79         }
80         me.mu.Lock()
81         defer me.mu.Unlock()
82         peerData := me.peers[event.Peer]
83         if peerData.requested.Front().Key.(torrent.Request) == req {
84                 log.Print(makeLogMsg("got next requested piece"))
85         } else if _, ok := peerData.requested.Get(req); ok {
86                 log.Print(makeLogMsg(fmt.Sprintf(
87                         "got requested piece but not next (previous delete=%v)",
88                         peerData.haveDeleted[req])))
89         } else {
90                 panic(makeLogMsg("got unrequested piece"))
91         }
92 }
93
94 func (me *PeerUploadOrder) deletedRequest(event torrent.PeerRequestEvent) {
95         me.mu.Lock()
96         defer me.mu.Unlock()
97         peerData := me.peers[event.Peer]
98         if !peerData.requested.Delete(event.Request) {
99                 panic("nothing to delete")
100         }
101         peerData.haveDeleted[event.Request] = true
102 }