]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Improvements to pending chunks; extract testutil
authorMatt Joiner <anacrolix@gmail.com>
Thu, 20 Mar 2014 05:58:09 +0000 (16:58 +1100)
committerMatt Joiner <anacrolix@gmail.com>
Thu, 20 Mar 2014 05:58:09 +0000 (16:58 +1100)
client.go
client_test.go
cmd/torrent/main.go
fs/torrentfs_test.go
testutil/testutil.go [new file with mode: 0644]

index 8f163780412581a94e3a7ae4717ad08f6d61a3c2..f7b5ebaeee560bf2ca858e12ac93a722357754a8 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1,9 +1,6 @@
 package torrent
 
 import (
-       "bitbucket.org/anacrolix/go.torrent/peer_protocol"
-       "bitbucket.org/anacrolix/go.torrent/tracker"
-       _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
        "bufio"
        "container/list"
        "crypto"
@@ -11,9 +8,7 @@ import (
        "encoding"
        "errors"
        "fmt"
-       metainfo "github.com/nsf/libtorgo/torrent"
        "io"
-       "launchpad.net/gommap"
        "log"
        mathRand "math/rand"
        "net"
@@ -22,6 +17,13 @@ import (
        "sort"
        "sync"
        "time"
+
+       metainfo "github.com/nsf/libtorgo/torrent"
+
+       "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+       "bitbucket.org/anacrolix/go.torrent/tracker"
+       _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
+       "launchpad.net/gommap"
 )
 
 const (
@@ -252,6 +254,10 @@ type Torrent struct {
        Trackers [][]tracker.Client
 }
 
+func (t *Torrent) NumPieces() int {
+       return len(t.MetaInfo.Pieces) / PieceHash.Size()
+}
+
 func (t *Torrent) Length() int64 {
        return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
 }
@@ -303,6 +309,25 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer)
        go cl.verifyPiece(t, pieceIndex)
 }
 
+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
+       req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+       if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
+               return
+       }
+       off %= t.MetaInfo.PieceLength
+       pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
+       if pieceLeft <= 0 {
+               return
+       }
+       req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
+       req.Length = chunkSize
+       if req.Length > pieceLeft {
+               req.Length = pieceLeft
+       }
+       ok = true
+       return
+}
+
 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
        cl.mu.Lock()
        defer cl.mu.Unlock()
@@ -310,23 +335,16 @@ func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
        newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
        for len_ > 0 {
                // TODO: Write a function to return the Request for a given offset.
-               index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
-               pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
-               piece := t.Pieces[index]
-               if !piece.EverHashed {
-                       cl.queuePieceCheck(t, index)
-               }
-               chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
-               if chunk.Begin+chunk.Length > t.PieceLength(index) {
-                       chunk.Length = t.PieceLength(index) - chunk.Begin
+               req, ok := t.offsetRequest(off)
+               if !ok {
+                       break
                }
-               adv := int64(chunk.Length - pieceOff%chunkSize)
-               off += adv
-               len_ -= adv
-               if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
+               off += int64(req.Length)
+               len_ -= int64(req.Length)
+               if _, ok = t.Pieces[req.Index].PendingChunkSpecs[req.ChunkSpec]; !ok {
                        continue
                }
-               newPriorities = append(newPriorities, Request{index, chunk})
+               newPriorities = append(newPriorities, req)
        }
        if len(newPriorities) == 0 {
                return
@@ -356,12 +374,19 @@ func (t *Torrent) bitfield() (bf []bool) {
        return
 }
 
-func (t *Torrent) pieceChunkSpecs(index peer_protocol.Integer) (cs map[ChunkSpec]struct{}) {
-       cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
+       piece := t.Pieces[index]
+       if piece.PendingChunkSpecs == nil {
+               piece.PendingChunkSpecs = make(
+                       map[ChunkSpec]struct{},
+                       (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+       }
        c := ChunkSpec{
                Begin: 0,
        }
-       for left := peer_protocol.Integer(t.PieceLength(index)); left > 0; left -= c.Length {
+       cs := piece.PendingChunkSpecs
+       log.Print(index, t.PieceLength(index))
+       for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
                c.Length = left
                if c.Length > chunkSize {
                        c.Length = chunkSize
@@ -389,7 +414,7 @@ type Peer struct {
 }
 
 func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
-       if int(piece) == len(t.Pieces)-1 {
+       if int(piece) == t.NumPieces()-1 {
                len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
        }
        if len_ == 0 {
@@ -504,6 +529,15 @@ func (c *Client) Start() {
        }
 }
 
+func (cl *Client) stopped() bool {
+       select {
+       case <-cl.quit:
+               return true
+       default:
+               return false
+       }
+}
+
 func (me *Client) Stop() {
        close(me.quit)
        me.event.Broadcast()
@@ -730,10 +764,14 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
        }
        for {
                me.mu.Unlock()
+               // TODO: Can this be allocated on the stack?
                msg := new(peer_protocol.Message)
                err := decoder.Decode(msg)
                me.mu.Lock()
                if err != nil {
+                       if me.stopped() {
+                               return nil
+                       }
                        return err
                }
                if msg.Keepalive {
@@ -872,6 +910,10 @@ func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent,
                InfoHash: BytesInfoHash(metaInfo.InfoHash),
                MetaInfo: metaInfo,
        }
+       torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
+       if err != nil {
+               return
+       }
        for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
                hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
                if len(hash) != PieceHash.Size() {
@@ -881,10 +923,7 @@ func newTorrent(metaInfo *metainfo.MetaInfo, dataDir string) (torrent *Torrent,
                piece := &piece{}
                copyHashSum(piece.Hash[:], hash)
                torrent.Pieces = append(torrent.Pieces, piece)
-       }
-       torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
-       if err != nil {
-               return
+               torrent.pendAllChunkSpecs(peer_protocol.Integer(len(torrent.Pieces) - 1))
        }
        torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
        for tierIndex := range metaInfo.AnnounceList {
@@ -1028,7 +1067,9 @@ func (me *Client) replenishConnRequests(torrent *Torrent, conn *Connection) {
                        }
                }
        }
-       conn.SetInterested(false)
+       if len(conn.Requests) == 0 {
+               conn.SetInterested(false)
+       }
 }
 
 func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
@@ -1098,7 +1139,7 @@ func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct b
        } else {
                log.Print("piece failed hash")
                if len(p.PendingChunkSpecs) == 0 {
-                       p.PendingChunkSpecs = t.pieceChunkSpecs(piece)
+                       t.pendAllChunkSpecs(piece)
                }
        }
        for _, conn := range t.Conns {
index ec15ee46dd22e42ddf084983bfe0b4d9b2f03c1d..4cb963ac685d2fcfbe7fb1b7147d781ca15c8882 100644 (file)
@@ -1,6 +1,10 @@
 package torrent
 
 import (
+       "os"
+
+       "bitbucket.org/anacrolix/go.torrent/testutil"
+
        "testing"
 )
 
@@ -15,3 +19,30 @@ func TestAddTorrentNoUsableURLs(t *testing.T) {
 func TestAddPeersToUnknownTorrent(t *testing.T) {
        t.SkipNow()
 }
+
+func TestPieceHashSize(t *testing.T) {
+       if PieceHash.Size() != 20 {
+               t.FailNow()
+       }
+}
+
+func TestTorrentInitialState(t *testing.T) {
+       dir, mi := testutil.GreetingTestTorrent()
+       defer os.RemoveAll(dir)
+       tor, err := newTorrent(mi, "")
+       if err != nil {
+               t.Fatal(err)
+       }
+       if len(tor.Pieces) != 1 {
+               t.Fatal("wrong number of pieces")
+       }
+       p := tor.Pieces[0]
+       if len(p.PendingChunkSpecs) != 1 {
+               t.Fatalf("should only be 1 chunk: %s", p.PendingChunkSpecs)
+       }
+       if _, ok := p.PendingChunkSpecs[ChunkSpec{
+               Length: 13,
+       }]; !ok {
+               t.Fatal("pending chunk spec is incorrect")
+       }
+}
index 6f53f6c0ec419f94413436cabdb8e96cfb416820..cbc9512b119ea6d0285b2ebbe9a8313a38223be0 100644 (file)
@@ -1,16 +1,17 @@
 package main
 
 import (
-       "bitbucket.org/anacrolix/go.torrent"
-       "bitbucket.org/anacrolix/go.torrent/tracker"
        "flag"
        "fmt"
-       metainfo "github.com/nsf/libtorgo/torrent"
        "log"
        "net"
        "net/http"
        _ "net/http/pprof"
        "os"
+
+       metainfo "github.com/nsf/libtorgo/torrent"
+
+       "bitbucket.org/anacrolix/go.torrent"
 )
 
 var (
@@ -30,7 +31,6 @@ func main() {
        }
        client := torrent.Client{
                DataDir: *downloadDir,
-               // HalfOpenLimit: 2,
        }
        client.Start()
        defer client.Stop()
@@ -47,6 +47,7 @@ func main() {
                if err != nil {
                        log.Fatal(err)
                }
+               client.PrioritizeDataRegion(torrent.BytesInfoHash(metaInfo.InfoHash), 0, 999999999)
                err = client.AddPeers(torrent.BytesInfoHash(metaInfo.InfoHash), func() []torrent.Peer {
                        if *testPeer == "" {
                                return nil
@@ -56,10 +57,9 @@ func main() {
                                log.Fatal(err)
                        }
                        return []torrent.Peer{{
-                               Peer: tracker.Peer{
-                                       IP:   addr.IP,
-                                       Port: addr.Port,
-                               }}}
+                               IP:   addr.IP,
+                               Port: addr.Port,
+                       }}
                }())
                if err != nil {
                        log.Fatal(err)
index 0c38850d7773471d07444be91d1e5798724af598..f43b913de554ff46c569548a7bb78bca4e8bec03 100644 (file)
@@ -1,18 +1,20 @@
 package torrentfs
 
 import (
-       "bazil.org/fuse"
-       fusefs "bazil.org/fuse/fs"
-       "bitbucket.org/anacrolix/go.torrent"
        "bytes"
-       metainfo "github.com/nsf/libtorgo/torrent"
-       "io"
        "io/ioutil"
        "net"
        "os"
        "path/filepath"
-       "runtime"
        "testing"
+       "time"
+
+       "bitbucket.org/anacrolix/go.torrent/testutil"
+
+       "bazil.org/fuse"
+       fusefs "bazil.org/fuse/fs"
+       "bitbucket.org/anacrolix/go.torrent"
+       metainfo "github.com/nsf/libtorgo/torrent"
 )
 
 func TestTCPAddrString(t *testing.T) {
@@ -37,34 +39,7 @@ func TestTCPAddrString(t *testing.T) {
        }
 }
 
-const dummyFileContents = "hello, world\n"
-
-func createDummyTorrentData(dirName string) string {
-       f, _ := os.Create(filepath.Join(dirName, "greeting"))
-       f.WriteString("hello, world\n")
-       return f.Name()
-}
-
-func createMetaInfo(name string, w io.Writer) {
-       builder := metainfo.Builder{}
-       builder.AddFile(name)
-       builder.AddAnnounceGroup([]string{"lol://cheezburger"})
-       batch, err := builder.Submit()
-       if err != nil {
-               panic(err)
-       }
-       errs, _ := batch.Start(w, 1)
-       <-errs
-}
-
 func TestDownloadOnDemand(t *testing.T) {
-       priorNumGoroutines := runtime.NumGoroutine()
-       defer func() {
-               n := runtime.NumGoroutine()
-               if n != priorNumGoroutines {
-                       t.Fatalf("expected %d goroutines, but %d are running", priorNumGoroutines, n)
-               }
-       }()
        dir, err := ioutil.TempDir("", "torrentfs")
        if err != nil {
                t.Fatal(err)
@@ -77,9 +52,9 @@ func TestDownloadOnDemand(t *testing.T) {
        t.Logf("test directory: %s", dir)
        finishedDir := filepath.Join(dir, "finished")
        os.Mkdir(finishedDir, 0777)
-       name := createDummyTorrentData(finishedDir)
+       name := testutil.CreateDummyTorrentData(finishedDir)
        metaInfoBuf := &bytes.Buffer{}
-       createMetaInfo(name, metaInfoBuf)
+       testutil.CreateMetaInfo(name, metaInfoBuf)
        metaInfo, err := metainfo.Load(metaInfoBuf)
        seeder := torrent.Client{
                DataDir: finishedDir,
@@ -132,11 +107,15 @@ func TestDownloadOnDemand(t *testing.T) {
        if fuseConn.MountError != nil {
                t.Fatal(fuseConn.MountError)
        }
+       go func() {
+               time.Sleep(10 * time.Second)
+               fuse.Unmount(mountDir)
+       }()
        content, err := ioutil.ReadFile(filepath.Join(mountDir, "greeting"))
        if err != nil {
                t.Fatal(err)
        }
-       if string(content) != dummyFileContents {
+       if string(content) != testutil.GreetingFileContents {
                t.FailNow()
        }
 }
diff --git a/testutil/testutil.go b/testutil/testutil.go
new file mode 100644 (file)
index 0000000..4dba814
--- /dev/null
@@ -0,0 +1,44 @@
+package testutil
+
+import (
+       "io"
+       "io/ioutil"
+       "os"
+       "path/filepath"
+
+       metainfo "github.com/nsf/libtorgo/torrent"
+
+       "bytes"
+)
+
+const GreetingFileContents = "hello, world\n"
+
+func CreateDummyTorrentData(dirName string) string {
+       f, _ := os.Create(filepath.Join(dirName, "greeting"))
+       f.WriteString("hello, world\n")
+       return f.Name()
+}
+
+func CreateMetaInfo(name string, w io.Writer) {
+       builder := metainfo.Builder{}
+       builder.AddFile(name)
+       builder.AddAnnounceGroup([]string{"lol://cheezburger"})
+       batch, err := builder.Submit()
+       if err != nil {
+               panic(err)
+       }
+       errs, _ := batch.Start(w, 1)
+       <-errs
+}
+
+func GreetingTestTorrent() (tempDir string, metaInfo *metainfo.MetaInfo) {
+       tempDir, err := ioutil.TempDir(os.TempDir(), "")
+       if err != nil {
+               panic(err)
+       }
+       name := CreateDummyTorrentData(tempDir)
+       w := &bytes.Buffer{}
+       CreateMetaInfo(name, w)
+       metaInfo, _ = metainfo.Load(w)
+       return
+}