From: Matt Joiner <anacrolix@gmail.com>
Date: Thu, 20 Mar 2014 05:58:09 +0000 (+1100)
Subject: Improvements to pending chunks; extract testutil
X-Git-Tag: v1.0.0~1771
X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=fa000c4f882cbf1442ca5c0f9c81ba269a3c7416;p=btrtrc.git

Improvements to pending chunks; extract testutil
---

diff --git a/client.go b/client.go
index 8f163780..f7b5ebae 100644
--- a/client.go
+++ b/client.go
@@ -1,9 +1,6 @@
 package torrent
 
 import (
-	"bitbucket.org/anacrolix/go.torrent/peer_protocol"
-	"bitbucket.org/anacrolix/go.torrent/tracker"
-	_ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
 	"bufio"
 	"container/list"
 	"crypto"
@@ -11,9 +8,7 @@ import (
 	"encoding"
 	"errors"
 	"fmt"
-	metainfo "github.com/nsf/libtorgo/torrent"
 	"io"
-	"launchpad.net/gommap"
 	"log"
 	mathRand "math/rand"
 	"net"
@@ -22,6 +17,13 @@ import (
 	"sort"
 	"sync"
 	"time"
+
+	metainfo "github.com/nsf/libtorgo/torrent"
+
+	"bitbucket.org/anacrolix/go.torrent/peer_protocol"
+	"bitbucket.org/anacrolix/go.torrent/tracker"
+	_ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
+	"launchpad.net/gommap"
 )
 
 const (
@@ -252,6 +254,10 @@ type Torrent struct {
 	Trackers [][]tracker.Client
 }
 
+func (t *Torrent) NumPieces() int {
+	return len(t.MetaInfo.Pieces) / PieceHash.Size()
+}
+
 func (t *Torrent) Length() int64 {
 	return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
 }
@@ -303,6 +309,25 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer)
 	go cl.verifyPiece(t, pieceIndex)
 }
 
+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
+	req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+	if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
+		return
+	}
+	off %= t.MetaInfo.PieceLength
+	pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
+	if pieceLeft <= 0 {
+		return
+	}
+	req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
+	req.Length = chunkSize
+	if req.Length > pieceLeft {
+		req.Length = pieceLeft
+	}
+	ok = true
+	return
+}
+
 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
 	cl.mu.Lock()
 	defer cl.mu.Unlock()
@@ -310,23 +335,16 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
 	newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
 	for len_ > 0 {
 		// TODO: Write a function to return the Request for a given offset.
-		index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
-		pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
-		piece := t.Pieces[index]
-		if !piece.EverHashed {
-			cl.queuePieceCheck(t, index)
-		}
-		chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
-		if chunk.Begin+chunk.Length > t.PieceLength(index) {
-			chunk.Length = t.PieceLength(index) - chunk.Begin
+		req, ok := t.offsetRequest(off)
+		if !ok {
+			break
 		}
-		adv := int64(chunk.Length - pieceOff%chunkSize)
-		off += adv
-		len_ -= adv
-		if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
+		off += int64(req.Length)
+		len_ -= int64(req.Length)
+		if _, ok = t.Pieces[req.Index].PendingChunkSpecs[req.ChunkSpec]; !ok {
 			continue
 		}
-		newPriorities = append(newPriorities, Request{index, chunk})
+		newPriorities = append(newPriorities, req)
 	}
 	if len(newPriorities) == 0 {
 		return
@@ -356,12 +374,19 @@ func (t *Torrent) bitfield() (bf []bool) {
 	return
 }
 
-func (t *Torrent) pieceChunkSpecs(index peer_protocol.Integer) (cs map[ChunkSpec]struct{}) {
-	cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
+	piece := t.Pieces[index]
+	if piece.PendingChunkSpecs == nil {
+		piece.PendingChunkSpecs = make(
+			map[ChunkSpec]struct{},
+			(t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+	}
 	c := ChunkSpec{
 		Begin: 0,
 	}
-	for left := peer_protocol.Integer(t.PieceLength(index)); left > 0; left -= c.Length {
+	cs := piece.PendingChunkSpecs
+	log.Print(index, t.PieceLength(index))
+	for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
 		c.Length = left
 		if c.Length > chunkSize {
 			c.Length = chunkSize
@@ -389,7 +414,7 @@ type Peer struct {
 }
 
 func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
-	if int(piece) == len(t.Pieces)-1 {
+	if int(piece) == t.NumPieces()-1 {
 		len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
 	}
 	if len_ == 0 {
@@ -504,6 +529,15 @@ func (c *Client) Start() {
 	}
 }
 
+func (cl *Client) stopped() bool {
+	select {
+	case <-cl.quit:
+		return true
+	default:
+		return false
+	}
+}
+
 func (me *Client) Stop() {
 	close(me.quit)
 	me.event.Broadcast()
@@ -730,10 +764,14 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
 	}
 	for {
 		me.mu.Unlock()
+		// TODO: Can this be allocated on the stack?
 		msg := new(peer_protocol.Message)
 		err := decoder.Decode(msg)
 		me.mu.Lock()
 		if err != nil {
+			if me.stopped() {
+				return nil
+			}
 			return err
 		}
 		if msg.Keepalive {
@@ -872,6 +910,10 @@ func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent,
 		InfoHash: BytesInfoHash(metaInfo.InfoHash),
 		MetaInfo: metaInfo,
 	}
+	torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
+	if err != nil {
+		return
+	}
 	for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
 		hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
 		if len(hash) != PieceHash.Size() {
@@ -881,10 +923,7 @@ func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent,
 		piece := &piece{}
 		copyHashSum(piece.Hash[:], hash)
 		torrent.Pieces = append(torrent.Pieces, piece)
-	}
-	torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
-	if err != nil {
-		return
+		torrent.pendAllChunkSpecs(peer_protocol.Integer(len(torrent.Pieces) - 1))
 	}
 	torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
 	for tierIndex := range metaInfo.AnnounceList {
@@ -1028,7 +1067,9 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
 			}
 		}
 	}
-	conn.SetInterested(false)
+	if len(conn.Requests) == 0 {
+		conn.SetInterested(false)
+	}
 }
 
 func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
@@ -1098,7 +1139,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b
 	} else {
 		log.Print("piece failed hash")
 		if len(p.PendingChunkSpecs) == 0 {
-			p.PendingChunkSpecs = t.pieceChunkSpecs(piece)
+			t.pendAllChunkSpecs(piece)
 		}
 	}
 	for _, conn := range t.Conns {
diff --git a/client_test.go b/client_test.go
index ec15ee46..4cb963ac 100644
--- a/client_test.go
+++ b/client_test.go
@@ -1,6 +1,10 @@
 package torrent
 
 import (
+	"os"
+
+	"bitbucket.org/anacrolix/go.torrent/testutil"
+
 	"testing"
 )
 
@@ -15,3 +19,30 @@ func TestAddTorrentNoUsableURLs(t *testing.T) {
 func TestAddPeersToUnknownTorrent(t *testing.T) {
 	t.SkipNow()
 }
+
+func TestPieceHashSize(t *testing.T) {
+	if PieceHash.Size() != 20 {
+		t.FailNow()
+	}
+}
+
+func TestTorrentInitialState(t *testing.T) {
+	dir, mi := testutil.GreetingTestTorrent()
+	defer os.RemoveAll(dir)
+	tor, err := newTorrent(mi, "")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(tor.Pieces) != 1 {
+		t.Fatal("wrong number of pieces")
+	}
+	p := tor.Pieces[0]
+	if len(p.PendingChunkSpecs) != 1 {
+		t.Fatalf("should only be 1 chunk: %s", p.PendingChunkSpecs)
+	}
+	if _, ok := p.PendingChunkSpecs[ChunkSpec{
+		Length: 13,
+	}]; !ok {
+		t.Fatal("pending chunk spec is incorrect")
+	}
+}
diff --git a/cmd/torrent/main.go b/cmd/torrent/main.go
index 6f53f6c0..cbc9512b 100644
--- a/cmd/torrent/main.go
+++ b/cmd/torrent/main.go
@@ -1,16 +1,17 @@
 package main
 
 import (
-	"bitbucket.org/anacrolix/go.torrent"
-	"bitbucket.org/anacrolix/go.torrent/tracker"
 	"flag"
 	"fmt"
-	metainfo "github.com/nsf/libtorgo/torrent"
 	"log"
 	"net"
 	"net/http"
 	_ "net/http/pprof"
 	"os"
+
+	metainfo "github.com/nsf/libtorgo/torrent"
+
+	"bitbucket.org/anacrolix/go.torrent"
 )
 
 var (
@@ -30,7 +31,6 @@ func main() {
 	}
 	client := torrent.Client{
 		DataDir: *downloadDir,
-		// HalfOpenLimit: 2,
 	}
 	client.Start()
 	defer client.Stop()
@@ -47,6 +47,7 @@ func main() {
 		if err != nil {
 			log.Fatal(err)
 		}
+		client.PrioritizeDataRegion(torrent.BytesInfoHash(metaInfo.InfoHash), 0, 999999999)
 		err = client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), func() []torrent.Peer {
 			if *testPeer == "" {
 				return nil
@@ -56,10 +57,9 @@ func main() {
 				log.Fatal(err)
 			}
 			return []torrent.Peer{{
-				Peer: tracker.Peer{
-					IP:   addr.IP,
-					Port: addr.Port,
-				}}}
+				IP:   addr.IP,
+				Port: addr.Port,
+			}}
 		}())
 		if err != nil {
 			log.Fatal(err)
diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go
index 0c38850d..f43b913d 100644
--- a/fs/torrentfs_test.go
+++ b/fs/torrentfs_test.go
@@ -1,18 +1,20 @@
 package torrentfs
 
 import (
-	"bazil.org/fuse"
-	fusefs "bazil.org/fuse/fs"
-	"bitbucket.org/anacrolix/go.torrent"
 	"bytes"
-	metainfo "github.com/nsf/libtorgo/torrent"
-	"io"
 	"io/ioutil"
 	"net"
 	"os"
 	"path/filepath"
-	"runtime"
 	"testing"
+	"time"
+
+	"bitbucket.org/anacrolix/go.torrent/testutil"
+
+	"bazil.org/fuse"
+	fusefs "bazil.org/fuse/fs"
+	"bitbucket.org/anacrolix/go.torrent"
+	metainfo "github.com/nsf/libtorgo/torrent"
 )
 
 func TestTCPAddrString(t *testing.T) {
@@ -37,34 +39,7 @@ func TestTCPAddrString(t *testing.T) {
 	}
 }
 
-const dummyFileContents = "hello, world\n"
-
-func createDummyTorrentData(dirName string) string {
-	f, _ := os.Create(filepath.Join(dirName, "greeting"))
-	f.WriteString("hello, world\n")
-	return f.Name()
-}
-
-func createMetaInfo(name string, w io.Writer) {
-	builder := metainfo.Builder{}
-	builder.AddFile(name)
-	builder.AddAnnounceGroup([]string{"lol://cheezburger"})
-	batch, err := builder.Submit()
-	if err != nil {
-		panic(err)
-	}
-	errs, _ := batch.Start(w, 1)
-	<-errs
-}
-
 func TestDownloadOnDemand(t *testing.T) {
-	priorNumGoroutines := runtime.NumGoroutine()
-	defer func() {
-		n := runtime.NumGoroutine()
-		if n != priorNumGoroutines {
-			t.Fatalf("expected %d goroutines, but %d are running", priorNumGoroutines, n)
-		}
-	}()
 	dir, err := ioutil.TempDir("", "torrentfs")
 	if err != nil {
 		t.Fatal(err)
@@ -77,9 +52,9 @@ func TestDownloadOnDemand(t *testing.T) {
 	t.Logf("test directory: %s", dir)
 	finishedDir := filepath.Join(dir, "finished")
 	os.Mkdir(finishedDir, 0777)
-	name := createDummyTorrentData(finishedDir)
+	name := testutil.CreateDummyTorrentData(finishedDir)
 	metaInfoBuf := &bytes.Buffer{}
-	createMetaInfo(name, metaInfoBuf)
+	testutil.CreateMetaInfo(name, metaInfoBuf)
 	metaInfo, err := metainfo.Load(metaInfoBuf)
 	seeder := torrent.Client{
 		DataDir: finishedDir,
@@ -132,11 +107,15 @@ func TestDownloadOnDemand(t *testing.T) {
 	if fuseConn.MountError != nil {
 		t.Fatal(fuseConn.MountError)
 	}
+	go func() {
+		time.Sleep(10 * time.Second)
+		fuse.Unmount(mountDir)
+	}()
 	content, err := ioutil.ReadFile(filepath.Join(mountDir, "greeting"))
 	if err != nil {
 		t.Fatal(err)
 	}
-	if string(content) != dummyFileContents {
+	if string(content) != testutil.GreetingFileContents {
 		t.FailNow()
 	}
 }
diff --git a/testutil/testutil.go b/testutil/testutil.go
new file mode 100644
index 00000000..4dba8144
--- /dev/null
+++ b/testutil/testutil.go
@@ -0,0 +1,44 @@
+package testutil
+
+import (
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+
+	metainfo "github.com/nsf/libtorgo/torrent"
+
+	"bytes"
+)
+
+const GreetingFileContents = "hello, world\n"
+
+func CreateDummyTorrentData(dirName string) string {
+	f, _ := os.Create(filepath.Join(dirName, "greeting"))
+	f.WriteString("hello, world\n")
+	return f.Name()
+}
+
+func CreateMetaInfo(name string, w io.Writer) {
+	builder := metainfo.Builder{}
+	builder.AddFile(name)
+	builder.AddAnnounceGroup([]string{"lol://cheezburger"})
+	batch, err := builder.Submit()
+	if err != nil {
+		panic(err)
+	}
+	errs, _ := batch.Start(w, 1)
+	<-errs
+}
+
+func GreetingTestTorrent() (tempDir string, metaInfo *metainfo.MetaInfo) {
+	tempDir, err := ioutil.TempDir(os.TempDir(), "")
+	if err != nil {
+		panic(err)
+	}
+	name := CreateDummyTorrentData(tempDir)
+	w := &bytes.Buffer{}
+	CreateMetaInfo(name, w)
+	metaInfo, _ = metainfo.Load(w)
+	return
+}