]> Sergey Matveev's repositories - btrtrc.git/commitdiff
dht: Store transactions by key, and fix partial deadlock
authorMatt Joiner <anacrolix@gmail.com>
Mon, 8 Dec 2014 22:59:25 +0000 (16:59 -0600)
committerMatt Joiner <anacrolix@gmail.com>
Mon, 8 Dec 2014 22:59:25 +0000 (16:59 -0600)
dht/dht.go

index 654471443fe664218618ca661f463e5e66b0c1c3..99419eb20b853e43ff198377a0372d5b89ef3cbf 100644 (file)
@@ -12,9 +12,10 @@ import (
        "math/rand"
        "net"
        "os"
-       "sync"
        "time"
 
+       "bitbucket.org/anacrolix/sync"
+
        "bitbucket.org/anacrolix/go.torrent/iplist"
 
        "bitbucket.org/anacrolix/go.torrent/logonce"
@@ -22,10 +23,16 @@ import (
        "github.com/anacrolix/libtorgo/bencode"
 )
 
+// Uniquely identifies a transaction to us.
+type transactionKey struct {
+       RemoteAddr string // host:port
+       T          string // The KRPC transaction ID.
+}
+
 type Server struct {
        id               string
        socket           net.PacketConn
-       transactions     []*transaction
+       transactions     map[transactionKey]*transaction
        transactionIDInt uint64
        nodes            map[string]*Node // Keyed by dHTAddr.String().
        mu               sync.Mutex
@@ -234,6 +241,13 @@ type transaction struct {
        retries     int
 }
 
+func (t *transaction) Key() transactionKey {
+       return transactionKey{
+               t.remoteAddr.String(),
+               t.t,
+       }
+}
+
 func jitterDuration(average time.Duration, plusMinus time.Duration) time.Duration {
        return average - plusMinus/2 + time.Duration(rand.Int63n(int64(plusMinus)))
 }
@@ -276,9 +290,11 @@ func (t *transaction) close() {
        close(t.Response)
        close(t.done)
        t.timer.Stop()
-       t.s.mu.Lock()
-       defer t.s.mu.Unlock()
-       t.s.removeTransaction(t)
+       go func() {
+               t.s.mu.Lock()
+               defer t.s.mu.Unlock()
+               t.s.deleteTransaction(t)
+       }()
 }
 
 func (t *transaction) closing() bool {
@@ -349,6 +365,7 @@ func (s *Server) init() (err error) {
                return
        }
        s.closed = make(chan struct{})
+       s.transactions = make(map[transactionKey]*transaction)
        return
 }
 
@@ -386,9 +403,11 @@ func (s *Server) processPacket(b []byte, addr dHTAddr) {
                //log.Printf("unexpected message: %#v", d)
                return
        }
-       s.getNode(addr).lastGotResponse = time.Now()
-       t.handleResponse(d)
-       s.removeTransaction(t)
+       node := s.getNode(addr)
+       node.lastGotResponse = time.Now()
+       // TODO: Update node ID as this is an authoritative packet.
+       go t.handleResponse(d)
+       s.deleteTransaction(t)
 }
 
 func (s *Server) serve() error {
@@ -550,12 +569,9 @@ func (s *Server) writeToNode(b []byte, node dHTAddr) (err error) {
 }
 
 func (s *Server) findResponseTransaction(transactionID string, sourceNode dHTAddr) *transaction {
-       for _, t := range s.transactions {
-               if t.t == transactionID && t.remoteAddr.String() == sourceNode.String() {
-                       return t
-               }
-       }
-       return nil
+       return s.transactions[transactionKey{
+               sourceNode.String(),
+               transactionID}]
 }
 
 func (s *Server) nextTransactionID() string {
@@ -565,20 +581,15 @@ func (s *Server) nextTransactionID() string {
        return string(b[:n])
 }
 
-func (s *Server) removeTransaction(t *transaction) {
-       for i, tt := range s.transactions {
-               if t == tt {
-                       last := len(s.transactions) - 1
-                       s.transactions[i] = s.transactions[last]
-                       s.transactions = s.transactions[:last]
-                       return
-               }
-       }
-       panic("transaction not found")
+func (s *Server) deleteTransaction(t *transaction) {
+       delete(s.transactions, t.Key())
 }
 
 func (s *Server) addTransaction(t *transaction) {
-       s.transactions = append(s.transactions, t)
+       if _, ok := s.transactions[t.Key()]; ok {
+               panic("transaction not unique")
+       }
+       s.transactions[t.Key()] = t
 }
 
 func (s *Server) IDString() string {