]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Intermediate .nock packets step
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "os"
26         "path/filepath"
27         "sort"
28         "sync"
29         "time"
30
31         xdr "github.com/davecgh/go-xdr/xdr2"
32         "github.com/flynn/noise"
33 )
34
35 const (
36         MaxSPSize      = 1<<16 - 256
37         PartSuffix     = ".part"
38         SPHeadOverhead = 4
39 )
40
41 var (
42         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
43
44         SPInfoOverhead    int
45         SPFreqOverhead    int
46         SPFileOverhead    int
47         SPHaltMarshalized []byte
48         SPPingMarshalized []byte
49
50         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
51                 noise.DH25519,
52                 noise.CipherChaChaPoly,
53                 noise.HashBLAKE2b,
54         )
55
56         DefaultDeadline = 10 * time.Second
57         PingTimeout     = time.Minute
58 )
59
60 type FdAndFullSize struct {
61         fd       *os.File
62         fullSize int64
63 }
64
65 type SPType uint8
66
67 const (
68         SPTypeInfo SPType = iota
69         SPTypeFreq SPType = iota
70         SPTypeFile SPType = iota
71         SPTypeDone SPType = iota
72         SPTypeHalt SPType = iota
73         SPTypePing SPType = iota
74 )
75
76 type SPHead struct {
77         Type SPType
78 }
79
80 type SPInfo struct {
81         Nice uint8
82         Size uint64
83         Hash *[32]byte
84 }
85
86 type SPFreq struct {
87         Hash   *[32]byte
88         Offset uint64
89 }
90
91 type SPFile struct {
92         Hash    *[32]byte
93         Offset  uint64
94         Payload []byte
95 }
96
97 type SPDone struct {
98         Hash *[32]byte
99 }
100
101 type SPRaw struct {
102         Magic   [8]byte
103         Payload []byte
104 }
105
106 type FreqWithNice struct {
107         freq *SPFreq
108         nice uint8
109 }
110
111 type ConnDeadlined interface {
112         io.ReadWriteCloser
113         SetReadDeadline(t time.Time) error
114         SetWriteDeadline(t time.Time) error
115 }
116
117 func init() {
118         var buf bytes.Buffer
119         spHead := SPHead{Type: SPTypeHalt}
120         if _, err := xdr.Marshal(&buf, spHead); err != nil {
121                 panic(err)
122         }
123         SPHaltMarshalized = make([]byte, SPHeadOverhead)
124         copy(SPHaltMarshalized, buf.Bytes())
125         buf.Reset()
126
127         spHead = SPHead{Type: SPTypePing}
128         if _, err := xdr.Marshal(&buf, spHead); err != nil {
129                 panic(err)
130         }
131         SPPingMarshalized = make([]byte, SPHeadOverhead)
132         copy(SPPingMarshalized, buf.Bytes())
133         buf.Reset()
134
135         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
136         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
137                 panic(err)
138         }
139         SPInfoOverhead = buf.Len()
140         buf.Reset()
141
142         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
143         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
144                 panic(err)
145         }
146         SPFreqOverhead = buf.Len()
147         buf.Reset()
148
149         spFile := SPFile{Hash: new([32]byte), Offset: 123}
150         if _, err := xdr.Marshal(&buf, spFile); err != nil {
151                 panic(err)
152         }
153         SPFileOverhead = buf.Len()
154 }
155
156 func MarshalSP(typ SPType, sp interface{}) []byte {
157         var buf bytes.Buffer
158         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
159                 panic(err)
160         }
161         if _, err := xdr.Marshal(&buf, sp); err != nil {
162                 panic(err)
163         }
164         return buf.Bytes()
165 }
166
167 func payloadsSplit(payloads [][]byte) [][]byte {
168         var outbounds [][]byte
169         outbound := make([]byte, 0, MaxSPSize)
170         for i, payload := range payloads {
171                 outbound = append(outbound, payload...)
172                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
173                         outbounds = append(outbounds, outbound)
174                         outbound = make([]byte, 0, MaxSPSize)
175                 }
176         }
177         if len(outbound) > 0 {
178                 outbounds = append(outbounds, outbound)
179         }
180         return outbounds
181 }
182
183 type SPState struct {
184         Ctx            *Ctx
185         Node           *Node
186         Nice           uint8
187         NoCK           bool
188         onlineDeadline time.Duration
189         maxOnlineTime  time.Duration
190         hs             *noise.HandshakeState
191         csOur          *noise.CipherState
192         csTheir        *noise.CipherState
193         payloads       chan []byte
194         pings          chan struct{}
195         infosTheir     map[[32]byte]*SPInfo
196         infosOurSeen   map[[32]byte]uint8
197         queueTheir     []*FreqWithNice
198         wg             sync.WaitGroup
199         RxBytes        int64
200         RxLastSeen     time.Time
201         RxLastNonPing  time.Time
202         TxBytes        int64
203         TxLastSeen     time.Time
204         TxLastNonPing  time.Time
205         started        time.Time
206         mustFinishAt   time.Time
207         Duration       time.Duration
208         RxSpeed        int64
209         TxSpeed        int64
210         rxLock         *os.File
211         txLock         *os.File
212         xxOnly         TRxTx
213         rxRate         int
214         txRate         int
215         isDead         chan struct{}
216         listOnly       bool
217         onlyPkts       map[[32]byte]bool
218         writeSPBuf     bytes.Buffer
219         fds            map[string]FdAndFullSize
220         checkerJobs    chan *[32]byte
221         sync.RWMutex
222 }
223
224 func (state *SPState) SetDead() {
225         state.Lock()
226         defer state.Unlock()
227         select {
228         case <-state.isDead:
229                 // Already closed channel, dead
230                 return
231         default:
232         }
233         close(state.isDead)
234         go func() {
235                 for range state.payloads {
236                 }
237         }()
238         go func() {
239                 for range state.pings {
240                 }
241         }()
242         go func() {
243                 for _, s := range state.fds {
244                         s.fd.Close()
245                 }
246         }()
247         if !state.NoCK {
248                 close(state.checkerJobs)
249         }
250 }
251
252 func (state *SPState) NotAlive() bool {
253         select {
254         case <-state.isDead:
255                 return true
256         default:
257         }
258         return false
259 }
260
261 func (state *SPState) dirUnlock() {
262         state.Ctx.UnlockDir(state.rxLock)
263         state.Ctx.UnlockDir(state.txLock)
264 }
265
266 func (state *SPState) SPChecker() {
267         for hshValue := range state.checkerJobs {
268                 les := LEs{
269                         {"XX", string(TRx)},
270                         {"Node", state.Node.Id},
271                         {"Pkt", Base32Codec.EncodeToString(hshValue[:])},
272                 }
273                 state.Ctx.LogD("sp-file", les, "checking")
274                 size, err := state.Ctx.CheckNoCK(state.Node.Id, hshValue)
275                 les = append(les, LE{"Size", size})
276                 if err != nil {
277                         state.Ctx.LogE("sp-file", les, err, "")
278                         continue
279                 }
280                 state.Ctx.LogI("sp-done", les, "")
281                 state.wg.Add(1)
282                 go func(hsh *[32]byte) {
283                         if !state.NotAlive() {
284                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
285                         }
286                         state.wg.Done()
287                 }(hshValue)
288         }
289 }
290
291 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
292         state.writeSPBuf.Reset()
293         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
294                 Magic:   MagicNNCPLv1,
295                 Payload: payload,
296         })
297         if err != nil {
298                 return err
299         }
300         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
301                 state.TxLastSeen = time.Now()
302                 state.TxBytes += int64(n)
303                 if !ping {
304                         state.TxLastNonPing = state.TxLastSeen
305                 }
306         }
307         return err
308 }
309
310 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
311         var sp SPRaw
312         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
313         if err != nil {
314                 ue := err.(*xdr.UnmarshalError)
315                 if ue.Err == io.EOF {
316                         return nil, ue.Err
317                 }
318                 return nil, err
319         }
320         state.RxLastSeen = time.Now()
321         state.RxBytes += int64(n)
322         if sp.Magic != MagicNNCPLv1 {
323                 return nil, BadMagic
324         }
325         return sp.Payload, nil
326 }
327
328 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
329         var infos []*SPInfo
330         var totalSize int64
331         for job := range ctx.Jobs(nodeId, TTx) {
332                 if job.PktEnc.Nice > nice {
333                         continue
334                 }
335                 if _, known := (*seen)[*job.HshValue]; known {
336                         continue
337                 }
338                 totalSize += job.Size
339                 infos = append(infos, &SPInfo{
340                         Nice: job.PktEnc.Nice,
341                         Size: uint64(job.Size),
342                         Hash: job.HshValue,
343                 })
344                 (*seen)[*job.HshValue] = job.PktEnc.Nice
345         }
346         sort.Sort(ByNice(infos))
347         var payloads [][]byte
348         for _, info := range infos {
349                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
350                 ctx.LogD("sp-info-our", LEs{
351                         {"Node", nodeId},
352                         {"Name", Base32Codec.EncodeToString(info.Hash[:])},
353                         {"Size", info.Size},
354                 }, "")
355         }
356         if totalSize > 0 {
357                 ctx.LogI("sp-infos", LEs{
358                         {"XX", string(TTx)},
359                         {"Node", nodeId},
360                         {"Pkts", len(payloads)},
361                         {"Size", totalSize},
362                 }, "")
363         }
364         return payloadsSplit(payloads)
365 }
366
367 func (state *SPState) StartI(conn ConnDeadlined) error {
368         nodeId := state.Node.Id
369         err := state.Ctx.ensureRxDir(nodeId)
370         if err != nil {
371                 return err
372         }
373         var rxLock *os.File
374         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
375                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
376                 if err != nil {
377                         return err
378                 }
379         }
380         var txLock *os.File
381         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
382                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
383                 if err != nil {
384                         return err
385                 }
386         }
387         started := time.Now()
388         conf := noise.Config{
389                 CipherSuite: NoiseCipherSuite,
390                 Pattern:     noise.HandshakeIK,
391                 Initiator:   true,
392                 StaticKeypair: noise.DHKey{
393                         Private: state.Ctx.Self.NoisePrv[:],
394                         Public:  state.Ctx.Self.NoisePub[:],
395                 },
396                 PeerStatic: state.Node.NoisePub[:],
397         }
398         hs, err := noise.NewHandshakeState(conf)
399         if err != nil {
400                 return err
401         }
402         state.hs = hs
403         state.payloads = make(chan []byte)
404         state.pings = make(chan struct{})
405         state.infosTheir = make(map[[32]byte]*SPInfo)
406         state.infosOurSeen = make(map[[32]byte]uint8)
407         state.started = started
408         state.rxLock = rxLock
409         state.txLock = txLock
410
411         var infosPayloads [][]byte
412         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
413                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
414         }
415         var firstPayload []byte
416         if len(infosPayloads) > 0 {
417                 firstPayload = infosPayloads[0]
418         }
419         // Pad first payload, to hide actual number of existing files
420         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
421                 firstPayload = append(firstPayload, SPHaltMarshalized...)
422         }
423
424         var buf []byte
425         var payload []byte
426         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
427         if err != nil {
428                 state.dirUnlock()
429                 return err
430         }
431         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
432         state.Ctx.LogD("sp-start", les, "sending first message")
433         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
434         if err = state.WriteSP(conn, buf, false); err != nil {
435                 state.Ctx.LogE("sp-start", les, err, "")
436                 state.dirUnlock()
437                 return err
438         }
439         state.Ctx.LogD("sp-start", les, "waiting for first message")
440         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
441         if buf, err = state.ReadSP(conn); err != nil {
442                 state.Ctx.LogE("sp-start", les, err, "")
443                 state.dirUnlock()
444                 return err
445         }
446         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
447         if err != nil {
448                 state.Ctx.LogE("sp-start", les, err, "")
449                 state.dirUnlock()
450                 return err
451         }
452         state.Ctx.LogD("sp-start", les, "starting workers")
453         err = state.StartWorkers(conn, infosPayloads, payload)
454         if err != nil {
455                 state.Ctx.LogE("sp-start", les, err, "")
456                 state.dirUnlock()
457         }
458         return err
459 }
460
461 func (state *SPState) StartR(conn ConnDeadlined) error {
462         started := time.Now()
463         conf := noise.Config{
464                 CipherSuite: NoiseCipherSuite,
465                 Pattern:     noise.HandshakeIK,
466                 Initiator:   false,
467                 StaticKeypair: noise.DHKey{
468                         Private: state.Ctx.Self.NoisePrv[:],
469                         Public:  state.Ctx.Self.NoisePub[:],
470                 },
471         }
472         hs, err := noise.NewHandshakeState(conf)
473         if err != nil {
474                 return err
475         }
476         xxOnly := TRxTx("")
477         state.hs = hs
478         state.payloads = make(chan []byte)
479         state.pings = make(chan struct{})
480         state.infosOurSeen = make(map[[32]byte]uint8)
481         state.infosTheir = make(map[[32]byte]*SPInfo)
482         state.started = started
483         state.xxOnly = xxOnly
484
485         var buf []byte
486         var payload []byte
487         state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
488         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
489         if buf, err = state.ReadSP(conn); err != nil {
490                 state.Ctx.LogE("sp-start", LEs{}, err, "")
491                 return err
492         }
493         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
494                 state.Ctx.LogE("sp-start", LEs{}, err, "")
495                 return err
496         }
497
498         var node *Node
499         for _, n := range state.Ctx.Neigh {
500                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
501                         node = n
502                         break
503                 }
504         }
505         if node == nil {
506                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
507                 state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
508                 return errors.New("Unknown peer: " + peerId)
509         }
510         state.Node = node
511         state.rxRate = node.RxRate
512         state.txRate = node.TxRate
513         state.onlineDeadline = node.OnlineDeadline
514         state.maxOnlineTime = node.MaxOnlineTime
515         les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
516
517         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
518                 return err
519         }
520         var rxLock *os.File
521         if xxOnly == "" || xxOnly == TRx {
522                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
523                 if err != nil {
524                         return err
525                 }
526         }
527         state.rxLock = rxLock
528         var txLock *os.File
529         if xxOnly == "" || xxOnly == TTx {
530                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
531                 if err != nil {
532                         return err
533                 }
534         }
535         state.txLock = txLock
536
537         var infosPayloads [][]byte
538         if xxOnly == "" || xxOnly == TTx {
539                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
540         }
541         var firstPayload []byte
542         if len(infosPayloads) > 0 {
543                 firstPayload = infosPayloads[0]
544         }
545         // Pad first payload, to hide actual number of existing files
546         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
547                 firstPayload = append(firstPayload, SPHaltMarshalized...)
548         }
549
550         state.Ctx.LogD("sp-start", les, "sending first message")
551         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
552         if err != nil {
553                 state.dirUnlock()
554                 return err
555         }
556         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
557         if err = state.WriteSP(conn, buf, false); err != nil {
558                 state.Ctx.LogE("sp-start", les, err, "")
559                 state.dirUnlock()
560                 return err
561         }
562         state.Ctx.LogD("sp-start", les, "starting workers")
563         err = state.StartWorkers(conn, infosPayloads, payload)
564         if err != nil {
565                 state.dirUnlock()
566         }
567         return err
568 }
569
570 func (state *SPState) closeFd(pth string) {
571         s, exists := state.fds[pth]
572         delete(state.fds, pth)
573         if exists {
574                 s.fd.Close()
575         }
576 }
577
578 func (state *SPState) FillExistingNoCK() {
579         checkerJobs := make([]*[32]byte, 0)
580         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
581                 if job.PktEnc.Nice > state.Nice {
582                         continue
583                 }
584                 checkerJobs = append(checkerJobs, job.HshValue)
585         }
586         for _, job := range checkerJobs {
587                 state.checkerJobs <- job
588         }
589         state.wg.Done()
590 }
591
592 func (state *SPState) StartWorkers(
593         conn ConnDeadlined,
594         infosPayloads [][]byte,
595         payload []byte,
596 ) error {
597         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
598         state.fds = make(map[string]FdAndFullSize)
599         state.isDead = make(chan struct{})
600         if state.maxOnlineTime > 0 {
601                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
602         }
603
604         // Checker
605         if !state.NoCK {
606                 state.checkerJobs = make(chan *[32]byte)
607                 go state.SPChecker()
608                 state.wg.Add(1)
609                 go state.FillExistingNoCK()
610         }
611
612         // Remaining handshake payload sending
613         if len(infosPayloads) > 1 {
614                 state.wg.Add(1)
615                 go func() {
616                         for _, payload := range infosPayloads[1:] {
617                                 state.Ctx.LogD(
618                                         "sp-work",
619                                         append(les, LE{"Size", len(payload)}),
620                                         "queuing remaining payload",
621                                 )
622                                 state.payloads <- payload
623                         }
624                         state.wg.Done()
625                 }()
626         }
627
628         // Processing of first payload and queueing its responses
629         state.Ctx.LogD(
630                 "sp-work",
631                 append(les, LE{"Size", len(payload)}),
632                 "processing first payload",
633         )
634         replies, err := state.ProcessSP(payload)
635         if err != nil {
636                 state.Ctx.LogE("sp-work", les, err, "")
637                 return err
638         }
639         state.wg.Add(1)
640         go func() {
641                 for _, reply := range replies {
642                         state.Ctx.LogD(
643                                 "sp-work",
644                                 append(les, LE{"Size", len(reply)}),
645                                 "queuing reply",
646                         )
647                         state.payloads <- reply
648                 }
649                 state.wg.Done()
650         }()
651
652         // Periodic jobs
653         state.wg.Add(1)
654         go func() {
655                 deadlineTicker := time.NewTicker(time.Second)
656                 pingTicker := time.NewTicker(PingTimeout)
657                 for {
658                         select {
659                         case <-state.isDead:
660                                 state.wg.Done()
661                                 deadlineTicker.Stop()
662                                 pingTicker.Stop()
663                                 return
664                         case now := <-deadlineTicker.C:
665                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
666                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
667                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
668                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
669                                         state.SetDead()
670                                         conn.Close() // #nosec G104
671                                 }
672                         case now := <-pingTicker.C:
673                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
674                                         state.wg.Add(1)
675                                         go func() {
676                                                 state.pings <- struct{}{}
677                                                 state.wg.Done()
678                                         }()
679                                 }
680                         }
681                 }
682         }()
683
684         // Spool checker and INFOs sender of appearing files
685         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
686                 state.wg.Add(1)
687                 go func() {
688                         ticker := time.NewTicker(time.Second)
689                         for {
690                                 select {
691                                 case <-state.isDead:
692                                         state.wg.Done()
693                                         ticker.Stop()
694                                         return
695                                 case <-ticker.C:
696                                         for _, payload := range state.Ctx.infosOur(
697                                                 state.Node.Id,
698                                                 state.Nice,
699                                                 &state.infosOurSeen,
700                                         ) {
701                                                 state.Ctx.LogD(
702                                                         "sp-work",
703                                                         append(les, LE{"Size", len(payload)}),
704                                                         "queuing new info",
705                                                 )
706                                                 state.payloads <- payload
707                                         }
708                                 }
709                         }
710                 }()
711         }
712
713         // Sender
714         state.wg.Add(1)
715         go func() {
716                 defer conn.Close()
717                 defer state.SetDead()
718                 defer state.wg.Done()
719                 for {
720                         if state.NotAlive() {
721                                 return
722                         }
723                         var payload []byte
724                         var ping bool
725                         select {
726                         case <-state.pings:
727                                 state.Ctx.LogD("sp-xmit", les, "got ping")
728                                 payload = SPPingMarshalized
729                                 ping = true
730                         case payload = <-state.payloads:
731                                 state.Ctx.LogD(
732                                         "sp-xmit",
733                                         append(les, LE{"Size", len(payload)}),
734                                         "got payload",
735                                 )
736                         default:
737                                 state.RLock()
738                                 if len(state.queueTheir) == 0 {
739                                         state.RUnlock()
740                                         time.Sleep(100 * time.Millisecond)
741                                         continue
742                                 }
743                                 freq := state.queueTheir[0].freq
744                                 state.RUnlock()
745                                 if state.txRate > 0 {
746                                         time.Sleep(time.Second / time.Duration(state.txRate))
747                                 }
748                                 lesp := append(les, LEs{
749                                         {"XX", string(TTx)},
750                                         {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
751                                         {"Size", int64(freq.Offset)},
752                                 }...)
753                                 state.Ctx.LogD("sp-file", lesp, "queueing")
754                                 pth := filepath.Join(
755                                         state.Ctx.Spool,
756                                         state.Node.Id.String(),
757                                         string(TTx),
758                                         Base32Codec.EncodeToString(freq.Hash[:]),
759                                 )
760                                 fdAndFullSize, exists := state.fds[pth]
761                                 if !exists {
762                                         fd, err := os.Open(pth)
763                                         if err != nil {
764                                                 state.Ctx.LogE("sp-file", lesp, err, "")
765                                                 return
766                                         }
767                                         fi, err := fd.Stat()
768                                         if err != nil {
769                                                 state.Ctx.LogE("sp-file", lesp, err, "")
770                                                 return
771                                         }
772                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
773                                         state.fds[pth] = fdAndFullSize
774                                 }
775                                 fd := fdAndFullSize.fd
776                                 fullSize := fdAndFullSize.fullSize
777                                 var buf []byte
778                                 if freq.Offset < uint64(fullSize) {
779                                         state.Ctx.LogD("sp-file", lesp, "seeking")
780                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
781                                                 state.Ctx.LogE("sp-file", lesp, err, "")
782                                                 return
783                                         }
784                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
785                                         n, err := fd.Read(buf)
786                                         if err != nil {
787                                                 state.Ctx.LogE("sp-file", lesp, err, "")
788                                                 return
789                                         }
790                                         buf = buf[:n]
791                                         state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
792                                 }
793                                 state.closeFd(pth)
794                                 payload = MarshalSP(SPTypeFile, SPFile{
795                                         Hash:    freq.Hash,
796                                         Offset:  freq.Offset,
797                                         Payload: buf,
798                                 })
799                                 ourSize := freq.Offset + uint64(len(buf))
800                                 lesp = append(lesp, LE{"Size", int64(ourSize)})
801                                 lesp = append(lesp, LE{"FullSize", fullSize})
802                                 if state.Ctx.ShowPrgrs {
803                                         Progress("Tx", lesp)
804                                 }
805                                 state.Lock()
806                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
807                                         if ourSize == uint64(fullSize) {
808                                                 state.Ctx.LogD("sp-file", lesp, "finished")
809                                                 if len(state.queueTheir) > 1 {
810                                                         state.queueTheir = state.queueTheir[1:]
811                                                 } else {
812                                                         state.queueTheir = state.queueTheir[:0]
813                                                 }
814                                         } else {
815                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
816                                         }
817                                 } else {
818                                         state.Ctx.LogD("sp-file", lesp, "queue disappeared")
819                                 }
820                                 state.Unlock()
821                         }
822                         state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
823                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
824                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
825                                 state.Ctx.LogE("sp-xmit", les, err, "")
826                                 return
827                         }
828                 }
829         }()
830
831         // Receiver
832         state.wg.Add(1)
833         go func() {
834                 for {
835                         if state.NotAlive() {
836                                 break
837                         }
838                         state.Ctx.LogD("sp-recv", les, "waiting for payload")
839                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
840                         payload, err := state.ReadSP(conn)
841                         if err != nil {
842                                 if err == io.EOF {
843                                         break
844                                 }
845                                 unmarshalErr := err.(*xdr.UnmarshalError)
846                                 if os.IsTimeout(unmarshalErr.Err) {
847                                         continue
848                                 }
849                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
850                                         break
851                                 }
852                                 state.Ctx.LogE("sp-recv", les, err, "")
853                                 break
854                         }
855                         state.Ctx.LogD(
856                                 "sp-recv",
857                                 append(les, LE{"Size", len(payload)}),
858                                 "got payload",
859                         )
860                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
861                         if err != nil {
862                                 state.Ctx.LogE("sp-recv", les, err, "")
863                                 break
864                         }
865                         state.Ctx.LogD(
866                                 "sp-recv",
867                                 append(les, LE{"Size", len(payload)}),
868                                 "processing",
869                         )
870                         replies, err := state.ProcessSP(payload)
871                         if err != nil {
872                                 state.Ctx.LogE("sp-recv", les, err, "")
873                                 break
874                         }
875                         state.wg.Add(1)
876                         go func() {
877                                 for _, reply := range replies {
878                                         state.Ctx.LogD(
879                                                 "sp-recv",
880                                                 append(les, LE{"Size", len(reply)}),
881                                                 "queuing reply",
882                                         )
883                                         state.payloads <- reply
884                                 }
885                                 state.wg.Done()
886                         }()
887                         if state.rxRate > 0 {
888                                 time.Sleep(time.Second / time.Duration(state.rxRate))
889                         }
890                 }
891                 state.SetDead()
892                 state.wg.Done()
893                 state.SetDead()
894                 conn.Close() // #nosec G104
895         }()
896
897         return nil
898 }
899
900 func (state *SPState) Wait() {
901         state.wg.Wait()
902         close(state.payloads)
903         close(state.pings)
904         state.dirUnlock()
905         state.Duration = time.Now().Sub(state.started)
906         state.RxSpeed = state.RxBytes
907         state.TxSpeed = state.TxBytes
908         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
909         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
910         if rxDuration > 0 {
911                 state.RxSpeed = state.RxBytes / rxDuration
912         }
913         if txDuration > 0 {
914                 state.TxSpeed = state.TxBytes / txDuration
915         }
916 }
917
918 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
919         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
920         r := bytes.NewReader(payload)
921         var err error
922         var replies [][]byte
923         var infosGot bool
924         for r.Len() > 0 {
925                 state.Ctx.LogD("sp-process", les, "unmarshaling header")
926                 var head SPHead
927                 if _, err = xdr.Unmarshal(r, &head); err != nil {
928                         state.Ctx.LogE("sp-process", les, err, "")
929                         return nil, err
930                 }
931                 if head.Type != SPTypePing {
932                         state.RxLastNonPing = state.RxLastSeen
933                 }
934                 switch head.Type {
935                 case SPTypeHalt:
936                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
937                         state.Lock()
938                         state.queueTheir = nil
939                         state.Unlock()
940                 case SPTypePing:
941                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
942                 case SPTypeInfo:
943                         infosGot = true
944                         lesp := append(les, LE{"Type", "info"})
945                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
946                         var info SPInfo
947                         if _, err = xdr.Unmarshal(r, &info); err != nil {
948                                 state.Ctx.LogE("sp-process", lesp, err, "")
949                                 return nil, err
950                         }
951                         lesp = append(lesp, LEs{
952                                 {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
953                                 {"Size", int64(info.Size)},
954                                 {"Nice", int(info.Nice)},
955                         }...)
956                         if !state.listOnly && info.Nice > state.Nice {
957                                 state.Ctx.LogD("sp-process", lesp, "too nice")
958                                 continue
959                         }
960                         state.Ctx.LogD("sp-process", lesp, "received")
961                         if !state.listOnly && state.xxOnly == TTx {
962                                 continue
963                         }
964                         state.Lock()
965                         state.infosTheir[*info.Hash] = &info
966                         state.Unlock()
967                         state.Ctx.LogD("sp-process", lesp, "stating part")
968                         pktPath := filepath.Join(
969                                 state.Ctx.Spool,
970                                 state.Node.Id.String(),
971                                 string(TRx),
972                                 Base32Codec.EncodeToString(info.Hash[:]),
973                         )
974                         if _, err = os.Stat(pktPath); err == nil {
975                                 state.Ctx.LogI("sp-info", lesp, "already done")
976                                 if !state.listOnly {
977                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
978                                 }
979                                 continue
980                         }
981                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
982                                 state.Ctx.LogI("sp-info", lesp, "already seen")
983                                 if !state.listOnly {
984                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
985                                 }
986                                 continue
987                         }
988                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
989                                 state.Ctx.LogI("sp-info", lesp, "still non checksummed")
990                                 continue
991                         }
992                         fi, err := os.Stat(pktPath + PartSuffix)
993                         var offset int64
994                         if err == nil {
995                                 offset = fi.Size()
996                         }
997                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
998                                 state.Ctx.LogI("sp-info", lesp, "not enough space")
999                                 continue
1000                         }
1001                         state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
1002                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1003                                 replies = append(replies, MarshalSP(
1004                                         SPTypeFreq,
1005                                         SPFreq{info.Hash, uint64(offset)},
1006                                 ))
1007                         }
1008                 case SPTypeFile:
1009                         lesp := append(les, LE{"Type", "file"})
1010                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1011                         var file SPFile
1012                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1013                                 state.Ctx.LogE("sp-process", lesp, err, "")
1014                                 return nil, err
1015                         }
1016                         lesp = append(lesp, LEs{
1017                                 {"XX", string(TRx)},
1018                                 {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
1019                                 {"Size", len(file.Payload)},
1020                         }...)
1021                         dirToSync := filepath.Join(
1022                                 state.Ctx.Spool,
1023                                 state.Node.Id.String(),
1024                                 string(TRx),
1025                         )
1026                         filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
1027                         filePathPart := filePath + PartSuffix
1028                         state.Ctx.LogD("sp-file", lesp, "opening part")
1029                         fdAndFullSize, exists := state.fds[filePathPart]
1030                         var fd *os.File
1031                         if exists {
1032                                 fd = fdAndFullSize.fd
1033                         } else {
1034                                 fd, err = os.OpenFile(
1035                                         filePathPart,
1036                                         os.O_RDWR|os.O_CREATE,
1037                                         os.FileMode(0666),
1038                                 )
1039                                 if err != nil {
1040                                         state.Ctx.LogE("sp-file", lesp, err, "")
1041                                         return nil, err
1042                                 }
1043                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1044                         }
1045                         state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
1046                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1047                                 state.Ctx.LogE("sp-file", lesp, err, "")
1048                                 state.closeFd(filePathPart)
1049                                 return nil, err
1050                         }
1051                         state.Ctx.LogD("sp-file", lesp, "writing")
1052                         if _, err = fd.Write(file.Payload); err != nil {
1053                                 state.Ctx.LogE("sp-file", lesp, err, "")
1054                                 state.closeFd(filePathPart)
1055                                 return nil, err
1056                         }
1057                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1058                         lesp[len(lesp)-1].V = ourSize
1059                         fullsize := int64(0)
1060                         state.RLock()
1061                         infoTheir, ok := state.infosTheir[*file.Hash]
1062                         state.RUnlock()
1063                         if ok {
1064                                 fullsize = int64(infoTheir.Size)
1065                         }
1066                         lesp = append(lesp, LE{"FullSize", fullsize})
1067                         if state.Ctx.ShowPrgrs {
1068                                 Progress("Rx", lesp)
1069                         }
1070                         if fullsize != ourSize {
1071                                 continue
1072                         }
1073                         err = fd.Sync()
1074                         state.closeFd(filePathPart)
1075                         if err != nil {
1076                                 state.Ctx.LogE("sp-file", lesp, err, "sync")
1077                                 continue
1078                         }
1079                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1080                                 state.Ctx.LogE("sp-file", lesp, err, "rename")
1081                                 continue
1082                         }
1083                         if err = DirSync(dirToSync); err != nil {
1084                                 state.Ctx.LogE("sp-file", lesp, err, "sync")
1085                                 continue
1086                         }
1087                         state.Ctx.LogI("sp-file", lesp, "downloaded")
1088                         state.Lock()
1089                         delete(state.infosTheir, *file.Hash)
1090                         state.Unlock()
1091                         if !state.NoCK {
1092                                 state.checkerJobs <- file.Hash
1093                         }
1094                 case SPTypeDone:
1095                         lesp := append(les, LE{"Type", "done"})
1096                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1097                         var done SPDone
1098                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1099                                 state.Ctx.LogE("sp-process", lesp, err, "")
1100                                 return nil, err
1101                         }
1102                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
1103                         state.Ctx.LogD("sp-done", lesp, "removing")
1104                         err := os.Remove(filepath.Join(
1105                                 state.Ctx.Spool,
1106                                 state.Node.Id.String(),
1107                                 string(TTx),
1108                                 Base32Codec.EncodeToString(done.Hash[:]),
1109                         ))
1110                         lesp = append(lesp, LE{"XX", string(TTx)})
1111                         if err == nil {
1112                                 state.Ctx.LogI("sp-done", lesp, "")
1113                         } else {
1114                                 state.Ctx.LogE("sp-done", lesp, err, "")
1115                         }
1116                 case SPTypeFreq:
1117                         lesp := append(les, LE{"Type", "freq"})
1118                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1119                         var freq SPFreq
1120                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1121                                 state.Ctx.LogE("sp-process", lesp, err, "")
1122                                 return nil, err
1123                         }
1124                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
1125                         lesp = append(lesp, LE{"Offset", freq.Offset})
1126                         state.Ctx.LogD("sp-process", lesp, "queueing")
1127                         nice, exists := state.infosOurSeen[*freq.Hash]
1128                         if exists {
1129                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1130                                         state.Lock()
1131                                         insertIdx := 0
1132                                         var freqWithNice *FreqWithNice
1133                                         for insertIdx, freqWithNice = range state.queueTheir {
1134                                                 if freqWithNice.nice > nice {
1135                                                         break
1136                                                 }
1137                                         }
1138                                         state.queueTheir = append(state.queueTheir, nil)
1139                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1140                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1141                                         state.Unlock()
1142                                 } else {
1143                                         state.Ctx.LogD("sp-process", lesp, "skipping")
1144                                 }
1145                         } else {
1146                                 state.Ctx.LogD("sp-process", lesp, "unknown")
1147                         }
1148                 default:
1149                         state.Ctx.LogE(
1150                                 "sp-process",
1151                                 append(les, LE{"Type", head.Type}),
1152                                 errors.New("unknown type"),
1153                                 "",
1154                         )
1155                         return nil, BadPktType
1156                 }
1157         }
1158         if infosGot {
1159                 var pkts int
1160                 var size uint64
1161                 state.RLock()
1162                 for _, info := range state.infosTheir {
1163                         pkts++
1164                         size += info.Size
1165                 }
1166                 state.RUnlock()
1167                 state.Ctx.LogI("sp-infos", LEs{
1168                         {"XX", string(TRx)},
1169                         {"Node", state.Node.Id},
1170                         {"Pkts", pkts},
1171                         {"Size", int64(size)},
1172                 }, "")
1173         }
1174         return payloadsSplit(replies), nil
1175 }