2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
38 xdr "github.com/davecgh/go-xdr/xdr2"
39 "github.com/dustin/go-humanize"
40 "github.com/klauspost/compress/zstd"
41 "golang.org/x/crypto/poly1305"
// newNotification assembles a plain-text mail message and returns it as an
// io.Reader. The visible header lines carry the sender address taken from
// fromTo.From and the subject; the body is attached base64-encoded.
48 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
50 "From: " + fromTo.From,
// The subject goes through MIME B-encoding (declared as UTF-8) so that
// non-ASCII text stays valid inside a header.
52 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
58 "Content-Type: text/plain; charset=utf-8",
59 "Content-Transfer-Encoding: base64",
// The whole body is emitted as one standard-alphabet base64 string, matching
// the Content-Transfer-Encoding header above.
61 base64.StdEncoding.EncodeToString(body),
// Lines are joined with a bare "\n" (not CRLF).
64 return strings.NewReader(strings.Join(lines, "\n"))
70 dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
// Take the "toss" lock for nodeId; the deferred UnlockDir releases it when
// the function returns.
72 dirLock, err := ctx.LockDir(nodeId, "toss")
76 defer ctx.UnlockDir(dirLock)
// Command line used to send notification mail, read from the "sendmail"
// entry of the self node's Exec table. Notifications below are attempted
// only when it is non-empty.
78 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
// One zstd reader is created before the job loop and Reset onto the payload
// stream of each PktTypeExec packet.
79 decompressor, err := zstd.NewReader(nil)
83 defer decompressor.Close()
// Process each job returned by ctx.Jobs(nodeId, TRx) in turn.
84 for job := range ctx.Jobs(nodeId, TRx) {
85 pktName := filepath.Base(job.Path)
87 {"Node", job.PktEnc.Sender},
89 {"Nice", int(job.PktEnc.Nice)},
// A job whose niceness value exceeds the requested one is reported at debug
// level as "too nice".
91 if job.PktEnc.Nice > nice {
92 ctx.LogD("rx-too-nice", les, func(les LEs) string {
94 "Tossing %s/%s: too nice: %s",
95 ctx.NodeName(job.PktEnc.Sender), pktName,
96 NicenessFmt(job.PktEnc.Nice),
101 fd, err := os.Open(job.Path)
103 ctx.LogE("rx-open", les, err, func(les LEs) string {
105 "Tossing %s/%s: opening %s",
106 ctx.NodeName(job.PktEnc.Sender), pktName, job.Path,
// PktEncRead runs in its own goroutine and writes its output into a buffered
// writer on top of the pipe; the read side (pipeR) is consumed below.
113 pipeR, pipeW := io.Pipe()
114 go func(job Job) error {
115 pipeWB := bufio.NewWriter(pipeW)
116 _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB)
// The packet file is closed as soon as PktEncRead returns; its Close error is
// deliberately ignored.
117 fd.Close() // #nosec G104
// An error from PktEncRead, or from the final Flush, is handed to the reading
// side through CloseWithError.
119 return pipeW.CloseWithError(err)
121 if err = pipeWB.Flush(); err != nil {
122 return pipeW.CloseWithError(err)
128 var pktSizeBlocks int64
// The stream starts with an XDR-encoded structure that is unmarshalled
// into pkt.
129 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
130 ctx.LogE("rx-unmarshal", les, err, func(les LEs) string {
132 "Tossing %s/%s: unmarshal",
133 ctx.NodeName(job.PktEnc.Sender), pktName,
// Derive the payload size from the job size: subtract the three fixed
// overheads, then one poly1305 tag for every full (EncBlkSize + tag) block,
// plus one more tag when a partial trailing block is present. The block count
// is taken before the partial-block tag is subtracted.
139 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
140 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
141 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
142 pktSize -= poly1305.TagSize
144 pktSize -= pktSizeBlocks * poly1305.TagSize
145 les = append(les, LE{"Size", pktSize})
146 ctx.LogD("rx", les, func(les LEs) string {
148 "Tossing %s/%s (%s)",
149 ctx.NodeName(job.PktEnc.Sender), pktName,
150 humanize.IBytes(uint64(pktSize)),
// Exec packets: the path field holds the handle name followed by its
// arguments, separated by NUL bytes.
155 case PktTypeExec, PktTypeExecFat:
159 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
160 handle := string(path[0])
161 args := make([]string, 0, len(path)-1)
162 for _, p := range path[1:] {
163 args = append(args, string(p))
// argsStr (handle plus arguments, space separated) is used only for logging
// and for the notification subject.
165 argsStr := strings.Join(append([]string{handle}, args...), " ")
166 les = append(les, LE{"Type", "exec"}, LE{"Dst", argsStr})
// The handle has to be present in the sender's Exec table with a non-empty
// command line; otherwise "No handle found" is logged as an error.
167 sender := ctx.Neigh[*job.PktEnc.Sender]
168 cmdline, exists := sender.Exec[handle]
169 if !exists || len(cmdline) == 0 {
171 "rx-no-handle", les, errors.New("No handle found"),
172 func(les LEs) string {
174 "Tossing exec %s/%s (%s): %s",
175 ctx.NodeName(job.PktEnc.Sender), pktName,
176 humanize.IBytes(uint64(pktSize)), argsStr,
// Only PktTypeExec payloads are attached to the zstd decompressor.
183 if pkt.Type == PktTypeExec {
184 if err = decompressor.Reset(pipeR); err != nil {
// The configured command is run with the packet's arguments appended.
// NOTE(review): append on cmdline[1:] can write into the backing array of the
// configured slice when it has spare capacity — confirm this cannot race or
// leak between jobs.
189 cmd := exec.Command(cmdline[0], append(cmdline[1:], args...)...)
192 "NNCP_SELF="+ctx.Self.Id.String(),
193 "NNCP_SENDER="+sender.Id.String(),
194 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
196 if pkt.Type == PktTypeExec {
197 cmd.Stdin = decompressor
// cmd.Output runs the handler and captures its standard output.
201 output, err := cmd.Output()
// NOTE(review): the tag "rx-hande" looks like a typo of "rx-handle" — confirm
// before changing, log consumers may match on it.
203 ctx.LogE("rx-hande", les, err, func(les LEs) string {
205 "Tossing exec %s/%s (%s): %s: handling",
206 ctx.NodeName(job.PktEnc.Sender), pktName,
207 humanize.IBytes(uint64(pktSize)), argsStr,
// Notification mail: NotifyExec is consulted under "<sender name>.<handle>"
// and under the wildcard key "*.<handle>".
213 if len(sendmail) > 0 && ctx.NotifyExec != nil {
214 notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
216 notify, exists = ctx.NotifyExec["*."+handle]
// The recipient address is appended to the sendmail arguments.
221 append(sendmail[1:], notify.To)...,
223 cmd.Stdin = newNotification(notify, fmt.Sprintf(
224 "Exec from %s: %s", sender.Name, argsStr,
226 if err = cmd.Run(); err != nil {
227 ctx.LogE("rx-notify", les, err, func(les LEs) string {
229 "Tossing exec %s/%s (%s): %s: notifying",
230 ctx.NodeName(job.PktEnc.Sender), pktName,
231 humanize.IBytes(uint64(pktSize)), argsStr,
238 ctx.LogI("rx", les, func(les LEs) string {
240 "Got exec from %s to %s (%s)",
241 ctx.NodeName(job.PktEnc.Sender), argsStr,
242 humanize.IBytes(uint64(pktSize)),
// Create the job.Path+SeenSuffix marker file and close it at once; a failed
// Create is not reported here.
247 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
248 fd.Close() // #nosec G104
// NOTE(review): this is the failure path of os.Remove, yet the tag and the
// message say "notifying"; the file and freq cases use "rx-remove" and
// "removing" for the same step — looks like a copy-paste slip, confirm.
251 if err = os.Remove(job.Path); err != nil {
252 ctx.LogE("rx-notify", les, err, func(les LEs) string {
254 "Tossing exec %s/%s (%s): %s: notifying",
255 ctx.NodeName(job.PktEnc.Sender), pktName,
256 humanize.IBytes(uint64(pktSize)), argsStr,
// When the packet was removed and ctx.HdrUsage is set, the companion
// HdrSuffix file is removed as well; its error is ignored.
260 } else if ctx.HdrUsage {
261 os.Remove(job.Path + HdrSuffix)
// File packets: the path field is the destination path, which is joined onto
// the sender's Incoming directory below.
// NOTE(review): only absolute paths are rejected in the check below — confirm
// that ".." components cannot make the joined path leave *incoming.
269 dst := string(pkt.Path[:int(pkt.PathLen)])
270 les = append(les, LE{"Type", "file"}, LE{"Dst", dst})
271 if filepath.IsAbs(dst) {
273 "rx-non-rel", les, errors.New("non-relative destination path"),
274 func(les LEs) string {
276 "Tossing file %s/%s (%s): %s",
277 ctx.NodeName(job.PktEnc.Sender), pktName,
278 humanize.IBytes(uint64(pktSize)), dst,
285 incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
288 "rx-no-incoming", les, errors.New("incoming is not allowed"),
289 func(les LEs) string {
291 "Tossing file %s/%s (%s): %s",
292 ctx.NodeName(job.PktEnc.Sender), pktName,
293 humanize.IBytes(uint64(pktSize)), dst,
// Make sure the directory part of the destination exists under *incoming.
300 dir := filepath.Join(*incoming, path.Dir(dst))
301 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
302 ctx.LogE("rx-mkdir", les, err, func(les LEs) string {
304 "Tossing file %s/%s (%s): %s: mkdir",
305 ctx.NodeName(job.PktEnc.Sender), pktName,
306 humanize.IBytes(uint64(pktSize)), dst,
// The payload is written to a temporary file inside the destination
// directory, then flushed, synced and closed before being renamed into place.
313 tmp, err := TempFile(dir, "file")
315 ctx.LogE("rx-mktemp", les, err, func(les LEs) string {
317 "Tossing file %s/%s (%s): %s: mktemp",
318 ctx.NodeName(job.PktEnc.Sender), pktName,
319 humanize.IBytes(uint64(pktSize)), dst,
// The "Tmp" log entry added here is removed again after the rename (see
// "delete Tmp" below).
325 les = append(les, LE{"Tmp", tmp.Name()})
326 ctx.LogD("rx-tmp-created", les, func(les LEs) string {
328 "Tossing file %s/%s (%s): %s: created: %s",
329 ctx.NodeName(job.PktEnc.Sender), pktName,
330 humanize.IBytes(uint64(pktSize)), dst, tmp.Name(),
333 bufW := bufio.NewWriter(tmp)
334 if _, err = CopyProgressed(
335 bufW, pipeR, "Rx file",
336 append(les, LE{"FullSize", pktSize}),
339 ctx.LogE("rx-copy", les, err, func(les LEs) string {
341 "Tossing file %s/%s (%s): %s: copying",
342 ctx.NodeName(job.PktEnc.Sender), pktName,
343 humanize.IBytes(uint64(pktSize)), dst,
349 if err = bufW.Flush(); err != nil {
350 tmp.Close() // #nosec G104
351 ctx.LogE("rx-flush", les, err, func(les LEs) string {
353 "Tossing file %s/%s (%s): %s: flushing",
354 ctx.NodeName(job.PktEnc.Sender), pktName,
355 humanize.IBytes(uint64(pktSize)), dst,
361 if err = tmp.Sync(); err != nil {
362 tmp.Close() // #nosec G104
363 ctx.LogE("rx-sync", les, err, func(les LEs) string {
365 "Tossing file %s/%s (%s): %s: syncing",
366 ctx.NodeName(job.PktEnc.Sender), pktName,
367 humanize.IBytes(uint64(pktSize)), dst,
373 if err = tmp.Close(); err != nil {
374 ctx.LogE("rx-close", les, err, func(les LEs) string {
376 "Tossing file %s/%s (%s): %s: closing",
377 ctx.NodeName(job.PktEnc.Sender), pktName,
378 humanize.IBytes(uint64(pktSize)), dst,
// Pick the final name: dstPath starts as the plain destination; os.Stat
// probes whether the candidate exists, and candidates of the form
// "<dst>.<counter>" are formed further down — presumably repeated until an
// unused name is found, confirm.
384 dstPathOrig := filepath.Join(*incoming, dst)
385 dstPath := dstPathOrig
388 if _, err = os.Stat(dstPath); err != nil {
389 if os.IsNotExist(err) {
392 ctx.LogE("rx-stat", les, err, func(les LEs) string {
394 "Tossing file %s/%s (%s): %s: stating: %s",
395 ctx.NodeName(job.PktEnc.Sender), pktName,
396 humanize.IBytes(uint64(pktSize)), dst, dstPath,
402 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
405 if err = os.Rename(tmp.Name(), dstPath); err != nil {
406 ctx.LogE("rx-rename", les, err, func(les LEs) string {
408 "Tossing file %s/%s (%s): %s: renaming",
409 ctx.NodeName(job.PktEnc.Sender), pktName,
410 humanize.IBytes(uint64(pktSize)), dst,
// NOTE(review): the rename happens inside dir, which may be a subdirectory of
// *incoming, but the directory synced here is *incoming itself — confirm this
// is intended.
415 if err = DirSync(*incoming); err != nil {
416 ctx.LogE("rx-dirsync", les, err, func(les LEs) string {
418 "Tossing file %s/%s (%s): %s: dirsyncing",
419 ctx.NodeName(job.PktEnc.Sender), pktName,
420 humanize.IBytes(uint64(pktSize)), dst,
425 les = les[:len(les)-1] // delete Tmp
427 ctx.LogI("rx", les, func(les LEs) string {
429 "Got file %s (%s) from %s",
430 dst, humanize.IBytes(uint64(pktSize)),
431 ctx.NodeName(job.PktEnc.Sender),
// Same bookkeeping as in the exec case: seen marker, packet removal, optional
// HdrSuffix removal.
436 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
437 fd.Close() // #nosec G104
440 if err = os.Remove(job.Path); err != nil {
441 ctx.LogE("rx-remove", les, err, func(les LEs) string {
443 "Tossing file %s/%s (%s): %s: removing",
444 ctx.NodeName(job.PktEnc.Sender), pktName,
445 humanize.IBytes(uint64(pktSize)), dst,
449 } else if ctx.HdrUsage {
450 os.Remove(job.Path + HdrSuffix)
// Mail a notification to ctx.NotifyFile.To when both sendmail and NotifyFile
// are configured.
452 if len(sendmail) > 0 && ctx.NotifyFile != nil {
455 append(sendmail[1:], ctx.NotifyFile.To)...,
457 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
458 "File from %s: %s (%s)",
459 ctx.Neigh[*job.PktEnc.Sender].Name,
461 humanize.IBytes(uint64(pktSize)),
463 if err = cmd.Run(); err != nil {
464 ctx.LogE("rx-notify", les, err, func(les LEs) string {
466 "Tossing file %s/%s (%s): %s: notifying",
467 ctx.NodeName(job.PktEnc.Sender), pktName,
468 humanize.IBytes(uint64(pktSize)), dst,
// File request packets: the path field names the requested source path. The
// := below declares a les local to this case instead of extending the outer
// one.
479 src := string(pkt.Path[:int(pkt.PathLen)])
480 les := append(les, LE{"Type", "freq"}, LE{"Src", src})
// NOTE(review): the message for a rejected absolute source path ends in
// ": notifying", which does not describe this step — likely copy-paste,
// confirm.
481 if filepath.IsAbs(src) {
483 "rx-non-rel", les, errors.New("non-relative source path"),
484 func(les LEs) string {
486 "Tossing freq %s/%s (%s): %s: notifying",
487 ctx.NodeName(job.PktEnc.Sender), pktName,
488 humanize.IBytes(uint64(pktSize)), src,
// The remaining payload is read in full and used as the destination name of
// the reply.
495 dstRaw, err := ioutil.ReadAll(pipeR)
497 ctx.LogE("rx-read", les, err, func(les LEs) string {
499 "Tossing freq %s/%s (%s): %s: reading",
500 ctx.NodeName(job.PktEnc.Sender), pktName,
501 humanize.IBytes(uint64(pktSize)), src,
507 dst := string(dstRaw)
508 les = append(les, LE{"Dst", dst})
// The requested path is resolved relative to the sender's FreqPath.
509 sender := ctx.Neigh[*job.PktEnc.Sender]
510 freqPath := sender.FreqPath
513 "rx-no-freq", les, errors.New("freqing is not allowed"),
514 func(les LEs) string {
516 "Tossing freq %s/%s (%s): %s -> %s",
517 ctx.NodeName(job.PktEnc.Sender), pktName,
518 humanize.IBytes(uint64(pktSize)), src, dst,
// NOTE(review): as with file packets, only absolute src values are rejected
// above — confirm ".." cannot escape *freqPath.
529 filepath.Join(*freqPath, src),
536 ctx.LogE("rx-tx", les, err, func(les LEs) string {
538 "Tossing freq %s/%s (%s): %s -> %s: txing",
539 ctx.NodeName(job.PktEnc.Sender), pktName,
540 humanize.IBytes(uint64(pktSize)), src, dst,
547 ctx.LogI("rx", les, func(les LEs) string {
549 "Got file request %s to %s",
550 src, ctx.NodeName(job.PktEnc.Sender),
555 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
556 fd.Close() // #nosec G104
559 if err = os.Remove(job.Path); err != nil {
560 ctx.LogE("rx-remove", les, err, func(les LEs) string {
562 "Tossing freq %s/%s (%s): %s -> %s: removing",
563 ctx.NodeName(job.PktEnc.Sender), pktName,
564 humanize.IBytes(uint64(pktSize)), src, dst,
568 } else if ctx.HdrUsage {
569 os.Remove(job.Path + HdrSuffix)
// Mail a notification to ctx.NotifyFreq.To when both sendmail and NotifyFreq
// are configured.
571 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
574 append(sendmail[1:], ctx.NotifyFreq.To)...,
576 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
577 "Freq from %s: %s", sender.Name, src,
579 if err = cmd.Run(); err != nil {
580 ctx.LogE("rx-notify", les, err, func(les LEs) string {
582 "Tossing freq %s/%s (%s): %s -> %s: notifying",
583 ctx.NodeName(job.PktEnc.Sender), pktName,
584 humanize.IBytes(uint64(pktSize)), src, dst,
// Transitional packets: the path field is copied into an MTHSize-byte array
// and interpreted as the destination node id. Both nodeId and les declared
// here are local to this case and shadow the outer variables of the same
// name.
595 dst := new([MTHSize]byte)
596 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
597 nodeId := NodeId(*dst)
598 node, known := ctx.Neigh[nodeId]
599 les := append(les, LE{"Type", "trns"}, LE{"Dst", nodeId})
// logMsg is shared by the log calls of this case.
600 logMsg := func(les LEs) string {
602 "Tossing trns %s/%s (%s): %s",
603 ctx.NodeName(job.PktEnc.Sender),
605 humanize.IBytes(uint64(pktSize)),
610 ctx.LogE("rx-unknown", les, errors.New("unknown node"), logMsg)
614 ctx.LogD("rx-tx", les, logMsg)
// Hand the rest of the stream to ctx.TxTrns for the destination node, keeping
// the niceness of the received packet.
616 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
617 ctx.LogE("rx", les, err, func(les LEs) string {
618 return logMsg(les) + ": txing"
624 ctx.LogI("rx", les, func(les LEs) string {
626 "Got transitional packet from %s to %s (%s)",
627 ctx.NodeName(job.PktEnc.Sender),
628 ctx.NodeName(&nodeId),
629 humanize.IBytes(uint64(pktSize)),
634 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
635 fd.Close() // #nosec G104
638 if err = os.Remove(job.Path); err != nil {
639 ctx.LogE("rx", les, err, func(les LEs) string {
641 "Tossing trns %s/%s (%s): %s: removing",
642 ctx.NodeName(job.PktEnc.Sender),
644 humanize.IBytes(uint64(pktSize)),
645 ctx.NodeName(&nodeId),
649 } else if ctx.HdrUsage {
650 os.Remove(job.Path + HdrSuffix)
// Any other packet type is logged as an error ("unknown type").
656 "rx-type-unknown", les, errors.New("unknown type"),
657 func(les LEs) string {
659 "Tossing %s/%s (%s)",
660 ctx.NodeName(job.PktEnc.Sender),
662 humanize.IBytes(uint64(pktSize)),
// Close the read end of the pipe; with an io.Pipe this makes a write still
// pending in the goroutine above fail instead of blocking.
669 pipeR.Close() // #nosec G104
// AutoToss creates two unbuffered channels, finish and badCode, and returns
// them to the caller. Between them it calls ctx.Toss with a one-second pause
// and records whether any call failed.
674 func (ctx *Ctx) AutoToss(
677 doSeen, noFile, noFreq, noExec, noTrns bool,
678 ) (chan struct{}, chan bool) {
679 finish := make(chan struct{})
680 badCode := make(chan bool)
// Pause for one second before the Toss call below.
690 time.Sleep(time.Second)
// Toss receives a literal false as its third argument (dryRun, going by the
// parameter order on Toss — confirm). bad is sticky: once a Toss call returns
// false it stays true.
691 bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns) || bad
694 return finish, badCode