2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
38 xdr "github.com/davecgh/go-xdr/xdr2"
39 "github.com/dustin/go-humanize"
40 "github.com/klauspost/compress/zstd"
41 "golang.org/x/crypto/blake2b"
42 "golang.org/x/crypto/poly1305"
// newNotification assembles a mail-style notification message and returns it
// as an io.Reader.
//
// Visible header lines are: From (taken from fromTo.From), Subject (the
// subject argument passed through mime.BEncoding with the UTF-8 charset),
// a text/plain UTF-8 Content-Type and a base64 Content-Transfer-Encoding.
// body is emitted base64-encoded to match that transfer encoding. All lines
// are joined with "\n" before being wrapped in a strings.Reader.
//
// NOTE(review): parts of this function (e.g. the declaration of lines and
// any further headers) are not visible in this view; the description above
// covers only what is shown.
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
51 "From: " + fromTo.From,
// The subject goes through MIME B-encoding rather than being written raw.
53 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
59 "Content-Type: text/plain; charset=utf-8",
60 "Content-Transfer-Encoding: base64",
// Body is base64-encoded as announced by the header above.
62 base64.StdEncoding.EncodeToString(body),
65 return strings.NewReader(strings.Join(lines, "\n"))
// Toss walks the received (TRx) jobs of nodeId and dispatches each packet by
// its type: exec (plain or "fat"), file, freq (file request) and trns
// (transitional). Its callers negate the result, so it yields a bool.
//
// NOTE(review): the statements that consult dryRun, doSeen and the no*
// flags, as well as most return/continue statements and closing braces, are
// not visible in this view; comments below describe only what is shown.
71 dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
// Take the per-node "toss" directory lock; it is released on return.
73 dirLock, err := ctx.LockDir(nodeId, "toss")
77 defer ctx.UnlockDir(dirLock)
// Command line used for notifications, taken from our own node's Exec map.
79 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
// A single zstd decoder is created up front (with no input) and later
// re-targeted with Reset for each plain exec packet.
80 decompressor, err := zstd.NewReader(nil)
84 defer decompressor.Close()
85 for job := range ctx.Jobs(nodeId, TRx) {
86 pktName := filepath.Base(job.Path)
88 {"Node", job.PktEnc.Sender},
90 {"Nice", int(job.PktEnc.Nice)},
// Packets whose niceness exceeds the allowed level are only logged at debug
// level here (the skip itself is not visible in this view).
92 if job.PktEnc.Nice > nice {
93 ctx.LogD("rx-too-nice", les, func(les LEs) string {
95 "Tossing %s/%s: too nice: %s",
96 ctx.NodeName(job.PktEnc.Sender), pktName,
97 NicenessFmt(job.PktEnc.Nice),
102 fd, err := os.Open(job.Path)
104 ctx.LogE("rx-open", les, err, func(les LEs) string {
106 "Tossing %s/%s: opening %s",
107 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
// Decryption runs in a separate goroutine that writes the plaintext into a
// pipe; the code below consumes it from pipeR. Any error from PktEncRead or
// from flushing the buffered writer is handed to the reader side through
// CloseWithError.
114 pipeR, pipeW := io.Pipe()
115 go func(job Job) error {
116 pipeWB := bufio.NewWriter(pipeW)
117 _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB)
118 fd.Close() // #nosec G104
120 return pipeW.CloseWithError(err)
122 if err = pipeWB.Flush(); err != nil {
123 return pipeW.CloseWithError(err)
129 var pktSizeBlocks int64
// The inner packet header is decoded from the start of the plaintext stream.
130 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
131 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
133 "Tossing %s/%s: unmarshal",
134 ctx.NodeName(job.PktEnc.Sender), pktName,
// Payload size is derived from the on-disk job size: subtract the fixed
// overheads, then one poly1305 tag per full encrypted block, plus one more
// tag when a trailing partial block exists.
140 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
141 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
142 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
143 pktSize -= poly1305.TagSize
145 pktSize -= pktSizeBlocks * poly1305.TagSize
146 les = append(les, LE{"Size", pktSize})
147 ctx.LogD("rx", les, func(les LEs) string {
149 "Tossing %s/%s (%s)",
150 ctx.NodeName(job.PktEnc.Sender), pktName,
151 humanize.IBytes(uint64(pktSize)),
// ---- exec packets ----
156 case PktTypeExec, PktTypeExecFat:
// The packet path holds NUL-separated fields: the first is the handle name,
// the remaining ones are arguments appended to the configured command.
160 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
161 handle := string(path[0])
162 args := make([]string, 0, len(path)-1)
163 for _, p := range path[1:] {
164 args = append(args, string(p))
166 argsStr := strings.Join(append([]string{handle}, args...), " ")
167 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
168 sender := ctx.Neigh[*job.PktEnc.Sender]
// The handle must be present in the sender's Exec map with a non-empty
// command line.
169 cmdline, exists := sender.Exec[handle]
170 if !exists || len(cmdline) == 0 {
// NOTE(review): error strings are conventionally lowercase in Go; this one
// is capitalised — confirm nothing matches on the text before changing it.
172 "rx-no-handle", les, errors.New("No handle found"),
173 func(les LEs) string {
175 "Tossing exec %s/%s (%s): %s",
176 ctx.NodeName(job.PktEnc.Sender), pktName,
177 humanize.IBytes(uint64(pktSize)), argsStr,
// Only plain exec packets are pointed at the zstd decoder.
184 if pkt.Type == PktTypeExec {
185 if err = decompressor.Reset(pipeR); err != nil {
190 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
// Environment entries exposed to the handler.
193 "NNCP_SELF="+ctx.Self.Id.String(),
194 "NNCP_SENDER="+sender.Id.String(),
195 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
// For plain exec the handler reads the decompressed stream on stdin; the
// branch taken for PktTypeExecFat is not visible in this view.
197 if pkt.Type == PktTypeExec {
198 cmd.Stdin = decompressor
// Run the handler and capture its standard output.
202 output, err := cmd.Output()
// NOTE(review): the log key "rx-hande" looks like a typo of "rx-handle" —
// confirm no log consumer depends on the current spelling before fixing.
204 ctx.LogE("rx-hande", les, err, func(les LEs) string {
206 "Tossing exec %s/%s (%s): %s: handling",
207 ctx.NodeName(job.PktEnc.Sender), pktName,
208 humanize.IBytes(uint64(pktSize)), argsStr,
// Notification lookup: first the "<sender name>.<handle>" key, then the
// "*.<handle>" wildcard key.
214 if len(sendmail) > 0 && ctx.NotifyExec != nil {
215 notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
217 notify, exists = ctx.NotifyExec["*."+handle]
222 append(sendmail[1:], notify.To)...,
224 cmd.Stdin = newNotification(notify, fmt.Sprintf(
225 "Exec from %s: %s", sender.Name, argsStr,
227 if err = cmd.Run(); err != nil {
228 ctx.LogE("rx-notify", les, err, func(les LEs) string {
230 "Tossing exec %s/%s (%s): %s: notifying",
231 ctx.NodeName(job.PktEnc.Sender), pktName,
232 humanize.IBytes(uint64(pktSize)), argsStr,
239 ctx.LogI("rx", les, func(les LEs) string {
241 "Got exec from %s to %s (%s)",
242 ctx.NodeName(job.PktEnc.Sender), argsStr,
243 humanize.IBytes(uint64(pktSize)),
// Create an empty "<job>" + SeenSuffix marker file; a creation error is
// silently ignored here.
248 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
249 fd.Close() // #nosec G104
// NOTE(review): this is the failure path of removing the job file, yet it
// is logged under "rx-notify" with a ": notifying" suffix, while the file
// and freq cases use "rx-remove" / ": removing". Looks copy-pasted — confirm.
252 if err = os.Remove(job.Path); err != nil {
253 ctx.LogE("rx-notify", les, err, func(les LEs) string {
255 "Tossing exec %s/%s (%s): %s: notifying",
256 ctx.NodeName(job.PktEnc.Sender), pktName,
257 humanize.IBytes(uint64(pktSize)), argsStr,
// After a successful removal the companion header file is removed as well
// when header usage is enabled; its error is ignored.
261 } else if ctx.HdrUsage {
262 os.Remove(job.Path + HdrSuffix)
// ---- file packets ----
270 dst := string(pkt.Path[:int(pkt.PathLen)])
271 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
// Absolute destination paths are rejected.
// NOTE(review): only an IsAbs check is visible here; confirm that ".."
// components in dst cannot make the joined path escape *incoming.
272 if filepath.IsAbs(dst) {
274 "rx-non-rel", les, errors.New("non-relative destination path"),
275 func(les LEs) string {
277 "Tossing file %s/%s (%s): %s",
278 ctx.NodeName(job.PktEnc.Sender), pktName,
279 humanize.IBytes(uint64(pktSize)), dst,
// The sender's configured incoming directory is the base of all received
// files.
286 incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
289 "rx-no-incoming", les, errors.New("incoming is not allowed"),
290 func(les LEs) string {
292 "Tossing file %s/%s (%s): %s",
293 ctx.NodeName(job.PktEnc.Sender), pktName,
294 humanize.IBytes(uint64(pktSize)), dst,
// NOTE(review): path.Dir (slash-only) is mixed with filepath.Join here —
// confirm this is intended on platforms with a different separator.
301 dir := filepath.Join(*incoming, path.Dir(dst))
302 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
303 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
305 "Tossing file %s/%s (%s): %s: mkdir",
306 ctx.NodeName(job.PktEnc.Sender), pktName,
307 humanize.IBytes(uint64(pktSize)), dst,
// The payload is first written to a temporary file inside the destination
// directory and only later renamed to its final name.
314 tmp, err := TempFile(dir, "file")
316 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
318 "Tossing file %s/%s (%s): %s: mktemp",
319 ctx.NodeName(job.PktEnc.Sender), pktName,
320 humanize.IBytes(uint64(pktSize)), dst,
326 les = append(les, LE{"Tmp", tmp.Name()})
327 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
329 "Tossing file %s/%s (%s): %s: created: %s",
330 ctx.NodeName(job.PktEnc.Sender), pktName,
331 humanize.IBytes(uint64(pktSize)), dst, tmp.Name(),
// Copy the plaintext through a buffered writer, then flush, sync and close
// the temporary file, logging each failing step separately.
334 bufW := bufio.NewWriter(tmp)
335 if _, err = CopyProgressed(
336 bufW, pipeR, "Rx file",
337 append(les, LE{"FullSize", pktSize}),
340 ctx.LogE("rx-copy", les, err, func(les LEs) string {
342 "Tossing file %s/%s (%s): %s: copying",
343 ctx.NodeName(job.PktEnc.Sender), pktName,
344 humanize.IBytes(uint64(pktSize)), dst,
350 if err = bufW.Flush(); err != nil {
351 tmp.Close() // #nosec G104
352 ctx.LogE("rx-flush", les, err, func(les LEs) string {
354 "Tossing file %s/%s (%s): %s: flushing",
355 ctx.NodeName(job.PktEnc.Sender), pktName,
356 humanize.IBytes(uint64(pktSize)), dst,
362 if err = tmp.Sync(); err != nil {
363 tmp.Close() // #nosec G104
364 ctx.LogE("rx-sync", les, err, func(les LEs) string {
366 "Tossing file %s/%s (%s): %s: syncing",
367 ctx.NodeName(job.PktEnc.Sender), pktName,
368 humanize.IBytes(uint64(pktSize)), dst,
374 if err = tmp.Close(); err != nil {
375 ctx.LogE("rx-close", les, err, func(les LEs) string {
377 "Tossing file %s/%s (%s): %s: closing",
378 ctx.NodeName(job.PktEnc.Sender), pktName,
379 humanize.IBytes(uint64(pktSize)), dst,
// Final name selection: start with <incoming>/<dst>; the stat below checks
// whether a candidate exists, and alternative candidates get a ".<counter>"
// suffix. The surrounding loop is not visible in this view.
385 dstPathOrig := filepath.Join(*incoming, dst)
386 dstPath := dstPathOrig
389 if _, err = os.Stat(dstPath); err != nil {
390 if os.IsNotExist(err) {
393 ctx.LogE("rx-stat", les, err, func(les LEs) string {
395 "Tossing file %s/%s (%s): %s: stating: %s",
396 ctx.NodeName(job.PktEnc.Sender), pktName,
397 humanize.IBytes(uint64(pktSize)), dst, dstPath,
403 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
406 if err = os.Rename(tmp.Name(), dstPath); err != nil {
407 ctx.LogE("rx-rename", les, err, func(les LEs) string {
409 "Tossing file %s/%s (%s): %s: renaming",
410 ctx.NodeName(job.PktEnc.Sender), pktName,
411 humanize.IBytes(uint64(pktSize)), dst,
// NOTE(review): the rename happens inside dir (which may be a subdirectory
// of *incoming), but the directory sync is issued on *incoming — confirm
// whether dir should be synced instead.
416 if err = DirSync(*incoming); err != nil {
417 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
419 "Tossing file %s/%s (%s): %s: dirsyncing",
420 ctx.NodeName(job.PktEnc.Sender), pktName,
421 humanize.IBytes(uint64(pktSize)), dst,
426 les = les[:len(les)-1] // delete Tmp
428 ctx.LogI("rx", les, func(les LEs) string {
430 "Got file %s (%s) from %s",
431 dst, humanize.IBytes(uint64(pktSize)),
432 ctx.NodeName(job.PktEnc.Sender),
// Seen marker, job removal and header cleanup follow the same sequence as in
// the exec case.
437 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
438 fd.Close() // #nosec G104
441 if err = os.Remove(job.Path); err != nil {
442 ctx.LogE("rx-remove", les, err, func(les LEs) string {
444 "Tossing file %s/%s (%s): %s: removing",
445 ctx.NodeName(job.PktEnc.Sender), pktName,
446 humanize.IBytes(uint64(pktSize)), dst,
450 } else if ctx.HdrUsage {
451 os.Remove(job.Path + HdrSuffix)
// File notifications go to the single ctx.NotifyFile recipient.
453 if len(sendmail) > 0 && ctx.NotifyFile != nil {
456 append(sendmail[1:], ctx.NotifyFile.To)...,
458 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
459 "File from %s: %s (%s)",
460 ctx.Neigh[*job.PktEnc.Sender].Name,
462 humanize.IBytes(uint64(pktSize)),
464 if err = cmd.Run(); err != nil {
465 ctx.LogE("rx-notify", les, err, func(les LEs) string {
467 "Tossing file %s/%s (%s): %s: notifying",
468 ctx.NodeName(job.PktEnc.Sender), pktName,
469 humanize.IBytes(uint64(pktSize)), dst,
// ---- freq (file request) packets ----
// The packet path is the requested source; note that les is re-declared
// with := here, so the additions stay local to this case.
480 src := string(pkt.Path[:int(pkt.PathLen)])
481 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
// NOTE(review): as with file packets, only an IsAbs check is visible;
// confirm ".." components in src cannot escape *freqPath.
482 if filepath.IsAbs(src) {
484 "rx-non-rel", les, errors.New("non-relative source path"),
485 func(les LEs) string {
// NOTE(review): the ": notifying" suffix does not match this error (a
// rejected source path) — looks copy-pasted; confirm.
487 "Tossing freq %s/%s (%s): %s: notifying",
488 ctx.NodeName(job.PktEnc.Sender), pktName,
489 humanize.IBytes(uint64(pktSize)), src,
// The whole remaining payload is read into memory and used as the
// destination path of the reply.
496 dstRaw, err := ioutil.ReadAll(pipeR)
498 ctx.LogE("rx-read", les, err, func(les LEs) string {
500 "Tossing freq %s/%s (%s): %s: reading",
501 ctx.NodeName(job.PktEnc.Sender), pktName,
502 humanize.IBytes(uint64(pktSize)), src,
508 dst := string(dstRaw)
509 les = append(les, LE{"Dst", dst})
510 sender := ctx.Neigh[*job.PktEnc.Sender]
// The sender's FreqPath is the base directory requested files are served
// from.
511 freqPath := sender.FreqPath
514 "rx-no-freq", les, errors.New("freqing is not allowed"),
515 func(les LEs) string {
517 "Tossing freq %s/%s (%s): %s -> %s",
518 ctx.NodeName(job.PktEnc.Sender), pktName,
519 humanize.IBytes(uint64(pktSize)), src, dst,
530 filepath.Join(*freqPath, src),
537 ctx.LogE("rx-tx", les, err, func(les LEs) string {
539 "Tossing freq %s/%s (%s): %s -> %s: txing",
540 ctx.NodeName(job.PktEnc.Sender), pktName,
541 humanize.IBytes(uint64(pktSize)), src, dst,
548 ctx.LogI("rx", les, func(les LEs) string {
550 "Got file request %s to %s",
551 src, ctx.NodeName(job.PktEnc.Sender),
556 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
557 fd.Close() // #nosec G104
560 if err = os.Remove(job.Path); err != nil {
561 ctx.LogE("rx-remove", les, err, func(les LEs) string {
563 "Tossing freq %s/%s (%s): %s -> %s: removing",
564 ctx.NodeName(job.PktEnc.Sender), pktName,
565 humanize.IBytes(uint64(pktSize)), src, dst,
569 } else if ctx.HdrUsage {
570 os.Remove(job.Path + HdrSuffix)
572 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
575 append(sendmail[1:], ctx.NotifyFreq.To)...,
577 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
578 "Freq from %s: %s", sender.Name, src,
580 if err = cmd.Run(); err != nil {
581 ctx.LogE("rx-notify", les, err, func(les LEs) string {
583 "Tossing freq %s/%s (%s): %s -> %s: notifying",
584 ctx.NodeName(job.PktEnc.Sender), pktName,
585 humanize.IBytes(uint64(pktSize)), src, dst,
// ---- trns (transitional) packets ----
// The packet path bytes are copied into a blake2b.Size256-sized array and
// interpreted as the NodeId of the next hop, which is then looked up in
// ctx.Neigh.
596 dst := new([blake2b.Size256]byte)
597 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
598 nodeId := NodeId(*dst)
599 node, known := ctx.Neigh[nodeId]
600 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
// Shared message formatter reused by the log calls of this case.
601 logMsg := func(les LEs) string {
603 "Tossing trns %s/%s (%s): %s",
604 ctx.NodeName(job.PktEnc.Sender),
606 humanize.IBytes(uint64(pktSize)),
611 ctx.LogE("rx-unknown", les, errors.New("unknown node"), logMsg)
615 ctx.LogD("rx-tx", les, logMsg)
// Forward the remaining plaintext to the next node, keeping the niceness of
// the enclosing packet.
617 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
618 ctx.LogE("rx", les, err, func(les LEs) string {
619 return logMsg(les) + ": txing"
625 ctx.LogI("rx", les, func(les LEs) string {
627 "Got transitional packet from %s to %s (%s)",
628 ctx.NodeName(job.PktEnc.Sender),
629 ctx.NodeName(&nodeId),
630 humanize.IBytes(uint64(pktSize)),
635 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
636 fd.Close() // #nosec G104
639 if err = os.Remove(job.Path); err != nil {
640 ctx.LogE("rx", les, err, func(les LEs) string {
642 "Tossing trns %s/%s (%s): %s: removing",
643 ctx.NodeName(job.PktEnc.Sender),
645 humanize.IBytes(uint64(pktSize)),
646 ctx.NodeName(&nodeId),
650 } else if ctx.HdrUsage {
651 os.Remove(job.Path + HdrSuffix)
// ---- any other packet type ----
657 "rx-type-unknown", les, errors.New("unknown type"),
658 func(les LEs) string {
660 "Tossing %s/%s (%s)",
661 ctx.NodeName(job.PktEnc.Sender),
663 humanize.IBytes(uint64(pktSize)),
// Close the read side of the pipe for this job; the error is ignored.
670 pipeR.Close() // #nosec G104
// AutoToss creates two unbuffered channels, finish (chan struct{}) and
// badCode (chan bool), and returns them to the caller.
//
// The visible body sleeps for one second and then calls ctx.Toss with dryRun
// fixed to false, passing the remaining flags through unchanged. A false
// result from Toss sets bad, and bad stays set once it has been set.
//
// NOTE(review): the goroutine/loop that surrounds the Toss call and the way
// finish and badCode are read or written are not visible in this view —
// confirm against the full file before relying on this description.
675 func (ctx *Ctx) AutoToss(
678 doSeen, noFile, noFreq, noExec, noTrns bool,
679 ) (chan struct{}, chan bool) {
680 finish := make(chan struct{})
681 badCode := make(chan bool)
// Pause before the tossing pass that follows.
691 time.Sleep(time.Second)
// Toss is evaluated first, so it always runs; "|| bad" keeps earlier
// failures sticky.
692 bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns) || bad
695 return finish, badCode