From: Matt Joiner Date: Thu, 1 Feb 2018 03:46:03 +0000 (+1100) Subject: Track concurrent requests across connections X-Git-Tag: v1.0.0~238 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=9387e6099992444f6af0a72d082b55e805f0abb6;p=btrtrc.git Track concurrent requests across connections In preparation for some more request strategy options --- diff --git a/connection.go b/connection.go index 40d02aba..aea9d49d 100644 --- a/connection.go +++ b/connection.go @@ -359,6 +359,7 @@ func (cn *connection) request(r request, mw messageWriter) bool { panic("requesting piece peer doesn't have") } cn.requests[r] = struct{}{} + cn.t.pendingRequests[r]++ return mw(pp.Message{ Type: pp.Request, Index: r.Index, @@ -846,7 +847,7 @@ func (c *connection) mainReadLoop() error { switch msg.Type { case pp.Choke: c.PeerChoked = true - c.requests = nil + c.deleteAllRequests() // We can then reset our interest. c.updateRequests() case pp.Reject: @@ -1211,9 +1212,19 @@ func (c *connection) deleteRequest(r request) bool { return false } delete(c.requests, r) + c.t.pendingRequests[r]-- return true } +func (c *connection) deleteAllRequests() { + for r := range c.requests { + c.deleteRequest(r) + } + // for c := range c.t.conns { + // c.tickleWriter() + // } +} + func (c *connection) tickleWriter() { c.writerCond.Broadcast() } diff --git a/torrent.go b/torrent.go index 935fb164..0814dcee 100644 --- a/torrent.go +++ b/torrent.go @@ -133,6 +133,8 @@ type Torrent struct { connPieceInclinationPool sync.Pool // Torrent-level statistics. stats TorrentStats + + pendingRequests map[request]int } // Returns a channel that is closed when the Torrent is closed. @@ -381,6 +383,7 @@ func (t *Torrent) onSetInfo() { t.cl.event.Broadcast() t.gotMetainfo.Set() t.updateWantPeersEvent() + t.pendingRequests = make(map[request]int) } // Called when metadata for a torrent becomes available. @@ -1170,9 +1173,21 @@ func (t *Torrent) SetInfoBytes(b []byte) (err error) { func (t *Torrent) deleteConnection(c *connection) (ret bool) { _, ret = t.conns[c] delete(t.conns, c) + c.deleteAllRequests() + if len(t.conns) == 0 { + t.assertNoPendingRequests() + } return } +func (t *Torrent) assertNoPendingRequests() { + for _, num := range t.pendingRequests { + if num != 0 { + panic(num) + } + } +} + func (t *Torrent) dropConnection(c *connection) { t.cl.event.Broadcast() c.Close()