PeerPieces []bool
}
-func (c *connection) PeerHasPiece(index int) bool {
+func (c *connection) Close() {
+ c.Socket.Close()
+ close(c.post)
+}
+
+func (c *connection) PeerHasPiece(index peer_protocol.Integer) bool {
if c.PeerPieces == nil {
return false
}
if len(c.Requests) >= maxRequests {
return false
}
+ c.SetInterested(true)
+ if c.PeerChoked {
+ return false
+ }
if _, ok := c.Requests[chunk]; !ok {
c.Post(peer_protocol.Message{
Type: peer_protocol.Request,
func (conn *connection) writer() {
for {
b := <-conn.write
+ if b == nil {
+ break
+ }
n, err := conn.Socket.Write(b)
if err != nil {
log.Print(err)
- close(conn.write)
break
}
if n != len(b) {
func (conn *connection) writeOptimizer() {
pending := list.New()
var nextWrite []byte
+ defer close(conn.write)
for {
write := conn.write
if pending.Len() == 0 {
}
}
select {
- case msg := <-conn.post:
+ case msg, ok := <-conn.post:
+ if !ok {
+ return
+ }
pending.PushBack(msg)
case write <- nextWrite:
pending.Remove(pending.Front())
}
type Torrent struct {
- InfoHash InfoHash
- Pieces []piece
- Data MMapSpan
- MetaInfo *metainfo.MetaInfo
- Conns []*connection
- Peers []Peer
+ InfoHash InfoHash
+ Pieces []piece
+ Data MMapSpan
+ MetaInfo *metainfo.MetaInfo
+ Conns []*connection
+ Peers []Peer
+ Priorities *list.List
+}
+
+func (t *Torrent) PrioritizeDataRegion(off, len_ int64) {
+ newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
+ for len_ > 0 {
+ index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
+ pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+ piece := t.Pieces[index]
+ if piece.State == pieceStateComplete {
+ adv := int64(t.PieceLength(index) - pieceOff)
+ off += adv
+ len_ -= adv
+ continue
+ }
+ chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
+ adv := int64(chunkSize - pieceOff%chunkSize)
+ off += adv
+ len_ -= adv
+ switch piece.State {
+ case pieceStateIncomplete:
+ if _, ok := piece.PendingChunkSpecs[chunk]; !ok {
+ continue
+ }
+ case pieceStateUnknown:
+ default:
+ panic("unexpected piece state")
+ }
+ newPriorities = append(newPriorities, Request{index, chunk})
+ }
+ if len(newPriorities) < 1 {
+ return
+ }
+ log.Print(newPriorities)
+ if t.Priorities == nil {
+ t.Priorities = list.New()
+ }
+ t.Priorities.PushFront(newPriorities[0])
+ for _, req := range newPriorities[1:] {
+ t.Priorities.PushBack(req)
+ }
}
func (t *Torrent) WriteChunk(piece int, begin int64, data []byte) (err error) {
return
}
-func (t *Torrent) pieceChunkSpecs(index int) (cs map[ChunkSpec]struct{}) {
+func (t *Torrent) pieceChunkSpecs(index peer_protocol.Integer) (cs map[ChunkSpec]struct{}) {
cs = make(map[ChunkSpec]struct{}, (t.MetaInfo.PieceLength+chunkSize-1)/chunkSize)
c := ChunkSpec{
Begin: 0,
Port int
}
-func (t *Torrent) PieceLength(piece int) (len_ peer_protocol.Integer) {
- if piece == len(t.Pieces)-1 {
+func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
+ if int(piece) == len(t.Pieces)-1 {
len_ = peer_protocol.Integer(t.Data.Size() % t.MetaInfo.PieceLength)
}
if len_ == 0 {
return
}
-func (t *Torrent) HashPiece(piece int) (ps pieceSum) {
+func (t *Torrent) HashPiece(piece peer_protocol.Integer) (ps pieceSum) {
hash := PieceHash.New()
n, err := t.Data.WriteSectionTo(hash, int64(piece)*t.MetaInfo.PieceLength, t.MetaInfo.PieceLength)
if err != nil {
err = errors.New("unknown torrent")
return
}
- index := int(off / int64(t.PieceLength(0)))
+ index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
piece := t.Pieces[index]
pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
switch piece.State {
c.addTorrent = make(chan *Torrent)
c.torrentFinished = make(chan InfoHash)
c.actorTask = make(chan func())
+ if c.HalfOpenLimit == 0 {
+ c.HalfOpenLimit = 10
+ }
o := copy(c.PeerId[:], BEP20)
_, err := rand.Read(c.PeerId[o:])
if err != nil {
write: make(chan []byte),
post: make(chan encoding.BinaryMarshaler),
}
+ defer conn.Close()
go conn.writer()
go conn.writeOptimizer()
conn.post <- peer_protocol.Bytes(peer_protocol.Protocol)
}
var b [28]byte
_, err := io.ReadFull(conn.Socket, b[:])
- if err != nil {
- log.Fatal(err)
+ switch err {
+ case nil:
+ case io.EOF:
+ return
+ default:
+ err = fmt.Errorf("when reading protocol and extensions: %s", err)
+ return
}
if string(b[:20]) != peer_protocol.Protocol {
- log.Printf("wrong protocol: %#v", string(b[:20]))
+ err = fmt.Errorf("wrong protocol: %#v", string(b[:20]))
return
}
if 8 != copy(conn.PeerExtensions[:], b[20:]) {
conn.post <- peer_protocol.Bytes(torrent.InfoHash[:])
conn.post <- peer_protocol.Bytes(me.PeerId[:])
}
+ done := make(chan struct{})
me.withContext(func() {
me.addConnection(torrent, conn)
if torrent.haveAnyPieces() {
Bitfield: torrent.bitfield(),
})
}
- go func() {
- defer me.withContext(func() {
- me.dropConnection(torrent, conn)
- })
- err := me.runConnection(torrent, conn)
- if err != nil {
- log.Print(err)
- }
- }()
+ close(done)
})
+ <-done
+ defer me.withContext(func() {
+ me.dropConnection(torrent, conn)
+ })
+ err = me.runConnection(torrent, conn)
+ if err != nil {
+ log.Print(err)
+ }
}
func (me *Client) peerGotPiece(torrent *Torrent, conn *connection, piece int) {
}
conn.PeerPieces[piece] = true
if torrent.wantPiece(piece) {
- conn.SetInterested(true)
me.replenishConnRequests(torrent, conn)
}
}
break
}
delete(conn.Requests, request_)
- if _, ok := torrent.Pieces[request_.Index].PendingChunkSpecs[request_.ChunkSpec]; !ok {
- log.Printf("got unnecessary chunk: %s", request_)
- break
- }
- err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
- if err != nil {
- break
- }
- delete(torrent.Pieces[request_.Index].PendingChunkSpecs, request_.ChunkSpec)
- me.downloadedChunk(torrent, request_)
- if len(torrent.Pieces[request_.Index].PendingChunkSpecs) == 0 {
- torrent.Pieces[request_.Index].State = pieceStateUnknown
- go me.verifyPiece(torrent, int(request_.Index))
- }
+ err = me.downloadedChunk(torrent, msg)
default:
log.Printf("received unknown message type: %#v", msg.Type)
}
}
func (me *Client) replenishConnRequests(torrent *Torrent, conn *connection) {
- if len(conn.Requests) >= maxRequests {
- return
- }
- if conn.PeerChoked {
+ requestHeatMap := torrent.requestHeat()
+ if torrent.Priorities == nil {
return
}
- requestHeatMap := torrent.requestHeat()
- for index, has := range conn.PeerPieces {
- if !has {
+ for e := torrent.Priorities.Front(); e != nil; e = e.Next() {
+ req := e.Value.(Request)
+ if !conn.PeerPieces[req.Index] {
continue
}
- for chunkSpec, _ := range torrent.Pieces[index].PendingChunkSpecs {
- request := Request{peer_protocol.Integer(index), chunkSpec}
- if heat := requestHeatMap[request]; heat > 0 {
- continue
- }
- conn.SetInterested(true)
- if !conn.Request(request) {
- return
- }
+ switch torrent.Pieces[req.Index].State {
+ case pieceStateUnknown:
+ continue
+ case pieceStateIncomplete:
+ default:
+ panic("prioritized chunk for invalid piece state")
+ }
+ if requestHeatMap[req] > 0 {
+ continue
+ }
+ if !conn.Request(req) {
+ break
}
}
- //conn.SetInterested(false)
}
-func (me *Client) downloadedChunk(t *Torrent, chunk Request) {
+func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message) (err error) {
+ request := Request{msg.Index, ChunkSpec{msg.Begin, peer_protocol.Integer(len(msg.Piece))}}
+ if _, ok := torrent.Pieces[request.Index].PendingChunkSpecs[request.ChunkSpec]; !ok {
+ log.Printf("got unnecessary chunk: %s", request)
+ return
+ }
+ err = torrent.WriteChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
+ if err != nil {
+ return
+ }
+ delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
+ if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
+ torrent.Pieces[request.Index].State = pieceStateUnknown
+ go me.verifyPiece(torrent, request.Index)
+ return
+ }
+ var next *list.Element
+ for e := torrent.Priorities.Front(); e != nil; e = next {
+ next = e.Next()
+ if e.Value.(Request) == request {
+ torrent.Priorities.Remove(e)
+ }
+ }
+ me.dataReady(DataSpec{torrent.InfoHash, request})
+ return
}
func (cl *Client) dataReady(ds DataSpec) {
}()
}
-func (me *Client) pieceHashed(ih InfoHash, piece int, correct bool) {
+func (me *Client) pieceHashed(ih InfoHash, piece peer_protocol.Integer, correct bool) {
torrent := me.torrents[ih]
newState := func() pieceState {
if correct {
return pieceStateIncomplete
}
}()
- oldState := torrent.Pieces[piece].State
- if newState == oldState {
- return
- }
+ // oldState := torrent.Pieces[piece].State
+ // if newState == oldState {
+ // return
+ // }
torrent.Pieces[piece].State = newState
switch newState {
case pieceStateIncomplete:
- torrent.Pieces[piece].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
+ torrent.Pieces[int(piece)].PendingChunkSpecs = torrent.pieceChunkSpecs(piece)
case pieceStateComplete:
- log.Print(piece)
+ var next *list.Element
+ if torrent.Priorities != nil {
+ for e := torrent.Priorities.Front(); e != nil; e = next {
+ next = e.Next()
+ if e.Value.(Request).Index == piece {
+ torrent.Priorities.Remove(e)
+ }
+ }
+ }
me.dataReady(DataSpec{
torrent.InfoHash,
Request{
}
}
-func (me *Client) verifyPiece(torrent *Torrent, index int) {
+func (me *Client) verifyPiece(torrent *Torrent, index peer_protocol.Integer) {
sum := torrent.HashPiece(index)
+ done := make(chan struct{})
me.withContext(func() {
me.pieceHashed(torrent.InfoHash, index, sum == torrent.Pieces[index].Hash)
+ close(done)
})
+ <-done
}
func (me *Client) run() {
me.torrents[torrent.InfoHash] = torrent
go func() {
for index := range torrent.Pieces {
- me.verifyPiece(torrent, index)
+ me.verifyPiece(torrent, peer_protocol.Integer(index))
}
}()
case infoHash := <-me.torrentFinished:
<-done
return
}
+
+func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
+ done := make(chan struct{})
+ cl.withContext(func() {
+ t := cl.torrent(ih)
+ t.PrioritizeDataRegion(off, len_)
+ for _, cn := range t.Conns {
+ cl.replenishConnRequests(t, cn)
+ }
+ close(done)
+ })
+ <-done
+}