]> Sergey Matveev's repositories - btrtrc.git/blob - dht/getpeers.go
Rewrite import paths for migration from Bitbucket
[btrtrc.git] / dht / getpeers.go
1 package dht
2
3 // get_peers and announce_peers.
4
5 import (
6         "log"
7         "time"
8
9         "github.com/anacrolix/torrent/logonce"
10
11         "github.com/anacrolix/torrent/util"
12         "bitbucket.org/anacrolix/sync"
13         "github.com/willf/bloom"
14 )
15
16 type peerDiscovery struct {
17         *peerStream
18         triedAddrs          *bloom.BloomFilter
19         pending             int
20         server              *Server
21         infoHash            string
22         numContacted        int
23         announcePort        int
24         announcePortImplied bool
25 }
26
27 func (pd *peerDiscovery) NumContacted() int {
28         pd.mu.Lock()
29         defer pd.mu.Unlock()
30         return pd.numContacted
31 }
32
33 func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*peerDiscovery, error) {
34         s.mu.Lock()
35         startAddrs := func() (ret []dHTAddr) {
36                 for _, n := range s.closestGoodNodes(160, infoHash) {
37                         ret = append(ret, n.addr)
38                 }
39                 return
40         }()
41         s.mu.Unlock()
42         if len(startAddrs) == 0 {
43                 addrs, err := bootstrapAddrs()
44                 if err != nil {
45                         return nil, err
46                 }
47                 for _, addr := range addrs {
48                         startAddrs = append(startAddrs, newDHTAddr(addr))
49                 }
50         }
51         disc := &peerDiscovery{
52                 peerStream: &peerStream{
53                         Values: make(chan peerStreamValue, 100),
54                         stop:   make(chan struct{}),
55                         values: make(chan peerStreamValue),
56                 },
57                 triedAddrs:          bloom.NewWithEstimates(1000, 0.5),
58                 server:              s,
59                 infoHash:            infoHash,
60                 announcePort:        port,
61                 announcePortImplied: impliedPort,
62         }
63         // Function ferries from values to Values until discovery is halted.
64         go func() {
65                 defer close(disc.Values)
66                 for {
67                         select {
68                         case psv := <-disc.values:
69                                 select {
70                                 case disc.Values <- psv:
71                                 case <-disc.stop:
72                                         return
73                                 }
74                         case <-disc.stop:
75                                 return
76                         }
77                 }
78         }()
79         for i, addr := range startAddrs {
80                 if i != 0 {
81                         time.Sleep(time.Millisecond)
82                 }
83                 disc.mu.Lock()
84                 disc.contact(addr)
85                 disc.mu.Unlock()
86         }
87         return disc, nil
88 }
89
90 func (me *peerDiscovery) gotNodeAddr(addr dHTAddr) {
91         if util.AddrPort(addr) == 0 {
92                 // Not a contactable address.
93                 return
94         }
95         if me.triedAddrs.Test([]byte(addr.String())) {
96                 return
97         }
98         if me.server.ipBlocked(util.AddrIP(addr)) {
99                 return
100         }
101         me.contact(addr)
102 }
103
104 func (me *peerDiscovery) contact(addr dHTAddr) {
105         me.numContacted++
106         me.triedAddrs.Add([]byte(addr.String()))
107         if err := me.getPeers(addr); err != nil {
108                 log.Printf("error sending get_peers request to %s: %#v", addr, err)
109                 return
110         }
111         me.pending++
112 }
113
114 func (me *peerDiscovery) transactionClosed() {
115         me.pending--
116         if me.pending == 0 {
117                 me.close()
118                 return
119         }
120 }
121
122 func (me *peerDiscovery) responseNode(node NodeInfo) {
123         me.gotNodeAddr(node.Addr)
124 }
125
126 func (me *peerDiscovery) closingCh() chan struct{} {
127         return me.peerStream.stop
128 }
129
130 func (me *peerDiscovery) announcePeer(to dHTAddr, token string) {
131         me.server.mu.Lock()
132         err := me.server.announcePeer(to, me.infoHash, me.announcePort, token, me.announcePortImplied)
133         me.server.mu.Unlock()
134         if err != nil {
135                 logonce.Stderr.Printf("error announcing peer: %s", err)
136         }
137 }
138
139 func (me *peerDiscovery) getPeers(addr dHTAddr) error {
140         me.server.mu.Lock()
141         defer me.server.mu.Unlock()
142         t, err := me.server.getPeers(addr, me.infoHash)
143         if err != nil {
144                 return err
145         }
146         t.SetResponseHandler(func(m Msg) {
147                 // Register suggested nodes closer to the target info-hash.
148                 me.mu.Lock()
149                 for _, n := range m.Nodes() {
150                         me.responseNode(n)
151                 }
152                 me.mu.Unlock()
153
154                 if vs := m.Values(); vs != nil {
155                         nodeInfo := NodeInfo{
156                                 Addr: t.remoteAddr,
157                         }
158                         copy(nodeInfo.ID[:], m.ID())
159                         select {
160                         case me.peerStream.values <- peerStreamValue{
161                                 Peers:    vs,
162                                 NodeInfo: nodeInfo,
163                         }:
164                         case <-me.peerStream.stop:
165                         }
166                 }
167
168                 if at, ok := m.AnnounceToken(); ok {
169                         me.announcePeer(addr, at)
170                 }
171
172                 me.mu.Lock()
173                 me.transactionClosed()
174                 me.mu.Unlock()
175         })
176         return nil
177 }
178
179 type peerStreamValue struct {
180         Peers    []util.CompactPeer // Peers given in get_peers response.
181         NodeInfo                    // The node that gave the response.
182 }
183
184 // TODO: This was to be the shared publicly accessible part returned by DHT
185 // functions that stream peers. Possibly not necessary anymore.
186 type peerStream struct {
187         mu     sync.Mutex
188         Values chan peerStreamValue
189         // Inner chan is set to nil when on close.
190         values chan peerStreamValue
191         stop   chan struct{}
192 }
193
194 func (ps *peerStream) Close() {
195         ps.mu.Lock()
196         defer ps.mu.Unlock()
197         ps.close()
198 }
199
200 func (ps *peerStream) close() {
201         select {
202         case <-ps.stop:
203         default:
204                 close(ps.stop)
205         }
206 }