2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
38 xdr "github.com/davecgh/go-xdr/xdr2"
39 "github.com/dustin/go-humanize"
40 "github.com/klauspost/compress/zstd"
41 "golang.org/x/crypto/blake2b"
42 "golang.org/x/crypto/poly1305"
49 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
51 "From: " + fromTo.From,
53 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
56 lines = append(lines, []string{
58 "Content-Type: text/plain; charset=utf-8",
59 "Content-Transfer-Encoding: base64",
61 base64.StdEncoding.EncodeToString(body),
64 return strings.NewReader(strings.Join(lines, "\n"))
70 dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
72 dirLock, err := ctx.LockDir(nodeId, "toss")
74 ctx.LogE("rx", LEs{}, err, "lock")
77 defer ctx.UnlockDir(dirLock)
79 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
80 decompressor, err := zstd.NewReader(nil)
84 defer decompressor.Close()
85 for job := range ctx.Jobs(nodeId, TRx) {
86 pktName := filepath.Base(job.Path)
87 les := LEs{{"Node", job.PktEnc.Sender}, {"Pkt", pktName}}
88 if job.PktEnc.Nice > nice {
89 ctx.LogD("rx", append(les, LE{"Nice", int(job.PktEnc.Nice)}), "too nice")
92 fd, err := os.Open(job.Path)
94 ctx.LogE("rx", les, err, "open")
99 pipeR, pipeW := io.Pipe()
100 go func(job Job) error {
101 pipeWB := bufio.NewWriter(pipeW)
102 _, _, err := PktEncRead(ctx.Self, ctx.Neigh, bufio.NewReader(fd), pipeWB)
103 fd.Close() // #nosec G104
105 return pipeW.CloseWithError(err)
107 if err = pipeWB.Flush(); err != nil {
108 return pipeW.CloseWithError(err)
114 var pktSizeBlocks int64
115 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
116 ctx.LogE("rx", les, err, "unmarshal")
120 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
121 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
122 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
123 pktSize -= poly1305.TagSize
125 pktSize -= pktSizeBlocks * poly1305.TagSize
126 les = append(les, LE{"Size", pktSize})
127 ctx.LogD("rx", les, "taken")
129 case PktTypeExec, PktTypeExecFat:
133 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
134 handle := string(path[0])
135 args := make([]string, 0, len(path)-1)
136 for _, p := range path[1:] {
137 args = append(args, string(p))
139 argsStr := strings.Join(append([]string{handle}, args...), " ")
140 les = append(les, LEs{
144 sender := ctx.Neigh[*job.PktEnc.Sender]
145 cmdline, exists := sender.Exec[handle]
146 if !exists || len(cmdline) == 0 {
147 ctx.LogE("rx", les, errors.New("No handle found"), "")
151 if pkt.Type == PktTypeExec {
152 if err = decompressor.Reset(pipeR); err != nil {
159 append(cmdline[1:], args...)...,
163 "NNCP_SELF="+ctx.Self.Id.String(),
164 "NNCP_SENDER="+sender.Id.String(),
165 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
167 if pkt.Type == PktTypeExec {
168 cmd.Stdin = decompressor
172 output, err := cmd.Output()
174 ctx.LogE("rx", les, err, "handle")
178 if len(sendmail) > 0 && ctx.NotifyExec != nil {
179 notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
181 notify, exists = ctx.NotifyExec["*."+handle]
186 append(sendmail[1:], notify.To)...,
188 cmd.Stdin = newNotification(notify, fmt.Sprintf(
189 "Exec from %s: %s", sender.Name, argsStr,
191 if err = cmd.Run(); err != nil {
192 ctx.LogE("rx", les, err, "notify")
197 ctx.LogI("rx", les, "")
200 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
201 fd.Close() // #nosec G104
204 if err = os.Remove(job.Path); err != nil {
205 ctx.LogE("rx", les, err, "remove")
207 } else if ctx.HdrUsage {
208 os.Remove(job.Path + HdrSuffix)
215 dst := string(pkt.Path[:int(pkt.PathLen)])
216 les = append(les, LEs{{"Type", "file"}, {"Dst", dst}}...)
217 if filepath.IsAbs(dst) {
218 ctx.LogE("rx", les, errors.New("non-relative destination path"), "")
222 incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
224 ctx.LogE("rx", les, errors.New("incoming is not allowed"), "")
228 dir := filepath.Join(*incoming, path.Dir(dst))
229 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
230 ctx.LogE("rx", les, err, "mkdir")
235 tmp, err := TempFile(dir, "file")
237 ctx.LogE("rx", les, err, "mktemp")
241 les = append(les, LE{"Tmp", tmp.Name()})
242 ctx.LogD("rx", les, "created")
243 bufW := bufio.NewWriter(tmp)
244 if _, err = CopyProgressed(
245 bufW, pipeR, "Rx file",
246 append(les, LE{"FullSize", pktSize}),
249 ctx.LogE("rx", les, err, "copy")
253 if err = bufW.Flush(); err != nil {
254 tmp.Close() // #nosec G104
255 ctx.LogE("rx", les, err, "copy")
259 if err = tmp.Sync(); err != nil {
260 tmp.Close() // #nosec G104
261 ctx.LogE("rx", les, err, "copy")
265 if err = tmp.Close(); err != nil {
266 ctx.LogE("rx", les, err, "copy")
270 dstPathOrig := filepath.Join(*incoming, dst)
271 dstPath := dstPathOrig
274 if _, err = os.Stat(dstPath); err != nil {
275 if os.IsNotExist(err) {
278 ctx.LogE("rx", les, err, "stat")
282 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
285 if err = os.Rename(tmp.Name(), dstPath); err != nil {
286 ctx.LogE("rx", les, err, "rename")
289 if err = DirSync(*incoming); err != nil {
290 ctx.LogE("rx", les, err, "sync")
293 les = les[:len(les)-1] // delete Tmp
295 ctx.LogI("rx", les, "")
298 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
299 fd.Close() // #nosec G104
302 if err = os.Remove(job.Path); err != nil {
303 ctx.LogE("rx", les, err, "remove")
305 } else if ctx.HdrUsage {
306 os.Remove(job.Path + HdrSuffix)
308 if len(sendmail) > 0 && ctx.NotifyFile != nil {
311 append(sendmail[1:], ctx.NotifyFile.To)...,
313 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
314 "File from %s: %s (%s)",
315 ctx.Neigh[*job.PktEnc.Sender].Name,
317 humanize.IBytes(uint64(pktSize)),
319 if err = cmd.Run(); err != nil {
320 ctx.LogE("rx", les, err, "notify")
328 src := string(pkt.Path[:int(pkt.PathLen)])
329 if filepath.IsAbs(src) {
330 ctx.LogE("rx", les, errors.New("non-relative source path"), "")
334 les := append(les, LEs{{"Type", "freq"}, {"Src", src}}...)
335 dstRaw, err := ioutil.ReadAll(pipeR)
337 ctx.LogE("rx", les, err, "read")
341 dst := string(dstRaw)
342 les = append(les, LE{"Dst", dst})
343 sender := ctx.Neigh[*job.PktEnc.Sender]
344 freqPath := sender.FreqPath
346 ctx.LogE("rx", les, errors.New("freqing is not allowed"), "")
354 filepath.Join(*freqPath, src),
361 ctx.LogE("rx", les, err, "tx file")
366 ctx.LogI("rx", les, "")
369 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
370 fd.Close() // #nosec G104
373 if err = os.Remove(job.Path); err != nil {
374 ctx.LogE("rx", les, err, "remove")
376 } else if ctx.HdrUsage {
377 os.Remove(job.Path + HdrSuffix)
379 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
382 append(sendmail[1:], ctx.NotifyFreq.To)...,
384 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
385 "Freq from %s: %s", sender.Name, src,
387 if err = cmd.Run(); err != nil {
388 ctx.LogE("rx", les, err, "notify")
396 dst := new([blake2b.Size256]byte)
397 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
398 nodeId := NodeId(*dst)
399 node, known := ctx.Neigh[nodeId]
400 les := append(les, LEs{{"Type", "trns"}, {"Dst", nodeId}}...)
402 ctx.LogE("rx", les, errors.New("unknown node"), "")
406 ctx.LogD("rx", les, "taken")
408 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
409 ctx.LogE("rx", les, err, "tx trns")
414 ctx.LogI("rx", les, "")
417 if fd, err := os.Create(job.Path + SeenSuffix); err == nil {
418 fd.Close() // #nosec G104
421 if err = os.Remove(job.Path); err != nil {
422 ctx.LogE("rx", les, err, "remove")
424 } else if ctx.HdrUsage {
425 os.Remove(job.Path + HdrSuffix)
429 ctx.LogE("rx", les, errors.New("unknown type"), "")
433 pipeR.Close() // #nosec G104
438 func (ctx *Ctx) AutoToss(
441 doSeen, noFile, noFreq, noExec, noTrns bool,
442 ) (chan struct{}, chan bool) {
443 finish := make(chan struct{})
444 badCode := make(chan bool)
454 time.Sleep(time.Second)
455 bad = !ctx.Toss(nodeId, nice, false, doSeen, noFile, noFreq, noExec, noTrns)
458 return finish, badCode