2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
37 xdr "github.com/davecgh/go-xdr/xdr2"
38 "github.com/dustin/go-humanize"
39 "github.com/klauspost/compress/zstd"
40 "golang.org/x/crypto/blake2b"
41 "golang.org/x/crypto/poly1305"
// newNotification assembles the text of a notification e-mail and returns
// it as an io.Reader.
//
// From the lines visible here: a "From:" header is taken from fromTo.From,
// a "Subject:" header carries subject passed through mime.BEncoding with
// the UTF-8 charset, and a group of lines containing the Content-Type and
// Content-Transfer-Encoding headers plus the base64-encoded body is
// appended to lines. All collected lines are joined with "\n".
//
// NOTE(review): this listing omits several lines of the function (the
// declaration of lines, whatever condition guards the append, and the
// closing of the literals) — confirm the details against the full file.
48 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
50 "From: " + fromTo.From,
// The subject goes through the MIME "B" word encoder rather than being
// written raw.
52 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
55 lines = append(lines, []string{
57 "Content-Type: text/plain; charset=utf-8",
58 "Content-Transfer-Encoding: base64",
// The whole body is encoded into one string element, i.e. one output line.
// NOTE(review): RFC 2045 caps base64 lines at 76 characters; presumably the
// mail consumer tolerates an unwrapped line — verify.
60 base64.StdEncoding.EncodeToString(body),
// Lines are separated by bare "\n", not CRLF.
63 return strings.NewReader(strings.Join(lines, "\n"))
69 dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
72 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
73 decompressor, err := zstd.NewReader(nil)
77 defer decompressor.Close()
78 for job := range ctx.Jobs(nodeId, TRx) {
79 pktName := filepath.Base(job.Fd.Name())
80 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
81 if job.PktEnc.Nice > nice {
82 ctx.LogD("rx", SdsAdd(sds, SDS{"nice": int(job.PktEnc.Nice)}), "too nice")
85 pipeR, pipeW := io.Pipe()
86 errs := make(chan error, 1)
88 pipeWB := bufio.NewWriter(pipeW)
89 _, _, err := PktEncRead(
92 bufio.NewReader(job.Fd),
100 ctx.LogE("rx", sds, err, "decryption")
106 var pktSizeBlocks int64
107 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
108 ctx.LogE("rx", sds, err, "unmarshal")
112 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
113 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
114 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
115 pktSize -= poly1305.TagSize
117 pktSize -= pktSizeBlocks * poly1305.TagSize
118 sds["size"] = pktSize
119 ctx.LogD("rx", sds, "taken")
125 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
126 handle := string(path[0])
127 args := make([]string, 0, len(path)-1)
128 for _, p := range path[1:] {
129 args = append(args, string(p))
131 argsStr := strings.Join(append([]string{handle}, args...), " ")
132 sds := SdsAdd(sds, SDS{
136 sender := ctx.Neigh[*job.PktEnc.Sender]
137 cmdline, exists := sender.Exec[handle]
138 if !exists || len(cmdline) == 0 {
139 ctx.LogE("rx", sds, errors.New("No handle found"), "")
143 if err = decompressor.Reset(pipeR); err != nil {
149 append(cmdline[1:len(cmdline)], args...)...,
153 "NNCP_SELF="+ctx.Self.Id.String(),
154 "NNCP_SENDER="+sender.Id.String(),
155 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
157 cmd.Stdin = decompressor
158 output, err := cmd.Output()
160 ctx.LogE("rx", sds, err, "handle")
164 if len(sendmail) > 0 && ctx.NotifyExec != nil {
165 notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
167 notify, exists = ctx.NotifyExec["*."+handle]
172 append(sendmail[1:len(sendmail)], notify.To)...,
174 cmd.Stdin = newNotification(notify, fmt.Sprintf(
175 "Exec from %s: %s", sender.Name, argsStr,
181 ctx.LogI("rx", sds, "")
184 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
188 if err = os.Remove(job.Fd.Name()); err != nil {
189 ctx.LogE("rx", sds, err, "remove")
197 dst := string(pkt.Path[:int(pkt.PathLen)])
198 sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
199 if filepath.IsAbs(dst) {
200 ctx.LogE("rx", sds, errors.New("non-relative destination path"), "")
204 incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
206 ctx.LogE("rx", sds, errors.New("incoming is not allowed"), "")
210 dir := filepath.Join(*incoming, path.Dir(dst))
211 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
212 ctx.LogE("rx", sds, err, "mkdir")
217 tmp, err := TempFile(dir, "file")
219 ctx.LogE("rx", sds, err, "mktemp")
223 sds["tmp"] = tmp.Name()
224 ctx.LogD("rx", sds, "created")
225 bufW := bufio.NewWriter(tmp)
226 if _, err = CopyProgressed(
227 bufW, pipeR, "Rx file",
228 SdsAdd(sds, SDS{"fullsize": sds["size"]}),
231 ctx.LogE("rx", sds, err, "copy")
235 if err = bufW.Flush(); err != nil {
237 ctx.LogE("rx", sds, err, "copy")
241 if err = tmp.Sync(); err != nil {
243 ctx.LogE("rx", sds, err, "copy")
248 dstPathOrig := filepath.Join(*incoming, dst)
249 dstPath := dstPathOrig
252 if _, err = os.Stat(dstPath); err != nil {
253 if os.IsNotExist(err) {
256 ctx.LogE("rx", sds, err, "stat")
260 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
263 if err = os.Rename(tmp.Name(), dstPath); err != nil {
264 ctx.LogE("rx", sds, err, "rename")
267 if err = DirSync(*incoming); err != nil {
268 ctx.LogE("rx", sds, err, "sync")
273 ctx.LogI("rx", sds, "")
276 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
280 if err = os.Remove(job.Fd.Name()); err != nil {
281 ctx.LogE("rx", sds, err, "remove")
284 if len(sendmail) > 0 && ctx.NotifyFile != nil {
287 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
289 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
290 "File from %s: %s (%s)",
291 ctx.Neigh[*job.PktEnc.Sender].Name,
293 humanize.IBytes(uint64(pktSize)),
302 src := string(pkt.Path[:int(pkt.PathLen)])
303 if filepath.IsAbs(src) {
304 ctx.LogE("rx", sds, errors.New("non-relative source path"), "")
308 sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
309 dstRaw, err := ioutil.ReadAll(pipeR)
311 ctx.LogE("rx", sds, err, "read")
315 dst := string(dstRaw)
317 sender := ctx.Neigh[*job.PktEnc.Sender]
318 freqPath := sender.FreqPath
320 ctx.LogE("rx", sds, errors.New("freqing is not allowed"), "")
328 filepath.Join(*freqPath, src),
335 ctx.LogE("rx", sds, err, "tx file")
340 ctx.LogI("rx", sds, "")
343 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
347 if err = os.Remove(job.Fd.Name()); err != nil {
348 ctx.LogE("rx", sds, err, "remove")
351 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
354 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
356 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
357 "Freq from %s: %s", sender.Name, src,
366 dst := new([blake2b.Size256]byte)
367 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
368 nodeId := NodeId(*dst)
369 node, known := ctx.Neigh[nodeId]
370 sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
372 ctx.LogE("rx", sds, errors.New("unknown node"), "")
376 ctx.LogD("rx", sds, "taken")
378 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
379 ctx.LogE("rx", sds, err, "tx trns")
384 ctx.LogI("rx", sds, "")
387 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
391 if err = os.Remove(job.Fd.Name()); err != nil {
392 ctx.LogE("rx", sds, err, "remove")
397 ctx.LogE("rx", sds, errors.New("unknown type"), "")