From: Matt Joiner Date: Fri, 23 May 2014 11:01:05 +0000 (+1000) Subject: Store request heat in the download strategy X-Git-Tag: v1.0.0~1731 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=898c4f4e4f884398459cda42894bff418dfbfc3d;p=btrtrc.git Store request heat in the download strategy This avoids recalculating it for every FillRequest --- diff --git a/client.go b/client.go index 16c434a2..7b5d6715 100644 --- a/client.go +++ b/client.go @@ -170,7 +170,7 @@ func (c *Client) Start() { } c.quit = make(chan struct{}) if c.DownloadStrategy == nil { - c.DownloadStrategy = DefaultDownloadStrategy{} + c.DownloadStrategy = &DefaultDownloadStrategy{} } if c.Listener != nil { go c.acceptConnections() @@ -363,6 +363,22 @@ func (me *Client) peerUnchoked(torrent *torrent, conn *connection) { me.replenishConnRequests(torrent, conn) } +func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) { + ok = cn.Cancel(r) + if ok { + cl.DownloadStrategy.DeleteRequest(t, r) + } + return +} + +func (cl *Client) connDeleteRequest(t *torrent, cn *connection, r request) { + if !cn.RequestPending(r) { + return + } + cl.DownloadStrategy.DeleteRequest(t, r) + delete(cn.Requests, r) +} + func (me *Client) connectionLoop(t *torrent, c *connection) error { decoder := pp.Decoder{ R: bufio.NewReader(c.Socket), @@ -385,7 +401,9 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { switch msg.Type { case pp.Choke: c.PeerChoked = true - c.Requests = nil + for r := range c.Requests { + me.connDeleteRequest(t, c, r) + } case pp.Unchoke: c.PeerChoked = false me.peerUnchoked(t, c) @@ -453,6 +471,9 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error { func (me *Client) dropConnection(torrent *torrent, conn *connection) { conn.Socket.Close() + for r := range conn.Requests { + me.connDeleteRequest(torrent, conn, r) + } for i0, c := range torrent.Conns { if c != conn { continue @@ -562,6 +583,7 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error { return torrent.Close() } me.torrents[torrent.InfoHash] = torrent + me.DownloadStrategy.TorrentStarted(torrent) if !me.DisableTrackers { go me.announceTorrent(torrent) } @@ -668,14 +690,39 @@ func (me *Client) WaitAll() bool { type DownloadStrategy interface { FillRequests(t *torrent, c *connection) + TorrentStarted(t *torrent) + TorrentStopped(t *torrent) + DeleteRequest(t *torrent, r request) } -type DefaultDownloadStrategy struct{} +type DefaultDownloadStrategy struct { + heat map[*torrent]map[request]int +} -func (DefaultDownloadStrategy) FillRequests(torrent *torrent, conn *connection) { - requestHeatMap := torrent.requestHeat() +func (cl *Client) assertRequestHeat() { + dds, ok := cl.DownloadStrategy.(*DefaultDownloadStrategy) + if !ok { + return + } + for _, t := range cl.torrents { + m := make(map[request]int, 3000) + for _, cn := range t.Conns { + for r := range cn.Requests { + m[r]++ + } + } + for r, h := range dds.heat[t] { + if m[r] != h { + panic(fmt.Sprintln(m[r], h)) + } + } + } +} + +func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { + th := s.heat[t] addRequest := func(req request) (again bool) { - piece := torrent.Pieces[req.Index] + piece := t.Pieces[req.Index] if piece.Hashing { // We can't be sure we want this. return true @@ -684,30 +731,69 @@ func (DefaultDownloadStrategy) FillRequests(torrent *torrent, conn *connection) // We already have this. return true } - if requestHeatMap[req] > 0 { - // We've already requested this. + if c.RequestPending(req) { return true } - return conn.Request(req) + again = c.Request(req) + if c.RequestPending(req) { + th[req]++ + } + return } // First request prioritized chunks. - for e := torrent.Priorities.Front(); e != nil; e = e.Next() { + for e := t.Priorities.Front(); e != nil; e = e.Next() { if !addRequest(e.Value.(request)) { return } } + ppbs := t.piecesByPendingBytes() // Then finish off incomplete pieces in order of bytes remaining. - for _, index := range torrent.piecesByPendingBytes() { - for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs { - if !addRequest(request{index, chunkSpec}) { - return + for _, heatThreshold := range []int{0, 4, 100} { + for _, pieceIndex := range ppbs { + for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs { + r := request{pieceIndex, chunkSpec} + if th[r] > heatThreshold { + continue + } + if !addRequest(request{pieceIndex, chunkSpec}) { + return + } } } } } +func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) { + if s.heat[t] != nil { + panic("torrent already started") + } + if s.heat == nil { + s.heat = make(map[*torrent]map[request]int, 10) + } + s.heat[t] = make(map[request]int, t.ChunkCount()) +} + +func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) { + if _, ok := s.heat[t]; !ok { + panic("torrent not yet started") + } + delete(s.heat, t) +} + +func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) { + m := s.heat[t] + if m[r] <= 0 { + panic("no pending requests") + } + m[r]-- +} + type ResponsiveDownloadStrategy struct{} +func (ResponsiveDownloadStrategy) TorrentStarted(*torrent) {} +func (ResponsiveDownloadStrategy) TorrentStopped(*torrent) {} +func (ResponsiveDownloadStrategy) DeleteRequest(*torrent, request) {} + func (ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { for e := t.Priorities.Front(); e != nil; e = e.Next() { if !c.Request(e.Value.(request)) { @@ -745,7 +831,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece))) // Request has been satisfied. - delete(c.Requests, req) + me.connDeleteRequest(t, c, req) defer me.replenishConnRequests(t, c) @@ -763,7 +849,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er // Cancel pending requests for this chunk. for _, c := range t.Conns { - if c.Cancel(req) { + if me.connCancel(t, c, req) { me.replenishConnRequests(t, c) } } diff --git a/connection.go b/connection.go index 96a1d215..6bb5f5b2 100644 --- a/connection.go +++ b/connection.go @@ -62,6 +62,11 @@ func (c *connection) Post(msg encoding.BinaryMarshaler) { c.post <- msg } +func (c *connection) RequestPending(r request) bool { + _, ok := c.Requests[r] + return ok +} + // Returns true if more requests can be sent. func (c *connection) Request(chunk request) bool { if len(c.Requests) >= c.PeerMaxRequests { @@ -74,18 +79,19 @@ func (c *connection) Request(chunk request) bool { if c.PeerChoked { return false } - if _, ok := c.Requests[chunk]; !ok { - c.Post(peer_protocol.Message{ - Type: peer_protocol.Request, - Index: chunk.Index, - Begin: chunk.Begin, - Length: chunk.Length, - }) + if c.RequestPending(chunk) { + return true } if c.Requests == nil { c.Requests = make(map[request]struct{}, c.PeerMaxRequests) } c.Requests[chunk] = struct{}{} + c.Post(peer_protocol.Message{ + Type: peer_protocol.Request, + Index: chunk.Index, + Begin: chunk.Begin, + Length: chunk.Length, + }) return true } diff --git a/torrent.go b/torrent.go index 8a1da7c6..dfe24314 100644 --- a/torrent.go +++ b/torrent.go @@ -49,6 +49,20 @@ func (t *torrent) BytesLeft() (left int64) { return } +func NumChunksForPiece(chunkSize int, pieceSize int) int { + return (pieceSize + chunkSize - 1) / chunkSize +} + +func (t *torrent) ChunkCount() (num int) { + num += (t.NumPieces() - 1) * NumChunksForPiece(chunkSize, int(t.PieceLength(0))) + num += NumChunksForPiece(chunkSize, int(t.PieceLength(pp.Integer(t.NumPieces()-1)))) + return +} + +func (t *torrent) LastPieceSize() int { + return int(t.PieceLength(pp.Integer(t.NumPieces() - 1))) +} + func (t *torrent) NumPieces() int { return len(t.MetaInfo.Pieces) / pieceHash.Size() } @@ -156,16 +170,6 @@ func (t *torrent) pendAllChunkSpecs(index pp.Integer) { return } -func (t *torrent) requestHeat() (ret map[request]int) { - ret = make(map[request]int) - for _, conn := range t.Conns { - for req, _ := range conn.Requests { - ret[req]++ - } - } - return -} - type Peer struct { Id [20]byte IP net.IP