]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/toss.go
NNCP is expanded to "Node to Node copy"
[nncp.git] / src / cypherpunks.ru / nncp / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "compress/zlib"
24         "fmt"
25         "io"
26         "io/ioutil"
27         "log"
28         "mime"
29         "os"
30         "os/exec"
31         "path"
32         "path/filepath"
33         "strconv"
34         "strings"
35
36         "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "golang.org/x/crypto/blake2b"
39         "golang.org/x/sys/unix"
40 )
41
42 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
43         return strings.NewReader(fmt.Sprintf(
44                 "From: %s\nTo: %s\nSubject: %s\n",
45                 fromTo.From,
46                 fromTo.To,
47                 mime.BEncoding.Encode("UTF-8", subject),
48         ))
49 }
50
51 func (ctx *Ctx) LockDir(nodeId *NodeId, xx TRxTx) (*os.File, error) {
52         ctx.ensureRxDir(nodeId)
53         lockPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) + ".lock"
54         dirLock, err := os.OpenFile(
55                 lockPath,
56                 os.O_CREATE|os.O_WRONLY,
57                 os.FileMode(0600),
58         )
59         if err != nil {
60                 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
61                 return nil, err
62         }
63         err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB)
64         if err != nil {
65                 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
66                 dirLock.Close()
67                 return nil, err
68         }
69         return dirLock, nil
70 }
71
72 func (ctx *Ctx) UnlockDir(fd *os.File) {
73         if fd != nil {
74                 unix.Flock(int(fd.Fd()), unix.LOCK_UN)
75                 fd.Close()
76         }
77 }
78
79 func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
80         dirLock, err := ctx.LockDir(nodeId, TRx)
81         if err != nil {
82                 return
83         }
84         defer ctx.UnlockDir(dirLock)
85         for job := range ctx.Jobs(nodeId, TRx) {
86                 pktName := filepath.Base(job.Fd.Name())
87                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
88                 if job.PktEnc.Nice > nice {
89                         ctx.LogD("rx", SdsAdd(sds, SDS{
90                                 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
91                         }), "too nice")
92                         continue
93                 }
94                 pipeR, pipeW := io.Pipe()
95                 errs := make(chan error, 1)
96                 go func(job Job) {
97                         pipeWB := bufio.NewWriter(pipeW)
98                         _, err := PktEncRead(
99                                 ctx.Self,
100                                 ctx.Neigh,
101                                 bufio.NewReader(job.Fd),
102                                 pipeWB,
103                         )
104                         errs <- err
105                         pipeWB.Flush()
106                         pipeW.Close()
107                         job.Fd.Close()
108                         if err != nil {
109                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
110                         }
111                 }(job)
112                 var pkt Pkt
113                 var err error
114                 var pktSize int64
115                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
116                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
117                         goto Closing
118                 }
119                 pktSize = job.Size - PktEncOverhead - PktOverhead
120                 sds["size"] = strconv.FormatInt(pktSize, 10)
121                 ctx.LogD("rx", sds, "taken")
122                 switch pkt.Type {
123                 case PktTypeMail:
124                         recipients := string(pkt.Path[:int(pkt.PathLen)])
125                         sds := SdsAdd(sds, SDS{
126                                 "type": "mail",
127                                 "dst":  recipients,
128                         })
129                         decompressor, err := zlib.NewReader(pipeR)
130                         if err != nil {
131                                 log.Fatalln(err)
132                         }
133                         cmd := exec.Command(
134                                 ctx.Sendmail[0],
135                                 append(
136                                         ctx.Sendmail[1:len(ctx.Sendmail)],
137                                         strings.Split(recipients, " ")...,
138                                 )...,
139                         )
140                         cmd.Stdin = decompressor
141                         if err = cmd.Run(); err != nil {
142                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
143                                 goto Closing
144                         }
145                         ctx.LogI("rx", sds, "")
146                         if err = os.Remove(job.Fd.Name()); err != nil {
147                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
148                         }
149                 case PktTypeFile:
150                         dst := string(pkt.Path[:int(pkt.PathLen)])
151                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
152                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
153                         if incoming == nil {
154                                 ctx.LogE("rx", sds, "incoming is not allowed")
155                                 goto Closing
156                         }
157                         dir := filepath.Join(*incoming, path.Dir(dst))
158                         if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
159                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
160                                 goto Closing
161                         }
162                         tmp, err := ioutil.TempFile(dir, "nncp-file")
163                         sds["tmp"] = tmp.Name()
164                         ctx.LogD("rx", sds, "created")
165                         if err != nil {
166                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
167                                 goto Closing
168                         }
169                         bufW := bufio.NewWriter(tmp)
170                         if _, err = io.Copy(bufW, pipeR); err != nil {
171                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
172                                 goto Closing
173                         }
174                         bufW.Flush()
175                         tmp.Sync()
176                         tmp.Close()
177                         dstPathOrig := filepath.Join(*incoming, dst)
178                         dstPath := dstPathOrig
179                         dstPathCtr := 0
180                         for {
181                                 if _, err = os.Stat(dstPath); err != nil {
182                                         if os.IsNotExist(err) {
183                                                 break
184                                         }
185                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
186                                         goto Closing
187                                 }
188                                 dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
189                                 dstPathCtr++
190                         }
191                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
192                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
193                         }
194                         delete(sds, "tmp")
195                         ctx.LogI("rx", sds, "")
196                         if err = os.Remove(job.Fd.Name()); err != nil {
197                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
198                         }
199                         if ctx.NotifyFile != nil {
200                                 cmd := exec.Command(
201                                         ctx.Sendmail[0],
202                                         append(
203                                                 ctx.Sendmail[1:len(ctx.Sendmail)],
204                                                 ctx.NotifyFile.To,
205                                         )...,
206                                 )
207                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
208                                         "File from %s: %s (%s)",
209                                         ctx.Neigh[*job.PktEnc.Sender].Name,
210                                         dst,
211                                         humanize.IBytes(uint64(pktSize)),
212                                 ))
213                                 cmd.Run()
214                         }
215                 case PktTypeFreq:
216                         src := string(pkt.Path[:int(pkt.PathLen)])
217                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
218                         dstRaw, err := ioutil.ReadAll(pipeR)
219                         if err != nil {
220                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
221                                 goto Closing
222                         }
223                         dst := string(dstRaw)
224                         sds["dst"] = dst
225                         sender := ctx.Neigh[*job.PktEnc.Sender]
226                         freq := sender.Freq
227                         if freq == nil {
228                                 ctx.LogE("rx", sds, "freqing is not allowed")
229                                 goto Closing
230                         }
231                         err = ctx.TxFile(sender, job.PktEnc.Nice, filepath.Join(*freq, src), dst)
232                         if err != nil {
233                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
234                                 goto Closing
235                         }
236                         ctx.LogI("rx", sds, "")
237                         if err = os.Remove(job.Fd.Name()); err != nil {
238                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
239                         }
240                         if ctx.NotifyFreq != nil {
241                                 cmd := exec.Command(
242                                         ctx.Sendmail[0],
243                                         append(
244                                                 ctx.Sendmail[1:len(ctx.Sendmail)],
245                                                 ctx.NotifyFreq.To,
246                                         )...,
247                                 )
248                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
249                                         "Freq from %s: %s",
250                                         ctx.Neigh[*job.PktEnc.Sender].Name,
251                                         src,
252                                 ))
253                                 cmd.Run()
254                         }
255                 case PktTypeTrns:
256                         dst := new([blake2b.Size256]byte)
257                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
258                         nodeId := NodeId(*dst)
259                         node, known := ctx.Neigh[nodeId]
260                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
261                         if !known {
262                                 ctx.LogE("rx", sds, "unknown node")
263                                 goto Closing
264                         }
265                         ctx.LogD("rx", sds, "taken")
266                         if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
267                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
268                                 goto Closing
269                         }
270                         ctx.LogI("rx", sds, "")
271                         if err = os.Remove(job.Fd.Name()); err != nil {
272                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
273                         }
274                 default:
275                         ctx.LogE("rx", sds, "unknown type")
276                 }
277         Closing:
278                 pipeR.Close()
279         }
280 }