// Read torrent data at the given offset. Returns ErrDataNotReady if the data
// isn't available.
-func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err error) {
- cl.mu.RLock()
- defer cl.mu.RUnlock()
- t := cl.torrent(ih)
- if t == nil {
- err = errors.New("unknown torrent")
- return
- }
- index := pp.Integer(off / int64(t.UsualPieceSize()))
+func (cl *Client) torrentReadAt(t *torrent, off int64, p []byte) (n int, err error) {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ index := int(off / int64(t.UsualPieceSize()))
// Reading outside the bounds of a file is an error.
if index < 0 {
err = os.ErrInvalid
}
piece := t.Pieces[index]
pieceOff := pp.Integer(off % int64(t.UsualPieceSize()))
- pieceLeft := int(t.PieceLength(index) - pieceOff)
+ pieceLeft := int(t.PieceLength(pp.Integer(index)) - pieceOff)
if pieceLeft <= 0 {
err = io.EOF
return
if len(p) > pieceLeft {
p = p[:pieceLeft]
}
- for cs, _ := range piece.PendingChunkSpecs {
- chunkOff := int64(pieceOff) - int64(cs.Begin)
- if chunkOff >= int64(t.PieceLength(index)) {
- panic(chunkOff)
- }
- if 0 <= chunkOff && chunkOff < int64(cs.Length) {
- // read begins in a pending chunk
- err = ErrDataNotReady
- return
- }
- // pending chunk caps available data
- if chunkOff < 0 && int64(len(p)) > -chunkOff {
- p = p[:-chunkOff]
- }
- }
if len(p) == 0 {
panic(len(p))
}
+ cl.prioritizePiece(t, index, piecePriorityHigh)
+ for i := index + 1; i < index+7 && i < t.NumPieces(); i++ {
+ cl.prioritizePiece(t, i, piecePriorityNormal)
+ }
+ for !piece.Complete() {
+ piece.Event.Wait()
+ }
return t.Data.ReadAt(p, off)
}
return filepath.Join(os.Getenv("HOME"), ".config/torrent")
}
+func (cl *Client) ConfigDir() string {
+ return cl.configDir()
+}
+
+func (t *torrent) connPendPiece(c *connection, piece int) {
+ c.pendPiece(piece, t.Pieces[piece].Priority)
+}
+
+func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) {
+ if t.havePiece(piece) {
+ return
+ }
+ cl.queueFirstHash(t, piece)
+ t.Pieces[piece].Priority = priority
+ if t.wantPiece(piece) {
+ for _, c := range t.Conns {
+ if c.PeerHasPiece(pp.Integer(piece)) {
+ t.connPendPiece(c, piece)
+ cl.replenishConnRequests(t, c)
+ }
+ }
+ }
+}
+
func (cl *Client) setEnvBlocklist() (err error) {
filename := os.Getenv("TORRENT_BLOCKLIST_FILE")
defaultBlocklist := filename == ""
if !t.wantPiece(i) {
continue
}
- c.pieceRequestOrder.SetPiece(i, c.piecePriorities[i])
+ t.connPendPiece(c, i)
}
}
}
c.PeerPieces[piece] = true
if t.wantPiece(piece) {
- c.pieceRequestOrder.SetPiece(piece, c.piecePriorities[piece])
+ t.connPendPiece(c, piece)
me.replenishConnRequests(t, c)
}
}
return false
default:
}
+ if !me.wantConns(t) {
+ return false
+ }
for _, c0 := range t.Conns {
if c.PeerID == c0.PeerID {
// Already connected to a client with that ID.
return true
}
+func (t *torrent) needData() bool {
+ if !t.haveInfo() {
+ return true
+ }
+ for i := range t.Pieces {
+ if t.wantPiece(i) {
+ return true
+ }
+ }
+ return false
+}
+
+// TODO: I'm sure there's something here to do with seeding.
+func (t *torrent) badConn(c *connection) bool {
+ if time.Now().Sub(c.completedHandshake) < 30*time.Second {
+ return false
+ }
+ if !t.haveInfo() {
+ return !c.supportsExtension("ut_metadata")
+ }
+ return !t.connHasWantedPieces(c)
+}
+
+func (t *torrent) numGoodConns() (num int) {
+ for _, c := range t.Conns {
+ if !t.badConn(c) {
+ num++
+ }
+ }
+ return
+}
+
+func (me *Client) wantConns(t *torrent) bool {
+ if !t.needData() && me.noUpload {
+ return false
+ }
+ if t.numGoodConns() >= socketsPerTorrent {
+ return false
+ }
+ return true
+}
+
func (me *Client) openNewConns(t *torrent) {
select {
case <-t.ceasingNetworking:
return
default:
}
- if t.haveInfo() && !me.downloadStrategy.PendingData(t) {
- return
- }
for len(t.Peers) != 0 {
- if len(t.Conns) >= socketsPerTorrent {
- break
+ if !me.wantConns(t) {
+ return
}
if len(t.HalfOpen)+me.handshaking >= me.halfOpenLimit {
- break
+ return
}
var (
k peersKey
}
func (cl *Client) setMetaData(t *torrent, md metainfo.Info, bytes []byte) (err error) {
- err = t.setMetadata(md, cl.dataDir, bytes)
+ err = t.setMetadata(md, cl.dataDir, bytes, &cl.mu)
if err != nil {
return
}
func (t Torrent) DownloadAll() {
t.cl.mu.Lock()
for i := 0; i < t.NumPieces(); i++ {
- t.cl.queueFirstHash(t.torrent, i)
+ // TODO: Leave higher priorities as they were?
+ t.cl.prioritizePiece(t.torrent, i, piecePriorityNormal)
}
+ t.cl.prioritizePiece(t.torrent, 0, piecePriorityHigh)
+ t.cl.prioritizePiece(t.torrent, t.NumPieces()-1, piecePriorityHigh)
t.cl.mu.Unlock()
}
func (me Torrent) ReadAt(p []byte, off int64) (n int, err error) {
- err = me.cl.PrioritizeDataRegion(me.InfoHash, off, int64(len(p)))
- if err != nil {
- err = fmt.Errorf("error prioritizing: %s", err)
- return
- }
- <-me.cl.DataWaiter(me.InfoHash, off)
- return me.cl.TorrentReadAt(me.InfoHash, off, p)
+ return me.cl.torrentReadAt(me.torrent, off, p)
}
func (cl *Client) torrentCacheMetaInfo(ih InfoHash) (mi *metainfo.MetaInfo, err error) {
go func(tr tracker.Client) {
err := cl.announceTorrentSingleTracker(tr, req, t)
if err != nil {
- log.Printf("error announcing to %s: %s", tr, err)
+ log.Printf("error announcing %q to %s: %s", t, tr, err)
}
oks <- err == nil
}(tr)
}
p.EverHashed = true
if correct {
+ p.Priority = piecePriorityNone
p.PendingChunkSpecs = nil
+ p.Event.Broadcast()
me.downloadStrategy.TorrentGotPiece(t, int(piece))
me.dataReady(t, request{
pp.Integer(piece),
if len(p.PendingChunkSpecs) == 0 {
t.pendAllChunkSpecs(piece)
}
+ if p.Priority != piecePriorityNone {
+ me.openNewConns(t)
+ }
}
t.PieceBytesLeftChanged(int(piece))
for _, conn := range t.Conns {
conn.pieceRequestOrder.RemovePiece(int(piece))
}
if t.wantPiece(int(piece)) && conn.PeerHasPiece(piece) {
- conn.pieceRequestOrder.SetPiece(int(piece), conn.piecePriorities[piece])
+ conn.pendPiece(int(piece), t.Pieces[piece].Priority)
}
}
if t.haveAllPieces() && me.noUpload {
cl.pieceHashed(t, index, sum == p.Hash)
}
-func (me *Client) Torrents() (ret []*torrent) {
+func (me *Client) Torrents() (ret []Torrent) {
me.mu.Lock()
for _, t := range me.torrents {
- ret = append(ret, t)
+ ret = append(ret, Torrent{me, t})
}
me.mu.Unlock()
return
path []string
metadata *torrent.MetaInfo
FS *TorrentFS
- InfoHash torrent.InfoHash
+ t torrent.Torrent
}
type fileNode struct {
return "/" + strings.Join(append([]string{n.metadata.Name}, n.path...), "/")
}
-func blockingRead(fs *TorrentFS, ih torrent.InfoHash, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
- dataWaiter := fs.Client.DataWaiter(ih, off)
+func blockingRead(fs *TorrentFS, t torrent.Torrent, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
+ var (
+ _n int
+ _err fuse.Error
+ )
+ readDone := make(chan struct{})
+ go func() {
+ _n, _err = t.ReadAt(p, off)
+ close(readDone)
+ }()
select {
- case <-dataWaiter:
+ case <-readDone:
+ n = _n
+ err = _err
case <-fs.destroyed:
err = fuse.EIO
- return
case <-intr:
err = fuse.EINTR
- return
}
- n, err = fs.Client.TorrentReadAt(ih, off, p)
return
}
-func readFull(fs *TorrentFS, ih torrent.InfoHash, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
+func readFull(fs *TorrentFS, t torrent.Torrent, off int64, p []byte, intr fusefs.Intr) (n int, err fuse.Error) {
for len(p) != 0 {
var nn int
- nn, err = blockingRead(fs, ih, off, p, intr)
+ nn, err = blockingRead(fs, t, off, p, intr)
if err != nil {
break
}
if len(resp.Data) == 0 {
return nil
}
- infoHash := fn.InfoHash
torrentOff := fn.TorrentOffset + req.Offset
- go func() {
- if err := fn.FS.Client.PrioritizeDataRegion(infoHash, torrentOff, int64(size)); err != nil {
- log.Printf("error prioritizing %s: %s", fn.fsPath(), err)
- }
- }()
- n, err := readFull(fn.FS, infoHash, torrentOff, resp.Data, intr)
+ n, err := readFull(fn.FS, fn.t, torrentOff, resp.Data, intr)
if err != nil {
return err
}
__node := node{
metadata: t.Info,
FS: me.fs,
- InfoHash: t.InfoHash,
+ t: t,
}
if t.Info.SingleFile() {
_node = fileNode{__node, uint64(t.Info.Length), 0}