// peerconn_test.go — torrent package tests.
// (Source: btrtrc.git, commit "Break out peerConnWriter".)
1 package torrent
2
3 import (
4         "io"
5         "net"
6         "sync"
7         "testing"
8
9         "github.com/anacrolix/missinggo/pubsub"
10         "github.com/bradfitz/iter"
11         "github.com/frankban/quicktest"
12         "github.com/stretchr/testify/require"
13
14         "github.com/anacrolix/torrent/metainfo"
15         pp "github.com/anacrolix/torrent/peer_protocol"
16         "github.com/anacrolix/torrent/storage"
17 )
18
19 // Ensure that no race exists between sending a bitfield, and a subsequent
20 // Have that would potentially alter it.
// Ensure that no race exists between sending a bitfield, and a subsequent
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {
	cl := Client{
		config: TestingConfig(t),
	}
	cl.initLogger()
	// Connection with no real net.Conn: its outgoing bytes are routed through
	// an io.Pipe below so the test can read exactly what the writer sends.
	c := cl.newConnection(nil, false, nil, "io.Pipe", "")
	c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
	c.t.setInfo(&metainfo.Info{
		// Three piece hashes => a three-piece torrent.
		Pieces: make([]byte, metainfo.HashSize*3),
	})
	r, w := io.Pipe()
	//c.r = r
	c.w = w
	c.startWriter()
	// First critical section: mark piece 1 complete and queue the bitfield
	// atomically, so the writer cannot see a half-updated completion state.
	c.locker().Lock()
	c.t._completedPieces.Add(1)
	c.postBitfield( /*[]bool{false, true, false}*/ )
	c.locker().Unlock()
	// Second critical section: piece 2 becomes available only after the
	// bitfield was queued, so it must go out as a separate Have message.
	c.locker().Lock()
	c.have(2)
	c.locker().Unlock()
	// 15 bytes total: bitfield message (4 length + 1 id + 1 payload) followed
	// by a Have message (4 length + 1 id + 4 piece index).
	b := make([]byte, 15)
	n, err := io.ReadFull(r, b)
	c.locker().Lock()
	// This will cause connection.writer to terminate.
	c.closed.Set()
	c.locker().Unlock()
	require.NoError(t, err)
	require.EqualValues(t, 15, n)
	// Here we see that the bitfield doesn't have piece 2 set, as that should
	// arrive in the following Have message.
	require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
}
54
55 type torrentStorage struct {
56         writeSem sync.Mutex
57 }
58
59 func (me *torrentStorage) Close() error { return nil }
60
61 func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
62         return me
63 }
64
65 func (me *torrentStorage) Completion() storage.Completion {
66         return storage.Completion{}
67 }
68
69 func (me *torrentStorage) MarkComplete() error {
70         return nil
71 }
72
73 func (me *torrentStorage) MarkNotComplete() error {
74         return nil
75 }
76
77 func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
78         panic("shouldn't be called")
79 }
80
81 func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
82         if len(b) != defaultChunkSize {
83                 panic(len(b))
84         }
85         me.writeSem.Unlock()
86         return len(b), nil
87 }
88
// BenchmarkConnectionMainReadLoop measures the throughput of a PeerConn's
// main read loop receiving piece chunks over an in-memory net.Pipe, with
// storage stubbed out by torrentStorage.
func BenchmarkConnectionMainReadLoop(b *testing.B) {
	c := quicktest.New(b)
	cl := &Client{
		config: &ClientConfig{
			DownloadRateLimiter: unlimited,
		},
	}
	cl.initLogger()
	ts := &torrentStorage{}
	// Single-piece torrent (one 20-byte hash), 1 MiB long, backed by the stub
	// storage above.
	t := &Torrent{
		cl:                cl,
		storage:           &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}},
		pieceStateChanges: pubsub.NewPubSub(),
	}
	require.NoError(b, t.setInfo(&metainfo.Info{
		Pieces:      make([]byte, 20),
		Length:      1 << 20,
		PieceLength: 1 << 20,
	}))
	t.setChunkSize(defaultChunkSize)
	t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
	r, w := net.Pipe()
	cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r))
	cn.setTorrent(t)
	// mrlErr carries any non-nil error out of the read loop and is closed
	// when the loop exits, so <-mrlErr below yields nil on a clean EOF.
	mrlErr := make(chan error)
	msg := pp.Message{
		Type:  pp.Piece,
		Piece: make([]byte, defaultChunkSize),
	}
	go func() {
		cl.lock()
		err := cn.mainReadLoop()
		if err != nil {
			mrlErr <- err
		}
		close(mrlErr)
	}()
	wb := msg.MustMarshalBinary()
	b.SetBytes(int64(len(msg.Piece)))
	// Writer goroutine: feeds b.N copies of the same marshalled chunk.
	// ts.writeSem is the handshake — WriteAt unlocks it, so each iteration's
	// trailing Lock blocks until the previous chunk has reached storage
	// before the per-chunk state is reset for the next one.
	go func() {
		defer w.Close()
		ts.writeSem.Lock()
		for range iter.N(b.N) {
			cl.lock()
			// The chunk must be written to storage everytime, to ensure the
			// writeSem is unlocked.
			t.pieces[0]._dirtyChunks.Clear()
			cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1}
			cl.unlock()
			n, err := w.Write(wb)
			require.NoError(b, err)
			require.EqualValues(b, len(wb), n)
			ts.writeSem.Lock()
		}
	}()
	// Accept either a clean exit (nil) or io.EOF from closing the write end.
	c.Assert([]error{nil, io.EOF}, quicktest.Contains, <-mrlErr)
	// Every chunk fed in must have been counted as useful.
	c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
}
147
148 func TestConnPexPeerFlags(t *testing.T) {
149         var (
150                 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
151                 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
152         )
153         var testcases = []struct {
154                 conn *PeerConn
155                 f    pp.PexPeerFlags
156         }{
157                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
158                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
159                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
160                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
161                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
162                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
163                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
164                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
165         }
166         for i, tc := range testcases {
167                 f := tc.conn.pexPeerFlags()
168                 require.EqualValues(t, tc.f, f, i)
169         }
170 }
171
172 func TestConnPexEvent(t *testing.T) {
173         var (
174                 udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
175                 tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
176                 dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
177                 dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
178         )
179         var testcases = []struct {
180                 t pexEventType
181                 c *PeerConn
182                 e pexEvent
183         }{
184                 {
185                         pexAdd,
186                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
187                         pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
188                 },
189                 {
190                         pexDrop,
191                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
192                         pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
193                 },
194                 {
195                         pexAdd,
196                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
197                         pexEvent{pexAdd, dialTcpAddr, 0},
198                 },
199                 {
200                         pexDrop,
201                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
202                         pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
203                 },
204         }
205         for i, tc := range testcases {
206                 e := tc.c.pexEvent(tc.t)
207                 require.EqualValues(t, tc.e, e, i)
208         }
209 }