]> Sergey Matveev's repositories - btrtrc.git/commitdiff
Allow subscribing to torrent piece state changes
authorMatt Joiner <anacrolix@gmail.com>
Sun, 6 Sep 2015 02:33:22 +0000 (12:33 +1000)
committerMatt Joiner <anacrolix@gmail.com>
Sun, 6 Sep 2015 02:33:22 +0000 (12:33 +1000)
client.go
client_test.go
piece.go
t.go
torrent.go

index aba0ab3379c1899b318bc89cde6c4360e4460c62..0707b8eae34ec47ed542f882309586a23fbd7229 100644 (file)
--- a/client.go
+++ b/client.go
@@ -1945,7 +1945,8 @@ func newTorrent(ih InfoHash) (t *torrent, err error) {
 
                gotMetainfo: make(chan struct{}),
 
-               HalfOpen: make(map[string]struct{}),
+               HalfOpen:          make(map[string]struct{}),
+               pieceStateChanges: pubsub.NewPubSub(),
        }
        t.wantPeers.L = &t.stateMu
        return
@@ -2587,6 +2588,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
 
        // log.Println("got chunk", req)
        piece.Event.Broadcast()
+       defer t.publishPieceChange(int(req.Index))
        // Record that we have the chunk.
        piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
        delete(t.urgent, req)
@@ -2656,6 +2658,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
 func (me *Client) pieceChanged(t *torrent, piece int) {
        correct := t.pieceComplete(piece)
        p := t.Pieces[piece]
+       defer t.publishPieceChange(piece)
        defer p.Event.Broadcast()
        if correct {
                p.Priority = PiecePriorityNone
index 47f1371594fb16a4fb2024b455c7ed899f25d5d7..be387f78712d907d79c6eda9f04a25d88167685f 100644 (file)
@@ -274,6 +274,16 @@ func TestClientTransfer(t *testing.T) {
                ret.ChunkSize = 2
                return
        }())
+       // TODO: The piece state publishing is kinda jammed in here until I have a
+       // more thorough test.
+       go func() {
+               s := leecherGreeting.pieceStateChanges.Subscribe()
+               defer s.Close()
+               for i := range s.Values {
+                       log.Print(i)
+               }
+               log.Print("finished")
+       }()
        leecherGreeting.AddPeers([]Peer{
                Peer{
                        IP:   missinggo.AddrIP(seeder.ListenAddr()),
index 3892155235485925def628def99d7be14142ae0d..408dce376911ad73c0c469b9c1b953e047b2f80d 100644 (file)
--- a/piece.go
+++ b/piece.go
@@ -30,6 +30,7 @@ type piece struct {
        EverHashed        bool
        Event             sync.Cond
        Priority          piecePriority
+       PublicPieceState  PieceState
 
        pendingWritesMutex sync.Mutex
        pendingWrites      int
diff --git a/t.go b/t.go
index c845277c645d1595f4f4d41f8248dae6bc7fb97d..87ae5d82e5a26a7a4595ebd1ae03b90751cb6568 100644 (file)
--- a/t.go
+++ b/t.go
@@ -1,6 +1,7 @@
 package torrent
 
 import (
+       "github.com/anacrolix/missinggo/pubsub"
        "github.com/anacrolix/torrent/metainfo"
 )
 
@@ -68,3 +69,7 @@ func (t Torrent) BytesCompleted() int64 {
        defer t.cl.mu.RUnlock()
        return t.bytesCompleted()
 }
+
+func (t Torrent) SubscribePieceStateChanges() *pubsub.Subscription {
+       return t.torrent.pieceStateChanges.Subscribe()
+}
index d1034f82853ad6a27392d6de6ced669786f88597..b0c607b05bf2d23aa19c0b8db4cdc8833366f9cc 100644 (file)
@@ -11,6 +11,7 @@ import (
        "time"
 
        "github.com/anacrolix/missinggo"
+       "github.com/anacrolix/missinggo/pubsub"
        "github.com/bradfitz/iter"
 
        "github.com/anacrolix/torrent/bencode"
@@ -60,9 +61,11 @@ type torrent struct {
        // announcing, and communicating with peers.
        ceasingNetworking chan struct{}
 
-       InfoHash  InfoHash
-       Pieces    []*piece
-       chunkSize pp.Integer
+       InfoHash InfoHash
+       Pieces   []*piece
+       // Values are the piece indices that changed.
+       pieceStateChanges *pubsub.PubSub
+       chunkSize         pp.Integer
        // Chunks that are wanted before all others. This is for
        // responsive/streaming readers that want to unblock ASAP.
        urgent map[request]struct{}
@@ -540,6 +543,7 @@ func (t *torrent) close() (err error) {
        for _, conn := range t.Conns {
                conn.Close()
        }
+       t.pieceStateChanges.Close()
        return
 }
 
@@ -761,3 +765,12 @@ func (t *torrent) worstBadConn(cl *Client) *connection {
        }
        return nil
 }
+
+func (t *torrent) publishPieceChange(piece int) {
+       cur := t.pieceState(piece)
+       p := t.Pieces[piece]
+       if cur != p.PublicPieceState {
+               t.pieceStateChanges.Publish(piece)
+       }
+       p.PublicPieceState = cur
+}