Type reflect.Type
}
-func (this *MarshalTypeError) Error() string {
- return "bencode: unsupported type: " + this.Type.String()
+func (e *MarshalTypeError) Error() string {
+ return "bencode: unsupported type: " + e.Type.String()
}
// Unmarshal argument must be a non-nil value of some pointer type.
return tag, tag_options("")
}
-func (this tag_options) contains(option_name string) bool {
- if len(this) == 0 {
+func (opts tag_options) contains(option_name string) bool {
+ if len(opts) == 0 {
return false
}
- s := string(this)
+ s := string(opts)
for s != "" {
var next string
i := strings.Index(s, ",")
torrents map[metainfo.Hash]*Torrent
}
-func (me *Client) IPBlockList() iplist.Ranger {
- me.mu.Lock()
- defer me.mu.Unlock()
- return me.ipBlockList
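+// IPBlockList returns the Client's current IP block list, if one is set.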
+func (cl *Client) IPBlockList() iplist.Ranger {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ return cl.ipBlockList
}
-func (me *Client) SetIPBlockList(list iplist.Ranger) {
- me.mu.Lock()
- defer me.mu.Unlock()
- me.ipBlockList = list
- if me.dHT != nil {
- me.dHT.SetIPBlockList(list)
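+// SetIPBlockList replaces the Client's IP block list, and passes it on to the DHT server if one is running.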
+func (cl *Client) SetIPBlockList(list iplist.Ranger) {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ cl.ipBlockList = list
+ if cl.dHT != nil {
+ cl.dHT.SetIPBlockList(list)
}
}
-func (me *Client) PeerID() string {
- return string(me.peerID[:])
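+// PeerID returns the client's 20-byte peer ID as a string.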
+func (cl *Client) PeerID() string {
+ return string(cl.peerID[:])
}
-func (me *Client) ListenAddr() (addr net.Addr) {
- for _, l := range me.listeners {
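+// ListenAddr returns the address of the first listener, or nil if the client is not listening.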
+func (cl *Client) ListenAddr() (addr net.Addr) {
+ for _, l := range cl.listeners {
addr = l.Addr()
break
}
Hashes []metainfo.Hash
}
-func (me hashSorter) Len() int {
- return len(me.Hashes)
+func (hs hashSorter) Len() int {
+ return len(hs.Hashes)
}
-func (me hashSorter) Less(a, b int) bool {
- return (&big.Int{}).SetBytes(me.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(me.Hashes[b][:])) < 0
+func (hs hashSorter) Less(a, b int) bool {
+ return (&big.Int{}).SetBytes(hs.Hashes[a][:]).Cmp((&big.Int{}).SetBytes(hs.Hashes[b][:])) < 0
}
-func (me hashSorter) Swap(a, b int) {
- me.Hashes[a], me.Hashes[b] = me.Hashes[b], me.Hashes[a]
+func (hs hashSorter) Swap(a, b int) {
+ hs.Hashes[a], hs.Hashes[b] = hs.Hashes[b], hs.Hashes[a]
}
func (cl *Client) sortedTorrents() (ret []*Torrent) {
// Stops the client. All connections to peers are closed and all activity will
// come to a halt.
-func (me *Client) Close() {
- me.mu.Lock()
- defer me.mu.Unlock()
- me.closed.Set()
- if me.dHT != nil {
- me.dHT.Close()
- }
- for _, l := range me.listeners {
+func (cl *Client) Close() {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ cl.closed.Set()
+ if cl.dHT != nil {
+ cl.dHT.Close()
+ }
+ for _, l := range cl.listeners {
l.Close()
}
- for _, t := range me.torrents {
+ for _, t := range cl.torrents {
t.close()
}
- me.event.Broadcast()
+ cl.event.Broadcast()
}
var ipv6BlockRange = iplist.Range{Description: "non-IPv4 address"}
return
}
-func (me *Client) torrent(ih metainfo.Hash) *Torrent {
- return me.torrents[ih]
+func (cl *Client) torrent(ih metainfo.Hash) *Torrent {
+ return cl.torrents[ih]
}
type dialResult struct {
}
// Returns whether an address is known to connect to a client with our own ID.
-func (me *Client) dopplegangerAddr(addr string) bool {
- _, ok := me.dopplegangerAddrs[addr]
+func (cl *Client) dopplegangerAddr(addr string) bool {
+ _, ok := cl.dopplegangerAddrs[addr]
return ok
}
// Start the process of connecting to the given peer for the given torrent if
// appropriate.
-func (me *Client) initiateConn(peer Peer, t *Torrent) {
- if peer.Id == me.peerID {
+func (cl *Client) initiateConn(peer Peer, t *Torrent) {
+ if peer.Id == cl.peerID {
return
}
addr := net.JoinHostPort(peer.IP.String(), fmt.Sprintf("%d", peer.Port))
- if me.dopplegangerAddr(addr) || t.addrActive(addr) {
+ if cl.dopplegangerAddr(addr) || t.addrActive(addr) {
duplicateConnsAvoided.Add(1)
return
}
- if r, ok := me.ipBlockRange(peer.IP); ok {
+ if r, ok := cl.ipBlockRange(peer.IP); ok {
log.Printf("outbound connect to %s blocked by IP blocklist rule %s", peer.IP, r)
return
}
t.halfOpen[addr] = struct{}{}
- go me.outgoingConnection(t, addr, peer.Source)
+ go cl.outgoingConnection(t, addr, peer.Source)
}
-func (me *Client) dialTimeout(t *Torrent) time.Duration {
- me.mu.Lock()
+func (cl *Client) dialTimeout(t *Torrent) time.Duration {
+ cl.mu.Lock()
pendingPeers := len(t.peers)
- me.mu.Unlock()
- return reducedDialTimeout(nominalDialTimeout, me.halfOpenLimit, pendingPeers)
+ cl.mu.Unlock()
+ return reducedDialTimeout(nominalDialTimeout, cl.halfOpenLimit, pendingPeers)
}
-func (me *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
- c, err = net.DialTimeout("tcp", addr, me.dialTimeout(t))
+func (cl *Client) dialTCP(addr string, t *Torrent) (c net.Conn, err error) {
+ c, err = net.DialTimeout("tcp", addr, cl.dialTimeout(t))
if err == nil {
c.(*net.TCPConn).SetLinger(0)
}
return
}
-func (me *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
- return me.utpSock.DialTimeout(addr, me.dialTimeout(t))
+func (cl *Client) dialUTP(addr string, t *Torrent) (c net.Conn, err error) {
+ return cl.utpSock.DialTimeout(addr, cl.dialTimeout(t))
}
// Returns a connection over UTP or TCP, whichever is first to connect.
-func (me *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
+func (cl *Client) dialFirst(addr string, t *Torrent) (conn net.Conn, utp bool) {
// Initiate connections via TCP and UTP simultaneously. Use the first one
// that succeeds.
left := 0
- if !me.config.DisableUTP {
+ if !cl.config.DisableUTP {
left++
}
- if !me.config.DisableTCP {
+ if !cl.config.DisableTCP {
left++
}
resCh := make(chan dialResult, left)
- if !me.config.DisableUTP {
- go doDial(me.dialUTP, resCh, true, addr, t)
+ if !cl.config.DisableUTP {
+ go doDial(cl.dialUTP, resCh, true, addr, t)
}
- if !me.config.DisableTCP {
- go doDial(me.dialTCP, resCh, false, addr, t)
+ if !cl.config.DisableTCP {
+ go doDial(cl.dialTCP, resCh, false, addr, t)
}
var res dialResult
// Wait for a successful connection.
return
}
-func (me *Client) noLongerHalfOpen(t *Torrent, addr string) {
+func (cl *Client) noLongerHalfOpen(t *Torrent, addr string) {
if _, ok := t.halfOpen[addr]; !ok {
panic("invariant broken")
}
delete(t.halfOpen, addr)
- me.openNewConns(t)
+ cl.openNewConns(t)
}
// Performs initiator handshakes and returns a connection. Returns a nil
// *connection if no connection could be established for valid reasons.
-func (me *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
+func (cl *Client) handshakesConnection(nc net.Conn, t *Torrent, encrypted, utp bool) (c *connection, err error) {
c = newConnection()
c.conn = nc
c.rw = nc
if err != nil {
return
}
- ok, err := me.initiateHandshakes(c, t)
+ ok, err := cl.initiateHandshakes(c, t)
if !ok {
c = nil
}
// Returns nil connection and nil error if no connection could be established
// for valid reasons.
-func (me *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
- nc, utp := me.dialFirst(addr, t)
+func (cl *Client) establishOutgoingConn(t *Torrent, addr string) (c *connection, err error) {
+ nc, utp := cl.dialFirst(addr, t)
if nc == nil {
return
}
- c, err = me.handshakesConnection(nc, t, !me.config.DisableEncryption, utp)
+ c, err = cl.handshakesConnection(nc, t, !cl.config.DisableEncryption, utp)
if err != nil {
nc.Close()
return
return
}
nc.Close()
- if me.config.DisableEncryption {
+ if cl.config.DisableEncryption {
// We already tried without encryption.
return
}
// Try again without encryption, using whichever protocol type worked last
// time.
if utp {
- nc, err = me.dialUTP(addr, t)
+ nc, err = cl.dialUTP(addr, t)
} else {
- nc, err = me.dialTCP(addr, t)
+ nc, err = cl.dialTCP(addr, t)
}
if err != nil {
err = fmt.Errorf("error dialing for unencrypted connection: %s", err)
return
}
- c, err = me.handshakesConnection(nc, t, false, utp)
+ c, err = cl.handshakesConnection(nc, t, false, utp)
if err != nil || c == nil {
nc.Close()
}
// Called to dial out and run a connection. The addr we're given is already
// considered half-open.
-func (me *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
- c, err := me.establishOutgoingConn(t, addr)
- me.mu.Lock()
- defer me.mu.Unlock()
+func (cl *Client) outgoingConnection(t *Torrent, addr string, ps peerSource) {
+ c, err := cl.establishOutgoingConn(t, addr)
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
// Don't release lock between here and addConnection, unless it's for
// failure.
- me.noLongerHalfOpen(t, addr)
+ cl.noLongerHalfOpen(t, addr)
if err != nil {
- if me.config.Debug {
+ if cl.config.Debug {
log.Printf("error establishing outgoing connection: %s", err)
}
return
}
defer c.Close()
c.Discovery = ps
- err = me.runInitiatedHandshookConn(c, t)
+ err = cl.runInitiatedHandshookConn(c, t)
if err != nil {
- if me.config.Debug {
+ if cl.config.Debug {
log.Printf("error in established outgoing connection: %s", err)
}
}
peerID [20]byte
)
-func (me *peerExtensionBytes) SupportsExtended() bool {
- return me[5]&0x10 != 0
+func (pex *peerExtensionBytes) SupportsExtended() bool {
+ return pex[5]&0x10 != 0
}
-func (me *peerExtensionBytes) SupportsDHT() bool {
- return me[7]&0x01 != 0
+func (pex *peerExtensionBytes) SupportsDHT() bool {
+ return pex[7]&0x01 != 0
}
-func (me *peerExtensionBytes) SupportsFast() bool {
- return me[7]&0x04 != 0
+func (pex *peerExtensionBytes) SupportsFast() bool {
+ return pex[7]&0x04 != 0
}
type handshakeResult struct {
r io.Reader
}
-func (me deadlineReader) Read(b []byte) (n int, err error) {
+func (r deadlineReader) Read(b []byte) (n int, err error) {
// Keep-alives should be received every 2 mins. Give a bit of grace time.
- err = me.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
+ err = r.nc.SetReadDeadline(time.Now().Add(150 * time.Second))
if err != nil {
err = fmt.Errorf("error setting read deadline: %s", err)
}
- n, err = me.r.Read(b)
+ n, err = r.r.Read(b)
// Convert common errors into io.EOF.
// if err != nil {
// if opError, ok := err.(*net.OpError); ok && opError.Op == "read" && opError.Err == syscall.ECONNRESET {
return
}
-func (me *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
+func (cl *Client) initiateHandshakes(c *connection, t *Torrent) (ok bool, err error) {
if c.encrypted {
c.rw, err = mse.InitiateHandshake(c.rw, t.infoHash[:], nil)
if err != nil {
return
}
}
- ih, ok, err := me.connBTHandshake(c, &t.infoHash)
+ ih, ok, err := cl.connBTHandshake(c, &t.infoHash)
if ih != t.infoHash {
ok = false
}
return
}
-func (me *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
- if conn.PeerExtensionBytes.SupportsExtended() && me.extensionBytes.SupportsExtended() {
+func (cl *Client) sendInitialMessages(conn *connection, torrent *Torrent) {
+ if conn.PeerExtensionBytes.SupportsExtended() && cl.extensionBytes.SupportsExtended() {
conn.Post(pp.Message{
Type: pp.Extended,
ExtendedID: pp.HandshakeExtendedID,
"m": func() (ret map[string]int) {
ret = make(map[string]int, 2)
ret["ut_metadata"] = metadataExtendedId
- if !me.config.DisablePEX {
+ if !cl.config.DisablePEX {
ret["ut_pex"] = pexExtendedId
}
return
// No upload queue is implemented yet.
"reqq": 64,
}
- if !me.config.DisableEncryption {
+ if !cl.config.DisableEncryption {
d["e"] = 1
}
if torrent.metadataSizeKnown() {
d["metadata_size"] = torrent.metadataSize()
}
- if p := me.incomingPeerPort(); p != 0 {
+ if p := cl.incomingPeerPort(); p != 0 {
d["p"] = p
}
yourip, err := addrCompactIP(conn.remoteAddr())
}
if torrent.haveAnyPieces() {
conn.Bitfield(torrent.bitfield())
- } else if me.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
+ } else if cl.extensionBytes.SupportsFast() && conn.PeerExtensionBytes.SupportsFast() {
conn.Post(pp.Message{
Type: pp.HaveNone,
})
}
- if conn.PeerExtensionBytes.SupportsDHT() && me.extensionBytes.SupportsDHT() && me.dHT != nil {
+ if conn.PeerExtensionBytes.SupportsDHT() && cl.extensionBytes.SupportsDHT() && cl.dHT != nil {
conn.Post(pp.Message{
Type: pp.Port,
- Port: uint16(missinggo.AddrPort(me.dHT.Addr())),
+ Port: uint16(missinggo.AddrPort(cl.dHT.Addr())),
})
}
}
-func (me *Client) peerUnchoked(torrent *Torrent, conn *connection) {
+func (cl *Client) peerUnchoked(torrent *Torrent, conn *connection) {
conn.updateRequests()
}
return
}
-func (me *Client) upload(t *Torrent, c *connection) {
- if me.config.NoUpload {
+func (cl *Client) upload(t *Torrent, c *connection) {
+ if cl.config.NoUpload {
return
}
if !c.PeerInterested {
return
}
- seeding := me.seeding(t)
+ seeding := cl.seeding(t)
if !seeding && !t.connHasWantedPieces(c) {
return
}
for seeding || c.chunksSent < c.UsefulChunksReceived+6 {
c.Unchoke()
for r := range c.PeerRequests {
- err := me.sendChunk(t, c, r)
+ err := cl.sendChunk(t, c, r)
if err != nil {
if t.pieceComplete(int(r.Index)) && err == io.ErrUnexpectedEOF {
// We had the piece, but not anymore.
c.Choke()
}
-func (me *Client) sendChunk(t *Torrent, c *connection, r request) error {
+func (cl *Client) sendChunk(t *Torrent, c *connection, r request) error {
// Count the chunk being sent, even if it isn't.
b := make([]byte, r.Length)
p := t.info.Piece(int(r.Index))
// Processes incoming bittorrent messages. The client lock is held upon entry
// and exit.
-func (me *Client) connectionLoop(t *Torrent, c *connection) error {
+func (cl *Client) connectionLoop(t *Torrent, c *connection) error {
decoder := pp.Decoder{
R: bufio.NewReader(c.rw),
MaxLength: 256 * 1024,
}
for {
- me.mu.Unlock()
+ cl.mu.Unlock()
var msg pp.Message
err := decoder.Decode(&msg)
- me.mu.Lock()
- if me.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
+ cl.mu.Lock()
+ if cl.closed.IsSet() || c.closed.IsSet() || err == io.EOF {
return nil
}
if err != nil {
// We can then reset our interest.
c.updateRequests()
case pp.Reject:
- me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
+ cl.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
c.updateRequests()
case pp.Unchoke:
c.PeerChoked = false
- me.peerUnchoked(t, c)
+ cl.peerUnchoked(t, c)
case pp.Interested:
c.PeerInterested = true
- me.upload(t, c)
+ cl.upload(t, c)
case pp.NotInterested:
c.PeerInterested = false
c.Choke()
c.PeerRequests = make(map[request]struct{}, maxRequests)
}
c.PeerRequests[newRequest(msg.Index, msg.Begin, msg.Length)] = struct{}{}
- me.upload(t, c)
+ cl.upload(t, c)
case pp.Cancel:
req := newRequest(msg.Index, msg.Begin, msg.Length)
if !c.PeerCancel(req) {
case pp.HaveNone:
err = c.peerSentHaveNone()
case pp.Piece:
- me.downloadedChunk(t, c, &msg)
+ cl.downloadedChunk(t, c, &msg)
case pp.Extended:
switch msg.ExtendedID {
case pp.HandshakeExtendedID:
if !ok {
log.Printf("bad metadata_size type: %T", metadata_sizeUntyped)
} else {
- t.setMetadataSize(metadata_size, me)
+ t.setMetadataSize(metadata_size, cl)
}
}
if _, ok := c.PeerExtensionIDs["ut_metadata"]; ok {
- me.requestPendingMetadata(t, c)
+ cl.requestPendingMetadata(t, c)
}
case metadataExtendedId:
- err = me.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
+ err = cl.gotMetadataExtensionMsg(msg.ExtendedPayload, t, c)
if err != nil {
err = fmt.Errorf("error handling metadata extension message: %s", err)
}
case pexExtendedId:
- if me.config.DisablePEX {
+ if cl.config.DisablePEX {
break
}
var pexMsg peerExchangeMessage
break
}
go func() {
- me.mu.Lock()
- me.addPeers(t, func() (ret []Peer) {
+ cl.mu.Lock()
+ cl.addPeers(t, func() (ret []Peer) {
for i, cp := range pexMsg.Added {
p := Peer{
IP: make([]byte, 4),
}
return
}())
- me.mu.Unlock()
+ cl.mu.Unlock()
}()
default:
err = fmt.Errorf("unexpected extended message ID: %v", msg.ExtendedID)
}
}
case pp.Port:
- if me.dHT == nil {
+ if cl.dHT == nil {
break
}
pingAddr, err := net.ResolveUDPAddr("", c.remoteAddr().String())
if msg.Port != 0 {
pingAddr.Port = int(msg.Port)
}
- me.dHT.Ping(pingAddr)
+ cl.dHT.Ping(pingAddr)
default:
err = fmt.Errorf("received unknown message type: %#v", msg.Type)
}
}
// Returns true if connection is removed from torrent.Conns.
-func (me *Client) deleteConnection(t *Torrent, c *connection) bool {
+func (cl *Client) deleteConnection(t *Torrent, c *connection) bool {
for i0, _c := range t.conns {
if _c != c {
continue
return false
}
-func (me *Client) dropConnection(t *Torrent, c *connection) {
- me.event.Broadcast()
+func (cl *Client) dropConnection(t *Torrent, c *connection) {
+ cl.event.Broadcast()
c.Close()
- if me.deleteConnection(t, c) {
- me.openNewConns(t)
+ if cl.deleteConnection(t, c) {
+ cl.openNewConns(t)
}
}
// Returns true if the connection is added.
-func (me *Client) addConnection(t *Torrent, c *connection) bool {
- if me.closed.IsSet() {
+func (cl *Client) addConnection(t *Torrent, c *connection) bool {
+ if cl.closed.IsSet() {
return false
}
select {
return false
default:
}
- if !me.wantConns(t) {
+ if !cl.wantConns(t) {
return false
}
for _, c0 := range t.conns {
}
}
if len(t.conns) >= socketsPerTorrent {
- c := t.worstBadConn(me)
+ c := t.worstBadConn(cl)
if c == nil {
return false
}
- if me.config.Debug && missinggo.CryHeard() {
+ if cl.config.Debug && missinggo.CryHeard() {
log.Printf("%s: dropping connection to make room for new one:\n %s", t, c)
}
c.Close()
- me.deleteConnection(t, c)
+ cl.deleteConnection(t, c)
}
if len(t.conns) >= socketsPerTorrent {
panic(len(t.conns))
return t.connHasWantedPieces(c)
}
-func (me *Client) wantConns(t *Torrent) bool {
- if !me.seeding(t) && !t.needData() {
+func (cl *Client) wantConns(t *Torrent) bool {
+ if !cl.seeding(t) && !t.needData() {
return false
}
if len(t.conns) < socketsPerTorrent {
return true
}
- return t.worstBadConn(me) != nil
+ return t.worstBadConn(cl) != nil
}
-func (me *Client) openNewConns(t *Torrent) {
+func (cl *Client) openNewConns(t *Torrent) {
select {
case <-t.ceasingNetworking:
return
default:
}
for len(t.peers) != 0 {
- if !me.wantConns(t) {
+ if !cl.wantConns(t) {
return
}
- if len(t.halfOpen) >= me.halfOpenLimit {
+ if len(t.halfOpen) >= cl.halfOpenLimit {
return
}
var (
break
}
delete(t.peers, k)
- me.initiateConn(p, t)
+ cl.initiateConn(p, t)
}
t.wantPeers.Broadcast()
}
-func (me *Client) addPeers(t *Torrent, peers []Peer) {
+func (cl *Client) addPeers(t *Torrent, peers []Peer) {
for _, p := range peers {
- if me.dopplegangerAddr(net.JoinHostPort(
+ if cl.dopplegangerAddr(net.JoinHostPort(
p.IP.String(),
strconv.FormatInt(int64(p.Port), 10),
)) {
continue
}
- if _, ok := me.ipBlockRange(p.IP); ok {
+ if _, ok := cl.ipBlockRange(p.IP); ok {
continue
}
if p.Port == 0 {
// The spec says to scrub these yourselves. Fine.
continue
}
- t.addPeer(p, me)
+ t.addPeer(p, cl)
}
}
return
}
-func (me *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
- t, ok := me.torrents[infoHash]
+func (cl *Client) dropTorrent(infoHash metainfo.Hash) (err error) {
+ t, ok := cl.torrents[infoHash]
if !ok {
err = fmt.Errorf("no such torrent")
return
if err != nil {
panic(err)
}
- delete(me.torrents, infoHash)
+ delete(cl.torrents, infoHash)
return
}
// Returns true when all torrents are completely downloaded and false if the
// client is stopped before that.
-func (me *Client) WaitAll() bool {
- me.mu.Lock()
- defer me.mu.Unlock()
- for !me.allTorrentsCompleted() {
- if me.closed.IsSet() {
+func (cl *Client) WaitAll() bool {
+ cl.mu.Lock()
+ defer cl.mu.Unlock()
+ for !cl.allTorrentsCompleted() {
+ if cl.closed.IsSet() {
return false
}
- me.event.Wait()
+ cl.event.Wait()
}
return true
}
// Handle a received chunk from a peer.
-func (me *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
+func (cl *Client) downloadedChunk(t *Torrent, c *connection, msg *pp.Message) {
chunksReceived.Add(1)
req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
// Request has been satisfied.
- if me.connDeleteRequest(t, c, req) {
+ if cl.connDeleteRequest(t, c, req) {
defer c.updateRequests()
} else {
unexpectedChunksReceived.Add(1)
piece := &t.pieces[index]
// Do we actually want this chunk?
- if !t.wantChunk(req) {
+ if !t.wantPiece(req) {
unwantedChunksReceived.Add(1)
c.UnwantedChunksReceived++
return
c.UsefulChunksReceived++
c.lastUsefulChunkReceived = time.Now()
- me.upload(t, c)
+ cl.upload(t, c)
// Need to record that it hasn't been written yet, before we attempt to do
// anything with it.
// Cancel pending requests for this chunk.
for _, c := range t.conns {
- if me.connCancel(t, c, req) {
+ if cl.connCancel(t, c, req) {
c.updateRequests()
}
}
- me.mu.Unlock()
+ cl.mu.Unlock()
// Write the chunk out.
err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
- me.mu.Lock()
+ cl.mu.Lock()
piece.decrementPendingWrites()
// It's important that the piece is potentially queued before we check if
// the piece is still wanted, because if it is queued, it won't be wanted.
if t.pieceAllDirty(index) {
- me.queuePieceCheck(t, int(req.Index))
+ cl.queuePieceCheck(t, int(req.Index))
}
if c.peerTouchedPieces == nil {
}
c.peerTouchedPieces[index] = struct{}{}
- me.event.Broadcast()
+ cl.event.Broadcast()
t.publishPieceChange(int(req.Index))
return
}
// Return the connections that touched a piece, and clear the entry while
// doing it.
-func (me *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
+func (cl *Client) reapPieceTouches(t *Torrent, piece int) (ret []*connection) {
for _, c := range t.conns {
if _, ok := c.peerTouchedPieces[piece]; ok {
ret = append(ret, c)
return
}
-func (me *Client) pieceHashed(t *Torrent, piece int, correct bool) {
+func (cl *Client) pieceHashed(t *Torrent, piece int, correct bool) {
p := &t.pieces[piece]
if p.EverHashed {
// Don't score the first time a piece is hashed, it could be an
}
}
p.EverHashed = true
- touchers := me.reapPieceTouches(t, piece)
+ touchers := cl.reapPieceTouches(t, piece)
if correct {
err := p.Storage().MarkComplete()
if err != nil {
} else if len(touchers) != 0 {
log.Printf("dropping %d conns that touched piece", len(touchers))
for _, c := range touchers {
- me.dropConnection(t, c)
+ cl.dropConnection(t, c)
}
}
- me.pieceChanged(t, piece)
+ cl.pieceChanged(t, piece)
}
-func (me *Client) onCompletedPiece(t *Torrent, piece int) {
+func (cl *Client) onCompletedPiece(t *Torrent, piece int) {
t.pendingPieces.Remove(piece)
t.pendAllChunkSpecs(piece)
for _, conn := range t.conns {
}
// Could check here if peer doesn't have piece, but due to caching
// some peers may have said they have a piece but they don't.
- me.upload(t, conn)
+ cl.upload(t, conn)
}
}
-func (me *Client) onFailedPiece(t *Torrent, piece int) {
+func (cl *Client) onFailedPiece(t *Torrent, piece int) {
if t.pieceAllDirty(piece) {
t.pendAllChunkSpecs(piece)
}
- if !t.wantPiece(piece) {
+ if !t.wantPieceIndex(piece) {
return
}
- me.openNewConns(t)
+ cl.openNewConns(t)
for _, conn := range t.conns {
if conn.PeerHasPiece(piece) {
conn.updateRequests()
}
}
-func (me *Client) pieceChanged(t *Torrent, piece int) {
+func (cl *Client) pieceChanged(t *Torrent, piece int) {
correct := t.pieceComplete(piece)
- defer me.event.Broadcast()
+ defer cl.event.Broadcast()
if correct {
- me.onCompletedPiece(t, piece)
+ cl.onCompletedPiece(t, piece)
} else {
- me.onFailedPiece(t, piece)
+ cl.onFailedPiece(t, piece)
}
if t.updatePiecePriority(piece) {
t.piecePriorityChanged(piece)
}
// Returns handles to all the torrents loaded in the Client.
-func (me *Client) Torrents() (ret []*Torrent) {
- me.mu.Lock()
- for _, t := range me.torrents {
+func (cl *Client) Torrents() (ret []*Torrent) {
+ cl.mu.Lock()
+ for _, t := range cl.torrents {
ret = append(ret, t)
}
- me.mu.Unlock()
+ cl.mu.Unlock()
return
}
-func (me *Client) AddMagnet(uri string) (T *Torrent, err error) {
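+// AddMagnet adds a torrent to the client from a magnet URI.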
+func (cl *Client) AddMagnet(uri string) (T *Torrent, err error) {
spec, err := TorrentSpecFromMagnetURI(uri)
if err != nil {
return
}
- T, _, err = me.AddTorrentSpec(spec)
+ T, _, err = cl.AddTorrentSpec(spec)
return
}
-func (me *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
- T, _, err = me.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
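+// AddTorrent adds a torrent to the client from its metainfo, along with any DHT nodes the metainfo lists.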
+func (cl *Client) AddTorrent(mi *metainfo.MetaInfo) (T *Torrent, err error) {
+ T, _, err = cl.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
var ss []string
missinggo.CastSlice(&ss, mi.Nodes)
- me.AddDHTNodes(ss)
+ cl.AddDHTNodes(ss)
return
}
-func (me *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
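+// AddTorrentFromFile adds a torrent to the client from a .torrent file on disk.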
+func (cl *Client) AddTorrentFromFile(filename string) (T *Torrent, err error) {
mi, err := metainfo.LoadFromFile(filename)
if err != nil {
return
}
- return me.AddTorrent(mi)
+ return cl.AddTorrent(mi)
}
-func (me *Client) DHT() *dht.Server {
- return me.dHT
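+// DHT returns the client's DHT server. It may be nil.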
+func (cl *Client) DHT() *dht.Server {
+ return cl.dHT
}
-func (me *Client) AddDHTNodes(nodes []string) {
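+// AddDHTNodes adds the given node addresses to the client's DHT server.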
+func (cl *Client) AddDHTNodes(nodes []string) {
for _, n := range nodes {
hmp := missinggo.SplitHostMaybePort(n)
ip := net.ParseIP(hmp.Host)
Port: hmp.Port,
}),
}
- me.DHT().AddNode(ni)
+ cl.DHT().AddNode(ni)
}
}
type badStorage struct{}
-func (me badStorage) OpenTorrent(*metainfo.InfoEx) (storage.Torrent, error) {
- return me, nil
+func (bs badStorage) OpenTorrent(*metainfo.InfoEx) (storage.Torrent, error) {
+ return bs, nil
}
-func (me badStorage) Close() error {
+func (bs badStorage) Close() error {
return nil
}
-func (me badStorage) Piece(p metainfo.Piece) storage.Piece {
+func (bs badStorage) Piece(p metainfo.Piece) storage.Piece {
return badStoragePiece{p}
}
p metainfo.Piece
}
-func (me badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
+func (p badStoragePiece) WriteAt(b []byte, off int64) (int, error) {
return 0, nil
}
-func (me badStoragePiece) GetIsComplete() bool {
+func (p badStoragePiece) GetIsComplete() bool {
return true
}
-func (me badStoragePiece) MarkComplete() error {
+func (p badStoragePiece) MarkComplete() error {
return errors.New("psyyyyyyyche")
}
-func (me badStoragePiece) randomlyTruncatedDataString() string {
+func (p badStoragePiece) randomlyTruncatedDataString() string {
return "hello, world\n"[:rand.Intn(14)]
}
-func (me badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
- r := strings.NewReader(me.randomlyTruncatedDataString())
- return r.ReadAt(b, off+me.p.Offset())
+func (p badStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
+ r := strings.NewReader(p.randomlyTruncatedDataString())
+ return r.ReadAt(b, off+p.p.Offset())
}
// We read from a piece which is marked completed, but is missing data.
)
}
-func (c *connection) Close() {
- c.closed.Set()
- c.discardPieceInclination()
- c.pieceRequestOrder.Clear()
+func (cn *connection) Close() {
+ cn.closed.Set()
+ cn.discardPieceInclination()
+ cn.pieceRequestOrder.Clear()
// TODO: This call blocks sometimes, why?
- go c.conn.Close()
+ go cn.conn.Close()
}
-func (c *connection) PeerHasPiece(piece int) bool {
- return c.peerHasAll || c.peerPieces.Contains(piece)
+func (cn *connection) PeerHasPiece(piece int) bool {
+ return cn.peerHasAll || cn.peerPieces.Contains(piece)
}
-func (c *connection) Post(msg pp.Message) {
+func (cn *connection) Post(msg pp.Message) {
select {
- case c.post <- msg:
+ case cn.post <- msg:
postedMessageTypes.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
- case <-c.closed.C():
+ case <-cn.closed.C():
}
}
-func (c *connection) RequestPending(r request) bool {
- _, ok := c.Requests[r]
+func (cn *connection) RequestPending(r request) bool {
+ _, ok := cn.Requests[r]
return ok
}
-func (c *connection) requestMetadataPiece(index int) {
- eID := c.PeerExtensionIDs["ut_metadata"]
+func (cn *connection) requestMetadataPiece(index int) {
+ eID := cn.PeerExtensionIDs["ut_metadata"]
if eID == 0 {
return
}
- if index < len(c.metadataRequests) && c.metadataRequests[index] {
+ if index < len(cn.metadataRequests) && cn.metadataRequests[index] {
return
}
- c.Post(pp.Message{
+ cn.Post(pp.Message{
Type: pp.Extended,
ExtendedID: eID,
ExtendedPayload: func() []byte {
return b
}(),
})
- for index >= len(c.metadataRequests) {
- c.metadataRequests = append(c.metadataRequests, false)
+ for index >= len(cn.metadataRequests) {
+ cn.metadataRequests = append(cn.metadataRequests, false)
}
- c.metadataRequests[index] = true
+ cn.metadataRequests[index] = true
}
-func (c *connection) requestedMetadataPiece(index int) bool {
- return index < len(c.metadataRequests) && c.metadataRequests[index]
+func (cn *connection) requestedMetadataPiece(index int) bool {
+ return index < len(cn.metadataRequests) && cn.metadataRequests[index]
}
// The actual value to use as the maximum outbound requests.
-func (c *connection) nominalMaxRequests() (ret int) {
- ret = c.PeerMaxRequests
+func (cn *connection) nominalMaxRequests() (ret int) {
+ ret = cn.PeerMaxRequests
if ret > 64 {
ret = 64
}
}
// Returns true if more requests can be sent.
-func (c *connection) Request(chunk request) bool {
- if len(c.Requests) >= c.nominalMaxRequests() {
+func (cn *connection) Request(chunk request) bool {
+ if len(cn.Requests) >= cn.nominalMaxRequests() {
return false
}
- if !c.PeerHasPiece(int(chunk.Index)) {
+ if !cn.PeerHasPiece(int(chunk.Index)) {
return true
}
- if c.RequestPending(chunk) {
+ if cn.RequestPending(chunk) {
return true
}
- c.SetInterested(true)
- if c.PeerChoked {
+ cn.SetInterested(true)
+ if cn.PeerChoked {
return false
}
- if c.Requests == nil {
- c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
+ if cn.Requests == nil {
+ cn.Requests = make(map[request]struct{}, cn.PeerMaxRequests)
}
- c.Requests[chunk] = struct{}{}
- c.requestsLowWater = len(c.Requests) / 2
- c.Post(pp.Message{
+ cn.Requests[chunk] = struct{}{}
+ cn.requestsLowWater = len(cn.Requests) / 2
+ cn.Post(pp.Message{
Type: pp.Request,
Index: chunk.Index,
Begin: chunk.Begin,
}
// Returns true if an unsatisfied request was canceled.
-func (c *connection) Cancel(r request) bool {
- if c.Requests == nil {
+func (cn *connection) Cancel(r request) bool {
+ if cn.Requests == nil {
return false
}
- if _, ok := c.Requests[r]; !ok {
+ if _, ok := cn.Requests[r]; !ok {
return false
}
- delete(c.Requests, r)
- c.Post(pp.Message{
+ delete(cn.Requests, r)
+ cn.Post(pp.Message{
Type: pp.Cancel,
Index: r.Index,
Begin: r.Begin,
}
// Returns true if an unsatisfied request was canceled.
-func (c *connection) PeerCancel(r request) bool {
- if c.PeerRequests == nil {
+func (cn *connection) PeerCancel(r request) bool {
+ if cn.PeerRequests == nil {
return false
}
- if _, ok := c.PeerRequests[r]; !ok {
+ if _, ok := cn.PeerRequests[r]; !ok {
return false
}
- delete(c.PeerRequests, r)
+ delete(cn.PeerRequests, r)
return true
}
-func (c *connection) Choke() {
- if c.Choked {
+func (cn *connection) Choke() {
+ if cn.Choked {
return
}
- c.Post(pp.Message{
+ cn.Post(pp.Message{
Type: pp.Choke,
})
- c.PeerRequests = nil
- c.Choked = true
+ cn.PeerRequests = nil
+ cn.Choked = true
}
-func (c *connection) Unchoke() {
- if !c.Choked {
+func (cn *connection) Unchoke() {
+ if !cn.Choked {
return
}
- c.Post(pp.Message{
+ cn.Post(pp.Message{
Type: pp.Unchoke,
})
- c.Choked = false
+ cn.Choked = false
}
-func (c *connection) SetInterested(interested bool) {
- if c.Interested == interested {
+func (cn *connection) SetInterested(interested bool) {
+ if cn.Interested == interested {
return
}
- c.Post(pp.Message{
+ cn.Post(pp.Message{
Type: func() pp.MessageType {
if interested {
return pp.Interested
}
}(),
})
- c.Interested = interested
+ cn.Interested = interested
}
var (
)
// Writes buffers to the socket from the write channel.
-func (conn *connection) writer() {
+func (cn *connection) writer() {
defer func() {
- conn.t.cl.mu.Lock()
- defer conn.t.cl.mu.Unlock()
- conn.Close()
+ cn.t.cl.mu.Lock()
+ defer cn.t.cl.mu.Unlock()
+ cn.Close()
}()
// Reduce write syscalls.
- buf := bufio.NewWriter(conn.rw)
+ buf := bufio.NewWriter(cn.rw)
for {
if buf.Buffered() == 0 {
// There's nothing to write, so block until we get something.
select {
- case b, ok := <-conn.writeCh:
+ case b, ok := <-cn.writeCh:
if !ok {
return
}
if err != nil {
return
}
- case <-conn.closed.C():
+ case <-cn.closed.C():
return
}
} else {
// We already have something to write, so flush if there's nothing
// more to write.
select {
- case b, ok := <-conn.writeCh:
+ case b, ok := <-cn.writeCh:
if !ok {
return
}
if err != nil {
return
}
- case <-conn.closed.C():
+ case <-cn.closed.C():
return
default:
connectionWriterFlush.Add(1)
}
}
-func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
- defer close(conn.writeCh) // Responsible for notifying downstream routines.
- pending := list.New() // Message queue.
- var nextWrite []byte // Set to nil if we need to need to marshal the next message.
+func (cn *connection) writeOptimizer(keepAliveDelay time.Duration) {
+ defer close(cn.writeCh) // Responsible for notifying downstream routines.
+ pending := list.New() // Message queue.
+ var nextWrite []byte // Set to nil if we need to marshal the next message.
timer := time.NewTimer(keepAliveDelay)
defer timer.Stop()
lastWrite := time.Now()
for {
- write := conn.writeCh // Set to nil if there's nothing to write.
+ write := cn.writeCh // Set to nil if there's nothing to write.
if pending.Len() == 0 {
write = nil
} else if nextWrite == nil {
}
pending.PushBack(pp.Message{Keepalive: true})
postedKeepalives.Add(1)
- case msg, ok := <-conn.post:
+ case msg, ok := <-cn.post:
if !ok {
return
}
if pending.Len() == 0 {
timer.Reset(keepAliveDelay)
}
- case <-conn.closed.C():
+ case <-cn.closed.C():
return
}
}
cn.sentHaves = haves
}
-func (c *connection) updateRequests() {
- if !c.t.haveInfo() {
+func (cn *connection) updateRequests() {
+ if !cn.t.haveInfo() {
return
}
- if c.Interested {
- if c.PeerChoked {
+ if cn.Interested {
+ if cn.PeerChoked {
return
}
- if len(c.Requests) > c.requestsLowWater {
+ if len(cn.Requests) > cn.requestsLowWater {
return
}
}
- c.fillRequests()
- if len(c.Requests) == 0 && !c.PeerChoked {
+ cn.fillRequests()
+ if len(cn.Requests) == 0 && !cn.PeerChoked {
// So we're not choked, but we don't want anything right now. We may
// have completed readahead, and the readahead window has not rolled
// over to the next piece. Better to stay interested in case we're
// going to want data in the near future.
- c.SetInterested(!c.t.haveAllPieces())
+ cn.SetInterested(!cn.t.haveAllPieces())
}
}
-func (c *connection) fillRequests() {
- c.pieceRequestOrder.IterTyped(func(piece int) (more bool) {
- if c.t.cl.config.Debug && c.t.havePiece(piece) {
+func (cn *connection) fillRequests() {
+ cn.pieceRequestOrder.IterTyped(func(piece int) (more bool) {
+ if cn.t.cl.config.Debug && cn.t.havePiece(piece) {
panic(piece)
}
- return c.requestPiecePendingChunks(piece)
+ return cn.requestPiecePendingChunks(piece)
})
}
-func (c *connection) requestPiecePendingChunks(piece int) (again bool) {
- return c.t.connRequestPiecePendingChunks(c, piece)
+func (cn *connection) requestPiecePendingChunks(piece int) (again bool) {
+ return cn.t.connRequestPiecePendingChunks(cn, piece)
}
-func (c *connection) stopRequestingPiece(piece int) {
- c.pieceRequestOrder.Remove(piece)
+func (cn *connection) stopRequestingPiece(piece int) {
+ cn.pieceRequestOrder.Remove(piece)
}
-func (c *connection) updatePiecePriority(piece int) {
- tpp := c.t.piecePriority(piece)
- if !c.PeerHasPiece(piece) {
+func (cn *connection) updatePiecePriority(piece int) {
+ tpp := cn.t.piecePriority(piece)
+ if !cn.PeerHasPiece(piece) {
tpp = PiecePriorityNone
}
if tpp == PiecePriorityNone {
- c.stopRequestingPiece(piece)
+ cn.stopRequestingPiece(piece)
return
}
- prio := c.getPieceInclination()[piece]
+ prio := cn.getPieceInclination()[piece]
switch tpp {
case PiecePriorityNormal:
case PiecePriorityReadahead:
- prio -= c.t.numPieces()
+ prio -= cn.t.numPieces()
case PiecePriorityNext, PiecePriorityNow:
- prio -= 2 * c.t.numPieces()
+ prio -= 2 * cn.t.numPieces()
default:
panic(tpp)
}
prio += piece
- c.pieceRequestOrder.Set(piece, prio)
- c.updateRequests()
+ cn.pieceRequestOrder.Set(piece, prio)
+ cn.updateRequests()
}
-func (c *connection) getPieceInclination() []int {
- if c.pieceInclination == nil {
- c.pieceInclination = c.t.getConnPieceInclination()
+func (cn *connection) getPieceInclination() []int {
+ if cn.pieceInclination == nil {
+ cn.pieceInclination = cn.t.getConnPieceInclination()
}
- return c.pieceInclination
+ return cn.pieceInclination
}
-func (c *connection) discardPieceInclination() {
- if c.pieceInclination == nil {
+func (cn *connection) discardPieceInclination() {
+ if cn.pieceInclination == nil {
return
}
- c.t.putPieceInclination(c.pieceInclination)
- c.pieceInclination = nil
+ cn.t.putPieceInclination(cn.pieceInclination)
+ cn.pieceInclination = nil
}
-func (c *connection) peerHasPieceChanged(piece int) {
- c.updatePiecePriority(piece)
+func (cn *connection) peerHasPieceChanged(piece int) {
+ cn.updatePiecePriority(piece)
}
-func (c *connection) peerPiecesChanged() {
- if c.t.haveInfo() {
- for i := range iter.N(c.t.numPieces()) {
- c.peerHasPieceChanged(i)
+func (cn *connection) peerPiecesChanged() {
+ if cn.t.haveInfo() {
+ for i := range iter.N(cn.t.numPieces()) {
+ cn.peerHasPieceChanged(i)
}
}
}
-func (c *connection) raisePeerMinPieces(newMin int) {
- if newMin > c.peerMinPieces {
- c.peerMinPieces = newMin
+func (cn *connection) raisePeerMinPieces(newMin int) {
+ if newMin > cn.peerMinPieces {
+ cn.peerMinPieces = newMin
}
}
-func (c *connection) peerSentHave(piece int) error {
- if c.t.haveInfo() && piece >= c.t.numPieces() {
+func (cn *connection) peerSentHave(piece int) error {
+ if cn.t.haveInfo() && piece >= cn.t.numPieces() {
return errors.New("invalid piece")
}
- if c.PeerHasPiece(piece) {
+ if cn.PeerHasPiece(piece) {
return nil
}
- c.raisePeerMinPieces(piece + 1)
- c.peerPieces.Set(piece, true)
- c.peerHasPieceChanged(piece)
+ cn.raisePeerMinPieces(piece + 1)
+ cn.peerPieces.Set(piece, true)
+ cn.peerHasPieceChanged(piece)
return nil
}
-func (c *connection) peerSentBitfield(bf []bool) error {
- c.peerHasAll = false
+func (cn *connection) peerSentBitfield(bf []bool) error {
+ cn.peerHasAll = false
if len(bf)%8 != 0 {
panic("expected bitfield length divisible by 8")
}
// We know that the last byte means that at most the last 7 bits are
// wasted.
- c.raisePeerMinPieces(len(bf) - 7)
- if c.t.haveInfo() {
+ cn.raisePeerMinPieces(len(bf) - 7)
+ if cn.t.haveInfo() {
// Ignore known excess pieces.
- bf = bf[:c.t.numPieces()]
+ bf = bf[:cn.t.numPieces()]
}
for i, have := range bf {
if have {
- c.raisePeerMinPieces(i + 1)
+ cn.raisePeerMinPieces(i + 1)
}
- c.peerPieces.Set(i, have)
+ cn.peerPieces.Set(i, have)
}
- c.peerPiecesChanged()
+ cn.peerPiecesChanged()
return nil
}
return nil
}
-func (c *connection) peerSentHaveNone() error {
- c.peerPieces.Clear()
- c.peerHasAll = false
- c.peerPiecesChanged()
+func (cn *connection) peerSentHaveNone() error {
+ cn.peerPieces.Clear()
+ cn.peerHasAll = false
+ cn.peerPiecesChanged()
return nil
}
}
// Returns the number of distinct remote addresses the announce has queried.
-func (me *Announce) NumContacted() int {
- me.mu.Lock()
- defer me.mu.Unlock()
- return me.numContacted
+func (a *Announce) NumContacted() int {
+ a.mu.Lock()
+ defer a.mu.Unlock()
+ return a.numContacted
}
// This is kind of the main thing you want to do with DHT. It traverses the
return disc, nil
}
-func (me *Announce) gotNodeAddr(addr Addr) {
+func (a *Announce) gotNodeAddr(addr Addr) {
if addr.UDPAddr().Port == 0 {
// Not a contactable address.
return
}
- if me.triedAddrs.Test([]byte(addr.String())) {
+ if a.triedAddrs.Test([]byte(addr.String())) {
return
}
- if me.server.ipBlocked(addr.UDPAddr().IP) {
+ if a.server.ipBlocked(addr.UDPAddr().IP) {
return
}
- me.server.mu.Lock()
- if me.server.badNodes.Test([]byte(addr.String())) {
- me.server.mu.Unlock()
+ a.server.mu.Lock()
+ if a.server.badNodes.Test([]byte(addr.String())) {
+ a.server.mu.Unlock()
return
}
- me.server.mu.Unlock()
- me.contact(addr)
+ a.server.mu.Unlock()
+ a.contact(addr)
}
-func (me *Announce) contact(addr Addr) {
- me.numContacted++
- me.triedAddrs.Add([]byte(addr.String()))
- if err := me.getPeers(addr); err != nil {
+func (a *Announce) contact(addr Addr) {
+ a.numContacted++
+ a.triedAddrs.Add([]byte(addr.String()))
+ if err := a.getPeers(addr); err != nil {
log.Printf("error sending get_peers request to %s: %#v", addr, err)
return
}
- me.pending++
+ a.pending++
}
-func (me *Announce) transactionClosed() {
- me.pending--
- if me.pending == 0 {
- me.close()
+func (a *Announce) transactionClosed() {
+ a.pending--
+ if a.pending == 0 {
+ a.close()
return
}
}
-func (me *Announce) responseNode(node NodeInfo) {
- me.gotNodeAddr(node.Addr)
+func (a *Announce) responseNode(node NodeInfo) {
+ a.gotNodeAddr(node.Addr)
}
-func (me *Announce) closingCh() chan struct{} {
- return me.stop
+func (a *Announce) closingCh() chan struct{} {
+ return a.stop
}
// Announce to a peer, if appropriate.
-func (me *Announce) maybeAnnouncePeer(to Addr, token, peerId string) {
- me.server.mu.Lock()
- defer me.server.mu.Unlock()
- if !me.server.config.NoSecurity {
+func (a *Announce) maybeAnnouncePeer(to Addr, token, peerId string) {
+ a.server.mu.Lock()
+ defer a.server.mu.Unlock()
+ if !a.server.config.NoSecurity {
if len(peerId) != 20 {
return
}
return
}
}
- err := me.server.announcePeer(to, me.infoHash, me.announcePort, token, me.announcePortImplied)
+ err := a.server.announcePeer(to, a.infoHash, a.announcePort, token, a.announcePortImplied)
if err != nil {
logonce.Stderr.Printf("error announcing peer: %s", err)
}
}
-func (me *Announce) getPeers(addr Addr) error {
- me.server.mu.Lock()
- defer me.server.mu.Unlock()
- t, err := me.server.getPeers(addr, me.infoHash)
+func (a *Announce) getPeers(addr Addr) error {
+ a.server.mu.Lock()
+ defer a.server.mu.Unlock()
+ t, err := a.server.getPeers(addr, a.infoHash)
if err != nil {
return err
}
t.SetResponseHandler(func(m Msg, ok bool) {
// Register suggested nodes closer to the target info-hash.
if m.R != nil {
- me.mu.Lock()
+ a.mu.Lock()
for _, n := range m.R.Nodes {
- me.responseNode(n)
+ a.responseNode(n)
}
- me.mu.Unlock()
+ a.mu.Unlock()
if vs := m.R.Values; len(vs) != 0 {
nodeInfo := NodeInfo{
}
copy(nodeInfo.ID[:], m.SenderID())
select {
- case me.values <- PeersValues{
+ case a.values <- PeersValues{
Peers: func() (ret []Peer) {
for _, cp := range vs {
ret = append(ret, Peer(cp))
}(),
NodeInfo: nodeInfo,
}:
- case <-me.stop:
+ case <-a.stop:
}
}
- me.maybeAnnouncePeer(addr, m.R.Token, m.SenderID())
+ a.maybeAnnouncePeer(addr, m.R.Token, m.SenderID())
}
- me.mu.Lock()
- me.transactionClosed()
- me.mu.Unlock()
+ a.mu.Lock()
+ a.transactionClosed()
+ a.mu.Unlock()
})
return nil
}
}
// Stop the announce.
-func (me *Announce) Close() {
- me.mu.Lock()
- defer me.mu.Unlock()
- me.close()
+func (a *Announce) Close() {
+ a.mu.Lock()
+ defer a.mu.Unlock()
+ a.close()
}
-func (ps *Announce) close() {
+func (a *Announce) close() {
select {
- case <-ps.stop:
+ case <-a.stop:
default:
- close(ps.stop)
+ close(a.stop)
}
}
Target nodeID
}
-func (me nodeMaxHeap) Len() int { return len(me.IDs) }
+func (mh nodeMaxHeap) Len() int { return len(mh.IDs) }
-func (me nodeMaxHeap) Less(i, j int) bool {
- m := me.IDs[i].Distance(&me.Target)
- n := me.IDs[j].Distance(&me.Target)
+func (mh nodeMaxHeap) Less(i, j int) bool {
+ m := mh.IDs[i].Distance(&mh.Target)
+ n := mh.IDs[j].Distance(&mh.Target)
return m.Cmp(&n) > 0
}
-func (me *nodeMaxHeap) Pop() (ret interface{}) {
- ret, me.IDs = me.IDs[len(me.IDs)-1], me.IDs[:len(me.IDs)-1]
+func (mh *nodeMaxHeap) Pop() (ret interface{}) {
+ ret, mh.IDs = mh.IDs[len(mh.IDs)-1], mh.IDs[:len(mh.IDs)-1]
return
}
-func (me *nodeMaxHeap) Push(val interface{}) {
- me.IDs = append(me.IDs, val.(nodeID))
+func (mh *nodeMaxHeap) Push(val interface{}) {
+ mh.IDs = append(mh.IDs, val.(nodeID))
}
-func (me nodeMaxHeap) Swap(i, j int) {
- me.IDs[i], me.IDs[j] = me.IDs[j], me.IDs[i]
+func (mh nodeMaxHeap) Swap(i, j int) {
+ mh.IDs[i], mh.IDs[j] = mh.IDs[j], mh.IDs[i]
}
type closestNodesSelector struct {
k int
}
-func (me *closestNodesSelector) Push(id nodeID) {
- heap.Push(&me.closest, id)
- if me.closest.Len() > me.k {
- heap.Pop(&me.closest)
+func (cns *closestNodesSelector) Push(id nodeID) {
+ heap.Push(&cns.closest, id)
+ if cns.closest.Len() > cns.k {
+ heap.Pop(&cns.closest)
}
}
-func (me *closestNodesSelector) IDs() []nodeID {
- return me.closest.IDs
+func (cns *closestNodesSelector) IDs() []nodeID {
+ return cns.closest.IDs
}
func newKClosestNodesSelector(k int, targetID nodeID) (ret closestNodesSelector) {
var _ bencode.Unmarshaler = &CompactIPv4NodeInfo{}
-func (me *CompactIPv4NodeInfo) UnmarshalBencode(_b []byte) (err error) {
+func (i *CompactIPv4NodeInfo) UnmarshalBencode(_b []byte) (err error) {
var b []byte
err = bencode.Unmarshal(_b, &b)
if err != nil {
err = fmt.Errorf("bad length: %d", len(b))
return
}
- for i := 0; i < len(b); i += CompactIPv4NodeInfoLen {
+ for k := 0; k < len(b); k += CompactIPv4NodeInfoLen {
var ni NodeInfo
- err = ni.UnmarshalCompactIPv4(b[i : i+CompactIPv4NodeInfoLen])
+ err = ni.UnmarshalCompactIPv4(b[k : k+CompactIPv4NodeInfoLen])
if err != nil {
return
}
- *me = append(*me, ni)
+ *i = append(*i, ni)
}
return
}
-func (me CompactIPv4NodeInfo) MarshalBencode() (ret []byte, err error) {
+func (i CompactIPv4NodeInfo) MarshalBencode() (ret []byte, err error) {
var buf bytes.Buffer
- for _, ni := range me {
+ for _, ni := range i {
buf.Write(ni.ID[:])
if ni.Addr == nil {
err = errors.New("nil addr in node info")
Port int
}
-func (me *Peer) String() string {
- return net.JoinHostPort(me.IP.String(), strconv.FormatInt(int64(me.Port), 10))
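+// String returns the peer's address in host:port form.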
+func (p *Peer) String() string {
+ return net.JoinHostPort(p.IP.String(), strconv.FormatInt(int64(p.Port), 10))
}
func bootstrapAddrs(nodeAddrs []string) (addrs []*net.UDPAddr, err error) {
_ error = KRPCError{}
)
-func (me *KRPCError) UnmarshalBencode(_b []byte) (err error) {
+func (e *KRPCError) UnmarshalBencode(_b []byte) (err error) {
var _v interface{}
err = bencode.Unmarshal(_b, &_v)
if err != nil {
}
switch v := _v.(type) {
case []interface{}:
- me.Code = int(v[0].(int64))
- me.Msg = v[1].(string)
+ e.Code = int(v[0].(int64))
+ e.Msg = v[1].(string)
case string:
- me.Msg = v
+ e.Msg = v
default:
err = fmt.Errorf(`KRPC error bencode value has unexpected type: %T`, _v)
}
return
}
-func (me KRPCError) MarshalBencode() (ret []byte, err error) {
- return bencode.Marshal([]interface{}{me.Code, me.Msg})
+func (e KRPCError) MarshalBencode() (ret []byte, err error) {
+ return bencode.Marshal([]interface{}{e.Code, e.Msg})
}
-func (me KRPCError) Error() string {
- return fmt.Sprintf("KRPC error %d: %s", me.Code, me.Msg)
+func (e KRPCError) Error() string {
+ return fmt.Sprintf("KRPC error %d: %s", e.Code, e.Msg)
}
return nil
}
-func (cni *NodeInfo) UnmarshalCompactIPv4(b []byte) error {
+func (ni *NodeInfo) UnmarshalCompactIPv4(b []byte) error {
if len(b) != CompactIPv4NodeInfoLen {
return errors.New("expected 26 bytes")
}
- missinggo.CopyExact(cni.ID[:], b[:20])
- cni.Addr = NewAddr(&net.UDPAddr{
+ missinggo.CopyExact(ni.ID[:], b[:20])
+ ni.Addr = NewAddr(&net.UDPAddr{
IP: append(make([]byte, 0, 4), b[20:24]...),
Port: int(binary.BigEndian.Uint16(b[24:26])),
})
return ret
}
-func (me *Server) badNode(addr Addr) {
- me.badNodes.Add([]byte(addr.String()))
- delete(me.nodes, addr.String())
+func (s *Server) badNode(addr Addr) {
+ s.badNodes.Add([]byte(addr.String()))
+ delete(s.nodes, addr.String())
}
return f.offset
}
-func (f File) FileInfo() metainfo.FileInfo {
+func (f *File) FileInfo() metainfo.FileInfo {
return f.fi
}
-func (f File) Path() string {
+func (f *File) Path() string {
return f.path
}
return nil
}
-func (me rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
- for _, t := range me.fs.Client.Torrents() {
+func (rn rootNode) Lookup(ctx context.Context, name string) (_node fusefs.Node, err error) {
+ for _, t := range rn.fs.Client.Torrents() {
info := t.Info()
if t.Name() != name || info == nil {
continue
}
__node := node{
metadata: info,
- FS: me.fs,
+ FS: rn.fs,
t: t,
}
if !info.IsDir() {
return
}
-func (me rootNode) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) {
- for _, t := range me.fs.Client.Torrents() {
+func (rn rootNode) ReadDirAll(ctx context.Context) (dirents []fuse.Dirent, err error) {
+ for _, t := range rn.fs.Client.Torrents() {
info := t.Info()
if info == nil {
continue
return
}
-func (rootNode) Attr(ctx context.Context, attr *fuse.Attr) error {
+func (rn rootNode) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Mode = os.ModeDir
return nil
}
// TODO(anacrolix): Why should rootNode implement this?
-func (me rootNode) Forget() {
- me.fs.Destroy()
+func (rn rootNode) Forget() {
+ rn.fs.Destroy()
}
func (tfs *TorrentFS) Root() (fusefs.Node, error) {
return rootNode{tfs}, nil
}
-func (me *TorrentFS) Destroy() {
- me.mu.Lock()
+func (tfs *TorrentFS) Destroy() {
+ tfs.mu.Lock()
select {
- case <-me.destroyed:
+ case <-tfs.destroyed:
default:
- close(me.destroyed)
+ close(tfs.destroyed)
}
- me.mu.Unlock()
+ tfs.mu.Unlock()
}
func New(cl *torrent.Client) *TorrentFS {
Metainfo *metainfo.MetaInfo
}
-func (me *testLayout) Destroy() error {
- return os.RemoveAll(me.BaseDir)
+func (tl *testLayout) Destroy() error {
+ return os.RemoveAll(tl.BaseDir)
}
func newGreetingLayout() (tl testLayout, err error) {
}
}
-func (me *IPList) NumRanges() int {
- if me == nil {
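+// NumRanges returns the number of ranges in the list. A nil *IPList contains no ranges.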
+func (ipl *IPList) NumRanges() int {
+ if ipl == nil {
return 0
}
- return len(me.ranges)
+ return len(ipl.ranges)
}
// Return the range the given IP is in. Returns nil if no range is found.
-func (me *IPList) Lookup(ip net.IP) (r Range, ok bool) {
- if me == nil {
+func (ipl *IPList) Lookup(ip net.IP) (r Range, ok bool) {
+ if ipl == nil {
return
}
// TODO: Perhaps all addresses should be converted to IPv6, if the future
// memory for IPv4 addresses?
v4 := ip.To4()
if v4 != nil {
- r, ok = me.lookup(v4)
+ r, ok = ipl.lookup(v4)
if ok {
return
}
}
v6 := ip.To16()
if v6 != nil {
- return me.lookup(v6)
+ return ipl.lookup(v6)
}
if v4 == nil && v6 == nil {
r = Range{
}
// Return the range the given IP is in. Returns nil if no range is found.
-func (me *IPList) lookup(ip net.IP) (Range, bool) {
+func (ipl *IPList) lookup(ip net.IP) (Range, bool) {
return lookup(func(i int) net.IP {
- return me.ranges[i].First
+ return ipl.ranges[i].First
}, func(i int) Range {
- return me.ranges[i]
- }, len(me.ranges), ip)
+ return ipl.ranges[i]
+ }, len(ipl.ranges), ip)
}
func minifyIP(ip *net.IP) {
packedRangeLen = 20
)
-func (me *IPList) WritePacked(w io.Writer) (err error) {
- descOffsets := make(map[string]int64, len(me.ranges))
- descs := make([]string, 0, len(me.ranges))
+func (ipl *IPList) WritePacked(w io.Writer) (err error) {
+ descOffsets := make(map[string]int64, len(ipl.ranges))
+ descs := make([]string, 0, len(ipl.ranges))
var nextOffset int64
// This is a little monadic, no?
write := func(b []byte, expectedLen int) {
}
}
var b [8]byte
- binary.LittleEndian.PutUint64(b[:], uint64(len(me.ranges)))
+ binary.LittleEndian.PutUint64(b[:], uint64(len(ipl.ranges)))
write(b[:], 8)
- for _, r := range me.ranges {
+ for _, r := range ipl.ranges {
write(r.First.To4(), 4)
write(r.Last.To4(), 4)
descOff, ok := descOffsets[r.Description]
var _ Ranger = PackedIPList{}
-func (me PackedIPList) len() int {
- return int(binary.LittleEndian.Uint64(me[:8]))
+func (pil PackedIPList) len() int {
+ return int(binary.LittleEndian.Uint64(pil[:8]))
}
-func (me PackedIPList) NumRanges() int {
- return me.len()
+func (pil PackedIPList) NumRanges() int {
+ return pil.len()
}
-func (me PackedIPList) getFirst(i int) net.IP {
+func (pil PackedIPList) getFirst(i int) net.IP {
off := packedRangesOffset + packedRangeLen*i
- return net.IP(me[off : off+4])
+ return net.IP(pil[off : off+4])
}
-func (me PackedIPList) getRange(i int) (ret Range) {
+func (pil PackedIPList) getRange(i int) (ret Range) {
rOff := packedRangesOffset + packedRangeLen*i
- last := me[rOff+4 : rOff+8]
- descOff := int(binary.LittleEndian.Uint64(me[rOff+8:]))
- descLen := int(binary.LittleEndian.Uint32(me[rOff+16:]))
- descOff += packedRangesOffset + packedRangeLen*me.len()
+ last := pil[rOff+4 : rOff+8]
+ descOff := int(binary.LittleEndian.Uint64(pil[rOff+8:]))
+ descLen := int(binary.LittleEndian.Uint32(pil[rOff+16:]))
+ descOff += packedRangesOffset + packedRangeLen*pil.len()
ret = Range{
- me.getFirst(i),
+ pil.getFirst(i),
net.IP(last),
- string(me[descOff : descOff+descLen]),
+ string(pil[descOff : descOff+descLen]),
}
return
}
-func (me PackedIPList) Lookup(ip net.IP) (r Range, ok bool) {
+func (pil PackedIPList) Lookup(ip net.IP) (r Range, ok bool) {
ip4 := ip.To4()
if ip4 == nil {
// If the IP list was built successfully, then it only contained IPv4
}
return
}
- return lookup(me.getFirst, me.getRange, me.len(), ip4)
+ return lookup(pil.getFirst, pil.getRange, pil.len(), ip4)
}
func MMapPacked(filename string) (ret Ranger, err error) {
// 20-byte SHA1 hash used for info and pieces.
type Hash [20]byte
-func (me Hash) Bytes() []byte {
- return me[:]
+func (h Hash) Bytes() []byte {
+ return h[:]
}
-func (ih *Hash) AsString() string {
- return string(ih[:])
+func (h *Hash) AsString() string {
+ return string(h[:])
}
-func (ih Hash) HexString() string {
- return fmt.Sprintf("%x", ih[:])
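+// HexString returns the hash as a 40-character hexadecimal string.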
+func (h Hash) HexString() string {
+ return fmt.Sprintf("%x", h[:])
}
return nil
}
-func (me *Info) TotalLength() (ret int64) {
- if me.IsDir() {
- for _, fi := range me.Files {
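+// TotalLength returns the sum of the lengths of all files in the torrent.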
+func (info *Info) TotalLength() (ret int64) {
+ if info.IsDir() {
+ for _, fi := range info.Files {
ret += fi.Length
}
} else {
- ret = me.Length
+ ret = info.Length
}
return
}
-func (me *Info) NumPieces() int {
- if len(me.Pieces)%20 != 0 {
- panic(len(me.Pieces))
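+// NumPieces returns the number of pieces, as determined by the length of the Pieces field.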
+func (info *Info) NumPieces() int {
+ if len(info.Pieces)%20 != 0 {
+ panic(len(info.Pieces))
}
- return len(me.Pieces) / 20
+ return len(info.Pieces) / 20
}
-func (me *InfoEx) Piece(i int) Piece {
- return Piece{me, i}
+func (info *InfoEx) Piece(i int) Piece {
+ return Piece{info, i}
}
-func (i *Info) IsDir() bool {
- return len(i.Files) != 0
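+// IsDir reports whether the info describes a multi-file torrent.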
+func (info *Info) IsDir() bool {
+ return len(info.Files) != 0
}
// The files field, converted up from the old single-file in the parent info
// dict if necessary. This is a helper to avoid having to conditionally handle
// single and multi-file torrent infos.
-func (i *Info) UpvertedFiles() []FileInfo {
- if len(i.Files) == 0 {
+func (info *Info) UpvertedFiles() []FileInfo {
+ if len(info.Files) == 0 {
return []FileInfo{{
- Length: i.Length,
+ Length: info.Length,
// Callers should determine that Info.Name is the basename, and
// thus a regular file.
Path: nil,
}}
}
- return i.Files
+ return info.Files
}
// The info dictionary with its hash and raw bytes exposed, as these are
_ bencode.Unmarshaler = &InfoEx{}
)
-func (this *InfoEx) UnmarshalBencode(data []byte) error {
- this.Bytes = append(make([]byte, 0, len(data)), data...)
+func (ie *InfoEx) UnmarshalBencode(data []byte) error {
+ ie.Bytes = append(make([]byte, 0, len(data)), data...)
h := sha1.New()
- _, err := h.Write(this.Bytes)
+ _, err := h.Write(ie.Bytes)
if err != nil {
panic(err)
}
- this.Hash = new(Hash)
- missinggo.CopyExact(this.Hash, h.Sum(nil))
- return bencode.Unmarshal(data, &this.Info)
+ ie.Hash = new(Hash)
+ missinggo.CopyExact(ie.Hash, h.Sum(nil))
+ return bencode.Unmarshal(data, &ie.Info)
}
-func (this InfoEx) MarshalBencode() ([]byte, error) {
- if this.Bytes != nil {
- return this.Bytes, nil
+func (ie InfoEx) MarshalBencode() ([]byte, error) {
+ if ie.Bytes != nil {
+ return ie.Bytes, nil
}
- return bencode.Marshal(&this.Info)
+ return bencode.Marshal(&ie.Info)
}
type MetaInfo struct {
_ bencode.Unmarshaler = new(Node)
)
-func (me *Node) UnmarshalBencode(b []byte) (err error) {
+func (n *Node) UnmarshalBencode(b []byte) (err error) {
var iface interface{}
err = bencode.Unmarshal(b, &iface)
if err != nil {
}
switch v := iface.(type) {
case string:
- *me = Node(v)
+ *n = Node(v)
case []interface{}:
func() {
defer func() {
err = r.(error)
}
}()
- *me = Node(net.JoinHostPort(v[0].(string), strconv.FormatInt(v[1].(int64), 10)))
+ *n = Node(net.JoinHostPort(v[0].(string), strconv.FormatInt(v[1].(int64), 10)))
}()
default:
err = fmt.Errorf("unsupported type: %T", iface)
i int
}
-func (me Piece) Length() int64 {
- if me.i == me.Info.NumPieces()-1 {
- return me.Info.TotalLength() - int64(me.i)*me.Info.PieceLength
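+// Length returns the piece's length in bytes. The final piece may be shorter than the nominal piece length.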
+func (p Piece) Length() int64 {
+ if p.i == p.Info.NumPieces()-1 {
+ return p.Info.TotalLength() - int64(p.i)*p.Info.PieceLength
}
- return me.Info.PieceLength
+ return p.Info.PieceLength
}
-func (me Piece) Offset() int64 {
- return int64(me.i) * me.Info.PieceLength
+func (p Piece) Offset() int64 {
+ return int64(p.i) * p.Info.PieceLength
}
-func (me Piece) Hash() (ret Hash) {
- missinggo.CopyExact(&ret, me.Info.Pieces[me.i*20:(me.i+1)*20])
+func (p Piece) Hash() (ret Hash) {
+ missinggo.CopyExact(&ret, p.Info.Pieces[p.i*20:(p.i+1)*20])
return
}
*mmap.MMap
}
-func (me segment) Size() int64 {
- return int64(len(*me.MMap))
+func (s segment) Size() int64 {
+ return int64(len(*s.MMap))
}
type MMapSpan struct {
span
}
-func (me *MMapSpan) Append(mmap mmap.MMap) {
- me.span = append(me.span, segment{&mmap})
+func (ms *MMapSpan) Append(mmap mmap.MMap) {
+ ms.span = append(ms.span, segment{&mmap})
}
-func (me MMapSpan) Close() error {
- for _, mMap := range me.span {
+func (ms MMapSpan) Close() error {
+ for _, mMap := range ms.span {
err := mMap.(segment).Unmap()
if err != nil {
log.Print(err)
return nil
}
-func (me MMapSpan) Size() (ret int64) {
- for _, seg := range me.span {
+func (ms MMapSpan) Size() (ret int64) {
+ for _, seg := range ms.span {
ret += seg.Size()
}
return
}
-func (me MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
- me.ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) {
+func (ms MMapSpan) ReadAt(p []byte, off int64) (n int, err error) {
+ ms.ApplyTo(off, func(intervalOffset int64, interval sizer) (stop bool) {
_n := copy(p, (*interval.(segment).MMap)[intervalOffset:])
p = p[_n:]
n += _n
return
}
-func (me MMapSpan) WriteAt(p []byte, off int64) (n int, err error) {
- me.ApplyTo(off, func(iOff int64, i sizer) (stop bool) {
+func (ms MMapSpan) WriteAt(p []byte, off int64) (n int, err error) {
+ ms.ApplyTo(off, func(iOff int64, i sizer) (stop bool) {
mMap := i.(segment)
_n := copy((*mMap.MMap)[iOff:], p)
// err = mMap.Sync(gommap.MS_ASYNC)
type span []sizer
-func (me span) ApplyTo(off int64, f func(int64, sizer) (stop bool)) {
- for _, interval := range me {
+func (s span) ApplyTo(off int64, f func(int64, sizer) (stop bool)) {
+ for _, interval := range s {
iSize := interval.Size()
if off >= iSize {
off -= iSize
r io.Reader
}
-func (me *cipherReader) Read(b []byte) (n int, err error) {
+func (cr *cipherReader) Read(b []byte) (n int, err error) {
be := make([]byte, len(b))
- n, err = me.r.Read(be)
- me.c.XORKeyStream(b[:n], be[:n])
+ n, err = cr.r.Read(be)
+ cr.c.XORKeyStream(b[:n], be[:n])
return
}
w io.Writer
}
-func (me *cipherWriter) Write(b []byte) (n int, err error) {
+func (cr *cipherWriter) Write(b []byte) (n int, err error) {
be := make([]byte, len(b))
- me.c.XORKeyStream(be, b)
- n, err = me.w.Write(be)
+ cr.c.XORKeyStream(be, b)
+ n, err = cr.w.Write(be)
if n != len(be) {
// The cipher will have advanced beyond the caller's stream position.
// We can't use the cipher anymore.
- me.c = nil
+ cr.c = nil
}
return
}
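cipherReader and cipherWriter each wrap one direction of a connection over a shared keystream. A standalone sketch of the same XORKeyStream pattern using crypto/rc4; the key and payload here are placeholders, not what the encryption handshake actually derives:

package main

import (
	"bytes"
	"crypto/rc4"
	"fmt"
	"io"
)

func main() {
	key := []byte("0123456789abcdef0123") // placeholder key
	enc, _ := rc4.NewCipher(key)
	dec, _ := rc4.NewCipher(key) // separate cipher so both keystreams stay aligned

	var wire bytes.Buffer

	// Encrypt on the way out, as cipherWriter does.
	plain := []byte("hello, peer")
	out := make([]byte, len(plain))
	enc.XORKeyStream(out, plain)
	wire.Write(out)

	// Decrypt on the way in, as cipherReader does.
	in := make([]byte, wire.Len())
	io.ReadFull(&wire, in)
	dec.XORKeyStream(in, in)
	fmt.Printf("%s\n", in) // hello, peer
}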
n int64
}
-func (me *trackReader) Read(b []byte) (n int, err error) {
- n, err = me.r.Read(b)
- me.n += int64(n)
+func (tr *trackReader) Read(b []byte) (n int, err error) {
+ n, err = tr.r.Read(b)
+ tr.n += int64(n)
return
}
type piecePriority byte
-func (me *piecePriority) Raise(maybe piecePriority) {
- if maybe > *me {
- *me = maybe
+func (pp *piecePriority) Raise(maybe piecePriority) {
+ if maybe > *pp {
+ *pp = maybe
}
}
}
}
-func (me *fileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) {
- return fileTorrentStorage{me}, nil
+func (fs *fileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) {
+ return fileTorrentStorage{fs}, nil
}
type fileTorrentStorage struct {
*fileStorage
}
-func (me *fileStorage) Piece(p metainfo.Piece) Piece {
+func (fs *fileStorage) Piece(p metainfo.Piece) Piece {
_io := &fileStorageTorrent{
p.Info,
- me.baseDir,
+ fs.baseDir,
}
return &fileStoragePiece{
- me,
+ fs,
p,
missinggo.NewSectionWriter(_io, p.Offset(), p.Length()),
io.NewSectionReader(_io, p.Offset(), p.Length()),
}
}
-func (me *fileStorage) Close() error {
+func (fs *fileStorage) Close() error {
return nil
}
io.ReaderAt
}
-func (me *fileStoragePiece) GetIsComplete() bool {
- return me.completed[me.p.Hash()]
+func (fs *fileStoragePiece) GetIsComplete() bool {
+ return fs.completed[fs.p.Hash()]
}
-func (me *fileStoragePiece) MarkComplete() error {
- if me.completed == nil {
- me.completed = make(map[[20]byte]bool)
+func (fs *fileStoragePiece) MarkComplete() error {
+ if fs.completed == nil {
+ fs.completed = make(map[[20]byte]bool)
}
- me.completed[me.p.Hash()] = true
+ fs.completed[fs.p.Hash()] = true
return nil
}
}
// Returns EOF on short or missing file.
-func (me *fileStorageTorrent) readFileAt(fi metainfo.FileInfo, b []byte, off int64) (n int, err error) {
- f, err := os.Open(me.fileInfoName(fi))
+func (fst *fileStorageTorrent) readFileAt(fi metainfo.FileInfo, b []byte, off int64) (n int, err error) {
+ f, err := os.Open(fst.fileInfoName(fi))
if os.IsNotExist(err) {
// File missing is treated the same as a short file.
err = io.EOF
}
// Only returns EOF at the end of the torrent. Premature EOF is ErrUnexpectedEOF.
-func (me *fileStorageTorrent) ReadAt(b []byte, off int64) (n int, err error) {
- for _, fi := range me.info.UpvertedFiles() {
+func (fst *fileStorageTorrent) ReadAt(b []byte, off int64) (n int, err error) {
+ for _, fi := range fst.info.UpvertedFiles() {
for off < fi.Length {
- n1, err1 := me.readFileAt(fi, b, off)
+ n1, err1 := fst.readFileAt(fi, b, off)
n += n1
off += int64(n1)
b = b[n1:]
return
}
-func (me *fileStorageTorrent) WriteAt(p []byte, off int64) (n int, err error) {
- for _, fi := range me.info.UpvertedFiles() {
+func (fst *fileStorageTorrent) WriteAt(p []byte, off int64) (n int, err error) {
+ for _, fi := range fst.info.UpvertedFiles() {
if off >= fi.Length {
off -= fi.Length
continue
if int64(n1) > fi.Length-off {
n1 = int(fi.Length - off)
}
- name := me.fileInfoName(fi)
+ name := fst.fileInfoName(fi)
os.MkdirAll(filepath.Dir(name), 0770)
var f *os.File
f, err = os.OpenFile(name, os.O_WRONLY|os.O_CREATE, 0660)
return
}
-func (me *fileStorageTorrent) fileInfoName(fi metainfo.FileInfo) string {
- return filepath.Join(append([]string{me.baseDir, me.info.Name}, fi.Path...)...)
+func (fst *fileStorageTorrent) fileInfoName(fi metainfo.FileInfo) string {
+ return filepath.Join(append([]string{fst.baseDir, fst.info.Name}, fi.Path...)...)
}
}
}
-func (me *mmapStorage) OpenTorrent(info *metainfo.InfoEx) (t Torrent, err error) {
- span, err := MMapTorrent(&info.Info, me.baseDir)
+func (s *mmapStorage) OpenTorrent(info *metainfo.InfoEx) (t Torrent, err error) {
+ span, err := MMapTorrent(&info.Info, s.baseDir)
t = &mmapTorrentStorage{
span: span,
}
completed map[metainfo.Hash]bool
}
-func (me *mmapTorrentStorage) Piece(p metainfo.Piece) Piece {
+func (ts *mmapTorrentStorage) Piece(p metainfo.Piece) Piece {
return mmapStoragePiece{
- storage: me,
+ storage: ts,
p: p,
- ReaderAt: io.NewSectionReader(me.span, p.Offset(), p.Length()),
- WriterAt: missinggo.NewSectionWriter(me.span, p.Offset(), p.Length()),
+ ReaderAt: io.NewSectionReader(ts.span, p.Offset(), p.Length()),
+ WriterAt: missinggo.NewSectionWriter(ts.span, p.Offset(), p.Length()),
}
}
-func (me *mmapTorrentStorage) Close() error {
- me.span.Close()
+func (ts *mmapTorrentStorage) Close() error {
+ ts.span.Close()
return nil
}
io.WriterAt
}
-func (me mmapStoragePiece) GetIsComplete() bool {
- return me.storage.completed[me.p.Hash()]
+func (sp mmapStoragePiece) GetIsComplete() bool {
+ return sp.storage.completed[sp.p.Hash()]
}
-func (me mmapStoragePiece) MarkComplete() error {
- if me.storage.completed == nil {
- me.storage.completed = make(map[metainfo.Hash]bool)
+func (sp mmapStoragePiece) MarkComplete() error {
+ if sp.storage.completed == nil {
+ sp.storage.completed = make(map[metainfo.Hash]bool)
}
- me.storage.completed[me.p.Hash()] = true
+ sp.storage.completed[sp.p.Hash()] = true
return nil
}
s *pieceFileStorage
}
-func (me *pieceFileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) {
- return &pieceFileTorrentStorage{me}, nil
+func (s *pieceFileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) {
+ return &pieceFileTorrentStorage{s}, nil
}
-func (me *pieceFileTorrentStorage) Close() error {
+func (s *pieceFileTorrentStorage) Close() error {
return nil
}
-func (me *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece {
- return pieceFileTorrentStoragePiece{me, p, me.s.fs}
+func (s *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece {
+ return pieceFileTorrentStoragePiece{s, p, s.s.fs}
}
type pieceFileTorrentStoragePiece struct {
fs missinggo.FileStore
}
-func (me pieceFileTorrentStoragePiece) completedPath() string {
- return path.Join("completed", me.p.Hash().HexString())
+func (s pieceFileTorrentStoragePiece) completedPath() string {
+ return path.Join("completed", s.p.Hash().HexString())
}
-func (me pieceFileTorrentStoragePiece) incompletePath() string {
- return path.Join("incomplete", me.p.Hash().HexString())
+func (s pieceFileTorrentStoragePiece) incompletePath() string {
+ return path.Join("incomplete", s.p.Hash().HexString())
}
-func (me pieceFileTorrentStoragePiece) GetIsComplete() bool {
- fi, err := me.fs.Stat(me.completedPath())
- return err == nil && fi.Size() == me.p.Length()
+func (s pieceFileTorrentStoragePiece) GetIsComplete() bool {
+ fi, err := s.fs.Stat(s.completedPath())
+ return err == nil && fi.Size() == s.p.Length()
}
-func (me pieceFileTorrentStoragePiece) MarkComplete() error {
- return me.fs.Rename(me.incompletePath(), me.completedPath())
+func (s pieceFileTorrentStoragePiece) MarkComplete() error {
+ return s.fs.Rename(s.incompletePath(), s.completedPath())
}
-func (me pieceFileTorrentStoragePiece) openFile() (f missinggo.File, err error) {
- f, err = me.fs.OpenFile(me.completedPath(), os.O_RDONLY)
+func (s pieceFileTorrentStoragePiece) openFile() (f missinggo.File, err error) {
+ f, err = s.fs.OpenFile(s.completedPath(), os.O_RDONLY)
if err == nil {
var fi os.FileInfo
fi, err = f.Stat()
- if err == nil && fi.Size() == me.p.Length() {
+ if err == nil && fi.Size() == s.p.Length() {
return
}
f.Close()
} else if !os.IsNotExist(err) {
return
}
- f, err = me.fs.OpenFile(me.incompletePath(), os.O_RDONLY)
+ f, err = s.fs.OpenFile(s.incompletePath(), os.O_RDONLY)
if os.IsNotExist(err) {
err = io.ErrUnexpectedEOF
}
return
}
-func (me pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
- f, err := me.openFile()
+func (s pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
+ f, err := s.openFile()
if err != nil {
return
}
defer f.Close()
- missinggo.LimitLen(&b, me.p.Length()-off)
+ missinggo.LimitLen(&b, s.p.Length()-off)
n, err = f.ReadAt(b, off)
off += int64(n)
- if off >= me.p.Length() {
+ if off >= s.p.Length() {
err = io.EOF
} else if err == io.EOF {
err = io.ErrUnexpectedEOF
return
}
-func (me pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) {
- if me.GetIsComplete() {
+func (s pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) {
+ if s.GetIsComplete() {
err = errors.New("piece completed")
return
}
- f, err := me.fs.OpenFile(me.incompletePath(), os.O_WRONLY|os.O_CREATE)
+ f, err := s.fs.OpenFile(s.incompletePath(), os.O_WRONLY|os.O_CREATE)
if err != nil {
return
}
defer f.Close()
- missinggo.LimitLen(&b, me.p.Length()-off)
+ missinggo.LimitLen(&b, s.p.Length()-off)
return f.WriteAt(b, off)
}
return t.completedPieces.Len() == t.numPieces()
}
-func (me *Torrent) haveAnyPieces() bool {
- for i := range me.pieces {
- if me.pieceComplete(i) {
+func (t *Torrent) haveAnyPieces() bool {
+ for i := range t.pieces {
+ if t.pieceComplete(i) {
return true
}
}
return int(cs.Begin / chunkSize)
}
-// TODO: This should probably be called wantPiece.
-func (t *Torrent) wantChunk(r request) bool {
- if !t.wantPiece(int(r.Index)) {
+func (t *Torrent) wantPiece(r request) bool {
+ if !t.wantPieceIndex(int(r.Index)) {
return false
}
if t.pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
return false
}
-// TODO: This should be called wantPieceIndex.
-func (t *Torrent) wantPiece(index int) bool {
+func (t *Torrent) wantPieceIndex(index int) bool {
if !t.haveInfo() {
return false
}
return
}
-func (me *httpClient) Announce(ar *AnnounceRequest) (ret AnnounceResponse, err error) {
+func (c *httpClient) Announce(ar *AnnounceRequest) (ret AnnounceResponse, err error) {
// retain query parameters from announce URL
- q := me.url.Query()
+ q := c.url.Query()
q.Set("info_hash", string(ar.InfoHash[:]))
q.Set("peer_id", string(ar.PeerId[:]))
q.Set("compact", "1")
// According to https://wiki.vuze.com/w/Message_Stream_Encryption.
q.Set("supportcrypto", "1")
- var reqURL url.URL = me.url
+ var reqURL url.URL = c.url
reqURL.RawQuery = q.Encode()
resp, err := http.Get(reqURL.String())
if err != nil {
return
}
-func (me *httpClient) Connect() error {
+func (c *httpClient) Connect() error {
// HTTP trackers do not require a connecting handshake.
return nil
}
-func (me *httpClient) String() string {
- return me.URL()
+func (c *httpClient) String() string {
+ return c.URL()
}
-func (me *httpClient) URL() string {
- return me.url.String()
+func (c *httpClient) URL() string {
+ return c.url.String()
}
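The HTTP announce is a plain GET whose parameters are layered onto whatever query the announce URL already carries. A hedged sketch of that URL construction, with an invented tracker address and values:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	base, _ := url.Parse("http://tracker.example.com/announce?passkey=abc") // invented URL
	q := base.Query()                                                       // keeps passkey=abc
	q.Set("info_hash", string(make([]byte, 20)))                            // 20 raw hash bytes in practice
	q.Set("peer_id", "-GT0001-123456789012")
	q.Set("compact", "1")
	q.Set("supportcrypto", "1")
	req := *base
	req.RawQuery = q.Encode()
	fmt.Println(req.String())
}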
return
}
-func (me *server) respond(addr net.Addr, rh ResponseHeader, parts ...interface{}) (err error) {
+func (s *server) respond(addr net.Addr, rh ResponseHeader, parts ...interface{}) (err error) {
b, err := marshal(append([]interface{}{rh}, parts...)...)
if err != nil {
return
}
- _, err = me.pc.WriteTo(b, addr)
+ _, err = s.pc.WriteTo(b, addr)
return
}
-func (me *server) newConn() (ret int64) {
+func (s *server) newConn() (ret int64) {
ret = rand.Int63()
- if me.conns == nil {
- me.conns = make(map[int64]struct{})
+ if s.conns == nil {
+ s.conns = make(map[int64]struct{})
}
- me.conns[ret] = struct{}{}
+ s.conns[ret] = struct{}{}
return
}
-func (me *server) serveOne() (err error) {
+func (s *server) serveOne() (err error) {
b := make([]byte, 0x10000)
- n, addr, err := me.pc.ReadFrom(b)
+ n, addr, err := s.pc.ReadFrom(b)
if err != nil {
return
}
if h.ConnectionId != connectRequestConnectionId {
return
}
- connId := me.newConn()
- err = me.respond(addr, ResponseHeader{
+ connId := s.newConn()
+ err = s.respond(addr, ResponseHeader{
ActionConnect,
h.TransactionId,
}, ConnectionResponse{
})
return
case ActionAnnounce:
- if _, ok := me.conns[h.ConnectionId]; !ok {
- me.respond(addr, ResponseHeader{
+ if _, ok := s.conns[h.ConnectionId]; !ok {
+ s.respond(addr, ResponseHeader{
TransactionId: h.TransactionId,
Action: ActionError,
}, []byte("not connected"))
if err != nil {
return
}
- t := me.t[ar.InfoHash]
+ t := s.t[ar.InfoHash]
b, err = t.Peers.MarshalBinary()
if err != nil {
panic(err)
}
- err = me.respond(addr, ResponseHeader{
+ err = s.respond(addr, ResponseHeader{
TransactionId: h.TransactionId,
Action: ActionAnnounce,
}, AnnounceResponseHeader{
return
default:
err = fmt.Errorf("unhandled action: %d", h.Action)
- me.respond(addr, ResponseHeader{
+ s.respond(addr, ResponseHeader{
TransactionId: h.TransactionId,
Action: ActionError,
}, []byte("unhandled action"))
type AnnounceEvent int32
-func (me AnnounceEvent) String() string {
+func (e AnnounceEvent) String() string {
// See BEP 3, "event".
- return []string{"empty", "completed", "started", "stopped"}[me]
+ return []string{"empty", "completed", "started", "stopped"}[e]
}
type Peer struct {
url url.URL
}
-func (me *udpClient) Close() error {
- if me.socket != nil {
- return me.socket.Close()
+func (c *udpClient) Close() error {
+ if c.socket != nil {
+ return c.socket.Close()
}
return nil
}
dirState map[metainfo.Hash]entity
}
-func (me *Instance) Close() {
- me.w.Close()
+func (i *Instance) Close() {
+ i.w.Close()
}
-func (me *Instance) handleEvents() {
- defer close(me.Events)
- for e := range me.w.Events {
+func (i *Instance) handleEvents() {
+ defer close(i.Events)
+ for e := range i.w.Events {
log.Printf("event: %s", e)
if e.Op == fsnotify.Write {
// TODO: Special treatment as an existing torrent may have changed.
} else {
- me.refresh()
+ i.refresh()
}
}
}
-func (me *Instance) handleErrors() {
- for err := range me.w.Errors {
+func (i *Instance) handleErrors() {
+ for err := range i.w.Errors {
log.Printf("error in torrent directory watcher: %s", err)
}
}
return
}
-func (me *Instance) torrentRemoved(ih metainfo.Hash) {
- me.Events <- Event{
+func (i *Instance) torrentRemoved(ih metainfo.Hash) {
+ i.Events <- Event{
InfoHash: ih,
Change: Removed,
}
}
-func (me *Instance) torrentAdded(e entity) {
- me.Events <- Event{
+func (i *Instance) torrentAdded(e entity) {
+ i.Events <- Event{
InfoHash: e.Hash,
Change: Added,
MagnetURI: e.MagnetURI,
}
}
-func (me *Instance) refresh() {
- _new := scanDir(me.dirName)
- old := me.dirState
+func (i *Instance) refresh() {
+ _new := scanDir(i.dirName)
+ old := i.dirState
for ih := range old {
_, ok := _new[ih]
if !ok {
- me.torrentRemoved(ih)
+ i.torrentRemoved(ih)
}
}
for ih, newE := range _new {
if newE == oldE {
continue
}
- me.torrentRemoved(ih)
+ i.torrentRemoved(ih)
}
- me.torrentAdded(newE)
+ i.torrentAdded(newE)
}
- me.dirState = _new
+ i.dirState = _new
}
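refresh diffs the previous directory snapshot against a fresh scan, emitting a removal for anything gone or changed and an addition for anything new or changed. A stripped-down sketch of that diffing over plain maps:

package main

import "fmt"

func main() {
	old := map[string]string{"aaa": "magnet:a", "bbb": "magnet:b"}
	_new := map[string]string{"bbb": "magnet:b2", "ccc": "magnet:c"}

	for ih := range old {
		if _, ok := _new[ih]; !ok {
			fmt.Println("removed", ih) // aaa disappeared
		}
	}
	for ih, newE := range _new {
		if oldE, ok := old[ih]; ok {
			if newE == oldE {
				continue
			}
			fmt.Println("removed", ih) // bbb changed: remove, then re-add
		}
		fmt.Println("added", ih)
	}
}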
func New(dirName string) (i *Instance, err error) {
)
// This allows bencode.Unmarshal to decode the compact peer list directly,
// rather than falling back to a string or []byte.
-func (me *CompactIPv4Peers) UnmarshalBencode(b []byte) (err error) {
+func (cps *CompactIPv4Peers) UnmarshalBencode(b []byte) (err error) {
var bb []byte
err = bencode.Unmarshal(b, &bb)
if err != nil {
return
}
- *me, err = UnmarshalIPv4CompactPeers(bb)
+ *cps, err = UnmarshalIPv4CompactPeers(bb)
return
}
-func (me CompactIPv4Peers) MarshalBinary() (ret []byte, err error) {
- ret = make([]byte, len(me)*6)
- for i, cp := range me {
+func (cps CompactIPv4Peers) MarshalBinary() (ret []byte, err error) {
+ ret = make([]byte, len(cps)*6)
+ for i, cp := range cps {
copy(ret[6*i:], cp.IP.To4())
binary.BigEndian.PutUint16(ret[6*i+4:], uint16(cp.Port))
}
_ bencode.Unmarshaler = &CompactPeer{}
)
-func (me CompactPeer) MarshalBencode() (ret []byte, err error) {
- ip := me.IP
+func (cp CompactPeer) MarshalBencode() (ret []byte, err error) {
+ ip := cp.IP
if ip4 := ip.To4(); ip4 != nil {
ip = ip4
}
ret = make([]byte, len(ip)+2)
copy(ret, ip)
- binary.BigEndian.PutUint16(ret[len(ip):], uint16(me.Port))
+ binary.BigEndian.PutUint16(ret[len(ip):], uint16(cp.Port))
return bencode.Marshal(ret)
}
-func (me *CompactPeer) UnmarshalBinary(b []byte) error {
+func (cp *CompactPeer) UnmarshalBinary(b []byte) error {
switch len(b) {
case 18:
- me.IP = make([]byte, 16)
+ cp.IP = make([]byte, 16)
case 6:
- me.IP = make([]byte, 4)
+ cp.IP = make([]byte, 4)
default:
return fmt.Errorf("bad compact peer string: %q", b)
}
- copy(me.IP, b)
- b = b[len(me.IP):]
- me.Port = int(binary.BigEndian.Uint16(b))
+ copy(cp.IP, b)
+ b = b[len(cp.IP):]
+ cp.Port = int(binary.BigEndian.Uint16(b))
return nil
}
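A compact peer is fixed-width: a 4-byte IPv4 address (or 16-byte IPv6) followed by a big-endian 16-bit port. A small worked example decoding a 6-byte entry:

package main

import (
	"encoding/binary"
	"fmt"
	"net"
)

func main() {
	// 6-byte compact IPv4 peer: 10.0.0.1:6881
	b := []byte{10, 0, 0, 1, 0x1a, 0xe1}
	ip := net.IP(b[:4])
	port := binary.BigEndian.Uint16(b[4:])
	fmt.Printf("%s:%d\n", ip, port) // 10.0.0.1:6881
}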
-func (me *CompactPeer) UnmarshalBencode(b []byte) (err error) {
+func (cp *CompactPeer) UnmarshalBencode(b []byte) (err error) {
var _b []byte
err = bencode.Unmarshal(b, &_b)
if err != nil {
return
}
- return me.UnmarshalBinary(_b)
+ return cp.UnmarshalBinary(_b)
}
func UnmarshalIPv4CompactPeers(b []byte) (ret []CompactPeer, err error) {
cl *Client
}
-func (me *worstConns) Len() int { return len(me.c) }
-func (me *worstConns) Swap(i, j int) { me.c[i], me.c[j] = me.c[j], me.c[i] }
+func (wc *worstConns) Len() int { return len(wc.c) }
+func (wc *worstConns) Swap(i, j int) { wc.c[i], wc.c[j] = wc.c[j], wc.c[i] }
-func (me *worstConns) Pop() (ret interface{}) {
- old := me.c
+func (wc *worstConns) Pop() (ret interface{}) {
+ old := wc.c
n := len(old)
ret = old[n-1]
- me.c = old[:n-1]
+ wc.c = old[:n-1]
return
}
-func (me *worstConns) Push(x interface{}) {
- me.c = append(me.c, x.(*connection))
+func (wc *worstConns) Push(x interface{}) {
+ wc.c = append(wc.c, x.(*connection))
}
type worstConnsSortKey struct {
connected time.Time
}
-func (me worstConnsSortKey) Less(other worstConnsSortKey) bool {
- if me.useful != other.useful {
- return !me.useful
+func (wc worstConnsSortKey) Less(other worstConnsSortKey) bool {
+ if wc.useful != other.useful {
+ return !wc.useful
}
- if !me.lastHelpful.Equal(other.lastHelpful) {
- return me.lastHelpful.Before(other.lastHelpful)
+ if !wc.lastHelpful.Equal(other.lastHelpful) {
+ return wc.lastHelpful.Before(other.lastHelpful)
}
- return me.connected.Before(other.connected)
+ return wc.connected.Before(other.connected)
}
-func (me *worstConns) key(i int) (key worstConnsSortKey) {
- c := me.c[i]
- key.useful = me.cl.usefulConn(me.t, c)
- if me.cl.seeding(me.t) {
+func (wc *worstConns) key(i int) (key worstConnsSortKey) {
+ c := wc.c[i]
+ key.useful = wc.cl.usefulConn(wc.t, c)
+ if wc.cl.seeding(wc.t) {
key.lastHelpful = c.lastChunkSent
}
// Intentionally consider the last time a chunk was received when seeding,
return
}
-func (me worstConns) Less(i, j int) bool {
- return me.key(i).Less(me.key(j))
+func (wc worstConns) Less(i, j int) bool {
+ return wc.key(i).Less(wc.key(j))
}
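worstConns satisfies heap.Interface, ordered so that useless and least-recently-helpful connections come out first. A generic sketch of the same pattern with a simplified key; the real key also breaks ties on connect time and picks the helpfulness timestamp based on whether the torrent is seeding:

package main

import (
	"container/heap"
	"fmt"
	"time"
)

type conn struct {
	useful      bool
	lastHelpful time.Time
}

type worst []conn

func (w worst) Len() int      { return len(w) }
func (w worst) Swap(i, j int) { w[i], w[j] = w[j], w[i] }
func (w worst) Less(i, j int) bool {
	if w[i].useful != w[j].useful {
		return !w[i].useful // useless conns sort first
	}
	return w[i].lastHelpful.Before(w[j].lastHelpful)
}
func (w *worst) Push(x interface{}) { *w = append(*w, x.(conn)) }
func (w *worst) Pop() interface{} {
	old := *w
	n := len(old)
	x := old[n-1]
	*w = old[:n-1]
	return x
}

func main() {
	now := time.Now()
	w := worst{
		{useful: true, lastHelpful: now},
		{useful: false, lastHelpful: now.Add(-time.Minute)},
	}
	heap.Init(&w)
	fmt.Println(heap.Pop(&w).(conn).useful) // false: the useless conn is dropped first
}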