Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Keep downloading file opened
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "io"
25         "os"
26         "path/filepath"
27         "sort"
28         "sync"
29         "time"
30
31         xdr "github.com/davecgh/go-xdr/xdr2"
32         "github.com/flynn/noise"
33 )
34
// MaxSPSize is the size budget used when batching payloads together
// (payloadsSplit), when padding the first handshake payload and when sizing
// the read buffer for outgoing file chunks.
const MaxSPSize = 1<<16 - 256

// PartSuffix is the ".part" file name suffix.
// NOTE(review): presumably marks partially received files -- its use is not
// visible in this part of the file, confirm against the receiver code.
const PartSuffix = ".part"

// SPHeadOverhead is the length, in bytes, reserved for one marshalled SPHead;
// init() allocates the prebuilt HALT and PING packets with exactly this size.
const SPHeadOverhead = 4
40
41 var (
42         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
43
44         SPInfoOverhead    int
45         SPFreqOverhead    int
46         SPFileOverhead    int
47         SPHaltMarshalized []byte
48         SPPingMarshalized []byte
49
50         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
51                 noise.DH25519,
52                 noise.CipherChaChaPoly,
53                 noise.HashBLAKE2b,
54         )
55
56         DefaultDeadline = 10 * time.Second
57         PingTimeout     = time.Minute
58
59         spCheckerToken chan struct{}
60 )
61
// FdAndFullSize pairs an opened file with the size recorded for it at open
// time. Values of that type are kept in SPState.fds, keyed by file path.
62 type FdAndFullSize struct {
63         fd       *os.File // opened file handle
64         fullSize int64    // size reported by fd.Stat() right after opening
65 }
66
// SPType is the packet type tag carried by SPHead.
type SPType uint8

// Known packet types. The numeric values (0..5, in that order) go on the
// wire inside SPHead, so the order of that list must never change.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
77
78 type SPHead struct {
79         Type SPType
80 }
81
82 type SPInfo struct {
83         Nice uint8
84         Size uint64
85         Hash *[32]byte
86 }
87
88 type SPFreq struct {
89         Hash   *[32]byte
90         Offset uint64
91 }
92
93 type SPFile struct {
94         Hash    *[32]byte
95         Offset  uint64
96         Payload []byte
97 }
98
99 type SPDone struct {
100         Hash *[32]byte
101 }
102
103 type SPRaw struct {
104         Magic   [8]byte
105         Payload []byte
106 }
107
108 type FreqWithNice struct {
109         freq *SPFreq
110         nice uint8
111 }
112
// ConnDeadlined is the connection the protocol runs over: a byte stream that
// can be read, written and closed, and that supports separate read and write
// deadlines. StartI, StartR and StartWorkers set a fresh deadline before
// every packet they send or wait for.
113 type ConnDeadlined interface {
114         io.ReadWriteCloser
115         SetReadDeadline(t time.Time) error
116         SetWriteDeadline(t time.Time) error
117 }
118
119 func init() {
120         var buf bytes.Buffer
121         spHead := SPHead{Type: SPTypeHalt}
122         if _, err := xdr.Marshal(&buf, spHead); err != nil {
123                 panic(err)
124         }
125         SPHaltMarshalized = make([]byte, SPHeadOverhead)
126         copy(SPHaltMarshalized, buf.Bytes())
127         buf.Reset()
128
129         spHead = SPHead{Type: SPTypePing}
130         if _, err := xdr.Marshal(&buf, spHead); err != nil {
131                 panic(err)
132         }
133         SPPingMarshalized = make([]byte, SPHeadOverhead)
134         copy(SPPingMarshalized, buf.Bytes())
135         buf.Reset()
136
137         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
138         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
139                 panic(err)
140         }
141         SPInfoOverhead = buf.Len()
142         buf.Reset()
143
144         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
145         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
146                 panic(err)
147         }
148         SPFreqOverhead = buf.Len()
149         buf.Reset()
150
151         spFile := SPFile{Hash: new([32]byte), Offset: 123}
152         if _, err := xdr.Marshal(&buf, spFile); err != nil {
153                 panic(err)
154         }
155         SPFileOverhead = buf.Len()
156         spCheckerToken = make(chan struct{}, 1)
157         spCheckerToken <- struct{}{}
158 }
159
160 func MarshalSP(typ SPType, sp interface{}) []byte {
161         var buf bytes.Buffer
162         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
163                 panic(err)
164         }
165         if _, err := xdr.Marshal(&buf, sp); err != nil {
166                 panic(err)
167         }
168         return buf.Bytes()
169 }
170
171 func payloadsSplit(payloads [][]byte) [][]byte {
172         var outbounds [][]byte
173         outbound := make([]byte, 0, MaxSPSize)
174         for i, payload := range payloads {
175                 outbound = append(outbound, payload...)
176                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
177                         outbounds = append(outbounds, outbound)
178                         outbound = make([]byte, 0, MaxSPSize)
179                 }
180         }
181         if len(outbound) > 0 {
182                 outbounds = append(outbounds, outbound)
183         }
184         return outbounds
185 }
186
// SPState holds everything related to a single protocol session with one
// node: configuration, Noise handshake and cipher states, queues exchanged
// between the worker goroutines, traffic statistics and held spool locks.
// The embedded RWMutex is taken by SetDead and around accesses to queueTheir
// in the sender.
187 type SPState struct {
188         Ctx            *Ctx
189         Node           *Node
190         Nice           uint8
191         onlineDeadline time.Duration
192         maxOnlineTime  time.Duration
        // Noise handshake state, created by StartI/StartR
193         hs             *noise.HandshakeState
        // Cipher states produced by the handshake: csOur encrypts outgoing
        // payloads, csTheir decrypts incoming ones
194         csOur          *noise.CipherState
195         csTheir        *noise.CipherState
        // Unbuffered queues consumed by the sender goroutine
196         payloads       chan []byte
197         pings          chan struct{}
198         infosTheir     map[[32]byte]*SPInfo
        // Hashes of our packets already announced, with their niceness
199         infosOurSeen   map[[32]byte]uint8
        // File requests the sender goroutine serves, head first
200         queueTheir     []*FreqWithNice
        // Tracks the worker goroutines started by StartWorkers
201         wg             sync.WaitGroup
        // Traffic counters and timestamps maintained by ReadSP and WriteSP
202         RxBytes        int64
203         RxLastSeen     time.Time
204         RxLastNonPing  time.Time
205         TxBytes        int64
206         TxLastSeen     time.Time
207         TxLastNonPing  time.Time
208         started        time.Time
        // started + maxOnlineTime, set only when maxOnlineTime is positive
209         mustFinishAt   time.Time
210         Duration       time.Duration
211         RxSpeed        int64
212         TxSpeed        int64
        // Spool directory locks released by dirUnlock; may be nil
213         rxLock         *os.File
214         txLock         *os.File
        // Restricts the session to a single direction; empty means both
215         xxOnly         TRxTx
216         rxRate         int
217         txRate         int
        // Closed by SetDead to tell all the workers to stop
218         isDead         chan struct{}
219         listOnly       bool
220         onlyPkts       map[[32]byte]bool
        // Scratch buffer reused by WriteSP for every outgoing frame
221         writeSPBuf     bytes.Buffer
        // Opened spool files, keyed by their path
222         fds            map[string]FdAndFullSize
223         sync.RWMutex
224 }
225
226 func (state *SPState) SetDead() {
227         state.Lock()
228         defer state.Unlock()
229         select {
230         case <-state.isDead:
231                 // Already closed channel, dead
232                 return
233         default:
234         }
235         close(state.isDead)
236         go func() {
237                 for range state.payloads {
238                 }
239         }()
240         go func() {
241                 for range state.pings {
242                 }
243         }()
244 }
245
246 func (state *SPState) NotAlive() bool {
247         select {
248         case <-state.isDead:
249                 return true
250         default:
251         }
252         return false
253 }
254
255 func (state *SPState) dirUnlock() {
256         state.Ctx.UnlockDir(state.rxLock)
257         state.Ctx.UnlockDir(state.txLock)
258 }
259
260 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
261         state.writeSPBuf.Reset()
262         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
263                 Magic:   MagicNNCPLv1,
264                 Payload: payload,
265         })
266         if err != nil {
267                 return err
268         }
269         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
270                 state.TxLastSeen = time.Now()
271                 state.TxBytes += int64(n)
272                 if !ping {
273                         state.TxLastNonPing = state.TxLastSeen
274                 }
275         }
276         return err
277 }
278
279 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
280         var sp SPRaw
281         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
282         if err != nil {
283                 ue := err.(*xdr.UnmarshalError)
284                 if ue.Err == io.EOF {
285                         return nil, ue.Err
286                 }
287                 return nil, err
288         }
289         state.RxLastSeen = time.Now()
290         state.RxBytes += int64(n)
291         if sp.Magic != MagicNNCPLv1 {
292                 return nil, BadMagic
293         }
294         return sp.Payload, nil
295 }
296
297 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
298         var infos []*SPInfo
299         var totalSize int64
300         for job := range ctx.Jobs(nodeId, TTx) {
301                 if job.PktEnc.Nice > nice {
302                         continue
303                 }
304                 if _, known := (*seen)[*job.HshValue]; known {
305                         continue
306                 }
307                 totalSize += job.Size
308                 infos = append(infos, &SPInfo{
309                         Nice: job.PktEnc.Nice,
310                         Size: uint64(job.Size),
311                         Hash: job.HshValue,
312                 })
313                 (*seen)[*job.HshValue] = job.PktEnc.Nice
314         }
315         sort.Sort(ByNice(infos))
316         var payloads [][]byte
317         for _, info := range infos {
318                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
319                 ctx.LogD("sp-info-our", LEs{
320                         {"Node", nodeId},
321                         {"Name", Base32Codec.EncodeToString(info.Hash[:])},
322                         {"Size", info.Size},
323                 }, "")
324         }
325         if totalSize > 0 {
326                 ctx.LogI("sp-infos", LEs{
327                         {"XX", string(TTx)},
328                         {"Node", nodeId},
329                         {"Pkts", len(payloads)},
330                         {"Size", totalSize},
331                 }, "")
332         }
333         return payloadsSplit(payloads)
334 }
335
336 func (state *SPState) StartI(conn ConnDeadlined) error {
337         nodeId := state.Node.Id
338         err := state.Ctx.ensureRxDir(nodeId)
339         if err != nil {
340                 return err
341         }
342         var rxLock *os.File
343         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
344                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
345                 if err != nil {
346                         return err
347                 }
348         }
349         var txLock *os.File
350         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
351                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
352                 if err != nil {
353                         return err
354                 }
355         }
356         started := time.Now()
357         conf := noise.Config{
358                 CipherSuite: NoiseCipherSuite,
359                 Pattern:     noise.HandshakeIK,
360                 Initiator:   true,
361                 StaticKeypair: noise.DHKey{
362                         Private: state.Ctx.Self.NoisePrv[:],
363                         Public:  state.Ctx.Self.NoisePub[:],
364                 },
365                 PeerStatic: state.Node.NoisePub[:],
366         }
367         hs, err := noise.NewHandshakeState(conf)
368         if err != nil {
369                 return err
370         }
371         state.hs = hs
372         state.payloads = make(chan []byte)
373         state.pings = make(chan struct{})
374         state.infosTheir = make(map[[32]byte]*SPInfo)
375         state.infosOurSeen = make(map[[32]byte]uint8)
376         state.started = started
377         state.rxLock = rxLock
378         state.txLock = txLock
379
380         var infosPayloads [][]byte
381         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
382                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
383         }
384         var firstPayload []byte
385         if len(infosPayloads) > 0 {
386                 firstPayload = infosPayloads[0]
387         }
388         // Pad first payload, to hide actual number of existing files
389         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
390                 firstPayload = append(firstPayload, SPHaltMarshalized...)
391         }
392
393         var buf []byte
394         var payload []byte
395         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
396         if err != nil {
397                 state.dirUnlock()
398                 return err
399         }
400         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
401         state.Ctx.LogD("sp-start", les, "sending first message")
402         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
403         if err = state.WriteSP(conn, buf, false); err != nil {
404                 state.Ctx.LogE("sp-start", les, err, "")
405                 state.dirUnlock()
406                 return err
407         }
408         state.Ctx.LogD("sp-start", les, "waiting for first message")
409         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
410         if buf, err = state.ReadSP(conn); err != nil {
411                 state.Ctx.LogE("sp-start", les, err, "")
412                 state.dirUnlock()
413                 return err
414         }
415         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
416         if err != nil {
417                 state.Ctx.LogE("sp-start", les, err, "")
418                 state.dirUnlock()
419                 return err
420         }
421         state.Ctx.LogD("sp-start", les, "starting workers")
422         err = state.StartWorkers(conn, infosPayloads, payload)
423         if err != nil {
424                 state.Ctx.LogE("sp-start", les, err, "")
425                 state.dirUnlock()
426         }
427         return err
428 }
429
430 func (state *SPState) StartR(conn ConnDeadlined) error {
431         started := time.Now()
432         conf := noise.Config{
433                 CipherSuite: NoiseCipherSuite,
434                 Pattern:     noise.HandshakeIK,
435                 Initiator:   false,
436                 StaticKeypair: noise.DHKey{
437                         Private: state.Ctx.Self.NoisePrv[:],
438                         Public:  state.Ctx.Self.NoisePub[:],
439                 },
440         }
441         hs, err := noise.NewHandshakeState(conf)
442         if err != nil {
443                 return err
444         }
445         xxOnly := TRxTx("")
446         state.hs = hs
447         state.payloads = make(chan []byte)
448         state.pings = make(chan struct{})
449         state.infosOurSeen = make(map[[32]byte]uint8)
450         state.infosTheir = make(map[[32]byte]*SPInfo)
451         state.started = started
452         state.xxOnly = xxOnly
453         var buf []byte
454         var payload []byte
455         state.Ctx.LogD("sp-start", LEs{{"Nice", int(state.Nice)}}, "waiting for first message")
456         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
457         if buf, err = state.ReadSP(conn); err != nil {
458                 state.Ctx.LogE("sp-start", LEs{}, err, "")
459                 return err
460         }
461         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
462                 state.Ctx.LogE("sp-start", LEs{}, err, "")
463                 return err
464         }
465
466         var node *Node
467         for _, n := range state.Ctx.Neigh {
468                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
469                         node = n
470                         break
471                 }
472         }
473         if node == nil {
474                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
475                 state.Ctx.LogE("sp-start", LEs{{"Peer", peerId}}, errors.New("unknown peer"), "")
476                 return errors.New("Unknown peer: " + peerId)
477         }
478         state.Node = node
479         state.rxRate = node.RxRate
480         state.txRate = node.TxRate
481         state.onlineDeadline = node.OnlineDeadline
482         state.maxOnlineTime = node.MaxOnlineTime
483         les := LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
484
485         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
486                 return err
487         }
488         var rxLock *os.File
489         if xxOnly == "" || xxOnly == TRx {
490                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
491                 if err != nil {
492                         return err
493                 }
494         }
495         state.rxLock = rxLock
496         var txLock *os.File
497         if xxOnly == "" || xxOnly == TTx {
498                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
499                 if err != nil {
500                         return err
501                 }
502         }
503         state.txLock = txLock
504
505         var infosPayloads [][]byte
506         if xxOnly == "" || xxOnly == TTx {
507                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
508         }
509         var firstPayload []byte
510         if len(infosPayloads) > 0 {
511                 firstPayload = infosPayloads[0]
512         }
513         // Pad first payload, to hide actual number of existing files
514         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
515                 firstPayload = append(firstPayload, SPHaltMarshalized...)
516         }
517
518         state.Ctx.LogD("sp-start", les, "sending first message")
519         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
520         if err != nil {
521                 state.dirUnlock()
522                 return err
523         }
524         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
525         if err = state.WriteSP(conn, buf, false); err != nil {
526                 state.Ctx.LogE("sp-start", les, err, "")
527                 state.dirUnlock()
528                 return err
529         }
530         state.Ctx.LogD("sp-start", les, "starting workers")
531         err = state.StartWorkers(conn, infosPayloads, payload)
532         if err != nil {
533                 state.dirUnlock()
534         }
535         return err
536 }
537
538 func (state *SPState) closeFd(pth string) {
539         s, exists := state.fds[pth]
540         delete(state.fds, pth)
541         if exists {
542                 s.fd.Close()
543         }
544 }
545
546 func (state *SPState) StartWorkers(
547         conn ConnDeadlined,
548         infosPayloads [][]byte,
549         payload []byte,
550 ) error {
551         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
552         state.fds = make(map[string]FdAndFullSize)
553         state.isDead = make(chan struct{})
554         if state.maxOnlineTime > 0 {
555                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
556         }
557
558         // Remaining handshake payload sending
559         if len(infosPayloads) > 1 {
560                 state.wg.Add(1)
561                 go func() {
562                         for _, payload := range infosPayloads[1:] {
563                                 state.Ctx.LogD(
564                                         "sp-work",
565                                         append(les, LE{"Size", len(payload)}),
566                                         "queuing remaining payload",
567                                 )
568                                 state.payloads <- payload
569                         }
570                         state.wg.Done()
571                 }()
572         }
573
574         // Processing of first payload and queueing its responses
575         state.Ctx.LogD(
576                 "sp-work",
577                 append(les, LE{"Size", len(payload)}),
578                 "processing first payload",
579         )
580         replies, err := state.ProcessSP(payload)
581         if err != nil {
582                 state.Ctx.LogE("sp-work", les, err, "")
583                 return err
584         }
585         state.wg.Add(1)
586         go func() {
587                 for _, reply := range replies {
588                         state.Ctx.LogD(
589                                 "sp-work",
590                                 append(les, LE{"Size", len(reply)}),
591                                 "queuing reply",
592                         )
593                         state.payloads <- reply
594                 }
595                 state.wg.Done()
596         }()
597
598         // Periodic jobs
599         state.wg.Add(1)
600         go func() {
601                 deadlineTicker := time.NewTicker(time.Second)
602                 pingTicker := time.NewTicker(PingTimeout)
603                 for {
604                         select {
605                         case <-state.isDead:
606                                 state.wg.Done()
607                                 deadlineTicker.Stop()
608                                 pingTicker.Stop()
609                                 return
610                         case now := <-deadlineTicker.C:
611                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
612                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
613                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
614                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
615                                         state.SetDead()
616                                         conn.Close() // #nosec G104
617                                 }
618                         case now := <-pingTicker.C:
619                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
620                                         state.wg.Add(1)
621                                         go func() {
622                                                 state.pings <- struct{}{}
623                                                 state.wg.Done()
624                                         }()
625                                 }
626                         }
627                 }
628         }()
629
630         // Spool checker and INFOs sender of appearing files
631         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
632                 state.wg.Add(1)
633                 go func() {
634                         ticker := time.NewTicker(time.Second)
635                         for {
636                                 select {
637                                 case <-state.isDead:
638                                         state.wg.Done()
639                                         ticker.Stop()
640                                         return
641                                 case <-ticker.C:
642                                         for _, payload := range state.Ctx.infosOur(
643                                                 state.Node.Id,
644                                                 state.Nice,
645                                                 &state.infosOurSeen,
646                                         ) {
647                                                 state.Ctx.LogD(
648                                                         "sp-work",
649                                                         append(les, LE{"Size", len(payload)}),
650                                                         "queuing new info",
651                                                 )
652                                                 state.payloads <- payload
653                                         }
654                                 }
655                         }
656                 }()
657         }
658
659         // Sender
660         state.wg.Add(1)
661         go func() {
662                 defer conn.Close()
663                 defer state.SetDead()
664                 defer state.wg.Done()
665                 for {
666                         if state.NotAlive() {
667                                 return
668                         }
669                         var payload []byte
670                         var ping bool
671                         select {
672                         case <-state.pings:
673                                 state.Ctx.LogD("sp-xmit", les, "got ping")
674                                 payload = SPPingMarshalized
675                                 ping = true
676                         case payload = <-state.payloads:
677                                 state.Ctx.LogD(
678                                         "sp-xmit",
679                                         append(les, LE{"Size", len(payload)}),
680                                         "got payload",
681                                 )
682                         default:
683                                 state.RLock()
684                                 if len(state.queueTheir) == 0 {
685                                         state.RUnlock()
686                                         time.Sleep(100 * time.Millisecond)
687                                         continue
688                                 }
689                                 freq := state.queueTheir[0].freq
690                                 state.RUnlock()
691                                 if state.txRate > 0 {
692                                         time.Sleep(time.Second / time.Duration(state.txRate))
693                                 }
694                                 lesp := append(les, LEs{
695                                         {"XX", string(TTx)},
696                                         {"Pkt", Base32Codec.EncodeToString(freq.Hash[:])},
697                                         {"Size", int64(freq.Offset)},
698                                 }...)
699                                 state.Ctx.LogD("sp-file", lesp, "queueing")
700                                 pth := filepath.Join(
701                                         state.Ctx.Spool,
702                                         state.Node.Id.String(),
703                                         string(TTx),
704                                         Base32Codec.EncodeToString(freq.Hash[:]),
705                                 )
706                                 fdAndFullSize, exists := state.fds[pth]
707                                 if !exists {
708                                         fd, err := os.Open(pth)
709                                         if err != nil {
710                                                 state.Ctx.LogE("sp-file", lesp, err, "")
711                                                 return
712                                         }
713                                         fi, err := fd.Stat()
714                                         if err != nil {
715                                                 state.Ctx.LogE("sp-file", lesp, err, "")
716                                                 return
717                                         }
718                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
719                                         state.fds[pth] = fdAndFullSize
720                                 }
721                                 fd := fdAndFullSize.fd
722                                 fullSize := fdAndFullSize.fullSize
723                                 var buf []byte
724                                 if freq.Offset < uint64(fullSize) {
725                                         state.Ctx.LogD("sp-file", lesp, "seeking")
726                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
727                                                 state.Ctx.LogE("sp-file", lesp, err, "")
728                                                 return
729                                         }
730                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
731                                         n, err := fd.Read(buf)
732                                         if err != nil {
733                                                 state.Ctx.LogE("sp-file", lesp, err, "")
734                                                 return
735                                         }
736                                         buf = buf[:n]
737                                         state.Ctx.LogD("sp-file", append(lesp, LE{"Size", n}), "read")
738                                 }
739                                 state.closeFd(pth)
740                                 payload = MarshalSP(SPTypeFile, SPFile{
741                                         Hash:    freq.Hash,
742                                         Offset:  freq.Offset,
743                                         Payload: buf,
744                                 })
745                                 ourSize := freq.Offset + uint64(len(buf))
746                                 lesp = append(lesp, LE{"Size", int64(ourSize)})
747                                 lesp = append(lesp, LE{"FullSize", fullSize})
748                                 if state.Ctx.ShowPrgrs {
749                                         Progress("Tx", lesp)
750                                 }
751                                 state.Lock()
752                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
753                                         if ourSize == uint64(fullSize) {
754                                                 state.Ctx.LogD("sp-file", lesp, "finished")
755                                                 if len(state.queueTheir) > 1 {
756                                                         state.queueTheir = state.queueTheir[1:]
757                                                 } else {
758                                                         state.queueTheir = state.queueTheir[:0]
759                                                 }
760                                         } else {
761                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
762                                         }
763                                 } else {
764                                         state.Ctx.LogD("sp-file", lesp, "queue disappeared")
765                                 }
766                                 state.Unlock()
767                         }
768                         state.Ctx.LogD("sp-xmit", append(les, LE{"Size", len(payload)}), "sending")
769                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
770                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
771                                 state.Ctx.LogE("sp-xmit", les, err, "")
772                                 return
773                         }
774                 }
775         }()
776
777         // Receiver
778         state.wg.Add(1)
779         go func() {
780                 for {
781                         if state.NotAlive() {
782                                 break
783                         }
784                         state.Ctx.LogD("sp-recv", les, "waiting for payload")
785                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
786                         payload, err := state.ReadSP(conn)
787                         if err != nil {
788                                 if err == io.EOF {
789                                         break
790                                 }
791                                 unmarshalErr := err.(*xdr.UnmarshalError)
792                                 if os.IsTimeout(unmarshalErr.Err) {
793                                         continue
794                                 }
795                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
796                                         break
797                                 }
798                                 state.Ctx.LogE("sp-recv", les, err, "")
799                                 break
800                         }
801                         state.Ctx.LogD(
802                                 "sp-recv",
803                                 append(les, LE{"Size", len(payload)}),
804                                 "got payload",
805                         )
806                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
807                         if err != nil {
808                                 state.Ctx.LogE("sp-recv", les, err, "")
809                                 break
810                         }
811                         state.Ctx.LogD(
812                                 "sp-recv",
813                                 append(les, LE{"Size", len(payload)}),
814                                 "processing",
815                         )
816                         replies, err := state.ProcessSP(payload)
817                         if err != nil {
818                                 state.Ctx.LogE("sp-recv", les, err, "")
819                                 break
820                         }
821                         state.wg.Add(1)
822                         go func() {
823                                 for _, reply := range replies {
824                                         state.Ctx.LogD(
825                                                 "sp-recv",
826                                                 append(les, LE{"Size", len(reply)}),
827                                                 "queuing reply",
828                                         )
829                                         state.payloads <- reply
830                                 }
831                                 state.wg.Done()
832                         }()
833                         if state.rxRate > 0 {
834                                 time.Sleep(time.Second / time.Duration(state.rxRate))
835                         }
836                 }
837                 state.SetDead()
838                 state.wg.Done()
839                 state.SetDead()
840                 conn.Close() // #nosec G104
841                 for _, s := range state.fds {
842                         s.fd.Close()
843                 }
844         }()
845
846         return nil
847 }
848
849 func (state *SPState) Wait() {
850         state.wg.Wait()
851         close(state.payloads)
852         close(state.pings)
853         state.dirUnlock()
854         state.Duration = time.Now().Sub(state.started)
855         state.RxSpeed = state.RxBytes
856         state.TxSpeed = state.TxBytes
857         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
858         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
859         if rxDuration > 0 {
860                 state.RxSpeed = state.RxBytes / rxDuration
861         }
862         if txDuration > 0 {
863                 state.TxSpeed = state.TxBytes / txDuration
864         }
865 }
866
867 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
868         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
869         r := bytes.NewReader(payload)
870         var err error
871         var replies [][]byte
872         var infosGot bool
873         for r.Len() > 0 {
874                 state.Ctx.LogD("sp-process", les, "unmarshaling header")
875                 var head SPHead
876                 if _, err = xdr.Unmarshal(r, &head); err != nil {
877                         state.Ctx.LogE("sp-process", les, err, "")
878                         return nil, err
879                 }
880                 if head.Type != SPTypePing {
881                         state.RxLastNonPing = state.RxLastSeen
882                 }
883                 switch head.Type {
884                 case SPTypeHalt:
885                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "halt"}), "")
886                         state.Lock()
887                         state.queueTheir = nil
888                         state.Unlock()
889                 case SPTypePing:
890                         state.Ctx.LogD("sp-process", append(les, LE{"Type", "ping"}), "")
891                 case SPTypeInfo:
892                         infosGot = true
893                         lesp := append(les, LE{"Type", "info"})
894                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
895                         var info SPInfo
896                         if _, err = xdr.Unmarshal(r, &info); err != nil {
897                                 state.Ctx.LogE("sp-process", lesp, err, "")
898                                 return nil, err
899                         }
900                         lesp = append(lesp, LEs{
901                                 {"Pkt", Base32Codec.EncodeToString(info.Hash[:])},
902                                 {"Size", int64(info.Size)},
903                                 {"Nice", int(info.Nice)},
904                         }...)
905                         if !state.listOnly && info.Nice > state.Nice {
906                                 state.Ctx.LogD("sp-process", lesp, "too nice")
907                                 continue
908                         }
909                         state.Ctx.LogD("sp-process", lesp, "received")
910                         if !state.listOnly && state.xxOnly == TTx {
911                                 continue
912                         }
913                         state.Lock()
914                         state.infosTheir[*info.Hash] = &info
915                         state.Unlock()
916                         state.Ctx.LogD("sp-process", lesp, "stating part")
917                         pktPath := filepath.Join(
918                                 state.Ctx.Spool,
919                                 state.Node.Id.String(),
920                                 string(TRx),
921                                 Base32Codec.EncodeToString(info.Hash[:]),
922                         )
923                         if _, err = os.Stat(pktPath); err == nil {
924                                 state.Ctx.LogI("sp-info", lesp, "already done")
925                                 if !state.listOnly {
926                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
927                                 }
928                                 continue
929                         }
930                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
931                                 state.Ctx.LogI("sp-info", lesp, "already seen")
932                                 if !state.listOnly {
933                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
934                                 }
935                                 continue
936                         }
937                         fi, err := os.Stat(pktPath + PartSuffix)
938                         var offset int64
939                         if err == nil {
940                                 offset = fi.Size()
941                         }
942                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
943                                 state.Ctx.LogI("sp-info", lesp, "not enough space")
944                                 continue
945                         }
946                         state.Ctx.LogI("sp-info", append(lesp, LE{"Offset", offset}), "")
947                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
948                                 replies = append(replies, MarshalSP(
949                                         SPTypeFreq,
950                                         SPFreq{info.Hash, uint64(offset)},
951                                 ))
952                         }
953                 case SPTypeFile:
954                         lesp := append(les, LE{"Type", "file"})
955                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
956                         var file SPFile
957                         if _, err = xdr.Unmarshal(r, &file); err != nil {
958                                 state.Ctx.LogE("sp-process", lesp, err, "")
959                                 return nil, err
960                         }
961                         lesp = append(lesp, LEs{
962                                 {"XX", string(TRx)},
963                                 {"Pkt", Base32Codec.EncodeToString(file.Hash[:])},
964                                 {"Size", len(file.Payload)},
965                         }...)
966                         dirToSync := filepath.Join(
967                                 state.Ctx.Spool,
968                                 state.Node.Id.String(),
969                                 string(TRx),
970                         )
971                         filePath := filepath.Join(dirToSync, Base32Codec.EncodeToString(file.Hash[:]))
972                         filePathPart := filePath + PartSuffix
973                         state.Ctx.LogD("sp-file", lesp, "opening part")
974                         fdAndFullSize, exists := state.fds[filePathPart]
975                         var fd *os.File
976                         if exists {
977                                 fd = fdAndFullSize.fd
978                         } else {
979                                 fd, err = os.OpenFile(
980                                         filePathPart,
981                                         os.O_RDWR|os.O_CREATE,
982                                         os.FileMode(0666),
983                                 )
984                                 if err != nil {
985                                         state.Ctx.LogE("sp-file", lesp, err, "")
986                                         return nil, err
987                                 }
988                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
989                         }
990                         state.Ctx.LogD("sp-file", append(lesp, LE{"Offset", file.Offset}), "seeking")
991                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
992                                 state.Ctx.LogE("sp-file", lesp, err, "")
993                                 state.closeFd(filePathPart)
994                                 return nil, err
995                         }
996                         state.Ctx.LogD("sp-file", lesp, "writing")
997                         if _, err = fd.Write(file.Payload); err != nil {
998                                 state.Ctx.LogE("sp-file", lesp, err, "")
999                                 state.closeFd(filePathPart)
1000                                 return nil, err
1001                         }
1002                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1003                         lesp[len(lesp)-1].V = ourSize
1004                         fullsize := int64(0)
1005                         state.RLock()
1006                         infoTheir, ok := state.infosTheir[*file.Hash]
1007                         state.RUnlock()
1008                         if ok {
1009                                 fullsize = int64(infoTheir.Size)
1010                         }
1011                         lesp = append(lesp, LE{"FullSize", fullsize})
1012                         if state.Ctx.ShowPrgrs {
1013                                 Progress("Rx", lesp)
1014                         }
1015                         if fullsize != ourSize {
1016                                 continue
1017                         }
1018                         <-spCheckerToken
1019                         go func() {
1020                                 defer func() {
1021                                         spCheckerToken <- struct{}{}
1022                                 }()
1023                                 if err := fd.Sync(); err != nil {
1024                                         state.Ctx.LogE("sp-file", lesp, err, "sync")
1025                                         state.closeFd(filePathPart)
1026                                         return
1027                                 }
1028                                 state.wg.Add(1)
1029                                 defer state.wg.Done()
1030                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1031                                         state.closeFd(filePathPart)
1032                                         state.Ctx.LogE("sp-file", lesp, err, "")
1033                                         return
1034                                 }
1035                                 state.Ctx.LogD("sp-file", lesp, "checking")
1036                                 gut, err := Check(fd, file.Hash[:], lesp, state.Ctx.ShowPrgrs)
1037                                 state.closeFd(filePathPart)
1038                                 if err != nil || !gut {
1039                                         state.Ctx.LogE("sp-file", lesp, errors.New("checksum mismatch"), "")
1040                                         return
1041                                 }
1042                                 state.Ctx.LogI("sp-done", lesp, "")
1043                                 if err = os.Rename(filePath+PartSuffix, filePath); err != nil {
1044                                         state.Ctx.LogE("sp-file", lesp, err, "rename")
1045                                         return
1046                                 }
1047                                 if err = DirSync(dirToSync); err != nil {
1048                                         state.Ctx.LogE("sp-file", lesp, err, "sync")
1049                                         return
1050                                 }
1051                                 state.Lock()
1052                                 delete(state.infosTheir, *file.Hash)
1053                                 state.Unlock()
1054                                 state.wg.Add(1)
1055                                 go func() {
1056                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1057                                         state.wg.Done()
1058                                 }()
1059                         }()
1060                 case SPTypeDone:
1061                         lesp := append(les, LE{"Type", "done"})
1062                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1063                         var done SPDone
1064                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1065                                 state.Ctx.LogE("sp-process", lesp, err, "")
1066                                 return nil, err
1067                         }
1068                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(done.Hash[:])})
1069                         state.Ctx.LogD("sp-done", lesp, "removing")
1070                         err := os.Remove(filepath.Join(
1071                                 state.Ctx.Spool,
1072                                 state.Node.Id.String(),
1073                                 string(TTx),
1074                                 Base32Codec.EncodeToString(done.Hash[:]),
1075                         ))
1076                         lesp = append(lesp, LE{"XX", string(TTx)})
1077                         if err == nil {
1078                                 state.Ctx.LogI("sp-done", lesp, "")
1079                         } else {
1080                                 state.Ctx.LogE("sp-done", lesp, err, "")
1081                         }
1082                 case SPTypeFreq:
1083                         lesp := append(les, LE{"Type", "freq"})
1084                         state.Ctx.LogD("sp-process", lesp, "unmarshaling packet")
1085                         var freq SPFreq
1086                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1087                                 state.Ctx.LogE("sp-process", lesp, err, "")
1088                                 return nil, err
1089                         }
1090                         lesp = append(lesp, LE{"Pkt", Base32Codec.EncodeToString(freq.Hash[:])})
1091                         lesp = append(lesp, LE{"Offset", freq.Offset})
1092                         state.Ctx.LogD("sp-process", lesp, "queueing")
1093                         nice, exists := state.infosOurSeen[*freq.Hash]
1094                         if exists {
1095                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1096                                         state.Lock()
1097                                         insertIdx := 0
1098                                         var freqWithNice *FreqWithNice
1099                                         for insertIdx, freqWithNice = range state.queueTheir {
1100                                                 if freqWithNice.nice > nice {
1101                                                         break
1102                                                 }
1103                                         }
1104                                         state.queueTheir = append(state.queueTheir, nil)
1105                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1106                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1107                                         state.Unlock()
1108                                 } else {
1109                                         state.Ctx.LogD("sp-process", lesp, "skipping")
1110                                 }
1111                         } else {
1112                                 state.Ctx.LogD("sp-process", lesp, "unknown")
1113                         }
1114                 default:
1115                         state.Ctx.LogE(
1116                                 "sp-process",
1117                                 append(les, LE{"Type", head.Type}),
1118                                 errors.New("unknown type"),
1119                                 "",
1120                         )
1121                         return nil, BadPktType
1122                 }
1123         }
1124         if infosGot {
1125                 var pkts int
1126                 var size uint64
1127                 state.RLock()
1128                 for _, info := range state.infosTheir {
1129                         pkts++
1130                         size += info.Size
1131                 }
1132                 state.RUnlock()
1133                 state.Ctx.LogI("sp-infos", LEs{
1134                         {"XX", string(TRx)},
1135                         {"Node", state.Node.Id},
1136                         {"Pkts", pkts},
1137                         {"Size", int64(size)},
1138                 }, "")
1139         }
1140         return payloadsSplit(replies), nil
1141 }