]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
.hdr files
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "hash"
25         "io"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/flynn/noise"
34         "golang.org/x/crypto/blake2b"
35 )
36
const (
	// MaxSPSize is the size limit used when packing payloads into a
	// single chunk and when padding the first handshake payload.
	MaxSPSize = (1 << 16) - 256

	// PartSuffix is the ".part" filename suffix.
	PartSuffix = ".part"

	// SPHeadOverhead is the length in bytes of a marshalled SPHead.
	SPHeadOverhead = 4
)
42
43 var (
44         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
45
46         SPInfoOverhead    int
47         SPFreqOverhead    int
48         SPFileOverhead    int
49         SPHaltMarshalized []byte
50         SPPingMarshalized []byte
51
52         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
53                 noise.DH25519,
54                 noise.CipherChaChaPoly,
55                 noise.HashBLAKE2b,
56         )
57
58         DefaultDeadline = 10 * time.Second
59         PingTimeout     = time.Minute
60 )
61
type (
	// FdAndFullSize pairs an opened file with its full size, as
	// reported by Stat() at the time it was opened.
	FdAndFullSize struct {
		fd       *os.File
		fullSize int64
	}

	// HasherAndOffset pairs a hash state with an offset.
	HasherAndOffset struct {
		h      hash.Hash
		offset uint64
	}
)
71
// SPType identifies the kind of a packet; it is the only field of the
// SPHead that precedes every packet body.
type SPType uint8

// Known packet types. The numeric values are part of the marshalled
// form, so their order must not change.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
82
83 type SPHead struct {
84         Type SPType
85 }
86
87 type SPInfo struct {
88         Nice uint8
89         Size uint64
90         Hash *[32]byte
91 }
92
93 type SPFreq struct {
94         Hash   *[32]byte
95         Offset uint64
96 }
97
98 type SPFile struct {
99         Hash    *[32]byte
100         Offset  uint64
101         Payload []byte
102 }
103
104 type SPDone struct {
105         Hash *[32]byte
106 }
107
108 type SPRaw struct {
109         Magic   [8]byte
110         Payload []byte
111 }
112
113 type FreqWithNice struct {
114         freq *SPFreq
115         nice uint8
116 }
117
// ConnDeadlined is a connection that can be read, written and closed,
// and that supports separate read and write deadlines.
type ConnDeadlined interface {
	io.Reader
	io.Writer
	io.Closer
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
123
124 func init() {
125         var buf bytes.Buffer
126         spHead := SPHead{Type: SPTypeHalt}
127         if _, err := xdr.Marshal(&buf, spHead); err != nil {
128                 panic(err)
129         }
130         SPHaltMarshalized = make([]byte, SPHeadOverhead)
131         copy(SPHaltMarshalized, buf.Bytes())
132         buf.Reset()
133
134         spHead = SPHead{Type: SPTypePing}
135         if _, err := xdr.Marshal(&buf, spHead); err != nil {
136                 panic(err)
137         }
138         SPPingMarshalized = make([]byte, SPHeadOverhead)
139         copy(SPPingMarshalized, buf.Bytes())
140         buf.Reset()
141
142         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
143         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
144                 panic(err)
145         }
146         SPInfoOverhead = buf.Len()
147         buf.Reset()
148
149         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
150         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
151                 panic(err)
152         }
153         SPFreqOverhead = buf.Len()
154         buf.Reset()
155
156         spFile := SPFile{Hash: new([32]byte), Offset: 123}
157         if _, err := xdr.Marshal(&buf, spFile); err != nil {
158                 panic(err)
159         }
160         SPFileOverhead = buf.Len()
161 }
162
163 func MarshalSP(typ SPType, sp interface{}) []byte {
164         var buf bytes.Buffer
165         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
166                 panic(err)
167         }
168         if _, err := xdr.Marshal(&buf, sp); err != nil {
169                 panic(err)
170         }
171         return buf.Bytes()
172 }
173
174 func payloadsSplit(payloads [][]byte) [][]byte {
175         var outbounds [][]byte
176         outbound := make([]byte, 0, MaxSPSize)
177         for i, payload := range payloads {
178                 outbound = append(outbound, payload...)
179                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
180                         outbounds = append(outbounds, outbound)
181                         outbound = make([]byte, 0, MaxSPSize)
182                 }
183         }
184         if len(outbound) > 0 {
185                 outbounds = append(outbounds, outbound)
186         }
187         return outbounds
188 }
189
190 type SPState struct {
191         Ctx            *Ctx
192         Node           *Node
193         Nice           uint8
194         NoCK           bool
195         onlineDeadline time.Duration
196         maxOnlineTime  time.Duration
197         hs             *noise.HandshakeState
198         csOur          *noise.CipherState
199         csTheir        *noise.CipherState
200         payloads       chan []byte
201         pings          chan struct{}
202         infosTheir     map[[32]byte]*SPInfo
203         infosOurSeen   map[[32]byte]uint8
204         queueTheir     []*FreqWithNice
205         wg             sync.WaitGroup
206         RxBytes        int64
207         RxLastSeen     time.Time
208         RxLastNonPing  time.Time
209         TxBytes        int64
210         TxLastSeen     time.Time
211         TxLastNonPing  time.Time
212         started        time.Time
213         mustFinishAt   time.Time
214         Duration       time.Duration
215         RxSpeed        int64
216         TxSpeed        int64
217         rxLock         *os.File
218         txLock         *os.File
219         xxOnly         TRxTx
220         rxRate         int
221         txRate         int
222         isDead         chan struct{}
223         listOnly       bool
224         onlyPkts       map[[32]byte]bool
225         writeSPBuf     bytes.Buffer
226         fds            map[string]FdAndFullSize
227         fileHashers    map[string]*HasherAndOffset
228         checkerJobs    chan *[32]byte
229         sync.RWMutex
230 }
231
232 func (state *SPState) SetDead() {
233         state.Lock()
234         defer state.Unlock()
235         select {
236         case <-state.isDead:
237                 // Already closed channel, dead
238                 return
239         default:
240         }
241         close(state.isDead)
242         go func() {
243                 for range state.payloads {
244                 }
245         }()
246         go func() {
247                 for range state.pings {
248                 }
249         }()
250         go func() {
251                 for _, s := range state.fds {
252                         s.fd.Close()
253                 }
254         }()
255         if !state.NoCK {
256                 close(state.checkerJobs)
257         }
258 }
259
260 func (state *SPState) NotAlive() bool {
261         select {
262         case <-state.isDead:
263                 return true
264         default:
265         }
266         return false
267 }
268
269 func (state *SPState) dirUnlock() {
270         state.Ctx.UnlockDir(state.rxLock)
271         state.Ctx.UnlockDir(state.txLock)
272 }
273
274 func (state *SPState) SPChecker() {
275         for hshValue := range state.checkerJobs {
276                 les := LEs{
277                         {"XX", string(TRx)},
278                         {"Node", state.Node.Id},
279                         {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
280                 }
281                 state.Ctx.LogD("sp-file", les, "checking")
282                 size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue)
283                 les = append(les, LE{"Size", size})
284                 if err != nil {
285                         state.Ctx.LogE("sp-file", les, err, "")
286                         continue
287                 }
288                 state.Ctx.LogI("sp-done", les, "")
289                 state.wg.Add(1)
290                 go func(hsh *[32]byte) {
291                         if !state.NotAlive() {
292                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
293                         }
294                         state.wg.Done()
295                 }(hshValue)
296         }
297 }
298
299 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
300         state.writeSPBuf.Reset()
301         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
302                 Magic:   MagicNNCPLv1,
303                 Payload: payload,
304         })
305         if err != nil {
306                 return err
307         }
308         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
309                 state.TxLastSeen = time.Now()
310                 state.TxBytes += int64(n)
311                 if !ping {
312                         state.TxLastNonPing = state.TxLastSeen
313                 }
314         }
315         return err
316 }
317
318 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
319         var sp SPRaw
320         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
321         if err != nil {
322                 ue := err.(*xdr.UnmarshalError)
323                 if ue.Err == io.EOF {
324                         return nil, ue.Err
325                 }
326                 return nil, err
327         }
328         state.RxLastSeen = time.Now()
329         state.RxBytes += int64(n)
330         if sp.Magic != MagicNNCPLv1 {
331                 return nil, BadMagic
332         }
333         return sp.Payload, nil
334 }
335
336 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
337         var infos []*SPInfo
338         var totalSize int64
339         for job := range ctx.Jobs(nodeId, TTx) {
340                 if job.PktEnc.Nice > nice {
341                         continue
342                 }
343                 if _, known := (*seen)[*job.HshValue]; known {
344                         continue
345                 }
346                 totalSize += job.Size
347                 infos = append(infos, &SPInfo{
348                         Nice: job.PktEnc.Nice,
349                         Size: uint64(job.Size),
350                         Hash: job.HshValue,
351                 })
352                 (*seen)[*job.HshValue] = job.PktEnc.Nice
353         }
354         sort.Sort(ByNice(infos))
355         var payloads [][]byte
356         for _, info := range infos {
357                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
358                 ctx.LogD("sp-info-our", LEs{
359                         {"Node", nodeId},
360                         {"Name", Base32Codec.EncodeToString(info.Hash[:])},
361                         {"Size", info.Size},
362                 }, "")
363         }
364         if totalSize > 0 {
365                 ctx.LogI("sp-infos", LEs{
366                         {"XX", string(TTx)},
367                         {"Node", nodeId},
368                         {"Pkts", len(payloads)},
369                         {"Size", totalSize},
370                 }, "")
371         }
372         return payloadsSplit(payloads)
373 }
374
375 func (state *SPState) StartI(conn ConnDeadlined) error {
376         nodeId := state.Node.Id
377         err := state.Ctx.ensureRxDir(nodeId)
378         if err != nil {
379                 return err
380         }
381         var rxLock *os.File
382         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
383                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
384                 if err != nil {
385                         return err
386                 }
387         }
388         var txLock *os.File
389         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
390                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
391                 if err != nil {
392                         return err
393                 }
394         }
395         started := time.Now()
396         conf := noise.Config{
397                 CipherSuite: NoiseCipherSuite,
398                 Pattern:     noise.HandshakeIK,
399                 Initiator:   true,
400                 StaticKeypair: noise.DHKey{
401                         Private: state.Ctx.Self.NoisePrv[:],
402                         Public:  state.Ctx.Self.NoisePub[:],
403                 },
404                 PeerStatic: state.Node.NoisePub[:],
405         }
406         hs, err := noise.NewHandshakeState(conf)
407         if err != nil {
408                 return err
409         }
410         state.hs = hs
411         state.payloads = make(chan []byte)
412         state.pings = make(chan struct{})
413         state.infosTheir = make(map[[32]byte]*SPInfo)
414         state.infosOurSeen = make(map[[32]byte]uint8)
415         state.started = started
416         state.rxLock = rxLock
417         state.txLock = txLock
418
419         var infosPayloads [][]byte
420         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
421                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
422         }
423         var firstPayload []byte
424         if len(infosPayloads) > 0 {
425                 firstPayload = infosPayloads[0]
426         }
427         // Pad first payload, to hide actual number of existing files
428         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
429                 firstPayload = append(firstPayload, SPHaltMarshalized...)
430         }
431
432         var buf []byte
433         var payload []byte
434         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
435         if err != nil {
436                 state.dirUnlock()
437                 return err
438         }
439         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
440         state.Ctx.LogD("sp-start", les, "sending first message")
441         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
442         if err = state.WriteSP(conn, buf, false); err != nil {
443                 state.Ctx.LogE("sp-start", les, err, "")
444                 state.dirUnlock()
445                 return err
446         }
447         state.Ctx.LogD("sp-start", les, "waiting for first message")
448         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
449         if buf, err = state.ReadSP(conn); err != nil {
450                 state.Ctx.LogE("sp-start", les, err, "")
451                 state.dirUnlock()
452                 return err
453         }
454         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
455         if err != nil {
456                 state.Ctx.LogE("sp-start", les, err, "")
457                 state.dirUnlock()
458                 return err
459         }
460         state.Ctx.LogD("sp-start", les, "starting workers")
461         err = state.StartWorkers(conn, infosPayloads, payload)
462         if err != nil {
463                 state.Ctx.LogE("sp-start", les, err, "")
464                 state.dirUnlock()
465         }
466         return err
467 }
468
469 func (state *SPState) StartR(conn ConnDeadlined) error {
470         started := time.Now()
471         conf := noise.Config{
472                 CipherSuite: NoiseCipherSuite,
473                 Pattern:     noise.HandshakeIK,
474                 Initiator:   false,
475                 StaticKeypair: noise.DHKey{
476                         Private: state.Ctx.Self.NoisePrv[:],
477                         Public:  state.Ctx.Self.NoisePub[:],
478                 },
479         }
480         hs, err := noise.NewHandshakeState(conf)
481         if err != nil {
482                 return err
483         }
484         xxOnly := TRxTx("")
485         state.hs = hs
486         state.payloads = make(chan []byte)
487         state.pings = make(chan struct{})
488         state.infosOurSeen = make(map[[32]byte]uint8)
489         state.infosTheir = make(map[[32]byte]*SPInfo)
490         state.started = started
491         state.xxOnly = xxOnly
492
493         var buf []byte
494         var payload []byte
495         state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
496         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
497         if buf, err = state.ReadSP(conn); err != nil {
498                 state.Ctx.LogE("sp-start", LEs{}, err, "")
499                 return err
500         }
501         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
502                 state.Ctx.LogE("sp-start", LEs{}, err, "")
503                 return err
504         }
505
506         var node *Node
507         for _, n := range state.Ctx.Neigh {
508                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
509                         node = n
510                         break
511                 }
512         }
513         if node == nil {
514                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
515                 state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
516                 return errors.New("Unknown peer: " + peerId)
517         }
518         state.Node = node
519         state.rxRate = node.RxRate
520         state.txRate = node.TxRate
521         state.onlineDeadline = node.OnlineDeadline
522         state.maxOnlineTime = node.MaxOnlineTime
523         les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
524
525         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
526                 return err
527         }
528         var rxLock *os.File
529         if xxOnly == "" || xxOnly == TRx {
530                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
531                 if err != nil {
532                         return err
533                 }
534         }
535         state.rxLock = rxLock
536         var txLock *os.File
537         if xxOnly == "" || xxOnly == TTx {
538                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
539                 if err != nil {
540                         return err
541                 }
542         }
543         state.txLock = txLock
544
545         var infosPayloads [][]byte
546         if xxOnly == "" || xxOnly == TTx {
547                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
548         }
549         var firstPayload []byte
550         if len(infosPayloads) > 0 {
551                 firstPayload = infosPayloads[0]
552         }
553         // Pad first payload, to hide actual number of existing files
554         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
555                 firstPayload = append(firstPayload, SPHaltMarshalized...)
556         }
557
558         state.Ctx.LogD("sp-start", les, "sending first message")
559         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
560         if err != nil {
561                 state.dirUnlock()
562                 return err
563         }
564         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
565         if err = state.WriteSP(conn, buf, false); err != nil {
566                 state.Ctx.LogE("sp-start", les, err, "")
567                 state.dirUnlock()
568                 return err
569         }
570         state.Ctx.LogD("sp-start", les, "starting workers")
571         err = state.StartWorkers(conn, infosPayloads, payload)
572         if err != nil {
573                 state.dirUnlock()
574         }
575         return err
576 }
577
578 func (state *SPState) closeFd(pth string) {
579         s, exists := state.fds[pth]
580         delete(state.fds, pth)
581         if exists {
582                 s.fd.Close()
583         }
584 }
585
586 func (state *SPState) FillExistingNoCK() {
587         checkerJobs := make([]*[32]byte, 0)
588         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
589                 if job.PktEnc.Nice > state.Nice {
590                         continue
591                 }
592                 checkerJobs = append(checkerJobs, job.HshValue)
593         }
594         for _, job := range checkerJobs {
595                 state.checkerJobs <- job
596         }
597         state.wg.Done()
598 }
599
600 func (state *SPState) StartWorkers(
601         conn ConnDeadlined,
602         infosPayloads [][]byte,
603         payload []byte,
604 ) error {
605         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
606         state.fds = make(map[string]FdAndFullSize)
607         state.fileHashers = make(map[string]*HasherAndOffset)
608         state.isDead = make(chan struct{})
609         if state.maxOnlineTime > 0 {
610                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
611         }
612
613         // Checker
614         if !state.NoCK {
615                 state.checkerJobs = make(chan *[32]byte)
616                 go state.SPChecker()
617                 state.wg.Add(1)
618                 go state.FillExistingNoCK()
619         }
620
621         // Remaining handshake payload sending
622         if len(infosPayloads) > 1 {
623                 state.wg.Add(1)
624                 go func() {
625                         for _, payload := range infosPayloads[1:] {
626                                 state.Ctx.LogD(
627                                         "sp-work",
628                                         append(les, LE{"Size", len(payload)}),
629                                         "queuing remaining payload",
630                                 )
631                                 state.payloads <- payload
632                         }
633                         state.wg.Done()
634                 }()
635         }
636
637         // Processing of first payload and queueing its responses
638         state.Ctx.LogD(
639                 "sp-work",
640                 append(les, LE{"Size", len(payload)}),
641                 "processing first payload",
642         )
643         replies, err := state.ProcessSP(payload)
644         if err != nil {
645                 state.Ctx.LogE("sp-work", les, err, "")
646                 return err
647         }
648         state.wg.Add(1)
649         go func() {
650                 for _, reply := range replies {
651                         state.Ctx.LogD(
652                                 "sp-work",
653                                 append(les, LE{"Size", len(reply)}),
654                                 "queuing reply",
655                         )
656                         state.payloads <- reply
657                 }
658                 state.wg.Done()
659         }()
660
661         // Periodic jobs
662         state.wg.Add(1)
663         go func() {
664                 deadlineTicker := time.NewTicker(time.Second)
665                 pingTicker := time.NewTicker(PingTimeout)
666                 for {
667                         select {
668                         case <-state.isDead:
669                                 state.wg.Done()
670                                 deadlineTicker.Stop()
671                                 pingTicker.Stop()
672                                 return
673                         case now := <-deadlineTicker.C:
674                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
675                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
676                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
677                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
678                                         state.SetDead()
679                                         conn.Close() // #nosec G104
680                                 }
681                         case now := <-pingTicker.C:
682                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
683                                         state.wg.Add(1)
684                                         go func() {
685                                                 state.pings <- struct{}{}
686                                                 state.wg.Done()
687                                         }()
688                                 }
689                         }
690                 }
691         }()
692
693         // Spool checker and INFOs sender of appearing files
694         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
695                 state.wg.Add(1)
696                 go func() {
697                         ticker := time.NewTicker(time.Second)
698                         for {
699                                 select {
700                                 case <-state.isDead:
701                                         state.wg.Done()
702                                         ticker.Stop()
703                                         return
704                                 case <-ticker.C:
705                                         for _, payload := range state.Ctx.infosOur(
706                                                 state.Node.Id,
707                                                 state.Nice,
708                                                 &state.infosOurSeen,
709                                         ) {
710                                                 state.Ctx.LogD(
711                                                         "sp-work",
712                                                         append(les, LE{"Size", len(payload)}),
713                                                         "queuing new info",
714                                                 )
715                                                 state.payloads <- payload
716                                         }
717                                 }
718                         }
719                 }()
720         }
721
722         // Sender
723         state.wg.Add(1)
724         go func() {
725                 defer conn.Close()
726                 defer state.SetDead()
727                 defer state.wg.Done()
728                 for {
729                         if state.NotAlive() {
730                                 return
731                         }
732                         var payload []byte
733                         var ping bool
734                         select {
735                         case <-state.pings:
736                                 state.Ctx.LogD("sp-xmit", les, "got ping")
737                                 payload = SPPingMarshalized
738                                 ping = true
739                         case payload = <-state.payloads:
740                                 state.Ctx.LogD(
741                                         "sp-xmit",
742                                         append(les, LE{"Size", len(payload)}),
743                                         "got payload",
744                                 )
745                         default:
746                                 state.RLock()
747                                 if len(state.queueTheir) == 0 {
748                                         state.RUnlock()
749                                         time.Sleep(100 * time.Millisecond)
750                                         continue
751                                 }
752                                 freq := state.queueTheir[0].freq
753                                 state.RUnlock()
754                                 if state.txRate > 0 {
755                                         time.Sleep(time.Second / time.Duration(state.txRate))
756                                 }
757                                 lesp := append(les, LEs{
758                                         {"XX", string(TTx)},
759                                         {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
760                                         {"Size", int64(freq.Offset)},
761                                 }...)
762                                 state.Ctx.LogD("sp-file", lesp, "queueing")
763                                 pth := filepath.Join(
764                                         state.Ctx.Spool,
765                                         state.Node.Id.String(),
766                                         string(TTx),
767                                         Base32Codec.EncodeToString(freq.Hash[:]),
768                                 )
769                                 fdAndFullSize, exists := state.fds[pth]
770                                 if !exists {
771                                         fd, err := os.Open(pth)
772                                         if err != nil {
773                                                 state.Ctx.LogE("sp-file", lesp, err, "")
774                                                 return
775                                         }
776                                         fi, err := fd.Stat()
777                                         if err != nil {
778                                                 state.Ctx.LogE("sp-file", lesp, err, "")
779                                                 return
780                                         }
781                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
782                                         state.fds[pth] = fdAndFullSize
783                                 }
784                                 fd := fdAndFullSize.fd
785                                 fullSize := fdAndFullSize.fullSize
786                                 var buf []byte
787                                 if freq.Offset < uint64(fullSize) {
788                                         state.Ctx.LogD("sp-file", lesp, "seeking")
789                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
790                                                 state.Ctx.LogE("sp-file", lesp, err, "")
791                                                 return
792                                         }
793                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
794                                         n, err := fd.Read(buf)
795                                         if err != nil {
796                                                 state.Ctx.LogE("sp-file", lesp, err, "")
797                                                 return
798                                         }
799                                         buf = buf[:n]
800                                         state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
801                                 }
802                                 state.closeFd(pth)
803                                 payload = MarshalSP(SPTypeFile, SPFile{
804                                         Hash:    freq.Hash,
805                                         Offset:  freq.Offset,
806                                         Payload: buf,
807                                 })
808                                 ourSize := freq.Offset + uint64(len(buf))
809                                 lesp = append(lesp, LE{"Size", int64(ourSize)})
810                                 lesp = append(lesp, LE{"FullSize", fullSize})
811                                 if state.Ctx.ShowPrgrs {
812                                         Progress("Tx", lesp)
813                                 }
814                                 state.Lock()
815                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
816                                         if ourSize == uint64(fullSize) {
817                                                 state.Ctx.LogD("sp-file", lesp, "finished")
818                                                 if len(state.queueTheir) > 1 {
819                                                         state.queueTheir = state.queueTheir[1:]
820                                                 } else {
821                                                         state.queueTheir = state.queueTheir[:0]
822                                                 }
823                                         } else {
824                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
825                                         }
826                                 } else {
827                                         state.Ctx.LogD("sp-file", lesp, "queue disappeared")
828                                 }
829                                 state.Unlock()
830                         }
831                         state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
832                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
833                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
834                                 state.Ctx.LogE("sp-xmit", les, err, "")
835                                 return
836                         }
837                 }
838         }()
839
840         // Receiver
841         state.wg.Add(1)
842         go func() {
843                 for {
844                         if state.NotAlive() {
845                                 break
846                         }
847                         state.Ctx.LogD("sp-recv", les, "waiting for payload")
848                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
849                         payload, err := state.ReadSP(conn)
850                         if err != nil {
851                                 if err == io.EOF {
852                                         break
853                                 }
854                                 unmarshalErr := err.(*xdr.UnmarshalError)
855                                 if os.IsTimeout(unmarshalErr.Err) {
856                                         continue
857                                 }
858                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
859                                         break
860                                 }
861                                 state.Ctx.LogE("sp-recv", les, err, "")
862                                 break
863                         }
864                         state.Ctx.LogD(
865                                 "sp-recv",
866                                 append(les, LE{"Size", len(payload)}),
867                                 "got payload",
868                         )
869                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
870                         if err != nil {
871                                 state.Ctx.LogE("sp-recv", les, err, "")
872                                 break
873                         }
874                         state.Ctx.LogD(
875                                 "sp-recv",
876                                 append(les, LE{"Size", len(payload)}),
877                                 "processing",
878                         )
879                         replies, err := state.ProcessSP(payload)
880                         if err != nil {
881                                 state.Ctx.LogE("sp-recv", les, err, "")
882                                 break
883                         }
884                         state.wg.Add(1)
885                         go func() {
886                                 for _, reply := range replies {
887                                         state.Ctx.LogD(
888                                                 "sp-recv",
889                                                 append(les, LE{"Size", len(reply)}),
890                                                 "queuing reply",
891                                         )
892                                         state.payloads <- reply
893                                 }
894                                 state.wg.Done()
895                         }()
896                         if state.rxRate > 0 {
897                                 time.Sleep(time.Second / time.Duration(state.rxRate))
898                         }
899                 }
900                 state.SetDead()
901                 state.wg.Done()
902                 state.SetDead()
903                 conn.Close() // #nosec G104
904         }()
905
906         return nil
907 }
908
909 func (state *SPState) Wait() {
910         state.wg.Wait()
911         close(state.payloads)
912         close(state.pings)
913         state.dirUnlock()
914         state.Duration = time.Now().Sub(state.started)
915         state.RxSpeed = state.RxBytes
916         state.TxSpeed = state.TxBytes
917         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
918         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
919         if rxDuration > 0 {
920                 state.RxSpeed = state.RxBytes / rxDuration
921         }
922         if txDuration > 0 {
923                 state.TxSpeed = state.TxBytes / txDuration
924         }
925 }
926
927 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
928         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
929         r := bytes.NewReader(payload)
930         var err error
931         var replies [][]byte
932         var infosGot bool
933         for r.Len() > 0 {
934                 state.Ctx.LogD("sp-process", les, "unmarshaling header")
935                 var head SPHead
936                 if _, err = xdr.Unmarshal(r, &head); err != nil {
937                         state.Ctx.LogE("sp-process", les, err, "")
938                         return nil, err
939                 }
940                 if head.Type != SPTypePing {
941                         state.RxLastNonPing = state.RxLastSeen
942                 }
943                 switch head.Type {
944                 case SPTypeHalt:
945                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
946                         state.Lock()
947                         state.queueTheir = nil
948                         state.Unlock()
949
950                 case SPTypePing:
951                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
952
953                 case SPTypeInfo:
954                         infosGot = true
955                         lesp := append(les, LE{"Type", "info"})
956                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
957                         var info SPInfo
958                         if _, err = xdr.Unmarshal(r, &info); err != nil {
959                                 state.Ctx.LogE("sp-process", lesp, err, "")
960                                 return nil, err
961                         }
962                         lesp = append(lesp, LEs{
963                                 {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
964                                 {"Size", int64(info.Size)},
965                                 {"Nice", int(info.Nice)},
966                         }...)
967                         if !state.listOnly && info.Nice > state.Nice {
968                                 state.Ctx.LogD("sp-process", lesp, "too nice")
969                                 continue
970                         }
971                         state.Ctx.LogD("sp-process", lesp, "received")
972                         if !state.listOnly && state.xxOnly == TTx {
973                                 continue
974                         }
975                         state.Lock()
976                         state.infosTheir[*info.Hash] = &info
977                         state.Unlock()
978                         state.Ctx.LogD("sp-process", lesp, "stating part")
979                         pktPath := filepath.Join(
980                                 state.Ctx.Spool,
981                                 state.Node.Id.String(),
982                                 string(TRx),
983                                 Base32Codec.EncodeToString(info.Hash[:]),
984                         )
985                         if _, err = os.Stat(pktPath); err == nil {
986                                 state.Ctx.LogI("sp-info", lesp, "already done")
987                                 if !state.listOnly {
988                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
989                                 }
990                                 continue
991                         }
992                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
993                                 state.Ctx.LogI("sp-info", lesp, "already seen")
994                                 if !state.listOnly {
995                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
996                                 }
997                                 continue
998                         }
999                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1000                                 state.Ctx.LogI("sp-info", lesp, "still non checksummed")
1001                                 continue
1002                         }
1003                         fi, err := os.Stat(pktPath + PartSuffix)
1004                         var offset int64
1005                         if err == nil {
1006                                 offset = fi.Size()
1007                         }
1008                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1009                                 state.Ctx.LogI("sp-info", lesp, "not enough space")
1010                                 continue
1011                         }
1012                         state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
1013                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1014                                 replies = append(replies, MarshalSP(
1015                                         SPTypeFreq,
1016                                         SPFreq{info.Hash, uint64(offset)},
1017                                 ))
1018                         }
1019
1020                 case SPTypeFile:
1021                         lesp := append(les, LE{"Type", "file"})
1022                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1023                         var file SPFile
1024                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1025                                 state.Ctx.LogE("sp-process", lesp, err, "")
1026                                 return nil, err
1027                         }
1028                         lesp = append(lesp, LEs{
1029                                 {"XX", string(TRx)},
1030                                 {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
1031                                 {"Size", len(file.Payload)},
1032                         }...)
1033                         dirToSync := filepath.Join(
1034                                 state.Ctx.Spool,
1035                                 state.Node.Id.String(),
1036                                 string(TRx),
1037                         )
1038                         filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
1039                         filePathPart := filePath + PartSuffix
1040                         state.Ctx.LogD("sp-file", lesp, "opening part")
1041                         fdAndFullSize, exists := state.fds[filePathPart]
1042                         var fd *os.File
1043                         if exists {
1044                                 fd = fdAndFullSize.fd
1045                         } else {
1046                                 fd, err = os.OpenFile(
1047                                         filePathPart,
1048                                         os.O_RDWR|os.O_CREATE,
1049                                         os.FileMode(0666),
1050                                 )
1051                                 if err != nil {
1052                                         state.Ctx.LogE("sp-file", lesp, err, "")
1053                                         return nil, err
1054                                 }
1055                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1056                                 if file.Offset == 0 {
1057                                         h, err := blake2b.New256(nil)
1058                                         if err != nil {
1059                                                 panic(err)
1060                                         }
1061                                         state.fileHashers[filePath] = &HasherAndOffset{h: h}
1062                                 }
1063                         }
1064                         state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
1065                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1066                                 state.Ctx.LogE("sp-file", lesp, err, "")
1067                                 state.closeFd(filePathPart)
1068                                 return nil, err
1069                         }
1070                         state.Ctx.LogD("sp-file", lesp, "writing")
1071                         if _, err = fd.Write(file.Payload); err != nil {
1072                                 state.Ctx.LogE("sp-file", lesp, err, "")
1073                                 state.closeFd(filePathPart)
1074                                 return nil, err
1075                         }
1076                         hasherAndOffset, hasherExists := state.fileHashers[filePath]
1077                         if hasherExists {
1078                                 if hasherAndOffset.offset == file.Offset {
1079                                         if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1080                                                 panic(err)
1081                                         }
1082                                         hasherAndOffset.offset += uint64(len(file.Payload))
1083                                 } else {
1084                                         state.Ctx.LogE(
1085                                                 "sp-file", lesp,
1086                                                 errors.New("offset differs"),
1087                                                 "deleting hasher",
1088                                         )
1089                                         delete(state.fileHashers, filePath)
1090                                         hasherExists = false
1091                                 }
1092                         }
1093                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1094                         lesp[len(lesp)-1].V = ourSize
1095                         fullsize := int64(0)
1096                         state.RLock()
1097                         infoTheir, ok := state.infosTheir[*file.Hash]
1098                         state.RUnlock()
1099                         if ok {
1100                                 fullsize = int64(infoTheir.Size)
1101                         }
1102                         lesp = append(lesp, LE{"FullSize", fullsize})
1103                         if state.Ctx.ShowPrgrs {
1104                                 Progress("Rx", lesp)
1105                         }
1106                         if fullsize != ourSize {
1107                                 continue
1108                         }
1109                         err = fd.Sync()
1110                         if err != nil {
1111                                 state.Ctx.LogE("sp-file", lesp, err, "sync")
1112                                 state.closeFd(filePathPart)
1113                                 continue
1114                         }
1115                         if hasherExists {
1116                                 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1117                                         state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
1118                                         continue
1119                                 }
1120                                 if err = os.Rename(filePathPart, filePath); err != nil {
1121                                         state.Ctx.LogE("sp-file", lesp, err, "rename")
1122                                         continue
1123                                 }
1124                                 if err = DirSync(dirToSync); err != nil {
1125                                         state.Ctx.LogE("sp-file", lesp, err, "sync")
1126                                         continue
1127                                 }
1128                                 state.Ctx.LogI("sp-file", lesp, "done")
1129                                 state.wg.Add(1)
1130                                 go func() {
1131                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1132                                         state.wg.Done()
1133                                 }()
1134                                 state.Lock()
1135                                 delete(state.infosTheir, *file.Hash)
1136                                 state.Unlock()
1137                                 if !state.Ctx.HdrUsage {
1138                                         state.closeFd(filePathPart)
1139                                         continue
1140                                 }
1141                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1142                                         state.Ctx.LogE("sp-file", lesp, err, "seek")
1143                                         state.closeFd(filePathPart)
1144                                         continue
1145                                 }
1146                                 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1147                                 state.closeFd(filePathPart)
1148                                 if err != nil {
1149                                         state.Ctx.LogE("sp-file", lesp, err, "HdrRead")
1150                                         continue
1151                                 }
1152                                 state.Ctx.HdrWrite(pktEncRaw, filePath)
1153                                 continue
1154                         }
1155                         state.closeFd(filePathPart)
1156                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1157                                 state.Ctx.LogE("sp-file", lesp, err, "rename")
1158                                 continue
1159                         }
1160                         if err = DirSync(dirToSync); err != nil {
1161                                 state.Ctx.LogE("sp-file", lesp, err, "sync")
1162                                 continue
1163                         }
1164                         state.Ctx.LogI("sp-file", lesp, "downloaded")
1165                         state.Lock()
1166                         delete(state.infosTheir, *file.Hash)
1167                         state.Unlock()
1168                         if !state.NoCK {
1169                                 state.checkerJobs <- file.Hash
1170                         }
1171
1172                 case SPTypeDone:
1173                         lesp := append(les, LE{"Type", "done"})
1174                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1175                         var done SPDone
1176                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1177                                 state.Ctx.LogE("sp-process", lesp, err, "")
1178                                 return nil, err
1179                         }
1180                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
1181                         state.Ctx.LogD("sp-done", lesp, "removing")
1182                         pth := filepath.Join(
1183                                 state.Ctx.Spool,
1184                                 state.Node.Id.String(),
1185                                 string(TTx),
1186                                 Base32Codec.EncodeToString(done.Hash[:]),
1187                         )
1188                         err := os.Remove(pth)
1189                         lesp = append(lesp, LE{"XX", string(TTx)})
1190                         if err == nil {
1191                                 state.Ctx.LogI("sp-done", lesp, "")
1192                                 if state.Ctx.HdrUsage {
1193                                         os.Remove(pth + HdrSuffix)
1194                                 }
1195                         } else {
1196                                 state.Ctx.LogE("sp-done", lesp, err, "")
1197                         }
1198
1199                 case SPTypeFreq:
1200                         lesp := append(les, LE{"Type", "freq"})
1201                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1202                         var freq SPFreq
1203                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1204                                 state.Ctx.LogE("sp-process", lesp, err, "")
1205                                 return nil, err
1206                         }
1207                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
1208                         lesp = append(lesp, LE{"Offset", freq.Offset})
1209                         state.Ctx.LogD("sp-process", lesp, "queueing")
1210                         nice, exists := state.infosOurSeen[*freq.Hash]
1211                         if exists {
1212                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1213                                         state.Lock()
1214                                         insertIdx := 0
1215                                         var freqWithNice *FreqWithNice
1216                                         for insertIdx, freqWithNice = range state.queueTheir {
1217                                                 if freqWithNice.nice > nice {
1218                                                         break
1219                                                 }
1220                                         }
1221                                         state.queueTheir = append(state.queueTheir, nil)
1222                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1223                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1224                                         state.Unlock()
1225                                 } else {
1226                                         state.Ctx.LogD("sp-process", lesp, "skipping")
1227                                 }
1228                         } else {
1229                                 state.Ctx.LogD("sp-process", lesp, "unknown")
1230                         }
1231
1232                 default:
1233                         state.Ctx.LogE(
1234                                 "sp-process",
1235                                 append(les, LE{"Type", head.Type}),
1236                                 errors.New("unknown type"),
1237                                 "",
1238                         )
1239                         return nil, BadPktType
1240                 }
1241         }
1242         if infosGot {
1243                 var pkts int
1244                 var size uint64
1245                 state.RLock()
1246                 for _, info := range state.infosTheir {
1247                         pkts++
1248                         size += info.Size
1249                 }
1250                 state.RUnlock()
1251                 state.Ctx.LogI("sp-infos", LEs{
1252                         {"XX", string(TRx)},
1253                         {"Node", state.Node.Id},
1254                         {"Pkts", pkts},
1255                         {"Size", int64(size)},
1256                 }, "")
1257         }
1258         return payloadsSplit(replies), nil
1259 }