]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
b5bad2d5dcd21a16b909c44a8ab29addf6d8a3a2
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "os"
26         "path/filepath"
27         "sort"
28         "sync"
29         "time"
30
31         xdr "github.com/davecgh/go-xdr/xdr2"
32         "github.com/flynn/noise"
33 )
34
// Sizing constants of the sync protocol (SP).
const (
	// MaxSPSize is the upper bound used when batching and padding SP
	// payloads (see payloadsSplit and the first-message padding).
	MaxSPSize = (1 << 16) - 256
	// PartSuffix is a file name suffix; presumably it marks partially
	// received packets -- not used in this part of the file, confirm
	// against its users.
	PartSuffix = ".part"
	// SPHeadOverhead is the size in bytes reserved for a marshalled
	// SPHead (init allocates header buffers of exactly this size).
	SPHeadOverhead = 4
)
40
41 var (
42         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
43
44         SPInfoOverhead    int
45         SPFreqOverhead    int
46         SPFileOverhead    int
47         SPHaltMarshalized []byte
48         SPPingMarshalized []byte
49
50         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
51                 noise.DH25519,
52                 noise.CipherChaChaPoly,
53                 noise.HashBLAKE2b,
54         )
55
56         DefaultDeadline = 10 * time.Second
57         PingTimeout     = time.Minute
58
59         spCheckerToken chan struct{}
60 )
61
// SPType identifies the kind of a sync protocol packet; it is the only
// field of SPHead.
type SPType uint8

// Known packet types. The numeric values go on the wire, so the order of
// this list must never change.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
72
73 type SPHead struct {
74         Type SPType
75 }
76
77 type SPInfo struct {
78         Nice uint8
79         Size uint64
80         Hash *[32]byte
81 }
82
83 type SPFreq struct {
84         Hash   *[32]byte
85         Offset uint64
86 }
87
88 type SPFile struct {
89         Hash    *[32]byte
90         Offset  uint64
91         Payload []byte
92 }
93
94 type SPDone struct {
95         Hash *[32]byte
96 }
97
98 type SPRaw struct {
99         Magic   [8]byte
100         Payload []byte
101 }
102
103 type FreqWithNice struct {
104         freq *SPFreq
105         nice uint8
106 }
107
108 type ConnDeadlined interface {
109         io.ReadWriteCloser
110         SetReadDeadline(t time.Time) error
111         SetWriteDeadline(t time.Time) error
112 }
113
114 func init() {
115         var buf bytes.Buffer
116         spHead := SPHead{Type: SPTypeHalt}
117         if _, err := xdr.Marshal(&buf, spHead); err != nil {
118                 panic(err)
119         }
120         SPHaltMarshalized = make([]byte, SPHeadOverhead)
121         copy(SPHaltMarshalized, buf.Bytes())
122         buf.Reset()
123
124         spHead = SPHead{Type: SPTypePing}
125         if _, err := xdr.Marshal(&buf, spHead); err != nil {
126                 panic(err)
127         }
128         SPPingMarshalized = make([]byte, SPHeadOverhead)
129         copy(SPPingMarshalized, buf.Bytes())
130         buf.Reset()
131
132         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
133         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
134                 panic(err)
135         }
136         SPInfoOverhead = buf.Len()
137         buf.Reset()
138
139         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
140         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
141                 panic(err)
142         }
143         SPFreqOverhead = buf.Len()
144         buf.Reset()
145
146         spFile := SPFile{Hash: new([32]byte), Offset: 123}
147         if _, err := xdr.Marshal(&buf, spFile); err != nil {
148                 panic(err)
149         }
150         SPFileOverhead = buf.Len()
151         spCheckerToken = make(chan struct{}, 1)
152         spCheckerToken <- struct{}{}
153 }
154
155 func MarshalSP(typ SPType, sp interface{}) []byte {
156         var buf bytes.Buffer
157         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
158                 panic(err)
159         }
160         if _, err := xdr.Marshal(&buf, sp); err != nil {
161                 panic(err)
162         }
163         return buf.Bytes()
164 }
165
166 func payloadsSplit(payloads [][]byte) [][]byte {
167         var outbounds [][]byte
168         outbound := make([]byte, 0, MaxSPSize)
169         for i, payload := range payloads {
170                 outbound = append(outbound, payload...)
171                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
172                         outbounds = append(outbounds, outbound)
173                         outbound = make([]byte, 0, MaxSPSize)
174                 }
175         }
176         if len(outbound) > 0 {
177                 outbounds = append(outbounds, outbound)
178         }
179         return outbounds
180 }
181
182 type SPState struct {
183         Ctx            *Ctx
184         Node           *Node
185         Nice           uint8
186         onlineDeadline time.Duration
187         maxOnlineTime  time.Duration
188         hs             *noise.HandshakeState
189         csOur          *noise.CipherState
190         csTheir        *noise.CipherState
191         payloads       chan []byte
192         pings          chan struct{}
193         infosTheir     map[[32]byte]*SPInfo
194         infosOurSeen   map[[32]byte]uint8
195         queueTheir     []*FreqWithNice
196         wg             sync.WaitGroup
197         RxBytes        int64
198         RxLastSeen     time.Time
199         RxLastNonPing  time.Time
200         TxBytes        int64
201         TxLastSeen     time.Time
202         TxLastNonPing  time.Time
203         started        time.Time
204         mustFinishAt   time.Time
205         Duration       time.Duration
206         RxSpeed        int64
207         TxSpeed        int64
208         rxLock         *os.File
209         txLock         *os.File
210         xxOnly         TRxTx
211         rxRate         int
212         txRate         int
213         isDead         chan struct{}
214         listOnly       bool
215         onlyPkts       map[[32]byte]bool
216         writeSPBuf     bytes.Buffer
217         sync.RWMutex
218 }
219
220 func (state *SPState) SetDead() {
221         state.Lock()
222         defer state.Unlock()
223         select {
224         case <-state.isDead:
225                 // Already closed channel, dead
226                 return
227         default:
228         }
229         close(state.isDead)
230         go func() {
231                 for range state.payloads {
232                 }
233         }()
234         go func() {
235                 for range state.pings {
236                 }
237         }()
238 }
239
240 func (state *SPState) NotAlive() bool {
241         select {
242         case <-state.isDead:
243                 return true
244         default:
245         }
246         return false
247 }
248
249 func (state *SPState) dirUnlock() {
250         state.Ctx.UnlockDir(state.rxLock)
251         state.Ctx.UnlockDir(state.txLock)
252 }
253
254 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
255         state.writeSPBuf.Reset()
256         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
257                 Magic:   MagicNNCPLv1,
258                 Payload: payload,
259         })
260         if err != nil {
261                 return err
262         }
263         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
264                 state.TxLastSeen = time.Now()
265                 state.TxBytes += int64(n)
266                 if !ping {
267                         state.TxLastNonPing = state.TxLastSeen
268                 }
269         }
270         return err
271 }
272
273 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
274         var sp SPRaw
275         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
276         if err != nil {
277                 ue := err.(*xdr.UnmarshalError)
278                 if ue.Err == io.EOF {
279                         return nil, ue.Err
280                 }
281                 return nil, err
282         }
283         state.RxLastSeen = time.Now()
284         state.RxBytes += int64(n)
285         if sp.Magic != MagicNNCPLv1 {
286                 return nil, BadMagic
287         }
288         return sp.Payload, nil
289 }
290
291 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
292         var infos []*SPInfo
293         var totalSize int64
294         for job := range ctx.Jobs(nodeId, TTx) {
295                 if job.PktEnc.Nice > nice {
296                         continue
297                 }
298                 if _, known := (*seen)[*job.HshValue]; known {
299                         continue
300                 }
301                 totalSize += job.Size
302                 infos = append(infos, &SPInfo{
303                         Nice: job.PktEnc.Nice,
304                         Size: uint64(job.Size),
305                         Hash: job.HshValue,
306                 })
307                 (*seen)[*job.HshValue] = job.PktEnc.Nice
308         }
309         sort.Sort(ByNice(infos))
310         var payloads [][]byte
311         for _, info := range infos {
312                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
313                 ctx.LogD("sp-info-our", LEs{
314                         {"Node", nodeId},
315                         {"Name", Base32Codec.EncodeToString(info.Hash[:])},
316                         {"Size", info.Size},
317                 }, "")
318         }
319         if totalSize > 0 {
320                 ctx.LogI("sp-infos", LEs{
321                         {"XX", string(TTx)},
322                         {"Node", nodeId},
323                         {"Pkts", len(payloads)},
324                         {"Size", totalSize},
325                 }, "")
326         }
327         return payloadsSplit(payloads)
328 }
329
330 func (state *SPState) StartI(conn ConnDeadlined) error {
331         nodeId := state.Node.Id
332         err := state.Ctx.ensureRxDir(nodeId)
333         if err != nil {
334                 return err
335         }
336         var rxLock *os.File
337         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
338                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
339                 if err != nil {
340                         return err
341                 }
342         }
343         var txLock *os.File
344         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
345                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
346                 if err != nil {
347                         return err
348                 }
349         }
350         started := time.Now()
351         conf := noise.Config{
352                 CipherSuite: NoiseCipherSuite,
353                 Pattern:     noise.HandshakeIK,
354                 Initiator:   true,
355                 StaticKeypair: noise.DHKey{
356                         Private: state.Ctx.Self.NoisePrv[:],
357                         Public:  state.Ctx.Self.NoisePub[:],
358                 },
359                 PeerStatic: state.Node.NoisePub[:],
360         }
361         hs, err := noise.NewHandshakeState(conf)
362         if err != nil {
363                 return err
364         }
365         state.hs = hs
366         state.payloads = make(chan []byte)
367         state.pings = make(chan struct{})
368         state.infosTheir = make(map[[32]byte]*SPInfo)
369         state.infosOurSeen = make(map[[32]byte]uint8)
370         state.started = started
371         state.rxLock = rxLock
372         state.txLock = txLock
373
374         var infosPayloads [][]byte
375         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
376                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
377         }
378         var firstPayload []byte
379         if len(infosPayloads) > 0 {
380                 firstPayload = infosPayloads[0]
381         }
382         // Pad first payload, to hide actual number of existing files
383         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
384                 firstPayload = append(firstPayload, SPHaltMarshalized...)
385         }
386
387         var buf []byte
388         var payload []byte
389         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
390         if err != nil {
391                 state.dirUnlock()
392                 return err
393         }
394         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
395         state.Ctx.LogD("sp-start", les, "sending first message")
396         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
397         if err = state.WriteSP(conn, buf, false); err != nil {
398                 state.Ctx.LogE("sp-start", les, err, "")
399                 state.dirUnlock()
400                 return err
401         }
402         state.Ctx.LogD("sp-start", les, "waiting for first message")
403         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
404         if buf, err = state.ReadSP(conn); err != nil {
405                 state.Ctx.LogE("sp-start", les, err, "")
406                 state.dirUnlock()
407                 return err
408         }
409         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
410         if err != nil {
411                 state.Ctx.LogE("sp-start", les, err, "")
412                 state.dirUnlock()
413                 return err
414         }
415         state.Ctx.LogD("sp-start", les, "starting workers")
416         err = state.StartWorkers(conn, infosPayloads, payload)
417         if err != nil {
418                 state.Ctx.LogE("sp-start", les, err, "")
419                 state.dirUnlock()
420         }
421         return err
422 }
423
424 func (state *SPState) StartR(conn ConnDeadlined) error {
425         started := time.Now()
426         conf := noise.Config{
427                 CipherSuite: NoiseCipherSuite,
428                 Pattern:     noise.HandshakeIK,
429                 Initiator:   false,
430                 StaticKeypair: noise.DHKey{
431                         Private: state.Ctx.Self.NoisePrv[:],
432                         Public:  state.Ctx.Self.NoisePub[:],
433                 },
434         }
435         hs, err := noise.NewHandshakeState(conf)
436         if err != nil {
437                 return err
438         }
439         xxOnly := TRxTx("")
440         state.hs = hs
441         state.payloads = make(chan []byte)
442         state.pings = make(chan struct{})
443         state.infosOurSeen = make(map[[32]byte]uint8)
444         state.infosTheir = make(map[[32]byte]*SPInfo)
445         state.started = started
446         state.xxOnly = xxOnly
447         var buf []byte
448         var payload []byte
449         state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
450         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
451         if buf, err = state.ReadSP(conn); err != nil {
452                 state.Ctx.LogE("sp-start", LEs{}, err, "")
453                 return err
454         }
455         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
456                 state.Ctx.LogE("sp-start", LEs{}, err, "")
457                 return err
458         }
459
460         var node *Node
461         for _, n := range state.Ctx.Neigh {
462                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
463                         node = n
464                         break
465                 }
466         }
467         if node == nil {
468                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
469                 state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
470                 return errors.New("Unknown peer: " + peerId)
471         }
472         state.Node = node
473         state.rxRate = node.RxRate
474         state.txRate = node.TxRate
475         state.onlineDeadline = node.OnlineDeadline
476         state.maxOnlineTime = node.MaxOnlineTime
477         les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
478
479         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
480                 return err
481         }
482         var rxLock *os.File
483         if xxOnly == "" || xxOnly == TRx {
484                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
485                 if err != nil {
486                         return err
487                 }
488         }
489         state.rxLock = rxLock
490         var txLock *os.File
491         if xxOnly == "" || xxOnly == TTx {
492                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
493                 if err != nil {
494                         return err
495                 }
496         }
497         state.txLock = txLock
498
499         var infosPayloads [][]byte
500         if xxOnly == "" || xxOnly == TTx {
501                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
502         }
503         var firstPayload []byte
504         if len(infosPayloads) > 0 {
505                 firstPayload = infosPayloads[0]
506         }
507         // Pad first payload, to hide actual number of existing files
508         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
509                 firstPayload = append(firstPayload, SPHaltMarshalized...)
510         }
511
512         state.Ctx.LogD("sp-start", les, "sending first message")
513         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
514         if err != nil {
515                 state.dirUnlock()
516                 return err
517         }
518         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
519         if err = state.WriteSP(conn, buf, false); err != nil {
520                 state.Ctx.LogE("sp-start", les, err, "")
521                 state.dirUnlock()
522                 return err
523         }
524         state.Ctx.LogD("sp-start", les, "starting workers")
525         err = state.StartWorkers(conn, infosPayloads, payload)
526         if err != nil {
527                 state.dirUnlock()
528         }
529         return err
530 }
531
532 func (state *SPState) StartWorkers(
533         conn ConnDeadlined,
534         infosPayloads [][]byte,
535         payload []byte,
536 ) error {
537         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
538         state.isDead = make(chan struct{})
539         if state.maxOnlineTime > 0 {
540                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
541         }
542
543         // Remaining handshake payload sending
544         if len(infosPayloads) > 1 {
545                 state.wg.Add(1)
546                 go func() {
547                         for _, payload := range infosPayloads[1:] {
548                                 state.Ctx.LogD(
549                                         "sp-work",
550                                         append(les, LE{"Size", len(payload)}),
551                                         "queuing remaining payload",
552                                 )
553                                 state.payloads <- payload
554                         }
555                         state.wg.Done()
556                 }()
557         }
558
559         // Processing of first payload and queueing its responses
560         state.Ctx.LogD(
561                 "sp-work",
562                 append(les, LE{"Size", len(payload)}),
563                 "processing first payload",
564         )
565         replies, err := state.ProcessSP(payload)
566         if err != nil {
567                 state.Ctx.LogE("sp-work", les, err, "")
568                 return err
569         }
570         state.wg.Add(1)
571         go func() {
572                 for _, reply := range replies {
573                         state.Ctx.LogD(
574                                 "sp-work",
575                                 append(les, LE{"Size", len(reply)}),
576                                 "queuing reply",
577                         )
578                         state.payloads <- reply
579                 }
580                 state.wg.Done()
581         }()
582
583         // Periodic jobs
584         state.wg.Add(1)
585         go func() {
586                 deadlineTicker := time.NewTicker(time.Second)
587                 pingTicker := time.NewTicker(PingTimeout)
588                 for {
589                         select {
590                         case <-state.isDead:
591                                 state.wg.Done()
592                                 deadlineTicker.Stop()
593                                 pingTicker.Stop()
594                                 return
595                         case now := <-deadlineTicker.C:
596                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
597                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
598                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
599                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
600                                         state.SetDead()
601                                         conn.Close() // #nosec G104
602                                 }
603                         case now := <-pingTicker.C:
604                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
605                                         state.wg.Add(1)
606                                         go func() {
607                                                 state.pings <- struct{}{}
608                                                 state.wg.Done()
609                                         }()
610                                 }
611                         }
612                 }
613         }()
614
615         // Spool checker and INFOs sender of appearing files
616         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
617                 state.wg.Add(1)
618                 go func() {
619                         ticker := time.NewTicker(time.Second)
620                         for {
621                                 select {
622                                 case <-state.isDead:
623                                         state.wg.Done()
624                                         ticker.Stop()
625                                         return
626                                 case <-ticker.C:
627                                         for _, payload := range state.Ctx.infosOur(
628                                                 state.Node.Id,
629                                                 state.Nice,
630                                                 &state.infosOurSeen,
631                                         ) {
632                                                 state.Ctx.LogD(
633                                                         "sp-work",
634                                                         append(les, LE{"Size", len(payload)}),
635                                                         "queuing new info",
636                                                 )
637                                                 state.payloads <- payload
638                                         }
639                                 }
640                         }
641                 }()
642         }
643
644         // Sender
645         state.wg.Add(1)
646         go func() {
647                 defer conn.Close()
648                 defer state.SetDead()
649                 defer state.wg.Done()
650                 for {
651                         if state.NotAlive() {
652                                 return
653                         }
654                         var payload []byte
655                         var ping bool
656                         select {
657                         case <-state.pings:
658                                 state.Ctx.LogD("sp-xmit", les, "got ping")
659                                 payload = SPPingMarshalized
660                                 ping = true
661                         case payload = <-state.payloads:
662                                 state.Ctx.LogD(
663                                         "sp-xmit",
664                                         append(les, LE{"Size", len(payload)}),
665                                         "got payload",
666                                 )
667                         default:
668                                 state.RLock()
669                                 if len(state.queueTheir) == 0 {
670                                         state.RUnlock()
671                                         time.Sleep(100 * time.Millisecond)
672                                         continue
673                                 }
674                                 freq := state.queueTheir[0].freq
675                                 state.RUnlock()
676                                 if state.txRate > 0 {
677                                         time.Sleep(time.Second / time.Duration(state.txRate))
678                                 }
679                                 lesp := append(les, LEs{
680                                         {"XX", string(TTx)},
681                                         {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
682                                         {"Size", int64(freq.Offset)},
683                                 }...)
684                                 state.Ctx.LogD("sp-file", lesp, "queueing")
685                                 fd, err := os.Open(filepath.Join(
686                                         state.Ctx.Spool,
687                                         state.Node.Id.String(),
688                                         string(TTx),
689                                         Base32Codec.EncodeToString(freq.Hash[:]),
690                                 ))
691                                 if err != nil {
692                                         state.Ctx.LogE("sp-file", lesp, err, "")
693                                         return
694                                 }
695                                 fi, err := fd.Stat()
696                                 if err != nil {
697                                         state.Ctx.LogE("sp-file", lesp, err, "")
698                                         return
699                                 }
700                                 fullSize := fi.Size()
701                                 var buf []byte
702                                 if freq.Offset < uint64(fullSize) {
703                                         state.Ctx.LogD("sp-file", lesp, "seeking")
704                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
705                                                 state.Ctx.LogE("sp-file", lesp, err, "")
706                                                 return
707                                         }
708                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
709                                         n, err := fd.Read(buf)
710                                         if err != nil {
711                                                 state.Ctx.LogE("sp-file", lesp, err, "")
712                                                 return
713                                         }
714                                         buf = buf[:n]
715                                         state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
716                                 }
717                                 fd.Close() // #nosec G104
718                                 payload = MarshalSP(SPTypeFile, SPFile{
719                                         Hash:    freq.Hash,
720                                         Offset:  freq.Offset,
721                                         Payload: buf,
722                                 })
723                                 ourSize := freq.Offset + uint64(len(buf))
724                                 lesp = append(lesp, LE{"Size", int64(ourSize)})
725                                 lesp = append(lesp, LE{"FullSize", fullSize})
726                                 if state.Ctx.ShowPrgrs {
727                                         Progress("Tx", lesp)
728                                 }
729                                 state.Lock()
730                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
731                                         if ourSize == uint64(fullSize) {
732                                                 state.Ctx.LogD("sp-file", lesp, "finished")
733                                                 if len(state.queueTheir) > 1 {
734                                                         state.queueTheir = state.queueTheir[1:]
735                                                 } else {
736                                                         state.queueTheir = state.queueTheir[:0]
737                                                 }
738                                         } else {
739                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
740                                         }
741                                 } else {
742                                         state.Ctx.LogD("sp-file", lesp, "queue disappeared")
743                                 }
744                                 state.Unlock()
745                         }
746                         state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
747                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
748                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
749                                 state.Ctx.LogE("sp-xmit", les, err, "")
750                                 return
751                         }
752                 }
753         }()
754
755         // Receiver
756         state.wg.Add(1)
757         go func() {
758                 for {
759                         if state.NotAlive() {
760                                 break
761                         }
762                         state.Ctx.LogD("sp-recv", les, "waiting for payload")
763                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
764                         payload, err := state.ReadSP(conn)
765                         if err != nil {
766                                 if err == io.EOF {
767                                         break
768                                 }
769                                 unmarshalErr := err.(*xdr.UnmarshalError)
770                                 if os.IsTimeout(unmarshalErr.Err) {
771                                         continue
772                                 }
773                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
774                                         break
775                                 }
776                                 state.Ctx.LogE("sp-recv", les, err, "")
777                                 break
778                         }
779                         state.Ctx.LogD(
780                                 "sp-recv",
781                                 append(les, LE{"Size", len(payload)}),
782                                 "got payload",
783                         )
784                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
785                         if err != nil {
786                                 state.Ctx.LogE("sp-recv", les, err, "")
787                                 break
788                         }
789                         state.Ctx.LogD(
790                                 "sp-recv",
791                                 append(les, LE{"Size", len(payload)}),
792                                 "processing",
793                         )
794                         replies, err := state.ProcessSP(payload)
795                         if err != nil {
796                                 state.Ctx.LogE("sp-recv", les, err, "")
797                                 break
798                         }
799                         state.wg.Add(1)
800                         go func() {
801                                 for _, reply := range replies {
802                                         state.Ctx.LogD(
803                                                 "sp-recv",
804                                                 append(les, LE{"Size", len(reply)}),
805                                                 "queuing reply",
806                                         )
807                                         state.payloads <- reply
808                                 }
809                                 state.wg.Done()
810                         }()
811                         if state.rxRate > 0 {
812                                 time.Sleep(time.Second / time.Duration(state.rxRate))
813                         }
814                 }
815                 state.SetDead()
816                 state.wg.Done()
817                 state.SetDead()
818                 conn.Close() // #nosec G104
819         }()
820
821         return nil
822 }
823
824 func (state *SPState) Wait() {
825         state.wg.Wait()
826         close(state.payloads)
827         close(state.pings)
828         state.dirUnlock()
829         state.Duration = time.Now().Sub(state.started)
830         state.RxSpeed = state.RxBytes
831         state.TxSpeed = state.TxBytes
832         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
833         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
834         if rxDuration > 0 {
835                 state.RxSpeed = state.RxBytes / rxDuration
836         }
837         if txDuration > 0 {
838                 state.TxSpeed = state.TxBytes / txDuration
839         }
840 }
841
842 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
843         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
844         r := bytes.NewReader(payload)
845         var err error
846         var replies [][]byte
847         var infosGot bool
848         for r.Len() > 0 {
849                 state.Ctx.LogD("sp-process", les, "unmarshaling header")
850                 var head SPHead
851                 if _, err = xdr.Unmarshal(r, &head); err != nil {
852                         state.Ctx.LogE("sp-process", les, err, "")
853                         return nil, err
854                 }
855                 if head.Type != SPTypePing {
856                         state.RxLastNonPing = state.RxLastSeen
857                 }
858                 switch head.Type {
859                 case SPTypeHalt:
860                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
861                         state.Lock()
862                         state.queueTheir = nil
863                         state.Unlock()
864                 case SPTypePing:
865                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
866                 case SPTypeInfo:
867                         infosGot = true
868                         lesp := append(les, LE{"Type", "info"})
869                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
870                         var info SPInfo
871                         if _, err = xdr.Unmarshal(r, &info); err != nil {
872                                 state.Ctx.LogE("sp-process", lesp, err, "")
873                                 return nil, err
874                         }
875                         lesp = append(lesp, LEs{
876                                 {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
877                                 {"Size", int64(info.Size)},
878                                 {"Nice", int(info.Nice)},
879                         }...)
880                         if !state.listOnly && info.Nice > state.Nice {
881                                 state.Ctx.LogD("sp-process", lesp, "too nice")
882                                 continue
883                         }
884                         state.Ctx.LogD("sp-process", lesp, "received")
885                         if !state.listOnly && state.xxOnly == TTx {
886                                 continue
887                         }
888                         state.Lock()
889                         state.infosTheir[*info.Hash] = &info
890                         state.Unlock()
891                         state.Ctx.LogD("sp-process", lesp, "stating part")
892                         pktPath := filepath.Join(
893                                 state.Ctx.Spool,
894                                 state.Node.Id.String(),
895                                 string(TRx),
896                                 Base32Codec.EncodeToString(info.Hash[:]),
897                         )
898                         if _, err = os.Stat(pktPath); err == nil {
899                                 state.Ctx.LogI("sp-info", lesp, "already done")
900                                 if !state.listOnly {
901                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
902                                 }
903                                 continue
904                         }
905                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
906                                 state.Ctx.LogI("sp-info", lesp, "already seen")
907                                 if !state.listOnly {
908                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
909                                 }
910                                 continue
911                         }
912                         fi, err := os.Stat(pktPath + PartSuffix)
913                         var offset int64
914                         if err == nil {
915                                 offset = fi.Size()
916                         }
917                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
918                                 state.Ctx.LogI("sp-info", lesp, "not enough space")
919                                 continue
920                         }
921                         state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
922                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
923                                 replies = append(replies, MarshalSP(
924                                         SPTypeFreq,
925                                         SPFreq{info.Hash, uint64(offset)},
926                                 ))
927                         }
928                 case SPTypeFile:
929                         lesp := append(les, LE{"Type", "file"})
930                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
931                         var file SPFile
932                         if _, err = xdr.Unmarshal(r, &file); err != nil {
933                                 state.Ctx.LogE("sp-process", lesp, err, "")
934                                 return nil, err
935                         }
936                         lesp = append(lesp, LEs{
937                                 {"XX", string(TRx)},
938                                 {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
939                                 {"Size", len(file.Payload)},
940                         }...)
941                         dirToSync := filepath.Join(
942                                 state.Ctx.Spool,
943                                 state.Node.Id.String(),
944                                 string(TRx),
945                         )
946                         filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
947                         state.Ctx.LogD("sp-file", lesp, "opening part")
948                         fd, err := os.OpenFile(
949                                 filePath+PartSuffix,
950                                 os.O_RDWR|os.O_CREATE,
951                                 os.FileMode(0666),
952                         )
953                         if err != nil {
954                                 state.Ctx.LogE("sp-file", lesp, err, "")
955                                 return nil, err
956                         }
957                         state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
958                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
959                                 state.Ctx.LogE("sp-file", lesp, err, "")
960                                 fd.Close() // #nosec G104
961                                 return nil, err
962                         }
963                         state.Ctx.LogD("sp-file", lesp, "writing")
964                         _, err = fd.Write(file.Payload)
965                         if err != nil {
966                                 state.Ctx.LogE("sp-file", lesp, err, "")
967                                 fd.Close() // #nosec G104
968                                 return nil, err
969                         }
970                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
971                         lesp[len(lesp)-1].V = ourSize
972                         fullsize := int64(0)
973                         state.RLock()
974                         infoTheir, ok := state.infosTheir[*file.Hash]
975                         state.RUnlock()
976                         if ok {
977                                 fullsize = int64(infoTheir.Size)
978                         }
979                         lesp = append(lesp, LE{"FullSize", fullsize})
980                         if state.Ctx.ShowPrgrs {
981                                 Progress("Rx", lesp)
982                         }
983                         if fullsize != ourSize {
984                                 fd.Close() // #nosec G104
985                                 continue
986                         }
987                         <-spCheckerToken
988                         go func() {
989                                 defer func() {
990                                         spCheckerToken <- struct{}{}
991                                 }()
992                                 if err := fd.Sync(); err != nil {
993                                         state.Ctx.LogE("sp-file", lesp, err, "sync")
994                                         fd.Close() // #nosec G104
995                                         return
996                                 }
997                                 state.wg.Add(1)
998                                 defer state.wg.Done()
999                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1000                                         fd.Close() // #nosec G104
1001                                         state.Ctx.LogE("sp-file", lesp, err, "")
1002                                         return
1003                                 }
1004                                 state.Ctx.LogD("sp-file", lesp, "checking")
1005                                 gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
1006                                 fd.Close() // #nosec G104
1007                                 if err != nil || !gut {
1008                                         state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
1009                                         return
1010                                 }
1011                                 state.Ctx.LogI("sp-done", lesp, "")
1012                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
1013                                         state.Ctx.LogE("sp-file", lesp, err, "rename")
1014                                         return
1015                                 }
1016                                 if err = DirSync(dirToSync); err != nil {
1017                                         state.Ctx.LogE("sp-file", lesp, err, "sync")
1018                                         return
1019                                 }
1020                                 state.Lock()
1021                                 delete(state.infosTheir, *file.Hash)
1022                                 state.Unlock()
1023                                 state.wg.Add(1)
1024                                 go func() {
1025                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1026                                         state.wg.Done()
1027                                 }()
1028                         }()
1029                 case SPTypeDone:
1030                         lesp := append(les, LE{"Type", "done"})
1031                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1032                         var done SPDone
1033                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1034                                 state.Ctx.LogE("sp-process", lesp, err, "")
1035                                 return nil, err
1036                         }
1037                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
1038                         state.Ctx.LogD("sp-done", lesp, "removing")
1039                         err := os.Remove(filepath.Join(
1040                                 state.Ctx.Spool,
1041                                 state.Node.Id.String(),
1042                                 string(TTx),
1043                                 Base32Codec.EncodeToString(done.Hash[:]),
1044                         ))
1045                         lesp = append(lesp, LE{"XX", string(TTx)})
1046                         if err == nil {
1047                                 state.Ctx.LogI("sp-done", lesp, "")
1048                         } else {
1049                                 state.Ctx.LogE("sp-done", lesp, err, "")
1050                         }
1051                 case SPTypeFreq:
1052                         lesp := append(les, LE{"Type", "freq"})
1053                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1054                         var freq SPFreq
1055                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1056                                 state.Ctx.LogE("sp-process", lesp, err, "")
1057                                 return nil, err
1058                         }
1059                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
1060                         lesp = append(lesp, LE{"Offset", freq.Offset})
1061                         state.Ctx.LogD("sp-process", lesp, "queueing")
1062                         nice, exists := state.infosOurSeen[*freq.Hash]
1063                         if exists {
1064                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1065                                         state.Lock()
1066                                         insertIdx := 0
1067                                         var freqWithNice *FreqWithNice
1068                                         for insertIdx, freqWithNice = range state.queueTheir {
1069                                                 if freqWithNice.nice > nice {
1070                                                         break
1071                                                 }
1072                                         }
1073                                         state.queueTheir = append(state.queueTheir, nil)
1074                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1075                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1076                                         state.Unlock()
1077                                 } else {
1078                                         state.Ctx.LogD("sp-process", lesp, "skipping")
1079                                 }
1080                         } else {
1081                                 state.Ctx.LogD("sp-process", lesp, "unknown")
1082                         }
1083                 default:
1084                         state.Ctx.LogE(
1085                                 "sp-process",
1086                                 append(les, LE{"Type", head.Type}),
1087                                 errors.New("unknown type"),
1088                                 "",
1089                         )
1090                         return nil, BadPktType
1091                 }
1092         }
1093         if infosGot {
1094                 var pkts int
1095                 var size uint64
1096                 state.RLock()
1097                 for _, info := range state.infosTheir {
1098                         pkts++
1099                         size += info.Size
1100                 }
1101                 state.RUnlock()
1102                 state.Ctx.LogI("sp-infos", LEs{
1103                         {"XX", string(TRx)},
1104                         {"Node", state.Node.Id},
1105                         {"Pkts", pkts},
1106                         {"Size", int64(size)},
1107                 }, "")
1108         }
1109         return payloadsSplit(replies), nil
1110 }