infoHash string
}
-const parallelQueries = 100
+const (
+ parallelQueries = 100
+ backlogMaxLen = 10000
+)
func (me *peerDiscovery) Close() {
me.peerStream.Close()
peerStream: &peerStream{
Values: make(chan peerStreamValue),
stop: make(chan struct{}),
+ values: make(chan peerStreamValue),
},
triedAddrs: make(map[string]struct{}, 500),
backlog: make(map[string]net.Addr, parallelQueries),
server: s,
infoHash: infoHash,
}
+ // Function ferries from values to Values until discovery is halted.
+ go func() {
+ defer close(disc.Values)
+ for {
+ select {
+ case psv := <-disc.values:
+ select {
+ case disc.Values <- psv:
+ case <-disc.stop:
+ return
+ }
+ case <-disc.stop:
+ return
+ }
+ }
+ }()
disc.mu.Lock()
for _, addr := range startAddrs {
disc.contact(addr)
return
}
if me.pending >= parallelQueries {
- me.backlog[addr.String()] = addr
+ if len(me.backlog) < backlogMaxLen {
+ me.backlog[addr.String()] = addr
+ }
} else {
me.contact(addr)
}
func (me *peerDiscovery) transactionClosed() {
me.pending--
- // log.Printf("pending: %d", me.pending)
for key, addr := range me.backlog {
if me.pending >= parallelQueries {
break
}()
copy(nodeInfo.ID[:], id)
select {
- case me.peerStream.Values <- peerStreamValue{
+ case me.peerStream.values <- peerStreamValue{
Peers: vs,
NodeInfo: nodeInfo,
}:
return nil
}
-func (me *peerDiscovery) streamValue(psv peerStreamValue) {
- me.peerStream.Values <- psv
-}
-
type peerStreamValue struct {
Peers []util.CompactPeer // Peers given in get_peers response.
NodeInfo // The node that gave the response.
type peerStream struct {
mu sync.Mutex
Values chan peerStreamValue
+ // Inner chan is set to nil when on close.
+ values chan peerStreamValue
stop chan struct{}
}
case <-ps.stop:
default:
close(ps.stop)
- close(ps.Values)
}
}