]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
More errors checking
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2020 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "os"
26         "path/filepath"
27         "sort"
28         "sync"
29         "time"
30
31         xdr "github.com/davecgh/go-xdr/xdr2"
32         "github.com/flynn/noise"
33 )
34
35 const (
36         MaxSPSize      = 1<<16 - 256
37         PartSuffix     = ".part"
38         SPHeadOverhead = 4
39 )
40
41 var (
42         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
43
44         SPInfoOverhead    int
45         SPFreqOverhead    int
46         SPFileOverhead    int
47         SPHaltMarshalized []byte
48         SPPingMarshalized []byte
49
50         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
51                 noise.DH25519,
52                 noise.CipherChaChaPoly,
53                 noise.HashBLAKE2b,
54         )
55
56         DefaultDeadline = 10 * time.Second
57         PingTimeout     = time.Minute
58
59         spWorkersGroup sync.WaitGroup
60 )
61
62 type SPType uint8
63
64 const (
65         SPTypeInfo SPType = iota
66         SPTypeFreq SPType = iota
67         SPTypeFile SPType = iota
68         SPTypeDone SPType = iota
69         SPTypeHalt SPType = iota
70         SPTypePing SPType = iota
71 )
72
73 type SPHead struct {
74         Type SPType
75 }
76
77 type SPInfo struct {
78         Nice uint8
79         Size uint64
80         Hash *[32]byte
81 }
82
83 type SPFreq struct {
84         Hash   *[32]byte
85         Offset uint64
86 }
87
88 type SPFile struct {
89         Hash    *[32]byte
90         Offset  uint64
91         Payload []byte
92 }
93
94 type SPDone struct {
95         Hash *[32]byte
96 }
97
98 type SPRaw struct {
99         Magic   [8]byte
100         Payload []byte
101 }
102
103 type FreqWithNice struct {
104         freq *SPFreq
105         nice uint8
106 }
107
108 type ConnDeadlined interface {
109         io.ReadWriteCloser
110         SetReadDeadline(t time.Time) error
111         SetWriteDeadline(t time.Time) error
112 }
113
114 func init() {
115         var buf bytes.Buffer
116         spHead := SPHead{Type: SPTypeHalt}
117         if _, err := xdr.Marshal(&buf, spHead); err != nil {
118                 panic(err)
119         }
120         SPHaltMarshalized = make([]byte, SPHeadOverhead)
121         copy(SPHaltMarshalized, buf.Bytes())
122         buf.Reset()
123
124         spHead = SPHead{Type: SPTypePing}
125         if _, err := xdr.Marshal(&buf, spHead); err != nil {
126                 panic(err)
127         }
128         SPPingMarshalized = make([]byte, SPHeadOverhead)
129         copy(SPPingMarshalized, buf.Bytes())
130         buf.Reset()
131
132         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
133         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
134                 panic(err)
135         }
136         SPInfoOverhead = buf.Len()
137         buf.Reset()
138
139         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
140         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
141                 panic(err)
142         }
143         SPFreqOverhead = buf.Len()
144         buf.Reset()
145
146         spFile := SPFile{Hash: new([32]byte), Offset: 123}
147         if _, err := xdr.Marshal(&buf, spFile); err != nil {
148                 panic(err)
149         }
150         SPFileOverhead = buf.Len()
151 }
152
153 func MarshalSP(typ SPType, sp interface{}) []byte {
154         var buf bytes.Buffer
155         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
156                 panic(err)
157         }
158         if _, err := xdr.Marshal(&buf, sp); err != nil {
159                 panic(err)
160         }
161         return buf.Bytes()
162 }
163
164 func payloadsSplit(payloads [][]byte) [][]byte {
165         var outbounds [][]byte
166         outbound := make([]byte, 0, MaxSPSize)
167         for i, payload := range payloads {
168                 outbound = append(outbound, payload...)
169                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
170                         outbounds = append(outbounds, outbound)
171                         outbound = make([]byte, 0, MaxSPSize)
172                 }
173         }
174         if len(outbound) > 0 {
175                 outbounds = append(outbounds, outbound)
176         }
177         return outbounds
178 }
179
180 type SPState struct {
181         Ctx            *Ctx
182         Node           *Node
183         Nice           uint8
184         onlineDeadline time.Duration
185         maxOnlineTime  time.Duration
186         hs             *noise.HandshakeState
187         csOur          *noise.CipherState
188         csTheir        *noise.CipherState
189         payloads       chan []byte
190         pings          chan struct{}
191         infosTheir     map[[32]byte]*SPInfo
192         infosOurSeen   map[[32]byte]uint8
193         queueTheir     []*FreqWithNice
194         wg             sync.WaitGroup
195         RxBytes        int64
196         RxLastSeen     time.Time
197         RxLastNonPing  time.Time
198         TxBytes        int64
199         TxLastSeen     time.Time
200         TxLastNonPing  time.Time
201         started        time.Time
202         mustFinishAt   time.Time
203         Duration       time.Duration
204         RxSpeed        int64
205         TxSpeed        int64
206         rxLock         *os.File
207         txLock         *os.File
208         xxOnly         TRxTx
209         rxRate         int
210         txRate         int
211         isDead         chan struct{}
212         listOnly       bool
213         onlyPkts       map[[32]byte]bool
214         writeSPBuf     bytes.Buffer
215         sync.RWMutex
216 }
217
218 func (state *SPState) SetDead() {
219         state.Lock()
220         defer state.Unlock()
221         select {
222         case <-state.isDead:
223                 // Already closed channel, dead
224                 return
225         default:
226         }
227         close(state.isDead)
228         go func() {
229                 for _ = range state.payloads {
230                 }
231         }()
232         go func() {
233                 for _ = range state.pings {
234                 }
235         }()
236 }
237
238 func (state *SPState) NotAlive() bool {
239         select {
240         case <-state.isDead:
241                 return true
242         default:
243         }
244         return false
245 }
246
247 func (state *SPState) dirUnlock() {
248         state.Ctx.UnlockDir(state.rxLock)
249         state.Ctx.UnlockDir(state.txLock)
250 }
251
252 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
253         state.writeSPBuf.Reset()
254         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
255                 Magic:   MagicNNCPLv1,
256                 Payload: payload,
257         })
258         if err != nil {
259                 return err
260         }
261         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
262                 state.TxLastSeen = time.Now()
263                 state.TxBytes += int64(n)
264                 if !ping {
265                         state.TxLastNonPing = state.TxLastSeen
266                 }
267         }
268         return err
269 }
270
271 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
272         var sp SPRaw
273         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
274         if err != nil {
275                 ue := err.(*xdr.UnmarshalError)
276                 if ue.Err == io.EOF {
277                         return nil, ue.Err
278                 }
279                 return nil, err
280         }
281         state.RxLastSeen = time.Now()
282         state.RxBytes += int64(n)
283         if sp.Magic != MagicNNCPLv1 {
284                 return nil, BadMagic
285         }
286         return sp.Payload, nil
287 }
288
289 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
290         var infos []*SPInfo
291         var totalSize int64
292         for job := range ctx.Jobs(nodeId, TTx) {
293                 job.Fd.Close() // #nosec G104
294                 if job.PktEnc.Nice > nice {
295                         continue
296                 }
297                 if _, known := (*seen)[*job.HshValue]; known {
298                         continue
299                 }
300                 totalSize += job.Size
301                 infos = append(infos, &SPInfo{
302                         Nice: job.PktEnc.Nice,
303                         Size: uint64(job.Size),
304                         Hash: job.HshValue,
305                 })
306                 (*seen)[*job.HshValue] = job.PktEnc.Nice
307         }
308         sort.Sort(ByNice(infos))
309         var payloads [][]byte
310         for _, info := range infos {
311                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
312                 ctx.LogD("sp-info-our", SDS{
313                         "node": nodeId,
314                         "name": Base32Codec.EncodeToString(info.Hash[:]),
315                         "size": info.Size,
316                 }, "")
317         }
318         if totalSize > 0 {
319                 ctx.LogI("sp-infos", SDS{
320                         "xx":   string(TTx),
321                         "node": nodeId,
322                         "pkts": len(payloads),
323                         "size": totalSize,
324                 }, "")
325         }
326         return payloadsSplit(payloads)
327 }
328
329 func (state *SPState) StartI(conn ConnDeadlined) error {
330         nodeId := state.Node.Id
331         err := state.Ctx.ensureRxDir(nodeId)
332         if err != nil {
333                 return err
334         }
335         var rxLock *os.File
336         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
337                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
338                 if err != nil {
339                         return err
340                 }
341         }
342         var txLock *os.File
343         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
344                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
345                 if err != nil {
346                         return err
347                 }
348         }
349         started := time.Now()
350         conf := noise.Config{
351                 CipherSuite: NoiseCipherSuite,
352                 Pattern:     noise.HandshakeIK,
353                 Initiator:   true,
354                 StaticKeypair: noise.DHKey{
355                         Private: state.Ctx.Self.NoisePrv[:],
356                         Public:  state.Ctx.Self.NoisePub[:],
357                 },
358                 PeerStatic: state.Node.NoisePub[:],
359         }
360         hs, err := noise.NewHandshakeState(conf)
361         if err != nil {
362                 return err
363         }
364         state.hs = hs
365         state.payloads = make(chan []byte)
366         state.pings = make(chan struct{})
367         state.infosTheir = make(map[[32]byte]*SPInfo)
368         state.infosOurSeen = make(map[[32]byte]uint8)
369         state.started = started
370         state.rxLock = rxLock
371         state.txLock = txLock
372
373         var infosPayloads [][]byte
374         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
375                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
376         }
377         var firstPayload []byte
378         if len(infosPayloads) > 0 {
379                 firstPayload = infosPayloads[0]
380         }
381         // Pad first payload, to hide actual number of existing files
382         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
383                 firstPayload = append(firstPayload, SPHaltMarshalized...)
384         }
385
386         var buf []byte
387         var payload []byte
388         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
389         if err != nil {
390                 state.dirUnlock()
391                 return err
392         }
393         sds := SDS{"node": nodeId, "nice": int(state.Nice)}
394         state.Ctx.LogD("sp-start", sds, "sending first message")
395         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
396         if err = state.WriteSP(conn, buf, false); err != nil {
397                 state.Ctx.LogE("sp-start", sds, err, "")
398                 state.dirUnlock()
399                 return err
400         }
401         state.Ctx.LogD("sp-start", sds, "waiting for first message")
402         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
403         if buf, err = state.ReadSP(conn); err != nil {
404                 state.Ctx.LogE("sp-start", sds, err, "")
405                 state.dirUnlock()
406                 return err
407         }
408         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
409         if err != nil {
410                 state.Ctx.LogE("sp-start", sds, err, "")
411                 state.dirUnlock()
412                 return err
413         }
414         state.Ctx.LogD("sp-start", sds, "starting workers")
415         err = state.StartWorkers(conn, infosPayloads, payload)
416         if err != nil {
417                 state.Ctx.LogE("sp-start", sds, err, "")
418                 state.dirUnlock()
419         }
420         return err
421 }
422
423 func (state *SPState) StartR(conn ConnDeadlined) error {
424         started := time.Now()
425         conf := noise.Config{
426                 CipherSuite: NoiseCipherSuite,
427                 Pattern:     noise.HandshakeIK,
428                 Initiator:   false,
429                 StaticKeypair: noise.DHKey{
430                         Private: state.Ctx.Self.NoisePrv[:],
431                         Public:  state.Ctx.Self.NoisePub[:],
432                 },
433         }
434         hs, err := noise.NewHandshakeState(conf)
435         if err != nil {
436                 return err
437         }
438         xxOnly := TRxTx("")
439         state.hs = hs
440         state.payloads = make(chan []byte)
441         state.pings = make(chan struct{})
442         state.infosOurSeen = make(map[[32]byte]uint8)
443         state.infosTheir = make(map[[32]byte]*SPInfo)
444         state.started = started
445         state.xxOnly = xxOnly
446         var buf []byte
447         var payload []byte
448         state.Ctx.LogD("sp-start", SDS{"nice": int(state.Nice)}, "waiting for first message")
449         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
450         if buf, err = state.ReadSP(conn); err != nil {
451                 state.Ctx.LogE("sp-start", SDS{}, err, "")
452                 return err
453         }
454         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
455                 state.Ctx.LogE("sp-start", SDS{}, err, "")
456                 return err
457         }
458
459         var node *Node
460         for _, n := range state.Ctx.Neigh {
461                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
462                         node = n
463                         break
464                 }
465         }
466         if node == nil {
467                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
468                 state.Ctx.LogE("sp-start", SDS{"peer": peerId}, errors.New("unknown"), "")
469                 return errors.New("Unknown peer: " + peerId)
470         }
471         state.Node = node
472         state.rxRate = node.RxRate
473         state.txRate = node.TxRate
474         state.onlineDeadline = node.OnlineDeadline
475         state.maxOnlineTime = node.MaxOnlineTime
476         sds := SDS{"node": node.Id, "nice": int(state.Nice)}
477
478         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
479                 return err
480         }
481         var rxLock *os.File
482         if xxOnly == "" || xxOnly == TRx {
483                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
484                 if err != nil {
485                         return err
486                 }
487         }
488         state.rxLock = rxLock
489         var txLock *os.File
490         if xxOnly == "" || xxOnly == TTx {
491                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
492                 if err != nil {
493                         return err
494                 }
495         }
496         state.txLock = txLock
497
498         var infosPayloads [][]byte
499         if xxOnly == "" || xxOnly == TTx {
500                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
501         }
502         var firstPayload []byte
503         if len(infosPayloads) > 0 {
504                 firstPayload = infosPayloads[0]
505         }
506         // Pad first payload, to hide actual number of existing files
507         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
508                 firstPayload = append(firstPayload, SPHaltMarshalized...)
509         }
510
511         state.Ctx.LogD("sp-start", sds, "sending first message")
512         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
513         if err != nil {
514                 state.dirUnlock()
515                 return err
516         }
517         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
518         if err = state.WriteSP(conn, buf, false); err != nil {
519                 state.Ctx.LogE("sp-start", sds, err, "")
520                 state.dirUnlock()
521                 return err
522         }
523         state.Ctx.LogD("sp-start", sds, "starting workers")
524         err = state.StartWorkers(conn, infosPayloads, payload)
525         if err != nil {
526                 state.dirUnlock()
527         }
528         return err
529 }
530
531 func (state *SPState) StartWorkers(
532         conn ConnDeadlined,
533         infosPayloads [][]byte,
534         payload []byte,
535 ) error {
536         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
537         state.isDead = make(chan struct{})
538         if state.maxOnlineTime > 0 {
539                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
540         }
541
542         // Remaining handshake payload sending
543         if len(infosPayloads) > 1 {
544                 state.wg.Add(1)
545                 go func() {
546                         for _, payload := range infosPayloads[1:] {
547                                 state.Ctx.LogD(
548                                         "sp-work",
549                                         SdsAdd(sds, SDS{"size": len(payload)}),
550                                         "queuing remaining payload",
551                                 )
552                                 state.payloads <- payload
553                         }
554                         state.wg.Done()
555                 }()
556         }
557
558         // Processing of first payload and queueing its responses
559         state.Ctx.LogD(
560                 "sp-work",
561                 SdsAdd(sds, SDS{"size": len(payload)}),
562                 "processing first payload",
563         )
564         replies, err := state.ProcessSP(payload)
565         if err != nil {
566                 state.Ctx.LogE("sp-work", sds, err, "")
567                 return err
568         }
569         state.wg.Add(1)
570         go func() {
571                 for _, reply := range replies {
572                         state.Ctx.LogD(
573                                 "sp-work",
574                                 SdsAdd(sds, SDS{"size": len(reply)}),
575                                 "queuing reply",
576                         )
577                         state.payloads <- reply
578                 }
579                 state.wg.Done()
580         }()
581
582         // Periodic jobs
583         state.wg.Add(1)
584         go func() {
585                 deadlineTicker := time.NewTicker(time.Second)
586                 pingTicker := time.NewTicker(PingTimeout)
587                 for {
588                         select {
589                         case <-state.isDead:
590                                 state.wg.Done()
591                                 deadlineTicker.Stop()
592                                 pingTicker.Stop()
593                                 return
594                         case now := <-deadlineTicker.C:
595                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
596                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
597                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
598                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
599                                         state.SetDead()
600                                         conn.Close() // #nosec G104
601                                 }
602                         case now := <-pingTicker.C:
603                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
604                                         state.wg.Add(1)
605                                         go func() {
606                                                 state.pings <- struct{}{}
607                                                 state.wg.Done()
608                                         }()
609                                 }
610                         }
611                 }
612         }()
613
614         // Spool checker and INFOs sender of appearing files
615         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
616                 state.wg.Add(1)
617                 go func() {
618                         ticker := time.NewTicker(time.Second)
619                         for {
620                                 select {
621                                 case <-state.isDead:
622                                         state.wg.Done()
623                                         ticker.Stop()
624                                         return
625                                 case <-ticker.C:
626                                         for _, payload := range state.Ctx.infosOur(
627                                                 state.Node.Id,
628                                                 state.Nice,
629                                                 &state.infosOurSeen,
630                                         ) {
631                                                 state.Ctx.LogD(
632                                                         "sp-work",
633                                                         SdsAdd(sds, SDS{"size": len(payload)}),
634                                                         "queuing new info",
635                                                 )
636                                                 state.payloads <- payload
637                                         }
638                                 }
639                         }
640                 }()
641         }
642
643         // Sender
644         state.wg.Add(1)
645         go func() {
646                 defer conn.Close()
647                 defer state.SetDead()
648                 defer state.wg.Done()
649                 for {
650                         if state.NotAlive() {
651                                 return
652                         }
653                         var payload []byte
654                         var ping bool
655                         select {
656                         case <-state.pings:
657                                 state.Ctx.LogD("sp-xmit", sds, "got ping")
658                                 payload = SPPingMarshalized
659                                 ping = true
660                         case payload = <-state.payloads:
661                                 state.Ctx.LogD(
662                                         "sp-xmit",
663                                         SdsAdd(sds, SDS{"size": len(payload)}),
664                                         "got payload",
665                                 )
666                         default:
667                                 state.RLock()
668                                 if len(state.queueTheir) == 0 {
669                                         state.RUnlock()
670                                         time.Sleep(100 * time.Millisecond)
671                                         continue
672                                 }
673                                 freq := state.queueTheir[0].freq
674                                 state.RUnlock()
675                                 if state.txRate > 0 {
676                                         time.Sleep(time.Second / time.Duration(state.txRate))
677                                 }
678                                 sdsp := SdsAdd(sds, SDS{
679                                         "xx":   string(TTx),
680                                         "pkt":  Base32Codec.EncodeToString(freq.Hash[:]),
681                                         "size": int64(freq.Offset),
682                                 })
683                                 state.Ctx.LogD("sp-file", sdsp, "queueing")
684                                 fd, err := os.Open(filepath.Join(
685                                         state.Ctx.Spool,
686                                         state.Node.Id.String(),
687                                         string(TTx),
688                                         Base32Codec.EncodeToString(freq.Hash[:]),
689                                 ))
690                                 if err != nil {
691                                         state.Ctx.LogE("sp-file", sdsp, err, "")
692                                         return
693                                 }
694                                 fi, err := fd.Stat()
695                                 if err != nil {
696                                         state.Ctx.LogE("sp-file", sdsp, err, "")
697                                         return
698                                 }
699                                 fullSize := fi.Size()
700                                 var buf []byte
701                                 if freq.Offset < uint64(fullSize) {
702                                         state.Ctx.LogD("sp-file", sdsp, "seeking")
703                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
704                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
705                                                 return
706                                         }
707                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
708                                         n, err := fd.Read(buf)
709                                         if err != nil {
710                                                 state.Ctx.LogE("sp-file", sdsp, err, "")
711                                                 return
712                                         }
713                                         buf = buf[:n]
714                                         state.Ctx.LogD("sp-file", SdsAdd(sdsp, SDS{"size": n}), "read")
715                                 }
716                                 fd.Close() // #nosec G104
717                                 payload = MarshalSP(SPTypeFile, SPFile{
718                                         Hash:    freq.Hash,
719                                         Offset:  freq.Offset,
720                                         Payload: buf,
721                                 })
722                                 ourSize := freq.Offset + uint64(len(buf))
723                                 sdsp["size"] = int64(ourSize)
724                                 sdsp["fullsize"] = fullSize
725                                 if state.Ctx.ShowPrgrs {
726                                         Progress("Tx", sdsp)
727                                 }
728                                 state.Lock()
729                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
730                                         if ourSize == uint64(fullSize) {
731                                                 state.Ctx.LogD("sp-file", sdsp, "finished")
732                                                 if len(state.queueTheir) > 1 {
733                                                         state.queueTheir = state.queueTheir[1:]
734                                                 } else {
735                                                         state.queueTheir = state.queueTheir[:0]
736                                                 }
737                                         } else {
738                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
739                                         }
740                                 } else {
741                                         state.Ctx.LogD("sp-file", sdsp, "queue disappeared")
742                                 }
743                                 state.Unlock()
744                         }
745                         state.Ctx.LogD("sp-xmit", SdsAdd(sds, SDS{"size": len(payload)}), "sending")
746                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
747                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
748                                 state.Ctx.LogE("sp-xmit", sds, err, "")
749                                 return
750                         }
751                 }
752         }()
753
754         // Receiver
755         state.wg.Add(1)
756         go func() {
757                 for {
758                         if state.NotAlive() {
759                                 break
760                         }
761                         state.Ctx.LogD("sp-recv", sds, "waiting for payload")
762                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
763                         payload, err := state.ReadSP(conn)
764                         if err != nil {
765                                 if err == io.EOF {
766                                         break
767                                 }
768                                 unmarshalErr := err.(*xdr.UnmarshalError)
769                                 if os.IsTimeout(unmarshalErr.Err) {
770                                         continue
771                                 }
772                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
773                                         break
774                                 }
775                                 state.Ctx.LogE("sp-recv", sds, err, "")
776                                 break
777                         }
778                         state.Ctx.LogD(
779                                 "sp-recv",
780                                 SdsAdd(sds, SDS{"size": len(payload)}),
781                                 "got payload",
782                         )
783                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
784                         if err != nil {
785                                 state.Ctx.LogE("sp-recv", sds, err, "")
786                                 break
787                         }
788                         state.Ctx.LogD(
789                                 "sp-recv",
790                                 SdsAdd(sds, SDS{"size": len(payload)}),
791                                 "processing",
792                         )
793                         replies, err := state.ProcessSP(payload)
794                         if err != nil {
795                                 state.Ctx.LogE("sp-recv", sds, err, "")
796                                 break
797                         }
798                         state.wg.Add(1)
799                         go func() {
800                                 for _, reply := range replies {
801                                         state.Ctx.LogD(
802                                                 "sp-recv",
803                                                 SdsAdd(sds, SDS{"size": len(reply)}),
804                                                 "queuing reply",
805                                         )
806                                         state.payloads <- reply
807                                 }
808                                 state.wg.Done()
809                         }()
810                         if state.rxRate > 0 {
811                                 time.Sleep(time.Second / time.Duration(state.rxRate))
812                         }
813                 }
814                 state.SetDead()
815                 state.wg.Done()
816                 state.SetDead()
817                 conn.Close() // #nosec G104
818         }()
819
820         return nil
821 }
822
823 func (state *SPState) Wait() {
824         state.wg.Wait()
825         close(state.payloads)
826         close(state.pings)
827         state.dirUnlock()
828         state.Duration = time.Now().Sub(state.started)
829         state.RxSpeed = state.RxBytes
830         state.TxSpeed = state.TxBytes
831         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
832         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
833         if rxDuration > 0 {
834                 state.RxSpeed = state.RxBytes / rxDuration
835         }
836         if txDuration > 0 {
837                 state.TxSpeed = state.TxBytes / txDuration
838         }
839 }
840
841 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
842         sds := SDS{"node": state.Node.Id, "nice": int(state.Nice)}
843         r := bytes.NewReader(payload)
844         var err error
845         var replies [][]byte
846         var infosGot bool
847         for r.Len() > 0 {
848                 state.Ctx.LogD("sp-process", sds, "unmarshaling header")
849                 var head SPHead
850                 if _, err = xdr.Unmarshal(r, &head); err != nil {
851                         state.Ctx.LogE("sp-process", sds, err, "")
852                         return nil, err
853                 }
854                 if head.Type != SPTypePing {
855                         state.RxLastNonPing = state.RxLastSeen
856                 }
857                 switch head.Type {
858                 case SPTypeHalt:
859                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "halt"}), "")
860                         state.Lock()
861                         state.queueTheir = nil
862                         state.Unlock()
863                 case SPTypePing:
864                         state.Ctx.LogD("sp-process", SdsAdd(sds, SDS{"type": "ping"}), "")
865                 case SPTypeInfo:
866                         infosGot = true
867                         sdsp := SdsAdd(sds, SDS{"type": "info"})
868                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
869                         var info SPInfo
870                         if _, err = xdr.Unmarshal(r, &info); err != nil {
871                                 state.Ctx.LogE("sp-process", sdsp, err, "")
872                                 return nil, err
873                         }
874                         sdsp = SdsAdd(sds, SDS{
875                                 "pkt":  Base32Codec.EncodeToString(info.Hash[:]),
876                                 "size": int64(info.Size),
877                                 "nice": int(info.Nice),
878                         })
879                         if !state.listOnly && info.Nice > state.Nice {
880                                 state.Ctx.LogD("sp-process", sdsp, "too nice")
881                                 continue
882                         }
883                         state.Ctx.LogD("sp-process", sdsp, "received")
884                         if !state.listOnly && state.xxOnly == TTx {
885                                 continue
886                         }
887                         state.Lock()
888                         state.infosTheir[*info.Hash] = &info
889                         state.Unlock()
890                         state.Ctx.LogD("sp-process", sdsp, "stating part")
891                         pktPath := filepath.Join(
892                                 state.Ctx.Spool,
893                                 state.Node.Id.String(),
894                                 string(TRx),
895                                 Base32Codec.EncodeToString(info.Hash[:]),
896                         )
897                         if _, err = os.Stat(pktPath); err == nil {
898                                 state.Ctx.LogI("sp-info", sdsp, "already done")
899                                 if !state.listOnly {
900                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
901                                 }
902                                 continue
903                         }
904                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
905                                 state.Ctx.LogI("sp-info", sdsp, "already seen")
906                                 if !state.listOnly {
907                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
908                                 }
909                                 continue
910                         }
911                         fi, err := os.Stat(pktPath + PartSuffix)
912                         var offset int64
913                         if err == nil {
914                                 offset = fi.Size()
915                         }
916                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
917                                 state.Ctx.LogI("sp-info", sdsp, "not enough space")
918                                 continue
919                         }
920                         state.Ctx.LogI(
921                                 "sp-info",
922                                 SdsAdd(sdsp, SDS{"offset": offset}),
923                                 "",
924                         )
925                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
926                                 replies = append(replies, MarshalSP(
927                                         SPTypeFreq,
928                                         SPFreq{info.Hash, uint64(offset)},
929                                 ))
930                         }
931                 case SPTypeFile:
932                         sdsp := SdsAdd(sds, SDS{"type": "file"})
933                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
934                         var file SPFile
935                         if _, err = xdr.Unmarshal(r, &file); err != nil {
936                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "file"}), err, "")
937                                 return nil, err
938                         }
939                         sdsp["xx"] = string(TRx)
940                         sdsp["pkt"] = Base32Codec.EncodeToString(file.Hash[:])
941                         sdsp["size"] = len(file.Payload)
942                         dirToSync := filepath.Join(
943                                 state.Ctx.Spool,
944                                 state.Node.Id.String(),
945                                 string(TRx),
946                         )
947                         filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
948                         state.Ctx.LogD("sp-file", sdsp, "opening part")
949                         fd, err := os.OpenFile(
950                                 filePath+PartSuffix,
951                                 os.O_RDWR|os.O_CREATE,
952                                 os.FileMode(0666),
953                         )
954                         if err != nil {
955                                 state.Ctx.LogE("sp-file", sdsp, err, "")
956                                 return nil, err
957                         }
958                         state.Ctx.LogD(
959                                 "sp-file",
960                                 SdsAdd(sdsp, SDS{"offset": file.Offset}),
961                                 "seeking",
962                         )
963                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
964                                 state.Ctx.LogE("sp-file", sdsp, err, "")
965                                 fd.Close() // #nosec G104
966                                 return nil, err
967                         }
968                         state.Ctx.LogD("sp-file", sdsp, "writing")
969                         _, err = fd.Write(file.Payload)
970                         if err != nil {
971                                 state.Ctx.LogE("sp-file", sdsp, err, "")
972                                 fd.Close() // #nosec G104
973                                 return nil, err
974                         }
975                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
976                         sdsp["size"] = ourSize
977                         fullsize := int64(0)
978                         state.RLock()
979                         infoTheir, ok := state.infosTheir[*file.Hash]
980                         state.RUnlock()
981                         if ok {
982                                 fullsize = int64(infoTheir.Size)
983                         }
984                         sdsp["fullsize"] = fullsize
985                         if state.Ctx.ShowPrgrs {
986                                 Progress("Rx", sdsp)
987                         }
988                         if fullsize != ourSize {
989                                 fd.Close() // #nosec G104
990                                 continue
991                         }
992                         spWorkersGroup.Wait()
993                         spWorkersGroup.Add(1)
994                         go func() {
995                                 if err := fd.Sync(); err != nil {
996                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
997                                         fd.Close() // #nosec G104
998                                         return
999                                 }
1000                                 state.wg.Add(1)
1001                                 defer state.wg.Done()
1002                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1003                                         fd.Close() // #nosec G104
1004                                         state.Ctx.LogE("sp-file", sdsp, err, "")
1005                                         return
1006                                 }
1007                                 state.Ctx.LogD("sp-file", sdsp, "checking")
1008                                 gut, err := Check(fd, file.Hash[:], sdsp, state.Ctx.ShowPrgrs)
1009                                 fd.Close() // #nosec G104
1010                                 if err != nil || !gut {
1011                                         state.Ctx.LogE("sp-file", sdsp, errors.New("checksum mismatch"), "")
1012                                         return
1013                                 }
1014                                 state.Ctx.LogI("sp-done", SdsAdd(sdsp, SDS{"xx": string(TRx)}), "")
1015                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
1016                                         state.Ctx.LogE("sp-file", sdsp, err, "rename")
1017                                         return
1018                                 }
1019                                 if err = DirSync(dirToSync); err != nil {
1020                                         state.Ctx.LogE("sp-file", sdsp, err, "sync")
1021                                         return
1022                                 }
1023                                 state.Lock()
1024                                 delete(state.infosTheir, *file.Hash)
1025                                 state.Unlock()
1026                                 spWorkersGroup.Done()
1027                                 state.wg.Add(1)
1028                                 go func() {
1029                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1030                                         state.wg.Done()
1031                                 }()
1032                         }()
1033                 case SPTypeDone:
1034                         sdsp := SdsAdd(sds, SDS{"type": "done"})
1035                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
1036                         var done SPDone
1037                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1038                                 state.Ctx.LogE("sp-process", SdsAdd(sds, SDS{"type": "done"}), err, "")
1039                                 return nil, err
1040                         }
1041                         sdsp["pkt"] = Base32Codec.EncodeToString(done.Hash[:])
1042                         state.Ctx.LogD("sp-done", sdsp, "removing")
1043                         err := os.Remove(filepath.Join(
1044                                 state.Ctx.Spool,
1045                                 state.Node.Id.String(),
1046                                 string(TTx),
1047                                 Base32Codec.EncodeToString(done.Hash[:]),
1048                         ))
1049                         sdsp["xx"] = string(TTx)
1050                         if err == nil {
1051                                 state.Ctx.LogI("sp-done", sdsp, "")
1052                         } else {
1053                                 state.Ctx.LogE("sp-done", sdsp, err, "")
1054                         }
1055                 case SPTypeFreq:
1056                         sdsp := SdsAdd(sds, SDS{"type": "freq"})
1057                         state.Ctx.LogD("sp-process", sdsp, "unmarshaling packet")
1058                         var freq SPFreq
1059                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1060                                 state.Ctx.LogE("sp-process", sdsp, err, "")
1061                                 return nil, err
1062                         }
1063                         sdsp["pkt"] = Base32Codec.EncodeToString(freq.Hash[:])
1064                         sdsp["offset"] = freq.Offset
1065                         state.Ctx.LogD("sp-process", sdsp, "queueing")
1066                         nice, exists := state.infosOurSeen[*freq.Hash]
1067                         if exists {
1068                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1069                                         state.Lock()
1070                                         insertIdx := 0
1071                                         var freqWithNice *FreqWithNice
1072                                         for insertIdx, freqWithNice = range state.queueTheir {
1073                                                 if freqWithNice.nice > nice {
1074                                                         break
1075                                                 }
1076                                         }
1077                                         state.queueTheir = append(state.queueTheir, nil)
1078                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1079                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1080                                         state.Unlock()
1081                                 } else {
1082                                         state.Ctx.LogD("sp-process", sdsp, "skipping")
1083                                 }
1084                         } else {
1085                                 state.Ctx.LogD("sp-process", sdsp, "unknown")
1086                         }
1087                 default:
1088                         state.Ctx.LogE(
1089                                 "sp-process",
1090                                 SdsAdd(sds, SDS{"type": head.Type}),
1091                                 errors.New("unknown type"),
1092                                 "",
1093                         )
1094                         return nil, BadPktType
1095                 }
1096         }
1097         if infosGot {
1098                 var pkts int
1099                 var size uint64
1100                 state.RLock()
1101                 for _, info := range state.infosTheir {
1102                         pkts++
1103                         size += info.Size
1104                 }
1105                 state.RUnlock()
1106                 state.Ctx.LogI("sp-infos", SDS{
1107                         "xx":   string(TRx),
1108                         "node": state.Node.Id,
1109                         "pkts": pkts,
1110                         "size": int64(size),
1111                 }, "")
1112         }
1113         return payloadsSplit(replies), nil
1114 }