gotMetainfo: make(chan struct{}),
- HalfOpen: make(map[string]struct{}),
+ HalfOpen: make(map[string]struct{}),
+ pieceStateChanges: pubsub.NewPubSub(),
}
t.wantPeers.L = &t.stateMu
return
// log.Println("got chunk", req)
piece.Event.Broadcast()
+ defer t.publishPieceChange(int(req.Index))
// Record that we have the chunk.
piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
delete(t.urgent, req)
func (me *Client) pieceChanged(t *torrent, piece int) {
correct := t.pieceComplete(piece)
p := t.Pieces[piece]
+ defer t.publishPieceChange(piece)
defer p.Event.Broadcast()
if correct {
p.Priority = PiecePriorityNone
ret.ChunkSize = 2
return
}())
+ // TODO: The piece state publishing is kinda jammed in here until I have a
+ // more thorough test.
+ go func() {
+ s := leecherGreeting.pieceStateChanges.Subscribe()
+ defer s.Close()
+ for i := range s.Values {
+ log.Print(i)
+ }
+ log.Print("finished")
+ }()
leecherGreeting.AddPeers([]Peer{
Peer{
IP: missinggo.AddrIP(seeder.ListenAddr()),
EverHashed bool
Event sync.Cond
Priority piecePriority
+ PublicPieceState PieceState
pendingWritesMutex sync.Mutex
pendingWrites int
package torrent
import (
+ "github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/torrent/metainfo"
)
defer t.cl.mu.RUnlock()
return t.bytesCompleted()
}
+
+func (t Torrent) SubscribePieceStateChanges() *pubsub.Subscription {
+ return t.torrent.pieceStateChanges.Subscribe()
+}
"time"
"github.com/anacrolix/missinggo"
+ "github.com/anacrolix/missinggo/pubsub"
"github.com/bradfitz/iter"
"github.com/anacrolix/torrent/bencode"
// announcing, and communicating with peers.
ceasingNetworking chan struct{}
- InfoHash InfoHash
- Pieces []*piece
- chunkSize pp.Integer
+ InfoHash InfoHash
+ Pieces []*piece
+ // Values are the piece indices that changed.
+ pieceStateChanges *pubsub.PubSub
+ chunkSize pp.Integer
// Chunks that are wanted before all others. This is for
// responsive/streaming readers that want to unblock ASAP.
urgent map[request]struct{}
for _, conn := range t.Conns {
conn.Close()
}
+ t.pieceStateChanges.Close()
return
}
}
return nil
}
+
+func (t *torrent) publishPieceChange(piece int) {
+ cur := t.pieceState(piece)
+ p := t.Pieces[piece]
+ if cur != p.PublicPieceState {
+ t.pieceStateChanges.Publish(piece)
+ }
+ p.PublicPieceState = cur
+}