]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
SPCheckers are always running
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "hash"
25         "io"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/flynn/noise"
34         "golang.org/x/crypto/blake2b"
35 )
36
const (
	// MaxSPSize is the byte limit used when packing several marshalled SP
	// packets into one outbound payload, when padding the first handshake
	// payload, and when sizing the file chunk read buffer.
	MaxSPSize      = 1<<16 - 256
	// PartSuffix is presumably the filename suffix of partially received
	// packets -- it is not used in this part of the file, confirm at its users.
	PartSuffix     = ".part"
	// SPHeadOverhead is the size in bytes reserved for a marshalled SPHead.
	SPHeadOverhead = 4
)
42
// SPCheckerQueues is the pair of channels connecting online sessions with
// the SPChecker goroutine of a node.
type SPCheckerQueues struct {
	// appeared carries hashes of packets that SPChecker has to check.
	appeared chan *[32]byte
	// checked carries hashes of packets SPChecker checked without an error.
	checked  chan *[32]byte
}
47
var (
	// MagicNNCPLv1 is the magic number WriteSP puts into every SPRaw and
	// ReadSP requires on every received one.
	MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}

	// XDR-encoded sizes of SPInfo, SPFreq and SPFile (with empty Payload),
	// measured once in init().
	SPInfoOverhead    int
	SPFreqOverhead    int
	SPFileOverhead    int
	// Ready to use marshalled SPHead packets of HALT and PING types,
	// prepared once in init().
	SPHaltMarshalized []byte
	SPPingMarshalized []byte

	// NoiseCipherSuite is the Noise cipher suite used for the handshake:
	// Curve25519 DH, ChaCha20-Poly1305 cipher, BLAKE2b hash.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// DefaultDeadline is the read/write deadline applied to each handshake
	// message exchange.
	DefaultDeadline = 10 * time.Second
	// PingTimeout is the period of the ping ticker; a session that received
	// nothing for two such periods is declared dead.
	PingTimeout     = time.Minute

	// spCheckers holds per-node checker queues. Entries are created lazily
	// when workers of a session with checking enabled are started.
	spCheckers = make(map[NodeId]*SPCheckerQueues)
)
68
// FdAndFullSize is an opened file together with its full size, as reported
// by Stat() at the time it was opened.
type FdAndFullSize struct {
	fd       *os.File
	fullSize int64
}

// HasherAndOffset couples a hash state with an offset.
// NOTE(review): presumably the offset up to which data was already fed into
// the hasher -- its users are outside of this part of the file, confirm.
type HasherAndOffset struct {
	h      hash.Hash
	offset uint64
}
78
// SPType identifies the kind of SP packet, it is carried inside SPHead.
type SPType uint8

// Known SP packet types. iota is repeated on every line, but it still
// increments per line, so the values are 0 (INFO) through 5 (PING).
const (
	SPTypeInfo SPType = iota
	SPTypeFreq SPType = iota
	SPTypeFile SPType = iota
	SPTypeDone SPType = iota
	SPTypeHalt SPType = iota
	SPTypePing SPType = iota
)
89
// SPHead is the header marshalled in front of every SP packet body
// (see MarshalSP). HALT and PING packets consist of the header only.
type SPHead struct {
	Type SPType
}

// SPInfo announces a packet: its niceness, size and hash.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[32]byte
}

// SPFreq refers to the packet with the given hash and an offset inside it;
// the sender reads the file starting from that offset.
type SPFreq struct {
	Hash   *[32]byte
	Offset uint64
}

// SPFile is a chunk of packet's data located at the given offset.
type SPFile struct {
	Hash    *[32]byte
	Offset  uint64
	Payload []byte
}

// SPDone refers to a packet by its hash; it is queued for sending after
// the checker reported that packet as checked.
type SPDone struct {
	Hash *[32]byte
}

// SPRaw is the outermost frame written to and read from the connection:
// the magic number followed by an opaque payload.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}

// FreqWithNice is an SPFreq paired with a niceness level; it is the element
// type of SPState.queueTheir.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
124
// ConnDeadlined is the connection abstraction the SP protocol works over:
// a readable, writable and closable stream with separately settable read
// and write deadlines.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
130
131 func init() {
132         var buf bytes.Buffer
133         spHead := SPHead{Type: SPTypeHalt}
134         if _, err := xdr.Marshal(&buf, spHead); err != nil {
135                 panic(err)
136         }
137         SPHaltMarshalized = make([]byte, SPHeadOverhead)
138         copy(SPHaltMarshalized, buf.Bytes())
139         buf.Reset()
140
141         spHead = SPHead{Type: SPTypePing}
142         if _, err := xdr.Marshal(&buf, spHead); err != nil {
143                 panic(err)
144         }
145         SPPingMarshalized = make([]byte, SPHeadOverhead)
146         copy(SPPingMarshalized, buf.Bytes())
147         buf.Reset()
148
149         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
150         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
151                 panic(err)
152         }
153         SPInfoOverhead = buf.Len()
154         buf.Reset()
155
156         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
157         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
158                 panic(err)
159         }
160         SPFreqOverhead = buf.Len()
161         buf.Reset()
162
163         spFile := SPFile{Hash: new([32]byte), Offset: 123}
164         if _, err := xdr.Marshal(&buf, spFile); err != nil {
165                 panic(err)
166         }
167         SPFileOverhead = buf.Len()
168 }
169
170 func MarshalSP(typ SPType, sp interface{}) []byte {
171         var buf bytes.Buffer
172         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
173                 panic(err)
174         }
175         if _, err := xdr.Marshal(&buf, sp); err != nil {
176                 panic(err)
177         }
178         return buf.Bytes()
179 }
180
181 func payloadsSplit(payloads [][]byte) [][]byte {
182         var outbounds [][]byte
183         outbound := make([]byte, 0, MaxSPSize)
184         for i, payload := range payloads {
185                 outbound = append(outbound, payload...)
186                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
187                         outbounds = append(outbounds, outbound)
188                         outbound = make([]byte, 0, MaxSPSize)
189                 }
190         }
191         if len(outbound) > 0 {
192                 outbounds = append(outbounds, outbound)
193         }
194         return outbounds
195 }
196
// SPState is the state of a single online synchronization session with
// a node, shared between all worker goroutines of that session.
type SPState struct {
	Ctx            *Ctx
	Node           *Node
	// Nice is the maximal niceness of packets dealt with in this session:
	// jobs having greater niceness are skipped.
	Nice           uint8
	// NoCK disables starting of the checker related workers.
	NoCK           bool
	// Session is declared dead if no non-ping traffic was seen in both
	// directions during onlineDeadline, or if maxOnlineTime (when positive)
	// has passed since the start.
	onlineDeadline time.Duration
	maxOnlineTime  time.Duration
	// Noise handshake state and resulting cipher states.
	hs             *noise.HandshakeState
	csOur          *noise.CipherState
	csTheir        *noise.CipherState
	// payloads and pings are the queues the sender goroutine takes its
	// work from.
	payloads       chan []byte
	pings          chan struct{}
	infosTheir     map[[32]byte]*SPInfo
	// infosOurSeen remembers hashes (with their niceness) of our packets
	// that were already announced, to not announce them again.
	infosOurSeen   map[[32]byte]uint8
	// queueTheir is the queue of requests whose head is served by the
	// sender goroutine; accessed under the embedded mutex.
	queueTheir     []*FreqWithNice
	wg             sync.WaitGroup
	// Traffic statistics, updated by ReadSP and WriteSP.
	RxBytes        int64
	RxLastSeen     time.Time
	RxLastNonPing  time.Time
	TxBytes        int64
	TxLastSeen     time.Time
	TxLastNonPing  time.Time
	started        time.Time
	mustFinishAt   time.Time
	Duration       time.Duration
	RxSpeed        int64
	TxSpeed        int64
	// Directory locks held for the duration of the session, released by
	// dirUnlock.
	rxLock         *os.File
	txLock         *os.File
	// xxOnly restricts the session to a single direction when not empty.
	xxOnly         TRxTx
	rxRate         int
	// txRate, when positive, makes the sender sleep 1s/txRate before
	// each file chunk.
	txRate         int
	// isDead is closed by SetDead to signal all workers to stop.
	isDead         chan struct{}
	// listOnly disables directory locking and announcing of our packets.
	listOnly       bool
	onlyPkts       map[[32]byte]bool
	// writeSPBuf is the marshalling buffer reused by WriteSP.
	writeSPBuf     bytes.Buffer
	// fds caches opened files, keyed by their path.
	fds            map[string]FdAndFullSize
	fileHashers    map[string]*HasherAndOffset
	checkerQueues  SPCheckerQueues
	sync.RWMutex
}
238
239 func (state *SPState) SetDead() {
240         state.Lock()
241         defer state.Unlock()
242         select {
243         case <-state.isDead:
244                 // Already closed channel, dead
245                 return
246         default:
247         }
248         close(state.isDead)
249         go func() {
250                 for range state.payloads {
251                 }
252         }()
253         go func() {
254                 for range state.pings {
255                 }
256         }()
257         go func() {
258                 for _, s := range state.fds {
259                         s.fd.Close()
260                 }
261         }()
262 }
263
264 func (state *SPState) NotAlive() bool {
265         select {
266         case <-state.isDead:
267                 return true
268         default:
269         }
270         return false
271 }
272
273 func (state *SPState) dirUnlock() {
274         state.Ctx.UnlockDir(state.rxLock)
275         state.Ctx.UnlockDir(state.txLock)
276 }
277
278 func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
279         for hshValue := range appeared {
280                 les := LEs{
281                         {"XX", string(TRx)},
282                         {"Node", nodeId},
283                         {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
284                 }
285                 ctx.LogD("sp-checker", les, "checking")
286                 size, err := ctx.CheckNoCK(nodeId, hshValue)
287                 les = append(les, LE{"Size", size})
288                 if err != nil {
289                         ctx.LogE("sp-checker", les, err, "")
290                         continue
291                 }
292                 ctx.LogI("sp-done", les, "")
293                 go func(hsh *[32]byte) { checked <- hsh }(hshValue)
294         }
295 }
296
297 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
298         state.writeSPBuf.Reset()
299         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
300                 Magic:   MagicNNCPLv1,
301                 Payload: payload,
302         })
303         if err != nil {
304                 return err
305         }
306         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
307                 state.TxLastSeen = time.Now()
308                 state.TxBytes += int64(n)
309                 if !ping {
310                         state.TxLastNonPing = state.TxLastSeen
311                 }
312         }
313         return err
314 }
315
316 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
317         var sp SPRaw
318         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
319         if err != nil {
320                 ue := err.(*xdr.UnmarshalError)
321                 if ue.Err == io.EOF {
322                         return nil, ue.Err
323                 }
324                 return nil, err
325         }
326         state.RxLastSeen = time.Now()
327         state.RxBytes += int64(n)
328         if sp.Magic != MagicNNCPLv1 {
329                 return nil, BadMagic
330         }
331         return sp.Payload, nil
332 }
333
334 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
335         var infos []*SPInfo
336         var totalSize int64
337         for job := range ctx.Jobs(nodeId, TTx) {
338                 if job.PktEnc.Nice > nice {
339                         continue
340                 }
341                 if _, known := (*seen)[*job.HshValue]; known {
342                         continue
343                 }
344                 totalSize += job.Size
345                 infos = append(infos, &SPInfo{
346                         Nice: job.PktEnc.Nice,
347                         Size: uint64(job.Size),
348                         Hash: job.HshValue,
349                 })
350                 (*seen)[*job.HshValue] = job.PktEnc.Nice
351         }
352         sort.Sort(ByNice(infos))
353         var payloads [][]byte
354         for _, info := range infos {
355                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
356                 ctx.LogD("sp-info-our", LEs{
357                         {"Node", nodeId},
358                         {"Name", Base32Codec.EncodeToString(info.Hash[:])},
359                         {"Size", info.Size},
360                 }, "")
361         }
362         if totalSize > 0 {
363                 ctx.LogI("sp-infos", LEs{
364                         {"XX", string(TTx)},
365                         {"Node", nodeId},
366                         {"Pkts", len(payloads)},
367                         {"Size", totalSize},
368                 }, "")
369         }
370         return payloadsSplit(payloads)
371 }
372
373 func (state *SPState) StartI(conn ConnDeadlined) error {
374         nodeId := state.Node.Id
375         err := state.Ctx.ensureRxDir(nodeId)
376         if err != nil {
377                 return err
378         }
379         var rxLock *os.File
380         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
381                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
382                 if err != nil {
383                         return err
384                 }
385         }
386         var txLock *os.File
387         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
388                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
389                 if err != nil {
390                         return err
391                 }
392         }
393         started := time.Now()
394         conf := noise.Config{
395                 CipherSuite: NoiseCipherSuite,
396                 Pattern:     noise.HandshakeIK,
397                 Initiator:   true,
398                 StaticKeypair: noise.DHKey{
399                         Private: state.Ctx.Self.NoisePrv[:],
400                         Public:  state.Ctx.Self.NoisePub[:],
401                 },
402                 PeerStatic: state.Node.NoisePub[:],
403         }
404         hs, err := noise.NewHandshakeState(conf)
405         if err != nil {
406                 return err
407         }
408         state.hs = hs
409         state.payloads = make(chan []byte)
410         state.pings = make(chan struct{})
411         state.infosTheir = make(map[[32]byte]*SPInfo)
412         state.infosOurSeen = make(map[[32]byte]uint8)
413         state.started = started
414         state.rxLock = rxLock
415         state.txLock = txLock
416
417         var infosPayloads [][]byte
418         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
419                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
420         }
421         var firstPayload []byte
422         if len(infosPayloads) > 0 {
423                 firstPayload = infosPayloads[0]
424         }
425         // Pad first payload, to hide actual number of existing files
426         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
427                 firstPayload = append(firstPayload, SPHaltMarshalized...)
428         }
429
430         var buf []byte
431         var payload []byte
432         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
433         if err != nil {
434                 state.dirUnlock()
435                 return err
436         }
437         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
438         state.Ctx.LogD("sp-start", les, "sending first message")
439         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
440         if err = state.WriteSP(conn, buf, false); err != nil {
441                 state.Ctx.LogE("sp-start", les, err, "")
442                 state.dirUnlock()
443                 return err
444         }
445         state.Ctx.LogD("sp-start", les, "waiting for first message")
446         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
447         if buf, err = state.ReadSP(conn); err != nil {
448                 state.Ctx.LogE("sp-start", les, err, "")
449                 state.dirUnlock()
450                 return err
451         }
452         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
453         if err != nil {
454                 state.Ctx.LogE("sp-start", les, err, "")
455                 state.dirUnlock()
456                 return err
457         }
458         state.Ctx.LogD("sp-start", les, "starting workers")
459         err = state.StartWorkers(conn, infosPayloads, payload)
460         if err != nil {
461                 state.Ctx.LogE("sp-start", les, err, "")
462                 state.dirUnlock()
463         }
464         return err
465 }
466
467 func (state *SPState) StartR(conn ConnDeadlined) error {
468         started := time.Now()
469         conf := noise.Config{
470                 CipherSuite: NoiseCipherSuite,
471                 Pattern:     noise.HandshakeIK,
472                 Initiator:   false,
473                 StaticKeypair: noise.DHKey{
474                         Private: state.Ctx.Self.NoisePrv[:],
475                         Public:  state.Ctx.Self.NoisePub[:],
476                 },
477         }
478         hs, err := noise.NewHandshakeState(conf)
479         if err != nil {
480                 return err
481         }
482         xxOnly := TRxTx("")
483         state.hs = hs
484         state.payloads = make(chan []byte)
485         state.pings = make(chan struct{})
486         state.infosOurSeen = make(map[[32]byte]uint8)
487         state.infosTheir = make(map[[32]byte]*SPInfo)
488         state.started = started
489         state.xxOnly = xxOnly
490
491         var buf []byte
492         var payload []byte
493         state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
494         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
495         if buf, err = state.ReadSP(conn); err != nil {
496                 state.Ctx.LogE("sp-start", LEs{}, err, "")
497                 return err
498         }
499         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
500                 state.Ctx.LogE("sp-start", LEs{}, err, "")
501                 return err
502         }
503
504         var node *Node
505         for _, n := range state.Ctx.Neigh {
506                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
507                         node = n
508                         break
509                 }
510         }
511         if node == nil {
512                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
513                 state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
514                 return errors.New("Unknown peer: " + peerId)
515         }
516         state.Node = node
517         state.rxRate = node.RxRate
518         state.txRate = node.TxRate
519         state.onlineDeadline = node.OnlineDeadline
520         state.maxOnlineTime = node.MaxOnlineTime
521         les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
522
523         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
524                 return err
525         }
526         var rxLock *os.File
527         if xxOnly == "" || xxOnly == TRx {
528                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
529                 if err != nil {
530                         return err
531                 }
532         }
533         state.rxLock = rxLock
534         var txLock *os.File
535         if xxOnly == "" || xxOnly == TTx {
536                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
537                 if err != nil {
538                         return err
539                 }
540         }
541         state.txLock = txLock
542
543         var infosPayloads [][]byte
544         if xxOnly == "" || xxOnly == TTx {
545                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
546         }
547         var firstPayload []byte
548         if len(infosPayloads) > 0 {
549                 firstPayload = infosPayloads[0]
550         }
551         // Pad first payload, to hide actual number of existing files
552         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
553                 firstPayload = append(firstPayload, SPHaltMarshalized...)
554         }
555
556         state.Ctx.LogD("sp-start", les, "sending first message")
557         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
558         if err != nil {
559                 state.dirUnlock()
560                 return err
561         }
562         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
563         if err = state.WriteSP(conn, buf, false); err != nil {
564                 state.Ctx.LogE("sp-start", les, err, "")
565                 state.dirUnlock()
566                 return err
567         }
568         state.Ctx.LogD("sp-start", les, "starting workers")
569         err = state.StartWorkers(conn, infosPayloads, payload)
570         if err != nil {
571                 state.dirUnlock()
572         }
573         return err
574 }
575
576 func (state *SPState) closeFd(pth string) {
577         s, exists := state.fds[pth]
578         delete(state.fds, pth)
579         if exists {
580                 s.fd.Close()
581         }
582 }
583
584 func (state *SPState) StartWorkers(
585         conn ConnDeadlined,
586         infosPayloads [][]byte,
587         payload []byte,
588 ) error {
589         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
590         state.fds = make(map[string]FdAndFullSize)
591         state.fileHashers = make(map[string]*HasherAndOffset)
592         state.isDead = make(chan struct{})
593         if state.maxOnlineTime > 0 {
594                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
595         }
596
597         // Checker
598         if !state.NoCK {
599                 queues := spCheckers[*state.Node.Id]
600                 if queues == nil {
601                         queues = &SPCheckerQueues{
602                                 appeared: make(chan *[32]byte),
603                                 checked:  make(chan *[32]byte),
604                         }
605                         spCheckers[*state.Node.Id] = queues
606                         go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
607                 }
608                 state.checkerQueues = *queues
609                 go func() {
610                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
611                                 if job.PktEnc.Nice <= state.Nice {
612                                         state.checkerQueues.appeared <- job.HshValue
613                                 }
614                         }
615                 }()
616                 state.wg.Add(1)
617                 go func() {
618                         defer state.wg.Done()
619                         for {
620                                 select {
621                                 case <-state.isDead:
622                                         return
623                                 case hsh := <-state.checkerQueues.checked:
624                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
625                                 }
626                         }
627                 }()
628         }
629
630         // Remaining handshake payload sending
631         if len(infosPayloads) > 1 {
632                 state.wg.Add(1)
633                 go func() {
634                         for _, payload := range infosPayloads[1:] {
635                                 state.Ctx.LogD(
636                                         "sp-work",
637                                         append(les, LE{"Size", len(payload)}),
638                                         "queuing remaining payload",
639                                 )
640                                 state.payloads <- payload
641                         }
642                         state.wg.Done()
643                 }()
644         }
645
646         // Processing of first payload and queueing its responses
647         state.Ctx.LogD(
648                 "sp-work",
649                 append(les, LE{"Size", len(payload)}),
650                 "processing first payload",
651         )
652         replies, err := state.ProcessSP(payload)
653         if err != nil {
654                 state.Ctx.LogE("sp-work", les, err, "")
655                 return err
656         }
657         state.wg.Add(1)
658         go func() {
659                 for _, reply := range replies {
660                         state.Ctx.LogD(
661                                 "sp-work",
662                                 append(les, LE{"Size", len(reply)}),
663                                 "queuing reply",
664                         )
665                         state.payloads <- reply
666                 }
667                 state.wg.Done()
668         }()
669
670         // Periodic jobs
671         state.wg.Add(1)
672         go func() {
673                 deadlineTicker := time.NewTicker(time.Second)
674                 pingTicker := time.NewTicker(PingTimeout)
675                 for {
676                         select {
677                         case <-state.isDead:
678                                 state.wg.Done()
679                                 deadlineTicker.Stop()
680                                 pingTicker.Stop()
681                                 return
682                         case now := <-deadlineTicker.C:
683                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
684                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
685                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
686                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
687                                         state.SetDead()
688                                         conn.Close() // #nosec G104
689                                 }
690                         case now := <-pingTicker.C:
691                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
692                                         state.wg.Add(1)
693                                         go func() {
694                                                 state.pings <- struct{}{}
695                                                 state.wg.Done()
696                                         }()
697                                 }
698                         }
699                 }
700         }()
701
702         // Spool checker and INFOs sender of appearing files
703         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
704                 state.wg.Add(1)
705                 go func() {
706                         ticker := time.NewTicker(time.Second)
707                         for {
708                                 select {
709                                 case <-state.isDead:
710                                         state.wg.Done()
711                                         ticker.Stop()
712                                         return
713                                 case <-ticker.C:
714                                         for _, payload := range state.Ctx.infosOur(
715                                                 state.Node.Id,
716                                                 state.Nice,
717                                                 &state.infosOurSeen,
718                                         ) {
719                                                 state.Ctx.LogD(
720                                                         "sp-work",
721                                                         append(les, LE{"Size", len(payload)}),
722                                                         "queuing new info",
723                                                 )
724                                                 state.payloads <- payload
725                                         }
726                                 }
727                         }
728                 }()
729         }
730
731         // Sender
732         state.wg.Add(1)
733         go func() {
734                 defer conn.Close()
735                 defer state.SetDead()
736                 defer state.wg.Done()
737                 for {
738                         if state.NotAlive() {
739                                 return
740                         }
741                         var payload []byte
742                         var ping bool
743                         select {
744                         case <-state.pings:
745                                 state.Ctx.LogD("sp-xmit", les, "got ping")
746                                 payload = SPPingMarshalized
747                                 ping = true
748                         case payload = <-state.payloads:
749                                 state.Ctx.LogD(
750                                         "sp-xmit",
751                                         append(les, LE{"Size", len(payload)}),
752                                         "got payload",
753                                 )
754                         default:
755                                 state.RLock()
756                                 if len(state.queueTheir) == 0 {
757                                         state.RUnlock()
758                                         time.Sleep(100 * time.Millisecond)
759                                         continue
760                                 }
761                                 freq := state.queueTheir[0].freq
762                                 state.RUnlock()
763                                 if state.txRate > 0 {
764                                         time.Sleep(time.Second / time.Duration(state.txRate))
765                                 }
766                                 lesp := append(les, LEs{
767                                         {"XX", string(TTx)},
768                                         {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
769                                         {"Size", int64(freq.Offset)},
770                                 }...)
771                                 state.Ctx.LogD("sp-file", lesp, "queueing")
772                                 pth := filepath.Join(
773                                         state.Ctx.Spool,
774                                         state.Node.Id.String(),
775                                         string(TTx),
776                                         Base32Codec.EncodeToString(freq.Hash[:]),
777                                 )
778                                 fdAndFullSize, exists := state.fds[pth]
779                                 if !exists {
780                                         fd, err := os.Open(pth)
781                                         if err != nil {
782                                                 state.Ctx.LogE("sp-file", lesp, err, "")
783                                                 return
784                                         }
785                                         fi, err := fd.Stat()
786                                         if err != nil {
787                                                 state.Ctx.LogE("sp-file", lesp, err, "")
788                                                 return
789                                         }
790                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
791                                         state.fds[pth] = fdAndFullSize
792                                 }
793                                 fd := fdAndFullSize.fd
794                                 fullSize := fdAndFullSize.fullSize
795                                 var buf []byte
796                                 if freq.Offset < uint64(fullSize) {
797                                         state.Ctx.LogD("sp-file", lesp, "seeking")
798                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
799                                                 state.Ctx.LogE("sp-file", lesp, err, "")
800                                                 return
801                                         }
802                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
803                                         n, err := fd.Read(buf)
804                                         if err != nil {
805                                                 state.Ctx.LogE("sp-file", lesp, err, "")
806                                                 return
807                                         }
808                                         buf = buf[:n]
809                                         state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
810                                 }
811                                 state.closeFd(pth)
812                                 payload = MarshalSP(SPTypeFile, SPFile{
813                                         Hash:    freq.Hash,
814                                         Offset:  freq.Offset,
815                                         Payload: buf,
816                                 })
817                                 ourSize := freq.Offset + uint64(len(buf))
818                                 lesp = append(lesp, LE{"Size", int64(ourSize)})
819                                 lesp = append(lesp, LE{"FullSize", fullSize})
820                                 if state.Ctx.ShowPrgrs {
821                                         Progress("Tx", lesp)
822                                 }
823                                 state.Lock()
824                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
825                                         if ourSize == uint64(fullSize) {
826                                                 state.Ctx.LogD("sp-file", lesp, "finished")
827                                                 if len(state.queueTheir) > 1 {
828                                                         state.queueTheir = state.queueTheir[1:]
829                                                 } else {
830                                                         state.queueTheir = state.queueTheir[:0]
831                                                 }
832                                         } else {
833                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
834                                         }
835                                 } else {
836                                         state.Ctx.LogD("sp-file", lesp, "queue disappeared")
837                                 }
838                                 state.Unlock()
839                         }
840                         state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
841                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
842                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
843                                 state.Ctx.LogE("sp-xmit", les, err, "")
844                                 return
845                         }
846                 }
847         }()
848
849         // Receiver
850         state.wg.Add(1)
851         go func() {
852                 for {
853                         if state.NotAlive() {
854                                 break
855                         }
856                         state.Ctx.LogD("sp-recv", les, "waiting for payload")
857                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
858                         payload, err := state.ReadSP(conn)
859                         if err != nil {
860                                 if err == io.EOF {
861                                         break
862                                 }
863                                 unmarshalErr := err.(*xdr.UnmarshalError)
864                                 if os.IsTimeout(unmarshalErr.Err) {
865                                         continue
866                                 }
867                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
868                                         break
869                                 }
870                                 state.Ctx.LogE("sp-recv", les, err, "")
871                                 break
872                         }
873                         state.Ctx.LogD(
874                                 "sp-recv",
875                                 append(les, LE{"Size", len(payload)}),
876                                 "got payload",
877                         )
878                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
879                         if err != nil {
880                                 state.Ctx.LogE("sp-recv", les, err, "")
881                                 break
882                         }
883                         state.Ctx.LogD(
884                                 "sp-recv",
885                                 append(les, LE{"Size", len(payload)}),
886                                 "processing",
887                         )
888                         replies, err := state.ProcessSP(payload)
889                         if err != nil {
890                                 state.Ctx.LogE("sp-recv", les, err, "")
891                                 break
892                         }
893                         state.wg.Add(1)
894                         go func() {
895                                 for _, reply := range replies {
896                                         state.Ctx.LogD(
897                                                 "sp-recv",
898                                                 append(les, LE{"Size", len(reply)}),
899                                                 "queuing reply",
900                                         )
901                                         state.payloads <- reply
902                                 }
903                                 state.wg.Done()
904                         }()
905                         if state.rxRate > 0 {
906                                 time.Sleep(time.Second / time.Duration(state.rxRate))
907                         }
908                 }
909                 state.SetDead()
910                 state.wg.Done()
911                 state.SetDead()
912                 conn.Close() // #nosec G104
913         }()
914
915         return nil
916 }
917
918 func (state *SPState) Wait() {
919         state.wg.Wait()
920         close(state.payloads)
921         close(state.pings)
922         state.dirUnlock()
923         state.Duration = time.Now().Sub(state.started)
924         state.RxSpeed = state.RxBytes
925         state.TxSpeed = state.TxBytes
926         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
927         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
928         if rxDuration > 0 {
929                 state.RxSpeed = state.RxBytes / rxDuration
930         }
931         if txDuration > 0 {
932                 state.TxSpeed = state.TxBytes / txDuration
933         }
934 }
935
936 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
937         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
938         r := bytes.NewReader(payload)
939         var err error
940         var replies [][]byte
941         var infosGot bool
942         for r.Len() > 0 {
943                 state.Ctx.LogD("sp-process", les, "unmarshaling header")
944                 var head SPHead
945                 if _, err = xdr.Unmarshal(r, &head); err != nil {
946                         state.Ctx.LogE("sp-process", les, err, "")
947                         return nil, err
948                 }
949                 if head.Type != SPTypePing {
950                         state.RxLastNonPing = state.RxLastSeen
951                 }
952                 switch head.Type {
953                 case SPTypeHalt:
954                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
955                         state.Lock()
956                         state.queueTheir = nil
957                         state.Unlock()
958
959                 case SPTypePing:
960                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
961
962                 case SPTypeInfo:
963                         infosGot = true
964                         lesp := append(les, LE{"Type", "info"})
965                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
966                         var info SPInfo
967                         if _, err = xdr.Unmarshal(r, &info); err != nil {
968                                 state.Ctx.LogE("sp-process", lesp, err, "")
969                                 return nil, err
970                         }
971                         lesp = append(lesp, LEs{
972                                 {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
973                                 {"Size", int64(info.Size)},
974                                 {"Nice", int(info.Nice)},
975                         }...)
976                         if !state.listOnly && info.Nice > state.Nice {
977                                 state.Ctx.LogD("sp-process", lesp, "too nice")
978                                 continue
979                         }
980                         state.Ctx.LogD("sp-process", lesp, "received")
981                         if !state.listOnly && state.xxOnly == TTx {
982                                 continue
983                         }
984                         state.Lock()
985                         state.infosTheir[*info.Hash] = &info
986                         state.Unlock()
987                         state.Ctx.LogD("sp-process", lesp, "stating part")
988                         pktPath := filepath.Join(
989                                 state.Ctx.Spool,
990                                 state.Node.Id.String(),
991                                 string(TRx),
992                                 Base32Codec.EncodeToString(info.Hash[:]),
993                         )
994                         if _, err = os.Stat(pktPath); err == nil {
995                                 state.Ctx.LogI("sp-info", lesp, "already done")
996                                 if !state.listOnly {
997                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
998                                 }
999                                 continue
1000                         }
1001                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1002                                 state.Ctx.LogI("sp-info", lesp, "already seen")
1003                                 if !state.listOnly {
1004                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1005                                 }
1006                                 continue
1007                         }
1008                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1009                                 state.Ctx.LogI("sp-info", lesp, "still non checksummed")
1010                                 continue
1011                         }
1012                         fi, err := os.Stat(pktPath + PartSuffix)
1013                         var offset int64
1014                         if err == nil {
1015                                 offset = fi.Size()
1016                         }
1017                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1018                                 state.Ctx.LogI("sp-info", lesp, "not enough space")
1019                                 continue
1020                         }
1021                         state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
1022                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1023                                 replies = append(replies, MarshalSP(
1024                                         SPTypeFreq,
1025                                         SPFreq{info.Hash, uint64(offset)},
1026                                 ))
1027                         }
1028
1029                 case SPTypeFile:
1030                         lesp := append(les, LE{"Type", "file"})
1031                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1032                         var file SPFile
1033                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1034                                 state.Ctx.LogE("sp-process", lesp, err, "")
1035                                 return nil, err
1036                         }
1037                         lesp = append(lesp, LEs{
1038                                 {"XX", string(TRx)},
1039                                 {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
1040                                 {"Size", len(file.Payload)},
1041                         }...)
1042                         dirToSync := filepath.Join(
1043                                 state.Ctx.Spool,
1044                                 state.Node.Id.String(),
1045                                 string(TRx),
1046                         )
1047                         filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
1048                         filePathPart := filePath + PartSuffix
1049                         state.Ctx.LogD("sp-file", lesp, "opening part")
1050                         fdAndFullSize, exists := state.fds[filePathPart]
1051                         var fd *os.File
1052                         if exists {
1053                                 fd = fdAndFullSize.fd
1054                         } else {
1055                                 fd, err = os.OpenFile(
1056                                         filePathPart,
1057                                         os.O_RDWR|os.O_CREATE,
1058                                         os.FileMode(0666),
1059                                 )
1060                                 if err != nil {
1061                                         state.Ctx.LogE("sp-file", lesp, err, "")
1062                                         return nil, err
1063                                 }
1064                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1065                                 if file.Offset == 0 {
1066                                         h, err := blake2b.New256(nil)
1067                                         if err != nil {
1068                                                 panic(err)
1069                                         }
1070                                         state.fileHashers[filePath] = &HasherAndOffset{h: h}
1071                                 }
1072                         }
1073                         state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
1074                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1075                                 state.Ctx.LogE("sp-file", lesp, err, "")
1076                                 state.closeFd(filePathPart)
1077                                 return nil, err
1078                         }
1079                         state.Ctx.LogD("sp-file", lesp, "writing")
1080                         if _, err = fd.Write(file.Payload); err != nil {
1081                                 state.Ctx.LogE("sp-file", lesp, err, "")
1082                                 state.closeFd(filePathPart)
1083                                 return nil, err
1084                         }
1085                         hasherAndOffset, hasherExists := state.fileHashers[filePath]
1086                         if hasherExists {
1087                                 if hasherAndOffset.offset == file.Offset {
1088                                         if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1089                                                 panic(err)
1090                                         }
1091                                         hasherAndOffset.offset += uint64(len(file.Payload))
1092                                 } else {
1093                                         state.Ctx.LogE(
1094                                                 "sp-file", lesp,
1095                                                 errors.New("offset differs"),
1096                                                 "deleting hasher",
1097                                         )
1098                                         delete(state.fileHashers, filePath)
1099                                         hasherExists = false
1100                                 }
1101                         }
1102                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1103                         lesp[len(lesp)-1].V = ourSize
1104                         fullsize := int64(0)
1105                         state.RLock()
1106                         infoTheir, ok := state.infosTheir[*file.Hash]
1107                         state.RUnlock()
1108                         if ok {
1109                                 fullsize = int64(infoTheir.Size)
1110                         }
1111                         lesp = append(lesp, LE{"FullSize", fullsize})
1112                         if state.Ctx.ShowPrgrs {
1113                                 Progress("Rx", lesp)
1114                         }
1115                         if fullsize != ourSize {
1116                                 continue
1117                         }
1118                         err = fd.Sync()
1119                         if err != nil {
1120                                 state.Ctx.LogE("sp-file", lesp, err, "sync")
1121                                 state.closeFd(filePathPart)
1122                                 continue
1123                         }
1124                         if hasherExists {
1125                                 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1126                                         state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
1127                                         continue
1128                                 }
1129                                 if err = os.Rename(filePathPart, filePath); err != nil {
1130                                         state.Ctx.LogE("sp-file", lesp, err, "rename")
1131                                         continue
1132                                 }
1133                                 if err = DirSync(dirToSync); err != nil {
1134                                         state.Ctx.LogE("sp-file", lesp, err, "sync")
1135                                         continue
1136                                 }
1137                                 state.Ctx.LogI("sp-file", lesp, "done")
1138                                 state.wg.Add(1)
1139                                 go func() {
1140                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1141                                         state.wg.Done()
1142                                 }()
1143                                 state.Lock()
1144                                 delete(state.infosTheir, *file.Hash)
1145                                 state.Unlock()
1146                                 if !state.Ctx.HdrUsage {
1147                                         state.closeFd(filePathPart)
1148                                         continue
1149                                 }
1150                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1151                                         state.Ctx.LogE("sp-file", lesp, err, "seek")
1152                                         state.closeFd(filePathPart)
1153                                         continue
1154                                 }
1155                                 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1156                                 state.closeFd(filePathPart)
1157                                 if err != nil {
1158                                         state.Ctx.LogE("sp-file", lesp, err, "HdrRead")
1159                                         continue
1160                                 }
1161                                 state.Ctx.HdrWrite(pktEncRaw, filePath)
1162                                 continue
1163                         }
1164                         state.closeFd(filePathPart)
1165                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1166                                 state.Ctx.LogE("sp-file", lesp, err, "rename")
1167                                 continue
1168                         }
1169                         if err = DirSync(dirToSync); err != nil {
1170                                 state.Ctx.LogE("sp-file", lesp, err, "sync")
1171                                 continue
1172                         }
1173                         state.Ctx.LogI("sp-file", lesp, "downloaded")
1174                         state.Lock()
1175                         delete(state.infosTheir, *file.Hash)
1176                         state.Unlock()
1177                         if !state.NoCK {
1178                                 state.checkerQueues.appeared <- file.Hash
1179                         }
1180
1181                 case SPTypeDone:
1182                         lesp := append(les, LE{"Type", "done"})
1183                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1184                         var done SPDone
1185                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1186                                 state.Ctx.LogE("sp-process", lesp, err, "")
1187                                 return nil, err
1188                         }
1189                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
1190                         lesp = append(lesp, LE{"XX", string(TTx)})
1191                         state.Ctx.LogD("sp-done", lesp, "removing")
1192                         pth := filepath.Join(
1193                                 state.Ctx.Spool,
1194                                 state.Node.Id.String(),
1195                                 string(TTx),
1196                                 Base32Codec.EncodeToString(done.Hash[:]),
1197                         )
1198                         if err = os.Remove(pth); err == nil {
1199                                 state.Ctx.LogI("sp-done", lesp, "")
1200                                 if state.Ctx.HdrUsage {
1201                                         os.Remove(pth + HdrSuffix)
1202                                 }
1203                         } else {
1204                                 state.Ctx.LogE("sp-done", lesp, err, "")
1205                         }
1206
1207                 case SPTypeFreq:
1208                         lesp := append(les, LE{"Type", "freq"})
1209                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1210                         var freq SPFreq
1211                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1212                                 state.Ctx.LogE("sp-process", lesp, err, "")
1213                                 return nil, err
1214                         }
1215                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
1216                         lesp = append(lesp, LE{"Offset", freq.Offset})
1217                         state.Ctx.LogD("sp-process", lesp, "queueing")
1218                         nice, exists := state.infosOurSeen[*freq.Hash]
1219                         if exists {
1220                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1221                                         state.Lock()
1222                                         insertIdx := 0
1223                                         var freqWithNice *FreqWithNice
1224                                         for insertIdx, freqWithNice = range state.queueTheir {
1225                                                 if freqWithNice.nice > nice {
1226                                                         break
1227                                                 }
1228                                         }
1229                                         state.queueTheir = append(state.queueTheir, nil)
1230                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1231                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1232                                         state.Unlock()
1233                                 } else {
1234                                         state.Ctx.LogD("sp-process", lesp, "skipping")
1235                                 }
1236                         } else {
1237                                 state.Ctx.LogD("sp-process", lesp, "unknown")
1238                         }
1239
1240                 default:
1241                         state.Ctx.LogE(
1242                                 "sp-process",
1243                                 append(les, LE{"Type", head.Type}),
1244                                 errors.New("unknown type"),
1245                                 "",
1246                         )
1247                         return nil, BadPktType
1248                 }
1249         }
1250         if infosGot {
1251                 var pkts int
1252                 var size uint64
1253                 state.RLock()
1254                 for _, info := range state.infosTheir {
1255                         pkts++
1256                         size += info.Size
1257                 }
1258                 state.RUnlock()
1259                 state.Ctx.LogI("sp-infos", LEs{
1260                         {"XX", string(TRx)},
1261                         {"Node", state.Node.Id},
1262                         {"Pkts", pkts},
1263                         {"Size", int64(size)},
1264                 }, "")
1265         }
1266         return payloadsSplit(replies), nil
1267 }