]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/toss.go
Initial
[nncp.git] / src / cypherpunks.ru / nncp / toss.go
1 /*
2 NNCP -- Node-to-Node CoPy
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bufio"
23         "compress/zlib"
24         "fmt"
25         "io"
26         "io/ioutil"
27         "log"
28         "mime"
29         "os"
30         "os/exec"
31         "path"
32         "path/filepath"
33         "strconv"
34         "strings"
35
36         "github.com/davecgh/go-xdr/xdr2"
37         "github.com/dustin/go-humanize"
38         "golang.org/x/crypto/blake2b"
39         "golang.org/x/sys/unix"
40 )
41
42 func newNotification(fromTo *FromToYAML, subject string) io.Reader {
43         return strings.NewReader(fmt.Sprintf(
44                 "From: %s\nTo: %s\nSubject: %s\n",
45                 fromTo.From,
46                 fromTo.To,
47                 mime.BEncoding.Encode("UTF-8", subject),
48         ))
49 }
50
51 func (ctx *Ctx) LockDir(nodeId *NodeId, xx TRxTx) (*os.File, error) {
52         lockPath := filepath.Join(ctx.Spool, nodeId.String(), string(xx)) + ".lock"
53         dirLock, err := os.OpenFile(
54                 lockPath,
55                 os.O_CREATE|os.O_WRONLY,
56                 os.FileMode(0600),
57         )
58         if err != nil {
59                 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
60                 return nil, err
61         }
62         err = unix.Flock(int(dirLock.Fd()), unix.LOCK_EX|unix.LOCK_NB)
63         if err != nil {
64                 ctx.LogE("lockdir", SDS{"path": lockPath, "err": err}, "")
65                 dirLock.Close()
66                 return nil, err
67         }
68         return dirLock, nil
69 }
70
71 func (ctx *Ctx) UnlockDir(fd *os.File) {
72         if fd != nil {
73                 unix.Flock(int(fd.Fd()), unix.LOCK_UN)
74                 fd.Close()
75         }
76 }
77
78 func (ctx *Ctx) Toss(nodeId *NodeId, nice uint8) {
79         dirLock, err := ctx.LockDir(nodeId, TRx)
80         if err != nil {
81                 return
82         }
83         defer ctx.UnlockDir(dirLock)
84         for job := range ctx.Jobs(nodeId, TRx) {
85                 pktName := filepath.Base(job.Fd.Name())
86                 sds := SDS{"node": job.PktEnc.Sender, "pkt": pktName}
87                 if job.PktEnc.Nice > nice {
88                         ctx.LogD("rx", SdsAdd(sds, SDS{
89                                 "nice": strconv.Itoa(int(job.PktEnc.Nice)),
90                         }), "too nice")
91                         continue
92                 }
93                 pipeR, pipeW := io.Pipe()
94                 errs := make(chan error, 1)
95                 go func(job Job) {
96                         pipeWB := bufio.NewWriter(pipeW)
97                         _, err := PktEncRead(
98                                 ctx.Self,
99                                 ctx.Neigh,
100                                 bufio.NewReader(job.Fd),
101                                 pipeWB,
102                         )
103                         errs <- err
104                         pipeWB.Flush()
105                         pipeW.Close()
106                         job.Fd.Close()
107                         if err != nil {
108                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "decryption")
109                         }
110                 }(job)
111                 var pkt Pkt
112                 var err error
113                 var pktSize int64
114                 if _, err = xdr.Unmarshal(pipeR, &pkt); err != nil {
115                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "unmarshal")
116                         goto Closing
117                 }
118                 pktSize = job.Size - PktEncOverhead - PktOverhead
119                 sds["size"] = strconv.FormatInt(pktSize, 10)
120                 ctx.LogD("rx", sds, "taken")
121                 switch pkt.Type {
122                 case PktTypeMail:
123                         recipients := string(pkt.Path[:int(pkt.PathLen)])
124                         sds := SdsAdd(sds, SDS{
125                                 "type": "mail",
126                                 "dst":  recipients,
127                         })
128                         decompressor, err := zlib.NewReader(pipeR)
129                         if err != nil {
130                                 log.Fatalln(err)
131                         }
132                         cmd := exec.Command(
133                                 ctx.Sendmail[0],
134                                 append(
135                                         ctx.Sendmail[1:len(ctx.Sendmail)],
136                                         strings.Split(recipients, " ")...,
137                                 )...,
138                         )
139                         cmd.Stdin = decompressor
140                         if err = cmd.Run(); err != nil {
141                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "sendmail")
142                                 goto Closing
143                         }
144                         ctx.LogI("rx", sds, "")
145                         if err = os.Remove(job.Fd.Name()); err != nil {
146                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
147                         }
148                 case PktTypeFile:
149                         dst := string(pkt.Path[:int(pkt.PathLen)])
150                         sds := SdsAdd(sds, SDS{"type": "file", "dst": dst})
151                         incoming := ctx.Neigh[*job.PktEnc.Sender].Incoming
152                         if incoming == nil {
153                                 ctx.LogE("rx", sds, "incoming is not allowed")
154                                 goto Closing
155                         }
156                         dir := filepath.Join(*incoming, path.Dir(dst))
157                         if err = os.MkdirAll(dir, os.FileMode(0700)); err != nil {
158                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mkdir")
159                                 goto Closing
160                         }
161                         tmp, err := ioutil.TempFile(dir, "nncp-file")
162                         sds["tmp"] = tmp.Name()
163                         ctx.LogD("rx", sds, "created")
164                         if err != nil {
165                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "mktemp")
166                                 goto Closing
167                         }
168                         bufW := bufio.NewWriter(tmp)
169                         if _, err = io.Copy(bufW, pipeR); err != nil {
170                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "copy")
171                                 goto Closing
172                         }
173                         bufW.Flush()
174                         tmp.Sync()
175                         tmp.Close()
176                         dstPathOrig := filepath.Join(*incoming, dst)
177                         dstPath := dstPathOrig
178                         dstPathCtr := 0
179                         for {
180                                 if _, err = os.Stat(dstPath); err != nil {
181                                         if os.IsNotExist(err) {
182                                                 break
183                                         }
184                                         ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "stat")
185                                         goto Closing
186                                 }
187                                 dstPath = dstPathOrig + strconv.Itoa(dstPathCtr)
188                                 dstPathCtr++
189                         }
190                         if err = os.Rename(tmp.Name(), dstPath); err != nil {
191                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "rename")
192                         }
193                         delete(sds, "tmp")
194                         ctx.LogI("rx", sds, "")
195                         if err = os.Remove(job.Fd.Name()); err != nil {
196                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
197                         }
198                         if ctx.NotifyFile != nil {
199                                 cmd := exec.Command(
200                                         ctx.Sendmail[0],
201                                         append(
202                                                 ctx.Sendmail[1:len(ctx.Sendmail)],
203                                                 ctx.NotifyFile.To,
204                                         )...,
205                                 )
206                                 cmd.Stdin = newNotification(ctx.NotifyFile, fmt.Sprintf(
207                                         "File from %s: %s (%s)",
208                                         ctx.Neigh[*job.PktEnc.Sender].Name,
209                                         dst,
210                                         humanize.IBytes(uint64(pktSize)),
211                                 ))
212                                 cmd.Run()
213                         }
214                 case PktTypeFreq:
215                         src := string(pkt.Path[:int(pkt.PathLen)])
216                         sds := SdsAdd(sds, SDS{"type": "freq", "src": src})
217                         dstRaw, err := ioutil.ReadAll(pipeR)
218                         if err != nil {
219                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "read")
220                                 goto Closing
221                         }
222                         dst := string(dstRaw)
223                         sds["dst"] = dst
224                         sender := ctx.Neigh[*job.PktEnc.Sender]
225                         freq := sender.Freq
226                         if freq == nil {
227                                 ctx.LogE("rx", sds, "freqing is not allowed")
228                                 goto Closing
229                         }
230                         err = ctx.TxFile(sender, job.PktEnc.Nice, filepath.Join(*freq, src), dst)
231                         if err != nil {
232                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx file")
233                                 goto Closing
234                         }
235                         ctx.LogI("rx", sds, "")
236                         if err = os.Remove(job.Fd.Name()); err != nil {
237                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
238                         }
239                         if ctx.NotifyFreq != nil {
240                                 cmd := exec.Command(
241                                         ctx.Sendmail[0],
242                                         append(
243                                                 ctx.Sendmail[1:len(ctx.Sendmail)],
244                                                 ctx.NotifyFreq.To,
245                                         )...,
246                                 )
247                                 cmd.Stdin = newNotification(ctx.NotifyFreq, fmt.Sprintf(
248                                         "Freq from %s: %s",
249                                         ctx.Neigh[*job.PktEnc.Sender].Name,
250                                         src,
251                                 ))
252                                 cmd.Run()
253                         }
254                 case PktTypeTrns:
255                         dst := new([blake2b.Size256]byte)
256                         copy(dst[:], pkt.Path[:int(pkt.PathLen)])
257                         nodeId := NodeId(*dst)
258                         node, known := ctx.Neigh[nodeId]
259                         sds := SdsAdd(sds, SDS{"type": "trns", "dst": nodeId})
260                         if !known {
261                                 ctx.LogE("rx", sds, "unknown node")
262                                 goto Closing
263                         }
264                         ctx.LogD("rx", sds, "taken")
265                         if err = ctx.TxTrns(node, job.PktEnc.Nice, pktSize, pipeR); err != nil {
266                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "tx trns")
267                                 goto Closing
268                         }
269                         ctx.LogI("rx", sds, "")
270                         if err = os.Remove(job.Fd.Name()); err != nil {
271                                 ctx.LogE("rx", SdsAdd(sds, SDS{"err": err}), "remove")
272                         }
273                 default:
274                         ctx.LogE("rx", sds, "unknown type")
275                 }
276         Closing:
277                 pipeR.Close()
278         }
279 }