"net"
"os"
"path/filepath"
+ "sort"
)
const (
PendingChunkSpecs map[ChunkSpec]struct{}
}
+// NumPendingBytes returns the total number of bytes, across this
+// piece's pending chunks, that have not yet been downloaded.
+func (p piece) NumPendingBytes() (count peer_protocol.Integer) {
+	// PendingChunkSpecs is a set (struct{} values); only keys matter.
+	for cs := range p.PendingChunkSpecs {
+		count += cs.Length
+	}
+	return
+}
+
// ChunkSpec designates a chunk within a piece: a byte offset (Begin)
// and a byte count (Length), both in wire-protocol integers.
type ChunkSpec struct {
	Begin, Length peer_protocol.Integer
}
Priorities *list.List
}
+// pieceByBytesPendingSlice sorts a torrent's piece indices by the
+// number of bytes still pending in each piece. It implements
+// sort.Interface over Indices; Torrent is consulted for piece state.
+type pieceByBytesPendingSlice struct {
+	Torrent *Torrent
+	Indices []peer_protocol.Integer
+}
+
+func (me pieceByBytesPendingSlice) Len() int {
+	return len(me.Indices)
+}
+
+// Less compares the pieces designated by the i'th and j'th entries of
+// Indices. Indexing Pieces through Indices (rather than by i and j
+// directly) is required: Swap reorders Indices, so positions i and j
+// refer to Indices' current contents, not fixed piece numbers.
+func (me pieceByBytesPendingSlice) Less(i, j int) bool {
+	return me.Torrent.Pieces[me.Indices[i]].NumPendingBytes() <
+		me.Torrent.Pieces[me.Indices[j]].NumPendingBytes()
+}
+
+func (me pieceByBytesPendingSlice) Swap(i, j int) {
+	me.Indices[i], me.Indices[j] = me.Indices[j], me.Indices[i]
+}
+
+// piecesByPendingBytesDesc returns every piece index of the torrent,
+// ordered so that pieces with the most pending bytes come first.
+// Note each sort comparison recounts pending bytes from scratch.
+func (t *Torrent) piecesByPendingBytesDesc() (indices []peer_protocol.Integer) {
+	indices = make([]peer_protocol.Integer, len(t.Pieces))
+	for i := range indices {
+		indices[i] = peer_protocol.Integer(i)
+	}
+	sort.Sort(sort.Reverse(pieceByBytesPendingSlice{
+		Torrent: t,
+		Indices: indices,
+	}))
+	return
+}
+
func (t *Torrent) PrioritizeDataRegion(off, len_ int64) {
newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
for len_ > 0 {
n, err = t.Data.ReadAt(p, off)
})
<-done
- log.Println(n, err)
return
}
return
}
log.Printf("connected to %s", conn.RemoteAddr())
- me.handshake(conn, torrent, peer.Id)
+ err = me.handshake(conn, torrent, peer.Id)
+ if err != nil {
+ log.Print(err)
+ }
}()
}
return false
}
-func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) {
+func (me *Client) handshake(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
conn := &connection{
Socket: sock,
Choked: true,
conn.post <- peer_protocol.Bytes(me.PeerId[:])
}
var b [28]byte
- _, err := io.ReadFull(conn.Socket, b[:])
+ _, err = io.ReadFull(conn.Socket, b[:])
switch err {
case nil:
case io.EOF:
me.dropConnection(torrent, conn)
})
err = me.runConnection(torrent, conn)
- if err != nil {
- log.Print(err)
- }
+ return
}
func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) {
switch msg.Type {
case peer_protocol.Choke:
conn.PeerChoked = true
+ conn.Requests = nil
case peer_protocol.Unchoke:
conn.PeerChoked = false
me.peerUnchoked(torrent, conn)
+// replenishConnRequests tops up the outstanding requests on a
+// connection: first from the torrent's explicit priority list, then
+// from remaining pieces ordered by pending bytes.
func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
	requestHeatMap := torrent.requestHeat()
-	if torrent.Priorities == nil {
-		return
-	}
-	for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
-		req := e.Value.(Request)
+	// addRequest offers req to the connection unless it is useless:
+	// the peer lacks the piece, the piece is not incomplete, or the
+	// request is already "hot" (presumably outstanding on another
+	// connection, judging by requestHeat — confirm). It returns true
+	// while the caller should keep offering further requests, false
+	// once conn.Request declines more.
+	addRequest := func(req Request) (again bool) {
		if !conn.PeerPieces[req.Index] {
-			continue
+			return true
		}
-		switch torrent.Pieces[req.Index].State {
-		case pieceStateUnknown:
-			continue
-		case pieceStateIncomplete:
-		default:
-			panic("prioritized chunk for invalid piece state")
+		if torrent.Pieces[req.Index].State != pieceStateIncomplete {
+			return true
		}
		if requestHeatMap[req] > 0 {
+			return true
+		}
+		return conn.Request(req)
+	}
+	// Pass 1: explicitly prioritized requests, in list order.
+	if torrent.Priorities != nil {
+		for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
+			if !addRequest(e.Value.(Request)) {
+				return
+			}
+		}
+	}
+	// Pass 2: pieces with the most pending bytes first. Pieces that
+	// are entirely pending are skipped — presumably to finish partial
+	// pieces before starting fresh ones; TODO confirm intent.
+	for _, index := range torrent.piecesByPendingBytesDesc() {
+		if torrent.Pieces[index].NumPendingBytes() == torrent.PieceLength(index) {
			continue
		}
-		if !conn.Request(req) {
-			break
+		for chunkSpec := range torrent.Pieces[index].PendingChunkSpecs {
+			if !addRequest(Request{index, chunkSpec}) {
+				return
+			}
+		}
	}
}
log.Printf("got unnecessary chunk: %s", request)
return
}
+ log.Printf("got chunk %s", request)
err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
if err != nil {
return
ChunkSpec{0, peer_protocol.Integer(torrent.PieceLength(piece))},
},
})
+ torrent.Pieces[piece].PendingChunkSpecs = nil
}
for _, conn := range torrent.Conns {
if correct {