}
cl = &Client{}
cl.init(cfg)
- go cl.acceptLimitClearer()
- cl.initLogger()
- //cl.logger.Levelf(log.Critical, "test after init")
+ // This defer belongs after the infallible init block below.
defer func() {
if err != nil {
cl.Close()
cl = nil
}
}()
+ // Infallible init. Belongs in a separate function.
+ {
+ go cl.acceptLimitClearer()
+ cl.initLogger()
+ //cl.logger.Levelf(log.Critical, "test after init")
+
+ storageImpl := cfg.DefaultStorage
+ if storageImpl == nil {
+ // We'd use mmap by default but HFS+ doesn't support sparse files.
+ storageImplCloser := storage.NewFile(cfg.DataDir)
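+ // Make sure the default file storage is closed when the client shuts down.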
+ cl.onClose = append(cl.onClose, func() {
+ if err := storageImplCloser.Close(); err != nil {
+ cl.logger.Printf("error closing default storage: %s", err)
+ }
+ })
+ storageImpl = storageImplCloser
+ }
+ cl.defaultStorage = storage.NewClient(storageImpl)
- storageImpl := cfg.DefaultStorage
- if storageImpl == nil {
- // We'd use mmap by default but HFS+ doesn't support sparse files.
- storageImplCloser := storage.NewFile(cfg.DataDir)
- cl.onClose = append(cl.onClose, func() {
- if err := storageImplCloser.Close(); err != nil {
- cl.logger.Printf("error closing default storage: %s", err)
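+ // Per BEP 20, the peer ID starts with the client's fixed prefix and is padded with random bytes.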
+ if cfg.PeerID != "" {
+ missinggo.CopyExact(&cl.peerID, cfg.PeerID)
+ } else {
+ o := copy(cl.peerID[:], cfg.Bep20)
+ _, err = rand.Read(cl.peerID[o:])
+ if err != nil {
+ panic("error generating peer id")
}
- })
- storageImpl = storageImplCloser
- }
- cl.defaultStorage = storage.NewClient(storageImpl)
+ }
- if cfg.PeerID != "" {
- missinggo.CopyExact(&cl.peerID, cfg.PeerID)
- } else {
- o := copy(cl.peerID[:], cfg.Bep20)
- _, err = rand.Read(cl.peerID[o:])
- if err != nil {
- panic("error generating peer id")
+ cl.websocketTrackers = websocketTrackers{
+ PeerId: cl.peerID,
+ Logger: cl.logger.WithNames("websocketTrackers"),
+ GetAnnounceRequest: func(
+ event tracker.AnnounceEvent, infoHash [20]byte,
+ ) (
+ tracker.AnnounceRequest, error,
+ ) {
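+ // Resolve the torrent for this infohash under the client lock.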
+ cl.lock()
+ defer cl.unlock()
+ t, ok := cl.torrentsByShortHash[infoHash]
+ if !ok {
+ return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
+ }
+ return t.announceRequest(event, infoHash), nil
+ },
+ Proxy: cl.config.HTTPProxy,
+ WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
+ ICEServers: cl.ICEServers(),
+ DialContext: cl.config.TrackerDialContext,
+ callbacks: &cl.config.Callbacks,
+ OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) {
+ cl.lock()
+ defer cl.unlock()
+ t, ok := cl.torrentsByShortHash[dcc.InfoHash]
+ if !ok {
+ cl.logger.WithDefaultLevel(log.Warning).Printf(
+ "got webrtc conn for unloaded torrent with infohash %x",
+ dcc.InfoHash,
+ )
+ dc.Close()
+ return
+ }
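+ // Hand the connection off to the torrent on its own goroutine so we don't hold the client lock.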
+ go t.onWebRtcConn(dc, dcc)
+ },
}
- }
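+ // Kick off periodic webseed request updates. The timer is stopped in Close.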
+ cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc)
+ }
builtinListenNetworks := cl.listenNetworks()
sockets, err := listenAll(
builtinListenNetworks,
}
}
- cl.websocketTrackers = websocketTrackers{
- PeerId: cl.peerID,
- Logger: cl.logger.WithNames("websocketTrackers"),
- GetAnnounceRequest: func(
- event tracker.AnnounceEvent, infoHash [20]byte,
- ) (
- tracker.AnnounceRequest, error,
- ) {
- cl.lock()
- defer cl.unlock()
- t, ok := cl.torrentsByShortHash[infoHash]
- if !ok {
- return tracker.AnnounceRequest{}, errors.New("torrent not tracked by client")
- }
- return t.announceRequest(event, infoHash), nil
- },
- Proxy: cl.config.HTTPProxy,
- WebsocketTrackerHttpHeader: cl.config.WebsocketTrackerHttpHeader,
- ICEServers: cl.ICEServers(),
- DialContext: cl.config.TrackerDialContext,
- callbacks: &cl.config.Callbacks,
- OnConn: func(dc webtorrent.DataChannelConn, dcc webtorrent.DataChannelContext) {
- cl.lock()
- defer cl.unlock()
- t, ok := cl.torrentsByShortHash[dcc.InfoHash]
- if !ok {
- cl.logger.WithDefaultLevel(log.Warning).Printf(
- "got webrtc conn for unloaded torrent with infohash %x",
- dcc.InfoHash,
- )
- dc.Close()
- return
- }
- go t.onWebRtcConn(dc, dcc)
- },
- }
-
- cl.webseedRequestTimer = time.AfterFunc(webseedRequestUpdateTimerInterval, cl.updateWebseedRequestsTimerFunc)
-
err = cl.checkConfig()
return
}
// Stops the client. All connections to peers are closed and all activity will come to a halt.
func (cl *Client) Close() (errs []error) {
+ // Set closed atomically so subsystems can bail out early while we contend for the Client lock.
+ cl.closed.Set()
+ cl.webseedRequestTimer.Stop()
var closeGroup sync.WaitGroup // Waits for concurrent cleanup to complete before returning.
cl.lock()
for t := range cl.torrents {
for i := range cl.onClose {
cl.onClose[len(cl.onClose)-1-i]()
}
- cl.closed.Set()
cl.unlock()
cl.event.Broadcast()
closeGroup.Wait() // defer is LIFO. We want to Wait() after cl.unlock()
"testing"
g "github.com/anacrolix/generics"
- qt "github.com/go-quicktest/qt"
+ "github.com/go-quicktest/qt"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
pc.peerSentBitfield([]bool{false, false, true, false, true, true, false, false})
qt.Check(t, qt.Equals(pc.peerMinPieces, 6))
qt.Check(t, qt.HasLen(pc.t.connsWithAllPieces, 0))
- qt.Assert(t, qt.IsNil(pc.t.setInfo(&metainfo.Info{
- PieceLength: 0,
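+ // A Length of 7 with PieceLength 1 yields the 7 pieces asserted below.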
+ qt.Assert(t, qt.IsNil(pc.t.setInfoUnlocked(&metainfo.Info{
+ Name: "herp",
+ Length: 7,
+ PieceLength: 1,
Pieces: make([]byte, pieceHash.Size()*7),
})))
- pc.t.onSetInfo()
qt.Check(t, qt.Equals(tt.numPieces(), 7))
qt.Check(t, qt.DeepEquals(tt.pieceAvailabilityRuns(), []pieceAvailabilityRun{
// The last element of the bitfield is irrelevant, as the Torrent actually only has 7
g "github.com/anacrolix/generics"
"github.com/anacrolix/missinggo/v2"
"github.com/anacrolix/missinggo/v2/bitmap"
- qt "github.com/go-quicktest/qt"
+ "github.com/go-quicktest/qt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
func TestRelativeAvailabilityHaveNone(t *testing.T) {
var err error
cl := newTestingClient(t)
- mi, info := testutil.Greeting.Generate(5)
+ mi, _ := testutil.Greeting.Generate(5)
tt := cl.newTorrentOpt(AddTorrentOpts{InfoHash: mi.HashInfoBytes()})
tt.setChunkSize(2)
g.MakeMapIfNil(&tt.conns)
pc.legacyPeerImpl = &pc
pc.initRequestState()
g.InitNew(&pc.callbacks)
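+ // Mutating connection state via peer message handlers requires the client lock.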
+ tt.cl.lock()
tt.conns[&pc] = struct{}{}
err = pc.peerSentHave(0)
+ tt.cl.unlock()
qt.Assert(t, qt.IsNil(err))
- err = tt.setInfo(&info)
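+ // Use the public SetInfoBytes, which locks and runs post-info setup internally.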
+ err = tt.SetInfoBytes(mi.InfoBytes)
qt.Assert(t, qt.IsNil(err))
- tt.onSetInfo()
+ tt.cl.lock()
err = pc.peerSentHaveNone()
+ tt.cl.unlock()
qt.Assert(t, qt.IsNil(err))
var wg sync.WaitGroup
tt.close(&wg)