]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Improve uploading/seeding
authorMatt Joiner <anacrolix@gmail.com>
Tue, 16 Jun 2015 06:57:47 +0000 (16:57 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Tue, 16 Jun 2015 06:57:47 +0000 (16:57 +1000)
client.go
client_test.go
connection.go
fs/torrentfs_test.go
piece.go
torrent.go
worst_conns.go

index 1f11990051f8a0aa21fece170c9690387d127e5f..5f9420fd608ba82c34f375df0504cde81fe8231a 100644 (file)
--- a/client.go
+++ b/client.go
@@ -232,7 +232,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
                        w.WriteString("<missing metainfo>")
                }
                fmt.Fprint(w, "\n")
-               t.writeStatus(w)
+               t.writeStatus(w, cl)
                fmt.Fprintln(w)
        }
 }
@@ -342,7 +342,6 @@ func (t *torrent) connPendPiece(c *connection, piece int) {
 func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
        if t.Pieces[piece].Priority < priority {
                cl.prioritizePiece(t, piece, priority)
-               cl.event.Broadcast()
        }
 }
 
@@ -1404,6 +1403,54 @@ func (cl *Client) peerHasAll(t *torrent, cn *connection) {
        }
 }
 
+func (me *Client) upload(t *torrent, c *connection) {
+       if me.config.NoUpload {
+               return
+       }
+       if !c.PeerInterested {
+               return
+       }
+       if !me.seeding(t) && !t.connHasWantedPieces(c) {
+               return
+       }
+another:
+       for c.chunksSent < c.UsefulChunksReceived+6 {
+               c.Unchoke()
+               for r := range c.PeerRequests {
+                       err := me.sendChunk(t, c, r)
+                       if err != nil {
+                               log.Printf("error sending chunk to peer: %s", err)
+                       }
+                       delete(c.PeerRequests, r)
+                       goto another
+               }
+               return
+       }
+       c.Choke()
+}
+
+func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
+       b := make([]byte, r.Length)
+       p := t.Info.Piece(int(r.Index))
+       n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
+       if err != nil {
+               return err
+       }
+       if n != len(b) {
+               log.Fatal(b)
+       }
+       c.Post(pp.Message{
+               Type:  pp.Piece,
+               Index: r.Index,
+               Begin: r.Begin,
+               Piece: b,
+       })
+       uploadChunksPosted.Add(1)
+       c.chunksSent++
+       c.lastChunkSent = time.Now()
+       return nil
+}
+
 // Processes incoming bittorrent messages. The client lock is held upon entry
 // and exit.
 func (me *Client) connectionLoop(t *torrent, c *connection) error {
@@ -1448,11 +1495,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        me.peerUnchoked(t, c)
                case pp.Interested:
                        c.PeerInterested = true
-                       // TODO: This should be done from a dedicated unchoking routine.
-                       if me.config.NoUpload {
-                               break
-                       }
-                       c.Unchoke()
+                       me.upload(t, c)
                case pp.NotInterested:
                        c.PeerInterested = false
                        c.Choke()
@@ -1462,30 +1505,15 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
                        if c.Choked {
                                break
                        }
-                       request := newRequest(msg.Index, msg.Begin, msg.Length)
-                       // TODO: Requests should be satisfied from a dedicated upload
-                       // routine.
-                       // c.PeerRequests[request] = struct{}{}
-                       // if c.PeerRequests == nil {
-                       //      c.PeerRequests = make(map[request]struct{}, maxRequests)
-                       // }
-                       p := make([]byte, msg.Length)
-                       n, err := dataReadAt(t.data, p, int64(t.pieceLength(0))*int64(msg.Index)+int64(msg.Begin))
-                       // TODO: Failing to read for a request should not be fatal to the connection.
-                       if err != nil {
-                               return fmt.Errorf("reading t data to serve request %q: %s", request, err)
+                       if !c.PeerInterested {
+                               err = errors.New("peer sent request but isn't interested")
+                               break
                        }
-                       if n != int(msg.Length) {
-                               return fmt.Errorf("bad request: %v", msg)
+                       if c.PeerRequests == nil {
+                               c.PeerRequests = make(map[request]struct{}, maxRequests)
                        }
-                       c.Post(pp.Message{
-                               Type:  pp.Piece,
-                               Index: msg.Index,
-                               Begin: msg.Begin,
-                               Piece: p,
-                       })
-                       uploadChunksPosted.Add(1)
-                       c.chunksSent++
+                       c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
+                       me.upload(t, c)
                case pp.Cancel:
                        req := newRequest(msg.Index, msg.Begin, msg.Length)
                        if !c.PeerCancel(req) {
@@ -1699,7 +1727,7 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
        // TODO: This should probably be done by a routine that kills off bad
        // connections, and extra connections killed here instead.
        if len(t.Conns) > socketsPerTorrent {
-               wcs := t.worstConnsHeap()
+               wcs := t.worstConnsHeap(me)
                heap.Pop(wcs).(*connection).Close()
        }
        return true
@@ -1717,26 +1745,29 @@ func (t *torrent) needData() bool {
        return false
 }
 
-// TODO: I'm sure there's something here to do with seeding.
-func (t *torrent) badConn(c *connection) bool {
+func (cl *Client) usefulConn(t *torrent, c *connection) bool {
        // A 30 second grace for initial messages to go through.
        if time.Since(c.completedHandshake) < 30*time.Second {
-               return false
+               return true
        }
        if !t.haveInfo() {
                if !c.supportsExtension("ut_metadata") {
-                       return true
+                       return false
                }
-               if time.Since(c.completedHandshake) > 2*time.Minute {
+               if time.Since(c.completedHandshake) < 2*time.Minute {
                        return true
                }
+               return false
        }
-       return !t.connHasWantedPieces(c)
+       if cl.seeding(t) {
+               return c.PeerInterested
+       }
+       return t.connHasWantedPieces(c)
 }
 
-func (t *torrent) numGoodConns() (num int) {
+func (t *torrent) numGoodConns(cl *Client) (num int) {
        for _, c := range t.Conns {
-               if !t.badConn(c) {
+               if cl.usefulConn(t, c) {
                        num++
                }
        }
@@ -1744,10 +1775,10 @@ func (t *torrent) numGoodConns() (num int) {
 }
 
 func (me *Client) wantConns(t *torrent) bool {
-       if me.config.NoUpload && !t.needData() {
+       if !me.seeding(t) && !t.needData() {
                return false
        }
-       if t.numGoodConns() >= socketsPerTorrent {
+       if t.numGoodConns(me) >= socketsPerTorrent {
                return false
        }
        return true
@@ -2228,7 +2259,16 @@ func (cl *Client) waitWantPeers(t *torrent) bool {
 
 // Returns whether the client should make effort to seed the torrent.
 func (cl *Client) seeding(t *torrent) bool {
-       return cl.config.Seed && !cl.config.NoUpload
+       if cl.config.NoUpload {
+               return false
+       }
+       if !cl.config.Seed {
+               return false
+       }
+       if t.needData() {
+               return false
+       }
+       return true
 }
 
 func (cl *Client) announceTorrentDHT(t *torrent, impliedPort bool) {
@@ -2515,6 +2555,8 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
        c.UsefulChunksReceived++
        c.lastUsefulChunkReceived = time.Now()
 
+       me.upload(t, c)
+
        // Write the chunk out.
        err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
        if err != nil {
index 0e924bb26869756c1ff2313482bfc6c56ee47a0c..8c7cc344165ff85986300333e6fc49c772faae45 100644 (file)
@@ -18,8 +18,8 @@ import (
        "gopkg.in/check.v1"
 
        "github.com/anacrolix/torrent/bencode"
-       "github.com/anacrolix/torrent/data/blob"
        "github.com/anacrolix/torrent/data"
+       "github.com/anacrolix/torrent/data/blob"
        "github.com/anacrolix/torrent/internal/testutil"
        "github.com/anacrolix/torrent/metainfo"
        "github.com/anacrolix/torrent/util"
@@ -251,6 +251,7 @@ func TestClientTransfer(t *testing.T) {
        greetingTempDir, mi := testutil.GreetingTestTorrent()
        defer os.RemoveAll(greetingTempDir)
        cfg := TestingConfig
+       cfg.Seed = true
        cfg.DataDir = greetingTempDir
        seeder, err := NewClient(&cfg)
        if err != nil {
index 943e1a7cfcc29271dd99c499739678bedeedee54..a57462b3dcae273ece65a8920a12566cd4c2033d 100644 (file)
@@ -52,6 +52,7 @@ type connection struct {
        lastMessageReceived     time.Time
        completedHandshake      time.Time
        lastUsefulChunkReceived time.Time
+       lastChunkSent           time.Time
 
        // Stuff controlled by the local peer.
        Interested       bool
index b77dab2fefd56b0819cbeee11d7a811cc80adf7a..1d5dc10048f71500e4fef5a7a0797d9cf34f7a13 100644 (file)
@@ -170,6 +170,7 @@ func TestDownloadOnDemand(t *testing.T) {
                DisableTrackers: true,
                NoDHT:           true,
                ListenAddr:      ":0",
+               Seed:            true,
 
                NoDefaultBlocklist: true,
                // Ensure that the metainfo is obtained over the wire, since we added
index bae74094da2d1f618ab095507750450dd9683f23..a5e9947b9cb56c629bfade0585b8111c540a5764 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -20,7 +20,8 @@ const (
 )
 
 type piece struct {
-       Hash pieceSum // The completed piece SHA1 hash, from the metainfo "pieces" field.
+       // The completed piece SHA1 hash, from the metainfo "pieces" field.
+       Hash pieceSum
        // Chunks we don't have. The offset and length can be determined by the
        // request chunkSize in use.
        PendingChunkSpecs []bool
index e32175e9429a4ab74159334aa95e326cc92da89c..30afe1a9ecc4a9883abedf587eabe5f2836ef1f8 100644 (file)
@@ -131,10 +131,11 @@ func (t *torrent) addrActive(addr string) bool {
        return false
 }
 
-func (t *torrent) worstConnsHeap() (wcs *worstConns) {
+func (t *torrent) worstConnsHeap(cl *Client) (wcs *worstConns) {
        wcs = &worstConns{
-               c: append([]*connection{}, t.Conns...),
-               t: t,
+               c:  append([]*connection{}, t.Conns...),
+               t:  t,
+               cl: cl,
        }
        heap.Init(wcs)
        return
@@ -376,7 +377,7 @@ func pieceStateRunStatusChars(psr PieceStateRun) (ret string) {
        return
 }
 
-func (t *torrent) writeStatus(w io.Writer) {
+func (t *torrent) writeStatus(w io.Writer, cl *Client) {
        fmt.Fprintf(w, "Infohash: %x\n", t.InfoHash)
        fmt.Fprintf(w, "Metadata length: %d\n", t.metadataSize())
        fmt.Fprintf(w, "Metadata have: ")
@@ -421,8 +422,9 @@ func (t *torrent) writeStatus(w io.Writer) {
        fmt.Fprintf(w, "Half open: %d\n", len(t.HalfOpen))
        fmt.Fprintf(w, "Active peers: %d\n", len(t.Conns))
        sort.Sort(&worstConns{
-               c: t.Conns,
-               t: t,
+               c:  t.Conns,
+               t:  t,
+               cl: cl,
        })
        for _, c := range t.Conns {
                c.WriteStatus(w, t)
@@ -685,8 +687,9 @@ func (t *torrent) wantChunk(r request) bool {
 }
 
 func (t *torrent) urgentChunkInPiece(piece int) bool {
+       p := pp.Integer(piece)
        for req := range t.urgent {
-               if int(req.Index) == piece {
+               if req.Index == p {
                        return true
                }
        }
index 69f67e93776a19e9946fd3a91f4eb65d1364b28b..0d7b9d0ddf0d6b27079249cc1cca5e02647e61db 100644 (file)
@@ -6,12 +6,13 @@ import (
 
 // Implements heap functions such that [0] is the worst connection.
 type worstConns struct {
-       c []*connection
-       t *torrent
+       c  []*connection
+       t  *torrent
+       cl *Client
 }
 
-func (me worstConns) Len() int      { return len(me.c) }
-func (me worstConns) Swap(i, j int) { me.c[i], me.c[j] = me.c[j], me.c[i] }
+func (me *worstConns) Len() int      { return len(me.c) }
+func (me *worstConns) Swap(i, j int) { me.c[i], me.c[j] = me.c[j], me.c[i] }
 
 func (me *worstConns) Pop() (ret interface{}) {
        old := me.c
@@ -26,37 +27,25 @@ func (me *worstConns) Push(x interface{}) {
 }
 
 type worstConnsSortKey struct {
-       // Peer has something we want.
-       useless bool
-       // A fabricated duration since peer was last helpful.
-       age time.Duration
+       useful      bool
+       lastHelpful time.Time
 }
 
 func (me worstConnsSortKey) Less(other worstConnsSortKey) bool {
-       if me.useless != other.useless {
-               return me.useless
+       if me.useful != other.useful {
+               return !me.useful
        }
-       return me.age > other.age
+       return me.lastHelpful.Before(other.lastHelpful)
 }
 
-func (me worstConns) key(i int) (key worstConnsSortKey) {
+func (me *worstConns) key(i int) (key worstConnsSortKey) {
        c := me.c[i]
-       // Peer has had time to declare what they have.
-       if time.Now().Sub(c.completedHandshake) >= 30*time.Second {
-               if !me.t.haveInfo() {
-                       key.useless = !c.supportsExtension("ut_metadata")
-               } else {
-                       if !me.t.connHasWantedPieces(c) {
-                               key.useless = true
-                       }
-               }
+       key.useful = me.cl.usefulConn(me.t, c)
+       if me.cl.seeding(me.t) {
+               key.lastHelpful = c.lastChunkSent
+       } else {
+               key.lastHelpful = c.lastUsefulChunkReceived
        }
-       key.age = time.Duration(1+3*c.UnwantedChunksReceived) * time.Now().Sub(func() time.Time {
-               if !c.lastUsefulChunkReceived.IsZero() {
-                       return c.lastUsefulChunkReceived
-               }
-               return c.completedHandshake.Add(-time.Minute)
-       }()) / time.Duration(1+c.UsefulChunksReceived)
        return
 }