}
if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
if t.urgent == nil {
- t.urgent = make(map[request]struct{}, (n+chunkSize-1)/chunkSize)
+ t.urgent = make(map[request]struct{}, (n+int(t.chunkSize)-1)/int(t.chunkSize))
}
t.urgent[req] = struct{}{}
cl.event.Broadcast() // Why?
// it.
func newTorrent(ih InfoHash) (t *torrent, err error) {
	t = &torrent{
-		InfoHash: ih,
-		Peers:    make(map[peersKey]Peer),
+		InfoHash: ih,
+		// Start with the package default; TorrentSpec.ChunkSize may
+		// override this later when the spec is applied.
+		chunkSize: defaultChunkSize,
+		Peers:     make(map[peersKey]Peer),
		closing:           make(chan struct{}),
		ceasingNetworking: make(chan struct{}),
// Specifies a new torrent for adding to a client. There are helpers for
// magnet URIs and torrent metainfo files.
type TorrentSpec struct {
-	Trackers [][]string
-	InfoHash InfoHash
-	Info     *metainfo.InfoEx
+	// The tiered tracker URIs.
+	Trackers [][]string
+	// The torrent's info hash.
+	InfoHash InfoHash
+	// The metainfo info dictionary, if it's already known (may be nil).
+	Info     *metainfo.InfoEx
+	// The name to use if the Name field from the Info isn't available.
	DisplayName string
+	// The chunk size to use for outbound requests. Defaults to 16KiB if not
+	// set.
+	ChunkSize int
}
func TorrentSpecFromMagnetURI(uri string) (spec *TorrentSpec, err error) {
if err != nil {
return
}
+ if spec.ChunkSize != 0 {
+ t.chunkSize = pp.Integer(spec.ChunkSize)
+ }
}
if spec.DisplayName != "" {
t.DisplayName = spec.DisplayName
panic("unwanted piece in connection request order")
}
piece := t.Pieces[pieceIndex]
- for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex)) {
+ for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex), pp.Integer(t.chunkSize)) {
r := request{pp.Integer(pieceIndex), cs}
if !addRequest(r) {
return
// log.Println("got chunk", req)
piece.Event.Broadcast()
// Record that we have the chunk.
- piece.unpendChunkIndex(chunkIndex(req.chunkSpec))
+ piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
delete(t.urgent, req)
if piece.numPendingChunks() == 0 {
for _, c := range t.Conns {
if err != nil {
t.Fatal(err)
}
+ tor.chunkSize = 2
err = tor.setMetadata(&mi.Info.Info, mi.Info.Bytes, nil)
if err != nil {
t.Fatal(err)
}
p := tor.Pieces[0]
tor.pendAllChunkSpecs(0)
- if p.numPendingChunks() != 1 {
- t.Fatalf("should only be 1 chunk: %v", p.PendingChunkSpecs)
- }
- // TODO: Set chunkSize to 2, to test odd/even silliness.
- if chunkIndexSpec(0, tor.pieceLength(0)).Length != 5 {
- t.Fatal("pending chunk spec is incorrect")
- }
+ assert.EqualValues(t, 3, p.numPendingChunks())
+ assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
}
func TestUnmarshalPEXMsg(t *testing.T) {
cfg.TorrentDataOpener = blob.NewStore(leecherDataDir).OpenTorrent
leecher, _ := NewClient(&cfg)
defer leecher.Close()
- leecherGreeting, _, _ := leecher.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
+ leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
+ ret = TorrentSpecFromMetaInfo(mi)
+ ret.ChunkSize = 2
+ return
+ }())
leecherGreeting.AddPeers([]Peer{
Peer{
IP: util.AddrIP(seeder.ListenAddr()),
)
const (
- pieceHash = crypto.SHA1
- maxRequests = 250 // Maximum pending requests we allow peers to send us.
- chunkSize = 0x4000 // 16KiB
+ pieceHash = crypto.SHA1
+ maxRequests = 250 // Maximum pending requests we allow peers to send us.
+ defaultChunkSize = 0x4000 // 16KiB
// Peer ID client identifier prefix. We'll update this occasionally to
// reflect changes to client behaviour that other clients may depend on.
// Also see `extendedHandshakeClientVersion`.
return fmt.Sprintf("%x", ih[:])
}
-func lastChunkSpec(pieceLength pp.Integer) (cs chunkSpec) {
+// lastChunkSpec returns the chunk spec for the final (possibly short) chunk
+// of a piece of the given length, for the given chunk size.
+func lastChunkSpec(pieceLength, chunkSize pp.Integer) (cs chunkSpec) {
	cs.Begin = (pieceLength - 1) / chunkSize * chunkSize
	cs.Length = pieceLength - cs.Begin
	return
func (suite) TestTorrentOffsetRequest(c *C) {
check := func(tl, ps, off int64, expected request, ok bool) {
- req, _ok := torrentOffsetRequest(tl, ps, chunkSize, off)
+ req, _ok := torrentOffsetRequest(tl, ps, defaultChunkSize, off)
c.Check(_ok, Equals, ok)
c.Check(req, Equals, expected)
}
Priority piecePriority
}
-func (p *piece) pendingChunk(cs chunkSpec) bool {
+// pendingChunk reports whether the chunk at cs is still pending (not yet
+// obtained) for this piece. chunkSize is needed to map cs to its index.
+func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
	if p.PendingChunkSpecs == nil {
		return false
	}
-	return p.PendingChunkSpecs[chunkIndex(cs)]
+	return p.PendingChunkSpecs[chunkIndex(cs, chunkSize)]
}
func (p *piece) numPendingChunks() (ret int) {
p.PendingChunkSpecs[i] = false
}
-func chunkIndexSpec(index int, pieceLength pp.Integer) chunkSpec {
+// chunkIndexSpec returns the chunk spec for the index-th chunk of a piece of
+// the given length, truncating the final chunk to the piece boundary.
+func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
	ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize}
	if ret.Begin+ret.Length > pieceLength {
		ret.Length = pieceLength - ret.Begin
	return ret
}
-func (p *piece) shuffledPendingChunkSpecs(pieceLength pp.Integer) (css []chunkSpec) {
+// shuffledPendingChunkSpecs returns the specs of this piece's still-pending
+// chunks, in a shuffled order, for the given piece length and chunk size.
+func (p *piece) shuffledPendingChunkSpecs(pieceLength, chunkSize pp.Integer) (css []chunkSpec) {
	if p.numPendingChunks() == 0 {
		return
	}
	css = make([]chunkSpec, 0, p.numPendingChunks())
	for i, pending := range p.PendingChunkSpecs {
		if pending {
-			css = append(css, chunkIndexSpec(i, pieceLength))
+			css = append(css, chunkIndexSpec(i, pieceLength, chunkSize))
		}
	}
if len(css) <= 1 {
}
for i, pending := range piece.PendingChunkSpecs {
if pending {
- count += chunkIndexSpec(i, pieceLength).Length
+ count += chunkIndexSpec(i, pieceLength, t.chunkSize).Length
}
}
return
	// announcing, and communicating with peers.
	ceasingNetworking chan struct{}
-	InfoHash          InfoHash
-	Pieces            []*piece
+	InfoHash InfoHash
+	Pieces   []*piece
+	// Size of outbound request chunks for this torrent. Set from
+	// TorrentSpec.ChunkSize when given, otherwise defaultChunkSize.
+	chunkSize pp.Integer
	// Chunks that are wanted before all others. This is for
	// responsive/streaming readers that want to unblock ASAP.
	urgent map[request]struct{}
// Return the request that would include the given offset into the torrent
// data. Returns !ok if there is no such request.
func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
-	return torrentOffsetRequest(t.Length(), t.Info.PieceLength, chunkSize, off)
+	// Chunk size is now per-torrent rather than a package-level constant.
+	return torrentOffsetRequest(t.Length(), t.Info.PieceLength, int64(t.chunkSize), off)
}
func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
	if r.Index >= pp.Integer(t.Info.NumPieces()) {
		return false
	}
-	if r.Begin%chunkSize != 0 {
+	// Request offsets must be aligned to this torrent's chunk size.
+	if r.Begin%t.chunkSize != 0 {
		return false
	}
-	if r.Length > chunkSize {
+	if r.Length > t.chunkSize {
		return false
	}
	pieceLength := t.pieceLength(int(r.Index))
	if r.Begin+r.Length > pieceLength {
		return false
	}
-	return r.Length == chunkSize || r.Begin+r.Length == pieceLength
+	// Only the final chunk of a piece may be shorter than the chunk size.
+	return r.Length == t.chunkSize || r.Begin+r.Length == pieceLength
}
+// pieceChunks returns, in order, the chunk specs covering the given piece,
+// using this torrent's chunk size. Only the final chunk may be short.
func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
-	css = make([]chunkSpec, 0, (t.pieceLength(piece)+chunkSize-1)/chunkSize)
+	css = make([]chunkSpec, 0, (t.pieceLength(piece)+t.chunkSize-1)/t.chunkSize)
	var cs chunkSpec
	for left := t.pieceLength(piece); left != 0; left -= cs.Length {
		cs.Length = left
-		if cs.Length > chunkSize {
-			cs.Length = chunkSize
+		if cs.Length > t.chunkSize {
+			cs.Length = t.chunkSize
		}
		css = append(css, cs)
		cs.Begin += cs.Length
piece := t.Pieces[pieceIndex]
if piece.PendingChunkSpecs == nil {
// Allocate to exact size.
- piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+chunkSize-1)/chunkSize)
+ piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+t.chunkSize-1)/t.chunkSize)
}
// Pend all the chunks.
pcss := piece.PendingChunkSpecs
if !t.haveInfo() {
return false
}
- return !t.Pieces[r.Index].pendingChunk(r.chunkSpec)
+ p := t.Pieces[r.Index]
+ return !p.pendingChunk(r.chunkSpec, t.chunkSize)
}
-func chunkIndex(cs chunkSpec) int {
+// chunkIndex returns the position of the chunk beginning at cs.Begin within
+// a piece divided into chunkSize-sized chunks.
+func chunkIndex(cs chunkSpec, chunkSize pp.Integer) int {
	return int(cs.Begin / chunkSize)
}
if !t.wantPiece(int(r.Index)) {
return false
}
- if t.Pieces[r.Index].pendingChunk(r.chunkSpec) {
+ if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
return true
}
_, ok := t.urgent[r]