package torrent
import (
- "bitbucket.org/anacrolix/go.torrent/peer_protocol"
- "bitbucket.org/anacrolix/go.torrent/tracker"
- _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
"bufio"
"container/list"
"crypto"
"encoding"
"errors"
"fmt"
- metainfo "github.com/nsf/libtorgo/torrent"
"io"
- "launchpad.net/gommap"
"log"
mathRand "math/rand"
"net"
"sort"
"sync"
"time"
+
+ metainfo "github.com/nsf/libtorgo/torrent"
+
+ "bitbucket.org/anacrolix/go.torrent/peer_protocol"
+ "bitbucket.org/anacrolix/go.torrent/tracker"
+ _ "bitbucket.org/anacrolix/go.torrent/tracker/udp"
+ "launchpad.net/gommap"
)
const (
Trackers [][]tracker.Client
}
+func (t *Torrent) NumPieces() int {
+ return len(t.MetaInfo.Pieces) / PieceHash.Size()
+}
+
func (t *Torrent) Length() int64 {
return int64(t.PieceLength(peer_protocol.Integer(len(t.Pieces)-1))) + int64(len(t.Pieces)-1)*int64(t.PieceLength(0))
}
go cl.verifyPiece(t, pieceIndex)
}
+func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
+ req.Index = peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+ if req.Index < 0 || int(req.Index) >= len(t.Pieces) {
+ return
+ }
+ off %= t.MetaInfo.PieceLength
+ pieceLeft := t.PieceLength(req.Index) - peer_protocol.Integer(off)
+ if pieceLeft <= 0 {
+ return
+ }
+ req.Begin = chunkSize * (peer_protocol.Integer(off) / chunkSize)
+ req.Length = chunkSize
+ if req.Length > pieceLeft {
+ req.Length = pieceLeft
+ }
+ ok = true
+ return
+}
+
func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
cl.mu.Lock()
defer cl.mu.Unlock()
newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
for len_ > 0 {
// TODO: Write a function to return the Request for a given offset.
- index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
- pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
- piece := t.Pieces[index]
- if !piece.EverHashed {
- cl.queuePieceCheck(t, index)
- }
- chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
- if chunk.Begin+chunk.Length > t.PieceLength(index) {
- chunk.Length = t.PieceLength(index) - chunk.Begin
+ req, ok := t.offsetRequest(off)
+ if !ok {
+ break
}
- adv := int64(chunk.Length - pieceOff%chunkSize)
- off += adv
- len_ -= adv
- if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
+ off += int64(req.Length)
+ len_ -= int64(req.Length)
+ if _, ok = t.Pieces[req.Index].PendingChunkSpecs[req.ChunkSpec]; !ok {
continue
}
- newPriorities = append(newPriorities, Request{index, chunk})
+ newPriorities = append(newPriorities, req)
}
if len(newPriorities) == 0 {
return
return
}
-func (t *Torrent) pieceChunkSpecs(index peer_protocol.Integer) (cs map[ChunkSpec]struct{}) {
- cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+func (t *Torrent) pendAllChunkSpecs(index peer_protocol.Integer) {
+ piece := t.Pieces[index]
+ if piece.PendingChunkSpecs == nil {
+ piece.PendingChunkSpecs = make(
+ map[ChunkSpec]struct{},
+ (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
+ }
c := ChunkSpec{
Begin: 0,
}
- for left := peer_protocol.Integer(t.PieceLength(index)); left > 0; left -= c.Length {
+ cs := piece.PendingChunkSpecs
+ log.Print(index, t.PieceLength(index))
+ for left := peer_protocol.Integer(t.PieceLength(index)); left != 0; left -= c.Length {
c.Length = left
if c.Length > chunkSize {
c.Length = chunkSize
}
func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
- if int(piece) == len(t.Pieces)-1 {
+ if int(piece) == t.NumPieces()-1 {
len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
}
if len_ == 0 {
}
}
+func (cl *Client) stopped() bool {
+ select {
+ case <-cl.quit:
+ return true
+ default:
+ return false
+ }
+}
+
func (me *Client) Stop() {
close(me.quit)
me.event.Broadcast()
}
for {
me.mu.Unlock()
+ // TODO: Can this be allocated on the stack?
msg := new(peer_protocol.Message)
err := decoder.Decode(msg)
me.mu.Lock()
if err != nil {
+ if me.stopped() {
+ return nil
+ }
return err
}
if msg.Keepalive {
InfoHash: BytesInfoHash(metaInfo.InfoHash),
MetaInfo: metaInfo,
}
+ torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
+ if err != nil {
+ return
+ }
for offset := 0; offset < len(metaInfo.Pieces); offset += PieceHash.Size() {
hash := metaInfo.Pieces[offset : offset+PieceHash.Size()]
if len(hash) != PieceHash.Size() {
piece := &piece{}
copyHashSum(piece.Hash[:], hash)
torrent.Pieces = append(torrent.Pieces, piece)
- }
- torrent.Data, err = mmapTorrentData(metaInfo, dataDir)
- if err != nil {
- return
+ torrent.pendAllChunkSpecs(peer_protocol.Integer(len(torrent.Pieces) - 1))
}
torrent.Trackers = make([][]tracker.Client, len(metaInfo.AnnounceList))
for tierIndex := range metaInfo.AnnounceList {
}
}
}
- conn.SetInterested(false)
+ if len(conn.Requests) == 0 {
+ conn.SetInterested(false)
+ }
}
func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
} else {
log.Print("piece failed hash")
if len(p.PendingChunkSpecs) == 0 {
- p.PendingChunkSpecs = t.pieceChunkSpecs(piece)
+ t.pendAllChunkSpecs(piece)
}
}
for _, conn := range t.Conns {
package torrentfs
import (
- "bazil.org/fuse"
- fusefs "bazil.org/fuse/fs"
- "bitbucket.org/anacrolix/go.torrent"
"bytes"
- metainfo "github.com/nsf/libtorgo/torrent"
- "io"
"io/ioutil"
"net"
"os"
"path/filepath"
- "runtime"
"testing"
+ "time"
+
+ "bitbucket.org/anacrolix/go.torrent/testutil"
+
+ "bazil.org/fuse"
+ fusefs "bazil.org/fuse/fs"
+ "bitbucket.org/anacrolix/go.torrent"
+ metainfo "github.com/nsf/libtorgo/torrent"
)
func TestTCPAddrString(t *testing.T) {
}
}
-const dummyFileContents = "hello, world\n"
-
-func createDummyTorrentData(dirName string) string {
- f, _ := os.Create(filepath.Join(dirName, "greeting"))
- f.WriteString("hello, world\n")
- return f.Name()
-}
-
-func createMetaInfo(name string, w io.Writer) {
- builder := metainfo.Builder{}
- builder.AddFile(name)
- builder.AddAnnounceGroup([]string{"lol://cheezburger"})
- batch, err := builder.Submit()
- if err != nil {
- panic(err)
- }
- errs, _ := batch.Start(w, 1)
- <-errs
-}
-
func TestDownloadOnDemand(t *testing.T) {
- priorNumGoroutines := runtime.NumGoroutine()
- defer func() {
- n := runtime.NumGoroutine()
- if n != priorNumGoroutines {
- t.Fatalf("expected %d goroutines, but %d are running", priorNumGoroutines, n)
- }
- }()
dir, err := ioutil.TempDir("", "torrentfs")
if err != nil {
t.Fatal(err)
t.Logf("test directory: %s", dir)
finishedDir := filepath.Join(dir, "finished")
os.Mkdir(finishedDir, 0777)
- name := createDummyTorrentData(finishedDir)
+ name := testutil.CreateDummyTorrentData(finishedDir)
metaInfoBuf := &bytes.Buffer{}
- createMetaInfo(name, metaInfoBuf)
+ testutil.CreateMetaInfo(name, metaInfoBuf)
metaInfo, err := metainfo.Load(metaInfoBuf)
seeder := torrent.Client{
DataDir: finishedDir,
if fuseConn.MountError != nil {
t.Fatal(fuseConn.MountError)
}
+ go func() {
+ time.Sleep(10 * time.Second)
+ fuse.Unmount(mountDir)
+ }()
content, err := ioutil.ReadFile(filepath.Join(mountDir, "greeting"))
if err != nil {
t.Fatal(err)
}
- if string(content) != dummyFileContents {
+ if string(content) != testutil.GreetingFileContents {
t.FailNow()
}
}
--- /dev/null
+package testutil
+
+import (
+ "io"
+ "io/ioutil"
+ "os"
+ "path/filepath"
+
+ metainfo "github.com/nsf/libtorgo/torrent"
+
+ "bytes"
+)
+
+const GreetingFileContents = "hello, world\n"
+
+func CreateDummyTorrentData(dirName string) string {
+ f, _ := os.Create(filepath.Join(dirName, "greeting"))
+ f.WriteString("hello, world\n")
+ return f.Name()
+}
+
+func CreateMetaInfo(name string, w io.Writer) {
+ builder := metainfo.Builder{}
+ builder.AddFile(name)
+ builder.AddAnnounceGroup([]string{"lol://cheezburger"})
+ batch, err := builder.Submit()
+ if err != nil {
+ panic(err)
+ }
+ errs, _ := batch.Start(w, 1)
+ <-errs
+}
+
+func GreetingTestTorrent() (tempDir string, metaInfo *metainfo.MetaInfo) {
+ tempDir, err := ioutil.TempDir(os.TempDir(), "")
+ if err != nil {
+ panic(err)
+ }
+ name := CreateDummyTorrentData(tempDir)
+ w := &bytes.Buffer{}
+ CreateMetaInfo(name, w)
+ metaInfo, _ = metainfo.Load(w)
+ return
+}