req, err := http.NewRequest("GET", _url.String(), nil)
req.Header.Set("User-Agent", opt.UserAgent)
req.Host = opt.HostHeader
+ if opt.Context != nil {
+ req = req.WithContext(opt.Context)
+ }
resp, err := (&http.Client{
Timeout: time.Second * 15,
Transport: &http.Transport{
import (
"bytes"
+ "context"
"encoding"
"encoding/binary"
- "errors"
"fmt"
"io"
"math/rand"
"github.com/anacrolix/dht/krpc"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/pproffd"
+ "github.com/pkg/errors"
)
type Action int32
// args is the binary serializable request body. trailer is optional data
// following it, such as for BEP 41.
-func (c *udpAnnounce) request(action Action, args interface{}, options []byte) (responseBody *bytes.Buffer, err error) {
+func (c *udpAnnounce) request(action Action, args interface{}, options []byte) (*bytes.Buffer, error) {
tid := newTransactionId()
- err = c.write(&RequestHeader{
- ConnectionId: c.connectionId,
- Action: action,
- TransactionId: tid,
- }, args, options)
- if err != nil {
- return
+ if err := errors.Wrap(
+ c.write(
+ &RequestHeader{
+ ConnectionId: c.connectionId,
+ Action: action,
+ TransactionId: tid,
+ }, args, options),
+ "writing request",
+ ); err != nil {
+ return nil, err
}
c.socket.SetReadDeadline(time.Now().Add(timeout(c.contiguousTimeouts)))
b := make([]byte, 0x800) // 2KiB
for {
- var n int
- n, err = c.socket.Read(b)
- if opE, ok := err.(*net.OpError); ok {
- if opE.Timeout() {
- c.contiguousTimeouts++
- return
- }
+ var (
+ n int
+ readErr error
+ readDone = make(chan struct{})
+ )
+ go func() {
+ defer close(readDone)
+ n, readErr = c.socket.Read(b)
+ }()
+ ctx := c.a.Context
+ if ctx == nil {
+ ctx = context.Background()
}
- if err != nil {
- return
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case <-readDone:
+ }
+ if opE, ok := readErr.(*net.OpError); ok && opE.Timeout() {
+ c.contiguousTimeouts++
+ }
+ if readErr != nil {
+ return nil, errors.Wrap(readErr, "reading from socket")
}
buf := bytes.NewBuffer(b[:n])
var h ResponseHeader
- err = binary.Read(buf, binary.BigEndian, &h)
+ err := binary.Read(buf, binary.BigEndian, &h)
switch err {
- case io.ErrUnexpectedEOF:
+ default:
+ panic(err)
+ case io.ErrUnexpectedEOF, io.EOF:
continue
case nil:
- default:
- return
}
if h.TransactionId != tid {
continue
if h.Action == ActionError {
err = errors.New(buf.String())
}
- responseBody = buf
- return
+ return buf, err
}
}
import (
"bytes"
+ "context"
"crypto/rand"
"encoding/binary"
"fmt"
rand.Read(req.PeerId[:])
rand.Read(req.InfoHash[:])
wg := sync.WaitGroup{}
- success := make(chan bool)
- fail := make(chan struct{})
+ ctx, cancel := context.WithCancel(context.Background())
for _, url := range trackers {
wg.Add(1)
go func(url string) {
resp, err := Announce{
TrackerUrl: url,
Request: req,
+ Context: ctx,
}.Do()
if err != nil {
t.Logf("error announcing to %s: %s", url, err)
t.Fatal(resp)
}
t.Logf("announced to %s", url)
- // TODO: Can probably get stuck here, but it's just a throwaway
- // test.
- success <- true
+ cancel()
}(url)
}
- go func() {
- wg.Wait()
- close(fail)
- }()
- select {
- case <-fail:
- // It doesn't matter if they all fail, the servers could just be down.
- case <-success:
- // Bail as quickly as we can. One success is enough.
- }
+ wg.Wait()
}
// Check that URLPath option is done correctly.