cn := cl.newConnection(r, true)
cn.setTorrent(t)
mrlErr := make(chan error)
- cl.mu.Lock()
+ msg := pp.Message{
+ Type: pp.Piece,
+ Piece: make([]byte, defaultChunkSize),
+ }
go func() {
+ cl.mu.Lock()
err := cn.mainReadLoop()
if err != nil {
mrlErr <- err
}
close(mrlErr)
}()
- msg := pp.Message{
- Type: pp.Piece,
- Piece: make([]byte, defaultChunkSize),
- }
- wb, err := msg.MarshalBinary()
- require.NoError(b, err)
+ wb := msg.MustMarshalBinary()
b.SetBytes(int64(len(msg.Piece)))
- ts.writeSem.Lock()
- for range iter.N(b.N) {
- cl.mu.Lock()
- t.pieces[0].dirtyChunks.Clear()
- cl.mu.Unlock()
- n, err := w.Write(wb)
- require.NoError(b, err)
- require.EqualValues(b, len(wb), n)
+ go func() {
+ defer w.Close()
ts.writeSem.Lock()
- }
- w.Close()
+ for range iter.N(b.N) {
+ cl.mu.Lock()
+ // The chunk must be written to storage everytime, to ensure the
+ // writeSem is unlocked.
+ t.pieces[0].dirtyChunks.Clear()
+ cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): struct{}{}}
+ cl.mu.Unlock()
+ n, err := w.Write(wb)
+ require.NoError(b, err)
+ require.EqualValues(b, len(wb), n)
+ ts.writeSem.Lock()
+ }
+ }()
require.NoError(b, <-mrlErr)
require.EqualValues(b, b.N, cn.stats.ChunksReadUseful.Int64())
}