Hash pieceSum
PendingChunkSpecs map[ChunkSpec]struct{}
Hashing bool
+ QueuedForHash bool
EverHashed bool
}
func (p *piece) Complete() bool {
- return len(p.PendingChunkSpecs) == 0 && !p.Hashing && p.EverHashed
+ return len(p.PendingChunkSpecs) == 0 && p.EverHashed
}
func lastChunkSpec(pieceLength peer_protocol.Integer) (cs ChunkSpec) {
c.post <- msg
}
+// Returns true if more requests can be sent.
func (c *Connection) Request(chunk Request) bool {
+ if !c.PeerPieces[chunk.Index] {
+ panic("peer doesn't have that piece!")
+ }
if len(c.Requests) >= maxRequests {
return false
}
// Currently doesn't really queue, but should in the future.
func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer) {
piece := t.Pieces[pieceIndex]
- if piece.Hashing {
+ if piece.QueuedForHash {
return
}
- piece.Hashing = true
+ piece.QueuedForHash = true
go cl.verifyPiece(t, pieceIndex)
}
func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
- log.Print(len_)
cl.mu.Lock()
defer cl.mu.Unlock()
t := cl.torrent(ih)
newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
- for len_ != 0 {
+ for len_ > 0 {
// TODO: Write a function to return the Request for a given offset.
index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
cl.queuePieceCheck(t, index)
}
chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
- if int64(chunk.Length) > len_ {
- chunk.Length = peer_protocol.Integer(len_)
+ if chunk.Begin+chunk.Length > t.PieceLength(index) {
+ chunk.Length = t.PieceLength(index) - chunk.Begin
}
adv := int64(chunk.Length - pieceOff%chunkSize)
off += adv
}
newPriorities = append(newPriorities, Request{index, chunk})
}
- if len(newPriorities) < 1 {
+ if len(newPriorities) == 0 {
return
}
log.Print(newPriorities)
func (t *Torrent) bitfield() (bf []bool) {
for _, p := range t.Pieces {
- bf = append(bf, p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) == 0)
+ bf = append(bf, p.EverHashed && len(p.PendingChunkSpecs) == 0)
}
return
}
}
type Client struct {
- DataDir string
- HalfOpenLimit int
- PeerId [20]byte
- DataReady chan DataSpec
- Listener net.Listener
+ DataDir string
+ HalfOpenLimit int
+ PeerId [20]byte
+ Listener net.Listener
+ DisableTrackers bool
sync.Mutex
mu *sync.Mutex
event sync.Cond
quit chan struct{}
- halfOpen int
- torrents map[InfoHash]*Torrent
+ halfOpen int
+ torrents map[InfoHash]*Torrent
+ dataWaiter chan struct{}
}
var (
func (t *Torrent) wantPiece(index int) bool {
p := t.Pieces[index]
- return p.EverHashed && !p.Hashing && len(p.PendingChunkSpecs) != 0
+ return p.EverHashed && len(p.PendingChunkSpecs) != 0
}
func (me *Client) peerUnchoked(torrent *Torrent, conn *Connection) {
if err != nil {
return err
}
- log.Print(msg.Type)
if msg.Keepalive {
continue
}
if err != nil {
return err
}
+ log.Print("replenishing from loop")
me.replenishConnRequests(torrent, conn)
}
}
return torrent.Close()
}
me.torrents[torrent.InfoHash] = torrent
- go me.announceTorrent(torrent)
+ if !me.DisableTrackers {
+ go me.announceTorrent(torrent)
+ }
for i := range torrent.Pieces {
me.queuePieceCheck(torrent, peer_protocol.Integer(i))
}
addRequest := func(req Request) (again bool) {
piece := torrent.Pieces[req.Index]
if piece.Hashing {
+ // We can't be sure we want this.
+ log.Print("piece is hashing")
return true
}
if piece.Complete() {
+ log.Print("piece is complete")
+ // We already have this.
return true
}
if requestHeatMap[req] > 0 {
+ log.Print("piece is hot")
+ // We've already requested this.
return true
}
return conn.Request(req)
}
+ // First request prioritized chunks.
if torrent.Priorities != nil {
for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
+ log.Print(e.Value.(Request))
if !addRequest(e.Value.(Request)) {
return
}
}
}
+	// Then finish off incomplete pieces in order of bytes remaining.
for _, index := range torrent.piecesByPendingBytesDesc() {
if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
continue
}
func (cl *Client) dataReady(ds DataSpec) {
- if cl.DataReady == nil {
- return
+ if cl.dataWaiter != nil {
+ close(cl.dataWaiter)
}
- go func() {
- cl.DataReady <- ds
- }()
+ cl.dataWaiter = nil
+}
+
+func (cl *Client) DataWaiter() <-chan struct{} {
+ cl.Lock()
+ defer cl.Unlock()
+ if cl.dataWaiter == nil {
+ cl.dataWaiter = make(chan struct{})
+ }
+ return cl.dataWaiter
}
func (me *Client) pieceHashed(t *Torrent, piece peer_protocol.Integer, correct bool) {
p := t.Pieces[piece]
- if !p.Hashing {
- panic("invalid state")
- }
- p.Hashing = false
p.EverHashed = true
if correct {
+ log.Print("piece passed hash")
p.PendingChunkSpecs = nil
var next *list.Element
if t.Priorities != nil {
},
})
} else {
+ log.Print("piece failed hash")
if len(p.PendingChunkSpecs) == 0 {
p.PendingChunkSpecs = t.pieceChunkSpecs(piece)
}
}
func (cl *Client) verifyPiece(t *Torrent, index peer_protocol.Integer) {
+ cl.mu.Lock()
+ p := t.Pieces[index]
+ for p.Hashing {
+ cl.event.Wait()
+ }
+ p.Hashing = true
+ p.QueuedForHash = false
+ cl.mu.Unlock()
sum := t.HashPiece(index)
cl.mu.Lock()
- piece := t.Pieces[index]
- cl.pieceHashed(t, index, sum == piece.Hash)
- piece.Hashing = false
+ p.Hashing = false
+ cl.pieceHashed(t, index, sum == p.Hash)
cl.mu.Unlock()
}
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
"bitbucket.org/anacrolix/go.torrent"
+ "bitbucket.org/anacrolix/go.torrent/fs"
"flag"
metainfo "github.com/nsf/libtorgo/torrent"
"log"
"net/http"
_ "net/http/pprof"
"os"
+ "os/signal"
"os/user"
"path/filepath"
- "sync"
+ "syscall"
"time"
)
var (
- downloadDir string
- torrentPath string
- mountDir string
+ downloadDir string
+ torrentPath string
+ mountDir string
+ disableTrackers = flag.Bool("disableTrackers", false, "disables trackers")
+ testPeer = flag.String("testPeer", "", "the address for a test peer")
+ pprofAddr = flag.String("pprofAddr", "", "pprof HTTP server bind address")
+ testPeerAddr *net.TCPAddr
)
func init() {
flag.StringVar(&mountDir, "mountDir", "", "location the torrent contents are made available")
}
+func resolveTestPeerAddr() {
+ if *testPeer == "" {
+ return
+ }
+ var err error
+ testPeerAddr, err = net.ResolveTCPAddr("tcp4", *testPeer)
+ if err != nil {
+ log.Fatal(err)
+ }
+}
+
+func setSignalHandlers() {
+ c := make(chan os.Signal)
+ signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
+ go func() {
+ <-c
+ fuse.Unmount(mountDir)
+ }()
+}
+
func main() {
- pprofAddr := flag.String("pprofAddr", "", "pprof HTTP server bind address")
- testPeer := flag.String("testPeer", "", "the address for a test peer")
flag.Parse()
+ if flag.NArg() != 0 {
+ os.Stderr.WriteString("one does not simply pass positional args\n")
+ os.Exit(2)
+ }
+ if mountDir == "" {
+ os.Stderr.WriteString("y u no specify mountpoint?\n")
+ os.Exit(2)
+ }
log.SetFlags(log.LstdFlags | log.Lshortfile)
if *pprofAddr != "" {
go http.ListenAndServe(*pprofAddr, nil)
}
+ conn, err := fuse.Mount(mountDir)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer fuse.Unmount(mountDir)
+ // TODO: Think about the ramifications of exiting not due to a signal.
+ setSignalHandlers()
+ defer conn.Close()
client := &torrent.Client{
- DataDir: downloadDir,
- HalfOpenLimit: 2,
+ DataDir: downloadDir,
+ DisableTrackers: *disableTrackers,
}
client.Start()
torrentDir, err := os.Open(torrentPath)
if err != nil {
log.Fatal(err)
}
- var testAddr *net.TCPAddr
- if *testPeer != "" {
- testAddr, err = net.ResolveTCPAddr("tcp4", *testPeer)
- if err != nil {
- log.Fatal(err)
- }
- }
+ resolveTestPeerAddr()
for _, name := range names {
metaInfo, err := metainfo.LoadFromFile(filepath.Join(torrentPath, name))
if err != nil {
log.Print(err)
}
}
- conn, err := fuse.Mount(mountDir)
- if err != nil {
- log.Fatal(err)
- }
- fs := &TorrentFS{
- Client: client,
- DataSubs: make(map[chan torrent.DataSpec]struct{}),
- }
- go fs.publishData()
+ fs := torrentfs.New(client)
go func() {
for {
torrentLoop:
for _, t := range client.Torrents() {
client.Lock()
for _, c := range t.Conns {
- if c.Socket.RemoteAddr().String() == testAddr.String() {
+ if c.Socket.RemoteAddr().String() == testPeerAddr.String() {
client.Unlock()
continue torrentLoop
}
}
client.Unlock()
- if testAddr != nil {
+ if testPeerAddr != nil {
if err := client.AddPeers(t.InfoHash, []torrent.Peer{{
- IP: testAddr.IP,
- Port: testAddr.Port,
+ IP: testPeerAddr.IP,
+ Port: testPeerAddr.Port,
}}); err != nil {
log.Print(err)
}
time.Sleep(10 * time.Second)
}
}()
- fusefs.Serve(conn, fs)
+ if err := fusefs.Serve(conn, fs); err != nil {
+ log.Fatal(err)
+ }
}
fusefs "bazil.org/fuse/fs"
"bitbucket.org/anacrolix/go.torrent"
metainfo "github.com/nsf/libtorgo/torrent"
+ "log"
"os"
"sync"
)
sync.Mutex
}
-func (tfs *torrentFS) publishData() {
- for {
- spec := <-tfs.Client.DataReady
- tfs.Lock()
- for ds := range tfs.DataSubs {
- ds <- spec
- }
- tfs.Unlock()
- }
-}
-
-func (tfs *torrentFS) SubscribeData() chan torrent.DataSpec {
- ch := make(chan torrent.DataSpec)
- tfs.Lock()
- tfs.DataSubs[ch] = struct{}{}
- tfs.Unlock()
- return ch
-}
-
-func (tfs *torrentFS) UnsubscribeData(ch chan torrent.DataSpec) {
- go func() {
- for _ = range ch {
- }
- }()
- tfs.Lock()
- delete(tfs.DataSubs, ch)
- tfs.Unlock()
- close(ch)
-}
-
var _ fusefs.NodeForgetter = rootNode{}
type rootNode struct {
if req.Dir {
panic("hodor")
}
- dataSpecs := fn.FS.SubscribeData()
- defer fn.FS.UnsubscribeData(dataSpecs)
data := make([]byte, func() int {
_len := int64(fn.size) - req.Offset
if int64(req.Size) < _len {
}
infoHash := torrent.BytesInfoHash(fn.metaInfo.InfoHash)
torrentOff := fn.TorrentOffset + req.Offset
+ log.Print(torrentOff, len(data), fn.TorrentOffset)
fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(len(data)))
for {
+ dataWaiter := fn.FS.Client.DataWaiter()
n, err := fn.FS.Client.TorrentReadAt(infoHash, torrentOff, data)
switch err {
case nil:
return nil
case torrent.ErrDataNotReady:
select {
- case <-dataSpecs:
+ case <-dataWaiter:
case <-intr:
return fuse.EINTR
}
default:
+ log.Print(err)
return fuse.EIO
}
}
Client: cl,
DataSubs: make(map[chan torrent.DataSpec]struct{}),
}
- go fs.publishData()
return fs
}