var closeGroup sync.WaitGroup // For concurrent cleanup to complete before returning
cl.lock()
for t := range cl.torrents {
- // Can we not modify cl.torrents as we delete from it?
- err := cl.dropTorrent(t, &closeGroup)
- if err != nil {
- errs = append(errs, err)
- }
+ cl.dropTorrent(t, &closeGroup)
}
+ // Note: t.close (via dropTorrent) deletes the entry from cl.torrents while we range over the
+ // map above; deleting the current entry during map iteration is well-defined in Go.
panicif.NotZero(len(cl.torrents))
panicif.NotZero(len(cl.torrentsByShortHash))
cl.clearPortMappings()
opts.ChunkSize = defaultChunkSize
}
t.setChunkSize(opts.ChunkSize)
+ cl.torrents[t] = struct{}{}
return
}
}
})
cl.torrentsByShortHash[infoHash] = t
- cl.torrents[t] = struct{}{}
t.setInfoBytesLocked(opts.InfoBytes)
cl.clearAcceptLimits()
t.updateWantPeersEvent()
return errors.Join(t.addPieceLayersLocked(spec.PieceLayers)...)
}
-func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) (err error) {
- t.eachShortInfohash(func(short [20]byte) {
- delete(cl.torrentsByShortHash, short)
- })
- err = t.close(wg)
- delete(cl.torrents, t)
- return
+func (cl *Client) dropTorrent(t *Torrent, wg *sync.WaitGroup) {
+	t.close(wg)
+}
func (cl *Client) allTorrentsCompleted() bool {
--- /dev/null
+package torrent
func TestHaveAllThenBitfield(t *testing.T) {
cl := newTestingClient(t)
tt := cl.newTorrentForTesting()
- // cl.newConnection()
+ //pc := cl.newConnection(nil, newConnectionOpts{})
pc := PeerConn{
Peer: Peer{t: tt},
}
pc.initRequestState()
pc.legacyPeerImpl = &pc
tt.conns[&pc] = struct{}{}
+ g.InitNew(&pc.callbacks)
qt.Assert(t, qt.IsNil(pc.onPeerSentHaveAll()))
qt.Check(t, qt.DeepEquals(pc.t.connsWithAllPieces, map[*Peer]struct{}{&pc.Peer: {}}))
pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
return int64(t.pieces[piece].bytesLeft())
}
-// Drop the torrent from the client, and close it. It's always safe to do
-// this. No data corruption can, or should occur to either the torrent's data,
-// or connected peers.
+// Drop the torrent from the client, and close it. It's always safe to do this. No data corruption
+// can, or should occur to either the torrent's data, or connected peers.
func (t *Torrent) Drop() {
- var wg sync.WaitGroup
- defer wg.Wait()
+ if t.closed.IsSet() {
+ return
+ }
t.cl.lock()
defer t.cl.unlock()
- err := t.cl.dropTorrent(t, &wg)
- if err != nil {
- panic(err)
+ if t.closed.IsSet() {
+ return
}
+ var wg sync.WaitGroup
+ t.close(&wg)
+ wg.Wait()
}
// Number of bytes of the entire torrent we have completed. This is the sum of
return pieceIndex(t._completedPieces.GetCardinality())
}
-func (t *Torrent) close(wg *sync.WaitGroup) (err error) {
- if !t.closed.Set() {
- err = errors.New("already closed")
- return
- }
+func (t *Torrent) close(wg *sync.WaitGroup) {
+ // Should only be called from the Client, with the client lock held (see Client.Close and
+ // Torrent.Drop).
+ panicif.False(t.closed.Set())
+ t.eachShortInfohash(func(short [20]byte) {
+ delete(t.cl.torrentsByShortHash, short)
+ })
t.closedCtxCancel(errTorrentClosed)
t.getInfoCtxCancel(errTorrentClosed)
for _, f := range t.onClose {
t.cl.event.Broadcast()
t.pieceStateChanges.Close()
t.updateWantPeersEvent()
+ g.MustDelete(t.cl.torrents, t)
return
}