]> Sergey Matveev's repositories - btrtrc.git/blob - analysis/peer-upload-order.go
Fix overflow in average download rate
[btrtrc.git] / analysis / peer-upload-order.go
1 package analysis
2
3 import (
4         "fmt"
5         "log"
6         "sync"
7
8         "github.com/elliotchance/orderedmap"
9
10         "github.com/anacrolix/torrent"
11         pp "github.com/anacrolix/torrent/peer_protocol"
12 )
13
14 type peerData struct {
15         requested   *orderedmap.OrderedMap
16         haveDeleted map[torrent.Request]bool
17 }
18
19 // Tracks the order that peers upload requests that we've sent them.
20 type PeerUploadOrder struct {
21         mu    sync.Mutex
22         peers map[*torrent.Peer]*peerData
23 }
24
25 func (me *PeerUploadOrder) Init() {
26         me.peers = make(map[*torrent.Peer]*peerData)
27 }
28
29 func (me *PeerUploadOrder) onNewPeer(p *torrent.Peer) {
30         me.mu.Lock()
31         defer me.mu.Unlock()
32         if _, ok := me.peers[p]; ok {
33                 panic("already have peer")
34         }
35         me.peers[p] = &peerData{
36                 requested:   orderedmap.NewOrderedMap(),
37                 haveDeleted: make(map[torrent.Request]bool),
38         }
39 }
40
41 func (me *PeerUploadOrder) onSentRequest(event torrent.PeerRequestEvent) {
42         me.mu.Lock()
43         defer me.mu.Unlock()
44         if !me.peers[event.Peer].requested.Set(event.Request, nil) {
45                 panic("duplicate request sent")
46         }
47 }
48
49 func (me *PeerUploadOrder) Install(cbs *torrent.Callbacks) {
50         cbs.NewPeer = append(cbs.NewPeer, me.onNewPeer)
51         cbs.SentRequest = append(cbs.SentRequest, me.onSentRequest)
52         cbs.ReceivedRequested = append(cbs.ReceivedRequested, me.onReceivedRequested)
53         cbs.DeletedRequest = append(cbs.DeletedRequest, me.deletedRequest)
54 }
55
56 func (me *PeerUploadOrder) report(desc string, req torrent.Request, peer *torrent.Peer) {
57         peerConn, ok := peer.TryAsPeerConn()
58         var peerId *torrent.PeerID
59         if ok {
60                 peerId = &peerConn.PeerID
61         }
62         log.Printf("%s: %v, %v", desc, req, peerId)
63 }
64
65 func (me *PeerUploadOrder) onReceivedRequested(event torrent.PeerMessageEvent) {
66         req := torrent.Request{
67                 event.Message.Index,
68                 torrent.ChunkSpec{
69                         Begin:  event.Message.Begin,
70                         Length: pp.Integer(len(event.Message.Piece)),
71                 },
72         }
73         makeLogMsg := func(desc string) string {
74                 peerConn, ok := event.Peer.TryAsPeerConn()
75                 var peerId *torrent.PeerID
76                 if ok {
77                         peerId = &peerConn.PeerID
78                 }
79                 return fmt.Sprintf("%s: %q, %v", desc, peerId, req)
80         }
81         me.mu.Lock()
82         defer me.mu.Unlock()
83         peerData := me.peers[event.Peer]
84         if peerData.requested.Front().Key.(torrent.Request) == req {
85                 log.Print(makeLogMsg("got next requested piece"))
86         } else if _, ok := peerData.requested.Get(req); ok {
87                 log.Print(makeLogMsg(fmt.Sprintf(
88                         "got requested piece but not next (previous delete=%v)",
89                         peerData.haveDeleted[req])))
90         } else {
91                 panic(makeLogMsg("got unrequested piece"))
92         }
93 }
94
95 func (me *PeerUploadOrder) deletedRequest(event torrent.PeerRequestEvent) {
96         me.mu.Lock()
97         defer me.mu.Unlock()
98         peerData := me.peers[event.Peer]
99         if !peerData.requested.Delete(event.Request) {
100                 panic("nothing to delete")
101         }
102         peerData.haveDeleted[event.Request] = true
103 }