8 // Transaction keeps track of a message exchange between nodes,
9 // such as a query message and a response message.
10 type Transaction struct {
15 onResponse func(Msg) // Called with the server locked.
22 userOnResponse func(Msg)
25 // SetResponseHandler sets up a function to be called when query response
27 func (t *Transaction) SetResponseHandler(f func(Msg)) {
34 func (t *Transaction) tryHandleResponse() {
35 if t.userOnResponse == nil {
39 case r, ok := <-t.response:
41 // TODO: I think some assumption is broken. This isn't supposed to
46 // Shouldn't be called more than once.
47 t.userOnResponse = nil
52 func (t *Transaction) key() transactionKey {
53 return transactionKey{
54 t.remoteAddr.String(),
59 func (t *Transaction) startTimer() {
60 t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback)
63 func (t *Transaction) timerCallback() {
77 if t.timer.Reset(jitterDuration(queryResendEvery, time.Second)) {
78 panic("timer should have fired to get here")
82 func (t *Transaction) sendQuery() error {
83 err := t.s.writeToNode(t.queryPacket, t.remoteAddr)
87 t.lastSend = time.Now()
91 func (t *Transaction) timeout() {
95 t.s.nodeTimedOut(t.remoteAddr)
100 func (t *Transaction) close() {
106 t.tryHandleResponse()
111 defer t.s.mu.Unlock()
112 t.s.deleteTransaction(t)
116 func (t *Transaction) closing() bool {
125 // Close (abandon) the transaction.
126 func (t *Transaction) Close() {
132 func (t *Transaction) handleResponse(m Msg) {
140 if t.onResponse != nil {
147 case t.response <- m:
149 panic("blocked handling response")
152 t.tryHandleResponse()