]> Sergey Matveev's repositories - btrtrc.git/blob - peerconn_test.go
Rework storage.TorrentImpl to support shared capacity key
[btrtrc.git] / peerconn_test.go
1 package torrent
2
3 import (
4         "io"
5         "net"
6         "sync"
7         "testing"
8         "time"
9
10         "github.com/anacrolix/missinggo/pubsub"
11         "github.com/bradfitz/iter"
12         "github.com/frankban/quicktest"
13         "github.com/stretchr/testify/require"
14
15         "github.com/anacrolix/torrent/metainfo"
16         pp "github.com/anacrolix/torrent/peer_protocol"
17         "github.com/anacrolix/torrent/storage"
18 )
19
20 // Ensure that no race exists between sending a bitfield, and a subsequent
21 // Have that would potentially alter it.
22 func TestSendBitfieldThenHave(t *testing.T) {
23         cl := Client{
24                 config: TestingConfig(t),
25         }
26         cl.initLogger()
27         c := cl.newConnection(nil, false, nil, "io.Pipe", "")
28         c.setTorrent(cl.newTorrent(metainfo.Hash{}, nil))
29         c.t.setInfo(&metainfo.Info{
30                 Pieces: make([]byte, metainfo.HashSize*3),
31         })
32         r, w := io.Pipe()
33         //c.r = r
34         c.w = w
35         go c.writer(time.Minute)
36         c.locker().Lock()
37         c.t._completedPieces.Add(1)
38         c.postBitfield( /*[]bool{false, true, false}*/ )
39         c.locker().Unlock()
40         c.locker().Lock()
41         c.have(2)
42         c.locker().Unlock()
43         b := make([]byte, 15)
44         n, err := io.ReadFull(r, b)
45         c.locker().Lock()
46         // This will cause connection.writer to terminate.
47         c.closed.Set()
48         c.locker().Unlock()
49         require.NoError(t, err)
50         require.EqualValues(t, 15, n)
51         // Here we see that the bitfield doesn't have piece 2 set, as that should
52         // arrive in the following Have message.
53         require.EqualValues(t, "\x00\x00\x00\x02\x05@\x00\x00\x00\x05\x04\x00\x00\x00\x02", string(b))
54 }
55
// torrentStorage is a stub storage implementation used by the benchmark
// below. writeSem acts as a binary semaphore: WriteAt releases it so the
// producing goroutine knows a chunk has reached "storage" before sending
// the next one.
type torrentStorage struct {
	writeSem sync.Mutex
}
59
60 func (me *torrentStorage) Close() error { return nil }
61
62 func (me *torrentStorage) Piece(mp metainfo.Piece) storage.PieceImpl {
63         return me
64 }
65
66 func (me *torrentStorage) Completion() storage.Completion {
67         return storage.Completion{}
68 }
69
70 func (me *torrentStorage) MarkComplete() error {
71         return nil
72 }
73
74 func (me *torrentStorage) MarkNotComplete() error {
75         return nil
76 }
77
78 func (me *torrentStorage) ReadAt([]byte, int64) (int, error) {
79         panic("shouldn't be called")
80 }
81
82 func (me *torrentStorage) WriteAt(b []byte, _ int64) (int, error) {
83         if len(b) != defaultChunkSize {
84                 panic(len(b))
85         }
86         me.writeSem.Unlock()
87         return len(b), nil
88 }
89
// BenchmarkConnectionMainReadLoop measures mainReadLoop throughput for
// incoming Piece messages over an in-memory net.Pipe, with torrentStorage
// stubbing out disk I/O. The writer goroutine and the read loop hand off
// via ts.writeSem so exactly one chunk is in flight at a time.
func BenchmarkConnectionMainReadLoop(b *testing.B) {
	c := quicktest.New(b)
	cl := &Client{
		config: &ClientConfig{
			DownloadRateLimiter: unlimited,
		},
	}
	cl.initLogger()
	ts := &torrentStorage{}
	t := &Torrent{
		cl:                cl,
		storage:           &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}},
		pieceStateChanges: pubsub.NewPubSub(),
	}
	// One piece, one chunk-per-write: piece length equals total length.
	require.NoError(b, t.setInfo(&metainfo.Info{
		Pieces:      make([]byte, 20),
		Length:      1 << 20,
		PieceLength: 1 << 20,
	}))
	t.setChunkSize(defaultChunkSize)
	t._pendingPieces.Set(0, PiecePriorityNormal.BitmapPriority())
	r, w := net.Pipe()
	cn := cl.newConnection(r, true, r.RemoteAddr(), r.RemoteAddr().Network(), regularNetConnPeerConnConnString(r))
	cn.setTorrent(t)
	mrlErr := make(chan error)
	msg := pp.Message{
		Type:  pp.Piece,
		Piece: make([]byte, defaultChunkSize),
	}
	// Run the read loop under the client lock, as production code does.
	// Only a non-nil error is sent; closing mrlErr signals clean EOF.
	go func() {
		cl.lock()
		err := cn.mainReadLoop()
		if err != nil {
			mrlErr <- err
		}
		close(mrlErr)
	}()
	wb := msg.MustMarshalBinary()
	b.SetBytes(int64(len(msg.Piece)))
	// Producer: writes the same marshalled Piece message b.N times,
	// waiting on writeSem (released by torrentStorage.WriteAt) between
	// iterations so each chunk is stored before the next is sent.
	go func() {
		defer w.Close()
		ts.writeSem.Lock()
		for range iter.N(b.N) {
			cl.lock()
			// The chunk must be written to storage everytime, to ensure the
			// writeSem is unlocked.
			t.pieces[0]._dirtyChunks.Clear()
			cn.validReceiveChunks = map[Request]int{newRequestFromMessage(&msg): 1}
			cl.unlock()
			n, err := w.Write(wb)
			require.NoError(b, err)
			require.EqualValues(b, len(wb), n)
			ts.writeSem.Lock()
		}
	}()
	// The loop must end with nil (channel closed) or io.EOF from the pipe,
	// and every chunk must have been counted as useful.
	c.Assert([]error{nil, io.EOF}, quicktest.Contains, <-mrlErr)
	c.Assert(cn._stats.ChunksReadUseful.Int64(), quicktest.Equals, int64(b.N))
}
148
149 func TestConnPexPeerFlags(t *testing.T) {
150         var (
151                 tcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
152                 udpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
153         )
154         var testcases = []struct {
155                 conn *PeerConn
156                 f    pp.PexPeerFlags
157         }{
158                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
159                 {&PeerConn{Peer: Peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
160                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
161                 {&PeerConn{Peer: Peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
162                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}}, pp.PexSupportsUtp},
163                 {&PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
164                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true}}, pp.PexOutgoingConn},
165                 {&PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network()}}, 0},
166         }
167         for i, tc := range testcases {
168                 f := tc.conn.pexPeerFlags()
169                 require.EqualValues(t, tc.f, f, i)
170         }
171 }
172
173 func TestConnPexEvent(t *testing.T) {
174         var (
175                 udpAddr     = &net.UDPAddr{IP: net.IPv6loopback, Port: 4848}
176                 tcpAddr     = &net.TCPAddr{IP: net.IPv6loopback, Port: 4848}
177                 dialTcpAddr = &net.TCPAddr{IP: net.IPv6loopback, Port: 4747}
178                 dialUdpAddr = &net.UDPAddr{IP: net.IPv6loopback, Port: 4747}
179         )
180         var testcases = []struct {
181                 t pexEventType
182                 c *PeerConn
183                 e pexEvent
184         }{
185                 {
186                         pexAdd,
187                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network()}},
188                         pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
189                 },
190                 {
191                         pexDrop,
192                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), outgoing: true, PeerListenPort: dialTcpAddr.Port}},
193                         pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
194                 },
195                 {
196                         pexAdd,
197                         &PeerConn{Peer: Peer{RemoteAddr: tcpAddr, Network: tcpAddr.Network(), PeerListenPort: dialTcpAddr.Port}},
198                         pexEvent{pexAdd, dialTcpAddr, 0},
199                 },
200                 {
201                         pexDrop,
202                         &PeerConn{Peer: Peer{RemoteAddr: udpAddr, Network: udpAddr.Network(), PeerListenPort: dialUdpAddr.Port}},
203                         pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
204                 },
205         }
206         for i, tc := range testcases {
207                 e := tc.c.pexEvent(tc.t)
208                 require.EqualValues(t, tc.e, e, i)
209         }
210 }