]> Sergey Matveev's repositories - btrtrc.git/blob - dht/announce.go
a7c55868d618a4e881c69350ebd728f260a3be50
[btrtrc.git] / dht / announce.go
1 package dht
2
3 // get_peers and announce_peers.
4
5 import (
6         "log"
7         "time"
8
9         "github.com/anacrolix/missinggo"
10         "github.com/anacrolix/sync"
11         "github.com/willf/bloom"
12
13         "github.com/anacrolix/torrent/logonce"
14 )
15
16 // Maintains state for an ongoing Announce operation. An Announce is started
17 // by calling Server.Announce.
18 type Announce struct {
19         mu    sync.Mutex
20         Peers chan PeersValues
21         // Inner chan is set to nil when on close.
22         values              chan PeersValues
23         stop                chan struct{}
24         triedAddrs          *bloom.BloomFilter
25         pending             int
26         server              *Server
27         infoHash            string
28         numContacted        int
29         announcePort        int
30         announcePortImplied bool
31 }
32
33 // Returns the number of distinct remote addresses the announce has queried.
34 func (me *Announce) NumContacted() int {
35         me.mu.Lock()
36         defer me.mu.Unlock()
37         return me.numContacted
38 }
39
40 // This is kind of the main thing you want to do with DHT. It traverses the
41 // graph toward nodes that store peers for the infohash, streaming them to the
42 // caller, and announcing the local node to each node if allowed and
43 // specified.
44 func (s *Server) Announce(infoHash string, port int, impliedPort bool) (*Announce, error) {
45         s.mu.Lock()
46         startAddrs := func() (ret []dHTAddr) {
47                 for _, n := range s.closestGoodNodes(160, infoHash) {
48                         ret = append(ret, n.addr)
49                 }
50                 return
51         }()
52         s.mu.Unlock()
53         if len(startAddrs) == 0 {
54                 addrs, err := bootstrapAddrs(s.bootstrapNodes)
55                 if err != nil {
56                         return nil, err
57                 }
58                 for _, addr := range addrs {
59                         startAddrs = append(startAddrs, newDHTAddr(addr))
60                 }
61         }
62         disc := &Announce{
63                 Peers:               make(chan PeersValues, 100),
64                 stop:                make(chan struct{}),
65                 values:              make(chan PeersValues),
66                 triedAddrs:          bloom.NewWithEstimates(1000, 0.5),
67                 server:              s,
68                 infoHash:            infoHash,
69                 announcePort:        port,
70                 announcePortImplied: impliedPort,
71         }
72         // Function ferries from values to Values until discovery is halted.
73         go func() {
74                 defer close(disc.Peers)
75                 for {
76                         select {
77                         case psv := <-disc.values:
78                                 select {
79                                 case disc.Peers <- psv:
80                                 case <-disc.stop:
81                                         return
82                                 }
83                         case <-disc.stop:
84                                 return
85                         }
86                 }
87         }()
88         for i, addr := range startAddrs {
89                 if i != 0 {
90                         time.Sleep(time.Millisecond)
91                 }
92                 disc.mu.Lock()
93                 disc.contact(addr)
94                 disc.mu.Unlock()
95         }
96         return disc, nil
97 }
98
99 func (me *Announce) gotNodeAddr(addr dHTAddr) {
100         if missinggo.AddrPort(addr) == 0 {
101                 // Not a contactable address.
102                 return
103         }
104         if me.triedAddrs.Test([]byte(addr.String())) {
105                 return
106         }
107         if me.server.ipBlocked(addr.UDPAddr().IP) {
108                 return
109         }
110         me.server.mu.Lock()
111         if me.server.badNodes.Test([]byte(addr.String())) {
112                 me.server.mu.Unlock()
113                 return
114         }
115         me.server.mu.Unlock()
116         me.contact(addr)
117 }
118
119 func (me *Announce) contact(addr dHTAddr) {
120         me.numContacted++
121         me.triedAddrs.Add([]byte(addr.String()))
122         if err := me.getPeers(addr); err != nil {
123                 log.Printf("error sending get_peers request to %s: %#v", addr, err)
124                 return
125         }
126         me.pending++
127 }
128
129 func (me *Announce) transactionClosed() {
130         me.pending--
131         if me.pending == 0 {
132                 me.close()
133                 return
134         }
135 }
136
137 func (me *Announce) responseNode(node NodeInfo) {
138         me.gotNodeAddr(node.Addr)
139 }
140
141 func (me *Announce) closingCh() chan struct{} {
142         return me.stop
143 }
144
145 func (me *Announce) announcePeer(to dHTAddr, token string) {
146         me.server.mu.Lock()
147         err := me.server.announcePeer(to, me.infoHash, me.announcePort, token, me.announcePortImplied)
148         me.server.mu.Unlock()
149         if err != nil {
150                 logonce.Stderr.Printf("error announcing peer: %s", err)
151         }
152 }
153
154 func (me *Announce) getPeers(addr dHTAddr) error {
155         me.server.mu.Lock()
156         defer me.server.mu.Unlock()
157         t, err := me.server.getPeers(addr, me.infoHash)
158         if err != nil {
159                 return err
160         }
161         t.SetResponseHandler(func(m Msg) {
162                 // Register suggested nodes closer to the target info-hash.
163                 me.mu.Lock()
164                 for _, n := range m.Nodes() {
165                         me.responseNode(n)
166                 }
167                 me.mu.Unlock()
168
169                 if vs := m.Values(); vs != nil {
170                         for _, cp := range vs {
171                                 if cp.Port == 0 {
172                                         me.server.mu.Lock()
173                                         me.server.badNode(addr)
174                                         me.server.mu.Unlock()
175                                         return
176                                 }
177                         }
178                         nodeInfo := NodeInfo{
179                                 Addr: t.remoteAddr,
180                         }
181                         copy(nodeInfo.ID[:], m.SenderID())
182                         select {
183                         case me.values <- PeersValues{
184                                 Peers:    vs,
185                                 NodeInfo: nodeInfo,
186                         }:
187                         case <-me.stop:
188                         }
189                 }
190
191                 if at, ok := m.AnnounceToken(); ok {
192                         me.announcePeer(addr, at)
193                 }
194
195                 me.mu.Lock()
196                 me.transactionClosed()
197                 me.mu.Unlock()
198         })
199         return nil
200 }
201
202 // Corresponds to the "values" key in a get_peers KRPC response. A list of
203 // peers that a node has reported as being in the swarm for a queried info
204 // hash.
205 type PeersValues struct {
206         Peers    []Peer // Peers given in get_peers response.
207         NodeInfo        // The node that gave the response.
208 }
209
210 // Stop the announce.
211 func (me *Announce) Close() {
212         me.mu.Lock()
213         defer me.mu.Unlock()
214         me.close()
215 }
216
217 func (ps *Announce) close() {
218         select {
219         case <-ps.stop:
220         default:
221                 close(ps.stop)
222         }
223 }