]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
MTH
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "io"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/dustin/go-humanize"
34         "github.com/flynn/noise"
35 )
36
// MaxSPSize is the size limit used when grouping marshalled packets
// into a single outbound payload and when padding the first handshake
// payload: 2^16 less 256 bytes.
const MaxSPSize = (1 << 16) - 256

// PartSuffix is the ".part" file name suffix.
// NOTE(review): presumably marks partially received files -- confirm
// against the code that uses it.
const PartSuffix = ".part"

// SPHeadOverhead is the number of bytes reserved for a marshalled
// SPHead (see the buffers allocated in init).
const SPHeadOverhead = 4
42
// MTHAndOffset pairs an MTH instance with a byte offset. Values of
// this type are stored per file in SPState.fileHashers.
// NOTE(review): offset presumably records how far the file has been
// fed into the hasher -- confirm against the code that updates it.
type MTHAndOffset struct {
	mth    *MTH
	offset uint64
}
47
// SPCheckerTask is the unit of work sent over the spCheckerTasks
// channel. It carries the node identifier, the packet hash, an MTH
// instance and a channel named done.
// NOTE(review): done presumably delivers the checker's result back to
// the submitter -- confirm against SPChecker.
type SPCheckerTask struct {
	nodeId *NodeId
	hsh    *[MTHSize]byte
	mth    *MTH
	done   chan []byte
}
54
55 var (
56         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
57
58         SPInfoOverhead    int
59         SPFreqOverhead    int
60         SPFileOverhead    int
61         SPHaltMarshalized []byte
62         SPPingMarshalized []byte
63
64         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
65                 noise.DH25519,
66                 noise.CipherChaChaPoly,
67                 noise.HashBLAKE2b,
68         )
69
70         DefaultDeadline = 10 * time.Second
71         PingTimeout     = time.Minute
72
73         spCheckerTasks chan SPCheckerTask
74         SPCheckerWg    sync.WaitGroup
75         spCheckerOnce  sync.Once
76 )
77
// FdAndFullSize keeps an open file together with a size value. Entries
// of this type live in SPState.fds, keyed by path.
// NOTE(review): fullSize is presumably the expected total size of the
// file -- confirm against the code that fills it.
type FdAndFullSize struct {
	fd       *os.File
	fullSize int64
}
82
// SPType identifies the kind of a session protocol packet; it is the
// only field of SPHead.
type SPType uint8

// Known packet kinds, numbered consecutively from zero.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
93
// SPHead is the header that MarshalSP encodes in front of every packet
// body. It carries only the packet type.
type SPHead struct {
	Type SPType
}
97
// SPInfo announces a single packet: its niceness, size and hash.
// infosOur builds one SPInfo per outbound job.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[MTHSize]byte
}
103
// SPFreq names a packet by hash together with an offset.
// NOTE(review): presumably a request to send that packet starting at
// Offset -- confirm against the protocol handling code.
type SPFreq struct {
	Hash   *[MTHSize]byte
	Offset uint64
}
108
// SPFile carries a piece of data for the packet identified by Hash.
// NOTE(review): Offset is presumably the position of Payload inside
// the packet -- confirm against the protocol handling code.
type SPFile struct {
	Hash    *[MTHSize]byte
	Offset  uint64
	Payload []byte
}
114
// SPDone references a packet by its hash.
// NOTE(review): presumably acknowledges complete reception of that
// packet -- confirm against the protocol handling code.
type SPDone struct {
	Hash *[MTHSize]byte
}
118
// SPRaw is the outer frame exchanged on the connection: WriteSP
// marshals it with MagicNNCPLv1 as Magic, and ReadSP rejects frames
// whose Magic differs.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
123
// FreqWithNice couples an SPFreq with a niceness value. It is the
// element type of SPState.queueTheir.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
128
// ConnDeadlined is the connection abstraction the session protocol
// works with: a readable, writable, closable stream on which read and
// write deadlines can be set.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
134
135 func init() {
136         var buf bytes.Buffer
137         spHead := SPHead{Type: SPTypeHalt}
138         if _, err := xdr.Marshal(&buf, spHead); err != nil {
139                 panic(err)
140         }
141         SPHaltMarshalized = make([]byte, SPHeadOverhead)
142         copy(SPHaltMarshalized, buf.Bytes())
143         buf.Reset()
144
145         spHead = SPHead{Type: SPTypePing}
146         if _, err := xdr.Marshal(&buf, spHead); err != nil {
147                 panic(err)
148         }
149         SPPingMarshalized = make([]byte, SPHeadOverhead)
150         copy(SPPingMarshalized, buf.Bytes())
151         buf.Reset()
152
153         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
154         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
155                 panic(err)
156         }
157         SPInfoOverhead = buf.Len()
158         buf.Reset()
159
160         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
161         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
162                 panic(err)
163         }
164         SPFreqOverhead = buf.Len()
165         buf.Reset()
166
167         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
168         if _, err := xdr.Marshal(&buf, spFile); err != nil {
169                 panic(err)
170         }
171         SPFileOverhead = buf.Len()
172         spCheckerTasks = make(chan SPCheckerTask)
173 }
174
175 func MarshalSP(typ SPType, sp interface{}) []byte {
176         var buf bytes.Buffer
177         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
178                 panic(err)
179         }
180         if _, err := xdr.Marshal(&buf, sp); err != nil {
181                 panic(err)
182         }
183         return buf.Bytes()
184 }
185
186 func payloadsSplit(payloads [][]byte) [][]byte {
187         var outbounds [][]byte
188         outbound := make([]byte, 0, MaxSPSize)
189         for i, payload := range payloads {
190                 outbound = append(outbound, payload...)
191                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
192                         outbounds = append(outbounds, outbound)
193                         outbound = make([]byte, 0, MaxSPSize)
194                 }
195         }
196         if len(outbound) > 0 {
197                 outbounds = append(outbounds, outbound)
198         }
199         return outbounds
200 }
201
// SPState holds everything belonging to one session with a remote
// node: configuration, Noise handshake and cipher states, queues,
// traffic counters, directory locks and open file descriptors. The
// embedded RWMutex is taken by SetDead and around queueTheir accesses.
type SPState struct {
	Ctx            *Ctx
	Node           *Node
	Nice           uint8
	NoCK           bool
	onlineDeadline time.Duration
	maxOnlineTime  time.Duration
	// Noise handshake state and the two resulting cipher states.
	hs             *noise.HandshakeState
	csOur          *noise.CipherState
	csTheir        *noise.CipherState
	// Outbound payloads and ping requests consumed by the sender
	// goroutine.
	payloads       chan []byte
	pings          chan struct{}
	infosTheir     map[[MTHSize]byte]*SPInfo
	// Hashes (with their niceness) already reported by infosOur.
	infosOurSeen   map[[MTHSize]byte]uint8
	queueTheir     []*FreqWithNice
	wg             sync.WaitGroup
	// Traffic counters and timestamps, updated by ReadSP and WriteSP.
	RxBytes        int64
	RxLastSeen     time.Time
	RxLastNonPing  time.Time
	TxBytes        int64
	TxLastSeen     time.Time
	TxLastNonPing  time.Time
	started        time.Time
	// started + maxOnlineTime, set only when maxOnlineTime > 0.
	mustFinishAt   time.Time
	Duration       time.Duration
	RxSpeed        int64
	TxSpeed        int64
	// Directory locks obtained with Ctx.LockDir and released by
	// dirUnlock; either may be nil.
	rxLock         *os.File
	txLock         *os.File
	xxOnly         TRxTx
	rxRate         int
	txRate         int
	// Closed by SetDead to signal that the session is over.
	isDead         chan struct{}
	listOnly       bool
	onlyPkts       map[[MTHSize]byte]bool
	// Scratch buffer reused by WriteSP for every frame.
	writeSPBuf     bytes.Buffer
	// Open files keyed by path, guarded by fdsLock (see closeFd).
	fds            map[string]FdAndFullSize
	fdsLock        sync.RWMutex
	fileHashers    map[string]*MTHAndOffset
	progressBars   map[string]struct{}
	sync.RWMutex
}
244
245 func (state *SPState) SetDead() {
246         state.Lock()
247         defer state.Unlock()
248         select {
249         case <-state.isDead:
250                 // Already closed channel, dead
251                 return
252         default:
253         }
254         close(state.isDead)
255         go func() {
256                 for range state.payloads {
257                 }
258         }()
259         go func() {
260                 for range state.pings {
261                 }
262         }()
263 }
264
265 func (state *SPState) NotAlive() bool {
266         select {
267         case <-state.isDead:
268                 return true
269         default:
270         }
271         return false
272 }
273
274 func (state *SPState) dirUnlock() {
275         state.Ctx.UnlockDir(state.rxLock)
276         state.Ctx.UnlockDir(state.txLock)
277 }
278
279 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
280         state.writeSPBuf.Reset()
281         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
282                 Magic:   MagicNNCPLv1,
283                 Payload: payload,
284         })
285         if err != nil {
286                 return err
287         }
288         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
289                 state.TxLastSeen = time.Now()
290                 state.TxBytes += int64(n)
291                 if !ping {
292                         state.TxLastNonPing = state.TxLastSeen
293                 }
294         }
295         return err
296 }
297
298 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
299         var sp SPRaw
300         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
301         if err != nil {
302                 ue := err.(*xdr.UnmarshalError)
303                 if ue.Err == io.EOF {
304                         return nil, ue.Err
305                 }
306                 return nil, err
307         }
308         state.RxLastSeen = time.Now()
309         state.RxBytes += int64(n)
310         if sp.Magic != MagicNNCPLv1 {
311                 return nil, BadMagic
312         }
313         return sp.Payload, nil
314 }
315
316 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
317         var infos []*SPInfo
318         var totalSize int64
319         for job := range ctx.Jobs(nodeId, TTx) {
320                 if job.PktEnc.Nice > nice {
321                         continue
322                 }
323                 if _, known := (*seen)[*job.HshValue]; known {
324                         continue
325                 }
326                 totalSize += job.Size
327                 infos = append(infos, &SPInfo{
328                         Nice: job.PktEnc.Nice,
329                         Size: uint64(job.Size),
330                         Hash: job.HshValue,
331                 })
332                 (*seen)[*job.HshValue] = job.PktEnc.Nice
333         }
334         sort.Sort(ByNice(infos))
335         var payloads [][]byte
336         for _, info := range infos {
337                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
338                 pktName := Base32Codec.EncodeToString(info.Hash[:])
339                 ctx.LogD("sp-info-our", LEs{
340                         {"Node", nodeId},
341                         {"Name", pktName},
342                         {"Size", info.Size},
343                 }, func(les LEs) string {
344                         return fmt.Sprintf(
345                                 "Our info: %s/tx/%s (%s)",
346                                 ctx.NodeName(nodeId),
347                                 pktName,
348                                 humanize.IBytes(info.Size),
349                         )
350                 })
351         }
352         if totalSize > 0 {
353                 ctx.LogI("sp-infos-tx", LEs{
354                         {"XX", string(TTx)},
355                         {"Node", nodeId},
356                         {"Pkts", len(payloads)},
357                         {"Size", totalSize},
358                 }, func(les LEs) string {
359                         return fmt.Sprintf(
360                                 "We have got for %s: %d packets, %s",
361                                 ctx.NodeName(nodeId),
362                                 len(payloads),
363                                 humanize.IBytes(uint64(totalSize)),
364                         )
365                 })
366         }
367         return payloadsSplit(payloads)
368 }
369
370 func (state *SPState) StartI(conn ConnDeadlined) error {
371         nodeId := state.Node.Id
372         err := state.Ctx.ensureRxDir(nodeId)
373         if err != nil {
374                 return err
375         }
376         var rxLock *os.File
377         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
378                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
379                 if err != nil {
380                         return err
381                 }
382         }
383         var txLock *os.File
384         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
385                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
386                 if err != nil {
387                         return err
388                 }
389         }
390         started := time.Now()
391         conf := noise.Config{
392                 CipherSuite: NoiseCipherSuite,
393                 Pattern:     noise.HandshakeIK,
394                 Initiator:   true,
395                 StaticKeypair: noise.DHKey{
396                         Private: state.Ctx.Self.NoisePrv[:],
397                         Public:  state.Ctx.Self.NoisePub[:],
398                 },
399                 PeerStatic: state.Node.NoisePub[:],
400         }
401         hs, err := noise.NewHandshakeState(conf)
402         if err != nil {
403                 return err
404         }
405         state.hs = hs
406         state.payloads = make(chan []byte)
407         state.pings = make(chan struct{})
408         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
409         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
410         state.progressBars = make(map[string]struct{})
411         state.started = started
412         state.rxLock = rxLock
413         state.txLock = txLock
414
415         var infosPayloads [][]byte
416         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
417                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
418         }
419         var firstPayload []byte
420         if len(infosPayloads) > 0 {
421                 firstPayload = infosPayloads[0]
422         }
423         // Pad first payload, to hide actual number of existing files
424         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
425                 firstPayload = append(firstPayload, SPHaltMarshalized...)
426         }
427
428         var buf []byte
429         var payload []byte
430         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
431         if err != nil {
432                 state.dirUnlock()
433                 return err
434         }
435         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
436         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
437                 return fmt.Sprintf(
438                         "SP with %s (nice %s): sending first message",
439                         state.Node.Name,
440                         NicenessFmt(state.Nice),
441                 )
442         })
443         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
444         if err = state.WriteSP(conn, buf, false); err != nil {
445                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
446                         return fmt.Sprintf(
447                                 "SP with %s (nice %s): writing",
448                                 state.Node.Name,
449                                 NicenessFmt(state.Nice),
450                         )
451                 })
452                 state.dirUnlock()
453                 return err
454         }
455         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
456                 return fmt.Sprintf(
457                         "SP with %s (nice %s): waiting for first message",
458                         state.Node.Name,
459                         NicenessFmt(state.Nice),
460                 )
461         })
462         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
463         if buf, err = state.ReadSP(conn); err != nil {
464                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
465                         return fmt.Sprintf(
466                                 "SP with %s (nice %s): reading",
467                                 state.Node.Name,
468                                 NicenessFmt(state.Nice),
469                         )
470                 })
471                 state.dirUnlock()
472                 return err
473         }
474         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
475         if err != nil {
476                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
477                         return fmt.Sprintf(
478                                 "SP with %s (nice %s): reading Noise message",
479                                 state.Node.Name,
480                                 NicenessFmt(state.Nice),
481                         )
482                 })
483                 state.dirUnlock()
484                 return err
485         }
486         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
487                 return fmt.Sprintf(
488                         "SP with %s (nice %s): starting workers",
489                         state.Node.Name,
490                         NicenessFmt(state.Nice),
491                 )
492         })
493         err = state.StartWorkers(conn, infosPayloads, payload)
494         if err != nil {
495                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
496                         return fmt.Sprintf(
497                                 "SP with %s (nice %s): starting workers",
498                                 state.Node.Name,
499                                 NicenessFmt(state.Nice),
500                         )
501                 })
502                 state.dirUnlock()
503         }
504         return err
505 }
506
507 func (state *SPState) StartR(conn ConnDeadlined) error {
508         started := time.Now()
509         conf := noise.Config{
510                 CipherSuite: NoiseCipherSuite,
511                 Pattern:     noise.HandshakeIK,
512                 Initiator:   false,
513                 StaticKeypair: noise.DHKey{
514                         Private: state.Ctx.Self.NoisePrv[:],
515                         Public:  state.Ctx.Self.NoisePub[:],
516                 },
517         }
518         hs, err := noise.NewHandshakeState(conf)
519         if err != nil {
520                 return err
521         }
522         xxOnly := TRxTx("")
523         state.hs = hs
524         state.payloads = make(chan []byte)
525         state.pings = make(chan struct{})
526         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
527         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
528         state.progressBars = make(map[string]struct{})
529         state.started = started
530         state.xxOnly = xxOnly
531
532         var buf []byte
533         var payload []byte
534         logMsg := func(les LEs) string {
535                 return fmt.Sprintf(
536                         "SP nice %s: waiting for first message",
537                         NicenessFmt(state.Nice),
538                 )
539         }
540         les := LEs{{"Nice", int(state.Nice)}}
541         state.Ctx.LogD("sp-startR", les, logMsg)
542         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
543         if buf, err = state.ReadSP(conn); err != nil {
544                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
545                 return err
546         }
547         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
548                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
549                 return err
550         }
551
552         var node *Node
553         for _, n := range state.Ctx.Neigh {
554                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
555                         node = n
556                         break
557                 }
558         }
559         if node == nil {
560                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
561                 err = errors.New("unknown peer: " + peerId)
562                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
563                 return err
564         }
565         state.Node = node
566         state.rxRate = node.RxRate
567         state.txRate = node.TxRate
568         state.onlineDeadline = node.OnlineDeadline
569         state.maxOnlineTime = node.MaxOnlineTime
570         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
571
572         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
573                 return err
574         }
575         var rxLock *os.File
576         if xxOnly == "" || xxOnly == TRx {
577                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
578                 if err != nil {
579                         return err
580                 }
581         }
582         state.rxLock = rxLock
583         var txLock *os.File
584         if xxOnly == "" || xxOnly == TTx {
585                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
586                 if err != nil {
587                         return err
588                 }
589         }
590         state.txLock = txLock
591
592         var infosPayloads [][]byte
593         if xxOnly == "" || xxOnly == TTx {
594                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
595         }
596         var firstPayload []byte
597         if len(infosPayloads) > 0 {
598                 firstPayload = infosPayloads[0]
599         }
600         // Pad first payload, to hide actual number of existing files
601         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
602                 firstPayload = append(firstPayload, SPHaltMarshalized...)
603         }
604
605         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
606                 return fmt.Sprintf(
607                         "SP with %s (nice %s): sending first message",
608                         node.Name, NicenessFmt(state.Nice),
609                 )
610         })
611         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
612         if err != nil {
613                 state.dirUnlock()
614                 return err
615         }
616         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
617         if err = state.WriteSP(conn, buf, false); err != nil {
618                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
619                         return fmt.Sprintf(
620                                 "SP with %s (nice %s): writing",
621                                 node.Name, NicenessFmt(state.Nice),
622                         )
623                 })
624                 state.dirUnlock()
625                 return err
626         }
627         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
628                 return fmt.Sprintf(
629                         "SP with %s (nice %s): starting workers",
630                         node.Name, NicenessFmt(state.Nice),
631                 )
632         })
633         err = state.StartWorkers(conn, infosPayloads, payload)
634         if err != nil {
635                 state.dirUnlock()
636         }
637         return err
638 }
639
640 func (state *SPState) closeFd(pth string) {
641         state.fdsLock.Lock()
642         if s, exists := state.fds[pth]; exists {
643                 delete(state.fds, pth)
644                 s.fd.Close()
645         }
646         state.fdsLock.Unlock()
647 }
648
649 func (state *SPState) StartWorkers(
650         conn ConnDeadlined,
651         infosPayloads [][]byte,
652         payload []byte,
653 ) error {
654         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
655         state.fds = make(map[string]FdAndFullSize)
656         state.fileHashers = make(map[string]*MTHAndOffset)
657         state.isDead = make(chan struct{})
658         if state.maxOnlineTime > 0 {
659                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
660         }
661         if !state.NoCK {
662                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
663         }
664
665         // Remaining handshake payload sending
666         if len(infosPayloads) > 1 {
667                 state.wg.Add(1)
668                 go func() {
669                         for _, payload := range infosPayloads[1:] {
670                                 state.Ctx.LogD(
671                                         "sp-queue-remaining",
672                                         append(les, LE{"Size", int64(len(payload))}),
673                                         func(les LEs) string {
674                                                 return fmt.Sprintf(
675                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
676                                                         state.Node.Name, NicenessFmt(state.Nice),
677                                                         humanize.IBytes(uint64(len(payload))),
678                                                 )
679                                         },
680                                 )
681                                 state.payloads <- payload
682                         }
683                         state.wg.Done()
684                 }()
685         }
686
687         // Processing of first payload and queueing its responses
688         logMsg := func(les LEs) string {
689                 return fmt.Sprintf(
690                         "SP with %s (nice %s): processing first payload (%s)",
691                         state.Node.Name, NicenessFmt(state.Nice),
692                         humanize.IBytes(uint64(len(payload))),
693                 )
694         }
695         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
696         replies, err := state.ProcessSP(payload)
697         if err != nil {
698                 state.Ctx.LogE("sp-process", les, err, logMsg)
699                 return err
700         }
701         state.wg.Add(1)
702         go func() {
703                 for _, reply := range replies {
704                         state.Ctx.LogD(
705                                 "sp-queue-reply",
706                                 append(les, LE{"Size", int64(len(reply))}),
707                                 func(les LEs) string {
708                                         return fmt.Sprintf(
709                                                 "SP with %s (nice %s): queuing reply (%s)",
710                                                 state.Node.Name, NicenessFmt(state.Nice),
711                                                 humanize.IBytes(uint64(len(payload))),
712                                         )
713                                 },
714                         )
715                         state.payloads <- reply
716                 }
717                 state.wg.Done()
718         }()
719
720         // Periodic jobs
721         state.wg.Add(1)
722         go func() {
723                 deadlineTicker := time.NewTicker(time.Second)
724                 pingTicker := time.NewTicker(PingTimeout)
725                 for {
726                         select {
727                         case <-state.isDead:
728                                 state.wg.Done()
729                                 deadlineTicker.Stop()
730                                 pingTicker.Stop()
731                                 return
732                         case now := <-deadlineTicker.C:
733                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
734                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
735                                         goto Deadlined
736                                 }
737                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
738                                         goto Deadlined
739                                 }
740                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
741                                         goto Deadlined
742                                 }
743                                 break
744                         Deadlined:
745                                 state.SetDead()
746                                 conn.Close() // #nosec G104
747                         case now := <-pingTicker.C:
748                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
749                                         state.wg.Add(1)
750                                         go func() {
751                                                 state.pings <- struct{}{}
752                                                 state.wg.Done()
753                                         }()
754                                 }
755                         }
756                 }
757         }()
758
759         // Spool checker and INFOs sender of appearing files
760         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
761                 state.wg.Add(1)
762                 go func() {
763                         ticker := time.NewTicker(time.Second)
764                         for {
765                                 select {
766                                 case <-state.isDead:
767                                         state.wg.Done()
768                                         ticker.Stop()
769                                         return
770                                 case <-ticker.C:
771                                         for _, payload := range state.Ctx.infosOur(
772                                                 state.Node.Id,
773                                                 state.Nice,
774                                                 &state.infosOurSeen,
775                                         ) {
776                                                 state.Ctx.LogD(
777                                                         "sp-queue-info",
778                                                         append(les, LE{"Size", int64(len(payload))}),
779                                                         func(les LEs) string {
780                                                                 return fmt.Sprintf(
781                                                                         "SP with %s (nice %s): queuing new info (%s)",
782                                                                         state.Node.Name, NicenessFmt(state.Nice),
783                                                                         humanize.IBytes(uint64(len(payload))),
784                                                                 )
785                                                         },
786                                                 )
787                                                 state.payloads <- payload
788                                         }
789                                 }
790                         }
791                 }()
792         }
793
794         // Sender
795         state.wg.Add(1)
796         go func() {
797                 defer conn.Close()
798                 defer state.SetDead()
799                 defer state.wg.Done()
800                 for {
801                         if state.NotAlive() {
802                                 return
803                         }
804                         var payload []byte
805                         var ping bool
806                         select {
807                         case <-state.pings:
808                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
809                                         return fmt.Sprintf(
810                                                 "SP with %s (nice %s): got ping",
811                                                 state.Node.Name, NicenessFmt(state.Nice),
812                                         )
813                                 })
814                                 payload = SPPingMarshalized
815                                 ping = true
816                         case payload = <-state.payloads:
817                                 state.Ctx.LogD(
818                                         "sp-got-payload",
819                                         append(les, LE{"Size", int64(len(payload))}),
820                                         func(les LEs) string {
821                                                 return fmt.Sprintf(
822                                                         "SP with %s (nice %s): got payload (%s)",
823                                                         state.Node.Name, NicenessFmt(state.Nice),
824                                                         humanize.IBytes(uint64(len(payload))),
825                                                 )
826                                         },
827                                 )
828                         default:
829                                 state.RLock()
830                                 if len(state.queueTheir) == 0 {
831                                         state.RUnlock()
832                                         time.Sleep(100 * time.Millisecond)
833                                         continue
834                                 }
835                                 freq := state.queueTheir[0].freq
836                                 state.RUnlock()
837                                 if state.txRate > 0 {
838                                         time.Sleep(time.Second / time.Duration(state.txRate))
839                                 }
840                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
841                                 lesp := append(
842                                         les,
843                                         LE{"XX", string(TTx)},
844                                         LE{"Pkt", pktName},
845                                         LE{"Size", int64(freq.Offset)},
846                                 )
847                                 logMsg := func(les LEs) string {
848                                         return fmt.Sprintf(
849                                                 "SP with %s (nice %s): tx/%s (%s)",
850                                                 state.Node.Name, NicenessFmt(state.Nice),
851                                                 pktName,
852                                                 humanize.IBytes(freq.Offset),
853                                         )
854                                 }
855                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
856                                         return logMsg(les) + ": queueing"
857                                 })
858                                 pth := filepath.Join(
859                                         state.Ctx.Spool,
860                                         state.Node.Id.String(),
861                                         string(TTx),
862                                         Base32Codec.EncodeToString(freq.Hash[:]),
863                                 )
864                                 state.fdsLock.RLock()
865                                 fdAndFullSize, exists := state.fds[pth]
866                                 state.fdsLock.RUnlock()
867                                 if !exists {
868                                         fd, err := os.Open(pth)
869                                         if err != nil {
870                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
871                                                         return logMsg(les) + ": opening"
872                                                 })
873                                                 return
874                                         }
875                                         fi, err := fd.Stat()
876                                         if err != nil {
877                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
878                                                         return logMsg(les) + ": stating"
879                                                 })
880                                                 return
881                                         }
882                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
883                                         state.fdsLock.Lock()
884                                         state.fds[pth] = fdAndFullSize
885                                         state.fdsLock.Unlock()
886                                 }
887                                 fd := fdAndFullSize.fd
888                                 fullSize := fdAndFullSize.fullSize
889                                 var buf []byte
890                                 if freq.Offset < uint64(fullSize) {
891                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
892                                                 return logMsg(les) + ": seeking"
893                                         })
894                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
895                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
896                                                         return logMsg(les) + ": seeking"
897                                                 })
898                                                 return
899                                         }
900                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
901                                         n, err := fd.Read(buf)
902                                         if err != nil {
903                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
904                                                         return logMsg(les) + ": reading"
905                                                 })
906                                                 return
907                                         }
908                                         buf = buf[:n]
909                                         lesp = append(
910                                                 les,
911                                                 LE{"XX", string(TTx)},
912                                                 LE{"Pkt", pktName},
913                                                 LE{"Size", int64(n)},
914                                         )
915                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
916                                                 return fmt.Sprintf(
917                                                         "%s: read %s",
918                                                         logMsg(les), humanize.IBytes(uint64(n)),
919                                                 )
920                                         })
921                                 }
922                                 state.closeFd(pth)
923                                 payload = MarshalSP(SPTypeFile, SPFile{
924                                         Hash:    freq.Hash,
925                                         Offset:  freq.Offset,
926                                         Payload: buf,
927                                 })
928                                 ourSize := freq.Offset + uint64(len(buf))
929                                 lesp = append(
930                                         les,
931                                         LE{"XX", string(TTx)},
932                                         LE{"Pkt", pktName},
933                                         LE{"Size", int64(ourSize)},
934                                         LE{"FullSize", fullSize},
935                                 )
936                                 if state.Ctx.ShowPrgrs {
937                                         state.progressBars[pktName] = struct{}{}
938                                         Progress("Tx", lesp)
939                                 }
940                                 state.Lock()
941                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
942                                         if ourSize == uint64(fullSize) {
943                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
944                                                         return logMsg(les) + ": finished"
945                                                 })
946                                                 if len(state.queueTheir) > 1 {
947                                                         state.queueTheir = state.queueTheir[1:]
948                                                 } else {
949                                                         state.queueTheir = state.queueTheir[:0]
950                                                 }
951                                                 if state.Ctx.ShowPrgrs {
952                                                         delete(state.progressBars, pktName)
953                                                 }
954                                         } else {
955                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
956                                         }
957                                 } else {
958                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
959                                                 return logMsg(les) + ": queue disappeared"
960                                         })
961                                 }
962                                 state.Unlock()
963                         }
964                         logMsg := func(les LEs) string {
965                                 return fmt.Sprintf(
966                                         "SP with %s (nice %s): sending %s",
967                                         state.Node.Name, NicenessFmt(state.Nice),
968                                         humanize.IBytes(uint64(len(payload))),
969                                 )
970                         }
971                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
972                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
973                         ct, err := state.csOur.Encrypt(nil, nil, payload)
974                         if err != nil {
975                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
976                                 return
977                         }
978                         if err := state.WriteSP(conn, ct, ping); err != nil {
979                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
980                                 return
981                         }
982                 }
983         }()
984
985         // Receiver
986         state.wg.Add(1)
987         go func() {
988                 for {
989                         if state.NotAlive() {
990                                 break
991                         }
992                         logMsg := func(les LEs) string {
993                                 return fmt.Sprintf(
994                                         "SP with %s (nice %s): waiting for payload",
995                                         state.Node.Name, NicenessFmt(state.Nice),
996                                 )
997                         }
998                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
999                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1000                         payload, err := state.ReadSP(conn)
1001                         if err != nil {
1002                                 if err == io.EOF {
1003                                         break
1004                                 }
1005                                 unmarshalErr := err.(*xdr.UnmarshalError)
1006                                 if os.IsTimeout(unmarshalErr.Err) {
1007                                         continue
1008                                 }
1009                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1010                                         break
1011                                 }
1012                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1013                                 break
1014                         }
1015                         logMsg = func(les LEs) string {
1016                                 return fmt.Sprintf(
1017                                         "SP with %s (nice %s): payload (%s)",
1018                                         state.Node.Name, NicenessFmt(state.Nice),
1019                                         humanize.IBytes(uint64(len(payload))),
1020                                 )
1021                         }
1022                         state.Ctx.LogD(
1023                                 "sp-recv-got",
1024                                 append(les, LE{"Size", int64(len(payload))}),
1025                                 func(les LEs) string { return logMsg(les) + ": got" },
1026                         )
1027                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1028                         if err != nil {
1029                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1030                                         return logMsg(les) + ": got"
1031                                 })
1032                                 break
1033                         }
1034                         state.Ctx.LogD(
1035                                 "sp-recv-process",
1036                                 append(les, LE{"Size", int64(len(payload))}),
1037                                 func(les LEs) string {
1038                                         return logMsg(les) + ": processing"
1039                                 },
1040                         )
1041                         replies, err := state.ProcessSP(payload)
1042                         if err != nil {
1043                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1044                                         return logMsg(les) + ": processing"
1045                                 })
1046                                 break
1047                         }
1048                         state.wg.Add(1)
1049                         go func() {
1050                                 for _, reply := range replies {
1051                                         state.Ctx.LogD(
1052                                                 "sp-recv-reply",
1053                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1054                                                 func(les LEs) string {
1055                                                         return fmt.Sprintf(
1056                                                                 "SP with %s (nice %s): queuing reply (%s)",
1057                                                                 state.Node.Name, NicenessFmt(state.Nice),
1058                                                                 humanize.IBytes(uint64(len(reply))),
1059                                                         )
1060                                                 },
1061                                         )
1062                                         state.payloads <- reply
1063                                 }
1064                                 state.wg.Done()
1065                         }()
1066                         if state.rxRate > 0 {
1067                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1068                         }
1069                 }
1070                 state.SetDead()
1071                 state.wg.Done()
1072                 state.SetDead()
1073                 conn.Close() // #nosec G104
1074         }()
1075
1076         return nil
1077 }
1078
1079 func (state *SPState) Wait() {
1080         state.wg.Wait()
1081         close(state.payloads)
1082         close(state.pings)
1083         state.Duration = time.Now().Sub(state.started)
1084         state.dirUnlock()
1085         state.RxSpeed = state.RxBytes
1086         state.TxSpeed = state.TxBytes
1087         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1088         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1089         if rxDuration > 0 {
1090                 state.RxSpeed = state.RxBytes / rxDuration
1091         }
1092         if txDuration > 0 {
1093                 state.TxSpeed = state.TxBytes / txDuration
1094         }
1095         for _, s := range state.fds {
1096                 s.fd.Close()
1097         }
1098         for pktName := range state.progressBars {
1099                 ProgressKill(pktName)
1100         }
1101 }
1102
1103 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1104         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1105         r := bytes.NewReader(payload)
1106         var err error
1107         var replies [][]byte
1108         var infosGot bool
1109         for r.Len() > 0 {
1110                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1111                         return fmt.Sprintf(
1112                                 "SP with %s (nice %s): unmarshaling header",
1113                                 state.Node.Name, NicenessFmt(state.Nice),
1114                         )
1115                 })
1116                 var head SPHead
1117                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1118                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1119                                 return fmt.Sprintf(
1120                                         "SP with %s (nice %s): unmarshaling header",
1121                                         state.Node.Name, NicenessFmt(state.Nice),
1122                                 )
1123                         })
1124                         return nil, err
1125                 }
1126                 if head.Type != SPTypePing {
1127                         state.RxLastNonPing = state.RxLastSeen
1128                 }
1129                 switch head.Type {
1130                 case SPTypeHalt:
1131                         state.Ctx.LogD(
1132                                 "sp-process-halt",
1133                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1134                                         return fmt.Sprintf(
1135                                                 "SP with %s (nice %s): got HALT",
1136                                                 state.Node.Name, NicenessFmt(state.Nice),
1137                                         )
1138                                 },
1139                         )
1140                         state.Lock()
1141                         state.queueTheir = nil
1142                         state.Unlock()
1143
1144                 case SPTypePing:
1145                         state.Ctx.LogD(
1146                                 "sp-process-ping",
1147                                 append(les, LE{"Type", "ping"}),
1148                                 func(les LEs) string {
1149                                         return fmt.Sprintf(
1150                                                 "SP with %s (nice %s): got PING",
1151                                                 state.Node.Name, NicenessFmt(state.Nice),
1152                                         )
1153                                 },
1154                         )
1155
1156                 case SPTypeInfo:
1157                         infosGot = true
1158                         lesp := append(les, LE{"Type", "info"})
1159                         state.Ctx.LogD(
1160                                 "sp-process-info-unmarshal", lesp,
1161                                 func(les LEs) string {
1162                                         return fmt.Sprintf(
1163                                                 "SP with %s (nice %s): unmarshaling INFO",
1164                                                 state.Node.Name, NicenessFmt(state.Nice),
1165                                         )
1166                                 },
1167                         )
1168                         var info SPInfo
1169                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1170                                 state.Ctx.LogE(
1171                                         "sp-process-info-unmarshal", lesp, err,
1172                                         func(les LEs) string {
1173                                                 return fmt.Sprintf(
1174                                                         "SP with %s (nice %s): unmarshaling INFO",
1175                                                         state.Node.Name, NicenessFmt(state.Nice),
1176                                                 )
1177                                         },
1178                                 )
1179                                 return nil, err
1180                         }
1181                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1182                         lesp = append(
1183                                 lesp,
1184                                 LE{"Pkt", pktName},
1185                                 LE{"Size", int64(info.Size)},
1186                                 LE{"PktNice", int(info.Nice)},
1187                         )
1188                         logMsg := func(les LEs) string {
1189                                 return fmt.Sprintf(
1190                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1191                                         state.Node.Name, NicenessFmt(state.Nice),
1192                                         pktName,
1193                                         humanize.IBytes(info.Size),
1194                                         NicenessFmt(info.Nice),
1195                                 )
1196                         }
1197                         if !state.listOnly && info.Nice > state.Nice {
1198                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1199                                         return logMsg(les) + ": too nice"
1200                                 })
1201                                 continue
1202                         }
1203                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1204                                 return logMsg(les) + ": received"
1205                         })
1206                         if !state.listOnly && state.xxOnly == TTx {
1207                                 continue
1208                         }
1209                         state.Lock()
1210                         state.infosTheir[*info.Hash] = &info
1211                         state.Unlock()
1212                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1213                                 return logMsg(les) + ": stating part"
1214                         })
1215                         pktPath := filepath.Join(
1216                                 state.Ctx.Spool,
1217                                 state.Node.Id.String(),
1218                                 string(TRx),
1219                                 Base32Codec.EncodeToString(info.Hash[:]),
1220                         )
1221                         logMsg = func(les LEs) string {
1222                                 return fmt.Sprintf(
1223                                         "Packet %s (%s) (nice %s)",
1224                                         pktName,
1225                                         humanize.IBytes(info.Size),
1226                                         NicenessFmt(info.Nice),
1227                                 )
1228                         }
1229                         if _, err = os.Stat(pktPath); err == nil {
1230                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1231                                         return logMsg(les) + ": already done"
1232                                 })
1233                                 if !state.listOnly {
1234                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1235                                 }
1236                                 continue
1237                         }
1238                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1239                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1240                                         return logMsg(les) + ": already seen"
1241                                 })
1242                                 if !state.listOnly {
1243                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1244                                 }
1245                                 continue
1246                         }
1247                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1248                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1249                                         return logMsg(les) + ": still not checksummed"
1250                                 })
1251                                 continue
1252                         }
1253                         fi, err := os.Stat(pktPath + PartSuffix)
1254                         var offset int64
1255                         if err == nil {
1256                                 offset = fi.Size()
1257                         }
1258                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1259                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1260                                         return logMsg(les) + ": not enough space"
1261                                 })
1262                                 continue
1263                         }
1264                         state.Ctx.LogI(
1265                                 "sp-info",
1266                                 append(lesp, LE{"Offset", offset}),
1267                                 func(les LEs) string {
1268                                         return fmt.Sprintf(
1269                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1270                                         )
1271                                 },
1272                         )
1273                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1274                                 replies = append(replies, MarshalSP(
1275                                         SPTypeFreq,
1276                                         SPFreq{info.Hash, uint64(offset)},
1277                                 ))
1278                         }
1279
1280                 case SPTypeFile:
1281                         lesp := append(les, LE{"Type", "file"})
1282                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1283                                 return fmt.Sprintf(
1284                                         "SP with %s (nice %s): unmarshaling FILE",
1285                                         state.Node.Name, NicenessFmt(state.Nice),
1286                                 )
1287                         })
1288                         var file SPFile
1289                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1290                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1291                                         return fmt.Sprintf(
1292                                                 "SP with %s (nice %s): unmarshaling FILE",
1293                                                 state.Node.Name, NicenessFmt(state.Nice),
1294                                         )
1295                                 })
1296                                 return nil, err
1297                         }
1298                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1299                         lesp = append(
1300                                 lesp,
1301                                 LE{"XX", string(TRx)},
1302                                 LE{"Pkt", pktName},
1303                                 LE{"Size", int64(len(file.Payload))},
1304                         )
1305                         logMsg := func(les LEs) string {
1306                                 return fmt.Sprintf(
1307                                         "Got packet %s (%s)",
1308                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1309                                 )
1310                         }
1311                         fullsize := int64(0)
1312                         state.RLock()
1313                         infoTheir, ok := state.infosTheir[*file.Hash]
1314                         state.RUnlock()
1315                         if !ok {
1316                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1317                                         return logMsg(les) + ": unknown file"
1318                                 })
1319                                 continue
1320                         }
1321                         fullsize = int64(infoTheir.Size)
1322                         lesp = append(lesp, LE{"FullSize", fullsize})
1323                         dirToSync := filepath.Join(
1324                                 state.Ctx.Spool,
1325                                 state.Node.Id.String(),
1326                                 string(TRx),
1327                         )
1328                         filePath := filepath.Join(dirToSync, pktName)
1329                         filePathPart := filePath + PartSuffix
1330                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1331                                 return logMsg(les) + ": opening part"
1332                         })
1333                         state.fdsLock.RLock()
1334                         fdAndFullSize, exists := state.fds[filePathPart]
1335                         state.fdsLock.RUnlock()
1336                         hasherAndOffset := state.fileHashers[filePath]
1337                         var fd *os.File
1338                         if exists {
1339                                 fd = fdAndFullSize.fd
1340                         } else {
1341                                 fd, err = os.OpenFile(
1342                                         filePathPart,
1343                                         os.O_RDWR|os.O_CREATE,
1344                                         os.FileMode(0666),
1345                                 )
1346                                 if err != nil {
1347                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1348                                                 return logMsg(les) + ": opening part"
1349                                         })
1350                                         return nil, err
1351                                 }
1352                                 state.fdsLock.Lock()
1353                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1354                                 state.fdsLock.Unlock()
1355                                 if !state.NoCK {
1356                                         hasherAndOffset = &MTHAndOffset{
1357                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1358                                                 offset: file.Offset,
1359                                         }
1360                                         state.fileHashers[filePath] = hasherAndOffset
1361                                 }
1362                         }
1363                         state.Ctx.LogD(
1364                                 "sp-file-seek",
1365                                 append(lesp, LE{"Offset", file.Offset}),
1366                                 func(les LEs) string {
1367                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1368                                 })
1369                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1370                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1371                                         return logMsg(les) + ": seeking"
1372                                 })
1373                                 state.closeFd(filePathPart)
1374                                 return nil, err
1375                         }
1376                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1377                                 return logMsg(les) + ": writing"
1378                         })
1379                         if _, err = fd.Write(file.Payload); err != nil {
1380                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1381                                         return logMsg(les) + ": writing"
1382                                 })
1383                                 state.closeFd(filePathPart)
1384                                 return nil, err
1385                         }
1386                         if hasherAndOffset != nil {
1387                                 if hasherAndOffset.offset == file.Offset {
1388                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1389                                                 panic(err)
1390                                         }
1391                                         hasherAndOffset.offset += uint64(len(file.Payload))
1392                                 } else {
1393                                         state.Ctx.LogE(
1394                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1395                                                 func(les LEs) string {
1396                                                         return logMsg(les) + ": deleting hasher"
1397                                                 },
1398                                         )
1399                                         delete(state.fileHashers, filePath)
1400                                         hasherAndOffset = nil
1401                                 }
1402                         }
1403                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1404                         lesp[len(lesp)-2].V = ourSize
1405                         if state.Ctx.ShowPrgrs {
1406                                 state.progressBars[pktName] = struct{}{}
1407                                 Progress("Rx", lesp)
1408                         }
1409                         if fullsize != ourSize {
1410                                 continue
1411                         }
1412                         if state.Ctx.ShowPrgrs {
1413                                 delete(state.progressBars, pktName)
1414                         }
1415                         logMsg = func(les LEs) string {
1416                                 return fmt.Sprintf(
1417                                         "Got packet %s %d%% (%s / %s)",
1418                                         pktName, 100*ourSize/fullsize,
1419                                         humanize.IBytes(uint64(ourSize)),
1420                                         humanize.IBytes(uint64(fullsize)),
1421                                 )
1422                         }
1423                         err = fd.Sync()
1424                         if err != nil {
1425                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1426                                         return logMsg(les) + ": syncing"
1427                                 })
1428                                 state.closeFd(filePathPart)
1429                                 continue
1430                         }
1431                         if hasherAndOffset != nil {
1432                                 delete(state.fileHashers, filePath)
1433                                 if hasherAndOffset.mth.PrependSize == 0 {
1434                                         if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
1435                                                 state.Ctx.LogE(
1436                                                         "sp-file-bad-checksum", lesp,
1437                                                         errors.New("checksum mismatch"),
1438                                                         logMsg,
1439                                                 )
1440                                                 state.closeFd(filePathPart)
1441                                                 continue
1442                                         }
1443                                         if err = os.Rename(filePathPart, filePath); err != nil {
1444                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1445                                                         return logMsg(les) + ": renaming"
1446                                                 })
1447                                                 state.closeFd(filePathPart)
1448                                                 continue
1449                                         }
1450                                         if err = DirSync(dirToSync); err != nil {
1451                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1452                                                         return logMsg(les) + ": dirsyncing"
1453                                                 })
1454                                                 state.closeFd(filePathPart)
1455                                                 continue
1456                                         }
1457                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1458                                                 return logMsg(les) + ": done"
1459                                         })
1460                                         state.wg.Add(1)
1461                                         go func() {
1462                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1463                                                 state.wg.Done()
1464                                         }()
1465                                         state.Lock()
1466                                         delete(state.infosTheir, *file.Hash)
1467                                         state.Unlock()
1468                                         if !state.Ctx.HdrUsage {
1469                                                 continue
1470                                         }
1471                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1472                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1473                                                         return logMsg(les) + ": seeking"
1474                                                 })
1475                                                 state.closeFd(filePathPart)
1476                                                 continue
1477                                         }
1478                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1479                                         state.closeFd(filePathPart)
1480                                         if err != nil {
1481                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1482                                                         return logMsg(les) + ": HdrReading"
1483                                                 })
1484                                                 continue
1485                                         }
1486                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1487                                         continue
1488                                 }
1489                         }
1490                         state.closeFd(filePathPart)
1491                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1492                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1493                                         return logMsg(les) + ": renaming"
1494                                 })
1495                                 continue
1496                         }
1497                         if err = DirSync(dirToSync); err != nil {
1498                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1499                                         return logMsg(les) + ": dirsyncing"
1500                                 })
1501                                 continue
1502                         }
1503                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1504                                 return logMsg(les) + ": downloaded"
1505                         })
1506                         state.Lock()
1507                         delete(state.infosTheir, *file.Hash)
1508                         state.Unlock()
1509                         if hasherAndOffset != nil {
1510                                 go func() {
1511                                         spCheckerTasks <- SPCheckerTask{
1512                                                 nodeId: state.Node.Id,
1513                                                 hsh:    file.Hash,
1514                                                 mth:    hasherAndOffset.mth,
1515                                                 done:   state.payloads,
1516                                         }
1517                                 }()
1518                         }
1519
1520                 case SPTypeDone:
1521                         lesp := append(les, LE{"Type", "done"})
1522                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1523                                 return fmt.Sprintf(
1524                                         "SP with %s (nice %s): unmarshaling DONE",
1525                                         state.Node.Name, NicenessFmt(state.Nice),
1526                                 )
1527                         })
1528                         var done SPDone
1529                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1530                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1531                                         return fmt.Sprintf(
1532                                                 "SP with %s (nice %s): unmarshaling DONE",
1533                                                 state.Node.Name, NicenessFmt(state.Nice),
1534                                         )
1535                                 })
1536                                 return nil, err
1537                         }
1538                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1539                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1540                         logMsg := func(les LEs) string {
1541                                 return fmt.Sprintf(
1542                                         "SP with %s (nice %s): DONE: removing %s",
1543                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1544                                 )
1545                         }
1546                         state.Ctx.LogD("sp-done", lesp, logMsg)
1547                         pth := filepath.Join(
1548                                 state.Ctx.Spool,
1549                                 state.Node.Id.String(),
1550                                 string(TTx),
1551                                 pktName,
1552                         )
1553                         if err = os.Remove(pth); err == nil {
1554                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1555                                         return fmt.Sprintf("Packet %s is sent", pktName)
1556                                 })
1557                                 if state.Ctx.HdrUsage {
1558                                         os.Remove(pth + HdrSuffix)
1559                                 }
1560                         } else {
1561                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1562                         }
1563
1564                 case SPTypeFreq:
1565                         lesp := append(les, LE{"Type", "freq"})
1566                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1567                                 return fmt.Sprintf(
1568                                         "SP with %s (nice %s): unmarshaling FREQ",
1569                                         state.Node.Name, NicenessFmt(state.Nice),
1570                                 )
1571                         })
1572                         var freq SPFreq
1573                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1574                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1575                                         return fmt.Sprintf(
1576                                                 "SP with %s (nice %s): unmarshaling FREQ",
1577                                                 state.Node.Name, NicenessFmt(state.Nice),
1578                                         )
1579                                 })
1580                                 return nil, err
1581                         }
1582                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1583                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1584                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1585                                 return fmt.Sprintf(
1586                                         "SP with %s (nice %s): FREQ %s: queuing",
1587                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1588                                 )
1589                         })
1590                         nice, exists := state.infosOurSeen[*freq.Hash]
1591                         if exists {
1592                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1593                                         state.Lock()
1594                                         insertIdx := 0
1595                                         var freqWithNice *FreqWithNice
1596                                         for insertIdx, freqWithNice = range state.queueTheir {
1597                                                 if freqWithNice.nice > nice {
1598                                                         break
1599                                                 }
1600                                         }
1601                                         state.queueTheir = append(state.queueTheir, nil)
1602                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1603                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1604                                         state.Unlock()
1605                                 } else {
1606                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1607                                                 return fmt.Sprintf(
1608                                                         "SP with %s (nice %s): FREQ %s: skipping",
1609                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1610                                                 )
1611                                         })
1612                                 }
1613                         } else {
1614                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1615                                         return fmt.Sprintf(
1616                                                 "SP with %s (nice %s): FREQ %s: unknown",
1617                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1618                                         )
1619                                 })
1620                         }
1621
1622                 default:
1623                         state.Ctx.LogE(
1624                                 "sp-process-type-unknown",
1625                                 append(les, LE{"Type", head.Type}),
1626                                 errors.New("unknown type"),
1627                                 func(les LEs) string {
1628                                         return fmt.Sprintf(
1629                                                 "SP with %s (nice %s): %d",
1630                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1631                                         )
1632                                 },
1633                         )
1634                         return nil, BadPktType
1635                 }
1636         }
1637
1638         if infosGot {
1639                 var pkts int
1640                 var size uint64
1641                 state.RLock()
1642                 for _, info := range state.infosTheir {
1643                         pkts++
1644                         size += info.Size
1645                 }
1646                 state.RUnlock()
1647                 state.Ctx.LogI("sp-infos-rx", LEs{
1648                         {"XX", string(TRx)},
1649                         {"Node", state.Node.Id},
1650                         {"Pkts", pkts},
1651                         {"Size", int64(size)},
1652                 }, func(les LEs) string {
1653                         return fmt.Sprintf(
1654                                 "%s has got for us: %d packets, %s",
1655                                 state.Node.Name, pkts, humanize.IBytes(size),
1656                         )
1657                 })
1658         }
1659         return payloadsSplit(replies), nil
1660 }
1661
1662 func SPChecker(ctx *Ctx) {
1663         for t := range spCheckerTasks {
1664                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1665                 les := LEs{
1666                         {"XX", string(TRx)},
1667                         {"Node", t.nodeId},
1668                         {"Pkt", pktName},
1669                 }
1670                 SPCheckerWg.Add(1)
1671                 ctx.LogD("sp-checker", les, func(les LEs) string {
1672                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1673                 })
1674                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1675                 les = append(les, LE{"Size", size})
1676                 if err != nil {
1677                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1678                                 return fmt.Sprintf(
1679                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1680                                         humanize.IBytes(uint64(size)),
1681                                 )
1682                         })
1683                         SPCheckerWg.Done()
1684                         continue
1685                 }
1686                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1687                         return fmt.Sprintf(
1688                                 "Packet %s is retreived (%s)",
1689                                 pktName, humanize.IBytes(uint64(size)),
1690                         )
1691                 })
1692                 SPCheckerWg.Done()
1693                 go func(t SPCheckerTask) {
1694                         defer func() { recover() }()
1695                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1696                 }(t)
1697         }
1698 }