]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/toss.go
Sync directories for rename assurance
[nncp.git] / src / toss.go
index 81661387681db4d1bd9b0e4b40a9e5390bc3bf90..5bc8b7960b79b8d2e904405b1e1aacd7282672d8 100644 (file)
@@ -20,7 +20,7 @@ package nncp
 import (
        "bufio"
        "bytes"
-       "compress/zlib"
+       "encoding/base64"
        "fmt"
        "io"
        "io/ioutil"
@@ -33,8 +33,9 @@ import (
        "strconv"
        "strings"
 
-       "github.com/davecgh/go-xdr/xdr2"
+       xdr "github.com/davecgh/go-xdr/xdr2"
        "github.com/dustin/go-humanize"
+       "github.com/klauspost/compress/zstd"
        "golang.org/x/crypto/blake2b"
        "golang.org/x/crypto/poly1305"
 )
@@ -43,13 +44,22 @@ const (
        SeenSuffix = ".seen"
 )
 
-func newNotification(fromTo *FromToYAML, subject string) io.Reader {
-       return strings.NewReader(fmt.Sprintf(
-               "From: %s\nTo: %s\nSubject: %s\n",
-               fromTo.From,
-               fromTo.To,
-               mime.BEncoding.Encode("UTF-8", subject),
-       ))
+func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
+       lines := []string{
+               "From: " + fromTo.From,
+               "To: " + fromTo.To,
+               "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
+       }
+       if len(body) > 0 {
+               lines = append(lines, []string{
+                       "MIME-Version: 1.0",
+                       "Content-Type: text/plain; charset=utf-8",
+                       "Content-Transfer-Encoding: base64",
+                       "",
+                       base64.StdEncoding.EncodeToString(body),
+               }...)
+       }
+       return strings.NewReader(strings.Join(lines, "\n"))
 }
 
 func (ctx *Ctx) Toss(
@@ -58,6 +68,12 @@ func (ctx *Ctx) Toss(
        dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
 ) bool {
        isBad := false
+       sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
+       decompressor, err := zstd.NewReader(nil)
+       if err != nil {
+               panic(err)
+       }
+       defer decompressor.Close()
        for job := range ctx.Jobs(nodeId, TRx) {
                pktName := filepath.Base(job.Fd.Name())
                sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
@@ -113,14 +129,11 @@ func (ctx *Ctx) Toss(
                        for _, p := range path[1:] {
                                args = append(args, string(p))
                        }
+                       argsStr := strings.Join(append([]string{handle}, args...), " ")
                        sds := SdsAdd(sds, SDS{
                                "type": "exec",
-                               "dst":  strings.Join(append([]string{handle}, args...), " "),
+                               "dst":  argsStr,
                        })
-                       decompressor, err := zlib.NewReader(pipeR)
-                       if err != nil {
-                               log.Fatalln(err)
-                       }
                        sender := ctx.Neigh[*job.PktEnc.Sender]
                        cmdline, exists := sender.Exec[handle]
                        if !exists || len(cmdline) == 0 {
@@ -128,6 +141,9 @@ func (ctx *Ctx) Toss(
                                isBad = true
                                goto Closing
                        }
+                       if err = decompressor.Reset(pipeR); err != nil {
+                               log.Fatalln(err)
+                       }
                        if !dryRun {
                                cmd := exec.Command(
                                        cmdline[0],
@@ -140,11 +156,28 @@ func (ctx *Ctx) Toss(
                                        "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
                                )
                                cmd.Stdin = decompressor
-                               if err = cmd.Run(); err != nil {
+                               output, err := cmd.Output()
+                               if err != nil {
                                        ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
                                        isBad = true
                                        goto Closing
                                }
+                               if len(sendmail) > 0 && ctx.NotifyExec != nil {
+                                       notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
+                                       if !exists {
+                                               notify, exists = ctx.NotifyExec["*."+handle]
+                                       }
+                                       if exists {
+                                               cmd := exec.Command(
+                                                       sendmail[0],
+                                                       append(sendmail[1:len(sendmail)], notify.To)...,
+                                               )
+                                               cmd.Stdin = newNotification(notify, fmt.Sprintf(
+                                                       "Exec from %s: %s", sender.Name, argsStr,
+                                               ), output)
+                                               cmd.Run()
+                                       }
+                               }
                        }
                        ctx.LogI("rx", sds, "")
                        if !dryRun {
@@ -228,6 +261,10 @@ func (ctx *Ctx) Toss(
                                        ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
                                        isBad = true
                                }
+                               if err = DirSync(*incoming); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sync")
+                                       isBad = true
+                               }
                                delete(sds, "tmp")
                        }
                        ctx.LogI("rx", sds, "")
@@ -241,8 +278,7 @@ func (ctx *Ctx) Toss(
                                        ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
                                        isBad = true
                                }
-                               sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
-                               if exists && len(sendmail) > 0 && ctx.NotifyFile != nil {
+                               if len(sendmail) > 0 && ctx.NotifyFile != nil {
                                        cmd := exec.Command(
                                                sendmail[0],
                                                append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
@@ -252,7 +288,7 @@ func (ctx *Ctx) Toss(
                                                ctx.Neigh[*job.PktEnc.Sender].Name,
                                                dst,
                                                humanize.IBytes(uint64(pktSize)),
-                                       ))
+                                       ), nil)
                                        cmd.Run()
                                }
                        }
@@ -276,31 +312,22 @@ func (ctx *Ctx) Toss(
                        dst := string(dstRaw)
                        sds["dst"] = dst
                        sender := ctx.Neigh[*job.PktEnc.Sender]
-                       freq := sender.Freq
-                       if freq == nil {
+                       freqPath := sender.FreqPath
+                       if freqPath == nil {
                                ctx.LogE("rx", sds, "freqing is not allowed")
                                isBad = true
                                goto Closing
                        }
                        if !dryRun {
-                               if sender.FreqChunked == 0 {
-                                       err = ctx.TxFile(
-                                               sender,
-                                               pkt.Nice,
-                                               filepath.Join(*freq, src),
-                                               dst,
-                                               sender.FreqMinSize,
-                                       )
-                               } else {
-                                       err = ctx.TxFileChunked(
-                                               sender,
-                                               pkt.Nice,
-                                               filepath.Join(*freq, src),
-                                               dst,
-                                               sender.FreqMinSize,
-                                               sender.FreqChunked,
-                                       )
-                               }
+                               err = ctx.TxFile(
+                                       sender,
+                                       pkt.Nice,
+                                       filepath.Join(*freqPath, src),
+                                       dst,
+                                       sender.FreqChunked,
+                                       sender.FreqMinSize,
+                                       sender.FreqMaxSize,
+                               )
                                if err != nil {
                                        ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
                                        isBad = true
@@ -318,17 +345,14 @@ func (ctx *Ctx) Toss(
                                        ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
                                        isBad = true
                                }
-                               sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
-                               if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil {
+                               if len(sendmail) > 0 && ctx.NotifyFreq != nil {
                                        cmd := exec.Command(
                                                sendmail[0],
                                                append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
                                        )
                                        cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
-                                               "Freq from %s: %s",
-                                               ctx.Neigh[*job.PktEnc.Sender].Name,
-                                               src,
-                                       ))
+                                               "Freq from %s: %s", sender.Name, src,
+                                       ), nil)
                                        cmd.Run()
                                }
                        }