From: Matt Joiner Date: Sat, 16 May 2015 00:51:48 +0000 (+1000) Subject: Switch to using a slice for tracking pending chunks. Saves massive amounts of memory. X-Git-Tag: v1.0.0~1188 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=1cf591dc3d924176aab2dc143b60e7794a9d2fdb;p=btrtrc.git Switch to using a slice for tracking pending chunks. Saves massive amounts of memory. --- diff --git a/client.go b/client.go index 1e8755f1..3fe20da2 100644 --- a/client.go +++ b/client.go @@ -2447,7 +2447,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) { panic("unwanted piece in connection request order") } piece := t.Pieces[pieceIndex] - for _, cs := range piece.shuffledPendingChunkSpecs() { + for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex)) { r := request{pp.Integer(pieceIndex), cs} if !addRequest(r) { return @@ -2503,9 +2503,9 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er // log.Println("got chunk", req) piece.Event.Broadcast() // Record that we have the chunk. - delete(piece.PendingChunkSpecs, req.chunkSpec) + piece.unpendChunkIndex(chunkIndex(req.chunkSpec)) delete(t.urgent, req) - if len(piece.PendingChunkSpecs) == 0 { + if piece.numPendingChunks() == 0 { for _, c := range t.Conns { c.pieceRequestOrder.DeletePiece(int(req.Index)) } @@ -2555,7 +2555,7 @@ func (me *Client) pieceChanged(t *torrent, piece int) { } p.Event.Broadcast() } else { - if len(p.PendingChunkSpecs) == 0 { + if p.numPendingChunks() == 0 { t.pendAllChunkSpecs(int(piece)) } if t.wantPiece(piece) { diff --git a/client_test.go b/client_test.go index 45381b7d..75ef696f 100644 --- a/client_test.go +++ b/client_test.go @@ -95,16 +95,12 @@ func TestTorrentInitialState(t *testing.T) { } p := tor.Pieces[0] tor.pendAllChunkSpecs(0) - if len(p.PendingChunkSpecs) != 1 { + if p.numPendingChunks() != 1 { t.Fatalf("should only be 1 chunk: %v", p.PendingChunkSpecs) } // TODO: Set chunkSize to 2, to test odd/even silliness. - if false { - if _, ok := p.PendingChunkSpecs[chunkSpec{ - Length: 13, - }]; !ok { - t.Fatal("pending chunk spec is incorrect") - } + if chunkIndexSpec(0, tor.pieceLength(0)).Length != 5 { + t.Fatal("pending chunk spec is incorrect") } } diff --git a/piece.go b/piece.go index 37b894b9..6bb96a3e 100644 --- a/piece.go +++ b/piece.go @@ -3,6 +3,8 @@ package torrent import ( "math/rand" "sync" + + pp "github.com/anacrolix/torrent/peer_protocol" ) type piecePriority byte @@ -16,8 +18,10 @@ const ( ) type piece struct { - Hash pieceSum - PendingChunkSpecs map[chunkSpec]struct{} + Hash pieceSum + // Chunks we don't have. The offset and length can be determined by our + // request chunkSize in use. + PendingChunkSpecs []bool Hashing bool QueuedForHash bool EverHashed bool @@ -25,13 +29,46 @@ type piece struct { Priority piecePriority } -func (p *piece) shuffledPendingChunkSpecs() (css []chunkSpec) { - if len(p.PendingChunkSpecs) == 0 { +func (p *piece) pendingChunk(cs chunkSpec) bool { + if p.PendingChunkSpecs == nil { + return false + } + return p.PendingChunkSpecs[chunkIndex(cs)] +} + +func (p *piece) numPendingChunks() (ret int) { + for _, pending := range p.PendingChunkSpecs { + if pending { + ret++ + } + } + return +} + +func (p *piece) unpendChunkIndex(i int) { + if p.PendingChunkSpecs == nil { + return + } + p.PendingChunkSpecs[i] = false +} + +func chunkIndexSpec(index int, pieceLength pp.Integer) chunkSpec { + ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize} + if ret.Begin+ret.Length > pieceLength { + ret.Length = pieceLength - ret.Begin + } + return ret +} + +func (p *piece) shuffledPendingChunkSpecs(pieceLength pp.Integer) (css []chunkSpec) { + if p.numPendingChunks() == 0 { return } - css = make([]chunkSpec, 0, len(p.PendingChunkSpecs)) - for cs := range p.PendingChunkSpecs { - css = append(css, cs) + css = make([]chunkSpec, 0, p.numPendingChunks()) + for i, pending := range p.PendingChunkSpecs { + if pending { + css = append(css, chunkIndexSpec(i, pieceLength)) + } } if len(css) <= 1 { return diff --git a/torrent.go b/torrent.go index 6d2a42d8..a764a662 100644 --- a/torrent.go +++ b/torrent.go @@ -25,15 +25,13 @@ func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) { return 0 } piece := t.Pieces[index] + pieceLength := t.pieceLength(index) if !piece.EverHashed { - return t.pieceLength(index) - } - pendingChunks := t.Pieces[index].PendingChunkSpecs - count = pp.Integer(len(pendingChunks)) * chunkSize - _lastChunkSpec := lastChunkSpec(t.pieceLength(index)) - if _lastChunkSpec.Length != chunkSize { - if _, ok := pendingChunks[_lastChunkSpec]; ok { - count += _lastChunkSpec.Length - chunkSize + return pieceLength + } + for i, pending := range piece.PendingChunkSpecs { + if pending { + count += chunkIndexSpec(i, pieceLength).Length } } return @@ -570,11 +568,28 @@ func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) { func (t *torrent) bitfield() (bf []bool) { for _, p := range t.Pieces { // TODO: Check this logic. - bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0) + bf = append(bf, p.EverHashed && p.numPendingChunks() == 0) } return } +func (t *torrent) validOutgoingRequest(r request) bool { + if r.Index >= pp.Integer(t.Info.NumPieces()) { + return false + } + if r.Begin%chunkSize != 0 { + return false + } + if r.Length > chunkSize { + return false + } + pieceLength := t.pieceLength(int(r.Index)) + if r.Begin+r.Length > pieceLength { + return false + } + return r.Length == chunkSize || r.Begin+r.Length == pieceLength +} + func (t *torrent) pieceChunks(piece int) (css []chunkSpec) { css = make([]chunkSpec, 0, (t.pieceLength(piece)+chunkSize-1)/chunkSize) var cs chunkSpec @@ -589,16 +604,16 @@ func (t *torrent) pieceChunks(piece int) (css []chunkSpec) { return } -func (t *torrent) pendAllChunkSpecs(index int) { - piece := t.Pieces[index] +func (t *torrent) pendAllChunkSpecs(pieceIndex int) { + piece := t.Pieces[pieceIndex] if piece.PendingChunkSpecs == nil { - piece.PendingChunkSpecs = make( - map[chunkSpec]struct{}, - (t.pieceLength(index)+chunkSize-1)/chunkSize) + // Allocate to exact size. + piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+chunkSize-1)/chunkSize) } + // Pend all the chunks. pcss := piece.PendingChunkSpecs - for _, cs := range t.pieceChunks(int(index)) { - pcss[cs] = struct{}{} + for i := range pcss { + pcss[i] = true } return } @@ -658,21 +673,22 @@ func (t *torrent) haveChunk(r request) bool { if !t.haveInfo() { return false } - piece := t.Pieces[r.Index] - _, ok := piece.PendingChunkSpecs[r.chunkSpec] - // log.Println("have chunk", r, !ok) - return !ok + return !t.Pieces[r.Index].pendingChunk(r.chunkSpec) +} + +func chunkIndex(cs chunkSpec) int { + return int(cs.Begin / chunkSize) } +// TODO: This should probably be called wantPiece. func (t *torrent) wantChunk(r request) bool { if !t.wantPiece(int(r.Index)) { return false } - _, ok := t.Pieces[r.Index].PendingChunkSpecs[r.chunkSpec] - if ok { + if t.Pieces[r.Index].pendingChunk(r.chunkSpec) { return true } - _, ok = t.urgent[r] + _, ok := t.urgent[r] return ok } @@ -685,6 +701,7 @@ func (t *torrent) urgentChunkInPiece(piece int) bool { return false } +// TODO: This should be called wantPieceIndex. func (t *torrent) wantPiece(index int) bool { if !t.haveInfo() { return false