doc/cmd/nncp-ack.texi | 21 ++++++++++++++++++--- doc/cmd/nncp-cfgdir.texi | 2 +- doc/cmd/nncp-hash.texi | 2 +- doc/cmd/nncp-rm.texi | 14 ++++++++++---- doc/download.texi | 7 +++++++ doc/news.ru.texi | 24 +++++++++++++++++++++++- doc/news.texi | 24 +++++++++++++++++++++++- ports/nncp/Makefile | 2 +- src/cmd/nncp-ack/main.go | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++-- src/cmd/nncp-rm/main.go | 45 +++++++++++++++++++++++++++++++++------------ src/cmd/nncp-trns/main.go | 2 +- src/nncp.go | 2 +- src/toss.go | 6 ++++-- src/tx.go | 54 ++++++++++++++++++++++++++++++++--------------------- src/tx_test.go | 2 +- diff --git a/doc/cmd/nncp-ack.texi b/doc/cmd/nncp-ack.texi index 45dcb9bada403d6a052584ca6c34811c3d841d63a06d691c5eaec6ed71e27921..16f79200f712261af7d37f67b9354b7a0a047f60c913e037646fccc774f43893 100644 --- a/doc/cmd/nncp-ack.texi +++ b/doc/cmd/nncp-ack.texi @@ -7,12 +7,19 @@ @example $ nncp-ack [options] -all $ nncp-ack [options] -node NODE[,@dots{}] $ nncp-ack [options] -node NODE -pkt PKT + +$ nncp-ack [@dots{}] 4>&1 >&2 | nncp-rm [@dots{}] -pkt @end example Send acknowledgement of successful @option{PKT} (Base32-encoded hash) packet receipt from @option{NODE} node. If no @option{-pkt} is specified, then acknowledge all packet in node's @code{rx} outbound spool. If @option{-all} is specified, then do that for all nodes. + +That command outputs the list of created encrypted ACK packets +(@code{NODE/PKT}) to the @strong{4}th file descriptor. That output can be +passed, for example, to @command{@ref{nncp-rm}} to remove them after +transmission, so as not to wait for acknowledgement and retransmission. General workflow with acknowledgement is following, assuming that Alice has some outbound packets for Bob: @@ -33,16 +40,24 @@ @end example That will also check if copied packets checksum is not mismatched. 
-@item Create ACK packets of received ones: +@item Create ACK packets of received ones, saving the list of encrypted + ACK packets: @example -bob$ nncp-ack -node alice +bob$ nncp-ack -node alice 4>acks @end example @item Send those newly created packets back to Alice: @example -bob$ nncp-xfer -tx /mnt/shared +bob$ nncp-xfer [-keep] -tx /mnt/shared +@end example + +@item Remove them from outbound spool, because we expect no + acknowledgement for them: + +@example +bob$ nncp-rm -node alice -pkt 0 { + if _, exists := pkts[filepath.Base(info.Name())]; exists { + ctx.LogI("rm", nncp.LEs{{K: "File", V: path}}, logMsg) + if *dryRun { + return nil + } + return os.Remove(path) } - return os.Remove(path) } if !*doSeen && !*doNoCK && !*doHdr && !*doPart && (*doRx || *doTx) && @@ -234,12 +252,12 @@ } return nil }) } - if *pktRaw != "" || *doRx || *doNoCK || *doPart { + if len(pkts) > 0 || *doRx || *doNoCK || *doPart { if err = remove(nncp.TRx); err != nil { log.Fatalln("Can not remove:", err) } } - if *pktRaw != "" || *doTx || *doHdr { + if len(pkts) > 0 || *doTx || *doHdr { if err = remove(nncp.TTx); err != nil { log.Fatalln("Can not remove:", err) } @@ -315,9 +333,12 @@ if info.IsDir() { return nil } if now.Sub(info.ModTime()) < oldBoundary { - ctx.LogD("rm-skip", nncp.LEs{{K: "File", V: path}}, func(les nncp.LEs) string { - return fmt.Sprintf("File %s: too fresh, skipping", path) - }) + ctx.LogD( + "rm-skip", nncp.LEs{{K: "File", V: path}}, + func(les nncp.LEs) string { + return fmt.Sprintf("File %s: too fresh, skipping", path) + }, + ) return nil } ctx.LogI( diff --git a/src/cmd/nncp-trns/main.go b/src/cmd/nncp-trns/main.go index e9735baa648cfd8761326956b1acaa4ad3aa9cb6e1b9c772156a98c6cea1de34..39e8993736f27812b29487f293770dbe704619973a8689981215250957fc9de9 100644 --- a/src/cmd/nncp-trns/main.go +++ b/src/cmd/nncp-trns/main.go @@ -137,7 +137,7 @@ pktTrns, err := nncp.NewPkt(nncp.PktTypeTrns, 0, pktEnc.Recipient[:]) if err != nil { panic(err) } - if _, _, err = ctx.Tx( + 
if _, _, _, err = ctx.Tx( node, pktTrns, nice, diff --git a/src/nncp.go b/src/nncp.go index db1e1afbc656a292e3b4364dafae4aafca90ba1dc2d16a14acf28f235a27f208..84b489bfb56f270eb503d87d1164db5c31b8e0bc04fb252570e8863101668f6b 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -40,7 +40,7 @@ const Base32Encoded32Len = 52 var ( - Version string = "8.6.0" + Version string = "8.7.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/toss.go b/src/toss.go index ae805e6e3688ce4d4739cc11d924bb4db0eb397625cf8a074e78b50d4e210e82..111c0643ea657fab8cc5a93cc9d5539862aba50eb3b94bf7fddebc2a53b9b242 100644 --- a/src/toss.go +++ b/src/toss.go @@ -621,7 +621,7 @@ pktTrns, err := NewPkt(PktTypeTrns, 0, nodeId[:]) if err != nil { panic(err) } - if _, _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, pktTrns, nice, @@ -752,7 +752,7 @@ continue } if nodeId != sender.Id && nodeId != pktEnc.Sender { ctx.LogI("rx-area-echo", lesEcho, logMsgNode) - if _, _, err = ctx.Tx( + if _, _, _, err = ctx.Tx( node, &pkt, nice, @@ -926,6 +926,8 @@ ctx.LogE("rx-ack", les, err, func(les LEs) string { return logMsg(les) + ": removing packet" }) return err + } else if ctx.HdrUsage { + os.Remove(JobPath2Hdr(pktPath)) } } } else { diff --git a/src/tx.go b/src/tx.go index 60b8e404d5a8de783879941e0aa4f94d2e3436351b5eb8551e23727efefaca31..68d126a4a4fecd231d7e244af450ae05781c07b9fef2d71ee33a7c611c473bd4 100644 --- a/src/tx.go +++ b/src/tx.go @@ -57,12 +57,12 @@ srcSize, minSize, maxSize int64, src io.Reader, pktName string, areaId *AreaId, -) (*Node, int64, error) { +) (*Node, int64, string, error) { var area *Area if areaId != nil { area = ctx.AreaId2Area[*areaId] if area.Prv == nil { - return nil, 0, errors.New("area has no encryption keys") + return nil, 0, "", errors.New("area has no encryption keys") } } hops := make([]*Node, 0, 1+len(node.Via)) @@ -82,15 +82,15 @@ expectedSize = srcSize + PktOverhead expectedSize += sizePadCalc(expectedSize, minSize, wrappers) 
expectedSize = PktEncOverhead + sizeWithTags(expectedSize) if maxSize != 0 && expectedSize > maxSize { - return nil, 0, TooBig + return nil, 0, "", TooBig } if !ctx.IsEnoughSpace(expectedSize) { - return nil, 0, errors.New("is not enough space") + return nil, 0, "", errors.New("is not enough space") } } tmp, err := ctx.NewTmpFileWHash() if err != nil { - return nil, 0, err + return nil, 0, "", err } results := make(chan PktEncWriteResult) @@ -211,7 +211,7 @@ for i := 0; i <= wrappers; i++ { r := <-results if r.err != nil { tmp.Fd.Close() - return nil, 0, r.err + return nil, 0, "", r.err } if r.pktEncRaw != nil { pktEncRaw = r.pktEncRaw @@ -224,7 +224,7 @@ nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) if err != nil { - return lastNode, 0, err + return lastNode, 0, "", err } if ctx.HdrUsage { ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) @@ -255,18 +255,18 @@ ) } if err = ensureDir(seenDir); err != nil { ctx.LogE("tx-mkdir", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } if fd, err := os.Create(seenPath); err == nil { fd.Close() if err = DirSync(seenDir); err != nil { ctx.LogE("tx-dirsync", les, err, logMsg) - return lastNode, 0, err + return lastNode, 0, "", err } } ctx.LogI("tx-area", les, logMsg) } - return lastNode, payloadSize, err + return lastNode, payloadSize, tmp.Checksum(), err } type DummyCloser struct{} @@ -438,7 +438,7 @@ pkt, err := NewPkt(PktTypeFile, nice, []byte(dstPath)) if err != nil { return err } - _, finalSize, err := ctx.Tx( + _, finalSize, pktName, err := ctx.Tx( node, pkt, nice, srcSize, minSize, maxSize, bufio.NewReader(reader), dstPath, areaId, @@ -450,6 +450,7 @@ {"Nice", int(nice)}, {"Src", srcPath}, {"Dst", dstPath}, {"Size", finalSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -480,7 +481,7 @@ if err != nil { 
return err } hsh := MTHNew(0, 0) - _, size, err := ctx.Tx( + _, size, pktName, err := ctx.Tx( node, pkt, nice, 0, minSize, maxSize, io.TeeReader(lr, hsh), @@ -494,6 +495,7 @@ {"Nice", int(nice)}, {"Src", srcPath}, {"Dst", path}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -541,7 +543,7 @@ if err != nil { return err } metaPktSize := int64(buf.Len()) - _, _, err = ctx.Tx( + _, _, pktName, err := ctx.Tx( node, pkt, nice, @@ -555,6 +557,7 @@ {"Nice", int(nice)}, {"Src", srcPath}, {"Dst", path}, {"Size", metaPktSize}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -593,7 +596,9 @@ return err } src := strings.NewReader(dstPath) size := int64(src.Len()) - _, _, err = ctx.Tx(node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil) + _, _, pktName, err := ctx.Tx( + node, pkt, nice, size, minSize, MaxFileSize, src, srcPath, nil, + ) les := LEs{ {"Type", "freq"}, {"Node", node.Id}, @@ -601,6 +606,7 @@ {"Nice", int(nice)}, {"ReplyNice", int(replyNice)}, {"Src", srcPath}, {"Dst", dstPath}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -657,7 +663,9 @@ pw.Close() }(in) in = pr } - _, size, err := ctx.Tx(node, pkt, nice, 0, minSize, maxSize, in, handle, areaId) + _, size, pktName, err := ctx.Tx( + node, pkt, nice, 0, minSize, maxSize, in, handle, areaId, + ) if !noCompress { e := <-compressErr if err == nil { @@ -672,6 +680,7 @@ {"Nice", int(nice)}, {"ReplyNice", int(replyNice)}, {"Dst", dst}, {"Size", size}, + {"Pkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf( @@ -735,25 +744,28 @@ node *Node, nice uint8, hsh string, minSize int64, -) error { +) (pktName string, err error) { hshRaw, err := Base32Codec.DecodeString(hsh) if err != nil { - return err + return "", err } if len(hshRaw) != MTHSize { - return errors.New("Invalid packet id size") + return "", errors.New("Invalid packet id size") } pkt, err := NewPkt(PktTypeACK, nice, 
[]byte(hshRaw)) if err != nil { - return err + return "", err } src := bytes.NewReader([]byte{}) - _, _, err = ctx.Tx(node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil) + _, _, pktName, err = ctx.Tx( + node, pkt, nice, 0, minSize, MaxFileSize, src, hsh, nil, + ) les := LEs{ {"Type", "ack"}, {"Node", node.Id}, {"Nice", int(nice)}, {"Pkt", hsh}, + {"NewPkt", pktName}, } logMsg := func(les LEs) string { return fmt.Sprintf("ACK to %s of %s is sent", ctx.NodeName(node.Id), hsh) @@ -763,5 +775,5 @@ ctx.LogI("tx", les, logMsg) } else { ctx.LogE("tx", les, err, logMsg) } - return err + return } diff --git a/src/tx_test.go b/src/tx_test.go index 92dc7ce4757f7686aa04d8c9be231acf3eb773383bc675eaf59dc091af52b1eb..12d6441606dece05acb453cadfe5c78e044e1ae4e89ad4a8c79ae83bf36d4486 100644 --- a/src/tx_test.go +++ b/src/tx_test.go @@ -87,7 +87,7 @@ nodeTgt.Via = append(nodeTgt.Via, node.Id) } pkt, err := NewPkt(PktTypeExec, replyNice, []byte(pathSrc)) src := bytes.NewReader(data) - dstNode, _, err := ctx.Tx( + dstNode, _, _, err := ctx.Tx( nodeTgt, pkt, 123,