2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
36 xdr "github.com/davecgh/go-xdr/xdr2"
37 "github.com/dustin/go-humanize"
38 "github.com/klauspost/compress/zstd"
39 "golang.org/x/crypto/blake2b"
40 "golang.org/x/crypto/poly1305"
// newNotification assembles the text of a notification e-mail and returns it
// as an io.Reader over the message lines joined with "\n".
//
// From what is visible here, the message carries a "From:" header taken from
// fromTo.From and a "Subject:" header whose value is subject passed through
// mime.BEncoding.Encode with the "UTF-8" charset. A further group of lines
// declares a text/plain UTF-8 body with base64 transfer encoding, followed by
// body encoded with base64.StdEncoding.
//
// NOTE(review): several lines of this function are not visible in this
// excerpt (the opening of the lines slice, whatever condition guards the
// body append, and the closing braces) — confirm the details against the
// full file before relying on this description.
47 func newNotification(fromTo *FromToJSON, subject string, body []byte) io.Reader {
49 "From: " + fromTo.From,
51 "Subject: " + mime.BEncoding.Encode("UTF-8", subject),
54 lines = append(lines, []string{
56 "Content-Type: text/plain; charset=utf-8",
57 "Content-Transfer-Encoding: base64",
// The body is base64-encoded, matching the transfer-encoding header above.
59 base64.StdEncoding.EncodeToString(body),
// Lines are joined with a bare "\n" and exposed as a reader.
62 return strings.NewReader(strings.Join(lines, "\n"))
68 dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
71 sendmail := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
72 decompressor, err := zstd.NewReader(nil)
76 defer decompressor.Close()
77 for job := range ctx.Jobs(nodeId, TRx) {
78 pktName := filepath.Base(job.Fd.Name())
79 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
80 if job.PktEnc.Nice > nice {
81 ctx.LogD("rx", SdsAdd(sds, SDS{
82 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
86 pipeR, pipeW := io.Pipe()
87 errs := make(chan error, 1)
89 pipeWB := bufio.NewWriter(pipeW)
90 _, _, err := PktEncRead(
93 bufio.NewReader(job.Fd),
101 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
107 var pktSizeBlocks int64
108 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
109 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
113 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
114 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
115 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
116 pktSize -= poly1305.TagSize
118 pktSize -= pktSizeBlocks * poly1305.TagSize
119 sds["size"] = strconv.FormatInt(pktSize, 10)
120 ctx.LogD("rx", sds, "taken")
126 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
127 handle := string(path[0])
128 args := make([]string, 0, len(path)-1)
129 for _, p := range path[1:] {
130 args = append(args, string(p))
132 argsStr := strings.Join(append([]string{handle}, args...), " ")
133 sds := SdsAdd(sds, SDS{
137 sender := ctx.Neigh[*job.PktEnc.Sender]
138 cmdline, exists := sender.Exec[handle]
139 if !exists || len(cmdline) == 0 {
140 ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
144 if err = decompressor.Reset(pipeR); err != nil {
150 append(cmdline[1:len(cmdline)], args...)...,
154 "NNCP_SELF="+ctx.Self.Id.String(),
155 "NNCP_SENDER="+sender.Id.String(),
156 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
158 cmd.Stdin = decompressor
159 output, err := cmd.Output()
161 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
165 if len(sendmail) > 0 && ctx.NotifyExec != nil {
166 notify, exists := ctx.NotifyExec[sender.Name+"."+handle]
168 notify, exists = ctx.NotifyExec["*."+handle]
173 append(sendmail[1:len(sendmail)], notify.To)...,
175 cmd.Stdin = newNotification(notify, fmt.Sprintf(
176 "Exec from %s: %s", sender.Name, argsStr,
182 ctx.LogI("rx", sds, "")
185 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
189 if err = os.Remove(job.Fd.Name()); err != nil {
190 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
198 dst := string(pkt.Path[:int(pkt.PathLen)])
199 sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
200 if filepath.IsAbs(dst) {
201 ctx.LogE("rx", sds, "non-relative destination path")
205 incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
207 ctx.LogE("rx", sds, "incoming is not allowed")
211 dir := filepath.Join(*incoming, path.Dir(dst))
212 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
213 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
218 tmp, err := TempFile(dir, "file")
220 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
224 sds["tmp"] = tmp.Name()
225 ctx.LogD("rx", sds, "created")
226 bufW := bufio.NewWriter(tmp)
227 if _, err = io.Copy(bufW, pipeR); err != nil {
228 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
232 if err = bufW.Flush(); err != nil {
234 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
238 if err = tmp.Sync(); err != nil {
240 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
245 dstPathOrig := filepath.Join(*incoming, dst)
246 dstPath := dstPathOrig
249 if _, err = os.Stat(dstPath); err != nil {
250 if os.IsNotExist(err) {
253 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
257 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
260 if err = os.Rename(tmp.Name(), dstPath); err != nil {
261 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
266 ctx.LogI("rx", sds, "")
269 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
273 if err = os.Remove(job.Fd.Name()); err != nil {
274 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
277 if len(sendmail) > 0 && ctx.NotifyFile != nil {
280 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
282 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
283 "File from %s: %s (%s)",
284 ctx.Neigh[*job.PktEnc.Sender].Name,
286 humanize.IBytes(uint64(pktSize)),
295 src := string(pkt.Path[:int(pkt.PathLen)])
296 if filepath.IsAbs(src) {
297 ctx.LogE("rx", sds, "non-relative source path")
301 sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
302 dstRaw, err := ioutil.ReadAll(pipeR)
304 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
308 dst := string(dstRaw)
310 sender := ctx.Neigh[*job.PktEnc.Sender]
311 freqPath := sender.FreqPath
313 ctx.LogE("rx", sds, "freqing is not allowed")
321 filepath.Join(*freqPath, src),
328 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
333 ctx.LogI("rx", sds, "")
336 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
340 if err = os.Remove(job.Fd.Name()); err != nil {
341 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
344 if len(sendmail) > 0 && ctx.NotifyFreq != nil {
347 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
349 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
350 "Freq from %s: %s", sender.Name, src,
359 dst := new([blake2b.Size256]byte)
360 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
361 nodeId := NodeId(*dst)
362 node, known := ctx.Neigh[nodeId]
363 sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
365 ctx.LogE("rx", sds, "unknown node")
369 ctx.LogD("rx", sds, "taken")
371 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
372 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
377 ctx.LogI("rx", sds, "")
380 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
384 if err = os.Remove(job.Fd.Name()); err != nil {
385 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
390 ctx.LogE("rx", sds, "unknown type")