import (
"bytes"
"container/heap"
- "context"
"crypto/sha1"
"errors"
"fmt"
"io"
+ "math/rand"
"net/netip"
"net/url"
"sort"
"github.com/anacrolix/missinggo/v2/pubsub"
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
- request_strategy "github.com/anacrolix/torrent/request-strategy"
- typedRoaring "github.com/anacrolix/torrent/typed-roaring"
"github.com/davecgh/go-spew/spew"
- "github.com/pion/datachannel"
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/common"
"github.com/anacrolix/torrent/metainfo"
pp "github.com/anacrolix/torrent/peer_protocol"
+ request_strategy "github.com/anacrolix/torrent/request-strategy"
"github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/storage"
"github.com/anacrolix/torrent/tracker"
+ typedRoaring "github.com/anacrolix/torrent/typed-roaring"
"github.com/anacrolix/torrent/webseed"
- "github.com/anacrolix/torrent/webtorrent"
)
// Maintains state of torrent within a Client. Many methods should not be called before the info is
userOnWriteChunkErr func(error)
closed chansync.SetOnce
+ onClose []func()
infoHash metainfo.Hash
pieces []Piece
+
+ // The order pieces are requested if there's no stronger reason like availability or priority.
+ pieceRequestOrder []int
// Values are the piece indices that changed.
pieceStateChanges pubsub.PubSub[PieceStateChange]
// The size of chunks to request from peers over the wire. This is
chunkPool sync.Pool
// Total length of the torrent in bytes. Stored because it's not O(1) to
// get this from the info dict.
- length *int64
+ _length Option[int64]
// The storage to open when the info dict becomes available.
storageOpener *storage.Client
connsWithAllPieces map[*Peer]struct{}
- requestState []requestState
+ requestState map[RequestIndex]requestState
// Chunks we've written to since the corresponding piece was last checked.
dirtyChunks typedRoaring.Bitmap[RequestIndex]
requestIndexes []RequestIndex
}
+// length returns the torrent's total length in bytes. It reads the Option
+// value directly, so it is only meaningful once the info is available and
+// _length has been set; before that it yields the zero value.
+func (t *Torrent) length() int64 {
+ return t._length.Value
+}
+
func (t *Torrent) selectivePieceAvailabilityFromPeers(i pieceIndex) (count int) {
// This could be done with roaring.BitSliceIndexing.
t.iterPeers(func(peer *Peer) {
beginFile := pieceFirstFileIndex(piece.torrentBeginOffset(), files)
endFile := pieceEndFileIndex(piece.torrentEndOffset(), files)
piece.files = files[beginFile:endFile]
- piece.undirtiedChunksIter = undirtiedChunksIter{
- TorrentDirtyChunks: &t.dirtyChunks,
- StartRequestIndex: piece.requestIndexOffset(),
- EndRequestIndex: piece.requestIndexOffset() + piece.numChunks(),
- }
}
}
for _, f := range t.info.UpvertedFiles() {
l += f.Length
}
- t.length = &l
+ t._length = Some(l)
}
// TODO: This shouldn't fail for storage reasons. Instead we should handle storage failure
// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
+ t.pieceRequestOrder = rand.Perm(t.numPieces())
t.initPieceRequestOrder()
- MakeSliceWithLength(&t.requestState, t.numChunks())
MakeSliceWithLength(&t.requestPieceStates, t.numPieces())
for i := range t.pieces {
p := &t.pieces[i]
t.cl.event.Broadcast()
close(t.gotMetainfoC)
t.updateWantPeersEvent()
+ t.requestState = make(map[RequestIndex]requestState)
t.tryCreateMorePieceHashers()
t.iterPeers(func(p *Peer) {
p.onGotInfo(t.info)
return
}
if uint32(size) > maxMetadataSize {
- return errors.New("bad size")
+ return log.WithLevel(log.Warning, errors.New("bad size"))
}
if len(t.metadataBytes) == size {
return
}
return worseConn(i, j)
})
+ var buf bytes.Buffer
for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1)
- c.writeStatus(w, t)
+ buf.Reset()
+ c.writeStatus(&buf, t)
+ w.Write(bytes.TrimRight(
+ bytes.ReplaceAll(buf.Bytes(), []byte("\n"), []byte("\n ")),
+ " "))
}
}
}
+// numPieces returns the number of pieces per the info dict. The explicit
+// pieceIndex conversion was dropped as redundant — presumably
+// info.NumPieces() already has the right type; confirm against metainfo.
func (t *Torrent) numPieces() pieceIndex {
- return pieceIndex(t.info.NumPieces())
+ return t.info.NumPieces()
}
func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
err = errors.New("already closed")
return
}
+ for _, f := range t.onClose {
+ f()
+ }
if t.storage != nil {
wg.Add(1)
go func() {
}
+// requestOffset returns the byte offset into the torrent data where the given
+// request begins, delegating to torrentRequestOffset with the torrent's total
+// length and usual piece size. Requires the info (length) to be available.
func (t *Torrent) requestOffset(r Request) int64 {
- return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
+ return torrentRequestOffset(t.length(), int64(t.usualPieceSize()), r)
}
// Return the request that would include the given offset into the torrent data. Returns !ok if
// there is no such request.
+// NOTE(review): depends on t.length(), t.info and t.chunkSize — callers must
+// ensure the info is set before calling.
func (t *Torrent) offsetRequest(off int64) (req Request, ok bool) {
- return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
+ return torrentOffsetRequest(t.length(), t.info.PieceLength, int64(t.chunkSize), off)
}
func (t *Torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
return 0
}
if piece == t.numPieces()-1 {
- ret := pp.Integer(*t.length % t.info.PieceLength)
+ ret := pp.Integer(t.length() % t.info.PieceLength)
if ret != 0 {
return ret
}
// Returns the range of pieces [begin, end) that contains the extent of bytes.
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end pieceIndex) {
- if off >= *t.length {
+ if off >= t.length() {
return
}
if off < 0 {
if !t.haveInfo() {
return 0
}
- return *t.length - t.bytesLeft()
+ return t.length() - t.bytesLeft()
}
func (t *Torrent) SetInfoBytes(b []byte) (err error) {
return true
}
-func (t *Torrent) onWebRtcConn(
- c datachannel.ReadWriteCloser,
- dcc webtorrent.DataChannelContext,
-) {
- defer c.Close()
- netConn := webrtcNetConn{
- ReadWriteCloser: c,
- DataChannelContext: dcc,
- }
- peerRemoteAddr := netConn.RemoteAddr()
- if t.cl.badPeerAddr(peerRemoteAddr) {
- return
- }
- pc, err := t.cl.initiateProtocolHandshakes(
- context.Background(),
- netConn,
- t,
- dcc.LocalOffered,
- false,
- netConn.RemoteAddr(),
- webrtcNetwork,
- fmt.Sprintf("webrtc offer_id %x: %v", dcc.OfferId, regularNetConnPeerConnConnString(netConn)),
- )
- if err != nil {
- t.logger.WithDefaultLevel(log.Error).Printf("error in handshaking webrtc connection: %v", err)
- return
- }
- if dcc.LocalOffered {
- pc.Discovery = PeerSourceTracker
- } else {
- pc.Discovery = PeerSourceIncoming
- }
- pc.conn.SetWriteDeadline(time.Time{})
- t.cl.lock()
- defer t.cl.unlock()
- err = t.cl.runHandshookConn(pc, t)
- if err != nil {
- t.logger.WithDefaultLevel(log.Critical).Printf("error running handshook webrtc conn: %v", err)
- }
-}
-
+// logRunHandshookConn runs the handshook connection and logs the outcome.
+// Logging happens on error, or unconditionally when logAll is set. The log
+// level is now derived from the error via log.ErrorLevel rather than using the
+// caller-supplied default level directly — presumably ErrorLevel tolerates a
+// nil err for the logAll case; confirm against the anacrolix/log package.
func (t *Torrent) logRunHandshookConn(pc *PeerConn, logAll bool, level log.Level) {
err := t.cl.runHandshookConn(pc, t)
if err != nil || logAll {
- t.logger.WithDefaultLevel(level).Printf("error running handshook conn: %v", err)
+ t.logger.WithDefaultLevel(level).Levelf(log.ErrorLevel(err), "error running handshook conn: %v", err)
}
}
t.logRunHandshookConn(pc, false, log.Debug)
}
-func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
- wtc, release := t.cl.websocketTrackers.Get(u.String())
- go func() {
- <-t.closed.Done()
- release()
- }()
- wst := websocketTrackerStatus{u, wtc}
- go func() {
- err := wtc.Announce(tracker.Started, t.infoHash)
- if err != nil {
- t.logger.WithDefaultLevel(log.Warning).Printf(
- "error in initial announce to %q: %v",
- u.String(), err,
- )
- }
- }()
- return wst
-}
-
func (t *Torrent) startScrapingTracker(_url string) {
if _url == "" {
return
}
sl := func() torrentTrackerAnnouncer {
switch u.Scheme {
- case "ws", "wss":
- if t.cl.config.DisableWebtorrent {
- return nil
- }
- return t.startWebsocketAnnouncer(*u)
case "udp4":
if t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4 {
return nil
Event: event,
NumWant: func() int32 {
if t.wantPeers() && len(t.cl.dialers) > 0 {
- return -1
+ return 200 // Win has UDP packet limit. See: https://github.com/anacrolix/torrent/issues/764
} else {
return 0
}
c._stats.incrementPiecesDirtiedGood()
}
t.clearPieceTouchers(piece)
+ hasDirty := p.hasDirtyChunks()
t.cl.unlock()
+ if hasDirty {
+ p.Flush() // You can be synchronous here!
+ }
err := p.Storage().MarkComplete()
if err != nil {
t.logger.Printf("%T: error marking piece complete %d: %s", t.storage, piece, err)
t.iterPeers(func(p *Peer) {
remoteIp := p.remoteIp()
if remoteIp == nil {
- if p.bannableAddr.Ok() {
+ if p.bannableAddr.Ok {
t.logger.WithDefaultLevel(log.Debug).Printf("can't get remote ip for peer %v", p)
}
return
},
},
activeRequests: make(map[Request]webseed.Request, maxRequests),
- maxRequests: maxRequests,
}
ws.peer.initRequestState()
for _, opt := range opts {
}
// TODO: This is a check that an old invariant holds. It can be removed after some testing.
//delete(t.pendingRequests, r)
- var zeroRequestState requestState
- if t.requestState[r] != zeroRequestState {
+ if _, ok := t.requestState[r]; ok {
panic("expected request state to be gone")
}
return p
return pieceIndex(ri / t.chunksPerRegularPiece())
}
+// iterUndirtiedRequestIndexesInPiece calls f for every request index in the
+// given piece whose chunk is NOT present in t.dirtyChunks (i.e. chunks not yet
+// written since the piece was last checked).
+func (t *Torrent) iterUndirtiedRequestIndexesInPiece(
+ reuseIter *typedRoaring.Iterator[RequestIndex],
+ piece pieceIndex,
+ f func(RequestIndex),
+) {
+ // The iterator is supplied by the caller so it can be reused across calls,
+ // avoiding a per-call allocation.
+ reuseIter.Initialize(&t.dirtyChunks)
+ pieceRequestIndexOffset := t.pieceRequestIndexOffset(piece)
+ // Visit the unset (undirtied) positions in the piece's request-index range.
+ iterBitmapUnsetInRange(
+ reuseIter,
+ pieceRequestIndexOffset, pieceRequestIndexOffset+t.pieceNumChunks(piece),
+ f,
+ )
+}
+
+// requestState records an outstanding chunk request: the peer it was assigned
+// to and when it was made.
type requestState struct {
peer *Peer
when time.Time
}
+
+// Returns an error if a received chunk is out of bounds in some way.
+// Validates, in order: info availability, piece index range, and that the
+// chunk's begin offset lies within its piece. Chunk length is deliberately
+// not checked here (see comment below).
+func (t *Torrent) checkValidReceiveChunk(r Request) error {
+ if !t.haveInfo() {
+ return errors.New("torrent missing info")
+ }
+ if int(r.Index) >= t.numPieces() {
+ return fmt.Errorf("chunk index %v, torrent num pieces %v", r.Index, t.numPieces())
+ }
+ pieceLength := t.pieceLength(pieceIndex(r.Index))
+ if r.Begin >= pieceLength {
+ return fmt.Errorf("chunk begins beyond end of piece (%v >= %v)", r.Begin, pieceLength)
+ }
+ // We could check chunk lengths here, but chunk request size is not changed often, and tricky
+ // for peers to manipulate as they need to send potentially large buffers to begin with. There
+ // should be considerable checks elsewhere for this case due to the network overhead. We should
+ // catch most of the overflow manipulation stuff by checking index and begin above.
+ return nil
+}