]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/toss.go
Configure sendmail command per node
[nncp.git] / src / cypherpunks.ru / nncp / toss.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "compress/zlib"
24         "fmt"
25         "io"
26         "io/ioutil"
27         "log"
28         "mime"
29         "os"
30         "os/exec"
31         "path"
32         "path/filepath"
33         "strconv"
34         "strings"
35
36         "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "golang.org/x/crypto/blake2b"
39         "golang.org/x/sys/unix"
40 )
41
42 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
43         return strings.NewReader(fmt.Sprintf(
44                 "From: %s\nTo: %s\nSubject: %s\n",
45                 fromTo.From,
46                 fromTo.To,
47                 mime.BEncoding.Encode("UTF-8", subject),
48         ))
49 }
50
51 func (ctx *Ctx) LockDir(nodeId *NodeId, xx TRxTx) (*os.File, error) {
52         ctx.ensureRxDir(nodeId)
53         lockPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) + ".lock"
54         dirLock, err := os.OpenFile(
55                 lockPath,
56                 os.O_CREATE|os.O_WRONLY,
57                 os.FileMode(0600),
58         )
59         if err != nil {
60                 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
61                 return nil, err
62         }
63         err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB)
64         if err != nil {
65                 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
66                 dirLock.Close()
67                 return nil, err
68         }
69         return dirLock, nil
70 }
71
72 func (ctx *Ctx) UnlockDir(fd *os.File) {
73         if fd != nil {
74                 unix.Flock(int(fd.Fd()), unix.LOCK_UN)
75                 fd.Close()
76         }
77 }
78
79 func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
80         dirLock, err := ctx.LockDir(nodeId, TRx)
81         if err != nil {
82                 return
83         }
84         defer ctx.UnlockDir(dirLock)
85         for job := range ctx.Jobs(nodeId, TRx) {
86                 pktName := filepath.Base(job.Fd.Name())
87                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
88                 if job.PktEnc.Nice > nice {
89                         ctx.LogD("rx", SdsAdd(sds, SDS{
90                                 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
91                         }), "too nice")
92                         continue
93                 }
94                 pipeR, pipeW := io.Pipe()
95                 errs := make(chan error, 1)
96                 go func(job Job) {
97                         pipeWB := bufio.NewWriter(pipeW)
98                         _, err := PktEncRead(
99                                 ctx.Self,
100                                 ctx.Neigh,
101                                 bufio.NewReader(job.Fd),
102                                 pipeWB,
103                         )
104                         errs <- err
105                         pipeWB.Flush()
106                         pipeW.Close()
107                         job.Fd.Close()
108                         if err != nil {
109                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
110                         }
111                 }(job)
112                 var pkt Pkt
113                 var err error
114                 var pktSize int64
115                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
116                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
117                         goto Closing
118                 }
119                 pktSize = job.Size - PktEncOverhead - PktOverhead
120                 sds["size"] = strconv.FormatInt(pktSize, 10)
121                 ctx.LogD("rx", sds, "taken")
122                 switch pkt.Type {
123                 case PktTypeMail:
124                         recipients := string(pkt.Path[:int(pkt.PathLen)])
125                         sds := SdsAdd(sds, SDS{
126                                 "type": "mail",
127                                 "dst":  recipients,
128                         })
129                         decompressor, err := zlib.NewReader(pipeR)
130                         if err != nil {
131                                 log.Fatalln(err)
132                         }
133                         sendmail := ctx.Neigh[*job.PktEnc.Sender].Sendmail
134                         cmd := exec.Command(
135                                 sendmail[0],
136                                 append(
137                                         sendmail[1:len(sendmail)],
138                                         strings.Split(recipients, " ")...,
139                                 )...,
140                         )
141                         cmd.Stdin = decompressor
142                         if err = cmd.Run(); err != nil {
143                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
144                                 goto Closing
145                         }
146                         ctx.LogI("rx", sds, "")
147                         if err = os.Remove(job.Fd.Name()); err != nil {
148                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
149                         }
150                 case PktTypeFile:
151                         dst := string(pkt.Path[:int(pkt.PathLen)])
152                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
153                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
154                         if incoming == nil {
155                                 ctx.LogE("rx", sds, "incoming is not allowed")
156                                 goto Closing
157                         }
158                         dir := filepath.Join(*incoming, path.Dir(dst))
159                         if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
160                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
161                                 goto Closing
162                         }
163                         tmp, err := ioutil.TempFile(dir, "nncp-file")
164                         sds["tmp"] = tmp.Name()
165                         ctx.LogD("rx", sds, "created")
166                         if err != nil {
167                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
168                                 goto Closing
169                         }
170                         bufW := bufio.NewWriter(tmp)
171                         if _, err = io.Copy(bufW, pipeR); err != nil {
172                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
173                                 goto Closing
174                         }
175                         bufW.Flush()
176                         tmp.Sync()
177                         tmp.Close()
178                         dstPathOrig := filepath.Join(*incoming, dst)
179                         dstPath := dstPathOrig
180                         dstPathCtr := 0
181                         for {
182                                 if _, err = os.Stat(dstPath); err != nil {
183                                         if os.IsNotExist(err) {
184                                                 break
185                                         }
186                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
187                                         goto Closing
188                                 }
189                                 dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
190                                 dstPathCtr++
191                         }
192                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
193                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
194                         }
195                         delete(sds, "tmp")
196                         ctx.LogI("rx", sds, "")
197                         if err = os.Remove(job.Fd.Name()); err != nil {
198                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
199                         }
200                         sendmail := ctx.Neigh[*ctx.Self.Id].Sendmail
201                         if ctx.NotifyFile != nil {
202                                 cmd := exec.Command(
203                                         sendmail[0],
204                                         append(sendmail[1:len(sendmail)], ctx.NotifyFile.To)...,
205                                 )
206                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
207                                         "File from %s: %s (%s)",
208                                         ctx.Neigh[*job.PktEnc.Sender].Name,
209                                         dst,
210                                         humanize.IBytes(uint64(pktSize)),
211                                 ))
212                                 cmd.Run()
213                         }
214                 case PktTypeFreq:
215                         src := string(pkt.Path[:int(pkt.PathLen)])
216                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
217                         dstRaw, err := ioutil.ReadAll(pipeR)
218                         if err != nil {
219                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
220                                 goto Closing
221                         }
222                         dst := string(dstRaw)
223                         sds["dst"] = dst
224                         sender := ctx.Neigh[*job.PktEnc.Sender]
225                         freq := sender.Freq
226                         if freq == nil {
227                                 ctx.LogE("rx", sds, "freqing is not allowed")
228                                 goto Closing
229                         }
230                         err = ctx.TxFile(sender, job.PktEnc.Nice, filepath.Join(*freq, src), dst)
231                         if err != nil {
232                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
233                                 goto Closing
234                         }
235                         ctx.LogI("rx", sds, "")
236                         if err = os.Remove(job.Fd.Name()); err != nil {
237                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
238                         }
239                         if ctx.NotifyFreq != nil {
240                                 sendmail := ctx.Neigh[*ctx.Self.Id].Sendmail
241                                 cmd := exec.Command(
242                                         sendmail[0],
243                                         append(sendmail[1:len(sendmail)], ctx.NotifyFreq.To)...,
244                                 )
245                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
246                                         "Freq from %s: %s",
247                                         ctx.Neigh[*job.PktEnc.Sender].Name,
248                                         src,
249                                 ))
250                                 cmd.Run()
251                         }
252                 case PktTypeTrns:
253                         dst := new([blake2b.Size256]byte)
254                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
255                         nodeId := NodeId(*dst)
256                         node, known := ctx.Neigh[nodeId]
257                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
258                         if !known {
259                                 ctx.LogE("rx", sds, "unknown node")
260                                 goto Closing
261                         }
262                         ctx.LogD("rx", sds, "taken")
263                         if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
264                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
265                                 goto Closing
266                         }
267                         ctx.LogI("rx", sds, "")
268                         if err = os.Remove(job.Fd.Name()); err != nil {
269                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
270                         }
271                 default:
272                         ctx.LogE("rx", sds, "unknown type")
273                 }
274         Closing:
275                 pipeR.Close()
276         }
277 }