From c018c660f05497f47b30081dbb215eeb9b4eb1a6 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Wed, 15 Jul 2015 15:31:18 +1000 Subject: [PATCH] Allow chunk size to be specified per torrent --- client.go | 25 +++++++++++++++++-------- client_test.go | 16 ++++++++-------- misc.go | 8 ++++---- misc_test.go | 2 +- piece.go | 10 +++++----- torrent.go | 30 ++++++++++++++++-------------- 6 files changed, 51 insertions(+), 40 deletions(-) diff --git a/client.go b/client.go index 586a9034..63d9e0ca 100644 --- a/client.go +++ b/client.go @@ -324,7 +324,7 @@ func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) { } if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) { if t.urgent == nil { - t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize) + t.urgent = make(map[request]struct{}, (n+int(t.chunkSize)-1)/int(t.chunkSize)) } t.urgent[req] = struct{}{} cl.event.Broadcast() // Why? @@ -1911,8 +1911,9 @@ func (cl *Client) setMetaData(t *torrent, md *metainfo.Info, bytes []byte) (err // it. func newTorrent(ih InfoHash) (t *torrent, err error) { t = &torrent{ - InfoHash: ih, - Peers: make(map[peersKey]Peer), + InfoHash: ih, + chunkSize: defaultChunkSize, + Peers: make(map[peersKey]Peer), closing: make(chan struct{}), ceasingNetworking: make(chan struct{}), @@ -2078,10 +2079,15 @@ func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err // Specifies a new torrent for adding to a client. There are helpers for // magnet URIs and torrent metainfo files. type TorrentSpec struct { - Trackers [][]string - InfoHash InfoHash - Info *metainfo.InfoEx + // The tiered tracker URIs. + Trackers [][]string + InfoHash InfoHash + Info *metainfo.InfoEx + // The name to use if the Name field from the Info isn't available. DisplayName string + // The chunk size to use for outbound requests. Defaults to 16KiB if not + // set. + ChunkSize int } func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) { @@ -2129,6 +2135,9 @@ func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (T Torrent, new bool, err er if err != nil { return } + if spec.ChunkSize != 0 { + t.chunkSize = pp.Integer(spec.ChunkSize) + } } if spec.DisplayName != "" { t.DisplayName = spec.DisplayName @@ -2463,7 +2472,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) { panic("unwanted piece in connection request order") } piece := t.Pieces[pieceIndex] - for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex)) { + for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex), pp.Integer(t.chunkSize)) { r := request{pp.Integer(pieceIndex), cs} if !addRequest(r) { return @@ -2524,7 +2533,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er // log.Println("got chunk", req) piece.Event.Broadcast() // Record that we have the chunk. - piece.unpendChunkIndex(chunkIndex(req.chunkSpec)) + piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize)) delete(t.urgent, req) if piece.numPendingChunks() == 0 { for _, c := range t.Conns { diff --git a/client_test.go b/client_test.go index 3cd1f0ec..98fa9464 100644 --- a/client_test.go +++ b/client_test.go @@ -93,6 +93,7 @@ func TestTorrentInitialState(t *testing.T) { if err != nil { t.Fatal(err) } + tor.chunkSize = 2 err = tor.setMetadata(&mi.Info.Info, mi.Info.Bytes, nil) if err != nil { t.Fatal(err) @@ -102,13 +103,8 @@ func TestTorrentInitialState(t *testing.T) { } p := tor.Pieces[0] tor.pendAllChunkSpecs(0) - if p.numPendingChunks() != 1 { - t.Fatalf("should only be 1 chunk: %v", p.PendingChunkSpecs) - } - // TODO: Set chunkSize to 2, to test odd/even silliness. - if chunkIndexSpec(0, tor.pieceLength(0)).Length != 5 { - t.Fatal("pending chunk spec is incorrect") - } + assert.EqualValues(t, 3, p.numPendingChunks()) + assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) } func TestUnmarshalPEXMsg(t *testing.T) { @@ -271,7 +267,11 @@ func TestClientTransfer(t *testing.T) { cfg.TorrentDataOpener = blob.NewStore(leecherDataDir).OpenTorrent leecher, _ := NewClient(&cfg) defer leecher.Close() - leecherGreeting, _, _ := leecher.AddTorrentSpec(TorrentSpecFromMetaInfo(mi)) + leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) { + ret = TorrentSpecFromMetaInfo(mi) + ret.ChunkSize = 2 + return + }()) leecherGreeting.AddPeers([]Peer{ Peer{ IP: util.AddrIP(seeder.ListenAddr()), diff --git a/misc.go b/misc.go index 532d4a72..95c5c1e6 100644 --- a/misc.go +++ b/misc.go @@ -11,9 +11,9 @@ import ( ) const ( - pieceHash = crypto.SHA1 - maxRequests = 250 // Maximum pending requests we allow peers to send us. - chunkSize = 0x4000 // 16KiB + pieceHash = crypto.SHA1 + maxRequests = 250 // Maximum pending requests we allow peers to send us. + defaultChunkSize = 0x4000 // 16KiB // Peer ID client identifier prefix. We'll update this occasionally to // reflect changes to client behaviour that other clients may depend on. // Also see `extendedHandshakeClientVersion`. @@ -35,7 +35,7 @@ func (ih *InfoHash) HexString() string { return fmt.Sprintf("%x", ih[:]) } -func lastChunkSpec(pieceLength pp.Integer) (cs chunkSpec) { +func lastChunkSpec(pieceLength, chunkSize pp.Integer) (cs chunkSpec) { cs.Begin = (pieceLength - 1) / chunkSize * chunkSize cs.Length = pieceLength - cs.Begin return diff --git a/misc_test.go b/misc_test.go index a8bd10f4..55cd3c66 100644 --- a/misc_test.go +++ b/misc_test.go @@ -4,7 +4,7 @@ import . "gopkg.in/check.v1" func (suite) TestTorrentOffsetRequest(c *C) { check := func(tl, ps, off int64, expected request, ok bool) { - req, _ok := torrentOffsetRequest(tl, ps, chunkSize, off) + req, _ok := torrentOffsetRequest(tl, ps, defaultChunkSize, off) c.Check(_ok, Equals, ok) c.Check(req, Equals, expected) } diff --git a/piece.go b/piece.go index a5e9947b..e326c2b4 100644 --- a/piece.go +++ b/piece.go @@ -32,11 +32,11 @@ type piece struct { Priority piecePriority } -func (p *piece) pendingChunk(cs chunkSpec) bool { +func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool { if p.PendingChunkSpecs == nil { return false } - return p.PendingChunkSpecs[chunkIndex(cs)] + return p.PendingChunkSpecs[chunkIndex(cs, chunkSize)] } func (p *piece) numPendingChunks() (ret int) { @@ -55,7 +55,7 @@ func (p *piece) unpendChunkIndex(i int) { p.PendingChunkSpecs[i] = false } -func chunkIndexSpec(index int, pieceLength pp.Integer) chunkSpec { +func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec { ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize} if ret.Begin+ret.Length > pieceLength { ret.Length = pieceLength - ret.Begin @@ -63,14 +63,14 @@ func chunkIndexSpec(index int, pieceLength pp.Integer) chunkSpec { return ret } -func (p *piece) shuffledPendingChunkSpecs(pieceLength pp.Integer) (css []chunkSpec) { +func (p *piece) shuffledPendingChunkSpecs(pieceLength, chunkSize pp.Integer) (css []chunkSpec) { if p.numPendingChunks() == 0 { return } css = make([]chunkSpec, 0, p.numPendingChunks()) for i, pending := range p.PendingChunkSpecs { if pending { - css = append(css, chunkIndexSpec(i, pieceLength)) + css = append(css, chunkIndexSpec(i, pieceLength, chunkSize)) } } if len(css) <= 1 { diff --git a/torrent.go b/torrent.go index 50e2ed91..c4bb5990 100644 --- a/torrent.go +++ b/torrent.go @@ -32,7 +32,7 @@ func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) { } for i, pending := range piece.PendingChunkSpecs { if pending { - count += chunkIndexSpec(i, pieceLength).Length + count += chunkIndexSpec(i, pieceLength, t.chunkSize).Length } } return @@ -61,8 +61,9 @@ type torrent struct { // announcing, and communicating with peers. ceasingNetworking chan struct{} - InfoHash InfoHash - Pieces []*piece + InfoHash InfoHash + Pieces []*piece + chunkSize pp.Integer // Chunks that are wanted before all others. This is for // responsive/streaming readers that want to unblock ASAP. urgent map[request]struct{} @@ -552,7 +553,7 @@ func (t *torrent) requestOffset(r request) int64 { // Return the request that would include the given offset into the torrent // data. Returns !ok if there is no such request. func (t *torrent) offsetRequest(off int64) (req request, ok bool) { - return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off) + return torrentOffsetRequest(t.Length(), t.Info.PieceLength, int64(t.chunkSize), off) } func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) { @@ -575,26 +576,26 @@ func (t *torrent) validOutgoingRequest(r request) bool { if r.Index >= pp.Integer(t.Info.NumPieces()) { return false } - if r.Begin%chunkSize != 0 { + if r.Begin%t.chunkSize != 0 { return false } - if r.Length > chunkSize { + if r.Length > t.chunkSize { return false } pieceLength := t.pieceLength(int(r.Index)) if r.Begin+r.Length > pieceLength { return false } - return r.Length == chunkSize || r.Begin+r.Length == pieceLength + return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength } func (t *torrent) pieceChunks(piece int) (css []chunkSpec) { - css = make([]chunkSpec, 0, (t.pieceLength(piece)+chunkSize-1)/chunkSize) + css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize) var cs chunkSpec for left := t.pieceLength(piece); left != 0; left -= cs.Length { cs.Length = left - if cs.Length > chunkSize { - cs.Length = chunkSize + if cs.Length > t.chunkSize { + cs.Length = t.chunkSize } css = append(css, cs) cs.Begin += cs.Length @@ -606,7 +607,7 @@ func (t *torrent) pendAllChunkSpecs(pieceIndex int) { piece := t.Pieces[pieceIndex] if piece.PendingChunkSpecs == nil { // Allocate to exact size. - piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+chunkSize-1)/chunkSize) + piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+t.chunkSize-1)/t.chunkSize) } // Pend all the chunks. pcss := piece.PendingChunkSpecs @@ -671,10 +672,11 @@ func (t *torrent) haveChunk(r request) bool { if !t.haveInfo() { return false } - return !t.Pieces[r.Index].pendingChunk(r.chunkSpec) + p := t.Pieces[r.Index] + return !p.pendingChunk(r.chunkSpec, t.chunkSize) } -func chunkIndex(cs chunkSpec) int { +func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int { return int(cs.Begin / chunkSize) } @@ -683,7 +685,7 @@ func (t *torrent) wantChunk(r request) bool { if !t.wantPiece(int(r.Index)) { return false } - if t.Pieces[r.Index].pendingChunk(r.chunkSpec) { + if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) { return true } _, ok := t.urgent[r] -- 2.48.1