]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/cypherpunks.ru/nncp/toss.go
NNCP_SENDER envvar while sendmail calling
[nncp.git] / src / cypherpunks.ru / nncp / toss.go
index c426987a747c54a754b132a44ac57e66753eedf6..962b240152626e4f9299cb4a9584e47b2797677d 100644 (file)
@@ -36,7 +36,6 @@ import (
        "github.com/davecgh/go-xdr/xdr2"
        "github.com/dustin/go-humanize"
        "golang.org/x/crypto/blake2b"
-       "golang.org/x/sys/unix"
 )
 
 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
@@ -48,40 +47,8 @@ func newNotification(fromTo *FromToYAML, subject string) io.Reader {
        ))
 }
 
-func (ctx *Ctx) LockDir(nodeId *NodeId, xx TRxTx) (*os.File, error) {
-       ctx.ensureRxDir(nodeId)
-       lockPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) + ".lock"
-       dirLock, err := os.OpenFile(
-               lockPath,
-               os.O_CREATE|os.O_WRONLY,
-               os.FileMode(0600),
-       )
-       if err != nil {
-               ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
-               return nil, err
-       }
-       err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB)
-       if err != nil {
-               ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
-               dirLock.Close()
-               return nil, err
-       }
-       return dirLock, nil
-}
-
-func (ctx *Ctx) UnlockDir(fd *os.File) {
-       if fd != nil {
-               unix.Flock(int(fd.Fd()), unix.LOCK_UN)
-               fd.Close()
-       }
-}
-
-func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
-       dirLock, err := ctx.LockDir(nodeId, TRx)
-       if err != nil {
-               return
-       }
-       defer ctx.UnlockDir(dirLock)
+func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun bool) bool {
+       isBad := false
        for job := range ctx.Jobs(nodeId, TRx) {
                pktName := filepath.Base(job.Fd.Name())
                sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
@@ -95,7 +62,7 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
                errs := make(chan error, 1)
                go func(job Job) {
                        pipeWB := bufio.NewWriter(pipeW)
-                       _, err := PktEncRead(
+                       _, _, err := PktEncRead(
                                ctx.Self,
                                ctx.Neigh,
                                bufio.NewReader(job.Fd),
@@ -114,6 +81,7 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
                var pktSize int64
                if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
                        ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
+                       isBad = true
                        goto Closing
                }
                pktSize = job.Size - PktEncOverhead - PktOverhead
@@ -130,93 +98,128 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
                        if err != nil {
                                log.Fatalln(err)
                        }
-                       sendmail := ctx.Neigh[*job.PktEnc.Sender].Sendmail
-                       cmd := exec.Command(
-                               sendmail[0],
-                               append(
-                                       sendmail[1:len(sendmail)],
-                                       strings.Split(recipients, " ")...,
-                               )...,
-                       )
-                       cmd.Stdin = decompressor
-                       if err = cmd.Run(); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
+                       sender := ctx.Neigh[*job.PktEnc.Sender]
+                       sendmail := sender.Sendmail
+                       if len(sendmail) == 0 {
+                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No sendmail configured"}), "")
+                               isBad = true
                                goto Closing
                        }
+                       if !dryRun {
+                               cmd := exec.Command(
+                                       sendmail[0],
+                                       append(
+                                               sendmail[1:len(sendmail)],
+                                               strings.Split(recipients, " ")...,
+                                       )...,
+                               )
+                               cmd.Env = append(cmd.Env, "NNCP_SENDER=" + sender.Id.String())
+                               cmd.Stdin = decompressor
+                               if err = cmd.Run(); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
+                                       isBad = true
+                                       goto Closing
+                               }
+                       }
                        ctx.LogI("rx", sds, "")
-                       if err = os.Remove(job.Fd.Name()); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                       if !dryRun {
+                               if err = os.Remove(job.Fd.Name()); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       isBad = true
+                               }
                        }
                case PktTypeFile:
                        dst := string(pkt.Path[:int(pkt.PathLen)])
                        sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
+                       if filepath.IsAbs(dst) {
+                               ctx.LogE("rx", sds, "non-relative destination path")
+                               isBad = true
+                               goto Closing
+                       }
                        incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
                        if incoming == nil {
                                ctx.LogE("rx", sds, "incoming is not allowed")
+                               isBad = true
                                goto Closing
                        }
                        dir := filepath.Join(*incoming, path.Dir(dst))
                        if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
                                ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
+                               isBad = true
                                goto Closing
                        }
-                       tmp, err := ioutil.TempFile(dir, "nncp-file")
-                       sds["tmp"] = tmp.Name()
-                       ctx.LogD("rx", sds, "created")
-                       if err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
-                               goto Closing
-                       }
-                       bufW := bufio.NewWriter(tmp)
-                       if _, err = io.Copy(bufW, pipeR); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
-                               goto Closing
-                       }
-                       bufW.Flush()
-                       tmp.Sync()
-                       tmp.Close()
-                       dstPathOrig := filepath.Join(*incoming, dst)
-                       dstPath := dstPathOrig
-                       dstPathCtr := 0
-                       for {
-                               if _, err = os.Stat(dstPath); err != nil {
-                                       if os.IsNotExist(err) {
-                                               break
-                                       }
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
+                       if !dryRun {
+                               tmp, err := ioutil.TempFile(dir, "nncp-file")
+                               sds["tmp"] = tmp.Name()
+                               ctx.LogD("rx", sds, "created")
+                               if err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
+                                       isBad = true
                                        goto Closing
                                }
-                               dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
-                               dstPathCtr++
-                       }
-                       if err = os.Rename(tmp.Name(), dstPath); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
+                               bufW := bufio.NewWriter(tmp)
+                               if _, err = io.Copy(bufW, pipeR); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
+                                       isBad = true
+                                       goto Closing
+                               }
+                               bufW.Flush()
+                               tmp.Sync()
+                               tmp.Close()
+                               dstPathOrig := filepath.Join(*incoming, dst)
+                               dstPath := dstPathOrig
+                               dstPathCtr := 0
+                               for {
+                                       if _, err = os.Stat(dstPath); err != nil {
+                                               if os.IsNotExist(err) {
+                                                       break
+                                               }
+                                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
+                                               isBad = true
+                                               goto Closing
+                                       }
+                                       dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
+                                       dstPathCtr++
+                               }
+                               if err = os.Rename(tmp.Name(), dstPath); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
+                                       isBad = true
+                               }
+                               delete(sds, "tmp")
                        }
-                       delete(sds, "tmp")
                        ctx.LogI("rx", sds, "")
-                       if err = os.Remove(job.Fd.Name()); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
-                       }
-                       sendmail := ctx.Neigh[*ctx.Self.Id].Sendmail
-                       if ctx.NotifyFile != nil {
-                               cmd := exec.Command(
-                                       sendmail[0],
-                                       append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
-                               )
-                               cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
-                                       "File from %s: %s (%s)",
-                                       ctx.Neigh[*job.PktEnc.Sender].Name,
-                                       dst,
-                                       humanize.IBytes(uint64(pktSize)),
-                               ))
-                               cmd.Run()
+                       if !dryRun {
+                               if err = os.Remove(job.Fd.Name()); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       isBad = true
+                               }
+                               sendmail := ctx.Neigh[*ctx.SelfId].Sendmail
+                               if ctx.NotifyFile != nil {
+                                       cmd := exec.Command(
+                                               sendmail[0],
+                                               append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
+                                       )
+                                       cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
+                                               "File from %s: %s (%s)",
+                                               ctx.Neigh[*job.PktEnc.Sender].Name,
+                                               dst,
+                                               humanize.IBytes(uint64(pktSize)),
+                                       ))
+                                       cmd.Run()
+                               }
                        }
                case PktTypeFreq:
                        src := string(pkt.Path[:int(pkt.PathLen)])
+                       if filepath.IsAbs(src) {
+                               ctx.LogE("rx", sds, "non-relative source path")
+                               isBad = true
+                               goto Closing
+                       }
                        sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
                        dstRaw, err := ioutil.ReadAll(pipeR)
                        if err != nil {
                                ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
+                               isBad = true
                                goto Closing
                        }
                        dst := string(dstRaw)
@@ -225,29 +228,53 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
                        freq := sender.Freq
                        if freq == nil {
                                ctx.LogE("rx", sds, "freqing is not allowed")
+                               isBad = true
                                goto Closing
                        }
-                       err = ctx.TxFile(sender, job.PktEnc.Nice, filepath.Join(*freq, src), dst)
-                       if err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
-                               goto Closing
+                       if !dryRun {
+                               if sender.FreqChunked == 0 {
+                                       err = ctx.TxFile(
+                                               sender,
+                                               job.PktEnc.Nice,
+                                               filepath.Join(*freq, src),
+                                               dst,
+                                               sender.FreqMinSize,
+                                       )
+                               } else {
+                                       err = ctx.TxFileChunked(
+                                               sender,
+                                               job.PktEnc.Nice,
+                                               filepath.Join(*freq, src),
+                                               dst,
+                                               sender.FreqMinSize,
+                                               sender.FreqChunked,
+                                       )
+                               }
+                               if err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
+                                       isBad = true
+                                       goto Closing
+                               }
                        }
                        ctx.LogI("rx", sds, "")
-                       if err = os.Remove(job.Fd.Name()); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
-                       }
-                       if ctx.NotifyFreq != nil {
-                               sendmail := ctx.Neigh[*ctx.Self.Id].Sendmail
-                               cmd := exec.Command(
-                                       sendmail[0],
-                                       append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
-                               )
-                               cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
-                                       "Freq from %s: %s",
-                                       ctx.Neigh[*job.PktEnc.Sender].Name,
-                                       src,
-                               ))
-                               cmd.Run()
+                       if !dryRun {
+                               if err = os.Remove(job.Fd.Name()); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       isBad = true
+                               }
+                               if ctx.NotifyFreq != nil {
+                                       sendmail := ctx.Neigh[*ctx.SelfId].Sendmail
+                                       cmd := exec.Command(
+                                               sendmail[0],
+                                               append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
+                                       )
+                                       cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
+                                               "Freq from %s: %s",
+                                               ctx.Neigh[*job.PktEnc.Sender].Name,
+                                               src,
+                                       ))
+                                       cmd.Run()
+                               }
                        }
                case PktTypeTrns:
                        dst := new([blake2b.Size256]byte)
@@ -257,21 +284,30 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
                        sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
                        if !known {
                                ctx.LogE("rx", sds, "unknown node")
+                               isBad = true
                                goto Closing
                        }
                        ctx.LogD("rx", sds, "taken")
-                       if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
-                               goto Closing
+                       if !dryRun {
+                               if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
+                                       isBad = true
+                                       goto Closing
+                               }
                        }
                        ctx.LogI("rx", sds, "")
-                       if err = os.Remove(job.Fd.Name()); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                       if !dryRun {
+                               if err = os.Remove(job.Fd.Name()); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       isBad = true
+                               }
                        }
                default:
                        ctx.LogE("rx", sds, "unknown type")
+                       isBad = true
                }
        Closing:
                pipeR.Close()
        }
+       return isBad
 }