post chan encoding.BinaryMarshaler
write chan []byte
+ // Stuff controlled by the local peer.
Interested bool
Choked bool
Requests map[Request]struct{}
+ // Stuff controlled by the remote peer.
PeerId [20]byte
PeerInterested bool
PeerChoked bool
return true
}
+func (c *Connection) Unchoke() {
+ if !c.Choked {
+ return
+ }
+ c.Post(peer_protocol.Message{
+ Type: peer_protocol.Unchoke,
+ })
+ c.Choked = false
+}
+
func (c *Connection) SetInterested(interested bool) {
if c.Interested == interested {
return
}
func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
+ log.Print(len_)
cl.mu.Lock()
defer cl.mu.Unlock()
t := cl.torrent(ih)
newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
- for len_ > 0 {
- index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
- pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+ for len_ != 0 {
+ // TODO: Write a function to return the Request for a given offset.
+ index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+ pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
piece := t.Pieces[index]
if !piece.EverHashed {
cl.queuePieceCheck(t, index)
}
chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
- adv := int64(chunkSize - pieceOff%chunkSize)
+ if int64(chunk.Length) > len_ {
+ chunk.Length = peer_protocol.Integer(len_)
+ }
+ adv := int64(chunk.Length - pieceOff%chunkSize)
off += adv
len_ -= adv
if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
}
type Peer struct {
- Id [20]byte
- tracker.Peer
+ Id [20]byte
+ IP net.IP
+ Port int
}
func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
HalfOpenLimit int
PeerId [20]byte
DataReady chan DataSpec
+ Listener net.Listener
sync.Mutex
mu *sync.Mutex
err = errors.New("unknown torrent")
return
}
- index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
+ index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+ // Reading outside the bounds of a file is an error.
+ if index < 0 {
+ err = os.ErrInvalid
+ return
+ }
+ if int(index) >= len(t.Pieces) {
+ err = io.EOF
+ return
+ }
piece := t.Pieces[index]
if !piece.EverHashed {
cl.queuePieceCheck(t, index)
if err != nil {
panic("error generating peer id")
}
+ if c.Listener != nil {
+ go c.acceptConnections()
+ }
+}
+
+func (cl *Client) acceptConnections() {
+ for {
+ conn, err := cl.Listener.Accept()
+ if err != nil {
+ log.Print(err)
+ return
+ }
+ go func() {
+ if err := cl.runConnection(conn, nil); err != nil {
+ log.Print(err)
+ }
+ }()
+ }
}
func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
return
}
log.Printf("connected to %s", conn.RemoteAddr())
- err = me.runConnection(conn, torrent, peer.Id)
+ err = me.runConnection(conn, torrent)
if err != nil {
log.Print(err)
}
return false
}
-func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
+func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
conn := &Connection{
Socket: sock,
Choked: true,
me.peerUnchoked(torrent, conn)
case peer_protocol.Interested:
conn.PeerInterested = true
+ // TODO: This should be done from a dedicated unchoking routine.
+ conn.Unchoke()
case peer_protocol.NotInterested:
conn.PeerInterested = false
case peer_protocol.Have:
me.peerGotPiece(torrent, conn, int(msg.Index))
case peer_protocol.Request:
- conn.PeerRequests[Request{
+ if conn.PeerRequests == nil {
+ conn.PeerRequests = make(map[Request]struct{}, maxRequests)
+ }
+ request := Request{
Index: msg.Index,
ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
- }] = struct{}{}
+ }
+ conn.PeerRequests[request] = struct{}{}
+ // TODO: Requests should be satisfied from a dedicated upload routine.
+ p := make([]byte, msg.Length)
+ n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+ if err != nil {
+ return fmt.Errorf("reading torrent data to serve request %s: %s", request, err)
+ }
+ if n != int(msg.Length) {
+ return fmt.Errorf("bad request: %s", msg)
+ }
+ conn.Post(peer_protocol.Message{
+ Type: peer_protocol.Piece,
+ Index: msg.Index,
+ Begin: msg.Begin,
+ Piece: p,
+ })
case peer_protocol.Bitfield:
if len(msg.Bitfield) < len(torrent.Pieces) {
err = errors.New("received invalid bitfield")
}
me.torrents[torrent.InfoHash] = torrent
go me.announceTorrent(torrent)
- go me.PrioritizeDataRegion(torrent.InfoHash, 0, torrent.Length())
- // for i := range torrent.Pieces {
- // me.queuePieceCheck(torrent, peer_protocol.Integer(i))
- // }
+ for i := range torrent.Pieces {
+ me.queuePieceCheck(torrent, peer_protocol.Integer(i))
+ }
return nil
}
var peers []Peer
for _, peer := range resp.Peers {
peers = append(peers, Peer{
- Peer: peer,
+ IP: peer.IP,
+ Port: peer.Port,
})
}
if err := cl.AddPeers(t.InfoHash, peers); err != nil {
delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
me.queuePieceCheck(torrent, request.Index)
- return
}
var next *list.Element
for e := torrent.Priorities.Front(); e != nil; e = next {