package torrent

import (
	"errors"
	"io"
	"net"
	"sync"
	"testing"

	qt "github.com/frankban/quicktest"
	"github.com/stretchr/testify/require"

	"github.com/anacrolix/torrent/metainfo"
	pp "github.com/anacrolix/torrent/peer_protocol"
	"github.com/anacrolix/torrent/storage"
)

// Ensure that no race exists between sending a bitfield and a subsequent
// Have that would potentially alter it.
func TestSendBitfieldThenHave(t *testing.T) {
	var cl Client
	cl.init(TestingConfig(t))
	cl.initLogger()
	c := cl.newConnection(nil, false, nil, "io.Pipe", "")
	c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
	if err := c.t.setInfo(&metainfo.Info{Pieces: make([]byte, metainfo.HashSize*3)}); err != nil {
		t.Log(err)
	}
	r, w := io.Pipe()
	// c.r = r
	c.w = w
	c.startWriter()
	c.locker().Lock()
	c.t._completedPieces.Add(1)
	c.postBitfield( /*[]bool{false, true, false}*/ )
	c.locker().Unlock()
	c.locker().Lock()
	c.have(2)
	c.locker().Unlock()
	b := make([]byte, 15)
	n, err := io.ReadFull(r, b)
	c.locker().Lock()
	// This will cause connection.writer to terminate.
	c.closed.Set()
	c.locker().Unlock()
	require.NoError(t, err)
	require.EqualValues(t, 15, n)
	// Here we see that the bitfield doesn't have piece 2 set, as that should
	// arrive in the following Have message.
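	// A sketch of how the 15 bytes decode, assuming the standard BitTorrent
	// peer wire framing (big-endian length prefix, then a one-byte message ID):
	//
	//	00 00 00 02 05 40           len=2, id=5 (bitfield), 0x40 = 0b01000000: only piece 1 set
	//	00 00 00 05 04 00 00 00 02  len=5, id=4 (have), piece index 2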
	require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
}

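// torrentStorage stubs just enough of the storage interfaces for the
// benchmark below. writeSem is a Mutex used as a binary semaphore: the
// benchmark's sender goroutine locks it before writing each chunk, and
// WriteAt unlocks it when the chunk reaches storage, keeping the sender in
// lockstep with the read loop.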
type torrentStorage struct {
	writeSem sync.Mutex
}

func (me *torrentStorage) Close() error { return nil }

func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
	return me
}

func (me *torrentStorage) Completion() storage.Completion {
	return storage.Completion{}
}

func (me *torrentStorage) MarkComplete() error {
	return nil
}

func (me *torrentStorage) MarkNotComplete() error {
	return nil
}

func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
	panic("shouldn't be called")
}

func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
	if len(b) != defaultChunkSize {
		panic(len(b))
	}
	me.writeSem.Unlock()
	return len(b), nil
}

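// BenchmarkConnectionMainReadLoop measures mainReadLoop throughput by
// streaming Piece messages through a net.Pipe and waiting for each chunk to
// reach storage before sending the next. Typically run with something like:
//
//	go test -run=NONE -bench=BenchmarkConnectionMainReadLoop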
func BenchmarkConnectionMainReadLoop(b *testing.B) {
	c := qt.New(b)
	var cl Client
	cl.init(&ClientConfig{
		DownloadRateLimiter: unlimited,
	})
	cl.initLogger()
	ts := &torrentStorage{}
	t := cl.newTorrent(metainfo.Hash{}, nil)
	t.initialPieceCheckDisabled = true
	require.NoError(b, t.setInfo(&metainfo.Info{
		Pieces:      make([]byte, 20),
		Length:      1 << 20,
		PieceLength: 1 << 20,
	}))
	t.storage = &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}}
	t.onSetInfo()
	t._pendingPieces.Add(0)
	r, w := net.Pipe()
	cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r))
	cn.setTorrent(t)
	mrlErrChan := make(chan error)
	msg := pp.Message{
		Type:  pp.Piece,
		Piece: make([]byte, defaultChunkSize),
	}
	go func() {
		cl.lock()
		err := cn.mainReadLoop()
		if err != nil {
			mrlErrChan <- err
		}
		close(mrlErrChan)
	}()
	wb := msg.MustMarshalBinary()
	b.SetBytes(int64(len(msg.Piece)))
	go func() {
		ts.writeSem.Lock()
		for i := 0; i < b.N; i++ {
			cl.lock()
			// The chunk must be written to storage every time, to ensure the
			// writeSem is unlocked.
			t.pendAllChunkSpecs(0)
			cn.validReceiveChunks = map[RequestIndex]int{
				t.requestIndexFromRequest(newRequestFromMessage(&msg)): 1,
			}
			cl.unlock()
			n, err := w.Write(wb)
			require.NoError(b, err)
			require.EqualValues(b, len(wb), n)
			ts.writeSem.Lock()
		}
		if err := w.Close(); err != nil {
			panic(err)
		}
	}()
	mrlErr := <-mrlErrChan
	if mrlErr != nil && !errors.Is(mrlErr, io.EOF) {
		c.Fatal(mrlErr)
	}
	c.Assert(cn._stats.ChunksReadUseful.Int64(), qt.Equals, int64(b.N))
}

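// TestConnPexPeerFlags checks that PEX peer flags are derived from the
// connection's direction, the peer's encryption preference, and whether the
// transport is uTP.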
func TestConnPexPeerFlags(t *testing.T) {
	var (
		tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
		udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
	)
	testcases := []struct {
		conn *PeerConn
		f    pp.PexPeerFlags
	}{
		{&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
		{&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
		{&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
		{&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
		{&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
		{&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
		{&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
		{&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
	}
	for i, tc := range testcases {
		f := tc.conn.pexPeerFlags()
		require.EqualValues(t, tc.f, f, i)
	}
}

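// TestConnPexEvent checks that PEX events carry a dialable address: for
// outgoing connections the remote address is used as-is, while for incoming
// ones the peer's advertised listen port is substituted in.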
func TestConnPexEvent(t *testing.T) {
	var (
		udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
		tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
		dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
		dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
	)
	testcases := []struct {
		t pexEventType
		c *PeerConn
		e pexEvent
	}{
		{
			pexAdd,
			&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
			pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp, nil},
		},
		{
			pexDrop,
			&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
			pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn, nil},
		},
		{
			pexAdd,
			&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
			pexEvent{pexAdd, dialTcpAddr, 0, nil},
		},
		{
			pexDrop,
			&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
			pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp, nil},
		},
	}
	for i, tc := range testcases {
		e := tc.c.pexEvent(tc.t)
		require.EqualValues(t, tc.e, e, i)
	}
}

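// TestHaveAllThenBitfield checks availability bookkeeping when a peer first
// claims all pieces with HaveAll and then narrows that claim with a bitfield.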
func TestHaveAllThenBitfield(t *testing.T) {
	c := qt.New(t)
	cl := newTestingClient(t)
	tt := cl.newTorrentForTesting()
	// cl.newConnection()
	pc := PeerConn{
		Peer: Peer{t: tt},
	}
	pc.peerImpl = &pc
	tt.conns[&pc] = struct{}{}
	c.Assert(pc.onPeerSentHaveAll(), qt.IsNil)
	c.Check(pc.t.connsWithAllPieces, qt.DeepEquals, map[*Peer]struct{}{&pc.Peer: {}})
	pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
	c.Check(pc.peerMinPieces, qt.Equals, 6)
	c.Check(pc.t.connsWithAllPieces, qt.HasLen, 0)
	c.Assert(pc.t.setInfo(&metainfo.Info{
		PieceLength: 0,
		Pieces:      make([]byte, pieceHash.Size()*7),
	}), qt.IsNil)
	pc.t.onSetInfo()
	c.Check(tt.numPieces(), qt.Equals, 7)
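	// Each run below is a (count, availability) pair over consecutive pieces,
	// assuming that field order: pieces 2, 4 and 5 from the bitfield above are
	// available from one peer, the rest from none.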
	c.Check(tt.pieceAvailabilityRuns(), qt.DeepEquals, []pieceAvailabilityRun{
		// The last element of the bitfield is irrelevant, as the Torrent
		// actually only has 7 pieces.
		{2, 0}, {1, 1}, {1, 0}, {2, 1}, {1, 0},
	})
}