2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
37 "github.com/davecgh/go-xdr/xdr2"
38 "github.com/dustin/go-humanize"
39 "golang.org/x/crypto/blake2b"
40 "golang.org/x/crypto/poly1305"
47 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
48 return strings.NewReader(fmt.Sprintf(
49 "From: %s\nTo: %s\nSubject: %s\n",
52 mime.BEncoding.Encode("UTF-8", subject),
59 dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
62 for job := range ctx.Jobs(nodeId, TRx) {
63 pktName := filepath.Base(job.Fd.Name())
64 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
65 if job.PktEnc.Nice > nice {
66 ctx.LogD("rx", SdsAdd(sds, SDS{
67 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
71 pipeR, pipeW := io.Pipe()
72 errs := make(chan error, 1)
74 pipeWB := bufio.NewWriter(pipeW)
75 _, _, err := PktEncRead(
78 bufio.NewReader(job.Fd),
86 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
92 var pktSizeBlocks int64
93 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
94 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
98 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
99 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
100 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
101 pktSize -= poly1305.TagSize
103 pktSize -= pktSizeBlocks * poly1305.TagSize
104 sds["size"] = strconv.FormatInt(pktSize, 10)
105 ctx.LogD("rx", sds, "taken")
111 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
112 handle := string(path[0])
113 args := make([]string, 0, len(path)-1)
114 for _, p := range path[1:] {
115 args = append(args, string(p))
117 sds := SdsAdd(sds, SDS{
119 "dst": strings.Join(append([]string{handle}, args...), " "),
121 decompressor, err := zlib.NewReader(pipeR)
125 sender := ctx.Neigh[*job.PktEnc.Sender]
126 cmdline, exists := sender.Exec[handle]
127 if !exists || len(cmdline) == 0 {
128 ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
135 append(cmdline[1:len(cmdline)], args...)...,
139 "NNCP_SELF="+ctx.Self.Id.String(),
140 "NNCP_SENDER="+sender.Id.String(),
141 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
143 cmd.Stdin = decompressor
144 if err = cmd.Run(); err != nil {
145 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
150 ctx.LogI("rx", sds, "")
153 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
157 if err = os.Remove(job.Fd.Name()); err != nil {
158 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
166 dst := string(pkt.Path[:int(pkt.PathLen)])
167 sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
168 if filepath.IsAbs(dst) {
169 ctx.LogE("rx", sds, "non-relative destination path")
173 incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
175 ctx.LogE("rx", sds, "incoming is not allowed")
179 dir := filepath.Join(*incoming, path.Dir(dst))
180 if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
181 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
186 tmp, err := ioutil.TempFile(dir, "nncp-file")
188 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
192 sds["tmp"] = tmp.Name()
193 ctx.LogD("rx", sds, "created")
194 bufW := bufio.NewWriter(tmp)
195 if _, err = io.Copy(bufW, pipeR); err != nil {
196 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
200 if err = bufW.Flush(); err != nil {
202 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
206 if err = tmp.Sync(); err != nil {
208 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
213 dstPathOrig := filepath.Join(*incoming, dst)
214 dstPath := dstPathOrig
217 if _, err = os.Stat(dstPath); err != nil {
218 if os.IsNotExist(err) {
221 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
225 dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
228 if err = os.Rename(tmp.Name(), dstPath); err != nil {
229 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
234 ctx.LogI("rx", sds, "")
237 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
241 if err = os.Remove(job.Fd.Name()); err != nil {
242 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
245 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
246 if exists && len(sendmail) > 0 && ctx.NotifyFile != nil {
249 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
251 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
252 "File from %s: %s (%s)",
253 ctx.Neigh[*job.PktEnc.Sender].Name,
255 humanize.IBytes(uint64(pktSize)),
264 src := string(pkt.Path[:int(pkt.PathLen)])
265 if filepath.IsAbs(src) {
266 ctx.LogE("rx", sds, "non-relative source path")
270 sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
271 dstRaw, err := ioutil.ReadAll(pipeR)
273 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
277 dst := string(dstRaw)
279 sender := ctx.Neigh[*job.PktEnc.Sender]
282 ctx.LogE("rx", sds, "freqing is not allowed")
287 if sender.FreqChunked == 0 {
291 filepath.Join(*freq, src),
296 err = ctx.TxFileChunked(
299 filepath.Join(*freq, src),
306 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
311 ctx.LogI("rx", sds, "")
314 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
318 if err = os.Remove(job.Fd.Name()); err != nil {
319 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
322 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
323 if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil {
326 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
328 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
330 ctx.Neigh[*job.PktEnc.Sender].Name,
340 dst := new([blake2b.Size256]byte)
341 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
342 nodeId := NodeId(*dst)
343 node, known := ctx.Neigh[nodeId]
344 sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
346 ctx.LogE("rx", sds, "unknown node")
350 ctx.LogD("rx", sds, "taken")
352 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
353 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
358 ctx.LogI("rx", sds, "")
361 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
365 if err = os.Remove(job.Fd.Name()); err != nil {
366 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
371 ctx.LogE("rx", sds, "unknown type")