README | 2 +- README.RU | 2 +- doc/admin.texi | 2 +- doc/call.texi | 11 +++++++++++ doc/cfg.texi | 4 ++++ doc/cmds.texi | 43 +++++++++++++++++++++++++++---------------- doc/contacts.texi | 2 +- doc/download.texi | 4 ++++ doc/integration.texi | 17 ++++++++++++++--- doc/news.ru.texi | 30 ++++++++++++++++++++++++++++++ doc/news.texi | 32 +++++++++++++++++++++++++++++++- doc/sendmail.sh | 6 ++++++ doc/spool.texi | 97 +++++++++++++++++++++++++++++++++++++---------------- makedist.sh | 4 ++-- ports/nncp/files/nncp.newsyslog.conf.sample | 2 +- src/call.go | 3 +++ src/cfg.go | 10 ++++++++++ src/check.go | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++-- src/cmd/nncp-bundle/main.go | 25 +++++++++++++++++++------ src/cmd/nncp-call/main.go | 2 ++ src/cmd/nncp-caller/main.go | 2 +- src/cmd/nncp-cfgnew/main.go | 3 +++ src/cmd/nncp-check/main.go | 19 +++++++++++++++++-- src/cmd/nncp-daemon/main.go | 7 +++++-- src/cmd/nncp-rm/main.go | 23 +++++++++++------------ src/cmd/nncp-stat/main.go | 24 ++++++++++++++++++++---- src/cmd/nncp-xfer/main.go | 33 ++++++++++++++++++++++----------- src/ctx.go | 1 + src/go.mod | 10 +++++----- src/go.sum | 18 ++++++++++-------- src/humanizer.go | 24 +++++++++--------------- src/jobs.go | 102 +++++++++++++++++++++++++++++++++++++++++++++-------- src/log.go | 2 ++ src/nncp.go | 4 +++- src/pkt.go | 39 ++++++++++++++++++++++----------------- src/pkt_test.go | 4 ++-- src/progress.go | 2 +- src/sp.go | 282 +++++++++++++++++++++++++++++++++++++++++------------ src/tmp.go | 6 +++++- src/toss.go | 44 ++++++++++++++++++++++++++------------------ src/toss_test.go | 9 +++++++-- src/tx.go | 24 ++++++++++++++++++++---- src/tx_test.go | 8 ++++++-- diff --git a/README b/README index ef2d7850c19b449280ca13f974c46e7e339f42039f7291984a2214a1dfe64c47..a0c253956c606af4fd8844d604f1445b2e9e902b8c337923fdda126086ba24d1 100644 --- a/README +++ b/README @@ -22,7 +22,7 @@ Home page: http://www.nncpgo.org/ Please send questions regarding the use of NNCP, bug reports and patches to nncp-devel mailing list: -https://lists.cypherpunks.ru/pipermail/nncp-devel/ +http://lists.cypherpunks.ru/nncp_002ddevel.html Development Git source code repository currently is located here: http://www.git.cypherpunks.ru/?p=nncp.git;a=summary diff --git a/README.RU b/README.RU index b13fdb65f8af1818fb494303666f2b9c88f97cf89490c46b9289d193913e4119..4683e14596fce73da0a056d3dac4836792e95a8b09a105b6b230267fa67cd0f1 100644 --- a/README.RU +++ b/README.RU @@ -27,7 +27,7 @@ Домашняя страница: http://www.nncpgo.org/ Пожалуйста все вопросы касающиеся использования NNCP, отчёты об ошибках и патчи отправляйте в nncp-devel почтовую рассылку: -https://lists.cypherpunks.ru/pipermail/nncp-devel/ +http://lists.cypherpunks.ru/nncp_002ddevel.html Исходный код для разработчика находится в Git репозитории: http://www.git.cypherpunks.ru/?p=nncp.git;a=summary diff --git a/doc/admin.texi b/doc/admin.texi index 0ae401fb5d5581fa042863ba751f1f4bb601e428c2b57aa59119afdf37ba580b..e32f2d4e7e61c7c6cdeaf5d75b9b57ce696ef07161cf3bae5fe28809ba4ed79d 100644 --- a/doc/admin.texi +++ b/doc/admin.texi @@ -45,7 +45,7 @@ rotated. Choose you own preferable way to do it. Example @url{https://www.newsyslog.org/manual.html, newsyslog}'s entry: @example -/var/spool/nncp/log 644 7 100 * CYN +/var/spool/nncp/log 644 7 100 * BCYN @end example @item diff --git a/doc/call.texi b/doc/call.texi index ca76063489d8a8bc1b785933056160a3042103c2795cd55e0c8f04b6faee5e88..49bb81809abd291ed3361936ba1f309e2c830bc5a899fdf6bc16545a7e16adbd 100644 --- a/doc/call.texi +++ b/doc/call.texi @@ -31,6 +31,7 @@ }, { cron: "*/5 * * * * * *" when-tx-exists: true + nock: true }, ] @end verbatim @@ -84,5 +85,15 @@ created, or skip any kind of packet processing. @item when-tx-exists Call only if packets for sending exists. + +@anchor{CfgNoCK} +@item nock +NoCK (no-checksumming) tells not to do checksumming of received files, +assuming that it will be done for example with @ref{nncp-check} command +later. That can help minimizing time spent online, because HDD won't do +simultaneous reading of the data for checksumming and writing of the +received one, but just sequential writing of the file. Pay attention +that you have to make a call to remote node after checksumming is done, +to send notification about successful packet reception. @end table diff --git a/doc/cfg.texi b/doc/cfg.texi index f6de9b51d10c42209fae2734234c87a6d2ad8998c23f55cd270975b5089be53e..bc53047df4dd5766bed9d2ee3c8deb510e04973857d93275b88c2d18840e2716 100644 --- a/doc/cfg.texi +++ b/doc/cfg.texi @@ -9,6 +9,7 @@ spool: /var/spool/nncp log: /var/spool/nncp/log umask: "022" noprogress: true + nohdr: true notify: { file: { @@ -102,6 +103,9 @@ Enabled @strong{noprogress} option disabled progress showing for many commands by default. You can always force its showing with @option{-progress} command line option anyway. + +@anchor{CfgNoHdr} +@strong{nohdr} option disables @ref{HdrFile, .hdr} files usage. @anchor{CfgNotify} @strong{notify} section contains notification settings for successfully diff --git a/doc/cmds.texi b/doc/cmds.texi index 3bfcd6d34ae6378957ee2dd29953d49851b2b91ddeb610904c9af5d7d548dc77..9d806ec1c0075ec689ff9742812ed41c7a7e0f414c166bee0701cc3e0f64e0d7 100644 --- a/doc/cmds.texi +++ b/doc/cmds.texi @@ -102,6 +102,7 @@ [-pkts PKT,PKT,...] [-rxrate INT] [-txrate INT] [-autotoss*] + [-nock] NODE[:ADDR] [FORCEADDR] @end example @@ -114,15 +115,17 @@ transfer. If @option{-rx} option is specified then only inbound packets transmission is performed. If @option{-tx} option is specified, then -only outbound transmission is performed. @option{-onlinedeadline} -overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}. -@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime, -@emph{maxonlinetime}}. @option{-rxrate}/@option{-txrate} override -@ref{CfgXxRate, rxrate/txrate}. @option{-list} option allows you to list -packets of remote node, without any transmission. +only outbound transmission is performed. -You can specify what packets your want to download, by specifying -@option{-pkts} option with comma-separated list of packets identifiers. +@option{-onlinedeadline} overrides @ref{CfgOnlineDeadline, @emph{onlinedeadline}}. +@option{-maxonlinetime} overrides @ref{CfgMaxOnlineTime, @emph{maxonlinetime}}. +@option{-rxrate}/@option{-txrate} override @ref{CfgXxRate, rxrate/txrate}. +Read @ref{CfgNoCK, more} about @option{-nock} option. + +@option{-list} option allows you to list packets of remote node, without +any transmission. You can specify what packets your want to download, by +specifying @option{-pkts} option with comma-separated list of packets +identifiers. Each @option{NODE} can contain several uniquely identified @option{ADDR}esses in @ref{CfgAddrs, configuration} file. If you do @@ -230,13 +233,16 @@ @node nncp-check @section nncp-check @example -$ nncp-check [options] +$ nncp-check [-nock] [options] @end example Perform @ref{Spool, spool} directory integrity check. Read all files that has Base32-encoded filenames and compare it with recalculated -BLAKE2b hash output of their contents. That supplementary command is -not used often in practice, if ever. +BLAKE2b hash output of their contents. + +The most useful mode of operation is with @option{-nock} option, that +checks integrity of @file{.nock} files, renaming them to ordinary +(verified) encrypted packets. @node nncp-cronexpr @section nncp-cronexpr @@ -252,7 +258,9 @@ @node nncp-daemon @section nncp-daemon @example -$ nncp-daemon [options] [-maxconn INT] [-bind ADDR] [-inetd] [-autotoss*] +$ nncp-daemon [options] + [-maxconn INT] [-bind ADDR] [-inetd] + [-autotoss*] [-nock] @end example Start listening TCP daemon, wait for incoming connections and run @@ -277,6 +285,8 @@ @option{-autotoss} option runs tosser on node's spool every second during the call. All @option{-autotoss-*} options is the same as in @ref{nncp-toss} command. + +Read @ref{CfgNoCK, more} about @option{-nock} option. @node nncp-exec @section nncp-exec @@ -509,6 +519,7 @@ $ nncp-rm [options] -tmp $ nncp-rm [options] -lock $ nncp-rm [options] -node NODE -part $ nncp-rm [options] -node NODE -seen +$ nncp-rm [options] -node NODE -nock $ nncp-rm [options] -node NODE [-rx] [-tx] $ nncp-rm [options] -node NODE -pkt PKT @end example @@ -529,10 +540,10 @@ Base32 name) will be deleted. This is useful when you see some packet failing to be processed. @item When either @option{-rx} or @option{-tx} options are specified -(maybe both of them), then delete all packets from that given queues. If -@option{-part} is given, then delete only @file{.part}ly downloaded -ones. If @option{-seen} option is specified, then delete only -@file{.seen} files. +(maybe both of them), then delete all packets from that given queues. +@option{-part} option deletes @file{.part}ly downloaded files. +@option{-seen} option deletes @file{.seen} files. @option{-nock} option +deletes non-checksummed (non-verified) @file{.nock} files. @item @option{-dryrun} option just prints what will be deleted. diff --git a/doc/contacts.texi b/doc/contacts.texi index 6e0969825a3060beefa8b3e565f5ef638ff708324afbb656363eb095d7c27827..4c2a432adfe1a34146602e481bda82031cd3281d2e4241bcf4350697106b9261 100644 --- a/doc/contacts.texi +++ b/doc/contacts.texi @@ -2,7 +2,7 @@ @node Contacts @unnumbered Contacts Please send questions regarding the use of NNCP, bug reports and patches to -@url{https://lists.cypherpunks.ru/pipermail/nncp-devel/, nncp-devel} +@url{http://lists.cypherpunks.ru/nncp_002ddevel.html, nncp-devel} mailing list. Announcements also go to this mailing list. Official website is @url{http://www.nncpgo.org/}. diff --git a/doc/download.texi b/doc/download.texi index 2ff95c019c6d4010a039c1a767639884a38f101feb7dc6d6f4d295b8eecb5282..a53b7ae3a07406e3e5100975f6e5d71e11bbda89d89f025387cdf9fd5d87e35e 100644 --- a/doc/download.texi +++ b/doc/download.texi @@ -25,6 +25,10 @@ @multitable {XXXXX} {XXXX-XX-XX} {XXXX KiB} {link sign} {xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx} @headitem Version @tab Date @tab Size @tab Tarball @tab SHA256 checksum +@item @ref{Release 6.0.0, 6.0.0} @tab 2021-01-23 @tab 1028 KiB +@tab @url{download/nncp-6.0.0.tar.xz, link} @url{download/nncp-6.0.0.tar.xz.sig, sign} +@tab @code{42FE8AA5 4520B3A1 ABB50D66 1BBBA6A1 41CE4E74 9B4816B0 D4C6845D 67465916} + @item @ref{Release 5.6.0, 5.6.0} @tab 2021-01-17 @tab 1024 KiB @tab @url{download/nncp-5.6.0.tar.xz, link} @url{download/nncp-5.6.0.tar.xz.sig, sign} @tab @code{1DC83F05 F14A3C3B 95820046 C60B170E B8C8936F 142A5B9A 1E943E6F 4CEFBDE3} diff --git a/doc/integration.texi b/doc/integration.texi index a16ee2fd7c3c90da0892eefac5a3224a4925ce6cc6f86d269dd4434b4d5ad5fe..a3c4e14386fb9f3f2265eeb700201b3d6705a7b615e0eb3a126cad61a38210c8 100644 --- a/doc/integration.texi +++ b/doc/integration.texi @@ -55,15 +55,26 @@ delivery via NNCP: @example /usr/local/etc/postfix/master.cf: nncp unix - n n - - pipe - flags=F user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient + flags=FRqhu user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient @end example This runs the @command{nncp-exec} command to place outgoing mail into -the NNCP queue after replacing @var{$nexthop} by the the receiving NNCP +the NNCP queue after replacing @var{$nexthop} by the receiving NNCP node and after replacing @var{$recipient} by the recipients. The @command{pipe(8)} delivery agent executes the @command{nncp-exec} command without assistance from the shell, so there are no problems with shell meta characters in command-line parameters. + +Pay attention to @code{flags}, containing @code{R}, telling Postfix to +include @code{Return-Path:} header. Otherwise that envelope sender +information will be lost. Possibly you will also need somehow to +preserve that header on the receiving side, because @command{sendmail} +command will replace it. For example you can rename it before feeding to +@command{sendmail} with +@code{reformail -R Return-Path: X-Original-Return-Path: | sendmail}, or +extract with: + +@verbatiminclude sendmail.sh @item Specify that mail for @emph{example.com}, should be delivered via NNCP, to a host named @emph{nncp-host}: @@ -134,7 +145,7 @@ @example /usr/local/etc/postfix/master.cf: nncp unix - n n - - pipe - flags=F user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient + flags=Fqhu user=nncp argv=nncp-exec -quiet $nexthop sendmail $recipient @end example This runs the @command{nncp-exec} command to place outgoing mail into diff --git a/doc/news.ru.texi b/doc/news.ru.texi index 23db4253ff491af02394fad4f8b8039c8016179b10e51373f13b1a9115de6e0e..7a2e6b7d40d4d9622a7100e066f52178670ab7842d24c4b882b813451c5f7065 100644 --- a/doc/news.ru.texi +++ b/doc/news.ru.texi @@ -1,6 +1,36 @@ @node Новости @section Новости +@node Релиз 6.1.0 +@subsection Релиз 6.1.0 +@itemize + +@item +Оптимизация: большинство команд теперь не держат открытыми файловые +дескрипторы. Прежде вы легко могли выйти за пределы максимально +допустимого количества открытых файлов, если у вас было много пакетов в +spool директории. + +@item +Оптимизация: не закрывать файловый дескриптор файла который мы качаем. +Прежде каждый его кусочек приводил к дорогим open/close вызовам. + +@item +Скачиваемые в режиме online файлы теперь сохраняются с @file{.nock} +суффиксом (non-checksummed), ожидая пока либо @command{nncp-check}, либо +online демоны не выполнят проверку целостности. + +@item +Оптимизация: для файлов, скачивание которых не было продолжено, сразу же +вычисляет контрольная сумма, пропуская промежуточный @file{.nock} шаг. + +@item +Возможность хранения заголовков зашифрованных пакетов в @file{.hdr} +файлах, рядом с самими пакетами. Это может существенно повысить скорость +получения списка пакетов на файловых системах с большим размером блока. + +@end itemize + @node Релиз 6.0.0 @subsection Релиз 6.0.0 @itemize diff --git a/doc/news.texi b/doc/news.texi index 9a0cce24bfb06af9f04d7a3c40cf6db2ddc9aec3bdd5dedd197e539d499bbe53..78f7662a7cab44be7737bae97f614d19024daf15c94bf40ab73c333d861cb79b 100644 --- a/doc/news.texi +++ b/doc/news.texi @@ -3,11 +3,41 @@ @unnumbered News See also this page @ref{Новости, on russian}. +@node Release 6.1.0 +@section Release 6.1.0 +@itemize + +@item +Optimization: most commands do not keep opened file descriptors now. +Previously you can exceed maximal number of opened files if you have got +many packets in the spool directory. + +@item +Optimization: do not close file descriptor of the file we download +online. Previously each chunk lead to expensive open/close calls. + +@item +Online downloaded files are saved with @file{.nock} (non-checksummed) +suffix, waiting either for @command{nncp-check}, or online daemons to +perform integrity check. + +@item +Optimization: files, that are not resumed, are checksummed immediately +during the online download, skipping @file{.nock}-intermediate step. + +@item +Ability to store encrypted packet's header in @file{.hdr} file, close to +the packet itself. That can greatly increase performance of packets +listing on filesystems with big block's size. + +@end itemize + @node Release 6.0.0 @section Release 6.0.0 @itemize -@item Log uses human readable and easy machine parseable +@item +Log uses human readable and easy machine parseable @url{https://www.gnu.org/software/recutils/, recfile} format for the records, instead of structured RFC 3339 lines. Old logs are not readable by @command{nncp-log} anymore. diff --git a/doc/sendmail.sh b/doc/sendmail.sh new file mode 100755 index 0000000000000000000000000000000000000000..adc8d22e84b0571a5a1c6176d1f512cce6ac12e5926b83784aac7232ba8d04fe --- /dev/null +++ b/doc/sendmail.sh @@ -0,0 +1,6 @@ +#!/bin/sh -e + +tmp=`mktemp` +trap "rm -f $tmp" HUP PIPE INT QUIT TERM EXIT +cat > $tmp +sendmail -f "`reformail -x Return-Path: < $tmp`" $@ < $tmp diff --git a/doc/spool.texi b/doc/spool.texi index 478c4f071f8cccd67472e94616e88d4829b5bde637cc51bd6bbc73fe6679c45f..422f0e103dfb88d63b6888500737b9f08b52061e929f5ecdd4e555fbbae7edd2 100644 --- a/doc/spool.texi +++ b/doc/spool.texi @@ -3,39 +3,78 @@ @unnumbered Spool directory Spool directory holds @ref{Encrypted, encrypted packets} received from remote nodes and queued for sending to them. It has the following -example structure: +example structure with just single outbound (@code{tx}) packet +@code{LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ} to the node +@code{2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ}: @example -spool/tmp/ -spool/2WHB...OABQ/rx.lock -spool/2WHB...OABQ/rx/5ZIB...UMKW.part -spool/2WHB...OABQ/tx.lock -spool/2WHB...OABQ/toss.lock -spool/BYRR...CG6Q/rx.lock -spool/BYRR...CG6Q/rx/ -spool/BYRR...CG6Q/tx.lock -spool/BYRR...CG6Q/tx/AQUT...DGNT.seen -spool/BYRR...CG6Q/tx/NSYY...ZUU6 -spool/BYRR...CG6Q/tx/VCSR...3VXX.seen -spool/BYRR...CG6Q/tx/ZI5U...5RRQ +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/toss.lock +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx.lock +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/rx/ +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx.lock +spool/2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ/tx/LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ +spool/tmp @end example -Except for @file{tmp}, all other directories are Base32-encoded node -identifiers (@file{2WHB...OABQ}, @file{BYRR...CG6Q} in our example). -Each node subdirectory has @file{rx} (received, partially received and -currently unprocessed packets) and @file{tx} (for outbound packets) -directories. +@table @file + +@item tmp +directory contains various temporary files that under normal +circumstances are renamed to necessary files inside other directories. +All directories in @file{spool} @strong{have to} be on the same +filesystem for working renaming. + +@item 2WHBV3TPZHDOZGUJEH563ZEK7M33J4UESRFO4PDKWD5KZNPROABQ +is an example Base32-encoded neighbour identifier. + +@item rx, tx +directories are for incoming and outgoing encrypted packets. @file{rx} +contains currently unfinished, non-checked, unprocessed, etc packets. + +@item toss.lock, rx.lock, tx.lock +Lock files. Only single process can work with @file{rx}/@file{tx} +directories at once. + +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ +is an example @ref{Encrypted, encrypted packet}. Its filename is Base32 +encoded BLAKE2b hash of the whole contents. It can be integrity checked +anytime. + +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.part +is an example @strong{partly} received file. It can appear only when +online transfer is used. Its filename is sent by remote side and until +file is fully downloaded -- it plays no role. -Each @file{rx}/@file{tx} directory contains one file per encrypted -packet. Its filename is Base32 encoded BLAKE2b hash of the contents. So -it can be integrity checked at any time. @file{5ZIB...UMKW.part} is -partially received file from @file{2WHB...OABQ} node. @file{tx} -directory can not contain partially written files -- they are moved -atomically from @file{tmp}. +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.nock +non-checksummed (NoCK) @strong{fully} received file. Its checksum is +verified against its filename either by @ref{nncp-check}, or by working +online daemons. If it is correct, then its extension is trimmed. -When @ref{nncp-toss} utility is called with @option{-seen} option, it -will create empty @file{XXX.seen} files, telling that some kind of -packet was already tossed sometime. +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.seen +@ref{nncp-toss} utility can be invoked with @option{-seen} option, +leading to creation of @file{.seen} files, telling that the file with +specified hash has already been processed before. It could be useful +when there are use-cases where multiple ways of packets transfer +available and there is possibility of duplicates reception. You have to +manually remove them, when you do not need them (probably because they +are expired). -Only one process can work with @file{rx}/@file{tx} directories at once, -so there are corresponding lock files. +@anchor{HdrFile} +@item LYT64MWSNDK34CVYOO7TA6ZCJ3NWI2OUDBBMX2A4QWF34FIRY4DQ.hdr +If no @ref{CfgNoHdr, nohdr} option is enabled in configuration file, +then @file{.hdr} files are automatically created for every ordinary +(fully received and checksummed) packet. It literally contains just the +header of the corresponding packet. It will be automatically created +even during simple @ref{nncp-stat} call. On filesystems with big +blocksize (ZFS for example) it can greatly help listing the packets in +directories, because it prevents unnecessary read-amplification. On +other filesystems probably it won't help at all, or even harm +performance. + +There is a hack: you can create more dense @file{.hdr} allocation by +removing all @file{.hdr} files and then running @command{nncp-stat}, +that will recreate them. In many cases many @file{.hdr} files will be +allocated more or less linearly on the disk, decreasing listing time +even more. + +@end table diff --git a/makedist.sh b/makedist.sh index 3603fa3bed1c9f269706d6af7caea26b2fc564e317b025f1c4d6d3a247be4038..e4bc26bffc1bf80ab9c135f62a40b27357d29c47f05985d3dafae7754170589c 100755 --- a/makedist.sh +++ b/makedist.sh @@ -170,7 +170,7 @@ GPG key ID: 0x2B25868E75A1A953 NNCP releases Fingerprint: 92C2 F0AE FE73 208E 46BF F3DE 2B25 868E 75A1 A953 Please send questions regarding the use of NNCP, bug reports and patches -to mailing list: https://lists.cypherpunks.ru/pipermail/nncp-devel/ +to mailing list: http://lists.cypherpunks.ru/nncp_002ddevel.html EOF cat < nice { ctx.LogD("nncp-bundle", les, "too nice") - job.Fd.Close() // #nosec G104 continue + } + fd, err := os.Open(job.Path) + if err != nil { + log.Fatalln("Error during opening:", err) } if err = tarWr.WriteHeader(&tar.Header{ Format: tar.FormatUSTAR, @@ -155,7 +158,7 @@ }); err != nil { log.Fatalln("Error writing tar header:", err) } if _, err = nncp.CopyProgressed( - tarWr, job.Fd, "Tx", + tarWr, bufio.NewReader(fd), "Tx", append(les, nncp.LEs{ {K: "Pkt", V: nncp.Base32Codec.EncodeToString(job.HshValue[:])}, {K: "FullSize", V: job.Size}, @@ -164,7 +167,9 @@ ctx.ShowPrgrs, ); err != nil { log.Fatalln("Error during copying to tar:", err) } - job.Fd.Close() // #nosec G104 + if err = fd.Close(); err != nil { + log.Fatalln("Error during closing:", err) + } if err = tarWr.Flush(); err != nil { log.Fatalln("Error during tar flushing:", err) } @@ -172,8 +177,10 @@ if err = bufStdout.Flush(); err != nil { log.Fatalln("Error during stdout flushing:", err) } if *doDelete { - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { log.Fatalln("Error during deletion:", err) + } else if ctx.HdrUsage { + os.Remove(job.Path + nncp.HdrSuffix) } } ctx.LogI("nncp-bundle", append(les, nncp.LE{K: "Size", V: job.Size}), "") @@ -298,7 +305,10 @@ } if nncp.Base32Codec.EncodeToString(hsh.Sum(nil)) == pktName { ctx.LogI("nncp-bundle", les, "removed") if !*dryRun { - os.Remove(dstPath) // #nosec G104 + os.Remove(dstPath) + if ctx.HdrUsage { + os.Remove(dstPath + nncp.HdrSuffix) + } } } else { ctx.LogE("nncp-bundle", les, errors.New("bad checksum"), "") @@ -406,6 +416,9 @@ log.Fatalln("Error during renaming:", err) } if err = nncp.DirSync(dstDirPath); err != nil { log.Fatalln("Error during syncing:", err) + } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncBuf, dstPath) } } } diff --git a/src/cmd/nncp-call/main.go b/src/cmd/nncp-call/main.go index 1676445e44d34ddaaf1194791317345a1978899159bed79e6cae773563fbb75b..5d1cd353ca241ab07c48f64c6fe90b5fe7278920148a58ee067eea375d25f0ce 100644 --- a/src/cmd/nncp-call/main.go +++ b/src/cmd/nncp-call/main.go @@ -44,6 +44,7 @@ niceRaw = flag.String("nice", nncp.NicenessFmt(255), "Minimal required niceness") rxOnly = flag.Bool("rx", false, "Only receive packets") txOnly = flag.Bool("tx", false, "Only transmit packets") listOnly = flag.Bool("list", false, "Only list remote packets") + noCK = flag.Bool("nock", false, "Do no checksum checking") onlyPktsRaw = flag.String("pkts", "", "Recieve only that packets, comma separated") rxRate = flag.Int("rxrate", 0, "Maximal receive rate, pkts/sec") txRate = flag.Int("txrate", 0, "Maximal transmit rate, pkts/sec") @@ -185,6 +186,7 @@ *txRate, onlineDeadline, maxOnlineTime, *listOnly, + *noCK, onlyPkts, ) diff --git a/src/cmd/nncp-caller/main.go b/src/cmd/nncp-caller/main.go index 706420ab06c05b02611bb4f2ef74d113153517269c6a6f1739e43303f907a776..2247758c7913ca8c69f24c6a17b12a53e6aa4f9278e88ab627112c65476110a8 100644 --- a/src/cmd/nncp-caller/main.go +++ b/src/cmd/nncp-caller/main.go @@ -138,7 +138,6 @@ if call.WhenTxExists && call.Xx != "TRx" { ctx.LogD("caller", les, "checking tx existence") txExists := false for job := range ctx.Jobs(node.Id, nncp.TTx) { - job.Fd.Close() if job.PktEnc.Nice > call.Nice { continue } @@ -177,6 +176,7 @@ call.TxRate, call.OnlineDeadline, call.MaxOnlineTime, false, + call.NoCK, nil, ) diff --git a/src/cmd/nncp-cfgnew/main.go b/src/cmd/nncp-cfgnew/main.go index 84ec9b1bb1e992fd72ff921cb87d54dce3bc8050520fed7c28991e3809264cb8..d5d6d2d69f6030636663bd307fe7c26cfa5fb1dd4bcdae81c3ade9a0ec1b121e 100644 --- a/src/cmd/nncp-cfgnew/main.go +++ b/src/cmd/nncp-cfgnew/main.go @@ -104,6 +104,8 @@ # Enforce specified umask usage # umask: "022" # Omit progress showing by default # noprogress: true + # Do not use .hdr files + # nohdr: true # Enable notification email sending # notify: { @@ -211,6 +213,7 @@ # # txrate: 20 # # xx: rx # # addr: lan # # when-tx-exists: true + # # nock: true # # # # autotoss: false # # autotoss-doseen: true diff --git a/src/cmd/nncp-check/main.go b/src/cmd/nncp-check/main.go index bbff5053935cb307276b3e10ca6b6b6547fddbff6adc4866190c224a6d5ce5cd..806b4cb166c526cd32e7b382ca06f428803b04bb51fea9697a9b2c044d998335 100644 --- a/src/cmd/nncp-check/main.go +++ b/src/cmd/nncp-check/main.go @@ -23,6 +23,7 @@ "flag" "fmt" "log" "os" + "path/filepath" "go.cypherpunks.ru/nncp/v5" ) @@ -30,12 +31,13 @@ func usage() { fmt.Fprintf(os.Stderr, nncp.UsageHeader()) fmt.Fprintf(os.Stderr, "nncp-check -- verify Rx/Tx packets checksum\n\n") - fmt.Fprintf(os.Stderr, "Usage: %s [options]\nOptions:\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Usage: %s [-nock] [options]\nOptions:\n", os.Args[0]) flag.PrintDefaults() } func main() { var ( + nock = flag.Bool("nock", false, "Process .nock files") cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") nodeRaw = flag.String("node", "", "Process only that node") spoolPath = flag.String("spool", "", "Override path to spool") @@ -85,7 +87,20 @@ for nodeId, node := range ctx.Neigh { if nodeOnly != nil && nodeId != *nodeOnly.Id { continue } - if !ctx.Check(node.Id) { + if *nock { + for job := range ctx.JobsNoCK(node.Id) { + if _, err = ctx.CheckNoCK(node.Id, job.HshValue); err != nil { + pktName := nncp.Base32Codec.EncodeToString(job.HshValue[:]) + log.Println(filepath.Join( + ctx.Spool, + nodeId.String(), + string(nncp.TRx), + pktName+nncp.NoCKSuffix, + ), err) + isBad = true + } + } + } else if !ctx.Check(node.Id) { isBad = true } } diff --git a/src/cmd/nncp-daemon/main.go b/src/cmd/nncp-daemon/main.go index dc6dbf2210c0c2c9d23e60733a8ef08042344c6668949dcbe76dad89d1b18d2e..8b2a35e5c7939ae59357916a30fbee580046765fecc254e5513b8dcefd5088c8 100644 --- a/src/cmd/nncp-daemon/main.go +++ b/src/cmd/nncp-daemon/main.go @@ -70,11 +70,13 @@ func performSP( ctx *nncp.Ctx, conn nncp.ConnDeadlined, nice uint8, + noCK bool, nodeIdC chan *nncp.NodeId, ) { state := nncp.SPState{ Ctx: ctx, Nice: nice, + NoCK: noCK, } if err := state.StartR(conn); err == nil { ctx.LogI("call-start", nncp.LEs{{K: "Node", V: state.Node.Id}}, "connected") @@ -108,6 +110,7 @@ niceRaw = flag.String("nice", nncp.NicenessFmt(255), "Minimal required niceness") bind = flag.String("bind", "[::]:5400", "Address to bind to") inetd = flag.Bool("inetd", false, "Is it started as inetd service") maxConn = flag.Int("maxconn", 128, "Maximal number of simultaneous connections") + noCK = flag.Bool("nock", false, "Do no checksum checking") spoolPath = flag.String("spool", "", "Override path to spool") logPath = flag.String("log", "", "Override path to logfile") quiet = flag.Bool("quiet", false, "Print only errors") @@ -160,7 +163,7 @@ if *inetd { os.Stderr.Close() // #nosec G104 conn := &InetdConn{os.Stdin, os.Stdout} nodeIdC := make(chan *nncp.NodeId) - go performSP(ctx, conn, nice, nodeIdC) + go performSP(ctx, conn, nice, *noCK, nodeIdC) nodeId := <-nodeIdC var autoTossFinish chan struct{} var autoTossBadCode chan bool @@ -197,7 +200,7 @@ } ctx.LogD("daemon", nncp.LEs{{K: "Addr", V: conn.RemoteAddr()}}, "accepted") go func(conn net.Conn) { nodeIdC := make(chan *nncp.NodeId) - go performSP(ctx, conn, nice, nodeIdC) + go performSP(ctx, conn, nice, *noCK, nodeIdC) nodeId := <-nodeIdC var autoTossFinish chan struct{} var autoTossBadCode chan bool diff --git a/src/cmd/nncp-rm/main.go b/src/cmd/nncp-rm/main.go index 1e866805af696952e02290fffb487d99df627c45461aa88fa04bc2b2bc3f6218..c434e9315ae670a36bd094c9d3a4bb6ecfd8a2d0eaa298adb99ace6cc917dcce 100644 --- a/src/cmd/nncp-rm/main.go +++ b/src/cmd/nncp-rm/main.go @@ -39,6 +39,8 @@ fmt.Fprintf(os.Stderr, "Usage: %s [options] -tmp\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -lock\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -part\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -seen\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [options] -node NODE -nock\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [options] -node NODE -hdr\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE {-rx|-tx}\n", os.Args[0]) fmt.Fprintf(os.Stderr, " %s [options] -node NODE -pkt PKT\n", os.Args[0]) fmt.Fprintln(os.Stderr, "-older option's time units are: (s)econds, (m)inutes, (h)ours, (d)ays") @@ -50,12 +52,14 @@ func main() { var ( cfgPath = flag.String("cfg", nncp.DefaultCfgPath, "Path to configuration file") doTmp = flag.Bool("tmp", false, "Remove all temporary files") + doHdr = flag.Bool("hdr", false, "Remove all .hdr files") doLock = flag.Bool("lock", false, "Remove all lock files") nodeRaw = flag.String("node", "", "Node to remove files in") doRx = flag.Bool("rx", false, "Process received packets") doTx = flag.Bool("tx", false, "Process transfered packets") doPart = flag.Bool("part", false, "Remove only .part files") doSeen = flag.Bool("seen", false, "Remove only .seen files") + doNoCK = flag.Bool("nock", false, "Remove only .nock files") older = flag.String("older", "", "XXX{smhd}: only older than XXX number of time units") dryRun = flag.Bool("dryrun", false, "Do not actually remove files") pktRaw = flag.String("pkt", "", "Packet to remove") @@ -176,14 +180,10 @@ if now.Sub(info.ModTime()) < oldBoundary { ctx.LogD("nncp-rm", nncp.LEs{{K: "File", V: path}}, "too fresh, skipping") return nil } - if *doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix) { - ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") - if *dryRun { - return nil - } - return os.Remove(path) - } - if *doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix) { + if (*doSeen && strings.HasSuffix(info.Name(), nncp.SeenSuffix)) || + (*doNoCK && strings.HasSuffix(info.Name(), nncp.NoCKSuffix)) || + (*doHdr && strings.HasSuffix(info.Name(), nncp.HdrSuffix)) || + (*doPart && strings.HasSuffix(info.Name(), nncp.PartSuffix)) { ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") if *dryRun { return nil @@ -197,8 +197,7 @@ return nil } return os.Remove(path) } - if !*doSeen && - !*doPart && + if !*doSeen && !*doNoCK && !*doHdr && !*doPart && (*doRx || *doTx) && ((*doRx && xx == nncp.TRx) || (*doTx && xx == nncp.TTx)) { ctx.LogI("nncp-rm", nncp.LEs{{K: "File", V: path}}, "") @@ -210,12 +209,12 @@ } return nil }) } - if *pktRaw != "" || *doRx || *doSeen || *doPart { + if *pktRaw != "" || *doRx || *doSeen || *doNoCK || *doHdr || *doPart { if err = remove(nncp.TRx); err != nil { log.Fatalln("Can not remove:", err) } } - if *pktRaw != "" || *doTx { + if *pktRaw != "" || *doTx || *doHdr { if err = remove(nncp.TTx); err != nil { log.Fatalln("Can not remove:", err) } diff --git a/src/cmd/nncp-stat/main.go b/src/cmd/nncp-stat/main.go index a1ba1cb5d8463942212204d63f9fc611a3a1c3d318e1ad979229966cb9b2397e..52d1fbf9523096c24f02e1736dafdb8a0bbfc32b8bab756d1469db9d2ffe767b 100644 --- a/src/cmd/nncp-stat/main.go +++ b/src/cmd/nncp-stat/main.go @@ -98,18 +98,25 @@ } fmt.Println(node.Name) rxNums := make(map[uint8]int) rxBytes := make(map[uint8]int64) + noCKNums := make(map[uint8]int) + noCKBytes := make(map[uint8]int64) for job := range ctx.Jobs(node.Id, nncp.TRx) { - job.Fd.Close() // #nosec G104 if *showPkt { jobPrint(nncp.TRx, job) } rxNums[job.PktEnc.Nice] = rxNums[job.PktEnc.Nice] + 1 rxBytes[job.PktEnc.Nice] = rxBytes[job.PktEnc.Nice] + job.Size } + for job := range ctx.JobsNoCK(node.Id) { + if *showPkt { + jobPrint(nncp.TRx, job) + } + noCKNums[job.PktEnc.Nice] = noCKNums[job.PktEnc.Nice] + 1 + noCKBytes[job.PktEnc.Nice] = noCKBytes[job.PktEnc.Nice] + job.Size + } txNums := make(map[uint8]int) txBytes := make(map[uint8]int64) for job := range ctx.Jobs(node.Id, nncp.TTx) { - job.Fd.Close() // #nosec G104 if *showPkt { jobPrint(nncp.TTx, job) } @@ -120,17 +127,26 @@ var nice uint8 for nice = 1; nice > 0; nice++ { rxNum, rxExists := rxNums[nice] txNum, txExists := txNums[nice] - if !(rxExists || txExists) { + noCKNum, noCKExists := noCKNums[nice] + if !(rxExists || txExists || noCKExists) { continue } fmt.Printf( - "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts\n", + "\tnice:% 4s | Rx: % 10s, % 3d pkts | Tx: % 10s, % 3d pkts", nncp.NicenessFmt(nice), humanize.IBytes(uint64(rxBytes[nice])), rxNum, humanize.IBytes(uint64(txBytes[nice])), txNum, ) + if noCKExists { + fmt.Printf( + " | NoCK: % 10s, % 3d pkts", + humanize.IBytes(uint64(noCKBytes[nice])), + noCKNum, + ) + } + fmt.Printf("\n") } } } diff --git a/src/cmd/nncp-xfer/main.go b/src/cmd/nncp-xfer/main.go index 19039937bcd80494f81ce9a88ef64cf46c3d7c863332282714be608ebb30f906..984834569b2b91033a4e4852631de787700d06a516dcce3731b5e68548054401 100644 --- a/src/cmd/nncp-xfer/main.go +++ b/src/cmd/nncp-xfer/main.go @@ -28,7 +28,6 @@ "log" "os" "path/filepath" - xdr "github.com/davecgh/go-xdr/xdr2" "go.cypherpunks.ru/nncp/v5" ) @@ -183,8 +182,7 @@ ctx.LogE("nncp-xfer", les, err, "open") isBad = true continue } - var pktEnc nncp.PktEnc - _, err = xdr.Unmarshal(fd, &pktEnc) + pktEnc, pktEncRaw, err := ctx.HdrRead(fd) if err != nil || pktEnc.Magic != nncp.MagicNNCPEv4 { ctx.LogD("nncp-xfer", les, "is not a packet") fd.Close() // #nosec G104 @@ -249,6 +247,14 @@ ctx.LogE("nncp-xfer", les, err, "remove") isBad = true } } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, filepath.Join( + ctx.Spool, + nodeId.String(), + string(nncp.TRx), + tmp.Checksum(), + )) + } } } @@ -315,39 +321,42 @@ } } les = les[:len(les)-1] for job := range ctx.Jobs(&nodeId, nncp.TTx) { - pktName := filepath.Base(job.Fd.Name()) + pktName := filepath.Base(job.Path) les := append(les, nncp.LE{K: "Pkt", V: pktName}) if job.PktEnc.Nice > nice { ctx.LogD("nncp-xfer", les, "too nice") - job.Fd.Close() // #nosec G104 continue } if _, err = os.Stat(filepath.Join(dstPath, pktName)); err == nil || !os.IsNotExist(err) { ctx.LogD("nncp-xfer", les, "already exists") - job.Fd.Close() // #nosec G104 continue } if _, err = os.Stat(filepath.Join(dstPath, pktName+nncp.SeenSuffix)); err == nil || !os.IsNotExist(err) { ctx.LogD("nncp-xfer", les, "already exists") - job.Fd.Close() // #nosec G104 continue } tmp, err := nncp.TempFile(dstPath, "xfer") if err != nil { ctx.LogE("nncp-xfer", les, err, "mktemp") - job.Fd.Close() // #nosec G104 isBad = true break } les = append(les, nncp.LE{K: "Tmp", V: tmp.Name()}) ctx.LogD("nncp-xfer", les, "created") + fd, err := os.Open(job.Path) + if err != nil { + ctx.LogE("nncp-xfer", les, err, "open") + tmp.Close() // #nosec G104 + isBad = true + continue + } bufW := bufio.NewWriter(tmp) copied, err := nncp.CopyProgressed( - bufW, bufio.NewReader(job.Fd), "Tx", + bufW, bufio.NewReader(fd), "Tx", append(les, nncp.LE{K: "FullSize", V: job.Size}), ctx.ShowPrgrs, ) - job.Fd.Close() // #nosec G104 + fd.Close() // #nosec G104 if err != nil { ctx.LogE("nncp-xfer", les, err, "copy") tmp.Close() // #nosec G104 @@ -383,9 +392,11 @@ os.Remove(filepath.Join(dstPath, pktName+".part")) // #nosec G104 les = les[:len(les)-1] ctx.LogI("nncp-xfer", append(les, nncp.LE{K: "Size", V: copied}), "") if !*keep { - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("nncp-xfer", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + nncp.HdrSuffix) } } } diff --git a/src/ctx.go b/src/ctx.go index 654f57b2195501100df90c712f4625c1ea439e3406b93208f24d3b6e2c57cc4e..ef923c18a0164bcbdf2a5dfb3db58f6299ff482a26ac9c9c06f7752780adaaab 100644 --- a/src/ctx.go +++ b/src/ctx.go @@ -40,6 +40,7 @@ LogPath string UmaskForce *int Quiet bool ShowPrgrs bool + HdrUsage bool Debug bool NotifyFile *FromToJSON NotifyFreq *FromToJSON diff --git a/src/go.mod b/src/go.mod index eae52f6941cf16567e429a05b0294d4e6af8f5bcbd7cf4b1180840e4b43ad622..394d45b08db362efa66d69385ae788a6042ae4614b73fdcbafd5850e39c2437b 100644 --- a/src/go.mod +++ b/src/go.mod @@ -6,14 +6,14 @@ github.com/dustin/go-humanize v1.0.0 github.com/flynn/noise v0.0.0-20180327030543-2492fe189ae6 github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 github.com/hjson/hjson-go v3.1.0+incompatible - github.com/klauspost/compress v1.11.4 + github.com/klauspost/compress v1.11.7 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect go.cypherpunks.ru/balloon v1.1.1 go.cypherpunks.ru/recfile v0.4.3 - golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad - golang.org/x/net v0.0.0-20201224014010-6772e930b67b - golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 - golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 + golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 + golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d + golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43 + golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect ) diff --git a/src/go.sum b/src/go.sum index 79cb1dbfc3d3e0c9fb33a7c16fa167f5bbd6b2534a2aaa1145e3f1a41ec6e12f..2cb6efde18d2f185325408c4a36328132ef3e55bfe6ce25a8a6e061313ff0394 100644 --- a/src/go.sum +++ b/src/go.sum @@ -8,8 +8,8 @@ github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY= github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA= github.com/hjson/hjson-go v3.1.0+incompatible h1:DY/9yE8ey8Zv22bY+mHV1uk2yRy0h8tKhZ77hEdi0Aw= github.com/hjson/hjson-go v3.1.0+incompatible/go.mod h1:qsetwF8NlsTsOTwZTApNlTCerV+b2GjYRRcIk4JMFio= -github.com/klauspost/compress v1.11.4 h1:kz40R/YWls3iqT9zX9AHN3WoVsrAWVyui5sxuLqiXqU= -github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg= +github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -20,19 +20,21 @@ go.cypherpunks.ru/balloon v1.1.1/go.mod h1:k4s4ozrIrhpBjj78Z7LX8ZHxMQ+XE7DZUWl8gP2ojCo= go.cypherpunks.ru/recfile v0.4.3 h1:ephokihmV//p0ob6gx2FWXvm28/NBDbWTOJPUNahxO8= go.cypherpunks.ru/recfile v0.4.3/go.mod h1:sR+KajB+vzofL3SFVFwKt3Fke0FaCcN1g3YPNAhU3qI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad h1:DN0cp81fZ3njFcrLCytUHRSUkqBjfTo4Tx9RJTWs0EY= -golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83 h1:/ZScEX8SfEmUGRHs0gxpqteO5nfNW6axyZbBdw9A12g= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= -golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d h1:1aflnvSoWWLI2k/dMUAl5lvU1YO4Mb4hz0gh+1rjcxU= +golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210105210732-16f7687f5001 h1:/dSxr6gT0FNI1MO5WLJo8mTmItROeOKTkDn+7OwWBos= -golang.org/x/sys v0.0.0-20210105210732-16f7687f5001/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43 h1:SgQ6LNaYJU0JIuEHv9+s6EbhSCwYeAf5Yvj6lpYlqAE= +golang.org/x/sys v0.0.0-20210220050731-9a76102bfb43/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE= +golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/src/humanizer.go b/src/humanizer.go index 2c249d5f4288fc41ee4e6f9094b64e6a9073831cacd02020cc985f82b2fb7226..cb963052b76863fc77cebbdda99775af064aa27ae1c67ef206ee1491729447f2 100644 --- a/src/humanizer.go +++ b/src/humanizer.go @@ -47,13 +47,14 @@ node, err := ctx.FindNode(nodeS) if err == nil { nodeS = node.Name } + var sizeParsed uint64 var size string if sizeRaw, exists := le["Size"]; exists { - sp, err := strconv.ParseUint(sizeRaw, 10, 64) + sizeParsed, err = strconv.ParseUint(sizeRaw, 10, 64) if err != nil { return "", err } - size = humanize.IBytes(uint64(sp)) + size = humanize.IBytes(sizeParsed) } var msg string @@ -203,7 +204,6 @@ } if err, exists := le["Err"]; exists { msg += ": " + err } - case "sp-info": nice, err := NicenessParse(le["Nice"]) if err != nil { @@ -213,15 +213,13 @@ msg = fmt.Sprintf( "Packet %s (%s) (nice %s)", le["Pkt"], size, NicenessFmt(nice), ) - offsetParsed, err := strconv.ParseUint(le["Offset"], 10, 64) - if err != nil { - return "", err + if offset := le["Offset"]; offset != "" { + offsetParsed, err := strconv.ParseUint(offset, 10, 64) + if err != nil { + return "", err + } + msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed) } - sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64) - if err != nil { - return "", err - } - msg += fmt.Sprintf(": %d%%", 100*offsetParsed/sizeParsed) if m, exists := le["Msg"]; exists { msg += ": " + m } @@ -247,10 +245,6 @@ default: return "", errors.New("unknown XX") } fullsize, err := strconv.ParseUint(le["FullSize"], 10, 64) - if err != nil { - return "", err - } - sizeParsed, err := strconv.ParseUint(le["Size"], 10, 64) if err != nil { return "", err } diff --git a/src/jobs.go b/src/jobs.go index 0478403aebf2dc93c4531d6eaa8be19f60688a248a90eb676835dad9300d924f..ebf857c08ddd38cfe70f25b9c91777c309717e8c2702d0a4d3d09b69d85a7d7f 100644 --- a/src/jobs.go +++ b/src/jobs.go @@ -18,9 +18,10 @@ package nncp import ( - "io" + "bytes" "os" "path/filepath" + "strings" xdr "github.com/davecgh/go-xdr/xdr2" ) @@ -30,16 +31,54 @@ const ( TRx TRxTx = "rx" TTx TRxTx = "tx" + + HdrSuffix = ".hdr" ) type Job struct { PktEnc *PktEnc - Fd *os.File + Path string Size int64 HshValue *[32]byte } -func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { +func (ctx *Ctx) HdrRead(fd *os.File) (*PktEnc, []byte, error) { + var pktEnc PktEnc + _, err := xdr.Unmarshal(fd, &pktEnc) + if err != nil { + return nil, nil, err + } + var raw bytes.Buffer + if _, err = xdr.Marshal(&raw, pktEnc); err != nil { + panic(err) + } + return &pktEnc, raw.Bytes(), nil +} + +func (ctx *Ctx) HdrWrite(pktEncRaw []byte, tgt string) error { + tmpHdr, err := ctx.NewTmpFile() + if err != nil { + ctx.LogE("hdr-write", []LE{}, err, "new") + return err + } + if _, err = tmpHdr.Write(pktEncRaw); err != nil { + ctx.LogE("hdr-write", []LE{}, err, "write") + os.Remove(tmpHdr.Name()) + return err + } + if err = tmpHdr.Close(); err != nil { + ctx.LogE("hdr-write", []LE{}, err, "close") + os.Remove(tmpHdr.Name()) + return err + } + if err = os.Rename(tmpHdr.Name(), tgt+HdrSuffix); err != nil { + ctx.LogE("hdr-write", []LE{}, err, "rename") + return err + } + return err +} + +func (ctx *Ctx) jobsFind(nodeId *NodeId, xx TRxTx, nock bool) chan Job { rxPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) jobs := make(chan Job, 16) go func() { @@ -54,33 +93,58 @@ if err != nil { return } for _, fi := range fis { - hshValue, err := Base32Codec.DecodeString(fi.Name()) - if err != nil { - continue + name := fi.Name() + var hshValue []byte + if nock { + if !strings.HasSuffix(name, NoCKSuffix) || + len(name) != Base32Encoded32Len+len(NoCKSuffix) { + continue + } + hshValue, err = Base32Codec.DecodeString( + strings.TrimSuffix(name, NoCKSuffix), + ) + } else { + if len(name) != Base32Encoded32Len { + continue + } + hshValue, err = Base32Codec.DecodeString(name) } - fd, err := os.Open(filepath.Join(rxPath, fi.Name())) if err != nil { continue } - var pktEnc PktEnc - if _, err = xdr.Unmarshal(fd, &pktEnc); err != nil || pktEnc.Magic != MagicNNCPEv4 { - fd.Close() // #nosec G104 + pth := filepath.Join(rxPath, name) + hdrExists := true + var fd *os.File + if nock { + fd, err = os.Open(pth) + } else { + fd, err = os.Open(pth + HdrSuffix) + if err != nil && os.IsNotExist(err) { + hdrExists = false + fd, err = os.Open(pth) + } + } + if err != nil { continue } - if _, err = fd.Seek(0, io.SeekStart); err != nil { - fd.Close() // #nosec G104 + pktEnc, pktEncRaw, err := ctx.HdrRead(fd) + fd.Close() + if err != nil || pktEnc.Magic != MagicNNCPEv4 { continue } ctx.LogD("jobs", LEs{ {"XX", string(xx)}, {"Node", pktEnc.Sender}, - {"Name", fi.Name()}, + {"Name", name}, {"Nice", int(pktEnc.Nice)}, {"Size", fi.Size()}, }, "taken") + if !hdrExists && ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, pth) + } job := Job{ - PktEnc: &pktEnc, - Fd: fd, + PktEnc: pktEnc, + Path: pth, Size: fi.Size(), HshValue: new([32]byte), } @@ -90,3 +154,11 @@ } }() return jobs } + +func (ctx *Ctx) Jobs(nodeId *NodeId, xx TRxTx) chan Job { + return ctx.jobsFind(nodeId, xx, false) +} + +func (ctx *Ctx) JobsNoCK(nodeId *NodeId) chan Job { + return ctx.jobsFind(nodeId, TRx, true) +} diff --git a/src/log.go b/src/log.go index 4318929d8c6e7efc4d66e03f28f79c4f12f2279841b8eb775c644efa23f4eaf8..11703041392dbd5a831fc72cb989402bee5d3322d04b5187a91a058c893f5d28 100644 --- a/src/log.go +++ b/src/log.go @@ -43,6 +43,8 @@ for _, le := range les { switch v := le.V.(type) { case int, int8, uint8, int64, uint64: val = fmt.Sprintf("%d", v) + case bool: + val = fmt.Sprintf("%v", v) default: val = fmt.Sprintf("%s", v) } diff --git a/src/nncp.go b/src/nncp.go index fd0376876e587bbe71cdd47b6342a3131283fce826ec30f7ea783c589d5adbc0..9cb396b90f52d65ad7174c347c41af1165e8d58b929968e65941a374c82d6bc6 100644 --- a/src/nncp.go +++ b/src/nncp.go @@ -37,8 +37,10 @@ You should have received a copy of the GNU General Public License along with this program. If not, see .` ) +const Base32Encoded32Len = 52 + var ( - Version string = "6.0.0" + Version string = "6.1.0" Base32Codec *base32.Encoding = base32.StdEncoding.WithPadding(base32.NoPadding) ) diff --git a/src/pkt.go b/src/pkt.go index 147f21126ddb1d4c665179b3a20704be3ca00981ed307de348d7f9569237d248..66470bd5207c213d6ef4576269c56e990f32a33e4d878f440ab886a415e49535 100644 --- a/src/pkt.go +++ b/src/pkt.go @@ -192,14 +192,14 @@ nice uint8, size, padSize int64, data io.Reader, out io.Writer, -) error { +) ([]byte, error) { pubEph, prvEph, err := box.GenerateKey(rand.Reader) if err != nil { - return err + return nil, err } var pktBuf bytes.Buffer if _, err := xdr.Marshal(&pktBuf, pkt); err != nil { - return err + return nil, err } tbs := PktTbs{ Magic: MagicNNCPEv4, @@ -210,7 +210,7 @@ ExchPub: *pubEph, } var tbsBuf bytes.Buffer if _, err = xdr.Marshal(&tbsBuf, &tbs); err != nil { - return err + return nil, err } signature := new([ed25519.SignatureSize]byte) copy(signature[:], ed25519.Sign(our.SignPrv, tbsBuf.Bytes())) @@ -222,26 +222,31 @@ Recipient: their.Id, ExchPub: *pubEph, Sign: *signature, } - if _, err = xdr.Marshal(out, &pktEnc); err != nil { - return err + tbsBuf.Reset() + if _, err = xdr.Marshal(&tbsBuf, &pktEnc); err != nil { + return nil, err + } + pktEncRaw := tbsBuf.Bytes() + if _, err = out.Write(pktEncRaw); err != nil { + return nil, err } sharedKey := new([32]byte) curve25519.ScalarMult(sharedKey, prvEph, their.ExchPub) kdf, err := blake2b.NewXOF(KDFXOFSize, sharedKey[:]) if err != nil { - return err + return nil, err } if _, err = kdf.Write(MagicNNCPEv4[:]); err != nil { - return err + return nil, err } key := make([]byte, chacha20poly1305.KeySize) if _, err = io.ReadFull(kdf, key); err != nil { - return err + return nil, err } aead, err := chacha20poly1305.New(key) if err != nil { - return err + return nil, err } nonce := make([]byte, aead.NonceSize()) @@ -249,31 +254,31 @@ fullSize := pktBuf.Len() + int(size) sizeBuf := make([]byte, 8+aead.Overhead()) binary.BigEndian.PutUint64(sizeBuf, uint64(sizeWithTags(int64(fullSize)))) if _, err = out.Write(aead.Seal(sizeBuf[:0], nonce, sizeBuf[:8], nil)); err != nil { - return err + return nil, err } lr := io.LimitedReader{R: data, N: size} mr := io.MultiReader(&pktBuf, &lr) written, err := aeadProcess(aead, nonce, true, mr, out) if err != nil { - return err + return nil, err } if written != fullSize { - return io.ErrUnexpectedEOF + return nil, io.ErrUnexpectedEOF } if padSize > 0 { if _, err = io.ReadFull(kdf, key); err != nil { - return err + return nil, err } kdf, err = blake2b.NewXOF(blake2b.OutputLengthUnknown, key) if err != nil { - return err + return nil, err } if _, err = io.CopyN(out, kdf, padSize); err != nil { - return err + return nil, err } } - return nil + return pktEncRaw, nil } func TbsVerify(our *NodeOur, their *Node, pktEnc *PktEnc) (bool, error) { diff --git a/src/pkt_test.go b/src/pkt_test.go index 771f064c6ae0b6ce461db7bffe7bc8720d15462bc0057ba4e96117be31828599..14f56bb21ab0a48d108f1af84b607270721952a22d0509e4692d9874a146de89 100644 --- a/src/pkt_test.go +++ b/src/pkt_test.go @@ -44,7 +44,7 @@ pkt, err := NewPkt(PktTypeFile, 123, []byte(path)) if err != nil { panic(err) } - err = PktEncWrite( + _, err = PktEncWrite( nodeOur, nodeTheir.Their(), pkt, @@ -95,7 +95,7 @@ pkt, err := NewPkt(PktTypeFile, 123, []byte(path)) if err != nil { panic(err) } - err = PktEncWrite( + _, err = PktEncWrite( node1, node2.Their(), pkt, diff --git a/src/progress.go b/src/progress.go index 59748db8c16bea6722b98221b69d0182a3a69919d5c613c07c880ecfb71c76b3..9d5053a54f9b94e0862591aebab8a124a8028b08395c997149ae8ac9292306aa 100644 --- a/src/progress.go +++ b/src/progress.go @@ -141,7 +141,7 @@ progressBars[pkt] = pb progressBarsLock.Unlock() } what := pkt - if len(what) >= 52 { // Base32 encoded + if len(what) >= Base32Encoded32Len { // Base32 encoded what = what[:16] + ".." + what[len(what)-16:] } what = prefix + " " + what diff --git a/src/sp.go b/src/sp.go index d8bf7356e509dca2604edbfaa7207bb1bb44fe4b38f1d1a87df64c73b752425e..f9ebde288b377af29cb017dc06ab5b822fa77bdfe3858725babeedfcbf2ac9e6 100644 --- a/src/sp.go +++ b/src/sp.go @@ -21,6 +21,7 @@ import ( "bytes" "crypto/subtle" "errors" + "hash" "io" "os" "path/filepath" @@ -30,6 +31,7 @@ "time" xdr "github.com/davecgh/go-xdr/xdr2" "github.com/flynn/noise" + "golang.org/x/crypto/blake2b" ) const ( @@ -38,6 +40,11 @@ PartSuffix = ".part" SPHeadOverhead = 4 ) +type SPCheckerQueues struct { + appeared chan *[32]byte + checked chan *[32]byte +} + var ( MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1} @@ -56,9 +63,19 @@ DefaultDeadline = 10 * time.Second PingTimeout = time.Minute - spCheckerToken chan struct{} + spCheckers = make(map[NodeId]*SPCheckerQueues) ) +type FdAndFullSize struct { + fd *os.File + fullSize int64 +} + +type HasherAndOffset struct { + h hash.Hash + offset uint64 +} + type SPType uint8 const ( @@ -148,8 +165,6 @@ if _, err := xdr.Marshal(&buf, spFile); err != nil { panic(err) } SPFileOverhead = buf.Len() - spCheckerToken = make(chan struct{}, 1) - spCheckerToken <- struct{}{} } func MarshalSP(typ SPType, sp interface{}) []byte { @@ -183,6 +198,7 @@ type SPState struct { Ctx *Ctx Node *Node Nice uint8 + NoCK bool onlineDeadline time.Duration maxOnlineTime time.Duration hs *noise.HandshakeState @@ -214,6 +230,9 @@ isDead chan struct{} listOnly bool onlyPkts map[[32]byte]bool writeSPBuf bytes.Buffer + fds map[string]FdAndFullSize + fileHashers map[string]*HasherAndOffset + checkerQueues SPCheckerQueues sync.RWMutex } @@ -235,6 +254,11 @@ go func() { for range state.pings { } }() + go func() { + for _, s := range state.fds { + s.fd.Close() + } + }() } func (state *SPState) NotAlive() bool { @@ -251,6 +275,25 @@ state.Ctx.UnlockDir(state.rxLock) state.Ctx.UnlockDir(state.txLock) } +func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) { + for hshValue := range appeared { + les := LEs{ + {"XX", string(TRx)}, + {"Node", nodeId}, + {"Pkt", Base32Codec.EncodeToString(hshValue[:])}, + } + ctx.LogD("sp-checker", les, "checking") + size, err := ctx.CheckNoCK(nodeId, hshValue) + les = append(les, LE{"Size", size}) + if err != nil { + ctx.LogE("sp-checker", les, err, "") + continue + } + ctx.LogI("sp-done", les, "") + go func(hsh *[32]byte) { checked <- hsh }(hshValue) + } +} + func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error { state.writeSPBuf.Reset() n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{ @@ -292,7 +335,6 @@ func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte { var infos []*SPInfo var totalSize int64 for job := range ctx.Jobs(nodeId, TTx) { - job.Fd.Close() // #nosec G104 if job.PktEnc.Nice > nice { continue } @@ -445,6 +487,7 @@ state.infosOurSeen = make(map[[32]byte]uint8) state.infosTheir = make(map[[32]byte]*SPInfo) state.started = started state.xxOnly = xxOnly + var buf []byte var payload []byte state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message") @@ -530,17 +573,60 @@ } return err } +func (state *SPState) closeFd(pth string) { + s, exists := state.fds[pth] + delete(state.fds, pth) + if exists { + s.fd.Close() + } +} + func (state *SPState) StartWorkers( conn ConnDeadlined, infosPayloads [][]byte, payload []byte, ) error { les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}} + state.fds = make(map[string]FdAndFullSize) + state.fileHashers = make(map[string]*HasherAndOffset) state.isDead = make(chan struct{}) if state.maxOnlineTime > 0 { state.mustFinishAt = state.started.Add(state.maxOnlineTime) } + // Checker + if !state.NoCK { + queues := spCheckers[*state.Node.Id] + if queues == nil { + queues = &SPCheckerQueues{ + appeared: make(chan *[32]byte), + checked: make(chan *[32]byte), + } + spCheckers[*state.Node.Id] = queues + go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked) + } + state.checkerQueues = *queues + go func() { + for job := range state.Ctx.JobsNoCK(state.Node.Id) { + if job.PktEnc.Nice <= state.Nice { + state.checkerQueues.appeared <- job.HshValue + } + } + }() + state.wg.Add(1) + go func() { + defer state.wg.Done() + for { + select { + case <-state.isDead: + return + case hsh := <-state.checkerQueues.checked: + state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh}) + } + } + }() + } + // Remaining handshake payload sending if len(infosPayloads) > 1 { state.wg.Add(1) @@ -683,22 +769,29 @@ {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])}, {"Size", int64(freq.Offset)}, }...) state.Ctx.LogD("sp-file", lesp, "queueing") - fd, err := os.Open(filepath.Join( + pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), Base32Codec.EncodeToString(freq.Hash[:]), - )) - if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") - return - } - fi, err := fd.Stat() - if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") - return + ) + fdAndFullSize, exists := state.fds[pth] + if !exists { + fd, err := os.Open(pth) + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "") + return + } + fi, err := fd.Stat() + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "") + return + } + fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()} + state.fds[pth] = fdAndFullSize } - fullSize := fi.Size() + fd := fdAndFullSize.fd + fullSize := fdAndFullSize.fullSize var buf []byte if freq.Offset < uint64(fullSize) { state.Ctx.LogD("sp-file", lesp, "seeking") @@ -715,7 +808,7 @@ } buf = buf[:n] state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read") } - fd.Close() // #nosec G104 + state.closeFd(pth) payload = MarshalSP(SPTypeFile, SPFile{ Hash: freq.Hash, Offset: freq.Offset, @@ -862,8 +955,10 @@ state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "") state.Lock() state.queueTheir = nil state.Unlock() + case SPTypePing: state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "") + case SPTypeInfo: infosGot = true lesp := append(les, LE{"Type", "info"}) @@ -910,6 +1005,10 @@ replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash})) } continue } + if _, err = os.Stat(pktPath + NoCKSuffix); err == nil { + state.Ctx.LogI("sp-info", lesp, "still non checksummed") + continue + } fi, err := os.Stat(pktPath + PartSuffix) var offset int64 if err == nil { @@ -926,6 +1025,7 @@ SPTypeFreq, SPFreq{info.Hash, uint64(offset)}, )) } + case SPTypeFile: lesp := append(les, LE{"Type", "file"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") @@ -945,29 +1045,60 @@ state.Node.Id.String(), string(TRx), ) filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:])) + filePathPart := filePath + PartSuffix state.Ctx.LogD("sp-file", lesp, "opening part") - fd, err := os.OpenFile( - filePath+PartSuffix, - os.O_RDWR|os.O_CREATE, - os.FileMode(0666), - ) - if err != nil { - state.Ctx.LogE("sp-file", lesp, err, "") - return nil, err + fdAndFullSize, exists := state.fds[filePathPart] + var fd *os.File + if exists { + fd = fdAndFullSize.fd + } else { + fd, err = os.OpenFile( + filePathPart, + os.O_RDWR|os.O_CREATE, + os.FileMode(0666), + ) + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "") + return nil, err + } + state.fds[filePathPart] = FdAndFullSize{fd: fd} + if file.Offset == 0 { + h, err := blake2b.New256(nil) + if err != nil { + panic(err) + } + state.fileHashers[filePath] = &HasherAndOffset{h: h} + } } state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking") if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil { state.Ctx.LogE("sp-file", lesp, err, "") - fd.Close() // #nosec G104 + state.closeFd(filePathPart) return nil, err } state.Ctx.LogD("sp-file", lesp, "writing") - _, err = fd.Write(file.Payload) - if err != nil { + if _, err = fd.Write(file.Payload); err != nil { state.Ctx.LogE("sp-file", lesp, err, "") - fd.Close() // #nosec G104 + state.closeFd(filePathPart) return nil, err } + hasherAndOffset, hasherExists := state.fileHashers[filePath] + if hasherExists { + if hasherAndOffset.offset == file.Offset { + if _, err = hasherAndOffset.h.Write(file.Payload); err != nil { + panic(err) + } + hasherAndOffset.offset += uint64(len(file.Payload)) + } else { + state.Ctx.LogE( + "sp-file", lesp, + errors.New("offset differs"), + "deleting hasher", + ) + delete(state.fileHashers, filePath) + hasherExists = false + } + } ourSize := int64(file.Offset + uint64(len(file.Payload))) lesp[len(lesp)-1].V = ourSize fullsize := int64(0) @@ -982,51 +1113,71 @@ if state.Ctx.ShowPrgrs { Progress("Rx", lesp) } if fullsize != ourSize { - fd.Close() // #nosec G104 continue } - <-spCheckerToken - go func() { - defer func() { - spCheckerToken <- struct{}{} - }() - if err := fd.Sync(); err != nil { - state.Ctx.LogE("sp-file", lesp, err, "sync") - fd.Close() // #nosec G104 - return - } - state.wg.Add(1) - defer state.wg.Done() - if _, err = fd.Seek(0, io.SeekStart); err != nil { - fd.Close() // #nosec G104 - state.Ctx.LogE("sp-file", lesp, err, "") - return - } - state.Ctx.LogD("sp-file", lesp, "checking") - gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs) - fd.Close() // #nosec G104 - if err != nil || !gut { + err = fd.Sync() + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "sync") + state.closeFd(filePathPart) + continue + } + if hasherExists { + if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 { state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "") - return + continue } - state.Ctx.LogI("sp-done", lesp, "") - if err = os.Rename(filePath+PartSuffix, filePath); err != nil { + if err = os.Rename(filePathPart, filePath); err != nil { state.Ctx.LogE("sp-file", lesp, err, "rename") - return + continue } if err = DirSync(dirToSync); err != nil { state.Ctx.LogE("sp-file", lesp, err, "sync") - return + continue } - state.Lock() - delete(state.infosTheir, *file.Hash) - state.Unlock() + state.Ctx.LogI("sp-file", lesp, "done") state.wg.Add(1) go func() { state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash}) state.wg.Done() }() - }() + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.Ctx.HdrUsage { + state.closeFd(filePathPart) + continue + } + if _, err = fd.Seek(0, io.SeekStart); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "seek") + state.closeFd(filePathPart) + continue + } + _, pktEncRaw, err := state.Ctx.HdrRead(fd) + state.closeFd(filePathPart) + if err != nil { + state.Ctx.LogE("sp-file", lesp, err, "HdrRead") + continue + } + state.Ctx.HdrWrite(pktEncRaw, filePath) + continue + } + state.closeFd(filePathPart) + if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "rename") + continue + } + if err = DirSync(dirToSync); err != nil { + state.Ctx.LogE("sp-file", lesp, err, "sync") + continue + } + state.Ctx.LogI("sp-file", lesp, "downloaded") + state.Lock() + delete(state.infosTheir, *file.Hash) + state.Unlock() + if !state.NoCK { + state.checkerQueues.appeared <- file.Hash + } + case SPTypeDone: lesp := append(les, LE{"Type", "done"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") @@ -1036,19 +1187,23 @@ state.Ctx.LogE("sp-process", lesp, err, "") return nil, err } lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])}) + lesp = append(lesp, LE{"XX", string(TTx)}) state.Ctx.LogD("sp-done", lesp, "removing") - err := os.Remove(filepath.Join( + pth := filepath.Join( state.Ctx.Spool, state.Node.Id.String(), string(TTx), Base32Codec.EncodeToString(done.Hash[:]), - )) - lesp = append(lesp, LE{"XX", string(TTx)}) - if err == nil { + ) + if err = os.Remove(pth); err == nil { state.Ctx.LogI("sp-done", lesp, "") + if state.Ctx.HdrUsage { + os.Remove(pth + HdrSuffix) + } } else { state.Ctx.LogE("sp-done", lesp, err, "") } + case SPTypeFreq: lesp := append(les, LE{"Type", "freq"}) state.Ctx.LogD("sp-process", lesp, "unmarshaling packet") @@ -1081,6 +1236,7 @@ } } else { state.Ctx.LogD("sp-process", lesp, "unknown") } + default: state.Ctx.LogE( "sp-process", diff --git a/src/tmp.go b/src/tmp.go index 509f83d5eb1ecca0fc567b32c792d6de4f565c91cd462c760a78f171add095ff..52b5aacfc021347518788284250ae2c6bbc31eac54fde4512ca93d0c7cdd800a 100644 --- a/src/tmp.go +++ b/src/tmp.go @@ -92,6 +92,10 @@ } return fd.Close() } +func (tmp *TmpFileWHash) Checksum() string { + return Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) +} + func (tmp *TmpFileWHash) Commit(dir string) error { var err error if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil { @@ -108,7 +112,7 @@ } if err = tmp.Fd.Close(); err != nil { return err } - checksum := Base32Codec.EncodeToString(tmp.Hsh.Sum(nil)) + checksum := tmp.Checksum() tmp.ctx.LogD("tmp", LEs{{"Src", tmp.Fd.Name()}, {"Dst", checksum}}, "commit") if err = os.Rename(tmp.Fd.Name(), filepath.Join(dir, checksum)); err != nil { return err diff --git a/src/toss.go b/src/toss.go index d6390810bd025120fcaf866c0f78683bcae3b933676c934d58a75f79a67c26a1..aa74b819d75813f2805db5459a418278b7f6bfd472943bcf5083590faf82bad3 100644 --- a/src/toss.go +++ b/src/toss.go @@ -83,23 +83,24 @@ panic(err) } defer decompressor.Close() for job := range ctx.Jobs(nodeId, TRx) { - pktName := filepath.Base(job.Fd.Name()) + pktName := filepath.Base(job.Path) les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}} if job.PktEnc.Nice > nice { ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice") - job.Fd.Close() // #nosec G104 continue } + fd, err := os.Open(job.Path) + if err != nil { + ctx.LogE("rx", les, err, "open") + isBad = true + continue + } + pipeR, pipeW := io.Pipe() go func(job Job) error { pipeWB := bufio.NewWriter(pipeW) - _, _, err := PktEncRead( - ctx.Self, - ctx.Neigh, - bufio.NewReader(job.Fd), - pipeWB, - ) - job.Fd.Close() // #nosec G104 + _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB) + fd.Close() // #nosec G104 if err != nil { return pipeW.CloseWithError(err) } @@ -109,7 +110,6 @@ } return pipeW.Close() }(job) var pkt Pkt - var err error var pktSize int64 var pktSizeBlocks int64 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil { @@ -197,13 +197,15 @@ } ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } } case PktTypeFile: @@ -293,13 +295,15 @@ } ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } if len(sendmail) > 0 && ctx.NotifyFile != nil { cmd := exec.Command( @@ -362,13 +366,15 @@ } ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } if len(sendmail) > 0 && ctx.NotifyFreq != nil { cmd := exec.Command( @@ -408,13 +414,15 @@ } ctx.LogI("rx", les, "") if !dryRun { if doSeen { - if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil { + if fd, err := os.Create(job.Path + SeenSuffix); err == nil { fd.Close() // #nosec G104 } } - if err = os.Remove(job.Fd.Name()); err != nil { + if err = os.Remove(job.Path); err != nil { ctx.LogE("rx", les, err, "remove") isBad = true + } else if ctx.HdrUsage { + os.Remove(job.Path + HdrSuffix) } } default: diff --git a/src/toss_test.go b/src/toss_test.go index 2593fb8c79e71b4eecaf2914a72f5b60d9fb2ec39fe149239b45430ab95dd1e6..4cb3f3045841be090c8d4e1ac4fcc03052d962721df115b807eab0ce52045931 100644 --- a/src/toss_test.go +++ b/src/toss_test.go @@ -383,7 +383,12 @@ return false } for job := range ctx.Jobs(ctx.Self.Id, TTx) { var buf bytes.Buffer - _, _, err := PktEncRead(ctx.Self, ctx.Neigh, job.Fd, &buf) + fd, err := os.Open(job.Path) + if err != nil { + t.Error(err) + return false + } + _, _, err = PktEncRead(ctx.Self, ctx.Neigh, fd, &buf) if err != nil { t.Error(err) return false @@ -454,7 +459,7 @@ PathLen: blake2b.Size256, } copy(pktTrans.Path[:], nodeOur.Id[:]) var dst bytes.Buffer - if err := PktEncWrite( + if _, err := PktEncWrite( ctx.Self, ctx.Neigh[*nodeOur.Id], &pktTrans, diff --git a/src/tx.go b/src/tx.go index c3e8cad94646de48b8ef952a165f83114498aba15c2f0a48a7d89febba52a122..541e7ac5cec324122b8e4293e38ce319bcf0be7c127a3e8dee4a8a2e3ee8ff4d 100644 --- a/src/tx.go +++ b/src/tx.go @@ -62,7 +62,9 @@ hops = append(hops, lastNode) } expectedSize := size for i := 0; i < len(hops); i++ { - expectedSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+expectedSize) + expectedSize = PktEncOverhead + + PktSizeOverhead + + sizeWithTags(PktOverhead+expectedSize) } padSize := minSize - expectedSize if padSize < 0 { @@ -79,16 +81,23 @@ errs := make(chan error) curSize := size pipeR, pipeW := io.Pipe() + var pktEncRaw []byte go func(size int64, src io.Reader, dst io.WriteCloser) { ctx.LogD("tx", LEs{ {"Node", hops[0].Id}, {"Nice", int(nice)}, {"Size", size}, }, "wrote") - errs <- PktEncWrite(ctx.Self, hops[0], pkt, nice, size, padSize, src, dst) + pktEncRaw, err = PktEncWrite( + ctx.Self, hops[0], pkt, nice, size, padSize, src, dst, + ) + errs <- err dst.Close() // #nosec G104 }(curSize, src, pipeW) - curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) + padSize + curSize = PktEncOverhead + + PktSizeOverhead + + sizeWithTags(PktOverhead+curSize) + + padSize var pipeRPrev io.Reader for i := 1; i < len(hops); i++ { @@ -101,7 +110,8 @@ {"Node", node.Id}, {"Nice", int(nice)}, {"Size", size}, }, "trns wrote") - errs <- PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + _, err := PktEncWrite(ctx.Self, node, pkt, nice, size, 0, src, dst) + errs <- err dst.Close() // #nosec G104 }(hops[i], pktTrns, curSize, pipeRPrev, pipeW) curSize = PktEncOverhead + PktSizeOverhead + sizeWithTags(PktOverhead+curSize) @@ -124,6 +134,12 @@ } nodePath := filepath.Join(ctx.Spool, lastNode.Id.String()) err = tmp.Commit(filepath.Join(nodePath, string(TTx))) os.Symlink(nodePath, filepath.Join(ctx.Spool, lastNode.Name)) // #nosec G104 + if err != nil { + return lastNode, err + } + if ctx.HdrUsage { + ctx.HdrWrite(pktEncRaw, filepath.Join(nodePath, string(TTx), tmp.Checksum())) + } return lastNode, err } diff --git a/src/tx_test.go b/src/tx_test.go index f6664c0c7dc04a7b094f6b50afec23d45d8981aea297cac54c6a064d40e0beb5..cc788b010e6f2a4a45ce5d5ec043026bb3c3f6250117d2a2143177b43761b511 100644 --- a/src/tx_test.go +++ b/src/tx_test.go @@ -98,9 +98,13 @@ if len(sentJobs) != 1 { return false } txJob := sentJobs[0] - defer txJob.Fd.Close() + fd, err := os.Open(txJob.Path) + if err != nil { + panic(err) + } + defer fd.Close() var bufR bytes.Buffer - if _, err = io.Copy(&bufR, txJob.Fd); err != nil { + if _, err = io.Copy(&bufR, fd); err != nil { panic(err) } var bufW bytes.Buffer