]> Sergey Matveev's repositories - btrtrc.git/blob - peerconn_test.go
Ability to override fifos/
[btrtrc.git] / peerconn_test.go
1 package torrent
2
3 import (
4         "encoding/binary"
5         "errors"
6         "fmt"
7         "golang.org/x/time/rate"
8         "io"
9         "net"
10         "sync"
11         "testing"
12
13         "github.com/frankban/quicktest"
14         qt "github.com/frankban/quicktest"
15         "github.com/stretchr/testify/require"
16
17         "github.com/anacrolix/torrent/metainfo"
18         pp "github.com/anacrolix/torrent/peer_protocol"
19         "github.com/anacrolix/torrent/storage"
20 )
21
22 // Ensure that no race exists between sending a bitfield, and a subsequent
23 // Have that would potentially alter it.
24 func TestSendBitfieldThenHave(t *testing.T) {
25         var cl Client
26         cl.init(TestingConfig(t))
27         cl.initLogger()
28         c := cl.newConnection(nil, newConnectionOpts{network: "io.Pipe"})
29         c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
30         if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
31                 t.Log(err)
32         }
33         r, w := io.Pipe()
34         // c.r = r
35         c.w = w
36         c.startMessageWriter()
37         c.locker().Lock()
38         c.t._completedPieces.Add(1)
39         c.postBitfield( /*[]bool{false, true, false}*/ )
40         c.locker().Unlock()
41         c.locker().Lock()
42         c.have(2)
43         c.locker().Unlock()
44         b := make([]byte, 15)
45         n, err := io.ReadFull(r, b)
46         c.locker().Lock()
47         // This will cause connection.writer to terminate.
48         c.closed.Set()
49         c.locker().Unlock()
50         require.NoError(t, err)
51         require.EqualValues(t, 15, n)
52         // Here we see that the bitfield doesn't have piece 2 set, as that should
53         // arrive in the following Have message.
54         require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
55 }
56
57 type torrentStorage struct {
58         writeSem sync.Mutex
59 }
60
61 func (me *torrentStorage) Close() error { return nil }
62
63 func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
64         return me
65 }
66
67 func (me *torrentStorage) Completion() storage.Completion {
68         return storage.Completion{}
69 }
70
71 func (me *torrentStorage) MarkComplete() error {
72         return nil
73 }
74
75 func (me *torrentStorage) MarkNotComplete() error {
76         return nil
77 }
78
79 func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
80         panic("shouldn't be called")
81 }
82
83 func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
84         if len(b) != defaultChunkSize {
85                 panic(len(b))
86         }
87         me.writeSem.Unlock()
88         return len(b), nil
89 }
90
91 func BenchmarkConnectionMainReadLoop(b *testing.B) {
92         c := quicktest.New(b)
93         var cl Client
94         cl.init(&ClientConfig{
95                 DownloadRateLimiter: unlimited,
96         })
97         cl.initLogger()
98         ts := &torrentStorage{}
99         t := cl.newTorrent(metainfo.Hash{}, nil)
100         t.initialPieceCheckDisabled = true
101         require.NoError(b, t.setInfo(&metainfo.Info{
102                 Pieces:      make([]byte, 20),
103                 Length:      1 << 20,
104                 PieceLength: 1 << 20,
105         }))
106         t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
107         t.onSetInfo()
108         t._pendingPieces.Add(0)
109         r, w := net.Pipe()
110         cn := cl.newConnection(r, newConnectionOpts{
111                 outgoing:   true,
112                 remoteAddr: r.RemoteAddr(),
113                 network:    r.RemoteAddr().Network(),
114                 connString: regularNetConnPeerConnConnString(r),
115         })
116         cn.setTorrent(t)
117         mrlErrChan := make(chan error)
118         msg := pp.Message{
119                 Type:  pp.Piece,
120                 Piece: make([]byte, defaultChunkSize),
121         }
122         go func() {
123                 cl.lock()
124                 err := cn.mainReadLoop()
125                 if err != nil {
126                         mrlErrChan <- err
127                 }
128                 close(mrlErrChan)
129         }()
130         wb := msg.MustMarshalBinary()
131         b.SetBytes(int64(len(msg.Piece)))
132         go func() {
133                 ts.writeSem.Lock()
134                 for i := 0; i < b.N; i += 1 {
135                         cl.lock()
136                         // The chunk must be written to storage everytime, to ensure the
137                         // writeSem is unlocked.
138                         t.pendAllChunkSpecs(0)
139                         cn.validReceiveChunks = map[RequestIndex]int{
140                                 t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
141                         }
142                         cl.unlock()
143                         n, err := w.Write(wb)
144                         require.NoError(b, err)
145                         require.EqualValues(b, len(wb), n)
146                         ts.writeSem.Lock()
147                 }
148                 if err := w.Close(); err != nil {
149                         panic(err)
150                 }
151         }()
152         mrlErr := <-mrlErrChan
153         if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
154                 c.Fatal(mrlErr)
155         }
156         c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
157 }
158
159 func TestConnPexPeerFlags(t *testing.T) {
160         var (
161                 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
162                 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
163         )
164         testcases := []struct {
165                 conn *PeerConn
166                 f    pp.PexPeerFlags
167         }{
168                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
169                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
170                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
171                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
172                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
173                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
174                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
175                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
176         }
177         for i, tc := range testcases {
178                 f := tc.conn.pexPeerFlags()
179                 require.EqualValues(t, tc.f, f, i)
180         }
181 }
182
183 func TestConnPexEvent(t *testing.T) {
184         var (
185                 udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
186                 tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
187                 dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
188                 dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
189         )
190         testcases := []struct {
191                 t pexEventType
192                 c *PeerConn
193                 e pexEvent
194         }{
195                 {
196                         pexAdd,
197                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
198                         pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
199                 },
200                 {
201                         pexDrop,
202                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
203                         pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
204                 },
205                 {
206                         pexAdd,
207                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
208                         pexEvent{pexAdd, dialTcpAddr, 0, nil},
209                 },
210                 {
211                         pexDrop,
212                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
213                         pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
214                 },
215         }
216         for i, tc := range testcases {
217                 e := tc.c.pexEvent(tc.t)
218                 require.EqualValues(t, tc.e, e, i)
219         }
220 }
221
222 func TestHaveAllThenBitfield(t *testing.T) {
223         c := qt.New(t)
224         cl := newTestingClient(t)
225         tt := cl.newTorrentForTesting()
226         // cl.newConnection()
227         pc := PeerConn{
228                 Peer: Peer{t: tt},
229         }
230         pc.initRequestState()
231         pc.peerImpl = &pc
232         tt.conns[&pc] = struct{}{}
233         c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
234         c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
235         pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
236         c.Check(pc.peerMinPieces, qt.Equals, 6)
237         c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
238         c.Assert(pc.t.setInfo(&metainfo.Info{
239                 PieceLength: 0,
240                 Pieces:      make([]byte, pieceHash.Size()*7),
241         }), qt.IsNil)
242         pc.t.onSetInfo()
243         c.Check(tt.numPieces(), qt.Equals, 7)
244         c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
245                 // The last element of the bitfield is irrelevant, as the Torrent actually only has 7
246                 // pieces.
247                 {2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
248         })
249 }
250
251 func TestApplyRequestStateWriteBufferConstraints(t *testing.T) {
252         c := qt.New(t)
253         c.Check(interestedMsgLen, qt.Equals, 5)
254         c.Check(requestMsgLen, qt.Equals, 17)
255         c.Check(maxLocalToRemoteRequests >= 8, qt.IsTrue)
256         c.Logf("max local to remote requests: %v", maxLocalToRemoteRequests)
257 }
258
259 func peerConnForPreferredNetworkDirection(localPeerId, remotePeerId int, outgoing, utp, ipv6 bool) *PeerConn {
260         pc := PeerConn{}
261         pc.outgoing = outgoing
262         if utp {
263                 pc.Network = "udp"
264         }
265         if ipv6 {
266                 pc.RemoteAddr = &net.TCPAddr{IP: net.ParseIP(fmt.Sprintf("::420"))}
267         } else {
268                 pc.RemoteAddr = &net.TCPAddr{IP: net.IPv4(1, 2, 3, 4)}
269         }
270         binary.BigEndian.PutUint64(pc.PeerID[:], uint64(remotePeerId))
271         cl := Client{}
272         binary.BigEndian.PutUint64(cl.peerID[:], uint64(localPeerId))
273         pc.t = &Torrent{cl: &cl}
274         return &pc
275 }
276
277 func TestPreferredNetworkDirection(t *testing.T) {
278         pc := peerConnForPreferredNetworkDirection
279         c := qt.New(t)
280         // Prefer outgoing to higher peer ID
281         c.Assert(pc(1, 2, true, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsTrue)
282         c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, true, false, false)), qt.IsFalse)
283         c.Assert(pc(2, 1, false, false, false).hasPreferredNetworkOver(pc(2, 1, true, false, false)), qt.IsTrue)
284         // Don't prefer uTP
285         c.Assert(pc(1, 2, false, true, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
286         // Prefer IPv6
287         c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, true)), qt.IsFalse)
288         // No difference
289         c.Assert(pc(1, 2, false, false, false).hasPreferredNetworkOver(pc(1, 2, false, false, false)), qt.IsFalse)
290 }
291
292 func TestReceiveLargeRequest(t *testing.T) {
293         c := qt.New(t)
294         cl := newTestingClient(t)
295         pc := cl.newConnection(nil, newConnectionOpts{network: "test"})
296         tor := cl.newTorrentForTesting()
297         tor.info = &metainfo.Info{PieceLength: 3 << 20}
298         pc.setTorrent(tor)
299         tor._completedPieces.Add(0)
300         pc.PeerExtensionBytes.SetBit(pp.ExtensionBitFast, true)
301         pc.choking = false
302         pc.initMessageWriter()
303         req := Request{}
304         req.Length = defaultChunkSize
305         c.Assert(pc.fastEnabled(), qt.IsTrue)
306         c.Check(pc.onReadRequest(req, false), qt.IsNil)
307         c.Check(pc.peerRequests, qt.HasLen, 1)
308         req.Length = 2 << 20
309         c.Check(pc.onReadRequest(req, false), qt.IsNil)
310         c.Check(pc.peerRequests, qt.HasLen, 2)
311         pc.peerRequests = nil
312         pc.t.cl.config.UploadRateLimiter = rate.NewLimiter(1, defaultChunkSize)
313         req.Length = defaultChunkSize
314         c.Check(pc.onReadRequest(req, false), qt.IsNil)
315         c.Check(pc.peerRequests, qt.HasLen, 1)
316         req.Length = 2 << 20
317         c.Check(pc.onReadRequest(req, false), qt.IsNil)
318         c.Check(pc.messageWriter.writeBuffer.Len(), qt.Equals, 17)
319 }