From: Matt Joiner Date: Mon, 8 Dec 2014 22:59:25 +0000 (-0600) Subject: dht: Store transactions by key, and fix partial deadlock X-Git-Tag: v1.0.0~1405 X-Git-Url: http://www.git.stargrave.org/?a=commitdiff_plain;h=ac5715898293861dec5a328d4c510f33f8888d19;p=btrtrc.git dht: Store transactions by key, and fix partial deadlock --- diff --git a/dht/dht.go b/dht/dht.go index 65447144..99419eb2 100644 --- a/dht/dht.go +++ b/dht/dht.go @@ -12,9 +12,10 @@ import ( "math/rand" "net" "os" - "sync" "time" + "bitbucket.org/anacrolix/sync" + "bitbucket.org/anacrolix/go.torrent/iplist" "bitbucket.org/anacrolix/go.torrent/logonce" @@ -22,10 +23,16 @@ import ( "github.com/anacrolix/libtorgo/bencode" ) +// Uniquely identifies a transaction to us. +type transactionKey struct { + RemoteAddr string // host:port + T string // The KRPC transaction ID. +} + type Server struct { id string socket net.PacketConn - transactions []*transaction + transactions map[transactionKey]*transaction transactionIDInt uint64 nodes map[string]*Node // Keyed by dHTAddr.String(). mu sync.Mutex @@ -234,6 +241,13 @@ type transaction struct { retries int } +func (t *transaction) Key() transactionKey { + return transactionKey{ + t.remoteAddr.String(), + t.t, + } +} + func jitterDuration(average time.Duration, plusMinus time.Duration) time.Duration { return average - plusMinus/2 + time.Duration(rand.Int63n(int64(plusMinus))) } @@ -276,9 +290,11 @@ func (t *transaction) close() { close(t.Response) close(t.done) t.timer.Stop() - t.s.mu.Lock() - defer t.s.mu.Unlock() - t.s.removeTransaction(t) + go func() { + t.s.mu.Lock() + defer t.s.mu.Unlock() + t.s.deleteTransaction(t) + }() } func (t *transaction) closing() bool { @@ -349,6 +365,7 @@ func (s *Server) init() (err error) { return } s.closed = make(chan struct{}) + s.transactions = make(map[transactionKey]*transaction) return } @@ -386,9 +403,11 @@ func (s *Server) processPacket(b []byte, addr dHTAddr) { //log.Printf("unexpected message: %#v", d) return } - s.getNode(addr).lastGotResponse = time.Now() - t.handleResponse(d) - s.removeTransaction(t) + node := s.getNode(addr) + node.lastGotResponse = time.Now() + // TODO: Update node ID as this is an authoritative packet. + go t.handleResponse(d) + s.deleteTransaction(t) } func (s *Server) serve() error { @@ -550,12 +569,9 @@ func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) { } func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *transaction { - for _, t := range s.transactions { - if t.t == transactionID && t.remoteAddr.String() == sourceNode.String() { - return t - } - } - return nil + return s.transactions[transactionKey{ + sourceNode.String(), + transactionID}] } func (s *Server) nextTransactionID() string { @@ -565,20 +581,15 @@ func (s *Server) nextTransactionID() string { return string(b[:n]) } -func (s *Server) removeTransaction(t *transaction) { - for i, tt := range s.transactions { - if t == tt { - last := len(s.transactions) - 1 - s.transactions[i] = s.transactions[last] - s.transactions = s.transactions[:last] - return - } - } - panic("transaction not found") +func (s *Server) deleteTransaction(t *transaction) { + delete(s.transactions, t.Key()) } func (s *Server) addTransaction(t *transaction) { - s.transactions = append(s.transactions, t) + if _, ok := s.transactions[t.Key()]; ok { + panic("transaction not unique") + } + s.transactions[t.Key()] = t } func (s *Server) IDString() string {