]> Sergey Matveev's repositories - btrtrc.git/blob - dht/transaction.go
Fix #41
[btrtrc.git] / dht / transaction.go
1 package dht
2
3 import (
4         "sync"
5         "time"
6 )
7
// Transaction keeps track of a message exchange between nodes,
// such as a query message and a response message.
type Transaction struct {
	// mu is taken by SetResponseHandler, timerCallback, Close and
	// handleResponse around their updates to the fields below.
	mu             sync.Mutex
	// remoteAddr is the node the query packet is written to; its string
	// form is the first component of key().
	remoteAddr     dHTAddr
	// t is the second component of key() (presumably the transaction ID
	// carried in the message; confirm against the code that builds queries).
	t              string
	// response carries the reply to the user's handler. handleResponse does
	// a non-blocking send on it and panics if that send would block, so it
	// needs spare capacity for one message. It is closed by close() or
	// handleResponse.
	response       chan Msg
	onResponse     func(Msg) // Called with the server locked.
	// done is closed once the transaction is finished (by close() or
	// handleResponse); closing() reports whether that has happened.
	done           chan struct{}
	// queryPacket is the raw packet sendQuery writes; it is set to nil once
	// the transaction is finished.
	queryPacket    []byte
	// timer drives resends; it is created by startTimer, re-armed by
	// timerCallback and stopped by close().
	timer          *time.Timer
	// s is the server used to write packets, report node timeouts and
	// deregister the transaction.
	s              *Server
	// retries counts the resends made by timerCallback; once it reaches 2
	// the next timer expiry times the transaction out.
	retries        int
	// lastSend is the time of the most recent successful sendQuery.
	lastSend       time.Time
	// userOnResponse is the handler installed via SetResponseHandler. It is
	// cleared after tryHandleResponse delivers a response to it.
	userOnResponse func(Msg)
}
24
// SetResponseHandler sets up a function to be called when query response
// arrives.
//
// If a response is already waiting on the response channel, f is invoked
// immediately, before SetResponseHandler returns and while t.mu is still
// held; f therefore must not call methods that lock t.mu (such as Close or
// SetResponseHandler). Otherwise f is stored for a later delivery attempt.
// f is cleared after it has been invoked once. Passing nil removes any
// previously stored handler.
func (t *Transaction) SetResponseHandler(f func(Msg)) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.userOnResponse = f
	// The response may have arrived before the handler was installed.
	t.tryHandleResponse()
}
33
34 func (t *Transaction) tryHandleResponse() {
35         if t.userOnResponse == nil {
36                 return
37         }
38         select {
39         case r, ok := <-t.response:
40                 if !ok {
41                         // TODO: I think some assumption is broken. This isn't supposed to
42                         // happen.
43                         break
44                 }
45                 t.userOnResponse(r)
46                 // Shouldn't be called more than once.
47                 t.userOnResponse = nil
48         default:
49         }
50 }
51
52 func (t *Transaction) key() transactionKey {
53         return transactionKey{
54                 t.remoteAddr.String(),
55                 t.t,
56         }
57 }
58
59 func (t *Transaction) startTimer() {
60         t.timer = time.AfterFunc(jitterDuration(queryResendEvery, time.Second), t.timerCallback)
61 }
62
63 func (t *Transaction) timerCallback() {
64         t.mu.Lock()
65         defer t.mu.Unlock()
66         select {
67         case <-t.done:
68                 return
69         default:
70         }
71         if t.retries == 2 {
72                 t.timeout()
73                 return
74         }
75         t.retries++
76         t.sendQuery()
77         if t.timer.Reset(jitterDuration(queryResendEvery, time.Second)) {
78                 panic("timer should have fired to get here")
79         }
80 }
81
82 func (t *Transaction) sendQuery() error {
83         err := t.s.writeToNode(t.queryPacket, t.remoteAddr)
84         if err != nil {
85                 return err
86         }
87         t.lastSend = time.Now()
88         return nil
89 }
90
91 func (t *Transaction) timeout() {
92         go func() {
93                 t.s.mu.Lock()
94                 defer t.s.mu.Unlock()
95                 t.s.nodeTimedOut(t.remoteAddr)
96         }()
97         t.close()
98 }
99
100 func (t *Transaction) close() {
101         if t.closing() {
102                 return
103         }
104         t.queryPacket = nil
105         close(t.response)
106         t.tryHandleResponse()
107         close(t.done)
108         t.timer.Stop()
109         go func() {
110                 t.s.mu.Lock()
111                 defer t.s.mu.Unlock()
112                 t.s.deleteTransaction(t)
113         }()
114 }
115
116 func (t *Transaction) closing() bool {
117         select {
118         case <-t.done:
119                 return true
120         default:
121                 return false
122         }
123 }
124
// Close (abandon) the transaction.
//
// It takes t.mu and runs close(), which does nothing if the transaction is
// already finished, so calling Close more than once is harmless.
func (t *Transaction) Close() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.close()
}
131
132 func (t *Transaction) handleResponse(m Msg) {
133         t.mu.Lock()
134         if t.closing() {
135                 t.mu.Unlock()
136                 return
137         }
138         close(t.done)
139         t.mu.Unlock()
140         if t.onResponse != nil {
141                 t.s.mu.Lock()
142                 t.onResponse(m)
143                 t.s.mu.Unlock()
144         }
145         t.queryPacket = nil
146         select {
147         case t.response <- m:
148         default:
149                 panic("blocked handling response")
150         }
151         close(t.response)
152         t.tryHandleResponse()
153 }