]> Sergey Matveev's repositories - btrtrc.git/blobdiff - client.go
Add a end-to-end test for torrentfs
[btrtrc.git] / client.go
index f874daba7e752c44b2151a509acea0da33b5fd80..83d0d6a082133f05d4bebe595b6d8f462ae31927 100644 (file)
--- a/client.go
+++ b/client.go
@@ -91,10 +91,12 @@ type Connection struct {
        post   chan encoding.BinaryMarshaler
        write  chan []byte
 
+       // Stuff controlled by the local peer.
        Interested bool
        Choked     bool
        Requests   map[Request]struct{}
 
+       // Stuff controlled by the remote peer.
        PeerId         [20]byte
        PeerInterested bool
        PeerChoked     bool
@@ -142,6 +144,16 @@ func (c *Connection) Request(chunk Request) bool {
        return true
 }
 
+func (c *Connection) Unchoke() {
+       if !c.Choked {
+               return
+       }
+       c.Post(peer_protocol.Message{
+               Type: peer_protocol.Unchoke,
+       })
+       c.Choked = false
+}
+
 func (c *Connection) SetInterested(interested bool) {
        if c.Interested == interested {
                return
@@ -282,19 +294,24 @@ func (cl *Client) queuePieceCheck(t *Torrent, pieceIndex peer_protocol.Integer)
 }
 
 func (cl *Client) PrioritizeDataRegion(ih InfoHash, off, len_ int64) {
+       log.Print(len_)
        cl.mu.Lock()
        defer cl.mu.Unlock()
        t := cl.torrent(ih)
        newPriorities := make([]Request, 0, (len_+2*(chunkSize-1))/chunkSize)
-       for len_ > 0 {
-               index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
-               pieceOff := peer_protocol.Integer(off % int64(t.PieceLength(0)))
+       for len_ != 0 {
+               // TODO: Write a function to return the Request for a given offset.
+               index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+               pieceOff := peer_protocol.Integer(off % t.MetaInfo.PieceLength)
                piece := t.Pieces[index]
                if !piece.EverHashed {
                        cl.queuePieceCheck(t, index)
                }
                chunk := ChunkSpec{pieceOff / chunkSize * chunkSize, chunkSize}
-               adv := int64(chunkSize - pieceOff%chunkSize)
+               if int64(chunk.Length) > len_ {
+                       chunk.Length = peer_protocol.Integer(len_)
+               }
+               adv := int64(chunk.Length - pieceOff%chunkSize)
                off += adv
                len_ -= adv
                if _, ok := piece.PendingChunkSpecs[chunk]; !ok && !piece.Hashing {
@@ -357,8 +374,9 @@ func (t *Torrent) requestHeat() (ret map[Request]int) {
 }
 
 type Peer struct {
-       Id [20]byte
-       tracker.Peer
+       Id   [20]byte
+       IP   net.IP
+       Port int
 }
 
 func (t *Torrent) PieceLength(piece peer_protocol.Integer) (len_ peer_protocol.Integer) {
@@ -394,6 +412,7 @@ type Client struct {
        HalfOpenLimit int
        PeerId        [20]byte
        DataReady     chan DataSpec
+       Listener      net.Listener
 
        sync.Mutex
        mu    *sync.Mutex
@@ -415,7 +434,16 @@ func (cl *Client) TorrentReadAt(ih InfoHash, off int64, p []byte) (n int, err er
                err = errors.New("unknown torrent")
                return
        }
-       index := peer_protocol.Integer(off / int64(t.PieceLength(0)))
+       index := peer_protocol.Integer(off / t.MetaInfo.PieceLength)
+       // Reading outside the bounds of a file is an error.
+       if index < 0 {
+               err = os.ErrInvalid
+               return
+       }
+       if int(index) >= len(t.Pieces) {
+               err = io.EOF
+               return
+       }
        piece := t.Pieces[index]
        if !piece.EverHashed {
                cl.queuePieceCheck(t, index)
@@ -459,6 +487,24 @@ func (c *Client) Start() {
        if err != nil {
                panic("error generating peer id")
        }
+       if c.Listener != nil {
+               go c.acceptConnections()
+       }
+}
+
+func (cl *Client) acceptConnections() {
+       for {
+               conn, err := cl.Listener.Accept()
+               if err != nil {
+                       log.Print(err)
+                       return
+               }
+               go func() {
+                       if err := cl.runConnection(conn, nil); err != nil {
+                               log.Print(err)
+                       }
+               }()
+       }
 }
 
 func mmapTorrentData(metaInfo *metainfo.MetaInfo, location string) (mms MMapSpan, err error) {
@@ -539,7 +585,7 @@ func (me *Client) initiateConn(peer Peer, torrent *Torrent) {
                        return
                }
                log.Printf("connected to %s", conn.RemoteAddr())
-               err = me.runConnection(conn, torrent, peer.Id)
+               err = me.runConnection(conn, torrent)
                if err != nil {
                        log.Print(err)
                }
@@ -564,7 +610,7 @@ func (me *Torrent) haveAnyPieces() bool {
        return false
 }
 
-func (me *Client) runConnection(sock net.Conn, torrent *Torrent, peerId [20]byte) (err error) {
+func (me *Client) runConnection(sock net.Conn, torrent *Torrent) (err error) {
        conn := &Connection{
                Socket:     sock,
                Choked:     true,
@@ -676,15 +722,36 @@ func (me *Client) connectionLoop(torrent *Torrent, conn *Connection) error {
                        me.peerUnchoked(torrent, conn)
                case peer_protocol.Interested:
                        conn.PeerInterested = true
+                       // TODO: This should be done from a dedicated unchoking routine.
+                       conn.Unchoke()
                case peer_protocol.NotInterested:
                        conn.PeerInterested = false
                case peer_protocol.Have:
                        me.peerGotPiece(torrent, conn, int(msg.Index))
                case peer_protocol.Request:
-                       conn.PeerRequests[Request{
+                       if conn.PeerRequests == nil {
+                               conn.PeerRequests = make(map[Request]struct{}, maxRequests)
+                       }
+                       request := Request{
                                Index:     msg.Index,
                                ChunkSpec: ChunkSpec{msg.Begin, msg.Length},
-                       }] = struct{}{}
+                       }
+                       conn.PeerRequests[request] = struct{}{}
+                       // TODO: Requests should be satisfied from a dedicated upload routine.
+                       p := make([]byte, msg.Length)
+                       n, err := torrent.Data.ReadAt(p, int64(torrent.PieceLength(0))*int64(msg.Index)+int64(msg.Begin))
+                       if err != nil {
+                               return fmt.Errorf("reading torrent data to serve request %s: %s", request, err)
+                       }
+                       if n != int(msg.Length) {
+                               return fmt.Errorf("bad request: %s", msg)
+                       }
+                       conn.Post(peer_protocol.Message{
+                               Type:  peer_protocol.Piece,
+                               Index: msg.Index,
+                               Begin: msg.Begin,
+                               Piece: p,
+                       })
                case peer_protocol.Bitfield:
                        if len(msg.Bitfield) < len(torrent.Pieces) {
                                err = errors.New("received invalid bitfield")
@@ -827,10 +894,9 @@ func (me *Client) AddTorrent(metaInfo *metainfo.MetaInfo) error {
        }
        me.torrents[torrent.InfoHash] = torrent
        go me.announceTorrent(torrent)
-       go me.PrioritizeDataRegion(torrent.InfoHash, 0, torrent.Length())
-       // for i := range torrent.Pieces {
-       //      me.queuePieceCheck(torrent, peer_protocol.Integer(i))
-       // }
+       for i := range torrent.Pieces {
+               me.queuePieceCheck(torrent, peer_protocol.Integer(i))
+       }
        return nil
 }
 
@@ -857,7 +923,8 @@ newAnnounce:
                                var peers []Peer
                                for _, peer := range resp.Peers {
                                        peers = append(peers, Peer{
-                                               Peer: peer,
+                                               IP:   peer.IP,
+                                               Port: peer.Port,
                                        })
                                }
                                if err := cl.AddPeers(t.InfoHash, peers); err != nil {
@@ -943,7 +1010,6 @@ func (me *Client) downloadedChunk(torrent *Torrent, msg *peer_protocol.Message)
        delete(torrent.Pieces[request.Index].PendingChunkSpecs, request.ChunkSpec)
        if len(torrent.Pieces[request.Index].PendingChunkSpecs) == 0 {
                me.queuePieceCheck(torrent, request.Index)
-               return
        }
        var next *list.Element
        for e := torrent.Priorities.Front(); e != nil; e = next {