select {
case <-nextScrape:
break getPeers
- case cps, ok := <-ps.Values:
+ case v, ok := <-ps.Values:
if !ok {
break getPeers
}
- peersFoundByDHT.Add(int64(len(cps)))
+ peersFoundByDHT.Add(int64(len(v.Peers)))
err = cl.AddPeers(t.InfoHash, func() (ret []Peer) {
- for _, cp := range cps {
+ for _, cp := range v.Peers {
ret = append(ret, Peer{
IP: cp.IP[:],
Port: int(cp.Port),
if err != nil {
log.Fatalf("error loading table: %s", err)
}
- log.Printf("dht server on %s, ID is %q", s.LocalAddr(), s.IDString())
+ log.Printf("dht server on %s, ID is %x", s.LocalAddr(), s.IDString())
setupSignals()
}
log.Fatal(err)
}
go func() {
- for sl := range ps.Values {
- for _, p := range sl {
+ for v := range ps.Values {
+ log.Printf("received %d peers from %x", len(v.Peers), v.NodeInfo.ID)
+ for _, p := range v.Peers {
if _, ok := seen[p]; ok {
continue
}
return
}
+type peerStreamValue struct {
+ Peers []util.CompactPeer // Peers given in get_peers response.
+ NodeInfo // The node that gave the response.
+}
+
type peerStream struct {
mu sync.Mutex
- Values chan []util.CompactPeer
+ Values chan peerStreamValue
stop chan struct{}
}
func (s *Server) GetPeers(infoHash string) (ps *peerStream, err error) {
ps = &peerStream{
- Values: make(chan []util.CompactPeer),
+ Values: make(chan peerStreamValue),
stop: make(chan struct{}),
}
done := make(chan struct{})
case m := <-t.Response:
vs := extractValues(m)
if vs != nil {
+ nodeInfo := NodeInfo{
+ Addr: t.remoteAddr,
+ }
+ id := func() string {
+ defer func() {
+ recover()
+ }()
+ return m["r"].(map[string]interface{})["id"].(string)
+ }()
+ copy(nodeInfo.ID[:], id)
select {
- case ps.Values <- vs:
+ case ps.Values <- peerStreamValue{
+ Peers: vs,
+ NodeInfo: nodeInfo,
+ }:
case <-ps.stop:
}
}