2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
35 "github.com/davecgh/go-xdr/xdr2"
36 "github.com/dustin/go-humanize"
37 "github.com/klauspost/compress/zstd"
38 "golang.org/x/crypto/blake2b"
39 "golang.org/x/crypto/poly1305"
// newNotification builds the header of a notification message and returns it
// as an io.Reader. The text comes from a "From/To/Subject" fmt.Sprintf
// template; the subject is run through mime.BEncoding.Encode with the UTF-8
// charset so that it can be placed in a header line.
// NOTE(review): the From and To arguments and the end of the function are not
// visible in this excerpt — presumably both are taken from fromTo; confirm.
46 func newNotification(fromTo *FromToJSON, subject string) io.Reader {
47 return strings.NewReader(fmt.Sprintf(
48 "From: %s\nTo: %s\nSubject: %s\n",
51 mime.BEncoding.Encode("UTF-8", subject),
// Boolean switches of the enclosing function, whose header line is not
// visible in this excerpt.
// NOTE(review): by their names these select dry-run mode, creation of "seen"
// markers and skipping of individual packet types — confirm against the
// lines that are missing here.
58 dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
// One zstd decoder is created without an input (nil reader); it is pointed at
// an actual payload later through Reset and closed when the function returns.
61 decompressor, err := zstd.NewReader(nil)
65 defer decompressor.Close()
// Walk over every job that ctx.Jobs yields for nodeId.
// NOTE(review): TRx presumably selects the inbound spool direction — confirm.
66 for job := range ctx.Jobs(nodeId, TRx) {
// Base logging fields for this job: the sender and the packet's file name.
67 pktName := filepath.Base(job.Fd.Name())
68 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
// A packet whose niceness value is above the nice threshold only gets a debug
// log entry here. NOTE(review): the message text and the skip itself are not
// visible in this excerpt.
69 if job.PktEnc.Nice > nice {
70 ctx.LogD("rx", SdsAdd(sds, SDS{
71 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
// In-memory pipe between decryption and the consumers below: PktEncRead reads
// the job file through a bufio.Reader, and everything after it reads the
// result from pipeR. errs is an error channel with room for one value.
// NOTE(review): presumably PktEncRead writes into pipeWB from its own
// goroutine and reports through errs — the goroutine start and the remaining
// PktEncRead arguments are not visible here; confirm.
75 pipeR, pipeW := io.Pipe()
76 errs := make(chan error, 1)
78 pipeWB := bufio.NewWriter(pipeW)
79 _, _, err := PktEncRead(
82 bufio.NewReader(job.Fd),
90 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
96 var pktSizeBlocks int64
// Decode the packet header (pkt) from the pipe with XDR.
97 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
98 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
// Derive the payload size from the on-disk size: drop the fixed overheads,
// then one Poly1305 tag for a trailing partial block (if any) and one tag for
// every full (EncBlkSize + tag) block.
102 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
103 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
104 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
105 pktSize -= poly1305.TagSize
107 pktSize -= pktSizeBlocks * poly1305.TagSize
// The computed size is stored in the job's shared logging fields, so later
// log entries built from sds carry it too.
108 sds["size"] = strconv.FormatInt(pktSize, 10)
109 ctx.LogD("rx", sds, "taken")
// Handling of a packet that runs a configured command.
// NOTE(review): the statement that selects this branch is not visible in the
// excerpt; the noExec switch presumably applies here — confirm.
// The packet's Path field holds NUL-separated items: the first is the handle
// name, the rest are arguments. The local name "path" hides any imported
// package of the same name inside this scope.
115 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
116 handle := string(path[0])
117 args := make([]string, 0, len(path)-1)
118 for _, p := range path[1:] {
119 args = append(args, string(p))
// Branch-local logging fields; "dst" is the handle and its arguments joined
// with spaces.
121 sds := SdsAdd(sds, SDS{
123 "dst": strings.Join(append([]string{handle}, args...), " "),
// Look up the command line configured for this handle on the sender's entry.
// NOTE(review): the ctx.Neigh lookup result is used without a presence check
// in the visible lines — confirm that an unknown sender cannot reach here.
125 sender := ctx.Neigh[*job.PktEnc.Sender]
126 cmdline, exists := sender.Exec[handle]
127 if !exists || len(cmdline) == 0 {
128 ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
// Point the shared zstd decoder at the rest of the packet stream.
132 if err = decompressor.Reset(pipeR); err != nil {
// Command arguments: the configured ones after cmdline[0], then those from
// the packet. NOTE(review): append on a sub-slice of cmdline can write into
// cmdline's backing array when it has spare capacity — confirm this is
// harmless for the stored configuration.
138 append(cmdline[1:len(cmdline)], args...)...,
// Environment entries describing this node, the sender and the packet's
// niceness.
142 "NNCP_SELF="+ctx.Self.Id.String(),
143 "NNCP_SENDER="+sender.Id.String(),
144 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
// The command reads the decompressed payload on its standard input.
146 cmd.Stdin = decompressor
147 if err = cmd.Run(); err != nil {
148 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
153 ctx.LogI("rx", sds, "")
// Bookkeeping after processing: a file named after the job file plus
// SeenSuffix is created (NOTE(review): presumably only when doSeen is set —
// the guard is not visible), then the job file itself is removed.
156 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
160 if err = os.Remove(job.Fd.Name()); err != nil {
161 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
// Handling of a "file" packet: the payload is stored under the sender's
// incoming directory at the relative path carried in the packet's Path field.
169 dst := string(pkt.Path[:int(pkt.PathLen)])
170 sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
// Absolute destinations are rejected.
// NOTE(review): only filepath.IsAbs is checked in the visible lines; a
// relative dst containing ".." elements would still resolve outside of
// *incoming after filepath.Join — confirm it is validated elsewhere.
171 if filepath.IsAbs(dst) {
172 ctx.LogE("rx", sds, "non-relative destination path")
// Incoming is the per-sender directory for received files; it is
// dereferenced below. NOTE(review): the condition guarding the "not allowed"
// error is not visible — presumably a nil check; confirm.
176 incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
178 ctx.LogE("rx", sds, "incoming is not allowed")
// Make sure the destination's parent directory exists. path.Dir treats dst
// as a slash-separated path.
182 dir := filepath.Join(*incoming, path.Dir(dst))
183 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
184 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
// The payload is first written to a temporary file inside that directory and
// renamed to its final name further down.
189 tmp, err := TempFile(dir, "file")
191 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
195 sds["tmp"] = tmp.Name()
196 ctx.LogD("rx", sds, "created")
// Stream the rest of the packet into the temporary file through a buffered
// writer, then flush the buffer and sync the file; a failure in any of the
// three steps is logged as "copy".
197 bufW := bufio.NewWriter(tmp)
198 if _, err = io.Copy(bufW, pipeR); err != nil {
199 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
203 if err = bufW.Flush(); err != nil {
205 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
209 if err = tmp.Sync(); err != nil {
211 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
// Choose the final name. The candidate is probed with os.Stat; alternative
// candidates are the original path followed by "." and a counter.
// NOTE(review): the loop around the probe and the counter handling are not
// fully visible in this excerpt.
216 dstPathOrig := filepath.Join(*incoming, dst)
217 dstPath := dstPathOrig
220 if _, err = os.Stat(dstPath); err != nil {
221 if os.IsNotExist(err) {
224 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
228 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
231 if err = os.Rename(tmp.Name(), dstPath); err != nil {
232 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
237 ctx.LogI("rx", sds, "")
// Bookkeeping after processing: a file named after the job file plus
// SeenSuffix is created (NOTE(review): presumably only when doSeen is set —
// the guard is not visible), then the job file itself is removed.
240 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
244 if err = os.Remove(job.Fd.Name()); err != nil {
245 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
// Optional notification: when this node's own entry has a non-empty
// "sendmail" command and ctx.NotifyFile is set, that command is given
// NotifyFile.To as an extra argument and a generated message on standard
// input. The subject names the sending node and the payload size formatted
// by humanize.IBytes.
248 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
249 if exists && len(sendmail) > 0 && ctx.NotifyFile != nil {
252 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
254 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
255 "File from %s: %s (%s)",
256 ctx.Neigh[*job.PktEnc.Sender].Name,
258 humanize.IBytes(uint64(pktSize)),
// Handling of a "freq" (file request) packet: the packet's Path field names
// the requested source file and the payload carries the destination path.
267 src := string(pkt.Path[:int(pkt.PathLen)])
// Absolute source paths are rejected. This error is logged with the job-level
// sds, because the branch-local fields ("type", "src") are only added on the
// following statement.
// NOTE(review): only filepath.IsAbs is checked in the visible lines; a
// relative src containing ".." elements would still resolve outside of *freq
// after filepath.Join — confirm it is validated elsewhere.
268 if filepath.IsAbs(src) {
269 ctx.LogE("rx", sds, "non-relative source path")
273 sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
// The whole remaining payload is read into memory and used as the
// destination path.
274 dstRaw, err := ioutil.ReadAll(pipeR)
276 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
280 dst := string(dstRaw)
// NOTE(review): the condition guarding the "not allowed" error is not
// visible — presumably a nil check on the sender's freq directory, which is
// dereferenced as *freq below; confirm.
282 sender := ctx.Neigh[*job.PktEnc.Sender]
285 ctx.LogE("rx", sds, "freqing is not allowed")
// The requested file is sent back either as a whole (FreqChunked == 0) or
// through ctx.TxFileChunked; both calls take filepath.Join(*freq, src) as the
// source. NOTE(review): the non-chunked call and the other arguments are not
// visible in this excerpt.
290 if sender.FreqChunked == 0 {
294 filepath.Join(*freq, src),
299 err = ctx.TxFileChunked(
302 filepath.Join(*freq, src),
309 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
314 ctx.LogI("rx", sds, "")
// Bookkeeping after processing: a file named after the job file plus
// SeenSuffix is created (NOTE(review): presumably only when doSeen is set —
// the guard is not visible), then the job file itself is removed.
317 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
321 if err = os.Remove(job.Fd.Name()); err != nil {
322 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
// Optional notification: when this node's own entry has a non-empty
// "sendmail" command and ctx.NotifyFreq is set, that command is given
// NotifyFreq.To as an extra argument and a generated message on standard
// input. NOTE(review): the subject format string is not visible here.
325 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
326 if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil {
329 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
331 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
333 ctx.Neigh[*job.PktEnc.Sender].Name,
// Handling of a "trns" (transit) packet: the packet's Path field carries the
// identifier of the node the payload has to be forwarded to. Up to
// blake2b.Size256 bytes of it are copied into a fixed-size array, which is
// then converted to a NodeId.
343 dst := new([blake2b.Size256]byte)
344 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
345 nodeId := NodeId(*dst)
// The target has to be a known neighbour. NOTE(review): the check of "known"
// that guards the error below is not visible in this excerpt.
346 node, known := ctx.Neigh[nodeId]
347 sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
349 ctx.LogE("rx", sds, "unknown node")
353 ctx.LogD("rx", sds, "taken")
// Hand the rest of the packet stream to ctx.TxTrns for that node, together
// with the niceness of the received packet and the computed payload size.
355 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
356 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
361 ctx.LogI("rx", sds, "")
// Bookkeeping after processing: a file named after the job file plus
// SeenSuffix is created (NOTE(review): presumably only when doSeen is set —
// the guard is not visible), then the job file itself is removed.
364 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
368 if err = os.Remove(job.Fd.Name()); err != nil {
369 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
// Fallback for a packet whose type matches none of the handled kinds.
374 ctx.LogE("rx", sds, "unknown type")