"fmt"
"io"
"net"
+ "net/http"
"strconv"
"strings"
"time"
infoHash: ih,
peers: prioritizedPeers{
om: btree.New(32),
- getPrio: func(p Peer) peerPriority {
+ getPrio: func(p PeerInfo) peerPriority {
return bep40PriorityIgnoreError(cl.publicAddr(addrIpOrNil(p.Addr)), p.addr())
},
},
conns: make(map[*PeerConn]struct{}, 2*cl.config.EstablishedConnsPerTorrent),
- halfOpen: make(map[string]Peer),
+ halfOpen: make(map[string]PeerInfo),
pieceStateChanges: pubsub.NewPubSub(),
storageOpener: storageClient,
metadataChanged: sync.Cond{
L: cl.locker(),
},
+ webSeeds: make(map[string]*peer),
}
t._pendingPieces.NewSet = priorityBitmapStableNewSet
t.requestStrategy = cl.config.DefaultRequestStrategy(t.requestStrategyCallbacks(), &cl._mu)
return
}
-// Add or merge a torrent spec. If the torrent is already present, the
-// trackers will be merged with the existing ones. If the Info isn't yet
-// known, it will be set. The display name is replaced if the new spec
-// provides one. Returns new if the torrent wasn't already in the client.
-// Note that any `Storage` defined on the spec will be ignored if the
-// torrent is already present (i.e. `new` return value is `true`)
+// Add or merge a torrent spec. Returns new if the torrent wasn't already in the client. See also
+// Torrent.MergeSpec.
func (cl *Client) AddTorrentSpec(spec *TorrentSpec) (t *Torrent, new bool, err error) {
t, new = cl.AddTorrentInfoHashWithStorage(spec.InfoHash, spec.Storage)
+ err = t.MergeSpec(spec)
+ return
+}
+
+// The trackers will be merged with the existing ones. If the Info isn't yet known, it will be set.
+// The display name is replaced if the new spec provides one. Note that any `Storage` is ignored.
+func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
if spec.DisplayName != "" {
t.SetDisplayName(spec.DisplayName)
}
if spec.InfoBytes != nil {
- err = t.SetInfoBytes(spec.InfoBytes)
+ err := t.SetInfoBytes(spec.InfoBytes)
if err != nil {
- return
+ return err
}
}
+ cl := t.cl
+ cl.AddDHTNodes(spec.DhtNodes)
cl.lock()
defer cl.unlock()
+ useTorrentSources(spec.Sources, t)
+ for _, url := range spec.Webseeds {
+ t.addWebSeed(url)
+ }
if spec.ChunkSize != 0 {
t.setChunkSize(pp.Integer(spec.ChunkSize))
}
t.addTrackers(spec.Trackers)
t.maybeNewConns()
- return
+ return nil
+}
+
+func useTorrentSources(sources []string, t *Torrent) {
+ for _, s := range sources {
+ go func(s string) {
+ err := useTorrentSource(s, t)
+ if err != nil {
+ t.logger.WithDefaultLevel(log.Warning).Printf("using torrent source %q: %v", s, err)
+ } else {
+ t.logger.Printf("successfully used source %q", s)
+ }
+ }(s)
+ }
+}
+
+func useTorrentSource(source string, t *Torrent) error {
+ req, err := http.NewRequest(http.MethodGet, source, nil)
+ if err != nil {
+ panic(err)
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go func() {
+ select {
+ case <-t.GotInfo():
+ case <-t.Closed():
+ case <-ctx.Done():
+ }
+ cancel()
+ }()
+ req = req.WithContext(ctx)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ if ctx.Err() != nil {
+ return nil
+ }
+ return err
+ }
+ mi, err := metainfo.Load(resp.Body)
+ if err != nil {
+ if ctx.Err() != nil {
+ return nil
+ }
+ return err
+ }
+ return t.MergeSpec(TorrentSpecFromMetaInfo(mi))
}
func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
- var ss []string
- slices.MakeInto(&ss, mi.Nodes)
- cl.AddDHTNodes(ss)
return
}
func (cl *Client) newConnection(nc net.Conn, outgoing bool, remoteAddr net.Addr, network, connString string) (c *PeerConn) {
c = &PeerConn{
- conn: nc,
- outgoing: outgoing,
- choking: true,
- peerChoking: true,
- PeerMaxRequests: 250,
- writeBuffer: new(bytes.Buffer),
- remoteAddr: remoteAddr,
- network: network,
- connString: connString,
+ peer: peer{
+ outgoing: outgoing,
+ choking: true,
+ peerChoking: true,
+ PeerMaxRequests: 250,
+
+ remoteAddr: remoteAddr,
+ network: network,
+ connString: connString,
+ },
+ conn: nc,
+ writeBuffer: new(bytes.Buffer),
}
+ c.peerImpl = c
c.logger = cl.logger.WithValues(c).WithDefaultLevel(log.Debug).WithText(func(m log.Msg) string {
return fmt.Sprintf("%v: %s", c, m.Text())
})
if t == nil {
return
}
- t.addPeers([]Peer{{
+ t.addPeers([]PeerInfo{{
Addr: ipPortAddr{ip, port},
Source: PeerSourceDhtAnnouncePeer,
}})
require.NoError(t, err)
assert.True(t, _new)
defer tt.Drop()
- cn := &PeerConn{
+ cn := &PeerConn{peer: peer{
t: tt,
- }
+ }}
assert.NoError(t, cn.peerSentHave(0))
assert.Error(t, cn.peerSentHave(1))
}
// fmt.Fprintf(os.Stderr, "Usage: %s \n", os.Args[0])
-func resolvedPeerAddrs(ss []string) (ret []torrent.Peer, err error) {
+func resolvedPeerAddrs(ss []string) (ret []torrent.PeerInfo, err error) {
for _, s := range ss {
var addr *net.TCPAddr
addr, err = net.ResolveTCPAddr("tcp", s)
if err != nil {
return
}
- ret = append(ret, torrent.Peer{
+ ret = append(ret, torrent.PeerInfo{
Addr: addr,
})
}
}
span.Append(mm)
}
+ span.InitIndex()
for i := range iter.N(info.NumPieces()) {
p := info.Piece(i)
hash := sha1.New()
if !good {
return fmt.Errorf("hash mismatch at piece %d", i)
}
- fmt.Printf("%d: %x: %v\n", i, p.Hash(), good)
+ fmt.Printf("%d: %v: %v\n", i, p.Hash(), good)
}
return nil
}
if flags.Progress {
torrentBar(t, flags.PieceStates)
}
- t.AddPeers(func() (ret []torrent.Peer) {
+ t.AddPeers(func() (ret []torrent.PeerInfo) {
for _, ta := range flags.TestPeer {
- ret = append(ret, torrent.Peer{
+ ret = append(ret, torrent.PeerInfo{
Addr: ta,
})
}
PieceStates bool
Quiet bool `help:"discard client logging"`
Dht bool
+
TcpPeers bool
UtpPeers bool
- Ipv4 bool
- Ipv6 bool
- Pex bool
+ Webtorrent bool
+ DisableWebseeds bool
+
+ Ipv4 bool
+ Ipv6 bool
+ Pex bool
+
tagflag.StartPos
+
Torrent []string `arity:"+" help:"torrent file path or magnet uri"`
}{
UploadRate: -1,
DownloadRate: -1,
Progress: true,
Dht: true,
- TcpPeers: true,
- UtpPeers: true,
- Ipv4: true,
- Ipv6: true,
- Pex: true,
+
+ TcpPeers: true,
+ UtpPeers: true,
+ Webtorrent: true,
+
+ Ipv4: true,
+ Ipv6: true,
+ Pex: true,
}
func stdoutAndStderrAreSameFile() bool {
tagflag.ParseArgs(&flags, args, tagflag.Parent(parent))
defer envpprof.Stop()
clientConfig := torrent.NewDefaultClientConfig()
+ clientConfig.DisableWebseeds = flags.DisableWebseeds
clientConfig.DisableTCP = !flags.TcpPeers
clientConfig.DisableUTP = !flags.UtpPeers
clientConfig.DisableIPv4 = !flags.Ipv4
clientConfig.PublicIp4 = flags.PublicIP
clientConfig.PublicIp6 = flags.PublicIP
clientConfig.DisablePEX = !flags.Pex
+ clientConfig.DisableWebtorrent = !flags.Webtorrent
if flags.PackedBlocklist != "" {
blocklist, err := iplist.MMapPackedFile(flags.PackedBlocklist)
if err != nil {
func addTestPeer(client *torrent.Client) {
for _, t := range client.Torrents() {
- t.AddPeers([]torrent.Peer{{
+ t.AddPeers([]torrent.PeerInfo{{
Addr: args.TestPeer,
}})
}
--- /dev/null
+package common
+
+import (
+ "github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/torrent/segments"
+)
+
+func LengthIterFromUpvertedFiles(fis []metainfo.FileInfo) segments.LengthIter {
+ i := 0
+ return func() (segments.Length, bool) {
+ if i == len(fis) {
+ return -1, false
+ }
+ l := fis[i].Length
+ i++
+ return l, true
+ }
+}
DefaultRequestStrategy RequestStrategyMaker
Extensions PeerExtensionBits
+
+ DisableWebtorrent bool
+ DisableWebseeds bool
}
func (cfg *ClientConfig) SetListenAddr(addr string) *ClientConfig {
return f.t.info.Name
}
return strings.Join(fip, "/")
-
}
// The download status of a piece that comprises part of a File.
return ret
}
-func connLessTrusted(l, r *PeerConn) bool {
+func connLessTrusted(l, r *peer) bool {
return l.trust().Less(r.trust())
}
package mmap_span
import (
+ "fmt"
"io"
"log"
"sync"
+ "github.com/anacrolix/torrent/segments"
"github.com/edsrzf/mmap-go"
)
-type segment struct {
- *mmap.MMap
-}
-
-func (s segment) Size() int64 {
- return int64(len(*s.MMap))
-}
-
type MMapSpan struct {
- mu sync.RWMutex
- span
+ mu sync.RWMutex
+ mMaps []mmap.MMap
+ segmentLocater segments.Index
}
-func (ms *MMapSpan) Append(mmap mmap.MMap) {
- ms.span = append(ms.span, segment{&mmap})
+func (ms *MMapSpan) Append(mMap mmap.MMap) {
+ ms.mMaps = append(ms.mMaps, mMap)
}
-func (ms *MMapSpan) Close() error {
+func (ms *MMapSpan) Close() (errs []error) {
ms.mu.Lock()
defer ms.mu.Unlock()
- for _, mMap := range ms.span {
- err := mMap.(segment).Unmap()
+ for _, mMap := range ms.mMaps {
+ err := mMap.Unmap()
if err != nil {
- log.Print(err)
+ errs = append(errs, err)
}
}
- return nil
+ // This is for issue 211.
+ ms.mMaps = nil
+ ms.InitIndex()
+ return
}
-func (ms *MMapSpan) Size() (ret int64) {
+func (me *MMapSpan) InitIndex() {
+ i := 0
+ me.segmentLocater = segments.NewIndex(func() (segments.Length, bool) {
+ if i == len(me.mMaps) {
+ return -1, false
+ }
+ l := int64(len(me.mMaps[i]))
+ i++
+ return l, true
+ })
+ //log.Printf("made mmapspan index: %v", me.segmentLocater)
+}
+
+func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
+ //log.Printf("reading %v bytes at %v", len(p), off)
ms.mu.RLock()
defer ms.mu.RUnlock()
- for _, seg := range ms.span {
- ret += seg.Size()
+ n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return a, b }, p, off)
+ if n != len(p) {
+ err = io.EOF
}
return
}
-func (ms *MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
- ms.mu.RLock()
- defer ms.mu.RUnlock()
- ms.ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) {
- _n := copy(p, (*interval.(segment).MMap)[intervalOffset:])
+func copyBytes(dst, src []byte) int {
+ return copy(dst, src)
+}
+
+func (ms *MMapSpan) locateCopy(copyArgs func(remainingArgument, mmapped []byte) (dst, src []byte), p []byte, off int64) (n int) {
+ ms.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
+ mMapBytes := ms.mMaps[i][e.Start:]
+ //log.Printf("got segment %v: %v, copying %v, %v", i, e, len(p), len(mMapBytes))
+ _n := copyBytes(copyArgs(p, mMapBytes))
p = p[_n:]
n += _n
- return len(p) == 0
+ if segments.Int(_n) != e.Length {
+ panic(fmt.Sprintf("did %d bytes, expected to do %d", _n, e.Length))
+ }
+ return true
})
- if len(p) != 0 {
- err = io.EOF
- }
return
}
func (ms *MMapSpan) WriteAt(p []byte, off int64) (n int, err error) {
+ log.Printf("writing %v bytes at %v", len(p), off)
ms.mu.RLock()
defer ms.mu.RUnlock()
- ms.ApplyTo(off, func(iOff int64, i sizer) (stop bool) {
- mMap := i.(segment)
- _n := copy((*mMap.MMap)[iOff:], p)
- // err = mMap.Sync(gommap.MS_ASYNC)
- // if err != nil {
- // return true
- // }
- p = p[_n:]
- n += _n
- return len(p) == 0
- })
- if err != nil && len(p) != 0 {
+ n = ms.locateCopy(func(a, b []byte) (_, _ []byte) { return b, a }, p, off)
+ if n != len(p) {
err = io.ErrShortWrite
}
return
+++ /dev/null
-package mmap_span
-
-type sizer interface {
- Size() int64
-}
-
-type span []sizer
-
-func (s span) ApplyTo(off int64, f func(int64, sizer) (stop bool)) {
- for _, interval := range s {
- iSize := interval.Size()
- if off >= iSize {
- off -= iSize
- } else {
- if f(off, interval) {
- return
- }
- off = 0
- }
- }
-}
--- /dev/null
+package torrent
+
+import (
+ "github.com/anacrolix/torrent/metainfo"
+)
+
+// Contains implementation details that differ between peer types, like Webseeds and regular
+// BitTorrent protocol connections. Some methods are underlined so as to avoid collisions with
+// legacy PeerConn methods.
+type peerImpl interface {
+ updateRequests()
+ writeInterested(interested bool) bool
+ cancel(request) bool
+ // Return true if there's room for more activity.
+ request(request) bool
+ connectionFlags() string
+ _close()
+ _postCancel(request)
+ onGotInfo(*metainfo.Info)
+ drop()
+}
)
// Peer connection info, handed about publicly.
-type Peer struct {
+type PeerInfo struct {
Id [20]byte
Addr net.Addr
Source PeerSource
Trusted bool
}
-func (me Peer) Equal(other Peer) bool {
+func (me PeerInfo) equal(other PeerInfo) bool {
return me.Id == other.Id &&
me.Addr.String() == other.Addr.String() &&
me.Source == other.Source &&
me.Trusted == other.Trusted
}
-// FromPex generate Peer from peer exchange
-func (me *Peer) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) {
+// Generate PeerInfo from peer exchange
+func (me *PeerInfo) FromPex(na krpc.NodeAddr, fs peer_protocol.PexPeerFlags) {
me.Addr = ipPortAddr{append([]byte(nil), na.IP...), na.Port}
me.Source = PeerSourcePex
// If they prefer encryption, they must support it.
me.PexPeerFlags = fs
}
-func (me Peer) addr() IpPort {
+func (me PeerInfo) addr() IpPort {
return IpPort{IP: addrIpOrNil(me.Addr), Port: uint16(addrPortOrZero(me.Addr))}
}
"github.com/anacrolix/torrent/tracker"
)
-type Peers []Peer
+// Helper-type used to bulk-manage PeerInfos.
+type peerInfos []PeerInfo
-func (me *Peers) AppendFromPex(nas []krpc.NodeAddr, fs []peer_protocol.PexPeerFlags) {
+func (me *peerInfos) AppendFromPex(nas []krpc.NodeAddr, fs []peer_protocol.PexPeerFlags) {
for i, na := range nas {
- var p Peer
+ var p PeerInfo
var f peer_protocol.PexPeerFlags
if i < len(fs) {
f = fs[i]
}
}
-func (ret Peers) AppendFromTracker(ps []tracker.Peer) Peers {
+func (ret peerInfos) AppendFromTracker(ps []tracker.Peer) peerInfos {
for _, p := range ps {
- _p := Peer{
+ _p := PeerInfo{
Addr: ipPortAddr{p.IP, p.Port},
Source: PeerSourceTracker,
}
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
"github.com/anacrolix/multiless"
+ "github.com/anacrolix/torrent/metainfo"
"github.com/pkg/errors"
"github.com/anacrolix/torrent/bencode"
PeerSourcePex = "X"
)
-// Maintains the state of a connection with a peer.
-type PeerConn struct {
+type peer struct {
// First to ensure 64-bit alignment for atomics. See #262.
_stats ConnStats
t *Torrent
- // The actual Conn, used for closing, and setting socket options.
- conn net.Conn
+
+ peerImpl
+
connString string
outgoing bool
network string
remoteAddr net.Addr
- // The Reader and Writer for this Conn, with hooks installed for stats,
- // limiting, deadlines etc.
- w io.Writer
- r io.Reader
// True if the connection is operating over MSE obfuscation.
headerEncrypted bool
cryptoMethod mse.CryptoMethod
pieceInclination []int
_pieceRequestOrder prioritybitmap.PriorityBitmap
+ logger log.Logger
+}
+
+// Maintains the state of a connection with a peer.
+type PeerConn struct {
+ peer
+
+ // The actual Conn, used for closing, and setting socket options.
+ conn net.Conn
+ // The Reader and Writer for this Conn, with hooks installed for stats,
+ // limiting, deadlines etc.
+ w io.Writer
+ r io.Reader
+
writeBuffer *bytes.Buffer
uploadTimer *time.Timer
writerCond sync.Cond
-
- logger log.Logger
}
-func (cn *PeerConn) updateExpectingChunks() {
+func (cn *peer) updateExpectingChunks() {
if cn.expectingChunks() {
if cn.lastStartedExpectingToReceiveChunks.IsZero() {
cn.lastStartedExpectingToReceiveChunks = time.Now()
}
}
-func (cn *PeerConn) expectingChunks() bool {
+func (cn *peer) expectingChunks() bool {
return cn.interested && !cn.peerChoking
}
return ml.FinalOk()
}
-func (cn *PeerConn) cumInterest() time.Duration {
+func (cn *peer) cumInterest() time.Duration {
ret := cn.priorInterest
if cn.interested {
ret += time.Since(cn.lastBecameInterested)
return cn.conn.LocalAddr()
}
-func (cn *PeerConn) supportsExtension(ext pp.ExtensionName) bool {
+func (cn *peer) supportsExtension(ext pp.ExtensionName) bool {
_, ok := cn.PeerExtensionIDs[ext]
return ok
}
// The best guess at number of pieces in the torrent for this peer.
-func (cn *PeerConn) bestPeerNumPieces() pieceIndex {
+func (cn *peer) bestPeerNumPieces() pieceIndex {
if cn.t.haveInfo() {
return cn.t.numPieces()
}
return cn.peerMinPieces
}
-func (cn *PeerConn) completedString() string {
+func (cn *peer) completedString() string {
have := pieceIndex(cn._peerPieces.Len())
if cn.peerSentHaveAll {
have = cn.bestPeerNumPieces()
return fmt.Sprintf("%d/%d", have, cn.bestPeerNumPieces())
}
-// Correct the PeerPieces slice length. Return false if the existing slice is
-// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
-// messages.
-func (cn *PeerConn) setNumPieces(num pieceIndex) error {
+func (cn *PeerConn) onGotInfo(info *metainfo.Info) {
+ cn.setNumPieces(info.NumPieces())
+}
+
+// Correct the PeerPieces slice length. Return false if the existing slice is invalid, such as by
+// receiving badly sized BITFIELD, or invalid HAVE messages.
+func (cn *PeerConn) setNumPieces(num pieceIndex) {
cn._peerPieces.RemoveRange(bitmap.BitIndex(num), bitmap.ToEnd)
cn.peerPiecesChanged()
- return nil
}
func eventAgeString(t time.Time) string {
}
// Inspired by https://github.com/transmission/transmission/wiki/Peer-Status-Text.
-func (cn *PeerConn) statusFlags() (ret string) {
+func (cn *peer) statusFlags() (ret string) {
c := func(b byte) {
ret += string([]byte{b})
}
// return buf.String()
// }
-func (cn *PeerConn) downloadRate() float64 {
+func (cn *peer) downloadRate() float64 {
return float64(cn._stats.BytesReadUsefulData.Int64()) / cn.cumInterest().Seconds()
}
-func (cn *PeerConn) writeStatus(w io.Writer, t *Torrent) {
+func (cn *peer) writeStatus(w io.Writer, t *Torrent) {
// \t isn't preserved in <pre> blocks?
fmt.Fprintf(w, "%+-55q %s %s\n", cn.PeerID, cn.PeerExtensionBytes, cn.connString)
fmt.Fprintf(w, " last msg: %s, connected: %s, last helpful: %s, itime: %s, etime: %s\n",
fmt.Fprintf(w, " next pieces: %v%s\n",
iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
func() string {
- if cn == t.fastestConn {
+ if cn == t.fastestPeer {
return " (fastest)"
} else {
return ""
)
}
-func (cn *PeerConn) close() {
+func (cn *peer) close() {
if !cn.closed.Set() {
return
}
+ cn.discardPieceInclination()
+ cn._pieceRequestOrder.Clear()
+ cn.peerImpl._close()
+}
+
+func (cn *PeerConn) _close() {
if cn.pex.IsEnabled() {
cn.pex.Close()
}
cn.tickleWriter()
- cn.discardPieceInclination()
- cn._pieceRequestOrder.Clear()
if cn.conn != nil {
cn.conn.Close()
}
}
-func (cn *PeerConn) peerHasPiece(piece pieceIndex) bool {
+func (cn *peer) peerHasPiece(piece pieceIndex) bool {
return cn.peerSentHaveAll || cn._peerPieces.Contains(bitmap.BitIndex(piece))
}
cn.tickleWriter()
}
+// Returns true if there's room to write more.
+func (cn *PeerConn) write(msg pp.Message) bool {
+ cn.wroteMsg(&msg)
+ cn.writeBuffer.Write(msg.MustMarshalBinary())
+ torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
+ // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update
+ // when https://github.com/pion/datachannel/issues/59 is fixed.
+ return cn.writeBuffer.Len() < 1<<15
+}
+
func (cn *PeerConn) requestMetadataPiece(index int) {
eID := cn.PeerExtensionIDs[pp.ExtensionNameMetadata]
if eID == 0 {
}
// The actual value to use as the maximum outbound requests.
-func (cn *PeerConn) nominalMaxRequests() (ret int) {
+func (cn *peer) nominalMaxRequests() (ret int) {
return int(clamp(
1,
int64(cn.PeerMaxRequests),
))
}
-func (cn *PeerConn) totalExpectingTime() (ret time.Duration) {
+func (cn *peer) totalExpectingTime() (ret time.Duration) {
ret = cn.cumulativeExpectedToReceiveChunks
if !cn.lastStartedExpectingToReceiveChunks.IsZero() {
ret += time.Since(cn.lastStartedExpectingToReceiveChunks)
})
}
-func (cn *PeerConn) setInterested(interested bool, msg func(pp.Message) bool) bool {
+func (cn *peer) setInterested(interested bool) bool {
if cn.interested == interested {
return true
}
}
cn.updateExpectingChunks()
// log.Printf("%p: setting interest: %v", cn, interested)
- return msg(pp.Message{
+ return cn.writeInterested(interested)
+}
+
+func (pc *PeerConn) writeInterested(interested bool) bool {
+ return pc.write(pp.Message{
Type: func() pp.MessageType {
if interested {
return pp.Interested
// are okay.
type messageWriter func(pp.Message) bool
-// Proxies the messageWriter's response.
-func (cn *PeerConn) request(r request, mw messageWriter) bool {
+func (cn *peer) request(r request) bool {
if _, ok := cn.requests[r]; ok {
panic("chunk already requested")
}
if !cn.peerHasPiece(pieceIndex(r.Index)) {
panic("requesting piece peer doesn't have")
}
- if _, ok := cn.t.conns[cn]; !ok {
+ if !cn.t.peerIsActive(cn) {
panic("requesting but not in active conns")
}
if cn.closed.IsSet() {
cn.t.pendingRequests[r]++
cn.t.requestStrategy.hooks().sentRequest(r)
cn.updateExpectingChunks()
- return mw(pp.Message{
+ return cn.peerImpl.request(r)
+}
+
+func (me *PeerConn) request(r request) bool {
+ return me.write(pp.Message{
Type: pp.Request,
Index: r.Index,
Begin: r.Begin,
})
}
-func (cn *PeerConn) fillWriteBuffer(msg func(pp.Message) bool) {
+func (me *PeerConn) cancel(r request) bool {
+ return me.write(makeCancelMessage(r))
+}
+
+func (cn *peer) doRequestState() bool {
if !cn.t.networkingEnabled || cn.t.dataDownloadDisallowed {
- if !cn.setInterested(false, msg) {
- return
+ if !cn.setInterested(false) {
+ return false
}
if len(cn.requests) != 0 {
for r := range cn.requests {
cn.deleteRequest(r)
// log.Printf("%p: cancelling request: %v", cn, r)
- if !msg(makeCancelMessage(r)) {
- return
+ if !cn.peerImpl.cancel(r) {
+ return false
}
}
}
filledBuffer := false
cn.iterPendingPieces(func(pieceIndex pieceIndex) bool {
cn.iterPendingRequests(pieceIndex, func(r request) bool {
- if !cn.setInterested(true, msg) {
+ if !cn.setInterested(true) {
filledBuffer = true
return false
}
if _, ok := cn.requests[r]; ok {
return true
}
- filledBuffer = !cn.request(r, msg)
+ filledBuffer = !cn.request(r)
return !filledBuffer
})
return !filledBuffer
// If we didn't completely top up the requests, we shouldn't mark
// the low water, since we'll want to top up the requests as soon
// as we have more write buffer space.
- return
+ return false
}
cn.requestsLowWater = len(cn.requests) / 2
}
+ return true
+}
+
+func (cn *PeerConn) fillWriteBuffer() {
+ if !cn.doRequestState() {
+ return
+ }
if cn.pex.IsEnabled() {
- if flow := cn.pex.Share(msg); !flow {
+ if flow := cn.pex.Share(cn.write); !flow {
return
}
}
- cn.upload(msg)
+ cn.upload(cn.write)
}
// Routine that writes to the peer. Some of what to write is buffered by
return
}
if cn.writeBuffer.Len() == 0 {
- cn.fillWriteBuffer(func(msg pp.Message) bool {
- cn.wroteMsg(&msg)
- cn.writeBuffer.Write(msg.MustMarshalBinary())
- torrent.Add(fmt.Sprintf("messages filled of type %s", msg.Type.String()), 1)
- // 64KiB, but temporarily less to work around an issue with WebRTC. TODO: Update
- // when https://github.com/pion/datachannel/issues/59 is fixed.
- return cn.writeBuffer.Len() < 1<<15
- })
+ cn.fillWriteBuffer()
}
if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
// conceivable that the best connection should do this, since it's least likely to waste our time if
// assigned to the highest priority pieces, and assigning more than one this role would cause
// significant wasted bandwidth.
-func (cn *PeerConn) shouldRequestWithoutBias() bool {
+func (cn *peer) shouldRequestWithoutBias() bool {
return cn.t.requestStrategy.shouldRequestWithoutBias(cn.requestStrategyConnection())
}
-func (cn *PeerConn) iterPendingPieces(f func(pieceIndex) bool) bool {
+func (cn *peer) iterPendingPieces(f func(pieceIndex) bool) bool {
if !cn.t.haveInfo() {
return false
}
return cn.t.requestStrategy.iterPendingPieces(cn, f)
}
-func (cn *PeerConn) iterPendingPiecesUntyped(f iter.Callback) {
+func (cn *peer) iterPendingPiecesUntyped(f iter.Callback) {
cn.iterPendingPieces(func(i pieceIndex) bool { return f(i) })
}
-func (cn *PeerConn) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
+func (cn *peer) iterPendingRequests(piece pieceIndex, f func(request) bool) bool {
return cn.t.requestStrategy.iterUndirtiedChunks(
cn.t.piece(piece).requestStrategyPiece(),
func(cs chunkSpec) bool {
}
// check callers updaterequests
-func (cn *PeerConn) stopRequestingPiece(piece pieceIndex) bool {
+func (cn *peer) stopRequestingPiece(piece pieceIndex) bool {
return cn._pieceRequestOrder.Remove(bitmap.BitIndex(piece))
}
// preference. Connection piece priority is specific to a connection and is
// used to pseudorandomly avoid connections always requesting the same pieces
// and thus wasting effort.
-func (cn *PeerConn) updatePiecePriority(piece pieceIndex) bool {
+func (cn *peer) updatePiecePriority(piece pieceIndex) bool {
tpp := cn.t.piecePriority(piece)
if !cn.peerHasPiece(piece) {
tpp = PiecePriorityNone
return cn._pieceRequestOrder.Set(bitmap.BitIndex(piece), prio) || cn.shouldRequestWithoutBias()
}
-func (cn *PeerConn) getPieceInclination() []int {
+func (cn *peer) getPieceInclination() []int {
if cn.pieceInclination == nil {
cn.pieceInclination = cn.t.getConnPieceInclination()
}
return cn.pieceInclination
}
-func (cn *PeerConn) discardPieceInclination() {
+func (cn *peer) discardPieceInclination() {
if cn.pieceInclination == nil {
return
}
// After handshake, we know what Torrent and Client stats to include for a
// connection.
-func (cn *PeerConn) postHandshakeStats(f func(*ConnStats)) {
+func (cn *peer) postHandshakeStats(f func(*ConnStats)) {
t := cn.t
f(&t.stats)
f(&t.cl.stats)
// All ConnStats that include this connection. Some objects are not known
// until the handshake is complete, after which it's expected to reconcile the
// differences.
-func (cn *PeerConn) allStats(f func(*ConnStats)) {
+func (cn *peer) allStats(f func(*ConnStats)) {
f(&cn._stats)
if cn.reconciledHandshakeStats {
cn.postHandshakeStats(f)
// Returns whether the connection could be useful to us. We're seeding and
// they want data, we don't have metainfo and they can provide it, etc.
-func (c *PeerConn) useful() bool {
+func (c *peer) useful() bool {
t := c.t
if c.closed.IsSet() {
return false
return false
}
-func (c *PeerConn) lastHelpful() (ret time.Time) {
+func (c *peer) lastHelpful() (ret time.Time) {
ret = c.lastUsefulChunkReceived
if c.t.seeding() && c.lastChunkSent.After(ret) {
ret = c.lastChunkSent
case pp.HaveNone:
err = c.peerSentHaveNone()
case pp.Reject:
- c.deleteRequest(newRequestFromMessage(&msg))
- c.decExpectedChunkReceive(newRequestFromMessage(&msg))
+ c.remoteRejectedRequest(newRequestFromMessage(&msg))
case pp.AllowedFast:
torrent.Add("allowed fasts received", 1)
log.Fmsg("peer allowed fast: %d", msg.Index).AddValues(c).SetLevel(log.Debug).Log(c.t.logger)
}
}
-func (c *PeerConn) decExpectedChunkReceive(r request) {
+func (c *peer) remoteRejectedRequest(r request) {
+ c.deleteRequest(r)
+ c.decExpectedChunkReceive(r)
+}
+
+func (c *peer) decExpectedChunkReceive(r request) {
count := c.validReceiveChunks[r]
if count == 1 {
delete(c.validReceiveChunks, r)
}
// Handle a received chunk from a peer.
-func (c *PeerConn) receiveChunk(msg *pp.Message) error {
+func (c *peer) receiveChunk(msg *pp.Message) error {
t := c.t
cl := t.cl
torrent.Add("chunks received", 1)
c.allStats(add(1, func(cs *ConnStats) *Count { return &cs.ChunksReadUseful }))
c.allStats(add(int64(len(msg.Piece)), func(cs *ConnStats) *Count { return &cs.BytesReadUsefulData }))
c.lastUsefulChunkReceived = time.Now()
- // if t.fastestConn != c {
+ // if t.fastestPeer != c {
// log.Printf("setting fastest connection %p", c)
// }
- t.fastestConn = c
+ t.fastestPeer = c
// Need to record that it hasn't been written yet, before we attempt to do
// anything with it.
// Cancel pending requests for this chunk.
for c := range t.conns {
- c.postCancel(req)
+ c._postCancel(req)
}
err := func() error {
return nil
}
-func (c *PeerConn) onDirtiedPiece(piece pieceIndex) {
+func (c *peer) onDirtiedPiece(piece pieceIndex) {
if c.peerTouchedPieces == nil {
c.peerTouchedPieces = make(map[pieceIndex]struct{})
}
c.peerTouchedPieces[piece] = struct{}{}
ds := &c.t.pieces[piece].dirtiers
if *ds == nil {
- *ds = make(map[*PeerConn]struct{})
+ *ds = make(map[*peer]struct{})
}
(*ds)[c] = struct{}{}
}
cn.t.dropConnection(cn)
}
-func (cn *PeerConn) netGoodPiecesDirtied() int64 {
+func (cn *peer) netGoodPiecesDirtied() int64 {
return cn._stats.PiecesDirtiedGood.Int64() - cn._stats.PiecesDirtiedBad.Int64()
}
-func (c *PeerConn) peerHasWantedPieces() bool {
+func (c *peer) peerHasWantedPieces() bool {
return !c._pieceRequestOrder.IsEmpty()
}
-func (c *PeerConn) numLocalRequests() int {
+func (c *peer) numLocalRequests() int {
return len(c.requests)
}
-func (c *PeerConn) deleteRequest(r request) bool {
+func (c *peer) deleteRequest(r request) bool {
if _, ok := c.requests[r]; !ok {
return false
}
panic(n)
}
c.updateRequests()
- for _c := range c.t.conns {
+ c.t.iterPeers(func(_c *peer) {
if !_c.interested && _c != c && c.peerHasPiece(pieceIndex(r.Index)) {
_c.updateRequests()
}
- }
+ })
return true
}
-func (c *PeerConn) deleteAllRequests() {
+func (c *peer) deleteAllRequests() {
for r := range c.requests {
c.deleteRequest(r)
}
c.writerCond.Broadcast()
}
-func (c *PeerConn) postCancel(r request) bool {
+func (c *peer) postCancel(r request) bool {
if !c.deleteRequest(r) {
return false
}
- c.post(makeCancelMessage(r))
+ c.peerImpl._postCancel(r)
return true
}
+func (c *PeerConn) _postCancel(r request) {
+ c.post(makeCancelMessage(r))
+}
+
func (c *PeerConn) sendChunk(r request, msg func(pp.Message) bool) (more bool, err error) {
// Count the chunk being sent, even if it isn't.
b := make([]byte, r.Length)
t.reconcileHandshakeStats(c)
}
-func (c *PeerConn) peerPriority() (peerPriority, error) {
+func (c *peer) peerPriority() (peerPriority, error) {
return bep40Priority(c.remoteIpPort(), c.t.cl.publicAddr(c.remoteIp()))
}
-func (c *PeerConn) remoteIp() net.IP {
+func (c *peer) remoteIp() net.IP {
return addrIpOrNil(c.remoteAddr)
}
-func (c *PeerConn) remoteIpPort() IpPort {
+func (c *peer) remoteIpPort() IpPort {
ipa, _ := tryIpPortFromNetAddr(c.remoteAddr)
return IpPort{ipa.IP, uint16(ipa.Port)}
}
return fmt.Sprintf("connection %p", c)
}
-func (c *PeerConn) trust() connectionTrust {
+func (c *peer) trust() connectionTrust {
return connectionTrust{c.trusted, c.netGoodPiecesDirtied()}
}
return multiless.New().Bool(l.Implicit, r.Implicit).Int64(l.NetGoodPiecesDirted, r.NetGoodPiecesDirted).Less()
}
-func (cn *PeerConn) requestStrategyConnection() requestStrategyConnection {
+func (cn *peer) requestStrategyConnection() requestStrategyConnection {
return cn
}
-func (cn *PeerConn) chunksReceivedWhileExpecting() int64 {
+func (cn *peer) chunksReceivedWhileExpecting() int64 {
return cn._chunksReceivedWhileExpecting
}
-func (cn *PeerConn) fastest() bool {
- return cn == cn.t.fastestConn
+func (cn *peer) fastest() bool {
+ return cn == cn.t.fastestPeer
}
-func (cn *PeerConn) peerMaxRequests() int {
+func (cn *peer) peerMaxRequests() int {
return cn.PeerMaxRequests
}
return cn.peerPieces()
}
-func (cn *PeerConn) peerPieces() bitmap.Bitmap {
+func (cn *peer) peerPieces() bitmap.Bitmap {
ret := cn._peerPieces.Copy()
if cn.peerSentHaveAll {
ret.AddRange(0, cn.t.numPieces())
return ret
}
-func (cn *PeerConn) pieceRequestOrder() *prioritybitmap.PriorityBitmap {
+func (cn *peer) pieceRequestOrder() *prioritybitmap.PriorityBitmap {
return &cn._pieceRequestOrder
}
-func (cn *PeerConn) stats() *ConnStats {
+func (cn *peer) stats() *ConnStats {
return &cn._stats
}
-func (cn *PeerConn) torrent() requestStrategyTorrent {
+func (cn *peer) torrent() requestStrategyTorrent {
return cn.t.requestStrategyTorrent()
}
conn *PeerConn
f pp.PexPeerFlags
}{
- {&PeerConn{outgoing: false, PeerPrefersEncryption: false}, 0},
- {&PeerConn{outgoing: false, PeerPrefersEncryption: true}, pp.PexPrefersEncryption},
- {&PeerConn{outgoing: true, PeerPrefersEncryption: false}, pp.PexOutgoingConn},
- {&PeerConn{outgoing: true, PeerPrefersEncryption: true}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
- {&PeerConn{remoteAddr: udpAddr}, pp.PexSupportsUtp},
- {&PeerConn{remoteAddr: udpAddr, outgoing: true}, pp.PexOutgoingConn | pp.PexSupportsUtp},
- {&PeerConn{remoteAddr: tcpAddr, outgoing: true}, pp.PexOutgoingConn},
- {&PeerConn{remoteAddr: tcpAddr}, 0},
+ {&PeerConn{peer: peer{outgoing: false, PeerPrefersEncryption: false}}, 0},
+ {&PeerConn{peer: peer{outgoing: false, PeerPrefersEncryption: true}}, pp.PexPrefersEncryption},
+ {&PeerConn{peer: peer{outgoing: true, PeerPrefersEncryption: false}}, pp.PexOutgoingConn},
+ {&PeerConn{peer: peer{outgoing: true, PeerPrefersEncryption: true}}, pp.PexOutgoingConn | pp.PexPrefersEncryption},
+ {&PeerConn{peer: peer{remoteAddr: udpAddr}}, pp.PexSupportsUtp},
+ {&PeerConn{peer: peer{remoteAddr: udpAddr, outgoing: true}}, pp.PexOutgoingConn | pp.PexSupportsUtp},
+ {&PeerConn{peer: peer{remoteAddr: tcpAddr, outgoing: true}}, pp.PexOutgoingConn},
+ {&PeerConn{peer: peer{remoteAddr: tcpAddr}}, 0},
}
for i, tc := range testcases {
f := tc.conn.pexPeerFlags()
}{
{
pexAdd,
- &PeerConn{remoteAddr: udpAddr},
+ &PeerConn{peer: peer{remoteAddr: udpAddr}},
pexEvent{pexAdd, udpAddr, pp.PexSupportsUtp},
},
{
pexDrop,
- &PeerConn{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port},
+ &PeerConn{peer: peer{remoteAddr: tcpAddr, outgoing: true, PeerListenPort: dialTcpAddr.Port}},
pexEvent{pexDrop, tcpAddr, pp.PexOutgoingConn},
},
{
pexAdd,
- &PeerConn{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port},
+ &PeerConn{peer: peer{remoteAddr: tcpAddr, PeerListenPort: dialTcpAddr.Port}},
pexEvent{pexAdd, dialTcpAddr, 0},
},
{
pexDrop,
- &PeerConn{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port},
+ &PeerConn{peer: peer{remoteAddr: udpAddr, PeerListenPort: dialUdpAddr.Port}},
pexEvent{pexDrop, dialUdpAddr, pp.PexSupportsUtp},
},
}
// Peer client ID.
type PeerID [20]byte
-// // Pretty prints the ID as hex, except parts that adher to the Peer ID
+// // Pretty prints the ID as hex, except parts that adher to the PeerInfo ID
// // Conventions of BEP 20.
// func (me PeerID) String() string {
// // if me[0] == '-' && me[7] == '-' {
func TestPexAdded(t *testing.T) {
t.Run("noHold", func(t *testing.T) {
s := new(pexState)
- s.Add(&PeerConn{remoteAddr: addrs[0], outgoing: true})
+ s.Add(&PeerConn{peer: peer{remoteAddr: addrs[0], outgoing: true}})
targ := &pexState{
ev: []pexEvent{
pexEvent{pexAdd, addrs[0], pp.PexOutgoingConn},
},
nc: 0,
}
- s.Add(&PeerConn{remoteAddr: addrs[0]})
+ s.Add(&PeerConn{peer: peer{remoteAddr: addrs[0]}})
targ := &pexState{
hold: []pexEvent{
pexEvent{pexDrop, addrs[1], 0},
},
nc: pexTargAdded,
}
- s.Add(&PeerConn{remoteAddr: addrs[0]})
+ s.Add(&PeerConn{peer: peer{remoteAddr: addrs[0]}})
targ := &pexState{
hold: []pexEvent{},
ev: []pexEvent{
func TestPexDropped(t *testing.T) {
t.Run("belowTarg", func(t *testing.T) {
s := &pexState{nc: 1}
- s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
+ s.Drop(&PeerConn{peer: peer{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}})
targ := &pexState{
hold: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
nc: 0,
})
t.Run("aboveTarg", func(t *testing.T) {
s := &pexState{nc: pexTargAdded + 1}
- s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: true}})
+ s.Drop(&PeerConn{peer: peer{remoteAddr: addrs[0], pex: pexConnState{Listed: true}}})
targ := &pexState{
ev: []pexEvent{pexEvent{pexDrop, addrs[0], 0}},
nc: pexTargAdded,
})
t.Run("aboveTargNotListed", func(t *testing.T) {
s := &pexState{nc: pexTargAdded + 1}
- s.Drop(&PeerConn{remoteAddr: addrs[0], pex: pexConnState{Listed: false}})
+ s.Drop(&PeerConn{peer: peer{remoteAddr: addrs[0], pex: pexConnState{Listed: false}}})
targ := &pexState{nc: pexTargAdded + 1}
require.EqualValues(t, targ, s)
})
torrent.Add("pex added peers received", int64(len(rx.Added)))
torrent.Add("pex added6 peers received", int64(len(rx.Added6)))
- var peers Peers
+ var peers peerInfos
peers.AppendFromPex(rx.Added6, rx.Added6Flags)
peers.AppendFromPex(rx.Added, rx.AddedFlags)
s.dbg.Printf("adding %d peers from PEX", len(peers))
// Connections that have written data to this piece since its last check.
// This can include connections that have closed.
- dirtiers map[*PeerConn]struct{}
+ dirtiers map[*peer]struct{}
}
func (p *Piece) String() string {
// change if our apparent IP changes, we don't currently handle that.
type prioritizedPeersItem struct {
prio peerPriority
- p Peer
+ p PeerInfo
}
var hashSeed = maphash.MakeSeed()
type prioritizedPeers struct {
om *btree.BTree
- getPrio func(Peer) peerPriority
+ getPrio func(PeerInfo) peerPriority
}
-func (me *prioritizedPeers) Each(f func(Peer)) {
+func (me *prioritizedPeers) Each(f func(PeerInfo)) {
me.om.Ascend(func(i btree.Item) bool {
f(i.(prioritizedPeersItem).p)
return true
}
// Returns true if a peer is replaced.
-func (me *prioritizedPeers) Add(p Peer) bool {
+func (me *prioritizedPeers) Add(p PeerInfo) bool {
return me.om.ReplaceOrInsert(prioritizedPeersItem{me.getPrio(p), p}) != nil
}
// Returns true if a peer is replaced.
-func (me *prioritizedPeers) AddReturningReplacedPeer(p Peer) (ret Peer, ok bool) {
+func (me *prioritizedPeers) AddReturningReplacedPeer(p PeerInfo) (ret PeerInfo, ok bool) {
item := me.om.ReplaceOrInsert(prioritizedPeersItem{me.getPrio(p), p})
if item == nil {
return
return
}
-func (me *prioritizedPeers) PopMax() Peer {
+func (me *prioritizedPeers) PopMax() PeerInfo {
return me.om.DeleteMax().(prioritizedPeersItem).p
}
func TestPrioritizedPeers(t *testing.T) {
pp := prioritizedPeers{
om: btree.New(3),
- getPrio: func(p Peer) peerPriority {
+ getPrio: func(p PeerInfo) peerPriority {
return bep40PriorityIgnoreError(p.addr(), IpPort{IP: net.ParseIP("0.0.0.0")})
},
}
_, ok := pp.DeleteMin()
assert.Panics(t, func() { pp.PopMax() })
assert.False(t, ok)
- ps := []Peer{
+ ps := []PeerInfo{
{Addr: ipPortAddr{IP: net.ParseIP("1.2.3.4")}},
{Addr: ipPortAddr{IP: net.ParseIP("1::2")}},
{Addr: ipPortAddr{IP: net.ParseIP("")}},
assert.True(t, pp.Add(p))
assert.Equal(t, i+1, pp.Len())
}
- pop := func(expected *Peer) {
+ pop := func(expected *PeerInfo) {
if expected == nil {
assert.Panics(t, func() { pp.PopMax() })
} else {
assert.Equal(t, *expected, pp.PopMax())
}
}
- min := func(expected *Peer) {
+ min := func(expected *PeerInfo) {
i, ok := pp.DeleteMin()
if expected == nil {
assert.False(t, ok)
--- /dev/null
+package segments
+
+import (
+ "sort"
+)
+
+func NewIndex(segments LengthIter) (ret Index) {
+ var start Length
+ for l, ok := segments(); ok; l, ok = segments() {
+ ret.segments = append(ret.segments, Extent{start, l})
+ start += l
+ }
+ return
+}
+
+type Index struct {
+ segments []Extent
+}
+
+func (me Index) iterSegments() func() (Length, bool) {
+ return func() (Length, bool) {
+ if len(me.segments) == 0 {
+ return 0, false
+ } else {
+ l := me.segments[0].Length
+ me.segments = me.segments[1:]
+ return l, true
+ }
+ }
+}
+
+func (me Index) Locate(e Extent, output Callback) bool {
+ first := sort.Search(len(me.segments), func(i int) bool {
+ _e := me.segments[i]
+ return _e.End() > e.Start
+ })
+ if first == len(me.segments) {
+ return false
+ }
+ e.Start -= me.segments[first].Start
+ me.segments = me.segments[first:]
+ return Scan(me.iterSegments(), e, func(i int, e Extent) bool {
+ return output(i+first, e)
+ })
+}
--- /dev/null
+package segments
+
+type Int = int64
+
+type Length = Int
+
+func min(i Int, rest ...Int) Int {
+ ret := i
+ for _, i := range rest {
+ if i < ret {
+ ret = i
+ }
+ }
+ return ret
+}
+
+type Extent struct {
+ Start, Length Int
+}
+
+func (e Extent) End() Int {
+ return e.Start + e.Length
+}
+
+type (
+ Callback = func(int, Extent) bool
+ LengthIter = func() (Length, bool)
+)
+
+func Scan(haystack LengthIter, needle Extent, callback Callback) bool {
+ i := 0
+ for needle.Length != 0 {
+ l, ok := haystack()
+ if !ok {
+ return false
+ }
+ if needle.Start < l || needle.Start == l && l == 0 {
+ e1 := Extent{
+ Start: needle.Start,
+ Length: min(l, needle.End()) - needle.Start,
+ }
+ if e1.Length >= 0 {
+ if !callback(i, e1) {
+ return true
+ }
+ needle.Start = 0
+ needle.Length -= e1.Length
+ }
+ } else {
+ needle.Start -= l
+ }
+ i++
+ }
+ return true
+}
+
+func LocaterFromLengthIter(li LengthIter) Locater {
+ return func(e Extent, c Callback) bool {
+ return Scan(li, e, c)
+ }
+}
+
+type Locater func(Extent, Callback) bool
--- /dev/null
+package segments
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func LengthIterFromSlice(ls []Length) LengthIter {
+ return func() (Length, bool) {
+ switch len(ls) {
+ case 0:
+ return -1, false
+ default:
+ l := ls[0]
+ ls = ls[1:]
+ return l, true
+ }
+ }
+}
+
+type ScanCallbackValue struct {
+ Index int
+ Extent
+}
+
+type collectExtents []ScanCallbackValue
+
+func (me *collectExtents) scanCallback(i int, e Extent) bool {
+ *me = append(*me, ScanCallbackValue{
+ Index: i,
+ Extent: e,
+ })
+ return true
+}
+
+type newLocater func(LengthIter) Locater
+
+func assertLocate(t *testing.T, nl newLocater, ls []Length, needle Extent, firstExpectedIndex int, expectedExtents []Extent) {
+ var actual collectExtents
+ var expected collectExtents
+ for i, e := range expectedExtents {
+ expected.scanCallback(firstExpectedIndex+i, e)
+ }
+ nl(LengthIterFromSlice(ls))(needle, actual.scanCallback)
+ assert.EqualValues(t, expected, actual)
+}
+
+func testLocater(t *testing.T, newLocater newLocater) {
+ assertLocate(t, newLocater,
+ []Length{1, 0, 2, 0, 3},
+ Extent{2, 2},
+ 2,
+ []Extent{{1, 1}, {0, 0}, {0, 1}})
+ assertLocate(t, newLocater,
+ []Length{1, 0, 2, 0, 3},
+ Extent{6, 2},
+ 2,
+ []Extent{})
+ assertLocate(t, newLocater,
+ []Length{1652, 1514, 1554, 1618, 1546, 129241752, 1537}, // 128737588
+ Extent{0, 16384},
+ 0,
+ []Extent{
+ {0, 1652},
+ {0, 1514},
+ {0, 1554},
+ {0, 1618},
+ {0, 1546},
+ {0, 8500}})
+ assertLocate(t, newLocater,
+ []Length{1652, 1514, 1554, 1618, 1546, 129241752, 1537, 1536, 1551}, // 128737588
+ Extent{129236992, 16384},
+ 5,
+ []Extent{
+ {129229108, 12644},
+ {0, 1537},
+ {0, 1536},
+ {0, 667}})
+}
+
+func TestScan(t *testing.T) {
+ testLocater(t, LocaterFromLengthIter)
+}
+
+func TestIndex(t *testing.T) {
+ testLocater(t, func(li LengthIter) Locater {
+ return NewIndex(li).Locate
+ })
+}
"github.com/anacrolix/torrent/storage"
)
-// Specifies a new torrent for adding to a client. There are helpers for
-// magnet URIs and torrent metainfo files.
+// Specifies a new torrent for adding to a client. There are helpers for magnet URIs and torrent
+// metainfo files.
type TorrentSpec struct {
// The tiered tracker URIs.
Trackers [][]string
InfoBytes []byte
// The name to use if the Name field from the Info isn't available.
DisplayName string
- // The chunk size to use for outbound requests. Defaults to 16KiB if not
- // set.
+ Webseeds []string
+ DhtNodes []string
+ // The combination of the "xs" and "as" fields in magnet links, for now.
+ Sources []string
+
+ // The chunk size to use for outbound requests. Defaults to 16KiB if not set.
ChunkSize int
Storage storage.ClientImpl
}
Trackers: [][]string{m.Trackers},
DisplayName: m.DisplayName,
InfoHash: m.InfoHash,
+ Webseeds: m.Params["ws"],
+ Sources: append(m.Params["xs"], m.Params["as"]...),
+ // TODO: What's the parameter for DHT nodes or bootstrap peers in a magnet link?
}
return
}
-func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) (spec *TorrentSpec) {
- info, _ := mi.UnmarshalInfo()
- spec = &TorrentSpec{
- Trackers: mi.AnnounceList,
+func TorrentSpecFromMetaInfo(mi *metainfo.MetaInfo) *TorrentSpec {
+ info, err := mi.UnmarshalInfo()
+ if err != nil {
+ panic(err)
+ }
+ return &TorrentSpec{
+ Trackers: mi.UpvertedAnnounceList(),
+ InfoHash: mi.HashInfoBytes(),
InfoBytes: mi.InfoBytes,
DisplayName: info.Name,
- InfoHash: mi.HashInfoBytes(),
+ Webseeds: mi.UrlList,
+ DhtNodes: func() (ret []string) {
+ ret = make([]string, len(mi.Nodes))
+ for _, node := range mi.Nodes {
+ ret = append(ret, string(node))
+ }
+ return
+ }(),
}
- if spec.Trackers == nil && mi.Announce != "" {
- spec.Trackers = [][]string{{mi.Announce}}
- }
- return
}
"path/filepath"
"github.com/anacrolix/missinggo"
+ "github.com/anacrolix/torrent/common"
+ "github.com/anacrolix/torrent/segments"
"github.com/anacrolix/torrent/metainfo"
)
if err != nil {
return nil, err
}
+ upvertedFiles := info.UpvertedFiles()
return &fileTorrentImpl{
dir,
- info,
+ info.Name,
+ upvertedFiles,
+ segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
infoHash,
fs.pc,
}, nil
}
type fileTorrentImpl struct {
- dir string
- info *metainfo.Info
- infoHash metainfo.Hash
- completion PieceCompletion
+ dir string
+ infoName string
+ upvertedFiles []metainfo.FileInfo
+ segmentLocater segments.Index
+ infoHash metainfo.Hash
+ completion PieceCompletion
}
func (fts *fileTorrentImpl) Piece(p metainfo.Piece) PieceImpl {
// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
func (fst fileTorrentImplIO) ReadAt(b []byte, off int64) (n int, err error) {
- for _, fi := range fst.fts.info.UpvertedFiles() {
- for off < fi.Length {
- n1, err1 := fst.readFileAt(fi, b, off)
- n += n1
- off += int64(n1)
- b = b[n1:]
- if len(b) == 0 {
- // Got what we need.
- return
- }
- if n1 != 0 {
- // Made progress.
- continue
- }
- err = err1
- return
- }
- off -= fi.Length
+ fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(b))}, func(i int, e segments.Extent) bool {
+ n1, err1 := fst.readFileAt(fst.fts.upvertedFiles[i], b[:e.Length], e.Start)
+ n += n1
+ b = b[n1:]
+ err = err1
+ return err == nil // && int64(n1) == e.Length
+ })
+ if len(b) != 0 && err == nil {
+ err = io.EOF
}
- err = io.EOF
return
}
func (fst fileTorrentImplIO) WriteAt(p []byte, off int64) (n int, err error) {
- for _, fi := range fst.fts.info.UpvertedFiles() {
- if off >= fi.Length {
- off -= fi.Length
- continue
- }
- n1 := len(p)
- if int64(n1) > fi.Length-off {
- n1 = int(fi.Length - off)
- }
- name := fst.fts.fileInfoName(fi)
+ //log.Printf("write at %v: %v bytes", off, len(p))
+ fst.fts.segmentLocater.Locate(segments.Extent{off, int64(len(p))}, func(i int, e segments.Extent) bool {
+ name := fst.fts.fileInfoName(fst.fts.upvertedFiles[i])
os.MkdirAll(filepath.Dir(name), 0777)
var f *os.File
f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
- return
- }
- n1, err = f.WriteAt(p[:n1], off)
- // TODO: On some systems, write errors can be delayed until the Close.
- f.Close()
- if err != nil {
- return
+ return false
}
+ var n1 int
+ n1, err = f.WriteAt(p[:e.Length], e.Start)
+ //log.Printf("%v %v wrote %v: %v", i, e, n1, err)
+ closeErr := f.Close()
n += n1
- off = 0
p = p[n1:]
- if len(p) == 0 {
- break
+ if err == nil {
+ err = closeErr
}
- }
+ if err == nil && int64(n1) != e.Length {
+ err = io.ErrShortWrite
+ }
+ return err == nil
+ })
return
}
func (fts *fileTorrentImpl) fileInfoName(fi metainfo.FileInfo) string {
- return filepath.Join(append([]string{fts.dir, fts.info.Name}, fi.Path...)...)
+ return filepath.Join(append([]string{fts.dir, fts.infoName}, fi.Path...)...)
}
}
func (ts *mmapTorrentStorage) Close() error {
- return ts.span.Close()
+ errs := ts.span.Close()
+ if len(errs) > 0 {
+ return errs[0]
+ }
+ return nil
}
type mmapStoragePiece struct {
mms.Append(mm)
}
}
+ mms.InitIndex()
return
}
return *t.files
}
-func (t *Torrent) AddPeers(pp []Peer) int {
+func (t *Torrent) AddPeers(pp []PeerInfo) int {
cl := t.cl
cl.lock()
defer cl.unlock()
"fmt"
"io"
"math/rand"
+ "net/http"
"net/url"
+ "sort"
"sync"
"text/tabwriter"
"time"
"unsafe"
+ "github.com/anacrolix/torrent/common"
+ "github.com/anacrolix/torrent/segments"
+ "github.com/anacrolix/torrent/webseed"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
metainfo metainfo.MetaInfo
// The info dict. nil if we don't have it (yet).
- info *metainfo.Info
- files *[]*File
+ info *metainfo.Info
+ fileIndex segments.Index
+ files *[]*File
+
+ webSeeds map[string]*peer
// Active peer connections, running message stream loops. TODO: Make this
// open (not-closed) connections only.
maxEstablishedConns int
// Set of addrs to which we're attempting to connect. Connections are
// half-open until all handshakes are completed.
- halfOpen map[string]Peer
- fastestConn *PeerConn
+ halfOpen map[string]PeerInfo
+ fastestPeer *peer
// Reserve of peers to connect to. A peer can be both here and in the
// active connections if were told about the peer after connecting with
// KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
// pending, and half-open peers.
-func (t *Torrent) KnownSwarm() (ks []Peer) {
+func (t *Torrent) KnownSwarm() (ks []PeerInfo) {
// Add pending peers to the list
- t.peers.Each(func(peer Peer) {
+ t.peers.Each(func(peer PeerInfo) {
ks = append(ks, peer)
})
// Add active peers to the list
for conn := range t.conns {
- ks = append(ks, Peer{
+ ks = append(ks, PeerInfo{
Id: conn.PeerID,
Addr: conn.remoteAddr,
Source: conn.Discovery,
return
}
-func (t *Torrent) addPeer(p Peer) (added bool) {
+func (t *Torrent) addPeer(p PeerInfo) (added bool) {
cl := t.cl
torrent.Add(fmt.Sprintf("peers added by source %q", p.Source), 1)
if t.closed.IsSet() {
}
if replaced, ok := t.peers.AddReturningReplacedPeer(p); ok {
torrent.Add("peers replaced", 1)
- if !replaced.Equal(p) {
+ if !replaced.equal(p) {
t.logger.WithDefaultLevel(log.Debug).Printf("added %v replacing %v", p, replaced)
added = true
}
t.nameMu.Lock()
t.info = info
t.nameMu.Unlock()
+ t.fileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
t.displayName = "" // Save a few bytes lol.
t.initFiles()
t.cacheLength()
return nil
}
+// This seems to be all the follow-up tasks after info is set, that can't fail.
func (t *Torrent) onSetInfo() {
- for conn := range t.conns {
- if err := conn.setNumPieces(t.numPieces()); err != nil {
- t.logger.Printf("closing connection: %s", err)
- conn.close()
- }
- }
+ t.iterPeers(func(p *peer) {
+ p.onGotInfo(t.info)
+ })
for i := range t.pieces {
t.updatePieceCompletion(pieceIndex(i))
p := &t.pieces[i]
spew.NewDefaultConfig()
spew.Fdump(w, t.statsLocked())
- conns := t.connsAsSlice()
- slices.Sort(conns, worseConn)
- for i, c := range conns {
+ peers := t.peersAsSlice()
+ sort.Slice(peers, func(i, j int) bool {
+ return worseConn(peers[i], peers[j])
+ })
+ for i, c := range peers {
fmt.Fprintf(w, "%2d. ", i+1)
c.writeStatus(w, t)
}
return torrentRequestOffset(*t.length, int64(t.usualPieceSize()), r)
}
-// Return the request that would include the given offset into the torrent
-// data. Returns !ok if there is no such request.
+// Return the request that would include the given offset into the torrent data. Returns !ok if
+// there is no such request.
func (t *Torrent) offsetRequest(off int64) (req request, ok bool) {
return torrentOffsetRequest(*t.length, t.info.PieceLength, int64(t.chunkSize), off)
}
})
}
-// The worst connection is one that hasn't been sent, or sent anything useful
-// for the longest. A bad connection is one that usually sends us unwanted
-// pieces, or has been in worser half of the established connections for more
-// than a minute.
+// The worst connection is one that hasn't been sent, or sent anything useful for the longest. A bad
+// connection is one that usually sends us unwanted pieces, or has been in worser half of the
+// established connections for more than a minute.
func (t *Torrent) worstBadConn() *PeerConn {
wcs := worseConnSlice{t.unclosedConnsAsSlice()}
heap.Init(&wcs)
func (t *Torrent) piecePriorityChanged(piece pieceIndex) {
// t.logger.Printf("piece %d priority changed", piece)
- for c := range t.conns {
+ t.iterPeers(func(c *peer) {
if c.updatePiecePriority(piece) {
// log.Print("conn piece priority changed")
c.updateRequests()
}
- }
+ })
t.maybeNewConns()
t.publishPieceChange(piece)
}
}
torrent.Add("deleted connections", 1)
c.deleteAllRequests()
- if len(t.conns) == 0 {
+ if t.numActivePeers() == 0 {
t.assertNoPendingRequests()
}
return
}
+func (t *Torrent) numActivePeers() (num int) {
+ t.iterPeers(func(*peer) {
+ num++
+ })
+ return
+}
+
func (t *Torrent) assertNoPendingRequests() {
if len(t.pendingRequests) != 0 {
panic(t.pendingRequests)
sl := func() torrentTrackerAnnouncer {
switch u.Scheme {
case "ws", "wss":
+ if t.cl.config.DisableWebtorrent {
+ return nil
+ }
return t.startWebsocketAnnouncer(*u)
}
if u.Scheme == "udp4" && (t.cl.config.DisableIPv4Peers || t.cl.config.DisableIPv4) {
// Can't do anything with this.
continue
}
- t.addPeer(Peer{
+ t.addPeer(PeerInfo{
Addr: ipPortAddr{cp.IP, cp.Port},
Source: PeerSourceDhtGetPeers,
})
}
}
-func (t *Torrent) addPeers(peers []Peer) (added int) {
+func (t *Torrent) addPeers(peers []PeerInfo) (added int) {
for _, p := range peers {
if t.addPeer(p) {
added++
for addr := range t.halfOpen {
peers[addr] = struct{}{}
}
- t.peers.Each(func(peer Peer) {
+ t.peers.Each(func(peer PeerInfo) {
peers[peer.Addr.String()] = struct{}{}
})
return len(peers)
defer t.cl.unlock()
oldMax = t.maxEstablishedConns
t.maxEstablishedConns = max
- wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), worseConn)
+ wcs := slices.HeapInterface(slices.FromMapKeys(t.conns), func(l, r *PeerConn) bool {
+ return worseConn(&l.peer, &r.peer)
+ })
for len(t.conns) > t.maxEstablishedConns && wcs.Len() > 0 {
t.dropConnection(wcs.Pop().(*PeerConn))
}
c.stats().incrementPiecesDirtiedBad()
}
- bannableTouchers := make([]*PeerConn, 0, len(p.dirtiers))
+ bannableTouchers := make([]*peer, 0, len(p.dirtiers))
for c := range p.dirtiers {
if !c.trusted {
bannableTouchers = append(bannableTouchers, c)
// c.drop()
// }
// }
- for conn := range t.conns {
+ t.iterPeers(func(conn *peer) {
if conn.peerHasPiece(piece) {
conn.updateRequests()
}
- }
+ })
}
func (t *Torrent) tryCreateMorePieceHashers() {
}
}
-func (t *Torrent) connsAsSlice() (ret []*PeerConn) {
- for c := range t.conns {
- ret = append(ret, c)
- }
+func (t *Torrent) peersAsSlice() (ret []*peer) {
+ t.iterPeers(func(p *peer) {
+ ret = append(ret, p)
+ })
return
}
}
// Start the process of connecting to the given peer for the given torrent if appropriate.
-func (t *Torrent) initiateConn(peer Peer) {
+func (t *Torrent) initiateConn(peer PeerInfo) {
if peer.Id == t.cl.peerID {
return
}
// Adds a trusted, pending peer for each of the given Client's addresses. Typically used in tests to
// quickly make one Client visible to the Torrent of another Client.
func (t *Torrent) AddClientPeer(cl *Client) int {
- return t.AddPeers(func() (ps []Peer) {
+ return t.AddPeers(func() (ps []PeerInfo) {
for _, la := range cl.ListenAddrs() {
- ps = append(ps, Peer{
+ ps = append(ps, PeerInfo{
Addr: la,
Trusted: true,
})
torrent.Add("request timeouts", 1)
cb.t.cl.lock()
defer cb.t.cl.unlock()
- for cn := range cb.t.conns {
+ cb.t.iterPeers(func(cn *peer) {
if cn.peerHasPiece(pieceIndex(r.Index)) {
cn.updateRequests()
}
- }
+ })
}
func (t *Torrent) disallowDataDownloadLocked() {
log.Printf("disallowing data download")
t.dataDownloadDisallowed = true
- for c := range t.conns {
+ t.iterPeers(func(c *peer) {
c.updateRequests()
- }
+ })
}
func (t *Torrent) AllowDataDownload() {
defer t.cl.unlock()
log.Printf("AllowDataDownload")
t.dataDownloadDisallowed = false
- for c := range t.conns {
+ t.iterPeers(func(c *peer) {
c.updateRequests()
- }
-
+ })
}
func (t *Torrent) AllowDataUpload() {
defer t.cl.unlock()
t.userOnWriteChunkErr = f
}
+
+func (t *Torrent) iterPeers(f func(*peer)) {
+ for pc := range t.conns {
+ f(&pc.peer)
+ }
+ for _, ws := range t.webSeeds {
+ f(ws)
+ }
+}
+
+func (t *Torrent) addWebSeed(url string) {
+ if t.cl.config.DisableWebseeds {
+ return
+ }
+ if _, ok := t.webSeeds[url]; ok {
+ return
+ }
+ const maxRequests = 10
+ ws := webSeed{
+ peer: peer{
+ t: t,
+ connString: url,
+ outgoing: true,
+ network: "http",
+ reconciledHandshakeStats: true,
+ peerSentHaveAll: true,
+ PeerMaxRequests: maxRequests,
+ },
+ client: webseed.Client{
+ HttpClient: http.DefaultClient,
+ Url: url,
+ },
+ requests: make(map[request]webseed.Request, maxRequests),
+ }
+ ws.peer.peerImpl = &ws
+ if t.haveInfo() {
+ ws.onGotInfo(t.info)
+ }
+ t.webSeeds[url] = &ws.peer
+}
+
+func (t *Torrent) peerIsActive(p *peer) (active bool) {
+ t.iterPeers(func(p1 *peer) {
+ if p1 == p {
+ active = true
+ }
+ })
+ return
+}
return request{i, chunkSpec{b, l}}
}
-// Check the given Request is correct for various torrent offsets.
+// Check the given request is correct for various torrent offsets.
func TestTorrentRequest(t *testing.T) {
const s = 472183431 // Length of torrent.
for _, _case := range []struct {
off int64 // An offset into the torrent.
- req request // The expected Request. The zero value means !ok.
+ req request // The expected request. The zero value means !ok.
}{
// Invalid offset.
{-1, request{}},
ret.Err = fmt.Errorf("error announcing: %s", err)
return
}
- me.t.AddPeers(Peers(nil).AppendFromTracker(res.Peers))
+ me.t.AddPeers(peerInfos(nil).AppendFromTracker(res.Peers))
ret.NumPeers = len(res.Peers)
ret.Interval = time.Duration(res.Interval) * time.Second
return
--- /dev/null
+package torrent
+
+import (
+ "net/http"
+
+ "github.com/anacrolix/log"
+ "github.com/anacrolix/torrent/common"
+ "github.com/anacrolix/torrent/metainfo"
+ pp "github.com/anacrolix/torrent/peer_protocol"
+ "github.com/anacrolix/torrent/segments"
+ "github.com/anacrolix/torrent/webseed"
+)
+
+type httpRequestResult struct {
+ resp *http.Response
+ err error
+}
+
+type requestPart struct {
+ req *http.Request
+ e segments.Extent
+ result chan httpRequestResult
+}
+
+type webseedRequest struct {
+ cancel func()
+}
+
+type webSeed struct {
+ client webseed.Client
+ requests map[request]webseed.Request
+ peer peer
+}
+
+var _ peerImpl = (*webSeed)(nil)
+
+func (ws *webSeed) onGotInfo(info *metainfo.Info) {
+ ws.client.FileIndex = segments.NewIndex(common.LengthIterFromUpvertedFiles(info.UpvertedFiles()))
+ ws.client.Info = info
+}
+
+func (ws *webSeed) _postCancel(r request) {
+ ws.cancel(r)
+}
+
+func (ws *webSeed) writeInterested(interested bool) bool {
+ return true
+}
+
+func (ws *webSeed) cancel(r request) bool {
+ ws.requests[r].Cancel()
+ return true
+}
+
+func (ws *webSeed) intoSpec(r request) webseed.RequestSpec {
+ return webseed.RequestSpec{ws.peer.t.requestOffset(r), int64(r.Length)}
+}
+
+func (ws *webSeed) request(r request) bool {
+ webseedRequest := ws.client.NewRequest(ws.intoSpec(r))
+ ws.requests[r] = webseedRequest
+ go ws.requestResultHandler(r, webseedRequest)
+ return true
+}
+
+func (ws *webSeed) connectionFlags() string {
+ return "WS"
+}
+
+func (ws *webSeed) drop() {
+}
+
+func (ws *webSeed) updateRequests() {
+ ws.peer.doRequestState()
+}
+
+func (ws *webSeed) _close() {}
+
+func (ws *webSeed) requestResultHandler(r request, webseedRequest webseed.Request) {
+ result := <-webseedRequest.Result
+ ws.peer.t.cl.lock()
+ defer ws.peer.t.cl.unlock()
+ if result.Err != nil {
+ log.Printf("webseed request rejected: %v", result.Err)
+ ws.peer.remoteRejectedRequest(r)
+ } else {
+ err := ws.peer.receiveChunk(&pp.Message{
+ Type: pp.Piece,
+ Index: r.Index,
+ Begin: r.Begin,
+ Piece: result.Bytes,
+ })
+ if err != nil {
+ panic(err)
+ }
+ }
+}
--- /dev/null
+package webseed
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+
+ "github.com/anacrolix/torrent/metainfo"
+ "github.com/anacrolix/torrent/segments"
+)
+
+type RequestSpec = segments.Extent
+
+type requestPartResult struct {
+ resp *http.Response
+ err error
+}
+
+type requestPart struct {
+ req *http.Request
+ e segments.Extent
+ result chan requestPartResult
+}
+
+type Request struct {
+ cancel func()
+ Result chan RequestResult
+}
+
+func (r Request) Cancel() {
+ r.cancel()
+}
+
+type Client struct {
+ HttpClient *http.Client
+ Url string
+ FileIndex segments.Index
+ Info *metainfo.Info
+}
+
+type RequestResult struct {
+ Bytes []byte
+ Err error
+}
+
+func (ws *Client) NewRequest(r RequestSpec) Request {
+ ctx, cancel := context.WithCancel(context.Background())
+ var requestParts []requestPart
+ if !ws.FileIndex.Locate(r, func(i int, e segments.Extent) bool {
+ req, err := NewRequest(ws.Url, i, ws.Info, e.Start, e.Length)
+ if err != nil {
+ panic(err)
+ }
+ req = req.WithContext(ctx)
+ part := requestPart{
+ req: req,
+ result: make(chan requestPartResult, 1),
+ e: e,
+ }
+ go func() {
+ resp, err := ws.HttpClient.Do(req)
+ part.result <- requestPartResult{
+ resp: resp,
+ err: err,
+ }
+ }()
+ requestParts = append(requestParts, part)
+ return true
+ }) {
+ panic("request out of file bounds")
+ }
+ req := Request{
+ cancel: cancel,
+ Result: make(chan RequestResult, 1),
+ }
+ go func() {
+ b, err := readRequestPartResponses(requestParts)
+ req.Result <- RequestResult{
+ Bytes: b,
+ Err: err,
+ }
+ }()
+ return req
+}
+
+func recvPartResult(buf io.Writer, part requestPart) error {
+ result := <-part.result
+ if result.err != nil {
+ return result.err
+ }
+ defer result.resp.Body.Close()
+ if part.e.Start != 0 && result.resp.StatusCode != http.StatusPartialContent {
+ return fmt.Errorf("expected partial content response got %v", result.resp.StatusCode)
+ }
+ copied, err := io.Copy(buf, result.resp.Body)
+ if err != nil {
+ return err
+ }
+ if copied != part.e.Length {
+ return fmt.Errorf("got %v bytes, expected %v", copied, part.e.Length)
+ }
+ return nil
+}
+
+func readRequestPartResponses(parts []requestPart) ([]byte, error) {
+ var buf bytes.Buffer
+ for _, part := range parts {
+ err := recvPartResult(&buf, part)
+ if err != nil {
+ return buf.Bytes(), err
+ }
+ }
+ return buf.Bytes(), nil
+}
--- /dev/null
+package webseed
+
+import (
+ "fmt"
+ "net/http"
+ "path"
+ "strings"
+
+ "github.com/anacrolix/torrent/metainfo"
+)
+
+// Creates a request per BEP 19.
+func NewRequest(url string, fileIndex int, info *metainfo.Info, offset, length int64) (*http.Request, error) {
+ fileInfo := info.UpvertedFiles()[fileIndex]
+ if strings.HasSuffix(url, "/") {
+ url += path.Join(append([]string{info.Name}, fileInfo.Path...)...)
+ }
+ req, err := http.NewRequest(http.MethodGet, url, nil)
+ if err != nil {
+ return nil, err
+ }
+ if offset != 0 || length != fileInfo.Length {
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
+ }
+ return req, nil
+}
tc.mu.Unlock()
err := tc.doWebsocket()
level := log.Info
+ tc.mu.Lock()
if tc.closed {
level = log.Debug
}
+ tc.mu.Unlock()
tc.Logger.WithDefaultLevel(level).Printf("websocket instance ended: %v", err)
time.Sleep(time.Minute)
tc.mu.Lock()
"github.com/anacrolix/multiless"
)
-func worseConn(l, r *PeerConn) bool {
+func worseConn(l, r *peer) bool {
less, ok := multiless.New().Bool(
l.useful(), r.useful()).CmpInt64(
l.lastHelpful().Sub(r.lastHelpful()).Nanoseconds()).CmpInt64(
}
func (me worseConnSlice) Less(i, j int) bool {
- return worseConn(me.conns[i], me.conns[j])
+ return worseConn(&me.conns[i].peer, &me.conns[j].peer)
}
func (me *worseConnSlice) Pop() interface{} {