From 78ef36066d36914c20b0c2c03aea25fef71dcc37 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 1 Dec 2014 03:36:25 -0600 Subject: [PATCH] Give each connection its own piece priority order --- client.go | 35 ++----------- connection.go | 26 +++++++--- download_strategies.go | 109 ++++++++++++----------------------------- 3 files changed, 53 insertions(+), 117 deletions(-) diff --git a/client.go b/client.go index c886a0ef..74141627 100644 --- a/client.go +++ b/client.go @@ -836,6 +836,9 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS Port: uint16(addr.Port), }) } + if torrent.haveInfo() { + conn.initPieceOrder(torrent.NumPieces()) + } err = me.connectionLoop(torrent, conn) if err != nil { err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err) @@ -1758,36 +1761,11 @@ func (me *Client) WaitAll() bool { return true } -func (cl *Client) assertRequestHeat() { - dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy) - if !ok { - return - } - for _, t := range cl.torrents { - m := make(map[request]int, 3000) - for _, cn := range t.Conns { - for r := range cn.Requests { - m[r]++ - } - } - for r, h := range dds.heat[t] { - if m[r] != h { - panic(fmt.Sprintln(m[r], h)) - } - } - } -} - func (me *Client) replenishConnRequests(t *torrent, c *connection) { if !t.haveInfo() { return } - for _, p := range me.downloadStrategy.FillRequests(t, c) { - // Make sure the state of pieces that would have been requested is - // known. - me.queueFirstHash(t, p) - } - //me.assertRequestHeat() + me.downloadStrategy.FillRequests(t, c) if len(c.Requests) == 0 && !c.PeerChoked { c.SetInterested(false) } @@ -1916,11 +1894,6 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { } } } - // Do this even if the piece is correct because new first-hashings may - // need to be scheduled. - if conn.PeerHasPiece(piece) { - me.replenishConnRequests(t, conn) - } } if t.haveAllPieces() && me.noUpload { t.CeaseNetworking() diff --git a/connection.go b/connection.go index 1a75e17a..464f14be 100644 --- a/connection.go +++ b/connection.go @@ -8,6 +8,7 @@ import ( "expvar" "fmt" "io" + "math/rand" "net" "sync" "time" @@ -27,13 +28,14 @@ const ( // Maintains the state of a connection with a peer. type connection struct { - Socket net.Conn - Discovery peerSource - uTP bool - closing chan struct{} - mu sync.Mutex // Only for closing. - post chan pp.Message - writeCh chan []byte + Socket net.Conn + Discovery peerSource + uTP bool + closing chan struct{} + mu sync.Mutex // Only for closing. + post chan pp.Message + writeCh chan []byte + pieceOrder []int UnwantedChunksReceived int UsefulChunksReceived int @@ -109,6 +111,7 @@ func (cn *connection) piecesPeerHasCount() (count int) { // invalid, such as by receiving badly sized BITFIELD, or invalid HAVE // messages. func (cn *connection) setNumPieces(num int) error { + cn.initPieceOrder(num) if cn.PeerPieces == nil { return nil } @@ -131,6 +134,15 @@ func (cn *connection) setNumPieces(num int) error { return nil } +func (cn *connection) initPieceOrder(numPieces int) { + if cn.pieceOrder == nil { + cn.pieceOrder = rand.Perm(numPieces) + } + if len(cn.pieceOrder) != numPieces { + panic("piece order initialized with wrong length") + } +} + func eventAgeString(t time.Time) string { if t.IsZero() { return "never" diff --git a/download_strategies.go b/download_strategies.go index 5317cbf9..0b85d190 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -10,10 +10,8 @@ import ( ) type DownloadStrategy interface { - // Tops up the outgoing pending requests. Returns the indices of pieces - // that would be requested. This is used to determine if pieces require - // hashing so the completed state is known. - FillRequests(*torrent, *connection) (pieces []int) + // Tops up the outgoing pending requests. + FillRequests(*torrent, *connection) TorrentStarted(*torrent) TorrentStopped(*torrent) DeleteRequest(*torrent, request) @@ -25,97 +23,53 @@ type DownloadStrategy interface { PendingData(*torrent) bool } -type DefaultDownloadStrategy struct { - heat map[*torrent]map[request]int -} +type DefaultDownloadStrategy struct{} func (me *DefaultDownloadStrategy) PendingData(t *torrent) bool { return !t.haveAllPieces() } -func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) { - if me.heat[t][r] != 0 { - panic("outstanding requests break invariant") - } -} +func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {} func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {} -func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) { +func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { if c.Interested { if c.PeerChoked { return } - if len(c.Requests) >= (c.PeerMaxRequests+1)/2 { + if len(c.Requests) != 0 { return } } - th := s.heat[t] addRequest := func(req request) (again bool) { - piece := t.Pieces[req.Index] - if piece.Hashing || piece.QueuedForHash { - // We can't be sure we want this. - return true - } - if piece.Complete() { - // We already have this. - return true - } - if c.RequestPending(req) { - return true + return c.Request(req) + } + for i := range t.Pieces { + pieceIndex := c.pieceOrder[i] + if !c.PeerHasPiece(pp.Integer(pieceIndex)) { + continue } - again = c.Request(req) - if c.RequestPending(req) { - th[req]++ + if !t.wantPiece(pieceIndex) { + continue } - return - } - // Then finish off incomplete pieces in order of bytes remaining. - for _, heatThreshold := range []int{1, 4, 15, 60} { - for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() { - pieceIndex := pp.Integer(e.Value.(int)) - piece := t.Pieces[pieceIndex] - if !piece.EverHashed { - pieces = append(pieces, int(pieceIndex)) - continue - } - for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs { - r := request{pieceIndex, chunkSpec} - if th[r] >= heatThreshold { - continue - } - if !addRequest(r) { - return - } + piece := t.Pieces[pieceIndex] + for _, cs := range piece.shuffledPendingChunkSpecs() { + r := request{pp.Integer(pieceIndex), cs} + if !addRequest(r) { + return } } } return } -func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) { - if s.heat[t] != nil { - panic("torrent already started") - } - if s.heat == nil { - s.heat = make(map[*torrent]map[request]int, 10) - } - s.heat[t] = make(map[request]int, t.ChunkCount()) -} +func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {} func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) { - if _, ok := s.heat[t]; !ok { - panic("torrent not yet started") - } - delete(s.heat, t) } func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) { - m := s.heat[t] - if m[r] <= 0 { - panic("no pending requests") - } - m[r]-- } func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {} @@ -176,18 +130,10 @@ type requestFiller struct { c *connection t *torrent s *responsiveDownloadStrategy - - // The set of pieces that were considered for requesting. - pieces map[int]struct{} } // Wrapper around connection.request that tracks request heat. func (me *requestFiller) request(req request) bool { - if me.pieces == nil { - me.pieces = make(map[int]struct{}) - } - // log.Print(req) - me.pieces[int(req.Index)] = struct{}{} if me.c.RequestPending(req) { return true } @@ -340,12 +286,9 @@ func (me *requestFiller) readahead() bool { return true } -func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) { +func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { rf := requestFiller{c: c, t: t, s: me} rf.Run() - for p := range rf.pieces { - pieces = append(pieces, p) - } return } @@ -385,5 +328,13 @@ func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) { } func (me *responsiveDownloadStrategy) PendingData(t *torrent) bool { - return len(me.FillRequests(t, me.dummyConn)) != 0 + if len(me.priorities[t]) != 0 { + return true + } + for index := range t.Pieces { + if t.wantPiece(index) { + return true + } + } + return false } -- 2.48.1