]> Cypherpunks.ru repositories - nncp.git/blobdiff - src/cypherpunks.ru/nncp/toss.go
Merge branch 'develop'
[nncp.git] / src / cypherpunks.ru / nncp / toss.go
index 7434c1af551bef0338fdaee2bfb936e274e7f03e..86d154b2aa310a36b454e87c36ab989312c80e44 100644 (file)
@@ -1,6 +1,6 @@
 /*
 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
-Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
+Copyright (C) 2016-2018 Sergey Matveev <stargrave@stargrave.org>
 
 This program is free software: you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
@@ -20,6 +20,7 @@ package nncp
 
 import (
        "bufio"
+       "bytes"
        "compress/zlib"
        "fmt"
        "io"
@@ -36,7 +37,10 @@ import (
        "github.com/davecgh/go-xdr/xdr2"
        "github.com/dustin/go-humanize"
        "golang.org/x/crypto/blake2b"
-       "golang.org/x/sys/unix"
+)
+
+const (
+       SeenSuffix = ".seen"
 )
 
 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
@@ -48,40 +52,8 @@ func newNotification(fromTo *FromToYAML, subject string) io.Reader {
        ))
 }
 
-func (ctx *Ctx) LockDir(nodeId *NodeId, xx TRxTx) (*os.File, error) {
-       ctx.ensureRxDir(nodeId)
-       lockPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) + ".lock"
-       dirLock, err := os.OpenFile(
-               lockPath,
-               os.O_CREATE|os.O_WRONLY,
-               os.FileMode(0600),
-       )
-       if err != nil {
-               ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
-               return nil, err
-       }
-       err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB)
-       if err != nil {
-               ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
-               dirLock.Close()
-               return nil, err
-       }
-       return dirLock, nil
-}
-
-func (ctx *Ctx) UnlockDir(fd *os.File) {
-       if fd != nil {
-               unix.Flock(int(fd.Fd()), unix.LOCK_UN)
-               fd.Close()
-       }
-}
-
-func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
-       dirLock, err := ctx.LockDir(nodeId, TRx)
-       if err != nil {
-               return
-       }
-       defer ctx.UnlockDir(dirLock)
+func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8, dryRun, doSeen, noFile, noFreq, noExec, noTrns bool) bool {
+       isBad := false
        for job := range ctx.Jobs(nodeId, TRx) {
                pktName := filepath.Base(job.Fd.Name())
                sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
@@ -95,7 +67,7 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
                errs := make(chan error, 1)
                go func(job Job) {
                        pipeWB := bufio.NewWriter(pipeW)
-                       _, err := PktEncRead(
+                       _, _, err := PktEncRead(
                                ctx.Self,
                                ctx.Neigh,
                                bufio.NewReader(job.Fd),
@@ -114,110 +86,171 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
                var pktSize int64
                if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
                        ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
+                       isBad = true
                        goto Closing
                }
                pktSize = job.Size - PktEncOverhead - PktOverhead
                sds["size"] = strconv.FormatInt(pktSize, 10)
                ctx.LogD("rx", sds, "taken")
                switch pkt.Type {
-               case PktTypeMail:
-                       recipients := string(pkt.Path[:int(pkt.PathLen)])
+               case PktTypeExec:
+                       if noExec {
+                               goto Closing
+                       }
+                       path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
+                       handle := string(path[0])
+                       args := make([]string, 0, len(path)-1)
+                       for _, p := range path[1:] {
+                               args = append(args, string(p))
+                       }
                        sds := SdsAdd(sds, SDS{
-                               "type": "mail",
-                               "dst":  recipients,
+                               "type": "exec",
+                               "dst":  strings.Join(append([]string{handle}, args...), " "),
                        })
                        decompressor, err := zlib.NewReader(pipeR)
                        if err != nil {
                                log.Fatalln(err)
                        }
-                       cmd := exec.Command(
-                               ctx.Sendmail[0],
-                               append(
-                                       ctx.Sendmail[1:len(ctx.Sendmail)],
-                                       strings.Split(recipients, " ")...,
-                               )...,
-                       )
-                       cmd.Stdin = decompressor
-                       if err = cmd.Run(); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
+                       sender := ctx.Neigh[*job.PktEnc.Sender]
+                       cmdline, exists := sender.Exec[handle]
+                       if !exists || len(cmdline) == 0 {
+                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
+                               isBad = true
                                goto Closing
                        }
+                       if !dryRun {
+                               cmd := exec.Command(
+                                       cmdline[0],
+                                       append(cmdline[1:len(cmdline)], args...)...,
+                               )
+                               cmd.Env = append(
+                                       cmd.Env,
+                                       "NNCP_SELF="+ctx.Self.Id.String(),
+                                       "NNCP_SENDER="+sender.Id.String(),
+                                       "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
+                               )
+                               cmd.Stdin = decompressor
+                               if err = cmd.Run(); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
+                                       isBad = true
+                                       goto Closing
+                               }
+                       }
                        ctx.LogI("rx", sds, "")
-                       if err = os.Remove(job.Fd.Name()); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                       if !dryRun {
+                               if doSeen {
+                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                               fd.Close()
+                                       }
+                               }
+                               if err = os.Remove(job.Fd.Name()); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       isBad = true
+                               }
                        }
                case PktTypeFile:
+                       if noFile {
+                               goto Closing
+                       }
                        dst := string(pkt.Path[:int(pkt.PathLen)])
                        sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
+                       if filepath.IsAbs(dst) {
+                               ctx.LogE("rx", sds, "non-relative destination path")
+                               isBad = true
+                               goto Closing
+                       }
                        incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
                        if incoming == nil {
                                ctx.LogE("rx", sds, "incoming is not allowed")
+                               isBad = true
                                goto Closing
                        }
                        dir := filepath.Join(*incoming, path.Dir(dst))
                        if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
                                ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
+                               isBad = true
                                goto Closing
                        }
-                       tmp, err := ioutil.TempFile(dir, "nncp-file")
-                       sds["tmp"] = tmp.Name()
-                       ctx.LogD("rx", sds, "created")
-                       if err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
-                               goto Closing
-                       }
-                       bufW := bufio.NewWriter(tmp)
-                       if _, err = io.Copy(bufW, pipeR); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
-                               goto Closing
-                       }
-                       bufW.Flush()
-                       tmp.Sync()
-                       tmp.Close()
-                       dstPathOrig := filepath.Join(*incoming, dst)
-                       dstPath := dstPathOrig
-                       dstPathCtr := 0
-                       for {
-                               if _, err = os.Stat(dstPath); err != nil {
-                                       if os.IsNotExist(err) {
-                                               break
-                                       }
-                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
+                       if !dryRun {
+                               tmp, err := ioutil.TempFile(dir, "nncp-file")
+                               sds["tmp"] = tmp.Name()
+                               ctx.LogD("rx", sds, "created")
+                               if err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
+                                       isBad = true
                                        goto Closing
                                }
-                               dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
-                               dstPathCtr++
-                       }
-                       if err = os.Rename(tmp.Name(), dstPath); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
+                               bufW := bufio.NewWriter(tmp)
+                               if _, err = io.Copy(bufW, pipeR); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
+                                       isBad = true
+                                       goto Closing
+                               }
+                               bufW.Flush()
+                               tmp.Sync()
+                               tmp.Close()
+                               dstPathOrig := filepath.Join(*incoming, dst)
+                               dstPath := dstPathOrig
+                               dstPathCtr := 0
+                               for {
+                                       if _, err = os.Stat(dstPath); err != nil {
+                                               if os.IsNotExist(err) {
+                                                       break
+                                               }
+                                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
+                                               isBad = true
+                                               goto Closing
+                                       }
+                                       dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
+                                       dstPathCtr++
+                               }
+                               if err = os.Rename(tmp.Name(), dstPath); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
+                                       isBad = true
+                               }
+                               delete(sds, "tmp")
                        }
-                       delete(sds, "tmp")
                        ctx.LogI("rx", sds, "")
-                       if err = os.Remove(job.Fd.Name()); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
-                       }
-                       if ctx.NotifyFile != nil {
-                               cmd := exec.Command(
-                                       ctx.Sendmail[0],
-                                       append(
-                                               ctx.Sendmail[1:len(ctx.Sendmail)],
-                                               ctx.NotifyFile.To,
-                                       )...,
-                               )
-                               cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
-                                       "File from %s: %s (%s)",
-                                       ctx.Neigh[*job.PktEnc.Sender].Name,
-                                       dst,
-                                       humanize.IBytes(uint64(pktSize)),
-                               ))
-                               cmd.Run()
+                       if !dryRun {
+                               if doSeen {
+                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                               fd.Close()
+                                       }
+                               }
+                               if err = os.Remove(job.Fd.Name()); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       isBad = true
+                               }
+                               sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
+                               if exists && len(sendmail) > 0 && ctx.NotifyFile != nil {
+                                       cmd := exec.Command(
+                                               sendmail[0],
+                                               append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
+                                       )
+                                       cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
+                                               "File from %s: %s (%s)",
+                                               ctx.Neigh[*job.PktEnc.Sender].Name,
+                                               dst,
+                                               humanize.IBytes(uint64(pktSize)),
+                                       ))
+                                       cmd.Run()
+                               }
                        }
                case PktTypeFreq:
+                       if noFreq {
+                               goto Closing
+                       }
                        src := string(pkt.Path[:int(pkt.PathLen)])
+                       if filepath.IsAbs(src) {
+                               ctx.LogE("rx", sds, "non-relative source path")
+                               isBad = true
+                               goto Closing
+                       }
                        sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
                        dstRaw, err := ioutil.ReadAll(pipeR)
                        if err != nil {
                                ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
+                               isBad = true
                                goto Closing
                        }
                        dst := string(dstRaw)
@@ -226,33 +259,63 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
                        freq := sender.Freq
                        if freq == nil {
                                ctx.LogE("rx", sds, "freqing is not allowed")
+                               isBad = true
                                goto Closing
                        }
-                       err = ctx.TxFile(sender, job.PktEnc.Nice, filepath.Join(*freq, src), dst)
-                       if err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
-                               goto Closing
+                       if !dryRun {
+                               if sender.FreqChunked == 0 {
+                                       err = ctx.TxFile(
+                                               sender,
+                                               pkt.Nice,
+                                               filepath.Join(*freq, src),
+                                               dst,
+                                               sender.FreqMinSize,
+                                       )
+                               } else {
+                                       err = ctx.TxFileChunked(
+                                               sender,
+                                               pkt.Nice,
+                                               filepath.Join(*freq, src),
+                                               dst,
+                                               sender.FreqMinSize,
+                                               sender.FreqChunked,
+                                       )
+                               }
+                               if err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
+                                       isBad = true
+                                       goto Closing
+                               }
                        }
                        ctx.LogI("rx", sds, "")
-                       if err = os.Remove(job.Fd.Name()); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
-                       }
-                       if ctx.NotifyFreq != nil {
-                               cmd := exec.Command(
-                                       ctx.Sendmail[0],
-                                       append(
-                                               ctx.Sendmail[1:len(ctx.Sendmail)],
-                                               ctx.NotifyFreq.To,
-                                       )...,
-                               )
-                               cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
-                                       "Freq from %s: %s",
-                                       ctx.Neigh[*job.PktEnc.Sender].Name,
-                                       src,
-                               ))
-                               cmd.Run()
+                       if !dryRun {
+                               if doSeen {
+                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                               fd.Close()
+                                       }
+                               }
+                               if err = os.Remove(job.Fd.Name()); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       isBad = true
+                               }
+                               sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
+                               if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil {
+                                       cmd := exec.Command(
+                                               sendmail[0],
+                                               append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
+                                       )
+                                       cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
+                                               "Freq from %s: %s",
+                                               ctx.Neigh[*job.PktEnc.Sender].Name,
+                                               src,
+                                       ))
+                                       cmd.Run()
+                               }
                        }
                case PktTypeTrns:
+                       if noTrns {
+                               goto Closing
+                       }
                        dst := new([blake2b.Size256]byte)
                        copy(dst[:], pkt.Path[:int(pkt.PathLen)])
                        nodeId := NodeId(*dst)
@@ -260,21 +323,35 @@ func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
                        sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
                        if !known {
                                ctx.LogE("rx", sds, "unknown node")
+                               isBad = true
                                goto Closing
                        }
                        ctx.LogD("rx", sds, "taken")
-                       if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
-                               goto Closing
+                       if !dryRun {
+                               if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
+                                       isBad = true
+                                       goto Closing
+                               }
                        }
                        ctx.LogI("rx", sds, "")
-                       if err = os.Remove(job.Fd.Name()); err != nil {
-                               ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                       if !dryRun {
+                               if doSeen {
+                                       if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
+                                               fd.Close()
+                                       }
+                               }
+                               if err = os.Remove(job.Fd.Name()); err != nil {
+                                       ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
+                                       isBad = true
+                               }
                        }
                default:
                        ctx.LogE("rx", sds, "unknown type")
+                       isBad = true
                }
        Closing:
                pipeR.Close()
        }
+       return isBad
 }