]> Sergey Matveev's repositories - btrtrc.git/blob - connection_test.go
a6c3f318bd16c93ff1efb4e17696c0074029c4f3
[btrtrc.git] / connection_test.go
1 package torrent
2
3 import (
4         "io"
5         "net"
6         "sync"
7         "testing"
8         "time"
9
10         "github.com/anacrolix/missinggo/pubsub"
11         "github.com/bradfitz/iter"
12         "github.com/stretchr/testify/require"
13
14         "github.com/anacrolix/torrent/metainfo"
15         pp "github.com/anacrolix/torrent/peer_protocol"
16         "github.com/anacrolix/torrent/storage"
17 )
18
19 // Ensure that no race exists between sending a bitfield, and a subsequent
20 // Have that would potentially alter it.
21 func TestSendBitfieldThenHave(t *testing.T) {
22         r, w := io.Pipe()
23         cl := Client{
24                 config: &ClientConfig{DownloadRateLimiter: unlimited},
25         }
26         cl.initLogger()
27         c := cl.newConnection(nil, false)
28         c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
29         c.t.setInfo(&metainfo.Info{
30                 Pieces: make([]byte, metainfo.HashSize*3),
31         })
32         c.r = r
33         c.w = w
34         go c.writer(time.Minute)
35         c.mu().Lock()
36         c.t.completedPieces.Add(1)
37         c.PostBitfield( /*[]bool{false, true, false}*/ )
38         c.mu().Unlock()
39         c.mu().Lock()
40         c.Have(2)
41         c.mu().Unlock()
42         b := make([]byte, 15)
43         n, err := io.ReadFull(r, b)
44         c.mu().Lock()
45         // This will cause connection.writer to terminate.
46         c.closed.Set()
47         c.mu().Unlock()
48         require.NoError(t, err)
49         require.EqualValues(t, 15, n)
50         // Here we see that the bitfield doesn't have piece 2 set, as that should
51         // arrive in the following Have message.
52         require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
53 }
54
55 type torrentStorage struct {
56         writeSem sync.Mutex
57 }
58
59 func (me *torrentStorage) Close() error { return nil }
60
61 func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
62         return me
63 }
64
65 func (me *torrentStorage) Completion() storage.Completion {
66         return storage.Completion{}
67 }
68
69 func (me *torrentStorage) MarkComplete() error {
70         return nil
71 }
72
73 func (me *torrentStorage) MarkNotComplete() error {
74         return nil
75 }
76
77 func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
78         panic("shouldn't be called")
79 }
80
81 func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
82         if len(b) != defaultChunkSize {
83                 panic(len(b))
84         }
85         me.writeSem.Unlock()
86         return len(b), nil
87 }
88
89 func BenchmarkConnectionMainReadLoop(b *testing.B) {
90         cl := &Client{
91                 config: &ClientConfig{
92                         DownloadRateLimiter: unlimited,
93                 },
94         }
95         ts := &torrentStorage{}
96         t := &Torrent{
97                 cl:                cl,
98                 storage:           &storage.Torrent{ts},
99                 pieceStateChanges: pubsub.NewPubSub(),
100         }
101         require.NoError(b, t.setInfo(&metainfo.Info{
102                 Pieces:      make([]byte, 20),
103                 Length:      1 << 20,
104                 PieceLength: 1 << 20,
105         }))
106         t.setChunkSize(defaultChunkSize)
107         t.pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
108         r, w := net.Pipe()
109         cn := cl.newConnection(r, true)
110         cn.setTorrent(t)
111         mrlErr := make(chan error)
112         msg := pp.Message{
113                 Type:  pp.Piece,
114                 Piece: make([]byte, defaultChunkSize),
115         }
116         go func() {
117                 cl.mu.Lock()
118                 err := cn.mainReadLoop()
119                 if err != nil {
120                         mrlErr <- err
121                 }
122                 close(mrlErr)
123         }()
124         wb := msg.MustMarshalBinary()
125         b.SetBytes(int64(len(msg.Piece)))
126         go func() {
127                 defer w.Close()
128                 ts.writeSem.Lock()
129                 for range iter.N(b.N) {
130                         cl.mu.Lock()
131                         // The chunk must be written to storage everytime, to ensure the
132                         // writeSem is unlocked.
133                         t.pieces[0].dirtyChunks.Clear()
134                         cn.validReceiveChunks = map[request]struct{}{newRequestFromMessage(&msg): struct{}{}}
135                         cl.mu.Unlock()
136                         n, err := w.Write(wb)
137                         require.NoError(b, err)
138                         require.EqualValues(b, len(wb), n)
139                         ts.writeSem.Lock()
140                 }
141         }()
142         require.NoError(b, <-mrlErr)
143         require.EqualValues(b, b.N, cn.stats.ChunksReadUseful.Int64())
144 }