client.go | 5 ++++- client_test.go | 10 ++++++++++ piece.go | 1 + t.go | 5 +++++ torrent.go | 19 ++++++++++++++++--- diff --git a/client.go b/client.go index aba0ab3379c1899b318bc89cde6c4360e4460c62..0707b8eae34ec47ed542f882309586a23fbd7229 100644 --- a/client.go +++ b/client.go @@ -1945,7 +1945,8 @@ ceasingNetworking: make(chan struct{}), gotMetainfo: make(chan struct{}), - HalfOpen: make(map[string]struct{}), + HalfOpen: make(map[string]struct{}), + pieceStateChanges: pubsub.NewPubSub(), } t.wantPeers.L = &t.stateMu return @@ -2587,6 +2588,7 @@ c.peerTouchedPieces[int(req.Index)] = struct{}{} // log.Println("got chunk", req) piece.Event.Broadcast() + defer t.publishPieceChange(int(req.Index)) // Record that we have the chunk. piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize)) delete(t.urgent, req) @@ -2656,6 +2658,7 @@ // TODO: Check this isn't called more than once for each piece being correct. func (me *Client) pieceChanged(t *torrent, piece int) { correct := t.pieceComplete(piece) p := t.Pieces[piece] + defer t.publishPieceChange(piece) defer p.Event.Broadcast() if correct { p.Priority = PiecePriorityNone diff --git a/client_test.go b/client_test.go index 47f1371594fb16a4fb2024b455c7ed899f25d5d7..be387f78712d907d79c6eda9f04a25d88167685f 100644 --- a/client_test.go +++ b/client_test.go @@ -274,6 +274,16 @@ ret = TorrentSpecFromMetaInfo(mi) ret.ChunkSize = 2 return }()) + // TODO: The piece state publishing is kinda jammed in here until I have a + // more thorough test. + go func() { + s := leecherGreeting.pieceStateChanges.Subscribe() + defer s.Close() + for i := range s.Values { + log.Print(i) + } + log.Print("finished") + }() leecherGreeting.AddPeers([]Peer{ Peer{ IP: missinggo.AddrIP(seeder.ListenAddr()), diff --git a/piece.go b/piece.go index 3892155235485925def628def99d7be14142ae0d..408dce376911ad73c0c469b9c1b953e047b2f80d 100644 --- a/piece.go +++ b/piece.go @@ -30,6 +30,7 @@ QueuedForHash bool EverHashed bool Event sync.Cond Priority piecePriority + PublicPieceState PieceState pendingWritesMutex sync.Mutex pendingWrites int diff --git a/t.go b/t.go index c845277c645d1595f4f4d41f8248dae6bc7fb97d..87ae5d82e5a26a7a4595ebd1ae03b90751cb6568 100644 --- a/t.go +++ b/t.go @@ -1,6 +1,7 @@ package torrent import ( + "github.com/anacrolix/missinggo/pubsub" "github.com/anacrolix/torrent/metainfo" ) @@ -68,3 +69,7 @@ t.cl.mu.RLock() defer t.cl.mu.RUnlock() return t.bytesCompleted() } + +func (t Torrent) SubscribePieceStateChanges() *pubsub.Subscription { + return t.torrent.pieceStateChanges.Subscribe() +} diff --git a/torrent.go b/torrent.go index d1034f82853ad6a27392d6de6ced669786f88597..b0c607b05bf2d23aa19c0b8db4cdc8833366f9cc 100644 --- a/torrent.go +++ b/torrent.go @@ -11,6 +11,7 @@ "sync" "time" "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/pubsub" "github.com/bradfitz/iter" "github.com/anacrolix/torrent/bencode" @@ -60,9 +61,11 @@ // Closed when no more network activity is desired. This includes // announcing, and communicating with peers. ceasingNetworking chan struct{} - InfoHash InfoHash - Pieces []*piece - chunkSize pp.Integer + InfoHash InfoHash + Pieces []*piece + // Values are the piece indices that changed. + pieceStateChanges *pubsub.PubSub + chunkSize pp.Integer // Chunks that are wanted before all others. This is for // responsive/streaming readers that want to unblock ASAP. urgent map[request]struct{} @@ -540,6 +543,7 @@ } for _, conn := range t.Conns { conn.Close() } + t.pieceStateChanges.Close() return } @@ -761,3 +765,12 @@ } } return nil } + +func (t *torrent) publishPieceChange(piece int) { + cur := t.pieceState(piece) + p := t.Pieces[piece] + if cur != p.PublicPieceState { + t.pieceStateChanges.Publish(piece) + } + p.PublicPieceState = cur +}