From eebd09c0fef2832f5330cd0ce571638b3af07e07 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 6 Sep 2015 12:33:22 +1000 Subject: [PATCH] Allow subscribing to torrent piece state changes --- client.go | 5 ++++- client_test.go | 10 ++++++++++ piece.go | 1 + t.go | 5 +++++ torrent.go | 19 ++++++++++++++++--- 5 files changed, 36 insertions(+), 4 deletions(-) diff --git a/client.go b/client.go index aba0ab33..0707b8ea 100644 --- a/client.go +++ b/client.go @@ -1945,7 +1945,8 @@ func newTorrent(ih InfoHash) (t *torrent, err error) { gotMetainfo: make(chan struct{}), - HalfOpen: make(map[string]struct{}), + HalfOpen: make(map[string]struct{}), + pieceStateChanges: pubsub.NewPubSub(), } t.wantPeers.L = &t.stateMu return @@ -2587,6 +2588,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er // log.Println("got chunk", req) piece.Event.Broadcast() + defer t.publishPieceChange(int(req.Index)) // Record that we have the chunk. piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize)) delete(t.urgent, req) @@ -2656,6 +2658,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) { func (me *Client) pieceChanged(t *torrent, piece int) { correct := t.pieceComplete(piece) p := t.Pieces[piece] + defer t.publishPieceChange(piece) defer p.Event.Broadcast() if correct { p.Priority = PiecePriorityNone diff --git a/client_test.go b/client_test.go index 47f13715..be387f78 100644 --- a/client_test.go +++ b/client_test.go @@ -274,6 +274,16 @@ func TestClientTransfer(t *testing.T) { ret.ChunkSize = 2 return }()) + // TODO: The piece state publishing is kinda jammed in here until I have a + // more thorough test. + go func() { + s := leecherGreeting.pieceStateChanges.Subscribe() + defer s.Close() + for i := range s.Values { + log.Print(i) + } + log.Print("finished") + }() leecherGreeting.AddPeers([]Peer{ Peer{ IP: missinggo.AddrIP(seeder.ListenAddr()), diff --git a/piece.go b/piece.go index 38921552..408dce37 100644 --- a/piece.go +++ b/piece.go @@ -30,6 +30,7 @@ type piece struct { EverHashed bool Event sync.Cond Priority piecePriority + PublicPieceState PieceState pendingWritesMutex sync.Mutex pendingWrites int diff --git a/t.go b/t.go index c845277c..87ae5d82 100644 --- a/t.go +++ b/t.go @@ -1,6 +1,7 @@ package torrent import ( + "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/torrent/metainfo" ) @@ -68,3 +69,7 @@ func (t Torrent) BytesCompleted() int64 { defer t.cl.mu.RUnlock() return t.bytesCompleted() } + +func (t Torrent) SubscribePieceStateChanges() *pubsub.Subscription { + return t.torrent.pieceStateChanges.Subscribe() +} diff --git a/torrent.go b/torrent.go index d1034f82..b0c607b0 100644 --- a/torrent.go +++ b/torrent.go @@ -11,6 +11,7 @@ import ( "time" "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/pubsub" "github.com/bradfitz/iter" "github.com/anacrolix/torrent/bencode" @@ -60,9 +61,11 @@ type torrent struct { // announcing, and communicating with peers. ceasingNetworking chan struct{} - InfoHash InfoHash - Pieces []*piece - chunkSize pp.Integer + InfoHash InfoHash + Pieces []*piece + // Values are the piece indices that changed. + pieceStateChanges *pubsub.PubSub + chunkSize pp.Integer // Chunks that are wanted before all others. This is for // responsive/streaming readers that want to unblock ASAP. urgent map[request]struct{} @@ -540,6 +543,7 @@ func (t *torrent) close() (err error) { for _, conn := range t.Conns { conn.Close() } + t.pieceStateChanges.Close() return } @@ -761,3 +765,12 @@ func (t *torrent) worstBadConn(cl *Client) *connection { } return nil } + +func (t *torrent) publishPieceChange(piece int) { + cur := t.pieceState(piece) + p := t.Pieces[piece] + if cur != p.PublicPieceState { + t.pieceStateChanges.Publish(piece) + } + p.PublicPieceState = cur +} -- 2.48.1