]> Sergey Matveev's repositories - btrtrc.git/blobdiff - client.go
Improve uploading/seeding
[btrtrc.git] / client.go
index 1f11990051f8a0aa21fece170c9690387d127e5f..5f9420fd608ba82c34f375df0504cde81fe8231a 100644 (file)
--- a/client.go
+++ b/client.go
@@ -232,7 +232,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
                        w.WriteString("<missing metainfo>")
                }
                fmt.Fprint(w, "\n")
-               t.writeStatus(w)
+               t.writeStatus(w, cl)
                fmt.Fprintln(w)
        }
 }
@@ -342,7 +342,6 @@ func (t *torrent) connPendPiece(c *connection, piece int) {
 func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
        if t.Pieces[piece].Priority < priority {
                cl.prioritizePiece(t, piece, priority)
-               cl.event.Broadcast()
        }
 }
 
@@ -1404,6 +1403,54 @@ func (cl *Client) peerHasAll(t *torrent, cn *connection) {
        }
 }
 
+func (me *Client) upload(t *torrent, c *connection) {
+       if me.config.NoUpload {
+               return
+       }
+       if !c.PeerInterested {
+               return
+       }
+       if !me.seeding(t) && !t.connHasWantedPieces(c) {
+               return
+       }
+another:
+       for c.chunksSent < c.UsefulChunksReceived+6 {
+               c.Unchoke()
+               for r := range c.PeerRequests {
+                       err := me.sendChunk(t, c, r)
+                       if err != nil {
+                               log.Printf("error sending chunk to peer: %s", err)
+                       }
+                       delete(c.PeerRequests, r)
+                       goto another
+               }
+               return
+       }
+       c.Choke()
+}
+
+func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
+       b := make([]byte, r.Length)
+       p := t.Info.Piece(int(r.Index))
+       n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
+       if err != nil {
+               return err
+       }
+       if n != len(b) {
+               log.Fatal(b)
+       }
+       c.Post(pp.Message{
+               Type:  pp.Piece,
+               Index: r.Index,
+               Begin: r.Begin,
+               Piece: b,
+       })
+       uploadChunksPosted.Add(1)
+       c.chunksSent++
+       c.lastChunkSent = time.Now()
+       return nil
+}
+
 // Processes incoming bittorrent messages. The client lock is held upon entry
 // and exit.
 func (me *Client) connectionLoop(t *torrent, c *connection) error {
@@ -1448,11 +1495,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        me.peerUnchoked(t, c)
                case pp.Interested:
                        c.PeerInterested = true
-                       // TODO: This should be done from a dedicated unchoking routine.
-                       if me.config.NoUpload {
-                               break
-                       }
-                       c.Unchoke()
+                       me.upload(t, c)
                case pp.NotInterested:
                        c.PeerInterested = false
                        c.Choke()
@@ -1462,30 +1505,15 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        if c.Choked {
                                break
                        }
-                       request := newRequest(msg.Index, msg.Begin, msg.Length)
-                       // TODO: Requests should be satisfied from a dedicated upload
-                       // routine.
-                       // c.PeerRequests[request] = struct{}{}
-                       // if c.PeerRequests == nil {
-                       //      c.PeerRequests = make(map[request]struct{}, maxRequests)
-                       // }
-                       p := make([]byte, msg.Length)
-                       n, err := dataReadAt(t.data, p, int64(t.pieceLength(0))*int64(msg.Index)+int64(msg.Begin))
-                       // TODO: Failing to read for a request should not be fatal to the connection.
-                       if err != nil {
-                               return fmt.Errorf("reading t data to serve request %q: %s", request, err)
+                       if !c.PeerInterested {
+                               err = errors.New("peer sent request but isn't interested")
+                               break
                        }
-                       if n != int(msg.Length) {
-                               return fmt.Errorf("bad request: %v", msg)
+                       if c.PeerRequests == nil {
+                               c.PeerRequests = make(map[request]struct{}, maxRequests)
                        }
-                       c.Post(pp.Message{
-                               Type:  pp.Piece,
-                               Index: msg.Index,
-                               Begin: msg.Begin,
-                               Piece: p,
-                       })
-                       uploadChunksPosted.Add(1)
-                       c.chunksSent++
+                       c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
+                       me.upload(t, c)
                case pp.Cancel:
                        req := newRequest(msg.Index, msg.Begin, msg.Length)
                        if !c.PeerCancel(req) {
@@ -1699,7 +1727,7 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
        // TODO: This should probably be done by a routine that kills off bad
        // connections, and extra connections killed here instead.
        if len(t.Conns) > socketsPerTorrent {
-               wcs := t.worstConnsHeap()
+               wcs := t.worstConnsHeap(me)
                heap.Pop(wcs).(*connection).Close()
        }
        return true
@@ -1717,26 +1745,29 @@ func (t *torrent) needData() bool {
        return false
 }
 
-// TODO: I'm sure there's something here to do with seeding.
-func (t *torrent) badConn(c *connection) bool {
+func (cl *Client) usefulConn(t *torrent, c *connection) bool {
        // A 30 second grace for initial messages to go through.
        if time.Since(c.completedHandshake) < 30*time.Second {
-               return false
+               return true
        }
        if !t.haveInfo() {
                if !c.supportsExtension("ut_metadata") {
-                       return true
+                       return false
                }
-               if time.Since(c.completedHandshake) > 2*time.Minute {
+               if time.Since(c.completedHandshake) < 2*time.Minute {
                        return true
                }
+               return false
        }
-       return !t.connHasWantedPieces(c)
+       if cl.seeding(t) {
+               return c.PeerInterested
+       }
+       return t.connHasWantedPieces(c)
 }
 
-func (t *torrent) numGoodConns() (num int) {
+func (t *torrent) numGoodConns(cl *Client) (num int) {
        for _, c := range t.Conns {
-               if !t.badConn(c) {
+               if cl.usefulConn(t, c) {
                        num++
                }
        }
@@ -1744,10 +1775,10 @@ func (t *torrent) numGoodConns() (num int) {
 }
 
 func (me *Client) wantConns(t *torrent) bool {
-       if me.config.NoUpload && !t.needData() {
+       if !me.seeding(t) && !t.needData() {
                return false
        }
-       if t.numGoodConns() >= socketsPerTorrent {
+       if t.numGoodConns(me) >= socketsPerTorrent {
                return false
        }
        return true
@@ -2228,7 +2259,16 @@ func (cl *Client) waitWantPeers(t *torrent) bool {
 
 // Returns whether the client should make effort to seed the torrent.
 func (cl *Client) seeding(t *torrent) bool {
-       return cl.config.Seed && !cl.config.NoUpload
+       if cl.config.NoUpload {
+               return false
+       }
+       if !cl.config.Seed {
+               return false
+       }
+       if t.needData() {
+               return false
+       }
+       return true
 }
 
 func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
@@ -2515,6 +2555,8 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        c.UsefulChunksReceived++
        c.lastUsefulChunkReceived = time.Now()
 
+       me.upload(t, c)
+
        // Write the chunk out.
        err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        if err != nil {