2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2019 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program. If not, see <http://www.gnu.org/licenses/>.
36 "github.com/davecgh/go-xdr/xdr2"
37 "github.com/dustin/go-humanize"
38 "golang.org/x/crypto/blake2b"
39 "golang.org/x/crypto/poly1305"
46 func newNotification(fromTo *FromToJSON, subject string) io.Reader {
47 return strings.NewReader(fmt.Sprintf(
48 "From: %s\nTo: %s\nSubject: %s\n",
51 mime.BEncoding.Encode("UTF-8", subject),
58 dryRun, doSeen, noFile, noFreq, noExec, noTrns bool,
61 for job := range ctx.Jobs(nodeId, TRx) {
62 pktName := filepath.Base(job.Fd.Name())
63 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
64 if job.PktEnc.Nice > nice {
65 ctx.LogD("rx", SdsAdd(sds, SDS{
66 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
70 pipeR, pipeW := io.Pipe()
71 errs := make(chan error, 1)
73 pipeWB := bufio.NewWriter(pipeW)
74 _, _, err := PktEncRead(
77 bufio.NewReader(job.Fd),
85 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
91 var pktSizeBlocks int64
92 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
93 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
97 pktSize = job.Size - PktEncOverhead - PktOverhead - PktSizeOverhead
98 pktSizeBlocks = pktSize / (EncBlkSize + poly1305.TagSize)
99 if pktSize%(EncBlkSize+poly1305.TagSize) != 0 {
100 pktSize -= poly1305.TagSize
102 pktSize -= pktSizeBlocks * poly1305.TagSize
103 sds["size"] = strconv.FormatInt(pktSize, 10)
104 ctx.LogD("rx", sds, "taken")
110 path := bytes.Split(pkt.Path[:int(pkt.PathLen)], []byte{0})
111 handle := string(path[0])
112 args := make([]string, 0, len(path)-1)
113 for _, p := range path[1:] {
114 args = append(args, string(p))
116 sds := SdsAdd(sds, SDS{
118 "dst": strings.Join(append([]string{handle}, args...), " "),
120 decompressor, err := zlib.NewReader(pipeR)
124 sender := ctx.Neigh[*job.PktEnc.Sender]
125 cmdline, exists := sender.Exec[handle]
126 if !exists || len(cmdline) == 0 {
127 ctx.LogE("rx", SdsAdd(sds, SDS{"err": "No handle found"}), "")
134 append(cmdline[1:len(cmdline)], args...)...,
138 "NNCP_SELF="+ctx.Self.Id.String(),
139 "NNCP_SENDER="+sender.Id.String(),
140 "NNCP_NICE="+strconv.Itoa(int(pkt.Nice)),
142 cmd.Stdin = decompressor
143 if err = cmd.Run(); err != nil {
144 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "handle")
149 ctx.LogI("rx", sds, "")
152 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
156 if err = os.Remove(job.Fd.Name()); err != nil {
157 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
165 dst := string(pkt.Path[:int(pkt.PathLen)])
166 sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
167 if filepath.IsAbs(dst) {
168 ctx.LogE("rx", sds, "non-relative destination path")
172 incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
174 ctx.LogE("rx", sds, "incoming is not allowed")
178 dir := filepath.Join(*incoming, path.Dir(dst))
179 if err = os.MkdirAll(dir, os.FileMode(0777)); err != nil {
180 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
185 tmp, err := TempFile(dir, "file")
187 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
191 sds["tmp"] = tmp.Name()
192 ctx.LogD("rx", sds, "created")
193 bufW := bufio.NewWriter(tmp)
194 if _, err = io.Copy(bufW, pipeR); err != nil {
195 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
199 if err = bufW.Flush(); err != nil {
201 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
205 if err = tmp.Sync(); err != nil {
207 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
212 dstPathOrig := filepath.Join(*incoming, dst)
213 dstPath := dstPathOrig
216 if _, err = os.Stat(dstPath); err != nil {
217 if os.IsNotExist(err) {
220 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
224 dstPath = dstPathOrig + "." + strconv.Itoa(dstPathCtr)
227 if err = os.Rename(tmp.Name(), dstPath); err != nil {
228 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
233 ctx.LogI("rx", sds, "")
236 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
240 if err = os.Remove(job.Fd.Name()); err != nil {
241 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
244 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
245 if exists && len(sendmail) > 0 && ctx.NotifyFile != nil {
248 append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
250 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
251 "File from %s: %s (%s)",
252 ctx.Neigh[*job.PktEnc.Sender].Name,
254 humanize.IBytes(uint64(pktSize)),
263 src := string(pkt.Path[:int(pkt.PathLen)])
264 if filepath.IsAbs(src) {
265 ctx.LogE("rx", sds, "non-relative source path")
269 sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
270 dstRaw, err := ioutil.ReadAll(pipeR)
272 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
276 dst := string(dstRaw)
278 sender := ctx.Neigh[*job.PktEnc.Sender]
281 ctx.LogE("rx", sds, "freqing is not allowed")
286 if sender.FreqChunked == 0 {
290 filepath.Join(*freq, src),
295 err = ctx.TxFileChunked(
298 filepath.Join(*freq, src),
305 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
310 ctx.LogI("rx", sds, "")
313 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
317 if err = os.Remove(job.Fd.Name()); err != nil {
318 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
321 sendmail, exists := ctx.Neigh[*ctx.SelfId].Exec["sendmail"]
322 if exists && len(sendmail) > 0 && ctx.NotifyFreq != nil {
325 append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
327 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
329 ctx.Neigh[*job.PktEnc.Sender].Name,
339 dst := new([blake2b.Size256]byte)
340 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
341 nodeId := NodeId(*dst)
342 node, known := ctx.Neigh[nodeId]
343 sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
345 ctx.LogE("rx", sds, "unknown node")
349 ctx.LogD("rx", sds, "taken")
351 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
352 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
357 ctx.LogI("rx", sds, "")
360 if fd, err := os.Create(job.Fd.Name() + SeenSuffix); err == nil {
364 if err = os.Remove(job.Fd.Name()); err != nil {
365 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
370 ctx.LogE("rx", sds, "unknown type")