]> Cypherpunks.ru repositories - nncp.git/blob - src/cypherpunks.ru/nncp/llp.go
6b91f2c04426a76420cf8d2a36b4ca53b0382f5f
[nncp.git] / src / cypherpunks.ru / nncp / llp.go
1 /*
2 NNCP -- Node-to-Node CoPy
3 Copyright (C) 2016-2017 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 package nncp
20
21 import (
22         "bytes"
23         "crypto/subtle"
24         "errors"
25         "io"
26         "net"
27         "os"
28         "path/filepath"
29         "sort"
30         "strconv"
31         "sync"
32         "time"
33
34         "github.com/davecgh/go-xdr/xdr2"
35         "github.com/flynn/noise"
36 )
37
38 const (
39         MaxLLPSize       = 2<<15 - 256
40         PartSuffix       = ".part"
41         DeadlineDuration = 10
42 )
43
44 var (
45         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'L', 1, 0, 0}
46
47         LLPHeadOverhead    int
48         LLPInfoOverhead    int
49         LLPFreqOverhead    int
50         LLPFileOverhead    int
51         LLPHaltMarshalized []byte
52
53         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
54                 noise.DH25519,
55                 noise.CipherChaChaPoly,
56                 noise.HashBLAKE2b,
57         )
58 )
59
60 type LLPType uint8
61
62 const (
63         LLPTypeInfo LLPType = iota
64         LLPTypeFreq LLPType = iota
65         LLPTypeFile LLPType = iota
66         LLPTypeDone LLPType = iota
67         LLPTypeHalt LLPType = iota
68 )
69
70 type LLPHead struct {
71         Type LLPType
72 }
73
74 type LLPInfo struct {
75         Nice uint8
76         Size uint64
77         Hash *[32]byte
78 }
79
80 type LLPFreq struct {
81         Hash   *[32]byte
82         Offset uint64
83 }
84
85 type LLPFile struct {
86         Hash    *[32]byte
87         Offset  uint64
88         Payload []byte
89 }
90
91 type LLPDone struct {
92         Hash *[32]byte
93 }
94
95 type LLPRaw struct {
96         Magic   [8]byte
97         Payload []byte
98 }
99
100 func init() {
101         var buf bytes.Buffer
102         llpHead := LLPHead{Type: LLPTypeHalt}
103         if _, err := xdr.Marshal(&buf, llpHead); err != nil {
104                 panic(err)
105         }
106         copy(LLPHaltMarshalized, buf.Bytes())
107         LLPHeadOverhead = buf.Len()
108         buf.Reset()
109
110         llpInfo := LLPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
111         if _, err := xdr.Marshal(&buf, llpInfo); err != nil {
112                 panic(err)
113         }
114         LLPInfoOverhead = buf.Len()
115         buf.Reset()
116
117         llpFreq := LLPFreq{Hash: new([32]byte), Offset: 123}
118         if _, err := xdr.Marshal(&buf, llpFreq); err != nil {
119                 panic(err)
120         }
121         LLPFreqOverhead = buf.Len()
122         buf.Reset()
123
124         llpFile := LLPFile{Hash: new([32]byte), Offset: 123}
125         if _, err := xdr.Marshal(&buf, llpFile); err != nil {
126                 panic(err)
127         }
128         LLPFileOverhead = buf.Len()
129 }
130
131 func MarshalLLP(typ LLPType, llp interface{}) []byte {
132         var buf bytes.Buffer
133         var err error
134         if _, err = xdr.Marshal(&buf, LLPHead{typ}); err != nil {
135                 panic(err)
136         }
137         if _, err = xdr.Marshal(&buf, llp); err != nil {
138                 panic(err)
139         }
140         return buf.Bytes()
141 }
142
143 func payloadsSplit(payloads [][]byte) [][]byte {
144         var outbounds [][]byte
145         outbound := make([]byte, 0, MaxLLPSize)
146         for i, payload := range payloads {
147                 outbound = append(outbound, payload...)
148                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxLLPSize {
149                         outbounds = append(outbounds, outbound)
150                         outbound = make([]byte, 0, MaxLLPSize)
151                 }
152         }
153         if len(outbound) > 0 {
154                 outbounds = append(outbounds, outbound)
155         }
156         return outbounds
157 }
158
159 type LLPState struct {
160         ctx        *Ctx
161         NodeId     *NodeId
162         nice       uint8
163         hs         *noise.HandshakeState
164         csOur      *noise.CipherState
165         csTheir    *noise.CipherState
166         payloads   chan []byte
167         infosTheir map[[32]byte]*LLPInfo
168         queueTheir []*LLPFreq
169         wg         sync.WaitGroup
170         RxBytes    int64
171         RxLastSeen time.Time
172         TxBytes    int64
173         TxLastSeen time.Time
174         started    time.Time
175         Duration   time.Duration
176         RxSpeed    int64
177         TxSpeed    int64
178         rxLock     *os.File
179         txLock     *os.File
180         xxOnly     *TRxTx
181         sync.RWMutex
182 }
183
184 func (state *LLPState) isDead() bool {
185         now := time.Now()
186         return now.Sub(state.RxLastSeen).Seconds() >= DeadlineDuration && now.Sub(state.TxLastSeen).Seconds() >= DeadlineDuration
187 }
188
189 func (state *LLPState) dirUnlock() {
190         state.ctx.UnlockDir(state.rxLock)
191         state.ctx.UnlockDir(state.txLock)
192 }
193
194 func (state *LLPState) WriteLLP(dst io.Writer, payload []byte) error {
195         n, err := xdr.Marshal(dst, LLPRaw{Magic: MagicNNCPLv1, Payload: payload})
196         if err == nil {
197                 state.TxLastSeen = time.Now()
198                 state.TxBytes += int64(n)
199         }
200         return err
201 }
202
203 func (state *LLPState) ReadLLP(src io.Reader) ([]byte, error) {
204         var llp LLPRaw
205         n, err := xdr.Unmarshal(src, &llp)
206         if err != nil {
207                 return nil, err
208         }
209         state.RxLastSeen = time.Now()
210         state.RxBytes += int64(n)
211         if llp.Magic != MagicNNCPLv1 {
212                 return nil, BadMagic
213         }
214         return llp.Payload, nil
215 }
216
217 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8) [][]byte {
218         var infos []*LLPInfo
219         var totalSize int64
220         for job := range ctx.Jobs(nodeId, TTx) {
221                 job.Fd.Close()
222                 if job.PktEnc.Nice > nice {
223                         continue
224                 }
225                 totalSize += job.Size
226                 infos = append(infos, &LLPInfo{
227                         Nice: job.PktEnc.Nice,
228                         Size: uint64(job.Size),
229                         Hash: job.HshValue,
230                 })
231         }
232         sort.Sort(ByNice(infos))
233         var payloads [][]byte
234         for _, info := range infos {
235                 payloads = append(payloads, MarshalLLP(LLPTypeInfo, info))
236                 ctx.LogD("llp-info-our", SDS{
237                         "node": nodeId,
238                         "name": ToBase32(info.Hash[:]),
239                         "size": strconv.FormatInt(int64(info.Size), 10),
240                 }, "")
241         }
242         ctx.LogI("llp-infos", SDS{
243                 "xx":   string(TTx),
244                 "node": nodeId,
245                 "pkts": strconv.Itoa(len(payloads)),
246                 "size": strconv.FormatInt(totalSize, 10),
247         }, "")
248         return payloadsSplit(payloads)
249 }
250
251 func (ctx *Ctx) ensureRxDir(nodeId *NodeId) error {
252         dirPath := filepath.Join(ctx.Spool, nodeId.String(), string(TRx))
253         if err := os.MkdirAll(dirPath, os.FileMode(0700)); err != nil {
254                 ctx.LogE("llp-ensure", SDS{"dir": dirPath, "err": err}, "")
255                 return err
256         }
257         fd, err := os.Open(dirPath)
258         if err != nil {
259                 ctx.LogE("llp-ensure", SDS{"dir": dirPath, "err": err}, "")
260                 return err
261         }
262         fd.Close()
263         return nil
264 }
265
266 func (ctx *Ctx) StartI(conn net.Conn, nodeId *NodeId, nice uint8, xxOnly *TRxTx) (*LLPState, error) {
267         err := ctx.ensureRxDir(nodeId)
268         if err != nil {
269                 return nil, err
270         }
271         var rxLock *os.File
272         if xxOnly != nil && *xxOnly == TRx {
273                 rxLock, err = ctx.LockDir(nodeId, TRx)
274                 if err != nil {
275                         return nil, err
276                 }
277         }
278         var txLock *os.File
279         if xxOnly != nil && *xxOnly == TTx {
280                 txLock, err = ctx.LockDir(nodeId, TTx)
281                 if err != nil {
282                         return nil, err
283                 }
284         }
285         started := time.Now()
286         conf := noise.Config{
287                 CipherSuite: NoiseCipherSuite,
288                 Pattern:     noise.HandshakeIK,
289                 Initiator:   true,
290                 StaticKeypair: noise.DHKey{
291                         Private: ctx.Self.NoisePrv[:],
292                         Public:  ctx.Self.NoisePub[:],
293                 },
294                 PeerStatic: ctx.Neigh[*nodeId].NoisePub[:],
295         }
296         state := LLPState{
297                 ctx:        ctx,
298                 hs:         noise.NewHandshakeState(conf),
299                 NodeId:     nodeId,
300                 nice:       nice,
301                 payloads:   make(chan []byte),
302                 infosTheir: make(map[[32]byte]*LLPInfo),
303                 started:    started,
304                 rxLock:     rxLock,
305                 txLock:     txLock,
306                 xxOnly:     xxOnly,
307         }
308
309         var infosPayloads [][]byte
310         if xxOnly == nil || *xxOnly != TTx {
311                 infosPayloads = ctx.infosOur(nodeId, nice)
312         }
313         var firstPayload []byte
314         if len(infosPayloads) > 0 {
315                 firstPayload = infosPayloads[0]
316         }
317         for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ {
318                 firstPayload = append(firstPayload, LLPHaltMarshalized...)
319         }
320
321         var buf []byte
322         var payload []byte
323         buf, _, _ = state.hs.WriteMessage(nil, firstPayload)
324         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
325         ctx.LogD("llp-start", sds, "sending first message")
326         conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
327         if err = state.WriteLLP(conn, buf); err != nil {
328                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
329                 state.dirUnlock()
330                 return nil, err
331         }
332         ctx.LogD("llp-start", sds, "waiting for first message")
333         conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
334         if buf, err = state.ReadLLP(conn); err != nil {
335                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
336                 state.dirUnlock()
337                 return nil, err
338         }
339         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
340         if err != nil {
341                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
342                 state.dirUnlock()
343                 return nil, err
344         }
345         ctx.LogD("llp-start", sds, "starting workers")
346         err = state.StartWorkers(conn, infosPayloads, payload)
347         if err != nil {
348                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
349                 state.dirUnlock()
350                 return nil, err
351         }
352         return &state, err
353 }
354
355 func (ctx *Ctx) StartR(conn net.Conn, nice uint8, xxOnly *TRxTx) (*LLPState, error) {
356         started := time.Now()
357         conf := noise.Config{
358                 CipherSuite: NoiseCipherSuite,
359                 Pattern:     noise.HandshakeIK,
360                 Initiator:   false,
361                 StaticKeypair: noise.DHKey{
362                         Private: ctx.Self.NoisePrv[:],
363                         Public:  ctx.Self.NoisePub[:],
364                 },
365         }
366         state := LLPState{
367                 ctx:        ctx,
368                 hs:         noise.NewHandshakeState(conf),
369                 nice:       nice,
370                 payloads:   make(chan []byte),
371                 infosTheir: make(map[[32]byte]*LLPInfo),
372                 started:    started,
373                 xxOnly:     xxOnly,
374         }
375         var buf []byte
376         var payload []byte
377         var err error
378         ctx.LogD(
379                 "llp-start",
380                 SDS{"nice": strconv.Itoa(int(nice))},
381                 "waiting for first message",
382         )
383         conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
384         if buf, err = state.ReadLLP(conn); err != nil {
385                 ctx.LogE("llp-start", SDS{"err": err}, "")
386                 return nil, err
387         }
388         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
389                 ctx.LogE("llp-start", SDS{"err": err}, "")
390                 return nil, err
391         }
392
393         var nodeId *NodeId
394         for _, node := range ctx.Neigh {
395                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), node.NoisePub[:]) == 1 {
396                         nodeId = node.Id
397                         break
398                 }
399         }
400         if nodeId == nil {
401                 peerId := ToBase32(state.hs.PeerStatic())
402                 ctx.LogE("llp-start", SDS{"peer": peerId}, "unknown")
403                 return nil, errors.New("Unknown peer: " + peerId)
404         }
405         state.NodeId = nodeId
406         sds := SDS{"node": nodeId, "nice": strconv.Itoa(int(nice))}
407
408         if ctx.ensureRxDir(nodeId); err != nil {
409                 return nil, err
410         }
411         var rxLock *os.File
412         if xxOnly != nil && *xxOnly == TRx {
413                 rxLock, err = ctx.LockDir(nodeId, TRx)
414                 if err != nil {
415                         return nil, err
416                 }
417         }
418         state.rxLock = rxLock
419         var txLock *os.File
420         if xxOnly != nil && *xxOnly == TTx {
421                 txLock, err = ctx.LockDir(nodeId, TTx)
422                 if err != nil {
423                         return nil, err
424                 }
425         }
426         state.txLock = txLock
427
428         var infosPayloads [][]byte
429         if xxOnly == nil || *xxOnly != TTx {
430                 infosPayloads = ctx.infosOur(nodeId, nice)
431         }
432         var firstPayload []byte
433         if len(infosPayloads) > 0 {
434                 firstPayload = infosPayloads[0]
435         }
436         for i := 0; i < (MaxLLPSize-len(firstPayload))/LLPHeadOverhead; i++ {
437                 firstPayload = append(firstPayload, LLPHaltMarshalized...)
438         }
439
440         ctx.LogD("llp-start", sds, "sending first message")
441         buf, state.csTheir, state.csOur = state.hs.WriteMessage(nil, firstPayload)
442         conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
443         if err = state.WriteLLP(conn, buf); err != nil {
444                 ctx.LogE("llp-start", SdsAdd(sds, SDS{"err": err}), "")
445                 state.dirUnlock()
446                 return nil, err
447         }
448         ctx.LogD("llp-start", sds, "starting workers")
449         err = state.StartWorkers(conn, infosPayloads, payload)
450         if err != nil {
451                 state.dirUnlock()
452                 return nil, err
453         }
454         return &state, err
455 }
456
457 func (state *LLPState) StartWorkers(conn net.Conn, infosPayloads [][]byte, payload []byte) error {
458         sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
459         if len(infosPayloads) > 1 {
460                 go func() {
461                         for _, payload := range infosPayloads[1:] {
462                                 state.ctx.LogD(
463                                         "llp-work",
464                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
465                                         "queuing remaining payload",
466                                 )
467                                 state.payloads <- payload
468                         }
469                 }()
470         }
471         state.ctx.LogD(
472                 "llp-work",
473                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
474                 "processing first payload",
475         )
476         replies, err := state.ProcessLLP(payload)
477         if err != nil {
478                 state.ctx.LogE("llp-work", SdsAdd(sds, SDS{"err": err}), "")
479                 return err
480         }
481         go func() {
482                 for _, reply := range replies {
483                         state.ctx.LogD(
484                                 "llp-work",
485                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
486                                 "queuing reply",
487                         )
488                         state.payloads <- reply
489                 }
490         }()
491         state.wg.Add(1)
492         go func() {
493                 defer state.wg.Done()
494                 for {
495                         if state.isDead() {
496                                 return
497                         }
498                         var payload []byte
499                         select {
500                         case payload = <-state.payloads:
501                                 state.ctx.LogD(
502                                         "llp-xmit",
503                                         SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
504                                         "got payload",
505                                 )
506                         default:
507                         }
508                         if payload == nil {
509                                 state.RLock()
510                                 if len(state.queueTheir) == 0 {
511                                         state.ctx.LogD("llp-xmit", sds, "file queue is empty")
512                                         state.RUnlock()
513                                         time.Sleep(100 * time.Millisecond)
514                                         continue
515                                 }
516                                 freq := state.queueTheir[0]
517                                 state.RUnlock()
518                                 sdsp := SdsAdd(sds, SDS{
519                                         "xx":   string(TTx),
520                                         "hash": ToBase32(freq.Hash[:]),
521                                         "size": strconv.FormatInt(int64(freq.Offset), 10),
522                                 })
523                                 state.ctx.LogD("llp-file", sdsp, "queueing")
524                                 fd, err := os.Open(filepath.Join(
525                                         state.ctx.Spool,
526                                         state.NodeId.String(),
527                                         string(TTx),
528                                         ToBase32(freq.Hash[:]),
529                                 ))
530                                 if err != nil {
531                                         state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
532                                         break
533                                 }
534                                 fi, err := fd.Stat()
535                                 if err != nil {
536                                         state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
537                                         break
538                                 }
539                                 fullSize := uint64(fi.Size())
540                                 var buf []byte
541                                 if freq.Offset < fullSize {
542                                         state.ctx.LogD("llp-file", sdsp, "seeking")
543                                         if _, err = fd.Seek(int64(freq.Offset), 0); err != nil {
544                                                 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
545                                                 break
546                                         }
547                                         buf = make([]byte, MaxLLPSize-LLPHeadOverhead-LLPFileOverhead)
548                                         n, err := fd.Read(buf)
549                                         if err != nil {
550                                                 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
551                                                 break
552                                         }
553                                         buf = buf[:n]
554                                         state.ctx.LogD(
555                                                 "llp-file",
556                                                 SdsAdd(sdsp, SDS{"size": strconv.Itoa(n)}),
557                                                 "read",
558                                         )
559                                 }
560                                 fd.Close()
561                                 payload = MarshalLLP(LLPTypeFile, LLPFile{
562                                         Hash:    freq.Hash,
563                                         Offset:  freq.Offset,
564                                         Payload: buf,
565                                 })
566                                 ourSize := freq.Offset + uint64(len(buf))
567                                 sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
568                                 sdsp["fullsize"] = strconv.FormatInt(int64(fullSize), 10)
569                                 state.ctx.LogP("llp-file", sdsp, "")
570                                 state.Lock()
571                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].Hash == *freq.Hash {
572                                         if ourSize == fullSize {
573                                                 state.ctx.LogD("llp-file", sdsp, "finished")
574                                                 if len(state.queueTheir) > 1 {
575                                                         state.queueTheir = state.queueTheir[1:]
576                                                 } else {
577                                                         state.queueTheir = state.queueTheir[:0]
578                                                 }
579                                         } else {
580                                                 state.queueTheir[0].Offset += uint64(len(buf))
581                                         }
582                                 } else {
583                                         state.ctx.LogD("llp-file", sdsp, "queue disappeared")
584                                 }
585                                 state.Unlock()
586                         }
587                         state.ctx.LogD(
588                                 "llp-xmit",
589                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
590                                 "sending",
591                         )
592                         conn.SetWriteDeadline(time.Now().Add(DeadlineDuration * time.Second))
593                         if err := state.WriteLLP(conn, state.csOur.Encrypt(nil, nil, payload)); err != nil {
594                                 state.ctx.LogE("llp-xmit", SdsAdd(sds, SDS{"err": err}), "")
595                                 break
596                         }
597                 }
598         }()
599         state.wg.Add(1)
600         go func() {
601                 defer state.wg.Done()
602                 for {
603                         if state.isDead() {
604                                 return
605                         }
606                         state.ctx.LogD("llp-recv", sds, "waiting for payload")
607                         conn.SetReadDeadline(time.Now().Add(DeadlineDuration * time.Second))
608                         payload, err := state.ReadLLP(conn)
609                         if err != nil {
610                                 unmarshalErr := err.(*xdr.UnmarshalError)
611                                 netErr, ok := unmarshalErr.Err.(net.Error)
612                                 if (ok && netErr.Timeout()) || unmarshalErr.ErrorCode == xdr.ErrIO {
613                                         continue
614                                 } else {
615                                         state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
616                                         break
617                                 }
618                         }
619                         state.ctx.LogD(
620                                 "llp-recv",
621                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
622                                 "got payload",
623                         )
624                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
625                         if err != nil {
626                                 state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
627                                 break
628                         }
629                         state.ctx.LogD(
630                                 "llp-recv",
631                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(payload))}),
632                                 "processing",
633                         )
634                         replies, err := state.ProcessLLP(payload)
635                         if err != nil {
636                                 state.ctx.LogE("llp-recv", SdsAdd(sds, SDS{"err": err}), "")
637                                 break
638                         }
639                         go func() {
640                                 for _, reply := range replies {
641                                         state.ctx.LogD(
642                                                 "llp-recv",
643                                                 SdsAdd(sds, SDS{"size": strconv.Itoa(len(reply))}),
644                                                 "queuing reply",
645                                         )
646                                         state.payloads <- reply
647                                 }
648                         }()
649                 }
650         }()
651         return nil
652 }
653
654 func (state *LLPState) Wait() {
655         state.wg.Wait()
656         state.dirUnlock()
657         state.Duration = time.Now().Sub(state.started)
658         state.RxSpeed = state.RxBytes
659         state.TxSpeed = state.TxBytes
660         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
661         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
662         if rxDuration > 0 {
663                 state.RxSpeed = state.RxBytes / rxDuration
664         }
665         if txDuration > 0 {
666                 state.TxSpeed = state.TxBytes / txDuration
667         }
668 }
669
670 func (state *LLPState) ProcessLLP(payload []byte) ([][]byte, error) {
671         sds := SDS{"node": state.NodeId, "nice": strconv.Itoa(int(state.nice))}
672         r := bytes.NewReader(payload)
673         var err error
674         var replies [][]byte
675         var infosGot bool
676         for r.Len() > 0 {
677                 state.ctx.LogD("llp-process", sds, "unmarshaling header")
678                 var head LLPHead
679                 if _, err = xdr.Unmarshal(r, &head); err != nil {
680                         state.ctx.LogE("llp-process", SdsAdd(sds, SDS{"err": err}), "")
681                         return nil, err
682                 }
683                 switch head.Type {
684                 case LLPTypeInfo:
685                         infosGot = true
686                         sdsp := SdsAdd(sds, SDS{"type": "info"})
687                         state.ctx.LogD("llp-process", sdsp, "unmarshaling packet")
688                         var info LLPInfo
689                         if _, err = xdr.Unmarshal(r, &info); err != nil {
690                                 state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "")
691                                 return nil, err
692                         }
693                         sdsp = SdsAdd(sds, SDS{
694                                 "hash": ToBase32(info.Hash[:]),
695                                 "size": strconv.FormatInt(int64(info.Size), 10),
696                         })
697                         if info.Nice > state.nice {
698                                 state.ctx.LogD("llp-process", sdsp, "too nice")
699                                 continue
700                         }
701                         state.ctx.LogD("llp-process", sdsp, "received")
702                         if state.xxOnly != nil && *state.xxOnly == TTx {
703                                 continue
704                         }
705                         state.Lock()
706                         state.infosTheir[*info.Hash] = &info
707                         state.Unlock()
708                         state.ctx.LogD("llp-process", sdsp, "stating part")
709                         if _, err = os.Stat(filepath.Join(
710                                 state.ctx.Spool,
711                                 state.NodeId.String(),
712                                 string(TRx),
713                                 ToBase32(info.Hash[:]),
714                         )); err == nil {
715                                 state.ctx.LogD("llp-process", sdsp, "already done")
716                                 replies = append(replies, MarshalLLP(LLPTypeDone, LLPDone{info.Hash}))
717                                 continue
718                         }
719                         fi, err := os.Stat(filepath.Join(
720                                 state.ctx.Spool,
721                                 state.NodeId.String(),
722                                 string(TRx),
723                                 ToBase32(info.Hash[:])+PartSuffix,
724                         ))
725                         var offset int64
726                         if err == nil {
727                                 offset = fi.Size()
728                                 state.ctx.LogD(
729                                         "llp-process",
730                                         SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(offset, 10)}),
731                                         "part exists",
732                                 )
733                         }
734                         replies = append(replies, MarshalLLP(
735                                 LLPTypeFreq,
736                                 LLPFreq{info.Hash, uint64(offset)},
737                         ))
738                 case LLPTypeFile:
739                         state.ctx.LogD(
740                                 "llp-process",
741                                 SdsAdd(sds, SDS{"type": "file"}),
742                                 "unmarshaling packet",
743                         )
744                         var file LLPFile
745                         if _, err = xdr.Unmarshal(r, &file); err != nil {
746                                 state.ctx.LogE("llp-process", SdsAdd(sds, SDS{
747                                         "err":  err,
748                                         "type": "file",
749                                 }), "")
750                                 return nil, err
751                         }
752                         sdsp := SdsAdd(sds, SDS{
753                                 "xx":   string(TRx),
754                                 "hash": ToBase32(file.Hash[:]),
755                                 "size": strconv.Itoa(len(file.Payload)),
756                         })
757                         filePath := filepath.Join(
758                                 state.ctx.Spool,
759                                 state.NodeId.String(),
760                                 string(TRx),
761                                 ToBase32(file.Hash[:]),
762                         )
763                         state.ctx.LogD("llp-file", sdsp, "opening part")
764                         fd, err := os.OpenFile(
765                                 filePath+PartSuffix,
766                                 os.O_RDWR|os.O_CREATE,
767                                 os.FileMode(0600),
768                         )
769                         if err != nil {
770                                 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
771                                 return nil, err
772                         }
773                         state.ctx.LogD(
774                                 "llp-file",
775                                 SdsAdd(sdsp, SDS{"offset": strconv.FormatInt(int64(file.Offset), 10)}),
776                                 "seeking",
777                         )
778                         if _, err = fd.Seek(int64(file.Offset), 0); err != nil {
779                                 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
780                                 fd.Close()
781                                 return nil, err
782                         }
783                         state.ctx.LogD("llp-file", sdsp, "writing")
784                         _, err = fd.Write(file.Payload)
785                         if err != nil {
786                                 state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "")
787                                 fd.Close()
788                                 return nil, err
789                         }
790                         ourSize := uint64(file.Offset) + uint64(len(file.Payload))
791                         sdsp["fullsize"] = strconv.FormatInt(int64(state.infosTheir[*file.Hash].Size), 10)
792                         sdsp["size"] = strconv.FormatInt(int64(ourSize), 10)
793                         state.ctx.LogP("llp-file", sdsp, "")
794                         if state.infosTheir[*file.Hash].Size != ourSize {
795                                 fd.Close()
796                                 continue
797                         }
798                         go func() {
799                                 if err := fd.Sync(); err != nil {
800                                         state.ctx.LogE("llp-file", SdsAdd(sdsp, SDS{"err": err}), "sync")
801                                         fd.Close()
802                                         return
803                                 }
804                                 state.wg.Add(1)
805                                 defer state.wg.Done()
806                                 fd.Seek(0, 0)
807                                 state.ctx.LogD("llp-file", sdsp, "checking")
808                                 gut, err := Check(fd, file.Hash[:])
809                                 fd.Close()
810                                 if err != nil || !gut {
811                                         state.ctx.LogE("llp-file", sdsp, "checksum mismatch")
812                                         return
813                                 }
814                                 state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
815                                 os.Rename(filePath+PartSuffix, filePath)
816                                 state.payloads <- MarshalLLP(LLPTypeDone, LLPDone{file.Hash})
817                         }()
818                 case LLPTypeDone:
819                         state.ctx.LogD(
820                                 "llp-process",
821                                 SdsAdd(sds, SDS{"type": "done"}),
822                                 "unmarshaling packet",
823                         )
824                         var done LLPDone
825                         if _, err = xdr.Unmarshal(r, &done); err != nil {
826                                 state.ctx.LogE("llp-process", SdsAdd(sds, SDS{
827                                         "type": "done",
828                                         "err":  err,
829                                 }), "")
830                                 return nil, err
831                         }
832                         sdsp := SdsAdd(sds, SDS{"hash": ToBase32(done.Hash[:])})
833                         state.ctx.LogD("llp-done", sdsp, "removing")
834                         err := os.Remove(filepath.Join(
835                                 state.ctx.Spool,
836                                 state.NodeId.String(),
837                                 string(TTx),
838                                 ToBase32(done.Hash[:]),
839                         ))
840                         if err == nil {
841                                 state.ctx.LogI("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
842                         } else {
843                                 state.ctx.LogE("llp-done", SdsAdd(sdsp, SDS{"xx": string(TTx)}), "")
844                         }
845                 case LLPTypeFreq:
846                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
847                         state.ctx.LogD("llp-process", sdsp, "unmarshaling packet")
848                         var freq LLPFreq
849                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
850                                 state.ctx.LogE("llp-process", SdsAdd(sdsp, SDS{"err": err}), "")
851                                 return nil, err
852                         }
853                         state.ctx.LogD("llp-process", SdsAdd(sdsp, SDS{
854                                 "hash":   ToBase32(freq.Hash[:]),
855                                 "offset": strconv.FormatInt(int64(freq.Offset), 10),
856                         }), "queueing")
857                         state.Lock()
858                         state.queueTheir = append(state.queueTheir, &freq)
859                         state.Unlock()
860                 case LLPTypeHalt:
861                         sdsp := SdsAdd(sds, SDS{"type": "halt"})
862                         state.ctx.LogD("llp-process", sdsp, "")
863                         state.Lock()
864                         state.queueTheir = nil
865                         state.Unlock()
866                 default:
867                         state.ctx.LogE(
868                                 "llp-process",
869                                 SdsAdd(sds, SDS{"type": head.Type}),
870                                 "unknown",
871                         )
872                         return nil, BadPktType
873                 }
874         }
875         if infosGot {
876                 var pkts int
877                 var size uint64
878                 for _, info := range state.infosTheir {
879                         pkts++
880                         size += info.Size
881                 }
882                 state.ctx.LogI("llp-infos", SDS{
883                         "xx":   string(TRx),
884                         "node": state.NodeId,
885                         "pkts": strconv.Itoa(pkts),
886                         "size": strconv.FormatInt(int64(size), 10),
887                 }, "")
888         }
889         return payloadsSplit(replies), nil
890 }