2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
36 "github.com/davecgh/go-xdr/xdr2"
37 "github.com/dustin/go-humanize"
38 "golang.org/x/crypto/blake2b"
39 "golang.org/x/sys/unix"
// newNotification builds a short e-mail style header block in memory and
// returns it as an io.Reader (callers in this file assign it to a command's
// Stdin). The text is produced from the "From: ...\nTo: ...\nSubject: ...\n"
// format string below.
//
// The subject is passed through mime.BEncoding.Encode with charset "UTF-8",
// so non-ASCII subjects are emitted as a Base64 encoded-word.
//
// NOTE(review): the From/To arguments and the closing lines of this call are
// not visible in this excerpt; presumably they are taken from fromTo — confirm.
42 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
43 return strings.NewReader(fmt.Sprintf(
44 "From: %s\nTo: %s\nSubject: %s\n",
47 mime.BEncoding.Encode("UTF-8", subject),
// LockDir opens (creating it if necessary) the lock file
// <ctx.Spool>/<nodeId>/<xx>.lock and tries to take an exclusive flock on it
// without blocking.
//
// A failure of either the open or the flock step is logged through ctx.LogE
// under the "lockdir" tag together with the lock path and the error.
//
// NOTE(review): the error guards and return statements are not visible in
// this excerpt; judging by the signature the opened lock file is returned on
// success and a non-nil error otherwise — confirm.
51 func (ctx *Ctx) LockDir(nodeId *NodeId, xx TRxTx) (*os.File, error) {
// ensureRxDir is defined elsewhere; presumably it makes sure the node's spool
// directory exists before the lock file is created in it — TODO confirm.
52 ctx.ensureRxDir(nodeId)
53 lockPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) + ".lock"
54 dirLock, err := os.OpenFile(
56 os.O_CREATE|os.O_WRONLY,
60 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
// LOCK_EX|LOCK_NB: request an exclusive lock and fail immediately instead of
// waiting when another process already holds it.
63 err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB)
65 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
// UnlockDir releases the flock held on fd (the counterpart of LockDir).
//
// NOTE(review): the result of unix.Flock is discarded on the visible line, so
// an unlock failure goes unnoticed. Lines surrounding it are not visible in
// this excerpt (there may be a nil guard and a Close call) — confirm.
72 func (ctx *Ctx) UnlockDir(fd *os.File) {
74 unix.Flock(int(fd.Fd()), unix.LOCK_UN)
// Toss processes the received (TRx) jobs queued for node nodeId.
//
// It takes the node's rx directory lock via LockDir and releases it on return,
// then iterates over ctx.Jobs(nodeId, TRx). For every job it compares the
// packet's niceness with nice, reads the packet through a pipe, decodes the
// XDR packet header and handles the payload according to its type. The
// branches visible here are: delivery through the sendmail command, storing an
// incoming file, answering a file request ("freq"), and relaying a transit
// packet ("trns"); anything else is logged as "unknown type".
//
// Errors are reported through ctx.LogE with a short step label ("decryption",
// "unmarshal", "sendmail", "mkdir", "mktemp", "copy", "stat", "rename",
// "tx file", "tx trns", "remove", ...); the function itself returns nothing.
//
// NOTE(review): this excerpt is missing many lines (error guards, continue
// statements, case labels, closing braces, the end of the function). Comments
// below describe only what the visible lines do and mark the rest as
// assumptions.
79 func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
// Hold the exclusive rx lock for the whole run; it is released by the defer.
80 dirLock, err := ctx.LockDir(nodeId, TRx)
84 defer ctx.UnlockDir(dirLock)
85 for job := range ctx.Jobs(nodeId, TRx) {
86 pktName := filepath.Base(job.Fd.Name())
// sds carries the structured log fields shared by all messages for this job.
87 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
// A packet whose niceness exceeds the requested limit is only logged at debug
// level here; presumably the job is then skipped — the continue is not visible.
88 if job.PktEnc.Nice > nice {
89 ctx.LogD("rx", SdsAdd(sds, SDS{
90 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
// In-memory pipe plus a one-slot error channel. Presumably a goroutine (not
// visible) writes the decrypted packet into pipeW while the code below reads
// from pipeR, reporting its result through errs — TODO confirm.
94 pipeR, pipeW := io.Pipe()
95 errs := make(chan error, 1)
97 pipeWB := bufio.NewWriter(pipeW)
101 bufio.NewReader(job.Fd),
109 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
// Decode the packet header from the pipe; the payload follows on the same
// reader and is consumed by the type-specific code below.
115 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
116 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
// Payload size is the job size minus the two overhead constants. pktSize is
// assigned, not declared, here, so its declaration is on a line not shown.
119 pktSize = job.Size - PktEncOverhead - PktOverhead
120 sds["size"] = strconv.FormatInt(pktSize, 10)
121 ctx.LogD("rx", sds, "taken")
// --- sendmail branch (case label not visible; presumably the mail type) ---
// The path field holds the recipients as a space-separated list.
124 recipients := string(pkt.Path[:int(pkt.PathLen)])
125 sds := SdsAdd(sds, SDS{
// The payload is zlib-compressed; it is decompressed on the fly and fed to the
// command's standard input.
129 decompressor, err := zlib.NewReader(pipeR)
136 ctx.Sendmail[1:len(ctx.Sendmail)],
137 strings.Split(recipients, " ")...,
140 cmd.Stdin = decompressor
141 if err = cmd.Run(); err != nil {
142 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
145 ctx.LogI("rx", sds, "")
// The spool file is removed once the packet has been handled.
146 if err = os.Remove(job.Fd.Name()); err != nil {
147 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
// --- file branch: store the payload under the sender's incoming directory ---
150 dst := string(pkt.Path[:int(pkt.PathLen)])
151 sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
// Incoming is dereferenced below, so it is a pointer; presumably a nil value
// means the sender may not send files and triggers the log line that follows
// (the guard itself is not visible).
152 incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
154 ctx.LogE("rx", sds, "incoming is not allowed")
// NOTE(review): dst comes from the received packet and is joined onto
// *incoming without any validation visible here; a path containing ".."
// components could resolve outside the incoming directory — confirm whether it
// is sanitised elsewhere.
157 dir := filepath.Join(*incoming, path.Dir(dst))
158 if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
159 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
// The payload is first written to a temporary file in the destination
// directory and renamed into place afterwards.
// NOTE(review): tmp.Name() is called on the very next line, while the "mktemp"
// error log appears only after it. If TempFile fails, tmp is nil and
// tmp.Name() would panic — the error check looks like it should come first.
162 tmp, err := ioutil.TempFile(dir, "nncp-file")
163 sds["tmp"] = tmp.Name()
164 ctx.LogD("rx", sds, "created")
166 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
169 bufW := bufio.NewWriter(tmp)
170 if _, err = io.Copy(bufW, pipeR); err != nil {
171 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
// Pick a destination name that does not exist yet: stat the candidate and, if
// it is taken, retry with dstPathOrig plus a numeric suffix (dstPathCtr is
// declared and advanced on lines not shown). A stat error other than
// "not exist" is logged under "stat".
177 dstPathOrig := filepath.Join(*incoming, dst)
178 dstPath := dstPathOrig
181 if _, err = os.Stat(dstPath); err != nil {
182 if os.IsNotExist(err) {
185 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
188 dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
191 if err = os.Rename(tmp.Name(), dstPath); err != nil {
192 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
195 ctx.LogI("rx", sds, "")
196 if err = os.Remove(job.Fd.Name()); err != nil {
197 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
// Optional notification about the received file, built with newNotification
// and handed to the sendmail command on its standard input.
199 if ctx.NotifyFile != nil {
203 ctx.Sendmail[1:len(ctx.Sendmail)],
207 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
208 "File from %s: %s (%s)",
209 ctx.Neigh[*job.PktEnc.Sender].Name,
211 humanize.IBytes(uint64(pktSize)),
// --- freq branch: the sender asks for a file to be sent back to it ---
// The path field names the requested source; the payload is the destination
// name to use on the requester's side.
216 src := string(pkt.Path[:int(pkt.PathLen)])
217 sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
218 dstRaw, err := ioutil.ReadAll(pipeR)
220 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
223 dst := string(dstRaw)
225 sender := ctx.Neigh[*job.PktEnc.Sender]
228 ctx.LogE("rx", sds, "freqing is not allowed")
// The requested file is queued for transmission back to the sender with the
// niceness of the request. freq is declared on a line not shown; presumably it
// is the sender's freq directory pointer.
// NOTE(review): src comes from the received packet and is joined onto *freq
// without validation visible here; ".." components could reach files outside
// that directory — confirm.
231 err = ctx.TxFile(sender, job.PktEnc.Nice, filepath.Join(*freq, src), dst)
233 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
236 ctx.LogI("rx", sds, "")
237 if err = os.Remove(job.Fd.Name()); err != nil {
238 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
// Optional notification about the served file request.
240 if ctx.NotifyFreq != nil {
244 ctx.Sendmail[1:len(ctx.Sendmail)],
248 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
250 ctx.Neigh[*job.PktEnc.Sender].Name,
// --- trns branch: relay the payload to another node ---
// The path field holds the raw bytes of the next node's id; they are copied
// into a fixed-size array and converted to NodeId for the neighbour lookup.
256 dst := new([blake2b.Size256]byte)
257 copy(dst[:], pkt.Path[:int(pkt.PathLen)])
258 nodeId := NodeId(*dst)
259 node, known := ctx.Neigh[nodeId]
260 sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
262 ctx.LogE("rx", sds, "unknown node")
265 ctx.LogD("rx", sds, "taken")
// The remaining payload is streamed from the pipe to the next hop with the
// original niceness.
266 if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
267 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
270 ctx.LogI("rx", sds, "")
271 if err = os.Remove(job.Fd.Name()); err != nil {
272 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
// Fallback for a packet type none of the branches above handles.
275 ctx.LogE("rx", sds, "unknown type")