]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
recfile log format
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "os"
26         "path/filepath"
27         "sort"
28         "sync"
29         "time"
30
31         xdr "github.com/davecgh/go-xdr/xdr2"
32         "github.com/flynn/noise"
33 )
34
35 const (
36         MaxSPSize      = 1<<16 - 256
37         PartSuffix     = ".part"
38         SPHeadOverhead = 4
39 )
40
41 var (
42         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
43
44         SPInfoOverhead    int
45         SPFreqOverhead    int
46         SPFileOverhead    int
47         SPHaltMarshalized []byte
48         SPPingMarshalized []byte
49
50         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
51                 noise.DH25519,
52                 noise.CipherChaChaPoly,
53                 noise.HashBLAKE2b,
54         )
55
56         DefaultDeadline = 10 * time.Second
57         PingTimeout     = time.Minute
58
59         spCheckerToken chan struct{}
60 )
61
62 type SPType uint8
63
64 const (
65         SPTypeInfo SPType = iota
66         SPTypeFreq SPType = iota
67         SPTypeFile SPType = iota
68         SPTypeDone SPType = iota
69         SPTypeHalt SPType = iota
70         SPTypePing SPType = iota
71 )
72
73 type SPHead struct {
74         Type SPType
75 }
76
77 type SPInfo struct {
78         Nice uint8
79         Size uint64
80         Hash *[32]byte
81 }
82
83 type SPFreq struct {
84         Hash   *[32]byte
85         Offset uint64
86 }
87
88 type SPFile struct {
89         Hash    *[32]byte
90         Offset  uint64
91         Payload []byte
92 }
93
94 type SPDone struct {
95         Hash *[32]byte
96 }
97
98 type SPRaw struct {
99         Magic   [8]byte
100         Payload []byte
101 }
102
103 type FreqWithNice struct {
104         freq *SPFreq
105         nice uint8
106 }
107
108 type ConnDeadlined interface {
109         io.ReadWriteCloser
110         SetReadDeadline(t time.Time) error
111         SetWriteDeadline(t time.Time) error
112 }
113
114 func init() {
115         var buf bytes.Buffer
116         spHead := SPHead{Type: SPTypeHalt}
117         if _, err := xdr.Marshal(&buf, spHead); err != nil {
118                 panic(err)
119         }
120         SPHaltMarshalized = make([]byte, SPHeadOverhead)
121         copy(SPHaltMarshalized, buf.Bytes())
122         buf.Reset()
123
124         spHead = SPHead{Type: SPTypePing}
125         if _, err := xdr.Marshal(&buf, spHead); err != nil {
126                 panic(err)
127         }
128         SPPingMarshalized = make([]byte, SPHeadOverhead)
129         copy(SPPingMarshalized, buf.Bytes())
130         buf.Reset()
131
132         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
133         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
134                 panic(err)
135         }
136         SPInfoOverhead = buf.Len()
137         buf.Reset()
138
139         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
140         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
141                 panic(err)
142         }
143         SPFreqOverhead = buf.Len()
144         buf.Reset()
145
146         spFile := SPFile{Hash: new([32]byte), Offset: 123}
147         if _, err := xdr.Marshal(&buf, spFile); err != nil {
148                 panic(err)
149         }
150         SPFileOverhead = buf.Len()
151         spCheckerToken = make(chan struct{}, 1)
152         spCheckerToken <- struct{}{}
153 }
154
155 func MarshalSP(typ SPType, sp interface{}) []byte {
156         var buf bytes.Buffer
157         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
158                 panic(err)
159         }
160         if _, err := xdr.Marshal(&buf, sp); err != nil {
161                 panic(err)
162         }
163         return buf.Bytes()
164 }
165
166 func payloadsSplit(payloads [][]byte) [][]byte {
167         var outbounds [][]byte
168         outbound := make([]byte, 0, MaxSPSize)
169         for i, payload := range payloads {
170                 outbound = append(outbound, payload...)
171                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
172                         outbounds = append(outbounds, outbound)
173                         outbound = make([]byte, 0, MaxSPSize)
174                 }
175         }
176         if len(outbound) > 0 {
177                 outbounds = append(outbounds, outbound)
178         }
179         return outbounds
180 }
181
182 type SPState struct {
183         Ctx            *Ctx
184         Node           *Node
185         Nice           uint8
186         onlineDeadline time.Duration
187         maxOnlineTime  time.Duration
188         hs             *noise.HandshakeState
189         csOur          *noise.CipherState
190         csTheir        *noise.CipherState
191         payloads       chan []byte
192         pings          chan struct{}
193         infosTheir     map[[32]byte]*SPInfo
194         infosOurSeen   map[[32]byte]uint8
195         queueTheir     []*FreqWithNice
196         wg             sync.WaitGroup
197         RxBytes        int64
198         RxLastSeen     time.Time
199         RxLastNonPing  time.Time
200         TxBytes        int64
201         TxLastSeen     time.Time
202         TxLastNonPing  time.Time
203         started        time.Time
204         mustFinishAt   time.Time
205         Duration       time.Duration
206         RxSpeed        int64
207         TxSpeed        int64
208         rxLock         *os.File
209         txLock         *os.File
210         xxOnly         TRxTx
211         rxRate         int
212         txRate         int
213         isDead         chan struct{}
214         listOnly       bool
215         onlyPkts       map[[32]byte]bool
216         writeSPBuf     bytes.Buffer
217         sync.RWMutex
218 }
219
220 func (state *SPState) SetDead() {
221         state.Lock()
222         defer state.Unlock()
223         select {
224         case <-state.isDead:
225                 // Already closed channel, dead
226                 return
227         default:
228         }
229         close(state.isDead)
230         go func() {
231                 for range state.payloads {
232                 }
233         }()
234         go func() {
235                 for range state.pings {
236                 }
237         }()
238 }
239
240 func (state *SPState) NotAlive() bool {
241         select {
242         case <-state.isDead:
243                 return true
244         default:
245         }
246         return false
247 }
248
249 func (state *SPState) dirUnlock() {
250         state.Ctx.UnlockDir(state.rxLock)
251         state.Ctx.UnlockDir(state.txLock)
252 }
253
254 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
255         state.writeSPBuf.Reset()
256         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
257                 Magic:   MagicNNCPLv1,
258                 Payload: payload,
259         })
260         if err != nil {
261                 return err
262         }
263         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
264                 state.TxLastSeen = time.Now()
265                 state.TxBytes += int64(n)
266                 if !ping {
267                         state.TxLastNonPing = state.TxLastSeen
268                 }
269         }
270         return err
271 }
272
273 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
274         var sp SPRaw
275         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
276         if err != nil {
277                 ue := err.(*xdr.UnmarshalError)
278                 if ue.Err == io.EOF {
279                         return nil, ue.Err
280                 }
281                 return nil, err
282         }
283         state.RxLastSeen = time.Now()
284         state.RxBytes += int64(n)
285         if sp.Magic != MagicNNCPLv1 {
286                 return nil, BadMagic
287         }
288         return sp.Payload, nil
289 }
290
291 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
292         var infos []*SPInfo
293         var totalSize int64
294         for job := range ctx.Jobs(nodeId, TTx) {
295                 job.Fd.Close() // #nosec G104
296                 if job.PktEnc.Nice > nice {
297                         continue
298                 }
299                 if _, known := (*seen)[*job.HshValue]; known {
300                         continue
301                 }
302                 totalSize += job.Size
303                 infos = append(infos, &SPInfo{
304                         Nice: job.PktEnc.Nice,
305                         Size: uint64(job.Size),
306                         Hash: job.HshValue,
307                 })
308                 (*seen)[*job.HshValue] = job.PktEnc.Nice
309         }
310         sort.Sort(ByNice(infos))
311         var payloads [][]byte
312         for _, info := range infos {
313                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
314                 ctx.LogD("sp-info-our", LEs{
315                         {"Node", nodeId},
316                         {"Name", Base32Codec.EncodeToString(info.Hash[:])},
317                         {"Size", info.Size},
318                 }, "")
319         }
320         if totalSize > 0 {
321                 ctx.LogI("sp-infos", LEs{
322                         {"XX", string(TTx)},
323                         {"Node", nodeId},
324                         {"Pkts", len(payloads)},
325                         {"Size", totalSize},
326                 }, "")
327         }
328         return payloadsSplit(payloads)
329 }
330
331 func (state *SPState) StartI(conn ConnDeadlined) error {
332         nodeId := state.Node.Id
333         err := state.Ctx.ensureRxDir(nodeId)
334         if err != nil {
335                 return err
336         }
337         var rxLock *os.File
338         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
339                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
340                 if err != nil {
341                         return err
342                 }
343         }
344         var txLock *os.File
345         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
346                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
347                 if err != nil {
348                         return err
349                 }
350         }
351         started := time.Now()
352         conf := noise.Config{
353                 CipherSuite: NoiseCipherSuite,
354                 Pattern:     noise.HandshakeIK,
355                 Initiator:   true,
356                 StaticKeypair: noise.DHKey{
357                         Private: state.Ctx.Self.NoisePrv[:],
358                         Public:  state.Ctx.Self.NoisePub[:],
359                 },
360                 PeerStatic: state.Node.NoisePub[:],
361         }
362         hs, err := noise.NewHandshakeState(conf)
363         if err != nil {
364                 return err
365         }
366         state.hs = hs
367         state.payloads = make(chan []byte)
368         state.pings = make(chan struct{})
369         state.infosTheir = make(map[[32]byte]*SPInfo)
370         state.infosOurSeen = make(map[[32]byte]uint8)
371         state.started = started
372         state.rxLock = rxLock
373         state.txLock = txLock
374
375         var infosPayloads [][]byte
376         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
377                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
378         }
379         var firstPayload []byte
380         if len(infosPayloads) > 0 {
381                 firstPayload = infosPayloads[0]
382         }
383         // Pad first payload, to hide actual number of existing files
384         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
385                 firstPayload = append(firstPayload, SPHaltMarshalized...)
386         }
387
388         var buf []byte
389         var payload []byte
390         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
391         if err != nil {
392                 state.dirUnlock()
393                 return err
394         }
395         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
396         state.Ctx.LogD("sp-start", les, "sending first message")
397         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
398         if err = state.WriteSP(conn, buf, false); err != nil {
399                 state.Ctx.LogE("sp-start", les, err, "")
400                 state.dirUnlock()
401                 return err
402         }
403         state.Ctx.LogD("sp-start", les, "waiting for first message")
404         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
405         if buf, err = state.ReadSP(conn); err != nil {
406                 state.Ctx.LogE("sp-start", les, err, "")
407                 state.dirUnlock()
408                 return err
409         }
410         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
411         if err != nil {
412                 state.Ctx.LogE("sp-start", les, err, "")
413                 state.dirUnlock()
414                 return err
415         }
416         state.Ctx.LogD("sp-start", les, "starting workers")
417         err = state.StartWorkers(conn, infosPayloads, payload)
418         if err != nil {
419                 state.Ctx.LogE("sp-start", les, err, "")
420                 state.dirUnlock()
421         }
422         return err
423 }
424
425 func (state *SPState) StartR(conn ConnDeadlined) error {
426         started := time.Now()
427         conf := noise.Config{
428                 CipherSuite: NoiseCipherSuite,
429                 Pattern:     noise.HandshakeIK,
430                 Initiator:   false,
431                 StaticKeypair: noise.DHKey{
432                         Private: state.Ctx.Self.NoisePrv[:],
433                         Public:  state.Ctx.Self.NoisePub[:],
434                 },
435         }
436         hs, err := noise.NewHandshakeState(conf)
437         if err != nil {
438                 return err
439         }
440         xxOnly := TRxTx("")
441         state.hs = hs
442         state.payloads = make(chan []byte)
443         state.pings = make(chan struct{})
444         state.infosOurSeen = make(map[[32]byte]uint8)
445         state.infosTheir = make(map[[32]byte]*SPInfo)
446         state.started = started
447         state.xxOnly = xxOnly
448         var buf []byte
449         var payload []byte
450         state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
451         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
452         if buf, err = state.ReadSP(conn); err != nil {
453                 state.Ctx.LogE("sp-start", LEs{}, err, "")
454                 return err
455         }
456         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
457                 state.Ctx.LogE("sp-start", LEs{}, err, "")
458                 return err
459         }
460
461         var node *Node
462         for _, n := range state.Ctx.Neigh {
463                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
464                         node = n
465                         break
466                 }
467         }
468         if node == nil {
469                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
470                 state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
471                 return errors.New("Unknown peer: " + peerId)
472         }
473         state.Node = node
474         state.rxRate = node.RxRate
475         state.txRate = node.TxRate
476         state.onlineDeadline = node.OnlineDeadline
477         state.maxOnlineTime = node.MaxOnlineTime
478         les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
479
480         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
481                 return err
482         }
483         var rxLock *os.File
484         if xxOnly == "" || xxOnly == TRx {
485                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
486                 if err != nil {
487                         return err
488                 }
489         }
490         state.rxLock = rxLock
491         var txLock *os.File
492         if xxOnly == "" || xxOnly == TTx {
493                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
494                 if err != nil {
495                         return err
496                 }
497         }
498         state.txLock = txLock
499
500         var infosPayloads [][]byte
501         if xxOnly == "" || xxOnly == TTx {
502                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
503         }
504         var firstPayload []byte
505         if len(infosPayloads) > 0 {
506                 firstPayload = infosPayloads[0]
507         }
508         // Pad first payload, to hide actual number of existing files
509         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
510                 firstPayload = append(firstPayload, SPHaltMarshalized...)
511         }
512
513         state.Ctx.LogD("sp-start", les, "sending first message")
514         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
515         if err != nil {
516                 state.dirUnlock()
517                 return err
518         }
519         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
520         if err = state.WriteSP(conn, buf, false); err != nil {
521                 state.Ctx.LogE("sp-start", les, err, "")
522                 state.dirUnlock()
523                 return err
524         }
525         state.Ctx.LogD("sp-start", les, "starting workers")
526         err = state.StartWorkers(conn, infosPayloads, payload)
527         if err != nil {
528                 state.dirUnlock()
529         }
530         return err
531 }
532
533 func (state *SPState) StartWorkers(
534         conn ConnDeadlined,
535         infosPayloads [][]byte,
536         payload []byte,
537 ) error {
538         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
539         state.isDead = make(chan struct{})
540         if state.maxOnlineTime > 0 {
541                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
542         }
543
544         // Remaining handshake payload sending
545         if len(infosPayloads) > 1 {
546                 state.wg.Add(1)
547                 go func() {
548                         for _, payload := range infosPayloads[1:] {
549                                 state.Ctx.LogD(
550                                         "sp-work",
551                                         append(les, LE{"Size", len(payload)}),
552                                         "queuing remaining payload",
553                                 )
554                                 state.payloads <- payload
555                         }
556                         state.wg.Done()
557                 }()
558         }
559
560         // Processing of first payload and queueing its responses
561         state.Ctx.LogD(
562                 "sp-work",
563                 append(les, LE{"Size", len(payload)}),
564                 "processing first payload",
565         )
566         replies, err := state.ProcessSP(payload)
567         if err != nil {
568                 state.Ctx.LogE("sp-work", les, err, "")
569                 return err
570         }
571         state.wg.Add(1)
572         go func() {
573                 for _, reply := range replies {
574                         state.Ctx.LogD(
575                                 "sp-work",
576                                 append(les, LE{"Size", len(reply)}),
577                                 "queuing reply",
578                         )
579                         state.payloads <- reply
580                 }
581                 state.wg.Done()
582         }()
583
584         // Periodic jobs
585         state.wg.Add(1)
586         go func() {
587                 deadlineTicker := time.NewTicker(time.Second)
588                 pingTicker := time.NewTicker(PingTimeout)
589                 for {
590                         select {
591                         case <-state.isDead:
592                                 state.wg.Done()
593                                 deadlineTicker.Stop()
594                                 pingTicker.Stop()
595                                 return
596                         case now := <-deadlineTicker.C:
597                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
598                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
599                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
600                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
601                                         state.SetDead()
602                                         conn.Close() // #nosec G104
603                                 }
604                         case now := <-pingTicker.C:
605                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
606                                         state.wg.Add(1)
607                                         go func() {
608                                                 state.pings <- struct{}{}
609                                                 state.wg.Done()
610                                         }()
611                                 }
612                         }
613                 }
614         }()
615
616         // Spool checker and INFOs sender of appearing files
617         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
618                 state.wg.Add(1)
619                 go func() {
620                         ticker := time.NewTicker(time.Second)
621                         for {
622                                 select {
623                                 case <-state.isDead:
624                                         state.wg.Done()
625                                         ticker.Stop()
626                                         return
627                                 case <-ticker.C:
628                                         for _, payload := range state.Ctx.infosOur(
629                                                 state.Node.Id,
630                                                 state.Nice,
631                                                 &state.infosOurSeen,
632                                         ) {
633                                                 state.Ctx.LogD(
634                                                         "sp-work",
635                                                         append(les, LE{"Size", len(payload)}),
636                                                         "queuing new info",
637                                                 )
638                                                 state.payloads <- payload
639                                         }
640                                 }
641                         }
642                 }()
643         }
644
645         // Sender
646         state.wg.Add(1)
647         go func() {
648                 defer conn.Close()
649                 defer state.SetDead()
650                 defer state.wg.Done()
651                 for {
652                         if state.NotAlive() {
653                                 return
654                         }
655                         var payload []byte
656                         var ping bool
657                         select {
658                         case <-state.pings:
659                                 state.Ctx.LogD("sp-xmit", les, "got ping")
660                                 payload = SPPingMarshalized
661                                 ping = true
662                         case payload = <-state.payloads:
663                                 state.Ctx.LogD(
664                                         "sp-xmit",
665                                         append(les, LE{"Size", len(payload)}),
666                                         "got payload",
667                                 )
668                         default:
669                                 state.RLock()
670                                 if len(state.queueTheir) == 0 {
671                                         state.RUnlock()
672                                         time.Sleep(100 * time.Millisecond)
673                                         continue
674                                 }
675                                 freq := state.queueTheir[0].freq
676                                 state.RUnlock()
677                                 if state.txRate > 0 {
678                                         time.Sleep(time.Second / time.Duration(state.txRate))
679                                 }
680                                 lesp := append(les, LEs{
681                                         {"XX", string(TTx)},
682                                         {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
683                                         {"Size", int64(freq.Offset)},
684                                 }...)
685                                 state.Ctx.LogD("sp-file", lesp, "queueing")
686                                 fd, err := os.Open(filepath.Join(
687                                         state.Ctx.Spool,
688                                         state.Node.Id.String(),
689                                         string(TTx),
690                                         Base32Codec.EncodeToString(freq.Hash[:]),
691                                 ))
692                                 if err != nil {
693                                         state.Ctx.LogE("sp-file", lesp, err, "")
694                                         return
695                                 }
696                                 fi, err := fd.Stat()
697                                 if err != nil {
698                                         state.Ctx.LogE("sp-file", lesp, err, "")
699                                         return
700                                 }
701                                 fullSize := fi.Size()
702                                 var buf []byte
703                                 if freq.Offset < uint64(fullSize) {
704                                         state.Ctx.LogD("sp-file", lesp, "seeking")
705                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
706                                                 state.Ctx.LogE("sp-file", lesp, err, "")
707                                                 return
708                                         }
709                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
710                                         n, err := fd.Read(buf)
711                                         if err != nil {
712                                                 state.Ctx.LogE("sp-file", lesp, err, "")
713                                                 return
714                                         }
715                                         buf = buf[:n]
716                                         state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
717                                 }
718                                 fd.Close() // #nosec G104
719                                 payload = MarshalSP(SPTypeFile, SPFile{
720                                         Hash:    freq.Hash,
721                                         Offset:  freq.Offset,
722                                         Payload: buf,
723                                 })
724                                 ourSize := freq.Offset + uint64(len(buf))
725                                 lesp = append(lesp, LE{"Size", int64(ourSize)})
726                                 lesp = append(lesp, LE{"FullSize", fullSize})
727                                 if state.Ctx.ShowPrgrs {
728                                         Progress("Tx", lesp)
729                                 }
730                                 state.Lock()
731                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
732                                         if ourSize == uint64(fullSize) {
733                                                 state.Ctx.LogD("sp-file", lesp, "finished")
734                                                 if len(state.queueTheir) > 1 {
735                                                         state.queueTheir = state.queueTheir[1:]
736                                                 } else {
737                                                         state.queueTheir = state.queueTheir[:0]
738                                                 }
739                                         } else {
740                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
741                                         }
742                                 } else {
743                                         state.Ctx.LogD("sp-file", lesp, "queue disappeared")
744                                 }
745                                 state.Unlock()
746                         }
747                         state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
748                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
749                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
750                                 state.Ctx.LogE("sp-xmit", les, err, "")
751                                 return
752                         }
753                 }
754         }()
755
756         // Receiver
757         state.wg.Add(1)
758         go func() {
759                 for {
760                         if state.NotAlive() {
761                                 break
762                         }
763                         state.Ctx.LogD("sp-recv", les, "waiting for payload")
764                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
765                         payload, err := state.ReadSP(conn)
766                         if err != nil {
767                                 if err == io.EOF {
768                                         break
769                                 }
770                                 unmarshalErr := err.(*xdr.UnmarshalError)
771                                 if os.IsTimeout(unmarshalErr.Err) {
772                                         continue
773                                 }
774                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
775                                         break
776                                 }
777                                 state.Ctx.LogE("sp-recv", les, err, "")
778                                 break
779                         }
780                         state.Ctx.LogD(
781                                 "sp-recv",
782                                 append(les, LE{"Size", len(payload)}),
783                                 "got payload",
784                         )
785                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
786                         if err != nil {
787                                 state.Ctx.LogE("sp-recv", les, err, "")
788                                 break
789                         }
790                         state.Ctx.LogD(
791                                 "sp-recv",
792                                 append(les, LE{"Size", len(payload)}),
793                                 "processing",
794                         )
795                         replies, err := state.ProcessSP(payload)
796                         if err != nil {
797                                 state.Ctx.LogE("sp-recv", les, err, "")
798                                 break
799                         }
800                         state.wg.Add(1)
801                         go func() {
802                                 for _, reply := range replies {
803                                         state.Ctx.LogD(
804                                                 "sp-recv",
805                                                 append(les, LE{"Size", len(reply)}),
806                                                 "queuing reply",
807                                         )
808                                         state.payloads <- reply
809                                 }
810                                 state.wg.Done()
811                         }()
812                         if state.rxRate > 0 {
813                                 time.Sleep(time.Second / time.Duration(state.rxRate))
814                         }
815                 }
816                 state.SetDead()
817                 state.wg.Done()
818                 state.SetDead()
819                 conn.Close() // #nosec G104
820         }()
821
822         return nil
823 }
824
825 func (state *SPState) Wait() {
826         state.wg.Wait()
827         close(state.payloads)
828         close(state.pings)
829         state.dirUnlock()
830         state.Duration = time.Now().Sub(state.started)
831         state.RxSpeed = state.RxBytes
832         state.TxSpeed = state.TxBytes
833         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
834         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
835         if rxDuration > 0 {
836                 state.RxSpeed = state.RxBytes / rxDuration
837         }
838         if txDuration > 0 {
839                 state.TxSpeed = state.TxBytes / txDuration
840         }
841 }
842
843 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
844         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
845         r := bytes.NewReader(payload)
846         var err error
847         var replies [][]byte
848         var infosGot bool
849         for r.Len() > 0 {
850                 state.Ctx.LogD("sp-process", les, "unmarshaling header")
851                 var head SPHead
852                 if _, err = xdr.Unmarshal(r, &head); err != nil {
853                         state.Ctx.LogE("sp-process", les, err, "")
854                         return nil, err
855                 }
856                 if head.Type != SPTypePing {
857                         state.RxLastNonPing = state.RxLastSeen
858                 }
859                 switch head.Type {
860                 case SPTypeHalt:
861                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
862                         state.Lock()
863                         state.queueTheir = nil
864                         state.Unlock()
865                 case SPTypePing:
866                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
867                 case SPTypeInfo:
868                         infosGot = true
869                         lesp := append(les, LE{"Type", "info"})
870                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
871                         var info SPInfo
872                         if _, err = xdr.Unmarshal(r, &info); err != nil {
873                                 state.Ctx.LogE("sp-process", lesp, err, "")
874                                 return nil, err
875                         }
876                         lesp = append(lesp, LEs{
877                                 {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
878                                 {"Size", int64(info.Size)},
879                                 {"Nice", int(info.Nice)},
880                         }...)
881                         if !state.listOnly && info.Nice > state.Nice {
882                                 state.Ctx.LogD("sp-process", lesp, "too nice")
883                                 continue
884                         }
885                         state.Ctx.LogD("sp-process", lesp, "received")
886                         if !state.listOnly && state.xxOnly == TTx {
887                                 continue
888                         }
889                         state.Lock()
890                         state.infosTheir[*info.Hash] = &info
891                         state.Unlock()
892                         state.Ctx.LogD("sp-process", lesp, "stating part")
893                         pktPath := filepath.Join(
894                                 state.Ctx.Spool,
895                                 state.Node.Id.String(),
896                                 string(TRx),
897                                 Base32Codec.EncodeToString(info.Hash[:]),
898                         )
899                         if _, err = os.Stat(pktPath); err == nil {
900                                 state.Ctx.LogI("sp-info", lesp, "already done")
901                                 if !state.listOnly {
902                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
903                                 }
904                                 continue
905                         }
906                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
907                                 state.Ctx.LogI("sp-info", lesp, "already seen")
908                                 if !state.listOnly {
909                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
910                                 }
911                                 continue
912                         }
913                         fi, err := os.Stat(pktPath + PartSuffix)
914                         var offset int64
915                         if err == nil {
916                                 offset = fi.Size()
917                         }
918                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
919                                 state.Ctx.LogI("sp-info", lesp, "not enough space")
920                                 continue
921                         }
922                         state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
923                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
924                                 replies = append(replies, MarshalSP(
925                                         SPTypeFreq,
926                                         SPFreq{info.Hash, uint64(offset)},
927                                 ))
928                         }
929                 case SPTypeFile:
930                         lesp := append(les, LE{"Type", "file"})
931                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
932                         var file SPFile
933                         if _, err = xdr.Unmarshal(r, &file); err != nil {
934                                 state.Ctx.LogE("sp-process", lesp, err, "")
935                                 return nil, err
936                         }
937                         lesp = append(lesp, LEs{
938                                 {"XX", string(TRx)},
939                                 {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
940                                 {"Size", len(file.Payload)},
941                         }...)
942                         dirToSync := filepath.Join(
943                                 state.Ctx.Spool,
944                                 state.Node.Id.String(),
945                                 string(TRx),
946                         )
947                         filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
948                         state.Ctx.LogD("sp-file", lesp, "opening part")
949                         fd, err := os.OpenFile(
950                                 filePath+PartSuffix,
951                                 os.O_RDWR|os.O_CREATE,
952                                 os.FileMode(0666),
953                         )
954                         if err != nil {
955                                 state.Ctx.LogE("sp-file", lesp, err, "")
956                                 return nil, err
957                         }
958                         state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
959                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
960                                 state.Ctx.LogE("sp-file", lesp, err, "")
961                                 fd.Close() // #nosec G104
962                                 return nil, err
963                         }
964                         state.Ctx.LogD("sp-file", lesp, "writing")
965                         _, err = fd.Write(file.Payload)
966                         if err != nil {
967                                 state.Ctx.LogE("sp-file", lesp, err, "")
968                                 fd.Close() // #nosec G104
969                                 return nil, err
970                         }
971                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
972                         lesp[len(lesp)-1].V = ourSize
973                         fullsize := int64(0)
974                         state.RLock()
975                         infoTheir, ok := state.infosTheir[*file.Hash]
976                         state.RUnlock()
977                         if ok {
978                                 fullsize = int64(infoTheir.Size)
979                         }
980                         lesp = append(lesp, LE{"FullSize", fullsize})
981                         if state.Ctx.ShowPrgrs {
982                                 Progress("Rx", lesp)
983                         }
984                         if fullsize != ourSize {
985                                 fd.Close() // #nosec G104
986                                 continue
987                         }
988                         <-spCheckerToken
989                         go func() {
990                                 defer func() {
991                                         spCheckerToken <- struct{}{}
992                                 }()
993                                 if err := fd.Sync(); err != nil {
994                                         state.Ctx.LogE("sp-file", lesp, err, "sync")
995                                         fd.Close() // #nosec G104
996                                         return
997                                 }
998                                 state.wg.Add(1)
999                                 defer state.wg.Done()
1000                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1001                                         fd.Close() // #nosec G104
1002                                         state.Ctx.LogE("sp-file", lesp, err, "")
1003                                         return
1004                                 }
1005                                 state.Ctx.LogD("sp-file", lesp, "checking")
1006                                 gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
1007                                 fd.Close() // #nosec G104
1008                                 if err != nil || !gut {
1009                                         state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
1010                                         return
1011                                 }
1012                                 state.Ctx.LogI("sp-done", lesp, "")
1013                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
1014                                         state.Ctx.LogE("sp-file", lesp, err, "rename")
1015                                         return
1016                                 }
1017                                 if err = DirSync(dirToSync); err != nil {
1018                                         state.Ctx.LogE("sp-file", lesp, err, "sync")
1019                                         return
1020                                 }
1021                                 state.Lock()
1022                                 delete(state.infosTheir, *file.Hash)
1023                                 state.Unlock()
1024                                 state.wg.Add(1)
1025                                 go func() {
1026                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1027                                         state.wg.Done()
1028                                 }()
1029                         }()
1030                 case SPTypeDone:
1031                         lesp := append(les, LE{"Type", "done"})
1032                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1033                         var done SPDone
1034                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1035                                 state.Ctx.LogE("sp-process", lesp, err, "")
1036                                 return nil, err
1037                         }
1038                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
1039                         state.Ctx.LogD("sp-done", lesp, "removing")
1040                         err := os.Remove(filepath.Join(
1041                                 state.Ctx.Spool,
1042                                 state.Node.Id.String(),
1043                                 string(TTx),
1044                                 Base32Codec.EncodeToString(done.Hash[:]),
1045                         ))
1046                         lesp = append(lesp, LE{"XX", string(TTx)})
1047                         if err == nil {
1048                                 state.Ctx.LogI("sp-done", lesp, "")
1049                         } else {
1050                                 state.Ctx.LogE("sp-done", lesp, err, "")
1051                         }
1052                 case SPTypeFreq:
1053                         lesp := append(les, LE{"Type", "freq"})
1054                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1055                         var freq SPFreq
1056                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1057                                 state.Ctx.LogE("sp-process", lesp, err, "")
1058                                 return nil, err
1059                         }
1060                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
1061                         lesp = append(lesp, LE{"Offset", freq.Offset})
1062                         state.Ctx.LogD("sp-process", lesp, "queueing")
1063                         nice, exists := state.infosOurSeen[*freq.Hash]
1064                         if exists {
1065                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1066                                         state.Lock()
1067                                         insertIdx := 0
1068                                         var freqWithNice *FreqWithNice
1069                                         for insertIdx, freqWithNice = range state.queueTheir {
1070                                                 if freqWithNice.nice > nice {
1071                                                         break
1072                                                 }
1073                                         }
1074                                         state.queueTheir = append(state.queueTheir, nil)
1075                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1076                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1077                                         state.Unlock()
1078                                 } else {
1079                                         state.Ctx.LogD("sp-process", lesp, "skipping")
1080                                 }
1081                         } else {
1082                                 state.Ctx.LogD("sp-process", lesp, "unknown")
1083                         }
1084                 default:
1085                         state.Ctx.LogE(
1086                                 "sp-process",
1087                                 append(les, LE{"Type", head.Type}),
1088                                 errors.New("unknown type"),
1089                                 "",
1090                         )
1091                         return nil, BadPktType
1092                 }
1093         }
1094         if infosGot {
1095                 var pkts int
1096                 var size uint64
1097                 state.RLock()
1098                 for _, info := range state.infosTheir {
1099                         pkts++
1100                         size += info.Size
1101                 }
1102                 state.RUnlock()
1103                 state.Ctx.LogI("sp-infos", LEs{
1104                         {"XX", string(TRx)},
1105                         {"Node", state.Node.Id},
1106                         {"Pkts", pkts},
1107                         {"Size", int64(size)},
1108                 }, "")
1109         }
1110         return payloadsSplit(replies), nil
1111 }