}
type Client struct {
- DataDir string
- HalfOpenLimit int
- PeerId [20]byte
- Listener net.Listener
- DisableTrackers bool
+ DataDir string
+ HalfOpenLimit int
+ PeerId [20]byte
+ Listener net.Listener
+ DisableTrackers bool
+ DownloadStrategy DownloadStrategy
mu sync.Mutex
event sync.Cond
panic("error generating peer id")
}
c.quit = make(chan struct{})
+ if c.DownloadStrategy == nil {
+ c.DownloadStrategy = DefaultDownloadStrategy{}
+ }
if c.Listener != nil {
go c.acceptConnections()
}
return true
}
-func (me *Client) replenishConnRequests(torrent *torrent, conn *connection) {
+type DownloadStrategy interface {
+ FillRequests(t *torrent, c *connection)
+}
+
+type DefaultDownloadStrategy struct{}
+
+func (DefaultDownloadStrategy) FillRequests(torrent *torrent, conn *connection) {
requestHeatMap := torrent.requestHeat()
addRequest := func(req request) (again bool) {
piece := torrent.Pieces[req.Index]
}
}
// Then finish off incomplete pieces in order of bytes remaining.
- for _, index := range torrent.piecesByPendingBytesDesc() {
- if torrent.PieceNumPendingBytes(index) == torrent.PieceLength(index) {
- continue
- }
+ for _, index := range torrent.piecesByPendingBytes() {
for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
if !addRequest(request{index, chunkSpec}) {
return
}
}
}
- if len(conn.Requests) == 0 {
- conn.SetInterested(false)
+}
+
+type ResponsiveDownloadStrategy struct{}
+
+func (ResponsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
+ for e := t.Priorities.Front(); e != nil; e = e.Next() {
+ if !c.Request(e.Value.(request)) {
+ break
+ }
}
}
-func (me *Client) downloadedChunk(torrent *torrent, msg *peer_protocol.Message) (err error) {
- req := request{msg.Index, chunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
- if _, ok := torrent.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
- log.Printf("got unnecessary chunk: %s", req)
- return
+func (me *Client) replenishConnRequests(t *torrent, c *connection) {
+ me.DownloadStrategy.FillRequests(t, c)
+ if len(c.Requests) == 0 {
+ c.SetInterested(false)
}
- err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+}
+
+func (me *Client) downloadedChunk(t *torrent, c *connection, msg *peer_protocol.Message) error {
+ req := newRequest(msg.Index, msg.Begin, peer_protocol.Integer(len(msg.Piece)))
+
+ // Request has been satisfied.
+ delete(c.Requests, req)
+
+ defer me.replenishConnRequests(t, c)
+
+ // Do we actually want this chunk?
+ if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
+ return nil
+ }
+
+ // Write the chunk out.
+ err := t.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
if err != nil {
- return
+ return err
+ }
+ me.dataReady(dataSpec{t.InfoHash, req})
+
+ // Cancel pending requests for this chunk.
+ for _, c := range t.Conns {
+ if c.Cancel(req) {
+ me.replenishConnRequests(t, c)
+ }
}
- delete(torrent.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
- if len(torrent.Pieces[req.Index].PendingChunkSpecs) == 0 {
- me.queuePieceCheck(torrent, req.Index)
+
+ // Record that we have the chunk.
+ delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
+ if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
+ me.queuePieceCheck(t, req.Index)
}
+
+ // Unprioritize the chunk.
var next *list.Element
- for e := torrent.Priorities.Front(); e != nil; e = next {
+ for e := t.Priorities.Front(); e != nil; e = next {
next = e.Next()
if e.Value.(request) == req {
- torrent.Priorities.Remove(e)
+ t.Priorities.Remove(e)
}
}
- me.dataReady(dataSpec{torrent.InfoHash, req})
- return
+
+ return nil
}
func (cl *Client) dataReady(ds dataSpec) {