"github.com/anacrolix/missinggo/v2/pubsub"
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
- request_strategy "github.com/anacrolix/torrent/request-strategy"
- typedRoaring "github.com/anacrolix/torrent/typed-roaring"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker"
+ typedRoaring "github.com/anacrolix/torrent/typed-roaring"
"github.com/anacrolix/torrent/webseed"
"github.com/anacrolix/torrent/webtorrent"
)
userOnWriteChunkErr func(error)
closed chansync.SetOnce
+ onClose []func()
infoHash metainfo.Hash
pieces []Piece
chunkPool sync.Pool
// Total length of the torrent in bytes. Stored because it's not O(1) to
// get this from the info dict.
- length *int64
+ _length Option[int64]
// The storage to open when the info dict becomes available.
storageOpener *storage.Client
connsWithAllPieces map[*Peer]struct{}
- requestState []requestState
+ requestState map[RequestIndex]requestState
// Chunks we've written to since the corresponding piece was last checked.
dirtyChunks typedRoaring.Bitmap[RequestIndex]
requestIndexes []RequestIndex
}
+func (t *Torrent) length() int64 {
+ return t._length.Value
+}
+
func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
// This could be done with roaring.BitSliceIndexing.
t.iterPeers(func(peer *Peer) {
for _, f := range t.info.UpvertedFiles() {
l += f.Length
}
- t.length = &l
+ t._length = Some(l)
}
// TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
func (t *Torrent) onSetInfo() {
t.pieceRequestOrder = rand.Perm(t.numPieces())
t.initPieceRequestOrder()
- MakeSliceWithLength(&t.requestState, t.numChunks())
MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
for i := range t.pieces {
p := &t.pieces[i]
t.cl.event.Broadcast()
close(t.gotMetainfoC)
t.updateWantPeersEvent()
+ t.requestState = make(map[RequestIndex]requestState)
t.tryCreateMorePieceHashers()
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
return
}
if uint32(size) > maxMetadataSize {
- return errors.New("bad size")
+ return log.WithLevel(log.Warning, errors.New("bad size"))
}
if len(t.metadataBytes) == size {
return
err = errors.New("already closed")
return
}
+ for _, f := range t.onClose {
+ f()
+ }
if t.storage != nil {
wg.Add(1)
go func() {
}
func (t *Torrent) requestOffset(r Request) int64 {
- return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
+ return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
}
// Return the request that would include the given offset into the torrent data. Returns !ok if
// there is no such request.
func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
- return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
+ return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
}
func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
return 0
}
if piece == t.numPieces()-1 {
- ret := pp.Integer(*t.length % t.info.PieceLength)
+ ret := pp.Integer(t.length() % t.info.PieceLength)
if ret != 0 {
return ret
}
// Returns the range of pieces [begin, end) that contains the extent of bytes.
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
- if off >= *t.length {
+ if off >= t.length() {
return
}
if off < 0 {
if !t.haveInfo() {
return 0
}
- return *t.length - t.bytesLeft()
+ return t.length() - t.bytesLeft()
}
func (t *Torrent) SetInfoBytes(b []byte) (err error) {
DataChannelContext: dcc,
}
peerRemoteAddr := netConn.RemoteAddr()
+ //t.logger.Levelf(log.Critical, "onWebRtcConn remote addr: %v", peerRemoteAddr)
if t.cl.badPeerAddr(peerRemoteAddr) {
return
}
+ localAddrIpPort := missinggo.IpPortFromNetAddr(netConn.LocalAddr())
pc, err := t.cl.initiateProtocolHandshakes(
context.Background(),
netConn,
t,
- dcc.LocalOffered,
false,
- netConn.RemoteAddr(),
- webrtcNetwork,
- fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
+ newConnectionOpts{
+ outgoing: dcc.LocalOffered,
+ remoteAddr: peerRemoteAddr,
+ localPublicAddr: localAddrIpPort,
+ network: webrtcNetwork,
+ connString: fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
+ },
)
if err != nil {
t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
defer t.cl.unlock()
err = t.cl.runHandshookConn(pc, t)
if err != nil {
- t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
+ t.logger.WithDefaultLevel(log.Debug).Printf("error running handshook webrtc conn: %v", err)
}
}
// logRunHandshookConn runs the already-handshaken connection on the client and
// logs any resulting error. logAll forces a log line even when err is nil.
// NOTE(review): the passed level only sets the logger's default; the emitted
// line's level comes from log.ErrorLevel(err) — confirm that is intended.
func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
	err := t.cl.runHandshookConn(pc, t)
	if err != nil || logAll {
		t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
	}
}
}
func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
- wtc, release := t.cl.websocketTrackers.Get(u.String())
- go func() {
- <-t.closed.Done()
- release()
- }()
+ wtc, release := t.cl.websocketTrackers.Get(u.String(), t.infoHash)
+ // This needs to run before the Torrent is dropped from the Client, to prevent a new webtorrent.TrackerClient for
+ // the same info hash before the old one is cleaned up.
+ t.onClose = append(t.onClose, release)
wst := websocketTrackerStatus{u, wtc}
go func() {
err := wtc.Announce(tracker.Started, t.infoHash)
Event: event,
NumWant: func() int32 {
if t.wantPeers() && len(t.cl.dialers) > 0 {
- return -1
+ return 200 // Win has UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764
} else {
return 0
}
c._stats.incrementPiecesDirtiedGood()
}
t.clearPieceTouchers(piece)
+ hasDirty := p.hasDirtyChunks()
t.cl.unlock()
+ if hasDirty {
+ p.Flush() // You can be synchronous here!
+ }
err := p.Storage().MarkComplete()
if err != nil {
t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
t.iterPeers(func(p *Peer) {
remoteIp := p.remoteIp()
if remoteIp == nil {
- if p.bannableAddr.Ok() {
+ if p.bannableAddr.Ok {
t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
}
return
}
// TODO: This is a check that an old invariant holds. It can be removed after some testing.
//delete(t.pendingRequests, r)
- var zeroRequestState requestState
- if t.requestState[r] != zeroRequestState {
+ if _, ok := t.requestState[r]; ok {
panic("expected request state to be gone")
}
return p