doc/admin.texi | 37 +++++++++++++++++++++++++++++-------- doc/download.texi | 4 ++++ doc/news.ru.texi | 13 +++++++++++++ doc/news.texi | 13 +++++++++++++ doc/nncp.html.do | 2 +- doc/nncp.info.do | 1 - doc/style.css | 2 -- ports/nncp/Makefile | 4 ++-- src/call.go | 50 +++++++++++++++++++++++++++++++++++++------------- src/check.go | 13 +++++++++---- src/cmd/nncp-bundle/main.go | 143 ++++++++++++++++++++++++++++++++++++++++------------- src/cmd/nncp-call/main.go | 2 +- src/cmd/nncp-caller/main.go | 60 ++++++++++++++++++++++++++++++++++++++++-------------- src/cmd/nncp-cfgenc/main.go | 2 +- src/cmd/nncp-cfgmin/main.go | 2 +- src/cmd/nncp-cfgnew/main.go | 2 +- src/cmd/nncp-check/main.go | 2 +- src/cmd/nncp-cronexpr/main.go | 2 +- src/cmd/nncp-daemon/main.go | 39 +++++++++++++++++++++++++++++++++------ src/cmd/nncp-exec/main.go | 2 +- src/cmd/nncp-file/main.go | 2 +- src/cmd/nncp-freq/main.go | 2 +- src/cmd/nncp-log/main.go | 2 +- src/cmd/nncp-pkt/main.go | 2 +- src/cmd/nncp-reass/main.go | 80 ++++++++++++++++++++++++++++++++++++----------------- src/cmd/nncp-rm/main.go | 27 +++++++++++++++++++-------- src/cmd/nncp-stat/main.go | 2 +- src/cmd/nncp-toss/main.go | 2 +- src/cmd/nncp-xfer/main.go | 194 ++++++++++++++++++++++++++++++++++++++++------------- src/ctx.go | 8 ++++++-- src/go.mod | 2 +- src/humanizer.go | 259 +++-------------------------------------------------- src/jobs.go | 29 +++++++++++++++++++++++------ src/lockdir.go | 9 ++++++--- src/log.go | 18 ++++++------------ src/nice.go | 14 ++++++++++++++ src/nncp.go | 2 +- src/progress.go | 2 +- src/sortbynice.go | 15 --------------- src/sp.go | 663 ++++++++++++++++++++++++++++++++++++++++++----------- src/tmp.go | 13 +++++++++++-- src/toss.go | 341 +++++++++++++++++++++++++++++++++++++++++++++-------- src/tx.go | 99 ++++++++++++++++++++++++++++++++++++++++++++--------- diff --git a/doc/admin.texi b/doc/admin.texi index e32f2d4e7e61c7c6cdeaf5d75b9b57ce696ef07161cf3bae5fe28809ba4ed79d..510f0ffc4e901d8365b714afaac2366aec2d039eb68ec86f7c75ef48d8f7c506 100644 --- a/doc/admin.texi +++ b/doc/admin.texi @@ -68,24 +68,22 @@ daemontools} for that task to run them under probably existing @verb{|uucp|} user: @example -# mkdir -p /var/service/nncp-toss/log -# chmod 755 /var/service/nncp-toss/log /var/service/nncp-toss -# cd /var/service/nncp-toss +# mkdir -p /var/service/.nncp-toss/log +# cd /var/service/.nncp-toss -# cat > run_ < run <&1 exec setuidgid uucp /usr/local/bin/nncp-toss -cycle 10 EOF -# chmod 755 run_ -# cat > log/run_ < log/run < run < log/run < diff --git a/ports/nncp/Makefile b/ports/nncp/Makefile index 0734a1a50646ce5dd8e8225bcdd999250d8e060e025bf7faea9a45ea003b9918..9ac17a039d57798670b8d85b90d1a51f4291dd558004a470116f5b823a47d53b 100644 --- a/ports/nncp/Makefile +++ b/ports/nncp/Makefile @@ -1,7 +1,7 @@ # $FreeBSD: $ PORTNAME= nncp -DISTVERSION= 5.7.0 +DISTVERSION= 6.1.0 CATEGORIES= net MASTER_SITES= http://www.nncpgo.org/download/ @@ -11,7 +11,7 @@ LICENSE= GPLv3 LICENSE_FILE= ${WRKSRC}/COPYING -USES= go:no_targets tar:xz +USES= go:modules,no_targets tar:xz USE_RC_SUBR= nncp-caller nncp-daemon nncp-toss diff --git a/src/call.go b/src/call.go index f4e6055e595c06e87c56561d1c47349f353705b367f0bc0ad59550b6391fdfd7..55ddd8b75f0fb80c2e4eddfcf793e1b01e68be1e81d578da17e8cf51a60f1a4d 100644 --- a/src/call.go +++ b/src/call.go @@ -18,9 +18,11 @@ package nncp import ( + "fmt" "net" "time" + "github.com/dustin/go-humanize" "github.com/gorhill/cronexpr" ) @@ -57,7 +59,9 @@ onlyPkts map[[32]byte]bool, ) (isGood bool) { for _, addr := range addrs { les := LEs{{"Node", node.Id}, {"Addr", addr}} - ctx.LogD("call", les, "dialing") + ctx.LogD("calling", les, func(les LEs) string { + return fmt.Sprintf("Calling %s (%s)", node.Name, addr) + }) var conn ConnDeadlined var err error if addr[0] == '|' { @@ -66,10 +70,14 @@ } else { conn, err = net.Dial("tcp", addr) } if err != nil { - ctx.LogD("call", append(les, LE{"Err", err}), "dialing") + ctx.LogD("calling", append(les, LE{"Err", err}), func(les LEs) string { + return fmt.Sprintf("Calling %s (%s)", node.Name, addr) + }) continue } - ctx.LogD("call", les, "connected") + ctx.LogD("call-connected", les, func(les LEs) string { + return fmt.Sprintf("Connected %s (%s)", node.Name, addr) + }) state := SPState{ Ctx: ctx, Node: node, @@ -84,21 +92,37 @@ NoCK: noCK, onlyPkts: onlyPkts, } if err = state.StartI(conn); err == nil { - ctx.LogI("call-start", les, "connected") + ctx.LogI("call-started", les, func(les LEs) string { + return fmt.Sprintf("Connection to %s (%s)", node.Name, addr) + }) state.Wait() - ctx.LogI("call-finish", LEs{ - {"Node", state.Node.Id}, - {"Duration", int64(state.Duration.Seconds())}, - {"RxBytes", state.RxBytes}, - {"TxBytes", state.TxBytes}, - {"RxSpeed", state.RxSpeed}, - {"TxSpeed", state.TxSpeed}, - }, "") + ctx.LogI("call-finished", append( + les, + LE{"Duration", int64(state.Duration.Seconds())}, + LE{"RxBytes", state.RxBytes}, + LE{"RxSpeed", state.RxSpeed}, + LE{"TxBytes", state.TxBytes}, + LE{"TxSpeed", state.TxSpeed}, + ), func(les LEs) string { + return fmt.Sprintf( + "Finished call with %s (%d:%d:%d): %s received (%s/sec), %s transferred (%s/sec)", + node.Name, + int(state.Duration.Hours()), + int(state.Duration.Minutes()), + int(state.Duration.Seconds()), + humanize.IBytes(uint64(state.RxBytes)), + humanize.IBytes(uint64(state.RxSpeed)), + humanize.IBytes(uint64(state.TxBytes)), + humanize.IBytes(uint64(state.TxSpeed)), + ) + }) isGood = true conn.Close() // #nosec G104 break } else { - ctx.LogE("call-start", les, err, "") + ctx.LogE("call-started", les, err, func(les LEs) string { + return fmt.Sprintf("Connection to %s (%s)", node.Name, addr) + }) conn.Close() // #nosec G104 } } diff --git a/src/check.go b/src/check.go index e039684719bf48e3b3862fbb4438d1136000c95356651ee5fc84933d6fb06681..721ac92868a8974e3a1e7b6da461da5c65061d1b041eeaa83b34d3af6a1526d1 100644 --- a/src/check.go +++ b/src/check.go @@ -21,6 +21,7 @@ import ( "bufio" "bytes" "errors" + "fmt" "io" "log" "os" @@ -45,26 +46,30 @@ func (ctx *Ctx) checkXxIsBad(nodeId *NodeId, xx TRxTx) bool { isBad := false for job := range ctx.Jobs(nodeId, xx) { + pktName := Base32Codec.EncodeToString(job.HshValue[:]) les := LEs{ {"XX", string(xx)}, {"Node", nodeId}, - {"Pkt", Base32Codec.EncodeToString(job.HshValue[:])}, + {"Pkt", pktName}, {"FullSize", job.Size}, + } + logMsg := func(les LEs) string { + return fmt.Sprintf("Checking: %s/%s/%s", nodeId, string(xx), pktName) } fd, err := os.Open(job.Path) if err != nil { - ctx.LogE("check", les, err, "") + ctx.LogE("checking", les, err, logMsg) return true } gut, err := Check(fd, job.HshValue[:], les, ctx.ShowPrgrs) fd.Close() // #nosec G104 if err != nil { - ctx.LogE("check", les, err, "") + ctx.LogE("checking", les, err, logMsg) return true } if !gut { isBad = true - ctx.LogE("check", les, errors.New("bad"), "") + ctx.LogE("checking", les, errors.New("bad"), logMsg) } } return isBad diff --git a/src/cmd/nncp-bundle/main.go b/src/cmd/nncp-bundle/main.go index a469120b641e817d5872b56fb0a5c40ec6b05f0ab076954b6edda1d30d45c3f4..c3e185a123ee9634a5d561adde05791485d8c7cc61877675bd3e04d0fb2135f1 100644 --- a/src/cmd/nncp-bundle/main.go +++ b/src/cmd/nncp-bundle/main.go @@ -33,7 +33,8 @@ "path/filepath" "strings" xdr "github.com/davecgh/go-xdr/xdr2" - "go.cypherpunks.ru/nncp/v5" + "github.com/dustin/go-humanize" + "go.cypherpunks.ru/nncp/v6" "golang.org/x/crypto/blake2b" ) @@ -119,16 +120,22 @@ var pktName string bufStdout := bufio.NewWriter(os.Stdout) tarWr := tar.NewWriter(bufStdout) for nodeId := range nodeIds { - les := nncp.LEs{ - {K: "XX", V: string(nncp.TTx)}, - {K: "Node", V: nodeId.String()}, - {K: "Pkt", V: "dummy"}, - } for job := range ctx.Jobs(&nodeId, nncp.TTx) { pktName = filepath.Base(job.Path) - les[len(les)-1].V = pktName + les := nncp.LEs{ + {K: "XX", V: string(nncp.TTx)}, + {K: "Node", V: nodeId.String()}, + {K: "Pkt", V: pktName}, + } if job.PktEnc.Nice > nice { - ctx.LogD("nncp-bundle", les, "too nice") + ctx.LogD("bundle-tx-too-nice", les, func(les nncp.LEs) string { + return fmt.Sprintf( + "Bundle transfer %s/tx/%s: too nice %s", + ctx.NodeName(&nodeId), + pktName, + nncp.NicenessFmt(job.PktEnc.Nice), + ) + }) continue } fd, err := os.Open(job.Path) @@ -183,7 +190,18 @@ } else if ctx.HdrUsage { os.Remove(job.Path + nncp.HdrSuffix) } } - ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "") + ctx.LogI( + "bundle-tx", + append(les, nncp.LE{K: "Size", V: job.Size}), + func(les nncp.LEs) string { + return fmt.Sprintf( + "Bundle transfer, sent to node %s %s (%s)", + ctx.NodeName(&nodeId), + pktName, + humanize.IBytes(uint64(job.Size)), + ) + }, + ) } } if err = tarWr.Close(); err != nil { @@ -214,18 +232,25 @@ entry, err := tarR.Next() if err != nil { if err != io.EOF { ctx.LogD( - "nncp-bundle", + "bundle-rx-read-tar", nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, - "error reading tar", + func(les nncp.LEs) string { + return "Bundle transfer rx: reading tar" + }, ) } continue } if entry.Typeflag != tar.TypeDir { ctx.LogD( - "nncp-bundle", - nncp.LEs{{K: "XX", V: string(nncp.TRx)}}, - "Expected NNCP/", + "bundle-rx-read-tar", + nncp.LEs{ + {K: "XX", V: string(nncp.TRx)}, + {K: "Err", V: errors.New("expected NNCP/")}, + }, + func(les nncp.LEs) string { + return "Bundle transfer rx: reading tar" + }, ) continue } @@ -233,47 +258,74 @@ entry, err = tarR.Next() if err != nil { if err != io.EOF { ctx.LogD( - "nncp-bundle", + "bundle-rx-read-tar", nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Err", V: err}}, - "error reading tar", + func(les nncp.LEs) string { + return "Bundle transfer rx: reading tar" + }, ) } continue } les := nncp.LEs{{K: "XX", V: string(nncp.TRx)}, {K: "Pkt", V: entry.Name}} + logMsg := func(les nncp.LEs) string { + return "Bundle transfer rx/" + entry.Name + } if entry.Size < nncp.PktEncOverhead { - ctx.LogD("nncp-bundle", les, "Too small packet") + ctx.LogD("bundle-rx-too-small", les, func(les nncp.LEs) string { + return logMsg(les) + ": too small packet" + }) continue } if !ctx.IsEnoughSpace(entry.Size) { - ctx.LogE("nncp-bundle", les, errors.New("not enough spool space"), "") + ctx.LogE("bundle-rx", les, errors.New("not enough spool space"), logMsg) continue } pktName := filepath.Base(entry.Name) if _, err = nncp.Base32Codec.DecodeString(pktName); err != nil { - ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: "bad packet name"}), "") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: "bad packet name"}), + logMsg, + ) continue } if _, err = io.ReadFull(tarR, pktEncBuf); err != nil { - ctx.LogD("nncp-bundle", append(les, nncp.LE{K: "Err", V: err}), "read") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: err}), + logMsg, + ) continue } if _, err = xdr.Unmarshal(bytes.NewReader(pktEncBuf), &pktEnc); err != nil { - ctx.LogD("nncp-bundle", les, "Bad packet structure") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: "Bad packet structure"}), + logMsg, + ) continue } if pktEnc.Magic != nncp.MagicNNCPEv4 { - ctx.LogD("nncp-bundle", les, "Bad packet magic number") + ctx.LogD( + "bundle-rx", + append(les, nncp.LE{K: "Err", V: "Bad packet magic number"}), + logMsg, + ) continue } if pktEnc.Nice > nice { - ctx.LogD("nncp-bundle", les, "too nice") + ctx.LogD("bundle-rx-too-nice", les, func(les nncp.LEs) string { + return logMsg(les) + ": too nice" + }) continue } if *pktEnc.Sender == *ctx.SelfId && *doDelete { if len(nodeIds) > 0 { if _, exists := nodeIds[*pktEnc.Recipient]; !exists { - ctx.LogD("nncp-bundle", les, "Recipient is not requested") + ctx.LogD("bundle-tx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": recipient is not requested" + }) continue } } @@ -283,9 +335,14 @@ {K: "XX", V: string(nncp.TTx)}, {K: "Node", V: nodeId32}, {K: "Pkt", V: pktName}, } + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf("Bundle transfer %s/tx/%s", nodeId32, pktName) + } dstPath := filepath.Join(ctx.Spool, nodeId32, string(nncp.TTx), pktName) if _, err = os.Stat(dstPath); err != nil { - ctx.LogD("nncp-bundle", les, "Packet is already missing") + ctx.LogD("bundle-tx-missing", les, func(les nncp.LEs) string { + return logMsg(les) + ": packet is already missing" + }) continue } hsh, err := blake2b.New256(nil) @@ -303,7 +360,9 @@ ); err != nil { log.Fatalln("Error during copying:", err) } if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName { - ctx.LogI("nncp-bundle", les, "removed") + ctx.LogI("bundle-tx-removed", les, func(les nncp.LEs) string { + return logMsg(les) + ": removed" + }) if !*dryRun { os.Remove(dstPath) if ctx.HdrUsage { @@ -311,17 +370,21 @@ os.Remove(dstPath + nncp.HdrSuffix) } } } else { - ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") + ctx.LogE("bundle-tx", les, errors.New("bad checksum"), logMsg) } continue } if *pktEnc.Recipient != *ctx.SelfId { - ctx.LogD("nncp-bundle", les, "Unknown recipient") + ctx.LogD("nncp-bundle", les, func(les nncp.LEs) string { + return logMsg(les) + ": unknown recipient" + }) continue } if len(nodeIds) > 0 { if _, exists := nodeIds[*pktEnc.Sender]; !exists { - ctx.LogD("nncp-bundle", les, "Sender is not requested") + ctx.LogD("bundle-rx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": sender is not requested" + }) continue } } @@ -332,14 +395,21 @@ {K: "Node", V: sender}, {K: "Pkt", V: pktName}, {K: "FullSize", V: entry.Size}, } + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf("Bundle transfer %s/rx/%s", sender, pktName) + } dstDirPath := filepath.Join(ctx.Spool, sender, string(nncp.TRx)) dstPath := filepath.Join(dstDirPath, pktName) if _, err = os.Stat(dstPath); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-bundle", les, "Packet already exists") + ctx.LogD("bundle-rx-exists", les, func(les nncp.LEs) string { + return logMsg(les) + ": packet already exists" + }) continue } if _, err = os.Stat(dstPath + nncp.SeenSuffix); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-bundle", les, "Packet already exists") + ctx.LogD("bundle-rx-seen", les, func(les nncp.LEs) string { + return logMsg(les) + ": packet already seen" + }) continue } if *doCheck { @@ -355,7 +425,7 @@ if _, err = nncp.CopyProgressed(hsh, tarR, "check", les, ctx.ShowPrgrs); err != nil { log.Fatalln("Error during copying:", err) } if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) != pktName { - ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") + ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg) continue } } else { @@ -377,7 +447,7 @@ if err = tmp.Commit(dstDirPath); err != nil { log.Fatalln("Error during commiting:", err) } } else { - ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") + ctx.LogE("bundle-rx", les, errors.New("bad checksum"), logMsg) tmp.Cancel() continue } @@ -428,7 +498,12 @@ les = append(les, nncp.LE{K: "Size", V: le.V}) break } } - ctx.LogI("nncp-bundle", les, "") + ctx.LogI("bundle-rx", les, func(les nncp.LEs) string { + return fmt.Sprintf( + "Bundle transfer, received from %s %s (%s)", + sender, pktName, humanize.IBytes(uint64(entry.Size)), + ) + }) } } } diff --git a/src/cmd/nncp-call/main.go b/src/cmd/nncp-call/main.go index 5d1cd353ca241ab07c48f64c6fe90b5fe7278920148a58ee067eea375d25f0ce..238729ee4c6b8f15a26ece168bc4ebd818deed30d2655ccdd2c599343c37aa0b 100644 --- a/src/cmd/nncp-call/main.go +++ b/src/cmd/nncp-call/main.go @@ -26,7 +26,7 @@ "os" "strings" "time" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-caller/main.go b/src/cmd/nncp-caller/main.go index 2247758c7913ca8c69f24c6a17b12a53e6aa4f9278e88ab627112c65476110a8..a8493c91af7996e30995644414557a1b501471feb51ce5e9573bf994d030037a 100644 --- a/src/cmd/nncp-caller/main.go +++ b/src/cmd/nncp-caller/main.go @@ -27,7 +27,7 @@ "os" "sync" "time" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { @@ -49,6 +49,13 @@ omitPrgrs = flag.Bool("noprogress", false, "Omit progress showing") debug = flag.Bool("debug", false, "Print debug messages") version = flag.Bool("version", false, "Print version information") warranty = flag.Bool("warranty", false, "Print warranty information") + + autoToss = flag.Bool("autotoss", false, "Toss after call is finished") + autoTossDoSeen = flag.Bool("autotoss-seen", false, "Create .seen files during tossing") + autoTossNoFile = flag.Bool("autotoss-nofile", false, "Do not process \"file\" packets during tossing") + autoTossNoFreq = flag.Bool("autotoss-nofreq", false, "Do not process \"freq\" packets during tossing") + autoTossNoExec = flag.Bool("autotoss-noexec", false, "Do not process \"exec\" packets during tossing") + autoTossNoTrns = flag.Bool("autotoss-notrns", false, "Do not process \"trns\" packets during tossing") ) flag.Usage = usage flag.Parse() @@ -86,7 +93,13 @@ if err != nil { log.Fatalln("Invalid NODE specified:", err) } if len(node.Calls) == 0 { - ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping") + ctx.LogD( + "caller-no-calls", + nncp.LEs{{K: "Node", V: node.Id}}, + func(les nncp.LEs) string { + return fmt.Sprintf("%s node has no calls, skipping", node.Name) + }, + ) continue } nodes = append(nodes, node) @@ -94,7 +107,13 @@ } } else { for _, node := range ctx.Neigh { if len(node.Calls) == 0 { - ctx.LogD("caller", nncp.LEs{{K: "Node", V: node.Id}}, "has no calls, skipping") + ctx.LogD( + "caller-no-calls", + nncp.LEs{{K: "Node", V: node.Id}}, + func(les nncp.LEs) string { + return fmt.Sprintf("%s node has no calls, skipping", node.Name) + }, + ) continue } nodes = append(nodes, node) @@ -116,26 +135,35 @@ } else { addrs = append(addrs, *call.Addr) } les := nncp.LEs{{K: "Node", V: node.Id}, {K: "CallIndex", V: i}} + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf("%s node, call %d", node.Name, i) + } for { n := time.Now() t := call.Cron.Next(n) - ctx.LogD("caller", les, t.String()) + ctx.LogD("caller-time", les, func(les nncp.LEs) string { + return logMsg(les) + ": " + t.String() + }) if t.IsZero() { - ctx.LogE("caller", les, errors.New("got zero time"), "") + ctx.LogE("caller", les, errors.New("got zero time"), logMsg) return } time.Sleep(t.Sub(n)) node.Lock() if node.Busy { node.Unlock() - ctx.LogD("caller", les, "busy") + ctx.LogD("caller-busy", les, func(les nncp.LEs) string { + return logMsg(les) + ": busy" + }) continue } else { node.Busy = true node.Unlock() if call.WhenTxExists && call.Xx != "TRx" { - ctx.LogD("caller", les, "checking tx existence") + ctx.LogD("caller", les, func(les nncp.LEs) string { + return logMsg(les) + ": checking tx existence" + }) txExists := false for job := range ctx.Jobs(node.Id, nncp.TTx) { if job.PktEnc.Nice > call.Nice { @@ -144,7 +172,9 @@ } txExists = true } if !txExists { - ctx.LogD("caller", les, "no tx") + ctx.LogD("caller-no-tx", les, func(les nncp.LEs) string { + return logMsg(les) + ": no tx" + }) node.Lock() node.Busy = false node.Unlock() @@ -154,15 +184,15 @@ } var autoTossFinish chan struct{} var autoTossBadCode chan bool - if call.AutoToss { + if call.AutoToss || *autoToss { autoTossFinish, autoTossBadCode = ctx.AutoToss( node.Id, call.Nice, - call.AutoTossDoSeen, - call.AutoTossNoFile, - call.AutoTossNoFreq, - call.AutoTossNoExec, - call.AutoTossNoTrns, + call.AutoTossDoSeen || *autoTossDoSeen, + call.AutoTossNoFile || *autoTossNoFile, + call.AutoTossNoFreq || *autoTossNoFreq, + call.AutoTossNoExec || *autoTossNoExec, + call.AutoTossNoTrns || *autoTossNoTrns, ) } @@ -180,7 +210,7 @@ call.NoCK, nil, ) - if call.AutoToss { + if call.AutoToss || *autoToss { close(autoTossFinish) <-autoTossBadCode } diff --git a/src/cmd/nncp-cfgenc/main.go b/src/cmd/nncp-cfgenc/main.go index 739f749c87c28a9427452b0b2f6367593dc161cc723600e6e3251c03e187fe36..e2d3f9e977f837097b395900446419d2f138e423077bff55623ce59e35302c22 100644 --- a/src/cmd/nncp-cfgenc/main.go +++ b/src/cmd/nncp-cfgenc/main.go @@ -28,7 +28,7 @@ "log" "os" xdr "github.com/davecgh/go-xdr/xdr2" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" "golang.org/x/crypto/blake2b" "golang.org/x/term" ) diff --git a/src/cmd/nncp-cfgmin/main.go b/src/cmd/nncp-cfgmin/main.go index c445d3b8f62dd50f9afa09ff704021e6ff661d084361577975a3bccc92ac6b31..37527a38526c2887500d1c32ff575bc696ab34abc61c1d0ebd275a40c42f6331 100644 --- a/src/cmd/nncp-cfgmin/main.go +++ b/src/cmd/nncp-cfgmin/main.go @@ -25,7 +25,7 @@ "log" "os" "github.com/hjson/hjson-go" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-cfgnew/main.go b/src/cmd/nncp-cfgnew/main.go index d5d6d2d69f6030636663bd307fe7c26cfa5fb1dd4bcdae81c3ade9a0ec1b121e..7cc41c18c992b7e0631f7e7d87c507a9a888ab4bfd3f7f30aec46a640f7afa71 100644 --- a/src/cmd/nncp-cfgnew/main.go +++ b/src/cmd/nncp-cfgnew/main.go @@ -23,7 +23,7 @@ "flag" "fmt" "os" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-check/main.go b/src/cmd/nncp-check/main.go index 806b4cb166c526cd32e7b382ca06f428803b04bb51fea9697a9b2c044d998335..201bb462a40ad99e6729a2a84939cc781b85c34e2ddd7439bb828ad6c633d705 100644 --- a/src/cmd/nncp-check/main.go +++ b/src/cmd/nncp-check/main.go @@ -25,7 +25,7 @@ "log" "os" "path/filepath" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-cronexpr/main.go b/src/cmd/nncp-cronexpr/main.go index a2f7c2660be85ca8fb4affc679af0400b9059fd68ce6eafd32d270f3ad29a34c..f904ef052ff2f24f47df9add1faacbbebb0eee859c697b5607b354c4230bf37a 100644 --- a/src/cmd/nncp-cronexpr/main.go +++ b/src/cmd/nncp-cronexpr/main.go @@ -27,7 +27,7 @@ "strings" "time" "github.com/gorhill/cronexpr" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-daemon/main.go b/src/cmd/nncp-daemon/main.go index 8b2a35e5c7939ae59357916a30fbee580046765fecc254e5513b8dcefd5088c8..889f04d0075d8c91b1dc162bcfe51aa3eb414694ed39d86a3b5d94b72fadc549 100644 --- a/src/cmd/nncp-daemon/main.go +++ b/src/cmd/nncp-daemon/main.go @@ -26,7 +26,8 @@ "net" "os" "time" - "go.cypherpunks.ru/nncp/v5" + "github.com/dustin/go-humanize" + "go.cypherpunks.ru/nncp/v6" "golang.org/x/net/netutil" ) @@ -79,17 +80,33 @@ Nice: nice, NoCK: noCK, } if err := state.StartR(conn); err == nil { - ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected") + ctx.LogI( + "call-started", + nncp.LEs{{K: "Node", V: state.Node.Id}}, + func(les nncp.LEs) string { return "Connection with " + state.Node.Name }, + ) nodeIdC <- state.Node.Id state.Wait() - ctx.LogI("call-finish", nncp.LEs{ + ctx.LogI("call-finished", nncp.LEs{ {K: "Node", V: state.Node.Id}, {K: "Duration", V: int64(state.Duration.Seconds())}, {K: "RxBytes", V: state.RxBytes}, {K: "TxBytes", V: state.TxBytes}, {K: "RxSpeed", V: state.RxSpeed}, {K: "TxSpeed", V: state.TxSpeed}, - }, "") + }, func(les nncp.LEs) string { + return fmt.Sprintf( + "Finished call with %s (%d:%d:%d): %s received (%s/sec), %s transferred (%s/sec)", + state.Node.Name, + int(state.Duration.Hours()), + int(state.Duration.Minutes()), + int(state.Duration.Seconds()), + humanize.IBytes(uint64(state.RxBytes)), + humanize.IBytes(uint64(state.RxSpeed)), + humanize.IBytes(uint64(state.TxBytes)), + humanize.IBytes(uint64(state.TxSpeed)), + ) + }) } else { nodeId := "unknown" if state.Node == nil { @@ -98,7 +115,11 @@ } else { nodeIdC <- state.Node.Id nodeId = state.Node.Id.String() } - ctx.LogI("call-start", nncp.LEs{{K: "Node", V: nodeId}}, "connected") + ctx.LogI( + "call-started", + nncp.LEs{{K: "Node", V: nodeId}}, + func(les nncp.LEs) string { return "Connected to " + state.Node.Name }, + ) } close(nodeIdC) } @@ -197,7 +218,13 @@ conn, err := ln.Accept() if err != nil { log.Fatalln("Can not accept connection:", err) } - ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted") + ctx.LogD( + "daemon-accepted", + nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, + func(les nncp.LEs) string { + return "Accepted connection with " + conn.RemoteAddr().String() + }, + ) go func(conn net.Conn) { nodeIdC := make(chan *nncp.NodeId) go performSP(ctx, conn, nice, *noCK, nodeIdC) diff --git a/src/cmd/nncp-exec/main.go b/src/cmd/nncp-exec/main.go index 187f0603f7e2e895f65aec79f53582daa25a44ef0e6bf1b7edd84b79d49e7e89..ecd547f7e6ff8fa1960c69dc6dd1d34d82a77bb330f6cc2b11789542837121df 100644 --- a/src/cmd/nncp-exec/main.go +++ b/src/cmd/nncp-exec/main.go @@ -25,7 +25,7 @@ "fmt" "log" "os" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-file/main.go b/src/cmd/nncp-file/main.go index 262f8f1ba3f8c8ca4c8b08b9d82977c39f74e3e246b70628aec7e8206bd7b9f9..e5fb93182da8dce9e173e422e75337112f5720cdb577af19cdc2231165f0d014 100644 --- a/src/cmd/nncp-file/main.go +++ b/src/cmd/nncp-file/main.go @@ -25,7 +25,7 @@ "log" "os" "strings" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-freq/main.go b/src/cmd/nncp-freq/main.go index ad064bab2324cb74479789036681115a2e405f91396277f8407dc7e2f7f6b6bd..14710a1402f89af3de96002c28cf7828c092da41f04b806fab69723aa3cef662 100644 --- a/src/cmd/nncp-freq/main.go +++ b/src/cmd/nncp-freq/main.go @@ -26,7 +26,7 @@ "os" "path/filepath" "strings" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-log/main.go b/src/cmd/nncp-log/main.go index 2fd0fae5741c8a064d96267535ea79052021525f2f4513eac6d1b7aab88d9186..5ede97e3ee47390a3bb1efd913ef7b2639d64a99c4a66da5638c0ad3e7483efe 100644 --- a/src/cmd/nncp-log/main.go +++ b/src/cmd/nncp-log/main.go @@ -25,7 +25,7 @@ "io" "log" "os" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" "go.cypherpunks.ru/recfile" ) diff --git a/src/cmd/nncp-pkt/main.go b/src/cmd/nncp-pkt/main.go index 08ff7300dea0358932c95b9afe61f583a9411a73f58754e94c3e85a17383f973..980ea2976231a880c52c9129f277dd2062357c7a8039865f7ff30e753dcc80cc 100644 --- a/src/cmd/nncp-pkt/main.go +++ b/src/cmd/nncp-pkt/main.go @@ -29,7 +29,7 @@ "os" xdr "github.com/davecgh/go-xdr/xdr2" "github.com/klauspost/compress/zstd" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-reass/main.go b/src/cmd/nncp-reass/main.go index 3c9024a51b54b35a855e899d0e29fa9f345c09267170e9dd1b3516479cb19080..5a17b1de5eb11f526f962a07d9df625bf602b69be0a8d1375644f8cf1c45e070 100644 --- a/src/cmd/nncp-reass/main.go +++ b/src/cmd/nncp-reass/main.go @@ -35,7 +35,7 @@ "strings" xdr "github.com/davecgh/go-xdr/xdr2" "github.com/dustin/go-humanize" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" "golang.org/x/crypto/blake2b" ) @@ -58,19 +58,24 @@ log.Fatalln("Can not open file:", err) } var metaPkt nncp.ChunkedMeta les := nncp.LEs{{K: "Path", V: path}} + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf("Reassembling chunked file \"%s\"", path) + } if _, err = xdr.Unmarshal(fd, &metaPkt); err != nil { - ctx.LogE("nncp-reass", les, err, "bad meta file") + ctx.LogE("reass-bad-meta", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": bad meta" + }) return false } fd.Close() // #nosec G104 if metaPkt.Magic != nncp.MagicNNCPMv1 { - ctx.LogE("nncp-reass", les, nncp.BadMagic, "") + ctx.LogE("reass", les, nncp.BadMagic, logMsg) return false } metaName := filepath.Base(path) if !strings.HasSuffix(metaName, nncp.ChunkedSuffixMeta) { - ctx.LogE("nncp-reass", les, errors.New("invalid filename suffix"), "") + ctx.LogE("reass", les, errors.New("invalid filename suffix"), logMsg) return false } mainName := strings.TrimSuffix(metaName, nncp.ChunkedSuffixMeta) @@ -108,7 +113,9 @@ for chunkNum, chunkPath := range chunksPaths { fi, err := os.Stat(chunkPath) lesChunk := append(les, nncp.LE{K: "Chunk", V: chunkNum}) if err != nil && os.IsNotExist(err) { - ctx.LogI("nncp-reass", lesChunk, "missing") + ctx.LogI("reass-chunk-miss", lesChunk, func(les nncp.LEs) string { + return fmt.Sprintf("%s: chunk %d missing", logMsg(les), chunkNum) + }) allChunksExist = false continue } @@ -119,7 +126,14 @@ } else { badSize = uint64(fi.Size()) != metaPkt.ChunkSize } if badSize { - ctx.LogE("nncp-reass", lesChunk, errors.New("invalid size"), "") + ctx.LogE( + "reass-chunk", + lesChunk, + errors.New("invalid size"), + func(les nncp.LEs) string { + return fmt.Sprintf("%s: chunk %d", logMsg(les), chunkNum) + }, + ) allChunksExist = false } } @@ -144,10 +158,7 @@ log.Fatalln(err) } if _, err = nncp.CopyProgressed( hsh, bufio.NewReader(fd), "check", - nncp.LEs{ - {K: "Pkt", V: chunkPath}, - {K: "FullSize", V: fi.Size()}, - }, + nncp.LEs{{K: "Pkt", V: chunkPath}, {K: "FullSize", V: fi.Size()}}, ctx.ShowPrgrs, ); err != nil { log.Fatalln(err) @@ -155,9 +166,12 @@ } fd.Close() // #nosec G104 if bytes.Compare(hsh.Sum(nil), metaPkt.Checksums[chunkNum][:]) != 0 { ctx.LogE( - "nncp-reass", + "reass-chunk", nncp.LEs{{K: "Path", V: path}, {K: "Chunk", V: chunkNum}}, - errors.New("checksum is bad"), "", + errors.New("checksum is bad"), + func(les nncp.LEs) string { + return fmt.Sprintf("%s: chunk %d", logMsg(les), chunkNum) + }, ) allChecksumsGood = false } @@ -166,7 +180,7 @@ if !allChecksumsGood { return false } if dryRun { - ctx.LogI("nncp-reass", nncp.LEs{{K: "path", V: path}}, "ready") + ctx.LogI("reass", nncp.LEs{{K: "path", V: path}}, logMsg) return true } @@ -181,7 +195,9 @@ if err != nil { log.Fatalln(err) } les = nncp.LEs{{K: "path", V: path}, {K: "Tmp", V: tmp.Name()}} - ctx.LogD("nncp-reass", les, "created") + ctx.LogD("reass-tmp-created", les, func(les nncp.LEs) string { + return fmt.Sprintf("%s: temporary %s created", logMsg(les), tmp.Name()) + }) dst = tmp } dstW := bufio.NewWriter(dst) @@ -198,10 +214,7 @@ log.Fatalln("Can not stat file:", err) } if _, err = nncp.CopyProgressed( dstW, bufio.NewReader(fd), "reass", - nncp.LEs{ - {K: "Pkt", V: chunkPath}, - {K: "FullSize", V: fi.Size()}, - }, + nncp.LEs{{K: "Pkt", V: chunkPath}, {K: "FullSize", V: fi.Size()}}, ctx.ShowPrgrs, ); err != nil { log.Fatalln(err) @@ -209,7 +222,13 @@ } fd.Close() // #nosec G104 if !keep { if err = os.Remove(chunkPath); err != nil { - ctx.LogE("nncp-reass", append(les, nncp.LE{K: "Chunk", V: chunkNum}), err, "") + ctx.LogE( + "reass-chunk", + append(les, nncp.LE{K: "Chunk", V: chunkNum}), err, + func(les nncp.LEs) string { + return fmt.Sprintf("%s: chunk %d", logMsg(les), chunkNum) + }, + ) hasErrors = true } } @@ -225,15 +244,21 @@ if err = tmp.Close(); err != nil { log.Fatalln("Can not close:", err) } } - ctx.LogD("nncp-reass", les, "written") + ctx.LogD("reass-written", les, func(les nncp.LEs) string { + return logMsg(les) + ": written" + }) if !keep { if err = os.Remove(path); err != nil { - ctx.LogE("nncp-reass", les, err, "") + ctx.LogE("reass-removing", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": removing" + }) hasErrors = true } } if stdout { - ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done") + ctx.LogI("reass", nncp.LEs{{K: "Path", V: path}}, func(les nncp.LEs) string { + return logMsg(les) + ": done" + }) return !hasErrors } @@ -256,21 +281,26 @@ } if err = nncp.DirSync(mainDir); err != nil { log.Fatalln(err) } - ctx.LogI("nncp-reass", nncp.LEs{{K: "Path", V: path}}, "done") + ctx.LogI("reass", nncp.LEs{{K: "Path", V: path}}, func(les nncp.LEs) string { + return logMsg(les) + ": done" + }) return !hasErrors } func findMetas(ctx *nncp.Ctx, dirPath string) []string { dir, err := os.Open(dirPath) defer dir.Close() + logMsg := func(les nncp.LEs) string { + return "Finding .meta in " + dirPath + } if err != nil { - ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "") + ctx.LogE("reass", nncp.LEs{{K: "Path", V: dirPath}}, err, logMsg) return nil } fis, err := dir.Readdir(0) dir.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-reass", nncp.LEs{{K: "Path", V: dirPath}}, err, "") + ctx.LogE("reass", nncp.LEs{{K: "Path", V: dirPath}}, err, logMsg) return nil } metaPaths := make([]string, 0) diff --git a/src/cmd/nncp-rm/main.go b/src/cmd/nncp-rm/main.go index c434e9315ae670a36bd094c9d3a4bb6ecfd8a2d0eaa298adb99ace6cc917dcce..b084ddf9b0aa9c271215d152564da2a8b70e622694f690c22b49d982edfd800c 100644 --- a/src/cmd/nncp-rm/main.go +++ b/src/cmd/nncp-rm/main.go @@ -29,7 +29,7 @@ "strconv" "strings" "time" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { @@ -122,10 +122,14 @@ if info.IsDir() { return nil } if now.Sub(info.ModTime()) < oldBoundary { - ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping") + ctx.LogD("rm-skip", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { + return fmt.Sprintf("File %s: too fresh, skipping", path) + }) return nil } - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { + return fmt.Sprintf("File %s: removed", path) + }) if *dryRun { return nil } @@ -145,7 +149,9 @@ if info.IsDir() { return nil } if strings.HasSuffix(info.Name(), ".lock") { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { + return fmt.Sprintf("File %s: removed", path) + }) if *dryRun { return nil } @@ -176,22 +182,27 @@ } if info.IsDir() { return nil } + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf("File %s: removed", path) + } if now.Sub(info.ModTime()) < oldBoundary { - ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping") + ctx.LogD("rm-skip", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { + return fmt.Sprintf("File %s: too fresh, skipping", path) + }) return nil } if (*doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix)) || (*doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix)) || (*doHdr && strings.HasSuffix(info.Name(), nncp.HdrSuffix)) || (*doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix)) { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg) if *dryRun { return nil } return os.Remove(path) } if *pktRaw != "" && filepath.Base(info.Name()) == *pktRaw { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg) if *dryRun { return nil } @@ -200,7 +211,7 @@ } if !*doSeen && !*doNoCK && !*doHdr && !*doPart && (*doRx || *doTx) && ((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg) if *dryRun { return nil } diff --git a/src/cmd/nncp-stat/main.go b/src/cmd/nncp-stat/main.go index 52d1fbf9523096c24f02e1736dafdb8a0bbfc32b8bab756d1469db9d2ffe767b..56c0d4c1b6b61a964ffe9c5c1214554ae87f4e33ad2047040fc23d9cc8e38053 100644 --- a/src/cmd/nncp-stat/main.go +++ b/src/cmd/nncp-stat/main.go @@ -26,7 +26,7 @@ "os" "sort" "github.com/dustin/go-humanize" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-toss/main.go b/src/cmd/nncp-toss/main.go index df127da450848b3197136ef203d2ece94d69446ce29cdbf21a0e18eb8a413e96..108780b7e98fa2ab38cb945e3ab09fdf1ad3144b870741052fbad96e74b24ccf 100644 --- a/src/cmd/nncp-toss/main.go +++ b/src/cmd/nncp-toss/main.go @@ -25,7 +25,7 @@ "log" "os" "time" - "go.cypherpunks.ru/nncp/v5" + "go.cypherpunks.ru/nncp/v6" ) func usage() { diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index 984834569b2b91033a4e4852631de787700d06a516dcce3731b5e68548054401..48f07d7ab57199d7220ff0cba6f64cc5de686d9df6480816a3e243d8a6b0090a 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -28,7 +28,8 @@ "log" "os" "path/filepath" - "go.cypherpunks.ru/nncp/v5" + "github.com/dustin/go-humanize" + "go.cypherpunks.ru/nncp/v6" ) func usage() { @@ -105,6 +106,7 @@ isBad := false var dir *os.File var fis []os.FileInfo var les nncp.LEs + var logMsg func(les nncp.LEs) string if *txOnly { goto Tx } @@ -112,26 +114,37 @@ les = nncp.LEs{ {K: "XX", V: string(nncp.TRx)}, {K: "Dir", V: selfPath}, } - ctx.LogD("nncp-xfer", les, "self") + logMsg = func(les nncp.LEs) string { + return "Packet transfer, received from self" + } + ctx.LogD("xfer-self", les, logMsg) if _, err = os.Stat(selfPath); err != nil { if os.IsNotExist(err) { - ctx.LogD("nncp-xfer", les, "no dir") + ctx.LogD("xfer-self-no-dir", les, func(les nncp.LEs) string { + return logMsg(les) + ": no directory" + }) goto Tx } - ctx.LogE("nncp-xfer", les, err, "stat") + ctx.LogE("xfer-self-stat", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": stating" + }) isBad = true goto Tx } dir, err = os.Open(selfPath) if err != nil { - ctx.LogE("nncp-xfer", les, err, "open") + ctx.LogE("xfer-self-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) isBad = true goto Tx } fis, err = dir.Readdir(0) dir.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-xfer", les, err, "read") + ctx.LogE("xfer-self-read", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": reading" + }) isBad = true goto Tx } @@ -141,28 +154,41 @@ continue } nodeId, err := nncp.NodeIdFromString(fi.Name()) les := append(les, nncp.LE{K: "Node", V: fi.Name()}) + logMsg := func(les nncp.LEs) string { + return "Packet transfer, received from " + ctx.NodeName(nodeId) + } if err != nil { - ctx.LogD("nncp-xfer", les, "is not NodeId") + ctx.LogD("xfer-rx-not-node", les, func(les nncp.LEs) string { + return logMsg(les) + ": is not NodeId" + }) continue } if nodeOnly != nil && *nodeId != *nodeOnly.Id { - ctx.LogD("nncp-xfer", les, "skip") + ctx.LogD("xfer-rx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": skipping" + }) continue } if _, known := ctx.Neigh[*nodeId]; !known { - ctx.LogD("nncp-xfer", les, "unknown") + ctx.LogD("xfer-rx-unknown", les, func(les nncp.LEs) string { + return logMsg(les) + ": unknown" + }) continue } dir, err = os.Open(filepath.Join(selfPath, fi.Name())) if err != nil { - ctx.LogE("nncp-xfer", les, err, "open") + ctx.LogE("xfer-rx-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) isBad = true continue } fisInt, err := dir.Readdir(0) dir.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-xfer", les, err, "read") + ctx.LogE("xfer-rx-read", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": reading" + }) isBad = true continue } @@ -176,26 +202,45 @@ continue } filename := filepath.Join(dir.Name(), fiInt.Name()) les := append(les, nncp.LE{K: "File", V: filename}) + logMsg := func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, received from %s: %s", + ctx.NodeName(nodeId), filename, + ) + } fd, err := os.Open(filename) if err != nil { - ctx.LogE("nncp-xfer", les, err, "open") + ctx.LogE("xfer-rx-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) isBad = true continue } pktEnc, pktEncRaw, err := ctx.HdrRead(fd) if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 { - ctx.LogD("nncp-xfer", les, "is not a packet") + ctx.LogD("xfer-rx-not-packet", les, func(les nncp.LEs) string { + return logMsg(les) + ": is not a packet" + }) fd.Close() // #nosec G104 continue } if pktEnc.Nice > nice { - ctx.LogD("nncp-xfer", les, "too nice") + ctx.LogD("xfer-rx-too-nice", les, func(les nncp.LEs) string { + return logMsg(les) + ": too nice" + }) fd.Close() // #nosec G104 continue } les = append(les, nncp.LE{K: "Size", V: fiInt.Size()}) + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, received from %s: %s (%s)", + ctx.NodeName(nodeId), filename, + humanize.IBytes(uint64(fiInt.Size())), + ) + } if !ctx.IsEnoughSpace(fiInt.Size()) { - ctx.LogE("nncp-xfer", les, errors.New("is not enough space"), "") + ctx.LogE("xfer-rx", les, errors.New("is not enough space"), logMsg) fd.Close() // #nosec G104 continue } @@ -213,19 +258,20 @@ if err == nil { err = w.Close() } if err != nil { - ctx.LogE("nncp-xfer", les, err, "copy") + ctx.LogE("xfer-rx", les, err, logMsg) w.CloseWithError(err) // #nosec G104 } }() if _, err = nncp.CopyProgressed( tmp.W, r, "Rx", - append(les, nncp.LEs{ - {K: "Pkt", V: filename}, - {K: "FullSize", V: fiInt.Size()}, - }...), + append( + les, + nncp.LE{K: "Pkt", V: filename}, + nncp.LE{K: "FullSize", V: fiInt.Size()}, + ), ctx.ShowPrgrs, ); err != nil { - ctx.LogE("nncp-xfer", les, err, "copy") + ctx.LogE("xfer-rx", les, err, logMsg) isBad = true } fd.Close() // #nosec G104 @@ -240,10 +286,10 @@ string(nncp.TRx), )); err != nil { log.Fatalln(err) } - ctx.LogI("nncp-xfer", les, "") + ctx.LogI("xfer-rx", les, logMsg) if !*keep { if err = os.Remove(filename); err != nil { - ctx.LogE("nncp-xfer", les, err, "remove") + ctx.LogE("xfer-rx-remove", les, err, logMsg) isBad = true } } @@ -266,12 +312,14 @@ } return } for nodeId := range ctx.Neigh { - les := nncp.LEs{ - {K: "XX", V: string(nncp.TTx)}, - {K: "Node", V: nodeId}, + les := nncp.LEs{{K: "XX", V: string(nncp.TTx)}, {K: "Node", V: nodeId}} + logMsg := func(les nncp.LEs) string { + return "Packet transfer, sent to " + ctx.NodeName(&nodeId) } if nodeOnly != nil && nodeId != *nodeOnly.Id { - ctx.LogD("nncp-xfer", les, "skip") + ctx.LogD("xfer-tx-skip", les, func(les nncp.LEs) string { + return logMsg(les) + ": skipping" + }) continue } dirLock, err := ctx.LockDir(&nodeId, string(nncp.TTx)) @@ -280,41 +328,55 @@ continue } nodePath := filepath.Join(flag.Arg(0), nodeId.String()) les = append(les, nncp.LE{K: "Dir", V: nodePath}) + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, sent to %s: directory %s", + ctx.NodeName(&nodeId), nodePath, + ) + } _, err = os.Stat(nodePath) if err != nil { if os.IsNotExist(err) { - ctx.LogD("nncp-xfer", les, "does not exist") + ctx.LogD("xfer-tx-not-exist", les, func(les nncp.LEs) string { + return logMsg(les) + ": does not exist" + }) if !*mkdir { ctx.UnlockDir(dirLock) continue } if err = os.Mkdir(nodePath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", les, err, "mkdir") + ctx.LogE("xfer-tx-mkdir", les, err, logMsg) isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", les, err, "stat") + ctx.LogE("xfer-tx", les, err, logMsg) isBad = true continue } } dstPath := filepath.Join(nodePath, ctx.SelfId.String()) les[len(les)-1].V = dstPath + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, sent to %s: directory %s", + ctx.NodeName(&nodeId), dstPath, + ) + } _, err = os.Stat(dstPath) if err != nil { if os.IsNotExist(err) { if err = os.Mkdir(dstPath, os.FileMode(0777)); err != nil { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", les, err, "mkdir") + ctx.LogE("xfer-tx-mkdir", les, err, logMsg) isBad = true continue } } else { ctx.UnlockDir(dirLock) - ctx.LogE("nncp-xfer", les, err, "stat") + ctx.LogE("xfer-tx", les, err, logMsg) isBad = true continue } @@ -323,29 +385,47 @@ les = les[:len(les)-1] for job := range ctx.Jobs(&nodeId, nncp.TTx) { pktName := filepath.Base(job.Path) les := append(les, nncp.LE{K: "Pkt", V: pktName}) + logMsg = func(les nncp.LEs) string { + return fmt.Sprintf( + "Packet transfer, sent to %s: %s", + ctx.NodeName(&nodeId), pktName, + ) + } if job.PktEnc.Nice > nice { - ctx.LogD("nncp-xfer", les, "too nice") + ctx.LogD("xfer-tx-too-nice", les, func(les nncp.LEs) string { + return logMsg(les) + ": too nice" + }) continue } if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-xfer", les, "already exists") + ctx.LogD("xfer-tx-exists", les, func(les nncp.LEs) string { + return logMsg(les) + ": already exists" + }) continue } if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) { - ctx.LogD("nncp-xfer", les, "already exists") + ctx.LogD("xfer-tx-seen", les, func(les nncp.LEs) string { + return logMsg(les) + ": already seen" + }) continue } tmp, err := nncp.TempFile(dstPath, "xfer") if err != nil { - ctx.LogE("nncp-xfer", les, err, "mktemp") + ctx.LogE("xfer-tx-mktemp", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": mktemp" + }) isBad = true break } les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()}) - ctx.LogD("nncp-xfer", les, "created") + ctx.LogD("xfer-tx-tmp-create", les, func(les nncp.LEs) string { + return fmt.Sprintf("%s: temporary %s created", logMsg(les), tmp.Name()) + }) fd, err := os.Open(job.Path) if err != nil { - ctx.LogE("nncp-xfer", les, err, "open") + ctx.LogE("xfer-tx-open", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": opening" + }) tmp.Close() // #nosec G104 isBad = true continue @@ -358,42 +438,64 @@ ctx.ShowPrgrs, ) fd.Close() // #nosec G104 if err != nil { - ctx.LogE("nncp-xfer", les, err, "copy") + ctx.LogE("xfer-tx-copy", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": copying" + }) tmp.Close() // #nosec G104 isBad = true continue } if err = bufW.Flush(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("nncp-xfer", les, err, "flush") + ctx.LogE("xfer-tx-flush", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": flushing" + }) isBad = true continue } if err = tmp.Sync(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("nncp-xfer", les, err, "sync") + ctx.LogE("xfer-tx-sync", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": syncing" + }) isBad = true continue } if err = tmp.Close(); err != nil { - ctx.LogE("nncp-xfer", les, err, "sync") + ctx.LogE("xfer-tx-close", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": closing" + }) } if err = os.Rename(tmp.Name(), filepath.Join(dstPath, pktName)); err != nil { - ctx.LogE("nncp-xfer", les, err, "rename") + ctx.LogE("xfer-tx-rename", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": renaming" + }) isBad = true continue } if err = nncp.DirSync(dstPath); err != nil { - ctx.LogE("nncp-xfer", les, err, "sync") + ctx.LogE("xfer-tx-dirsync", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": dirsyncing" + }) isBad = true continue } os.Remove(filepath.Join(dstPath, pktName+".part")) // #nosec G104 les = les[:len(les)-1] - ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "") + ctx.LogI( + "xfer-tx", + append(les, nncp.LE{K: "Size", V: copied}), + func(les nncp.LEs) string { + return fmt.Sprintf( + "%s (%s)", logMsg(les), humanize.IBytes(uint64(copied)), + ) + }, + ) if !*keep { if err = os.Remove(job.Path); err != nil { - ctx.LogE("nncp-xfer", les, err, "remove") + ctx.LogE("xfer-tx-remove", les, err, func(les nncp.LEs) string { + return logMsg(les) + ": removing" + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + nncp.HdrSuffix) diff --git a/src/ctx.go b/src/ctx.go index ef923c18a0164bcbdf2a5dfb3db58f6299ff482a26ac9c9c06f7752780adaaab..30deaffa96e232c97421deefa3db792e826a8a5ff243de1f9b47039c4f76a7da 100644 --- a/src/ctx.go +++ b/src/ctx.go @@ -19,6 +19,7 @@ package nncp import ( "errors" + "fmt" "io/ioutil" "log" "os" @@ -65,13 +66,16 @@ } func (ctx *Ctx) ensureRxDir(nodeId *NodeId) error { dirPath := filepath.Join(ctx.Spool, nodeId.String(), string(TRx)) + logMsg := func(les LEs) string { + return fmt.Sprintf("Ensuring directory %s existence", dirPath) + } if err := os.MkdirAll(dirPath, os.FileMode(0777)); err != nil { - ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "") + ctx.LogE("dir-ensure-mkdir", LEs{{"Dir", dirPath}}, err, logMsg) return err } fd, err := os.Open(dirPath) if err != nil { - ctx.LogE("dir-ensure", LEs{{"Dir", dirPath}}, err, "") + ctx.LogE("dir-ensure-open", LEs{{"Dir", dirPath}}, err, logMsg) return err } return fd.Close() diff --git a/src/go.mod b/src/go.mod index 394d45b08db362efa66d69385ae788a6042ae4614b73fdcbafd5850e39c2437b..5ff7582ca1e90c892e794392c2af756101eccb282cb8c8f4d87fdf9129c0cf0f 100644 --- a/src/go.mod +++ b/src/go.mod @@ -1,4 +1,4 @@ -module go.cypherpunks.ru/nncp/v5 +module go.cypherpunks.ru/nncp/v6 require ( github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892 diff --git a/src/humanizer.go b/src/humanizer.go index cb963052b76863fc77cebbdda99775af064aa27ae1c67ef206ee1491729447f2..8f39b1e953201ebcd7ec00b63cee89ed6dc2b54274de3cc19259a01f537daca0 100644 --- a/src/humanizer.go +++ b/src/humanizer.go @@ -18,15 +18,21 @@ package nncp import ( - "errors" "fmt" - "strconv" "strings" "time" - "github.com/dustin/go-humanize" "go.cypherpunks.ru/recfile" ) + +func (ctx *Ctx) NodeName(id *NodeId) string { + idS := id.String() + node, err := ctx.FindNode(idS) + if err == nil { + return node.Name + } + return idS +} func (ctx *Ctx) HumanizeRec(rec string) string { r := recfile.NewReader(strings.NewReader(rec)) @@ -42,256 +48,15 @@ return humanized } func (ctx *Ctx) Humanize(le map[string]string) (string, error) { - nodeS := le["Node"] - node, err := ctx.FindNode(nodeS) - if err == nil { - nodeS = node.Name - } - var sizeParsed uint64 - var size string - if sizeRaw, exists := le["Size"]; exists { - sizeParsed, err = strconv.ParseUint(sizeRaw, 10, 64) - if err != nil { - return "", err - } - size = humanize.IBytes(sizeParsed) - } - - var msg string - switch le["Who"] { - case "tx": - switch le["Type"] { - case "file": - msg = fmt.Sprintf( - "File %s (%s) transfer to %s:%s: %s", - le["Src"], size, nodeS, le["Dst"], le["Msg"], - ) - case "freq": - msg = fmt.Sprintf( - "File request from %s:%s to %s: %s", - nodeS, le["Src"], le["Dst"], le["Msg"], - ) - case "exec": - msg = fmt.Sprintf( - "Exec to %s@%s (%s): %s", - nodeS, le["Dst"], size, le["Msg"], - ) - case "trns": - msg = fmt.Sprintf( - "Transitional packet to %s (%s) (nice %s): %s", - nodeS, size, le["Nice"], le["Msg"], - ) - default: - return "", errors.New("unknown \"tx\" type") - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "rx": - switch le["Type"] { - case "exec": - msg = fmt.Sprintf("Got exec from %s to %s (%s)", nodeS, le["Dst"], size) - case "file": - msg = fmt.Sprintf("Got file %s (%s) from %s", le["Dst"], size, nodeS) - case "freq": - msg = fmt.Sprintf("Got file request %s to %s", le["Src"], nodeS) - case "trns": - nodeT := le["Dst"] - node, err := ctx.FindNode(nodeT) - if err == nil { - nodeT = node.Name - } - msg = fmt.Sprintf( - "Got transitional packet from %s to %s (%s)", - nodeS, nodeT, size, - ) - default: - return "", errors.New("unknown \"rx\" type") - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "check": - msg = fmt.Sprintf("Checking: %s/%s/%s", le["Node"], le["XX"], le["Pkt"]) - if err, exists := le["Err"]; exists { - msg += fmt.Sprintf(" %s", err) - } - case "nncp-xfer": - switch le["XX"] { - case "rx": - msg = "Packet transfer, received from" - case "tx": - msg = "Packet transfer, sent to" - default: - return "", errors.New("unknown XX") - } - if nodeS != "" { - msg += " node " + nodeS - } - if size != "" { - msg += fmt.Sprintf(" (%s)", size) - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } else { - msg += " " + le["Msg"] - } - case "nncp-bundle": - switch le["XX"] { - case "rx": - msg = "Bundle transfer, received from" - case "tx": - msg = "Bundle transfer, sent to" - default: - return "", errors.New("unknown XX") - } - if nodeS != "" { - msg += " node " + nodeS - } - msg += " " + le["Pkt"] - if size != "" { - msg += fmt.Sprintf(" (%s)", size) - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "nncp-rm": - msg += "removing " + le["File"] - case "call-start": - msg = fmt.Sprintf("Connection to %s", nodeS) - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "call-finish": - rx, err := strconv.ParseUint(le["RxBytes"], 10, 64) - if err != nil { - return "", err - } - rxs, err := strconv.ParseUint(le["RxSpeed"], 10, 64) - if err != nil { - return "", err - } - tx, err := strconv.ParseUint(le["TxBytes"], 10, 64) - if err != nil { - return "", err - } - txs, err := strconv.ParseUint(le["TxSpeed"], 10, 64) - if err != nil { - return "", err - } - msg = fmt.Sprintf( - "Finished call with %s: %s received (%s/sec), %s transferred (%s/sec)", - nodeS, - humanize.IBytes(uint64(rx)), humanize.IBytes(uint64(rxs)), - humanize.IBytes(uint64(tx)), humanize.IBytes(uint64(txs)), - ) - case "sp-start": - if nodeS == "" { - msg += "SP" - if peer, exists := le["Peer"]; exists { - msg += fmt.Sprintf(": %s", peer) - } - } else { - nice, err := NicenessParse(le["Nice"]) - if err != nil { - return "", err - } - msg += fmt.Sprintf("SP with %s (nice %s)", nodeS, NicenessFmt(nice)) - } - if m, exists := le["Msg"]; exists { - msg += ": " + m - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "sp-info": - nice, err := NicenessParse(le["Nice"]) - if err != nil { - return "", err - } - msg = fmt.Sprintf( - "Packet %s (%s) (nice %s)", - le["Pkt"], size, NicenessFmt(nice), - ) - if offset := le["Offset"]; offset != "" { - offsetParsed, err := strconv.ParseUint(offset, 10, 64) - if err != nil { - return "", err - } - msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed) - } - if m, exists := le["Msg"]; exists { - msg += ": " + m - } - case "sp-infos": - switch le["XX"] { - case "rx": - msg = fmt.Sprintf("%s has got for us: ", nodeS) - case "tx": - msg = fmt.Sprintf("We have got for %s: ", nodeS) - default: - return "", errors.New("unknown XX") - } - msg += fmt.Sprintf("%s packets, %s", le["Pkts"], size) - case "sp-process": - msg = fmt.Sprintf("%s has %s (%s): %s", nodeS, le["Pkt"], size, le["Msg"]) - case "sp-file": - switch le["XX"] { - case "rx": - msg = "Got packet " - case "tx": - msg = "Sent packet " - default: - return "", errors.New("unknown XX") - } - fullsize, err := strconv.ParseUint(le["FullSize"], 10, 64) - if err != nil { - return "", err - } - msg += fmt.Sprintf( - "%s %d%% (%s / %s)", - le["Pkt"], - 100*sizeParsed/fullsize, - humanize.IBytes(uint64(sizeParsed)), - humanize.IBytes(uint64(fullsize)), - ) - case "sp-done": - switch le["XX"] { - case "rx": - msg = fmt.Sprintf("Packet %s is retreived (%s)", le["Pkt"], size) - case "tx": - msg = fmt.Sprintf("Packet %s is sent", le["Pkt"]) - default: - return "", errors.New("unknown XX") - } - case "nncp-reass": - chunkNum, exists := le["Chunk"] - if exists { - msg = fmt.Sprintf( - "Reassembling chunked file \"%s\" (chunk %s): %s", - le["Path"], chunkNum, le["Msg"], - ) - } else { - msg = fmt.Sprintf( - "Reassembling chunked file \"%s\": %s", - le["Path"], le["Msg"], - ) - } - if err, exists := le["Err"]; exists { - msg += ": " + err - } - case "lockdir": - msg = fmt.Sprintf("Acquire lock for %s: %s", le["Path"], le["Err"]) - default: - return "", errors.New("unknown Who") - } when, err := time.Parse(time.RFC3339Nano, le["When"]) if err != nil { return "", err } var level string - if _, isErr := le["Err"]; isErr { + msg := le["Msg"] + if errMsg, isErr := le["Err"]; isErr { level = "ERROR " + msg += ": " + errMsg } return fmt.Sprintf("%s %s%s", when.Format(time.RFC3339), level, msg), nil } diff --git a/src/jobs.go b/src/jobs.go index ebf857c08ddd38cfe70f25b9c91777c309717e8c2702d0a4d3d09b69d85a7d7f..1dd48522c05ba1dec631a4483debe19459655ceedf0c7d61ccc9e80febd0cb4e 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -19,11 +19,13 @@ package nncp import ( "bytes" + "fmt" "os" "path/filepath" "strings" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" ) type TRxTx string @@ -58,21 +60,29 @@ func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error { tmpHdr, err := ctx.NewTmpFile() if err != nil { - ctx.LogE("hdr-write", []LE{}, err, "new") + ctx.LogE("hdr-write-tmp-new", nil, err, func(les LEs) string { + return "Header writing: new temporary file" + }) return err } if _, err = tmpHdr.Write(pktEncRaw); err != nil { - ctx.LogE("hdr-write", []LE{}, err, "write") + ctx.LogE("hdr-write-write", nil, err, func(les LEs) string { + return "Header writing: writing" + }) os.Remove(tmpHdr.Name()) return err } if err = tmpHdr.Close(); err != nil { - ctx.LogE("hdr-write", []LE{}, err, "close") + ctx.LogE("hdr-write-close", nil, err, func(les LEs) string { + return "Header writing: closing" + }) os.Remove(tmpHdr.Name()) return err } if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil { - ctx.LogE("hdr-write", []LE{}, err, "rename") + ctx.LogE("hdr-write-rename", nil, err, func(les LEs) string { + return "Header writing: renaming" + }) return err } return err @@ -132,13 +142,20 @@ fd.Close() if err != nil || pktEnc.Magic != MagicNNCPEv4 { continue } - ctx.LogD("jobs", LEs{ + ctx.LogD("job", LEs{ {"XX", string(xx)}, {"Node", pktEnc.Sender}, {"Name", name}, {"Nice", int(pktEnc.Nice)}, {"Size", fi.Size()}, - }, "taken") + }, func(les LEs) string { + return fmt.Sprintf( + "Job %s/%s/%s nice: %s size: %s", + pktEnc.Sender, string(xx), name, + NicenessFmt(pktEnc.Nice), + humanize.IBytes(uint64(fi.Size())), + ) + }) if !hdrExists && ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, pth) } diff --git a/src/lockdir.go b/src/lockdir.go index 554cbd12d145eb1c7157b6b52d47e6671680daf271ebe1fd9914a0422ade8d79..762f990874c92824eb64202d4b4c7a79a87003a84bda896bccbc88b80bb21b3f 100644 --- a/src/lockdir.go +++ b/src/lockdir.go @@ -26,7 +26,6 @@ ) func (ctx *Ctx) LockDir(nodeId *NodeId, lockCtx string) (*os.File, error) { if err := ctx.ensureRxDir(nodeId); err != nil { - ctx.LogE("lockdir", LEs{}, err, "") return nil, err } lockPath := filepath.Join(ctx.Spool, nodeId.String(), lockCtx) + ".lock" @@ -36,12 +35,16 @@ os.O_CREATE|os.O_WRONLY, os.FileMode(0666), ) if err != nil { - ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "") + ctx.LogE("lockdir-open", LEs{{"Path", lockPath}}, err, func(les LEs) string { + return "Locking directory: opening %s" + lockPath + }) return nil, err } err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB) if err != nil { - ctx.LogE("lockdir", LEs{{"path", lockPath}}, err, "") + ctx.LogE("lockdir-flock", LEs{{"Path", lockPath}}, err, func(les LEs) string { + return "Locking directory: locking %s" + lockPath + }) dirLock.Close() // #nosec G104 return nil, err } diff --git a/src/log.go b/src/log.go index 11703041392dbd5a831fc72cb989402bee5d3322d04b5187a91a058c893f5d28..7048291d750d61c980c56cc05912c4c11a1bc2f64a313c64bcef0f57f5945509 100644 --- a/src/log.go +++ b/src/log.go @@ -94,22 +94,18 @@ fd.WriteString(rec) // #nosec G104 fd.Close() // #nosec G104 } -func (ctx *Ctx) LogD(who string, les LEs, msg string) { +func (ctx *Ctx) LogD(who string, les LEs, msg func(LEs) string) { if !ctx.Debug { return } les = append(LEs{{"Debug", true}, {"Who", who}}, les...) - if msg != "" { - les = append(les, LE{"Msg", msg}) - } + les = append(les, LE{"Msg", msg(les)}) fmt.Fprint(os.Stderr, les.Rec()) } -func (ctx *Ctx) LogI(who string, les LEs, msg string) { +func (ctx *Ctx) LogI(who string, les LEs, msg func(LEs) string) { les = append(LEs{{"Who", who}}, les...) - if msg != "" { - les = append(les, LE{"Msg", msg}) - } + les = append(les, LE{"Msg", msg(les)}) rec := les.Rec() if !ctx.Quiet { fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec)) @@ -117,11 +113,9 @@ } ctx.Log(rec) } -func (ctx *Ctx) LogE(who string, les LEs, err error, msg string) { +func (ctx *Ctx) LogE(who string, les LEs, err error, msg func(LEs) string) { les = append(LEs{{"Err", err.Error()}, {"Who", who}}, les...) - if msg != "" { - les = append(les, LE{"Msg", msg}) - } + les = append(les, LE{"Msg", msg(les)}) rec := les.Rec() if !ctx.Quiet { fmt.Fprintln(os.Stderr, ctx.HumanizeRec(rec)) diff --git a/src/nice.go b/src/nice.go index b9d81081b4d3800bb13c3fbc19b6bb1ec9dc557daa6652c6f7870c42933e7d3d..602754cd1ec467400019aa502670c8d43a7eaece5397fa41e861a2e0c62b61d9 100644 --- a/src/nice.go +++ b/src/nice.go @@ -106,3 +106,17 @@ return fmt.Sprintf("B+%d", nice-NiceBulk) } return strconv.Itoa(int(nice)) } + +type ByNice []*SPInfo + +func (a ByNice) Len() int { + return len(a) +} + +func (a ByNice) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +func (a ByNice) Less(i, j int) bool { + return a[i].Nice < a[j].Nice +} diff --git a/src/nncp.go b/src/nncp.go index 9cb396b90f52d65ad7174c347c41af1165e8d58b929968e65941a374c82d6bc6..ff2dc39d9c5c4f2f6b27e17910b2261837b641ebc28f42e6dd20fe2f60244090 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -40,7 +40,7 @@ const Base32Encoded32Len = 52 var ( - Version string = "6.1.0" + Version string = "6.2.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/progress.go b/src/progress.go index 9d5053a54f9b94e0862591aebab8a124a8028b08395c997149ae8ac9292306aa..3d8307c8f81725dd11a848ade42cbb288c4441dccd3c81bd0d104d9e0366baf4 100644 --- a/src/progress.go +++ b/src/progress.go @@ -25,7 +25,7 @@ "sync" "time" "github.com/dustin/go-humanize" - "go.cypherpunks.ru/nncp/v5/uilive" + "go.cypherpunks.ru/nncp/v6/uilive" ) func init() { diff --git a/src/sortbynice.go b/src/sortbynice.go deleted file mode 100644 index 94b130b90f0bf62276f701e09617e2f9b1dedcf096c9fa3bc7033d7aad05342c..0000000000000000000000000000000000000000 --- a/src/sortbynice.go +++ /dev/null @@ -1,15 +0,0 @@ -package nncp - -type ByNice []*SPInfo - -func (a ByNice) Len() int { - return len(a) -} - -func (a ByNice) Swap(i, j int) { - a[i], a[j] = a[j], a[i] -} - -func (a ByNice) Less(i, j int) bool { - return a[i].Nice < a[j].Nice -} diff --git a/src/sp.go b/src/sp.go index f9ebde288b377af29cb017dc06ab5b822fa77bdfe3858725babeedfcbf2ac9e6..7a90b06621171285acc72ea5eff30849f7cdc2b4697d579b4a222d384f31f1d8 100644 --- a/src/sp.go +++ b/src/sp.go @@ -21,6 +21,7 @@ import ( "bytes" "crypto/subtle" "errors" + "fmt" "hash" "io" "os" @@ -30,6 +31,7 @@ "sync" "time" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/flynn/noise" "golang.org/x/crypto/blake2b" ) @@ -277,19 +279,32 @@ } func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) { for hshValue := range appeared { + pktName := Base32Codec.EncodeToString(hshValue[:]) les := LEs{ {"XX", string(TRx)}, {"Node", nodeId}, - {"Pkt", Base32Codec.EncodeToString(hshValue[:])}, + {"Pkt", pktName}, } - ctx.LogD("sp-checker", les, "checking") + ctx.LogD("sp-checker", les, func(les LEs) string { + return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName) + }) size, err := ctx.CheckNoCK(nodeId, hshValue) les = append(les, LE{"Size", size}) if err != nil { - ctx.LogE("sp-checker", les, err, "") + ctx.LogE("sp-checker", les, err, func(les LEs) string { + return fmt.Sprintf( + "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName, + humanize.IBytes(uint64(size)), + ) + }) continue } - ctx.LogI("sp-done", les, "") + ctx.LogI("sp-checker-done", les, func(les LEs) string { + return fmt.Sprintf( + "Packet %s is retreived (%s)", + pktName, humanize.IBytes(uint64(size)), + ) + }) go func(hsh *[32]byte) { checked <- hsh }(hshValue) } } @@ -353,19 +368,34 @@ sort.Sort(ByNice(infos)) var payloads [][]byte for _, info := range infos { payloads = append(payloads, MarshalSP(SPTypeInfo, info)) + pktName := Base32Codec.EncodeToString(info.Hash[:]) ctx.LogD("sp-info-our", LEs{ {"Node", nodeId}, - {"Name", Base32Codec.EncodeToString(info.Hash[:])}, + {"Name", pktName}, {"Size", info.Size}, - }, "") + }, func(les LEs) string { + return fmt.Sprintf( + "Our info: %s/tx/%s (%s)", + ctx.NodeName(nodeId), + pktName, + humanize.IBytes(info.Size), + ) + }) } if totalSize > 0 { - ctx.LogI("sp-infos", LEs{ + ctx.LogI("sp-infos-tx", LEs{ {"XX", string(TTx)}, {"Node", nodeId}, {"Pkts", len(payloads)}, {"Size", totalSize}, - }, "") + }, func(les LEs) string { + return fmt.Sprintf( + "We have got for %s: %d packets, %s", + ctx.NodeName(nodeId), + len(payloads), + humanize.IBytes(uint64(totalSize)), + ) + }) } return payloadsSplit(payloads) } @@ -435,30 +465,72 @@ state.dirUnlock() return err } les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}} - state.Ctx.LogD("sp-start", les, "sending first message") + state.Ctx.LogD("sp-startI", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", les, "waiting for first message") + state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for first message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf) if err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): reading Noise message", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", les, "starting workers") + state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + state.Node.Name, + NicenessFmt(state.Nice), + ) + }) state.dirUnlock() } return err @@ -490,14 +562,21 @@ state.xxOnly = xxOnly var buf []byte var payload []byte - state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP nice %s: waiting for first message", + NicenessFmt(state.Nice), + ) + } + les := LEs{{"Nice", int(state.Nice)}} + state.Ctx.LogD("sp-startR", les, logMsg) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if buf, err = state.ReadSP(conn); err != nil { - state.Ctx.LogE("sp-start", LEs{}, err, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil { - state.Ctx.LogE("sp-start", LEs{}, err, "") + state.Ctx.LogE("sp-startR-read", les, err, logMsg) return err } @@ -510,15 +589,16 @@ } } if node == nil { peerId := Base32Codec.EncodeToString(state.hs.PeerStatic()) - state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "") - return errors.New("Unknown peer: " + peerId) + err = errors.New("unknown peer: " + peerId) + state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg) + return err } state.Node = node state.rxRate = node.RxRate state.txRate = node.TxRate state.onlineDeadline = node.OnlineDeadline state.maxOnlineTime = node.MaxOnlineTime - les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}} + les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}} if err = state.Ctx.ensureRxDir(node.Id); err != nil { return err @@ -553,7 +633,12 @@ for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ { firstPayload = append(firstPayload, SPHaltMarshalized...) } - state.Ctx.LogD("sp-start", les, "sending first message") + state.Ctx.LogD("sp-startR-write", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending first message", + node.Name, NicenessFmt(state.Nice), + ) + }) buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload) if err != nil { state.dirUnlock() @@ -561,11 +646,21 @@ return err } conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err = state.WriteSP(conn, buf, false); err != nil { - state.Ctx.LogE("sp-start", les, err, "") + state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): writing", + node.Name, NicenessFmt(state.Nice), + ) + }) state.dirUnlock() return err } - state.Ctx.LogD("sp-start", les, "starting workers") + state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): starting workers", + node.Name, NicenessFmt(state.Nice), + ) + }) err = state.StartWorkers(conn, infosPayloads, payload) if err != nil { state.dirUnlock() @@ -633,9 +728,15 @@ state.wg.Add(1) go func() { for _, payload := range infosPayloads[1:] { state.Ctx.LogD( - "sp-work", + "sp-queue-remaining", append(les, LE{"Size", len(payload)}), - "queuing remaining payload", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing remaining payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- payload } @@ -644,23 +745,32 @@ }() } // Processing of first payload and queueing its responses - state.Ctx.LogD( - "sp-work", - append(les, LE{"Size", len(payload)}), - "processing first payload", - ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): processing first payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-process", append(les, LE{"Size", len(payload)}), logMsg) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-work", les, err, "") + state.Ctx.LogE("sp-process", les, err, logMsg) return err } state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-work", + "sp-queue-reply", append(les, LE{"Size", len(reply)}), - "queuing reply", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- reply } @@ -717,9 +827,15 @@ state.Nice, &state.infosOurSeen, ) { state.Ctx.LogD( - "sp-work", + "sp-queue-info", append(les, LE{"Size", len(payload)}), - "queuing new info", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing new info (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) state.payloads <- payload } @@ -742,14 +858,25 @@ var payload []byte var ping bool select { case <-state.pings: - state.Ctx.LogD("sp-xmit", les, "got ping") + state.Ctx.LogD("sp-got-ping", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got ping", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) payload = SPPingMarshalized ping = true case payload = <-state.payloads: state.Ctx.LogD( - "sp-xmit", + "sp-got-payload", append(les, LE{"Size", len(payload)}), - "got payload", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + }, ) default: state.RLock() @@ -763,12 +890,24 @@ state.RUnlock() if state.txRate > 0 { time.Sleep(time.Second / time.Duration(state.txRate)) } - lesp := append(les, LEs{ - {"XX", string(TTx)}, - {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])}, - {"Size", int64(freq.Offset)}, - }...) - state.Ctx.LogD("sp-file", lesp, "queueing") + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp := append( + les, + LE{"XX", string(TTx)}, + LE{"Pkt", pktName}, + LE{"Size", int64(freq.Offset)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): tx/%s (%s)", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(freq.Offset), + ) + } + state.Ctx.LogD("sp-queue", lesp, func(les LEs) string { + return logMsg(les) + ": queueing" + }) pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), @@ -779,12 +918,16 @@ fdAndFullSize, exists := state.fds[pth] if !exists { fd, err := os.Open(pth) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening" + }) return } fi, err := fd.Stat() if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string { + return logMsg(les) + ": stating" + }) return } fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()} @@ -794,19 +937,34 @@ fd := fdAndFullSize.fd fullSize := fdAndFullSize.fullSize var buf []byte if freq.Offset < uint64(fullSize) { - state.Ctx.LogD("sp-file", lesp, "seeking") + state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string { + return logMsg(les) + ": seeking" + }) if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) return } buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead) n, err := fd.Read(buf) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": reading" + }) return } buf = buf[:n] - state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read") + state.Ctx.LogD( + "sp-file-read", + append(lesp, LE{"Size", n}), + func(les LEs) string { + return fmt.Sprintf( + "%s: read %s", + logMsg(les), humanize.IBytes(uint64(n)), + ) + }, + ) } state.closeFd(pth) payload = MarshalSP(SPTypeFile, SPFile{ @@ -815,15 +973,16 @@ Offset: freq.Offset, Payload: buf, }) ourSize := freq.Offset + uint64(len(buf)) - lesp = append(lesp, LE{"Size", int64(ourSize)}) - lesp = append(lesp, LE{"FullSize", fullSize}) + lesp = append(lesp, LE{"Size", int64(ourSize)}, LE{"FullSize", fullSize}) if state.Ctx.ShowPrgrs { Progress("Tx", lesp) } state.Lock() if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash { if ourSize == uint64(fullSize) { - state.Ctx.LogD("sp-file", lesp, "finished") + state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string { + return logMsg(les) + ": finished" + }) if len(state.queueTheir) > 1 { state.queueTheir = state.queueTheir[1:] } else { @@ -833,14 +992,23 @@ } else { state.queueTheir[0].freq.Offset += uint64(len(buf)) } } else { - state.Ctx.LogD("sp-file", lesp, "queue disappeared") + state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string { + return logMsg(les) + ": queue disappeared" + }) } state.Unlock() } - state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): sending %s", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } + state.Ctx.LogD("sp-sending", append(les, LE{"Size", len(payload)}), logMsg) conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil { - state.Ctx.LogE("sp-xmit", les, err, "") + state.Ctx.LogE("sp-sending", les, err, logMsg) return } } @@ -853,7 +1021,13 @@ for { if state.NotAlive() { break } - state.Ctx.LogD("sp-recv", les, "waiting for payload") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): waiting for payload", + state.Node.Name, NicenessFmt(state.Nice), + ) + } + state.Ctx.LogD("sp-recv-wait", les, logMsg) conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104 payload, err := state.ReadSP(conn) if err != nil { @@ -867,36 +1041,55 @@ } if unmarshalErr.ErrorCode == xdr.ErrIO { break } - state.Ctx.LogE("sp-recv", les, err, "") + state.Ctx.LogE("sp-recv-wait", les, err, logMsg) break } + logMsg = func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): payload (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(payload))), + ) + } state.Ctx.LogD( - "sp-recv", + "sp-recv-got", append(les, LE{"Size", len(payload)}), - "got payload", + func(les LEs) string { return logMsg(les) + ": got" }, ) payload, err = state.csTheir.Decrypt(nil, nil, payload) if err != nil { - state.Ctx.LogE("sp-recv", les, err, "") + state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string { + return logMsg(les) + ": got" + }) break } state.Ctx.LogD( - "sp-recv", + "sp-recv-process", append(les, LE{"Size", len(payload)}), - "processing", + func(les LEs) string { + return logMsg(les) + ": processing" + }, ) replies, err := state.ProcessSP(payload) if err != nil { - state.Ctx.LogE("sp-recv", les, err, "") + state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string { + return logMsg(les) + ": processing" + }) break } state.wg.Add(1) go func() { for _, reply := range replies { state.Ctx.LogD( - "sp-recv", - append(les, LE{"Size", len(reply)}), - "queuing reply", + "sp-recv-reply", + append(les[:len(les)-1], LE{"Size", len(reply)}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): queuing reply (%s)", + state.Node.Name, NicenessFmt(state.Nice), + humanize.IBytes(uint64(len(reply))), + ) + }, ) state.payloads <- reply } @@ -940,10 +1133,20 @@ var err error var replies [][]byte var infosGot bool for r.Len() > 0 { - state.Ctx.LogD("sp-process", les, "unmarshaling header") + state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var head SPHead if _, err = xdr.Unmarshal(r, &head); err != nil { - state.Ctx.LogE("sp-process", les, err, "") + state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling header", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } if head.Type != SPTypePing { @@ -951,62 +1154,126 @@ state.RxLastNonPing = state.RxLastSeen } switch head.Type { case SPTypeHalt: - state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "") + state.Ctx.LogD( + "sp-process-halt", + append(les, LE{"Type", "halt"}), func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got HALT", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) state.Lock() state.queueTheir = nil state.Unlock() case SPTypePing: - state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "") + state.Ctx.LogD( + "sp-process-ping", + append(les, LE{"Type", "ping"}), + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): got PING", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) case SPTypeInfo: infosGot = true lesp := append(les, LE{"Type", "info"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD( + "sp-process-info-unmarshal", lesp, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) var info SPInfo if _, err = xdr.Unmarshal(r, &info); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE( + "sp-process-info-unmarshal", lesp, err, + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling INFO", + state.Node.Name, NicenessFmt(state.Nice), + ) + }, + ) return nil, err } - lesp = append(lesp, LEs{ - {"Pkt", Base32Codec.EncodeToString(info.Hash[:])}, - {"Size", int64(info.Size)}, - {"Nice", int(info.Nice)}, - }...) + pktName := Base32Codec.EncodeToString(info.Hash[:]) + lesp = append( + lesp, + LE{"Pkt", pktName}, + LE{"Size", int64(info.Size)}, + LE{"PktNice", int(info.Nice)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): INFO %s (%s) nice %s", + state.Node.Name, NicenessFmt(state.Nice), + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if !state.listOnly && info.Nice > state.Nice { - state.Ctx.LogD("sp-process", lesp, "too nice") + state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string { + return logMsg(les) + ": too nice" + }) continue } - state.Ctx.LogD("sp-process", lesp, "received") + state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string { + return logMsg(les) + ": received" + }) if !state.listOnly && state.xxOnly == TTx { continue } state.Lock() state.infosTheir[*info.Hash] = &info state.Unlock() - state.Ctx.LogD("sp-process", lesp, "stating part") + state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string { + return logMsg(les) + ": stating part" + }) pktPath := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), Base32Codec.EncodeToString(info.Hash[:]), ) + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Packet %s (%s) (nice %s)", + pktName, + humanize.IBytes(info.Size), + NicenessFmt(info.Nice), + ) + } if _, err = os.Stat(pktPath); err == nil { - state.Ctx.LogI("sp-info", lesp, "already done") + state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string { + return logMsg(les) + ": already done" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } if _, err = os.Stat(pktPath + SeenSuffix); err == nil { - state.Ctx.LogI("sp-info", lesp, "already seen") + state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string { + return logMsg(les) + ": already seen" + }) if !state.listOnly { replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } if _, err = os.Stat(pktPath + NoCKSuffix); err == nil { - state.Ctx.LogI("sp-info", lesp, "still non checksummed") + state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string { + return logMsg(les) + ": still not checksummed" + }) continue } fi, err := os.Stat(pktPath + PartSuffix) @@ -1015,10 +1282,20 @@ if err == nil { offset = fi.Size() } if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) { - state.Ctx.LogI("sp-info", lesp, "not enough space") + state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string { + return logMsg(les) + ": not enough space" + }) continue } - state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "") + state.Ctx.LogI( + "sp-info", + append(lesp, LE{"Offset", offset}), + func(les LEs) string { + return fmt.Sprintf( + "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size, + ) + }, + ) if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) { replies = append(replies, MarshalSP( SPTypeFreq, @@ -1028,25 +1305,45 @@ } case SPTypeFile: lesp := append(les, LE{"Type", "file"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var file SPFile if _, err = xdr.Unmarshal(r, &file); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FILE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - lesp = append(lesp, LEs{ - {"XX", string(TRx)}, - {"Pkt", Base32Codec.EncodeToString(file.Hash[:])}, - {"Size", len(file.Payload)}, - }...) + pktName := Base32Codec.EncodeToString(file.Hash[:]) + lesp = append( + lesp, + LE{"XX", string(TRx)}, + LE{"Pkt", pktName}, + LE{"Size", len(file.Payload)}, + ) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Got packet %s (%s)", + pktName, humanize.IBytes(uint64(len(file.Payload))), + ) + } dirToSync := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TRx), ) - filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:])) + filePath := filepath.Join(dirToSync, pktName) filePathPart := filePath + PartSuffix - state.Ctx.LogD("sp-file", lesp, "opening part") + state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string { + return logMsg(les) + ": opening part" + }) fdAndFullSize, exists := state.fds[filePathPart] var fd *os.File if exists { @@ -1058,7 +1355,9 @@ os.O_RDWR|os.O_CREATE, os.FileMode(0666), ) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string { + return logMsg(les) + ": opening part" + }) return nil, err } state.fds[filePathPart] = FdAndFullSize{fd: fd} @@ -1070,15 +1369,26 @@ } state.fileHashers[filePath] = &HasherAndOffset{h: h} } } - state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking") + state.Ctx.LogD( + "sp-file-seek", + append(lesp, LE{"Offset", file.Offset}), + func(les LEs) string { + return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset) + }) if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) state.closeFd(filePathPart) return nil, err } - state.Ctx.LogD("sp-file", lesp, "writing") + state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string { + return logMsg(les) + ": writing" + }) if _, err = fd.Write(file.Payload); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") + state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string { + return logMsg(les) + ": writing" + }) state.closeFd(filePathPart) return nil, err } @@ -1090,10 +1400,11 @@ panic(err) } hasherAndOffset.offset += uint64(len(file.Payload)) } else { - state.Ctx.LogE( - "sp-file", lesp, - errors.New("offset differs"), - "deleting hasher", + state.Ctx.LogD( + "sp-file-offset-differs", lesp, + func(les LEs) string { + return logMsg(les) + ": offset differs, deleting hasher" + }, ) delete(state.fileHashers, filePath) hasherExists = false @@ -1115,26 +1426,46 @@ } if fullsize != ourSize { continue } + logMsg = func(les LEs) string { + return fmt.Sprintf( + "Got packet %s %d%% (%s / %s)", + pktName, 100*ourSize/fullsize, + humanize.IBytes(uint64(ourSize)), + humanize.IBytes(uint64(fullsize)), + ) + } err = fd.Sync() if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") + state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string { + return logMsg(les) + ": syncing" + }) state.closeFd(filePathPart) continue } if hasherExists { if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 { - state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "") + state.Ctx.LogE( + "sp-file-bad-checksum", lesp, + errors.New("checksum mismatch"), + logMsg, + ) continue } if err = os.Rename(filePathPart, filePath); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "rename") + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) continue } if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) continue } - state.Ctx.LogI("sp-file", lesp, "done") + state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string { + return logMsg(les) + ": done" + }) state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) @@ -1148,14 +1479,18 @@ state.closeFd(filePathPart) continue } if _, err = fd.Seek(0, io.SeekStart); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "seek") + state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string { + return logMsg(les) + ": seeking" + }) state.closeFd(filePathPart) continue } _, pktEncRaw, err := state.Ctx.HdrRead(fd) state.closeFd(filePathPart) if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "HdrRead") + state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string { + return logMsg(les) + ": HdrReading" + }) continue } state.Ctx.HdrWrite(pktEncRaw, filePath) @@ -1163,14 +1498,20 @@ continue } state.closeFd(filePathPart) if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "rename") + state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string { + return logMsg(les) + ": renaming" + }) continue } if err = DirSync(dirToSync); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") + state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string { + return logMsg(les) + ": dirsyncing" + }) continue } - state.Ctx.LogI("sp-file", lesp, "downloaded") + state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string { + return logMsg(les) + ": downloaded" + }) state.Lock() delete(state.infosTheir, *file.Hash) state.Unlock() @@ -1180,41 +1521,74 @@ } case SPTypeDone: lesp := append(les, LE{"Type", "done"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var done SPDone if _, err = xdr.Unmarshal(r, &done); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling DONE", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])}) - lesp = append(lesp, LE{"XX", string(TTx)}) - state.Ctx.LogD("sp-done", lesp, "removing") + pktName := Base32Codec.EncodeToString(done.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)}) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): DONE: removing %s", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + } + state.Ctx.LogD("sp-done", lesp, logMsg) pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), - Base32Codec.EncodeToString(done.Hash[:]), + pktName, ) if err = os.Remove(pth); err == nil { - state.Ctx.LogI("sp-done", lesp, "") + state.Ctx.LogI("sp-done", lesp, func(les LEs) string { + return fmt.Sprintf("Packet %s is sent", pktName) + }) if state.Ctx.HdrUsage { os.Remove(pth + HdrSuffix) } } else { - state.Ctx.LogE("sp-done", lesp, err, "") + state.Ctx.LogE("sp-done", lesp, err, logMsg) } case SPTypeFreq: lesp := append(les, LE{"Type", "freq"}) - state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") + state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) var freq SPFreq if _, err = xdr.Unmarshal(r, &freq); err != nil { - state.Ctx.LogE("sp-process", lesp, err, "") + state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): unmarshaling FREQ", + state.Node.Name, NicenessFmt(state.Nice), + ) + }) return nil, err } - lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])}) - lesp = append(lesp, LE{"Offset", freq.Offset}) - state.Ctx.LogD("sp-process", lesp, "queueing") + pktName := Base32Codec.EncodeToString(freq.Hash[:]) + lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset}) + state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: queuing", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) nice, exists := state.infosOurSeen[*freq.Hash] if exists { if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] { @@ -1231,22 +1605,38 @@ copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:]) state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice} state.Unlock() } else { - state.Ctx.LogD("sp-process", lesp, "skipping") + state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: skipping", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } } else { - state.Ctx.LogD("sp-process", lesp, "unknown") + state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): FREQ %s: unknown", + state.Node.Name, NicenessFmt(state.Nice), pktName, + ) + }) } default: state.Ctx.LogE( - "sp-process", + "sp-process-type-unknown", append(les, LE{"Type", head.Type}), errors.New("unknown type"), - "", + func(les LEs) string { + return fmt.Sprintf( + "SP with %s (nice %s): %d", + state.Node.Name, NicenessFmt(state.Nice), head.Type, + ) + }, ) return nil, BadPktType } } + if infosGot { var pkts int var size uint64 @@ -1256,12 +1646,17 @@ pkts++ size += info.Size } state.RUnlock() - state.Ctx.LogI("sp-infos", LEs{ + state.Ctx.LogI("sp-infos-rx", LEs{ {"XX", string(TRx)}, {"Node", state.Node.Id}, {"Pkts", pkts}, {"Size", int64(size)}, - }, "") + }, func(les LEs) string { + return fmt.Sprintf( + "%s has got for us: %d packets, %s", + state.Node.Name, pkts, humanize.IBytes(size), + ) + }) } return payloadsSplit(replies), nil } diff --git a/src/tmp.go b/src/tmp.go index 52b5aacfc021347518788284250ae2c6bbc31eac54fde4512ca93d0c7cdd800a..c223b70baad2a3234f0653bec01ffe219dbf3d5a8452092ddcaa357f8b248f63 100644 --- a/src/tmp.go +++ b/src/tmp.go @@ -19,6 +19,7 @@ package nncp import ( "bufio" + "fmt" "hash" "io" "os" @@ -44,7 +45,9 @@ return nil, err } fd, err := TempFile(jobsPath, "") if err == nil { - ctx.LogD("tmp", LEs{{"Src", fd.Name()}}, "created") + ctx.LogD("tmp", LEs{{"Src", fd.Name()}}, func(les LEs) string { + return "Temporary file created: %s" + fd.Name() + }) } return fd, err } @@ -113,7 +116,13 @@ if err = tmp.Fd.Close(); err != nil { return err } checksum := tmp.Checksum() - tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit") + tmp.ctx.LogD( + "tmp-rename", + LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, + func(les LEs) string { + return fmt.Sprintf("Temporary file: %s -> %s", tmp.Fd.Name(), checksum) + }, + ) if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil { return err } diff --git a/src/toss.go b/src/toss.go index aa74b819d75813f2805db5459a418278b7f6bfd472943bcf5083590faf82bad3..d11506aa9fa268af0b3654e705ba00671551d879271859f46c9588b0f6db1b98 100644 --- a/src/toss.go +++ b/src/toss.go @@ -53,13 +53,14 @@ "To: " + fromTo.To, "Subject: " + mime.BEncoding.Encode("UTF-8", subject), } if len(body) > 0 { - lines = append(lines, []string{ + lines = append( + lines, "MIME-Version: 1.0", "Content-Type: text/plain; charset=utf-8", "Content-Transfer-Encoding: base64", "", base64.StdEncoding.EncodeToString(body), - }...) + ) } return strings.NewReader(strings.Join(lines, "\n")) } @@ -71,7 +72,6 @@ dryRun, doSeen, noFile, noFreq, noExec, noTrns bool, ) bool { dirLock, err := ctx.LockDir(nodeId, "toss") if err != nil { - ctx.LogE("rx", LEs{}, err, "lock") return false } defer ctx.UnlockDir(dirLock) @@ -84,14 +84,29 @@ } defer decompressor.Close() for job := range ctx.Jobs(nodeId, TRx) { pktName := filepath.Base(job.Path) - les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}} + les := LEs{ + {"Node", job.PktEnc.Sender}, + {"Pkt", pktName}, + {"Nice", int(job.PktEnc.Nice)}, + } if job.PktEnc.Nice > nice { - ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice") + ctx.LogD("rx-too-nice", les, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s: too nice: %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + NicenessFmt(job.PktEnc.Nice), + ) + }) continue } fd, err := os.Open(job.Path) if err != nil { - ctx.LogE("rx", les, err, "open") + ctx.LogE("rx-open", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s: opening %s", + ctx.NodeName(job.PktEnc.Sender), pktName, job.Path, + ) + }) isBad = true continue } @@ -113,7 +128,12 @@ var pkt Pkt var pktSize int64 var pktSizeBlocks int64 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil { - ctx.LogE("rx", les, err, "unmarshal") + ctx.LogE("rx-unmarshal", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s: unmarshal", + ctx.NodeName(job.PktEnc.Sender), pktName, + ) + }) isBad = true goto Closing } @@ -124,7 +144,14 @@ pktSize -= poly1305.TagSize } pktSize -= pktSizeBlocks * poly1305.TagSize les = append(les, LE{"Size", pktSize}) - ctx.LogD("rx", les, "taken") + ctx.LogD("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s (%s)", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), + ) + }) + switch pkt.Type { case PktTypeExec, PktTypeExecFat: if noExec { @@ -137,14 +164,20 @@ for _, p := range path[1:] { args = append(args, string(p)) } argsStr := strings.Join(append([]string{handle}, args...), " ") - les = append(les, LEs{ - {"Type", "exec"}, - {"Dst", argsStr}, - }...) + les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr}) sender := ctx.Neigh[*job.PktEnc.Sender] cmdline, exists := sender.Exec[handle] if !exists || len(cmdline) == 0 { - ctx.LogE("rx", les, errors.New("No handle found"), "") + ctx.LogE( + "rx-no-handle", les, errors.New("No handle found"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }, + ) isBad = true goto Closing } @@ -154,10 +187,7 @@ log.Fatalln(err) } } if !dryRun { - cmd := exec.Command( - cmdline[0], - append(cmdline[1:], args...)..., - ) + cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...) cmd.Env = append( cmd.Env, "NNCP_SELF="+ctx.Self.Id.String(), @@ -171,7 +201,13 @@ cmd.Stdin = pipeR } output, err := cmd.Output() if err != nil { - ctx.LogE("rx", les, err, "handle") + ctx.LogE("rx-hande", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s: handling", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }) isBad = true goto Closing } @@ -189,12 +225,24 @@ cmd.Stdin = newNotification(notify, fmt.Sprintf( "Exec from %s: %s", sender.Name, argsStr, ), output) if err = cmd.Run(); err != nil { - ctx.LogE("rx", les, err, "notify") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }) } } } } - ctx.LogI("rx", les, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got exec from %s to %s (%s)", + ctx.NodeName(job.PktEnc.Sender), argsStr, + humanize.IBytes(uint64(pktSize)), + ) + }) if !dryRun { if doSeen { if fd, err := os.Create(job.Path + SeenSuffix); err == nil { @@ -202,68 +250,135 @@ fd.Close() // #nosec G104 } } if err = os.Remove(job.Path); err != nil { - ctx.LogE("rx", les, err, "remove") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing exec %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), argsStr, + ) + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + HdrSuffix) } } + case PktTypeFile: if noFile { goto Closing } dst := string(pkt.Path[:int(pkt.PathLen)]) - les = append(les, LEs{{"Type", "file"}, {"Dst", dst}}...) + les = append(les, LE{"Type", "file"}, LE{"Dst", dst}) if filepath.IsAbs(dst) { - ctx.LogE("rx", les, errors.New("non-relative destination path"), "") + ctx.LogE( + "rx-non-rel", les, errors.New("non-relative destination path"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }, + ) isBad = true goto Closing } incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming if incoming == nil { - ctx.LogE("rx", les, errors.New("incoming is not allowed"), "") + ctx.LogE( + "rx-no-incoming", les, errors.New("incoming is not allowed"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }, + ) isBad = true goto Closing } dir := filepath.Join(*incoming, path.Dir(dst)) if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { - ctx.LogE("rx", les, err, "mkdir") + ctx.LogE("rx-mkdir", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: mkdir", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if !dryRun { tmp, err := TempFile(dir, "file") if err != nil { - ctx.LogE("rx", les, err, "mktemp") + ctx.LogE("rx-mktemp", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: mktemp", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } les = append(les, LE{"Tmp", tmp.Name()}) - ctx.LogD("rx", les, "created") + ctx.LogD("rx-tmp-created", les, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: created: %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, tmp.Name(), + ) + }) bufW := bufio.NewWriter(tmp) if _, err = CopyProgressed( bufW, pipeR, "Rx file", append(les, LE{"FullSize", pktSize}), ctx.ShowPrgrs, ); err != nil { - ctx.LogE("rx", les, err, "copy") + ctx.LogE("rx-copy", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: copying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if err = bufW.Flush(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("rx", les, err, "copy") + ctx.LogE("rx-flush", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: flushing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if err = tmp.Sync(); err != nil { tmp.Close() // #nosec G104 - ctx.LogE("rx", les, err, "copy") + ctx.LogE("rx-sync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: syncing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } if err = tmp.Close(); err != nil { - ctx.LogE("rx", les, err, "copy") + ctx.LogE("rx-close", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: closing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true goto Closing } @@ -275,7 +390,13 @@ if _, err = os.Stat(dstPath); err != nil { if os.IsNotExist(err) { break } - ctx.LogE("rx", les, err, "stat") + ctx.LogE("rx-stat", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: stating: %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, dstPath, + ) + }) isBad = true goto Closing } @@ -283,16 +404,34 @@ dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr) dstPathCtr++ } if err = os.Rename(tmp.Name(), dstPath); err != nil { - ctx.LogE("rx", les, err, "rename") + ctx.LogE("rx-rename", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: renaming", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true } if err = DirSync(*incoming); err != nil { - ctx.LogE("rx", les, err, "sync") + ctx.LogE("rx-dirsync", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: dirsyncing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true } les = les[:len(les)-1] // delete Tmp } - ctx.LogI("rx", les, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got file %s (%s) from %s", + dst, humanize.IBytes(uint64(pktSize)), + ctx.NodeName(job.PktEnc.Sender), + ) + }) if !dryRun { if doSeen { if fd, err := os.Create(job.Path + SeenSuffix); err == nil { @@ -300,7 +439,13 @@ fd.Close() // #nosec G104 } } if err = os.Remove(job.Path); err != nil { - ctx.LogE("rx", les, err, "remove") + ctx.LogE("rx-remove", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: removing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + HdrSuffix) @@ -317,24 +462,46 @@ dst, humanize.IBytes(uint64(pktSize)), ), nil) if err = cmd.Run(); err != nil { - ctx.LogE("rx", les, err, "notify") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing file %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), dst, + ) + }) } } } + case PktTypeFreq: if noFreq { goto Closing } src := string(pkt.Path[:int(pkt.PathLen)]) + les := append(les, LE{"Type", "freq"}, LE{"Src", src}) if filepath.IsAbs(src) { - ctx.LogE("rx", les, errors.New("non-relative source path"), "") + ctx.LogE( + "rx-non-rel", les, errors.New("non-relative source path"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, + ) + }, + ) isBad = true goto Closing } - les := append(les, LEs{{"Type", "freq"}, {"Src", src}}...) dstRaw, err := ioutil.ReadAll(pipeR) if err != nil { - ctx.LogE("rx", les, err, "read") + ctx.LogE("rx-read", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s: reading", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, + ) + }) isBad = true goto Closing } @@ -343,7 +510,16 @@ les = append(les, LE{"Dst", dst}) sender := ctx.Neigh[*job.PktEnc.Sender] freqPath := sender.FreqPath if freqPath == nil { - ctx.LogE("rx", les, errors.New("freqing is not allowed"), "") + ctx.LogE( + "rx-no-freq", les, errors.New("freqing is not allowed"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }, + ) isBad = true goto Closing } @@ -358,12 +534,23 @@ sender.FreqMinSize, sender.FreqMaxSize, ) if err != nil { - ctx.LogE("rx", les, err, "tx file") + ctx.LogE("rx-tx", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s: txing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }) isBad = true goto Closing } } - ctx.LogI("rx", les, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got file request %s to %s", + src, ctx.NodeName(job.PktEnc.Sender), + ) + }) if !dryRun { if doSeen { if fd, err := os.Create(job.Path + SeenSuffix); err == nil { @@ -371,7 +558,13 @@ fd.Close() // #nosec G104 } } if err = os.Remove(job.Path); err != nil { - ctx.LogE("rx", les, err, "remove") + ctx.LogE("rx-remove", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s: removing", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + HdrSuffix) @@ -385,10 +578,17 @@ cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf( "Freq from %s: %s", sender.Name, src, ), nil) if err = cmd.Run(); err != nil { - ctx.LogE("rx", les, err, "notify") + ctx.LogE("rx-notify", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing freq %s/%s (%s): %s -> %s: notifying", + ctx.NodeName(job.PktEnc.Sender), pktName, + humanize.IBytes(uint64(pktSize)), src, dst, + ) + }) } } } + case PktTypeTrns: if noTrns { goto Closing @@ -397,21 +597,39 @@ dst := new([blake2b.Size256]byte) copy(dst[:], pkt.Path[:int(pkt.PathLen)]) nodeId := NodeId(*dst) node, known := ctx.Neigh[nodeId] - les := append(les, LEs{{"Type", "trns"}, {"Dst", nodeId}}...) + les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId}) + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Tossing trns %s/%s (%s): %s", + ctx.NodeName(job.PktEnc.Sender), + pktName, + humanize.IBytes(uint64(pktSize)), + nodeId.String(), + ) + } if !known { - ctx.LogE("rx", les, errors.New("unknown node"), "") + ctx.LogE("rx-unknown", les, errors.New("unknown node"), logMsg) isBad = true goto Closing } - ctx.LogD("rx", les, "taken") + ctx.LogD("rx-tx", les, logMsg) if !dryRun { if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil { - ctx.LogE("rx", les, err, "tx trns") + ctx.LogE("rx", les, err, func(les LEs) string { + return logMsg(les) + ": txing" + }) isBad = true goto Closing } } - ctx.LogI("rx", les, "") + ctx.LogI("rx", les, func(les LEs) string { + return fmt.Sprintf( + "Got transitional packet from %s to %s (%s)", + ctx.NodeName(job.PktEnc.Sender), + ctx.NodeName(&nodeId), + humanize.IBytes(uint64(pktSize)), + ) + }) if !dryRun { if doSeen { if fd, err := os.Create(job.Path + SeenSuffix); err == nil { @@ -419,14 +637,33 @@ fd.Close() // #nosec G104 } } if err = os.Remove(job.Path); err != nil { - ctx.LogE("rx", les, err, "remove") + ctx.LogE("rx", les, err, func(les LEs) string { + return fmt.Sprintf( + "Tossing trns %s/%s (%s): %s: removing", + ctx.NodeName(job.PktEnc.Sender), + pktName, + humanize.IBytes(uint64(pktSize)), + ctx.NodeName(&nodeId), + ) + }) isBad = true } else if ctx.HdrUsage { os.Remove(job.Path + HdrSuffix) } } + default: - ctx.LogE("rx", les, errors.New("unknown type"), "") + ctx.LogE( + "rx-type-unknown", les, errors.New("unknown type"), + func(les LEs) string { + return fmt.Sprintf( + "Tossing %s/%s (%s)", + ctx.NodeName(job.PktEnc.Sender), + pktName, + humanize.IBytes(uint64(pktSize)), + ) + }, + ) isBad = true } Closing: diff --git a/src/tx.go b/src/tx.go index 541e7ac5cec324122b8e4293e38ce319bcf0be7c127a3e8dee4a8a2e3ee8ff4d..96207e7513ec0639f46df736b6a1a44f0575a81565db3b73619b6b218b811b5e 100644 --- a/src/tx.go +++ b/src/tx.go @@ -23,6 +23,7 @@ "bufio" "bytes" "crypto/rand" "errors" + "fmt" "hash" "io" "io/ioutil" @@ -33,6 +34,7 @@ "strings" "time" xdr "github.com/davecgh/go-xdr/xdr2" + "github.com/dustin/go-humanize" "github.com/klauspost/compress/zstd" "golang.org/x/crypto/blake2b" "golang.org/x/crypto/chacha20poly1305" @@ -87,7 +89,14 @@ ctx.LogD("tx", LEs{ {"Node", hops[0].Id}, {"Nice", int(nice)}, {"Size", size}, - }, "wrote") + }, func(les LEs) string { + return fmt.Sprintf( + "Tx packet to %s (%s) nice: %s", + ctx.NodeName(hops[0].Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) pktEncRaw, err = PktEncWrite( ctx.Self, hops[0], pkt, nice, size, padSize, src, dst, ) @@ -109,7 +118,14 @@ ctx.LogD("tx", LEs{ {"Node", node.Id}, {"Nice", int(nice)}, {"Size", size}, - }, "trns wrote") + }, func(les LEs) string { + return fmt.Sprintf( + "Tx trns packet to %s (%s) nice: %s", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + }) _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) errs <- err dst.Close() // #nosec G104 @@ -371,10 +387,19 @@ {"Src", srcPath}, {"Dst", dstPath}, {"Size", fileSize}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(fileSize)), + ctx.NodeName(node.Id), + dstPath, + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -427,10 +452,19 @@ {"Src", srcPath}, {"Dst", path}, {"Size", sizeToSend}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(sizeToSend)), + ctx.NodeName(node.Id), + path, + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) return err } hsh.Sum(metaPkt.Checksums[chunkNum][:0]) @@ -460,10 +494,19 @@ {"Src", srcPath}, {"Dst", path}, {"Size", metaPktSize}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File %s (%s) sent to %s:%s", + srcPath, + humanize.IBytes(uint64(metaPktSize)), + ctx.NodeName(node.Id), + path, + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -496,10 +539,17 @@ {"ReplyNice", int(replyNice)}, {"Src", srcPath}, {"Dst", dstPath}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "File request from %s:%s to %s sent", + ctx.NodeName(node.Id), srcPath, + dstPath, + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -599,18 +649,25 @@ size = fileSize _, err = ctx.Tx(node, pkt, nice, size, minSize, tmpReader, handle) } + dst := strings.Join(append([]string{handle}, args...), " ") les := LEs{ {"Type", "exec"}, {"Node", node.Id}, {"Nice", int(nice)}, {"ReplyNice", int(replyNice)}, - {"Dst", strings.Join(append([]string{handle}, args...), " ")}, + {"Dst", dst}, {"Size", size}, } + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Exec sent to %s@%s (%s)", + ctx.NodeName(node.Id), dst, humanize.IBytes(uint64(size)), + ) + } if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogE("tx", les, err, "sent") + ctx.LogE("tx", les, err, logMsg) } return err } @@ -622,10 +679,18 @@ {"Node", node.Id}, {"Nice", int(nice)}, {"Size", size}, } - ctx.LogD("tx", les, "taken") + logMsg := func(les LEs) string { + return fmt.Sprintf( + "Transitional packet to %s (%s) (nice %s)", + ctx.NodeName(node.Id), + humanize.IBytes(uint64(size)), + NicenessFmt(nice), + ) + } + ctx.LogD("tx", les, logMsg) if !ctx.IsEnoughSpace(size) { err := errors.New("is not enough space") - ctx.LogE("tx", les, err, err.Error()) + ctx.LogE("tx", les, err, logMsg) return err } tmp, err := ctx.NewTmpFileWHash() @@ -642,9 +707,9 @@ } nodePath := filepath.Join(ctx.Spool, node.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) if err == nil { - ctx.LogI("tx", les, "sent") + ctx.LogI("tx", les, logMsg) } else { - ctx.LogI("tx", append(les, LE{"Err", err}), "sent") + ctx.LogI("tx", append(les, LE{"Err", err}), logMsg) } os.Symlink(nodePath, filepath.Join(ctx.Spool, node.Name)) // #nosec G104 return err