]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
00eaac6b50d58ee0dc8665fa04d2a0a54b6c913b
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "hash"
26         "io"
27         "os"
28         "path/filepath"
29         "sort"
30         "sync"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/flynn/noise"
36         "golang.org/x/crypto/blake2b"
37 )
38
// Protocol-wide size limits and naming constants.
const (
	// SPHeadOverhead is the length in bytes of the slices that hold a
	// marshalled SPHead (see SPHaltMarshalized and SPPingMarshalized).
	SPHeadOverhead = 4
	// PartSuffix is a file name suffix constant.
	// NOTE(review): presumably marks partially received packets; its
	// users are not visible in this part of the file.
	PartSuffix = ".part"
	// MaxSPSize is the upper bound for a single outgoing payload built
	// by payloadsSplit and for the padded first handshake payload.
	MaxSPSize = (1 << 16) - 256
)
44
// SPCheckerQueues is the pair of channels used to talk to an SPChecker
// goroutine.
type SPCheckerQueues struct {
	appeared chan *[32]byte // hashes submitted to the checker
	checked  chan *[32]byte // hashes the checker processed without error
}
49
var (
	// MagicNNCPLv1 is the magic value written into every SPRaw by WriteSP
	// and verified by ReadSP.
	MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}

	// The *Overhead values are filled in by init() with the XDR-marshalled
	// size of a sample SPInfo, SPFreq and (payload-less) SPFile.
	SPInfoOverhead    int
	SPFreqOverhead    int
	SPFileOverhead    int
	// Pre-marshalled SPHead packets of type SPTypeHalt and SPTypePing,
	// prepared once in init().
	SPHaltMarshalized []byte
	SPPingMarshalized []byte

	// NoiseCipherSuite is the Noise cipher suite used for the handshake:
	// Curve25519 DH, ChaCha20-Poly1305 and BLAKE2b.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// DefaultDeadline is the read/write deadline applied to the connection
	// during the handshake message exchange.
	DefaultDeadline = 10 * time.Second
	// PingTimeout is the period of the ping ticker; a session with nothing
	// received for 2*PingTimeout is considered dead.
	PingTimeout     = time.Minute

	// spCheckers holds the checker queues already created for a node.
	spCheckers   = make(map[NodeId]*SPCheckerQueues)
	// SPCheckersWg counts checks currently in progress inside SPChecker.
	SPCheckersWg sync.WaitGroup
)
71
// FdAndFullSize pairs an open file with a size value; SPState.fds stores
// these keyed by path.
// NOTE(review): fullSize presumably is the expected complete size of the
// file being transferred -- confirm against its users.
type FdAndFullSize struct {
	fd       *os.File
	fullSize int64
}
76
// HasherAndOffset pairs a running hash with an offset; SPState.fileHashers
// stores these keyed by string.
// NOTE(review): offset presumably is the number of bytes already fed to
// h -- confirm against its users.
type HasherAndOffset struct {
	h      hash.Hash
	offset uint64
}
81
// SPType is the packet type tag carried in SPHead.
type SPType uint8

// Known packet types. The numeric values are part of the wire format, so
// the order of this list must not change.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
92
// SPHead is the header marshalled in front of every packet body by
// MarshalSP; it only carries the packet type.
type SPHead struct {
	Type SPType
}
96
// SPInfo announces a packet: its niceness, size and hash. infosOur builds
// one for every outgoing job not announced yet.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[32]byte
}
102
// SPFreq identifies a packet by hash together with an offset.
// NOTE(review): presumably a request to send the packet starting at
// Offset -- confirm against the packet processing code.
type SPFreq struct {
	Hash   *[32]byte
	Offset uint64
}
107
// SPFile carries a chunk of data (Payload) for the packet identified by
// Hash.
// NOTE(review): Offset presumably is the position of Payload inside the
// packet -- confirm against the packet processing code.
type SPFile struct {
	Hash    *[32]byte
	Offset  uint64
	Payload []byte
}
113
// SPDone references a packet by hash; it is queued for sending once the
// checker reports that hash on its "checked" channel.
type SPDone struct {
	Hash *[32]byte
}
117
// SPRaw is the outer frame written by WriteSP and read by ReadSP: the
// protocol magic followed by an opaque payload.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
122
// FreqWithNice pairs an SPFreq with a niceness value; SPState.queueTheir
// is a slice of these.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
127
// ConnDeadlined is the connection abstraction required by the SP
// routines: a read/write/close stream that also supports separate read
// and write deadlines.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
133
134 func init() {
135         var buf bytes.Buffer
136         spHead := SPHead{Type: SPTypeHalt}
137         if _, err := xdr.Marshal(&buf, spHead); err != nil {
138                 panic(err)
139         }
140         SPHaltMarshalized = make([]byte, SPHeadOverhead)
141         copy(SPHaltMarshalized, buf.Bytes())
142         buf.Reset()
143
144         spHead = SPHead{Type: SPTypePing}
145         if _, err := xdr.Marshal(&buf, spHead); err != nil {
146                 panic(err)
147         }
148         SPPingMarshalized = make([]byte, SPHeadOverhead)
149         copy(SPPingMarshalized, buf.Bytes())
150         buf.Reset()
151
152         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
153         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
154                 panic(err)
155         }
156         SPInfoOverhead = buf.Len()
157         buf.Reset()
158
159         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
160         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
161                 panic(err)
162         }
163         SPFreqOverhead = buf.Len()
164         buf.Reset()
165
166         spFile := SPFile{Hash: new([32]byte), Offset: 123}
167         if _, err := xdr.Marshal(&buf, spFile); err != nil {
168                 panic(err)
169         }
170         SPFileOverhead = buf.Len()
171 }
172
173 func MarshalSP(typ SPType, sp interface{}) []byte {
174         var buf bytes.Buffer
175         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
176                 panic(err)
177         }
178         if _, err := xdr.Marshal(&buf, sp); err != nil {
179                 panic(err)
180         }
181         return buf.Bytes()
182 }
183
184 func payloadsSplit(payloads [][]byte) [][]byte {
185         var outbounds [][]byte
186         outbound := make([]byte, 0, MaxSPSize)
187         for i, payload := range payloads {
188                 outbound = append(outbound, payload...)
189                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
190                         outbounds = append(outbounds, outbound)
191                         outbound = make([]byte, 0, MaxSPSize)
192                 }
193         }
194         if len(outbound) > 0 {
195                 outbounds = append(outbounds, outbound)
196         }
197         return outbounds
198 }
199
// SPState holds everything belonging to a single SP session with one
// node: configuration, Noise handshake/cipher state, queues, traffic
// statistics and bookkeeping of open files. The embedded RWMutex is taken
// by SetDead.
type SPState struct {
	Ctx            *Ctx
	Node           *Node
	Nice           uint8
	NoCK           bool
	onlineDeadline time.Duration
	maxOnlineTime  time.Duration
	hs             *noise.HandshakeState // handshake state created by StartI/StartR
	csOur          *noise.CipherState
	csTheir        *noise.CipherState
	payloads       chan []byte   // outgoing payloads queue; drained by SetDead
	pings          chan struct{} // ping requests; drained by SetDead
	infosTheir     map[[32]byte]*SPInfo
	infosOurSeen   map[[32]byte]uint8 // hashes already announced by infosOur, with their nice
	queueTheir     []*FreqWithNice
	wg             sync.WaitGroup
	RxBytes        int64     // updated by ReadSP
	RxLastSeen     time.Time // updated by ReadSP
	RxLastNonPing  time.Time
	TxBytes        int64     // updated by WriteSP
	TxLastSeen     time.Time // updated by WriteSP
	TxLastNonPing  time.Time // updated by WriteSP for non-ping writes
	started        time.Time // moment StartI/StartR began
	mustFinishAt   time.Time // started+maxOnlineTime, when maxOnlineTime > 0
	Duration       time.Duration
	RxSpeed        int64
	TxSpeed        int64
	rxLock         *os.File // directory locks, released by dirUnlock
	txLock         *os.File
	xxOnly         TRxTx
	rxRate         int
	txRate         int
	isDead         chan struct{} // closed by SetDead
	listOnly       bool
	onlyPkts       map[[32]byte]bool
	writeSPBuf     bytes.Buffer // scratch buffer reused by WriteSP
	fds            map[string]FdAndFullSize // open files by path, guarded by fdsLock
	fdsLock        sync.RWMutex
	fileHashers    map[string]*HasherAndOffset
	checkerQueues  SPCheckerQueues
	progressBars   map[string]struct{}
	sync.RWMutex
}
243
244 func (state *SPState) SetDead() {
245         state.Lock()
246         defer state.Unlock()
247         select {
248         case <-state.isDead:
249                 // Already closed channel, dead
250                 return
251         default:
252         }
253         close(state.isDead)
254         go func() {
255                 for range state.payloads {
256                 }
257         }()
258         go func() {
259                 for range state.pings {
260                 }
261         }()
262 }
263
264 func (state *SPState) NotAlive() bool {
265         select {
266         case <-state.isDead:
267                 return true
268         default:
269         }
270         return false
271 }
272
273 func (state *SPState) dirUnlock() {
274         state.Ctx.UnlockDir(state.rxLock)
275         state.Ctx.UnlockDir(state.txLock)
276 }
277
278 func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
279         for hshValue := range appeared {
280                 pktName := Base32Codec.EncodeToString(hshValue[:])
281                 les := LEs{
282                         {"XX", string(TRx)},
283                         {"Node", nodeId},
284                         {"Pkt", pktName},
285                 }
286                 SPCheckersWg.Add(1)
287                 ctx.LogD("sp-checker", les, func(les LEs) string {
288                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
289                 })
290                 size, err := ctx.CheckNoCK(nodeId, hshValue)
291                 les = append(les, LE{"Size", size})
292                 if err != nil {
293                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
294                                 return fmt.Sprintf(
295                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
296                                         humanize.IBytes(uint64(size)),
297                                 )
298                         })
299                         continue
300                 }
301                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
302                         return fmt.Sprintf(
303                                 "Packet %s is retreived (%s)",
304                                 pktName, humanize.IBytes(uint64(size)),
305                         )
306                 })
307                 SPCheckersWg.Done()
308                 go func(hsh *[32]byte) { checked <- hsh }(hshValue)
309         }
310 }
311
312 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
313         state.writeSPBuf.Reset()
314         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
315                 Magic:   MagicNNCPLv1,
316                 Payload: payload,
317         })
318         if err != nil {
319                 return err
320         }
321         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
322                 state.TxLastSeen = time.Now()
323                 state.TxBytes += int64(n)
324                 if !ping {
325                         state.TxLastNonPing = state.TxLastSeen
326                 }
327         }
328         return err
329 }
330
331 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
332         var sp SPRaw
333         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
334         if err != nil {
335                 ue := err.(*xdr.UnmarshalError)
336                 if ue.Err == io.EOF {
337                         return nil, ue.Err
338                 }
339                 return nil, err
340         }
341         state.RxLastSeen = time.Now()
342         state.RxBytes += int64(n)
343         if sp.Magic != MagicNNCPLv1 {
344                 return nil, BadMagic
345         }
346         return sp.Payload, nil
347 }
348
349 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
350         var infos []*SPInfo
351         var totalSize int64
352         for job := range ctx.Jobs(nodeId, TTx) {
353                 if job.PktEnc.Nice > nice {
354                         continue
355                 }
356                 if _, known := (*seen)[*job.HshValue]; known {
357                         continue
358                 }
359                 totalSize += job.Size
360                 infos = append(infos, &SPInfo{
361                         Nice: job.PktEnc.Nice,
362                         Size: uint64(job.Size),
363                         Hash: job.HshValue,
364                 })
365                 (*seen)[*job.HshValue] = job.PktEnc.Nice
366         }
367         sort.Sort(ByNice(infos))
368         var payloads [][]byte
369         for _, info := range infos {
370                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
371                 pktName := Base32Codec.EncodeToString(info.Hash[:])
372                 ctx.LogD("sp-info-our", LEs{
373                         {"Node", nodeId},
374                         {"Name", pktName},
375                         {"Size", info.Size},
376                 }, func(les LEs) string {
377                         return fmt.Sprintf(
378                                 "Our info: %s/tx/%s (%s)",
379                                 ctx.NodeName(nodeId),
380                                 pktName,
381                                 humanize.IBytes(info.Size),
382                         )
383                 })
384         }
385         if totalSize > 0 {
386                 ctx.LogI("sp-infos-tx", LEs{
387                         {"XX", string(TTx)},
388                         {"Node", nodeId},
389                         {"Pkts", len(payloads)},
390                         {"Size", totalSize},
391                 }, func(les LEs) string {
392                         return fmt.Sprintf(
393                                 "We have got for %s: %d packets, %s",
394                                 ctx.NodeName(nodeId),
395                                 len(payloads),
396                                 humanize.IBytes(uint64(totalSize)),
397                         )
398                 })
399         }
400         return payloadsSplit(payloads)
401 }
402
403 func (state *SPState) StartI(conn ConnDeadlined) error {
404         nodeId := state.Node.Id
405         err := state.Ctx.ensureRxDir(nodeId)
406         if err != nil {
407                 return err
408         }
409         var rxLock *os.File
410         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
411                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
412                 if err != nil {
413                         return err
414                 }
415         }
416         var txLock *os.File
417         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
418                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
419                 if err != nil {
420                         return err
421                 }
422         }
423         started := time.Now()
424         conf := noise.Config{
425                 CipherSuite: NoiseCipherSuite,
426                 Pattern:     noise.HandshakeIK,
427                 Initiator:   true,
428                 StaticKeypair: noise.DHKey{
429                         Private: state.Ctx.Self.NoisePrv[:],
430                         Public:  state.Ctx.Self.NoisePub[:],
431                 },
432                 PeerStatic: state.Node.NoisePub[:],
433         }
434         hs, err := noise.NewHandshakeState(conf)
435         if err != nil {
436                 return err
437         }
438         state.hs = hs
439         state.payloads = make(chan []byte)
440         state.pings = make(chan struct{})
441         state.infosTheir = make(map[[32]byte]*SPInfo)
442         state.infosOurSeen = make(map[[32]byte]uint8)
443         state.progressBars = make(map[string]struct{})
444         state.started = started
445         state.rxLock = rxLock
446         state.txLock = txLock
447
448         var infosPayloads [][]byte
449         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
450                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
451         }
452         var firstPayload []byte
453         if len(infosPayloads) > 0 {
454                 firstPayload = infosPayloads[0]
455         }
456         // Pad first payload, to hide actual number of existing files
457         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
458                 firstPayload = append(firstPayload, SPHaltMarshalized...)
459         }
460
461         var buf []byte
462         var payload []byte
463         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
464         if err != nil {
465                 state.dirUnlock()
466                 return err
467         }
468         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
469         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
470                 return fmt.Sprintf(
471                         "SP with %s (nice %s): sending first message",
472                         state.Node.Name,
473                         NicenessFmt(state.Nice),
474                 )
475         })
476         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
477         if err = state.WriteSP(conn, buf, false); err != nil {
478                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
479                         return fmt.Sprintf(
480                                 "SP with %s (nice %s): writing",
481                                 state.Node.Name,
482                                 NicenessFmt(state.Nice),
483                         )
484                 })
485                 state.dirUnlock()
486                 return err
487         }
488         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
489                 return fmt.Sprintf(
490                         "SP with %s (nice %s): waiting for first message",
491                         state.Node.Name,
492                         NicenessFmt(state.Nice),
493                 )
494         })
495         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
496         if buf, err = state.ReadSP(conn); err != nil {
497                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
498                         return fmt.Sprintf(
499                                 "SP with %s (nice %s): reading",
500                                 state.Node.Name,
501                                 NicenessFmt(state.Nice),
502                         )
503                 })
504                 state.dirUnlock()
505                 return err
506         }
507         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
508         if err != nil {
509                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
510                         return fmt.Sprintf(
511                                 "SP with %s (nice %s): reading Noise message",
512                                 state.Node.Name,
513                                 NicenessFmt(state.Nice),
514                         )
515                 })
516                 state.dirUnlock()
517                 return err
518         }
519         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
520                 return fmt.Sprintf(
521                         "SP with %s (nice %s): starting workers",
522                         state.Node.Name,
523                         NicenessFmt(state.Nice),
524                 )
525         })
526         err = state.StartWorkers(conn, infosPayloads, payload)
527         if err != nil {
528                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
529                         return fmt.Sprintf(
530                                 "SP with %s (nice %s): starting workers",
531                                 state.Node.Name,
532                                 NicenessFmt(state.Nice),
533                         )
534                 })
535                 state.dirUnlock()
536         }
537         return err
538 }
539
540 func (state *SPState) StartR(conn ConnDeadlined) error {
541         started := time.Now()
542         conf := noise.Config{
543                 CipherSuite: NoiseCipherSuite,
544                 Pattern:     noise.HandshakeIK,
545                 Initiator:   false,
546                 StaticKeypair: noise.DHKey{
547                         Private: state.Ctx.Self.NoisePrv[:],
548                         Public:  state.Ctx.Self.NoisePub[:],
549                 },
550         }
551         hs, err := noise.NewHandshakeState(conf)
552         if err != nil {
553                 return err
554         }
555         xxOnly := TRxTx("")
556         state.hs = hs
557         state.payloads = make(chan []byte)
558         state.pings = make(chan struct{})
559         state.infosOurSeen = make(map[[32]byte]uint8)
560         state.infosTheir = make(map[[32]byte]*SPInfo)
561         state.progressBars = make(map[string]struct{})
562         state.started = started
563         state.xxOnly = xxOnly
564
565         var buf []byte
566         var payload []byte
567         logMsg := func(les LEs) string {
568                 return fmt.Sprintf(
569                         "SP nice %s: waiting for first message",
570                         NicenessFmt(state.Nice),
571                 )
572         }
573         les := LEs{{"Nice", int(state.Nice)}}
574         state.Ctx.LogD("sp-startR", les, logMsg)
575         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
576         if buf, err = state.ReadSP(conn); err != nil {
577                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
578                 return err
579         }
580         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
581                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
582                 return err
583         }
584
585         var node *Node
586         for _, n := range state.Ctx.Neigh {
587                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
588                         node = n
589                         break
590                 }
591         }
592         if node == nil {
593                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
594                 err = errors.New("unknown peer: " + peerId)
595                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
596                 return err
597         }
598         state.Node = node
599         state.rxRate = node.RxRate
600         state.txRate = node.TxRate
601         state.onlineDeadline = node.OnlineDeadline
602         state.maxOnlineTime = node.MaxOnlineTime
603         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
604
605         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
606                 return err
607         }
608         var rxLock *os.File
609         if xxOnly == "" || xxOnly == TRx {
610                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
611                 if err != nil {
612                         return err
613                 }
614         }
615         state.rxLock = rxLock
616         var txLock *os.File
617         if xxOnly == "" || xxOnly == TTx {
618                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
619                 if err != nil {
620                         return err
621                 }
622         }
623         state.txLock = txLock
624
625         var infosPayloads [][]byte
626         if xxOnly == "" || xxOnly == TTx {
627                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
628         }
629         var firstPayload []byte
630         if len(infosPayloads) > 0 {
631                 firstPayload = infosPayloads[0]
632         }
633         // Pad first payload, to hide actual number of existing files
634         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
635                 firstPayload = append(firstPayload, SPHaltMarshalized...)
636         }
637
638         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
639                 return fmt.Sprintf(
640                         "SP with %s (nice %s): sending first message",
641                         node.Name, NicenessFmt(state.Nice),
642                 )
643         })
644         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
645         if err != nil {
646                 state.dirUnlock()
647                 return err
648         }
649         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
650         if err = state.WriteSP(conn, buf, false); err != nil {
651                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
652                         return fmt.Sprintf(
653                                 "SP with %s (nice %s): writing",
654                                 node.Name, NicenessFmt(state.Nice),
655                         )
656                 })
657                 state.dirUnlock()
658                 return err
659         }
660         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
661                 return fmt.Sprintf(
662                         "SP with %s (nice %s): starting workers",
663                         node.Name, NicenessFmt(state.Nice),
664                 )
665         })
666         err = state.StartWorkers(conn, infosPayloads, payload)
667         if err != nil {
668                 state.dirUnlock()
669         }
670         return err
671 }
672
673 func (state *SPState) closeFd(pth string) {
674         state.fdsLock.Lock()
675         if s, exists := state.fds[pth]; exists {
676                 delete(state.fds, pth)
677                 s.fd.Close()
678         }
679         state.fdsLock.Unlock()
680 }
681
682 func (state *SPState) StartWorkers(
683         conn ConnDeadlined,
684         infosPayloads [][]byte,
685         payload []byte,
686 ) error {
687         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
688         state.fds = make(map[string]FdAndFullSize)
689         state.fileHashers = make(map[string]*HasherAndOffset)
690         state.isDead = make(chan struct{})
691         if state.maxOnlineTime > 0 {
692                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
693         }
694
695         // Checker
696         if !state.NoCK {
697                 queues := spCheckers[*state.Node.Id]
698                 if queues == nil {
699                         queues = &SPCheckerQueues{
700                                 appeared: make(chan *[32]byte),
701                                 checked:  make(chan *[32]byte),
702                         }
703                         spCheckers[*state.Node.Id] = queues
704                         go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
705                 }
706                 state.checkerQueues = *queues
707                 go func() {
708                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
709                                 if job.PktEnc.Nice <= state.Nice {
710                                         state.checkerQueues.appeared <- job.HshValue
711                                 }
712                         }
713                 }()
714                 state.wg.Add(1)
715                 go func() {
716                         defer state.wg.Done()
717                         for {
718                                 select {
719                                 case <-state.isDead:
720                                         return
721                                 case hsh := <-state.checkerQueues.checked:
722                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
723                                 }
724                         }
725                 }()
726         }
727
728         // Remaining handshake payload sending
729         if len(infosPayloads) > 1 {
730                 state.wg.Add(1)
731                 go func() {
732                         for _, payload := range infosPayloads[1:] {
733                                 state.Ctx.LogD(
734                                         "sp-queue-remaining",
735                                         append(les, LE{"Size", int64(len(payload))}),
736                                         func(les LEs) string {
737                                                 return fmt.Sprintf(
738                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
739                                                         state.Node.Name, NicenessFmt(state.Nice),
740                                                         humanize.IBytes(uint64(len(payload))),
741                                                 )
742                                         },
743                                 )
744                                 state.payloads <- payload
745                         }
746                         state.wg.Done()
747                 }()
748         }
749
750         // Processing of first payload and queueing its responses
751         logMsg := func(les LEs) string {
752                 return fmt.Sprintf(
753                         "SP with %s (nice %s): processing first payload (%s)",
754                         state.Node.Name, NicenessFmt(state.Nice),
755                         humanize.IBytes(uint64(len(payload))),
756                 )
757         }
758         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
759         replies, err := state.ProcessSP(payload)
760         if err != nil {
761                 state.Ctx.LogE("sp-process", les, err, logMsg)
762                 return err
763         }
764         state.wg.Add(1)
765         go func() {
766                 for _, reply := range replies {
767                         state.Ctx.LogD(
768                                 "sp-queue-reply",
769                                 append(les, LE{"Size", int64(len(reply))}),
770                                 func(les LEs) string {
771                                         return fmt.Sprintf(
772                                                 "SP with %s (nice %s): queuing reply (%s)",
773                                                 state.Node.Name, NicenessFmt(state.Nice),
774                                                 humanize.IBytes(uint64(len(payload))),
775                                         )
776                                 },
777                         )
778                         state.payloads <- reply
779                 }
780                 state.wg.Done()
781         }()
782
783         // Periodic jobs
784         state.wg.Add(1)
785         go func() {
786                 deadlineTicker := time.NewTicker(time.Second)
787                 pingTicker := time.NewTicker(PingTimeout)
788                 for {
789                         select {
790                         case <-state.isDead:
791                                 state.wg.Done()
792                                 deadlineTicker.Stop()
793                                 pingTicker.Stop()
794                                 return
795                         case now := <-deadlineTicker.C:
796                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
797                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
798                                         goto Deadlined
799                                 }
800                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
801                                         goto Deadlined
802                                 }
803                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
804                                         goto Deadlined
805                                 }
806                                 break
807                         Deadlined:
808                                 state.SetDead()
809                                 conn.Close() // #nosec G104
810                         case now := <-pingTicker.C:
811                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
812                                         state.wg.Add(1)
813                                         go func() {
814                                                 state.pings <- struct{}{}
815                                                 state.wg.Done()
816                                         }()
817                                 }
818                         }
819                 }
820         }()
821
822         // Spool checker and INFOs sender of appearing files
823         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
824                 state.wg.Add(1)
825                 go func() {
826                         ticker := time.NewTicker(time.Second)
827                         for {
828                                 select {
829                                 case <-state.isDead:
830                                         state.wg.Done()
831                                         ticker.Stop()
832                                         return
833                                 case <-ticker.C:
834                                         for _, payload := range state.Ctx.infosOur(
835                                                 state.Node.Id,
836                                                 state.Nice,
837                                                 &state.infosOurSeen,
838                                         ) {
839                                                 state.Ctx.LogD(
840                                                         "sp-queue-info",
841                                                         append(les, LE{"Size", int64(len(payload))}),
842                                                         func(les LEs) string {
843                                                                 return fmt.Sprintf(
844                                                                         "SP with %s (nice %s): queuing new info (%s)",
845                                                                         state.Node.Name, NicenessFmt(state.Nice),
846                                                                         humanize.IBytes(uint64(len(payload))),
847                                                                 )
848                                                         },
849                                                 )
850                                                 state.payloads <- payload
851                                         }
852                                 }
853                         }
854                 }()
855         }
856
857         // Sender
858         state.wg.Add(1)
859         go func() {
860                 defer conn.Close()
861                 defer state.SetDead()
862                 defer state.wg.Done()
863                 for {
864                         if state.NotAlive() {
865                                 return
866                         }
867                         var payload []byte
868                         var ping bool
869                         select {
870                         case <-state.pings:
871                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
872                                         return fmt.Sprintf(
873                                                 "SP with %s (nice %s): got ping",
874                                                 state.Node.Name, NicenessFmt(state.Nice),
875                                         )
876                                 })
877                                 payload = SPPingMarshalized
878                                 ping = true
879                         case payload = <-state.payloads:
880                                 state.Ctx.LogD(
881                                         "sp-got-payload",
882                                         append(les, LE{"Size", int64(len(payload))}),
883                                         func(les LEs) string {
884                                                 return fmt.Sprintf(
885                                                         "SP with %s (nice %s): got payload (%s)",
886                                                         state.Node.Name, NicenessFmt(state.Nice),
887                                                         humanize.IBytes(uint64(len(payload))),
888                                                 )
889                                         },
890                                 )
891                         default:
892                                 state.RLock()
893                                 if len(state.queueTheir) == 0 {
894                                         state.RUnlock()
895                                         time.Sleep(100 * time.Millisecond)
896                                         continue
897                                 }
898                                 freq := state.queueTheir[0].freq
899                                 state.RUnlock()
900                                 if state.txRate > 0 {
901                                         time.Sleep(time.Second / time.Duration(state.txRate))
902                                 }
903                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
904                                 lesp := append(
905                                         les,
906                                         LE{"XX", string(TTx)},
907                                         LE{"Pkt", pktName},
908                                         LE{"Size", int64(freq.Offset)},
909                                 )
910                                 logMsg := func(les LEs) string {
911                                         return fmt.Sprintf(
912                                                 "SP with %s (nice %s): tx/%s (%s)",
913                                                 state.Node.Name, NicenessFmt(state.Nice),
914                                                 pktName,
915                                                 humanize.IBytes(freq.Offset),
916                                         )
917                                 }
918                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
919                                         return logMsg(les) + ": queueing"
920                                 })
921                                 pth := filepath.Join(
922                                         state.Ctx.Spool,
923                                         state.Node.Id.String(),
924                                         string(TTx),
925                                         Base32Codec.EncodeToString(freq.Hash[:]),
926                                 )
927                                 state.fdsLock.RLock()
928                                 fdAndFullSize, exists := state.fds[pth]
929                                 state.fdsLock.RUnlock()
930                                 if !exists {
931                                         fd, err := os.Open(pth)
932                                         if err != nil {
933                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
934                                                         return logMsg(les) + ": opening"
935                                                 })
936                                                 return
937                                         }
938                                         fi, err := fd.Stat()
939                                         if err != nil {
940                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
941                                                         return logMsg(les) + ": stating"
942                                                 })
943                                                 return
944                                         }
945                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
946                                         state.fdsLock.Lock()
947                                         state.fds[pth] = fdAndFullSize
948                                         state.fdsLock.Unlock()
949                                 }
950                                 fd := fdAndFullSize.fd
951                                 fullSize := fdAndFullSize.fullSize
952                                 var buf []byte
953                                 if freq.Offset < uint64(fullSize) {
954                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
955                                                 return logMsg(les) + ": seeking"
956                                         })
957                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
958                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
959                                                         return logMsg(les) + ": seeking"
960                                                 })
961                                                 return
962                                         }
963                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
964                                         n, err := fd.Read(buf)
965                                         if err != nil {
966                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
967                                                         return logMsg(les) + ": reading"
968                                                 })
969                                                 return
970                                         }
971                                         buf = buf[:n]
972                                         lesp = append(
973                                                 les,
974                                                 LE{"XX", string(TTx)},
975                                                 LE{"Pkt", pktName},
976                                                 LE{"Size", int64(n)},
977                                         )
978                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
979                                                 return fmt.Sprintf(
980                                                         "%s: read %s",
981                                                         logMsg(les), humanize.IBytes(uint64(n)),
982                                                 )
983                                         })
984                                 }
985                                 state.closeFd(pth)
986                                 payload = MarshalSP(SPTypeFile, SPFile{
987                                         Hash:    freq.Hash,
988                                         Offset:  freq.Offset,
989                                         Payload: buf,
990                                 })
991                                 ourSize := freq.Offset + uint64(len(buf))
992                                 lesp = append(
993                                         les,
994                                         LE{"XX", string(TTx)},
995                                         LE{"Pkt", pktName},
996                                         LE{"Size", int64(ourSize)},
997                                         LE{"FullSize", fullSize},
998                                 )
999                                 if state.Ctx.ShowPrgrs {
1000                                         state.progressBars[pktName] = struct{}{}
1001                                         Progress("Tx", lesp)
1002                                 }
1003                                 state.Lock()
1004                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
1005                                         if ourSize == uint64(fullSize) {
1006                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
1007                                                         return logMsg(les) + ": finished"
1008                                                 })
1009                                                 if len(state.queueTheir) > 1 {
1010                                                         state.queueTheir = state.queueTheir[1:]
1011                                                 } else {
1012                                                         state.queueTheir = state.queueTheir[:0]
1013                                                 }
1014                                                 if state.Ctx.ShowPrgrs {
1015                                                         delete(state.progressBars, pktName)
1016                                                 }
1017                                         } else {
1018                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
1019                                         }
1020                                 } else {
1021                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
1022                                                 return logMsg(les) + ": queue disappeared"
1023                                         })
1024                                 }
1025                                 state.Unlock()
1026                         }
1027                         logMsg := func(les LEs) string {
1028                                 return fmt.Sprintf(
1029                                         "SP with %s (nice %s): sending %s",
1030                                         state.Node.Name, NicenessFmt(state.Nice),
1031                                         humanize.IBytes(uint64(len(payload))),
1032                                 )
1033                         }
1034                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1035                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1036                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
1037                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1038                                 return
1039                         }
1040                 }
1041         }()
1042
1043         // Receiver
1044         state.wg.Add(1)
1045         go func() {
1046                 for {
1047                         if state.NotAlive() {
1048                                 break
1049                         }
1050                         logMsg := func(les LEs) string {
1051                                 return fmt.Sprintf(
1052                                         "SP with %s (nice %s): waiting for payload",
1053                                         state.Node.Name, NicenessFmt(state.Nice),
1054                                 )
1055                         }
1056                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1057                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1058                         payload, err := state.ReadSP(conn)
1059                         if err != nil {
1060                                 if err == io.EOF {
1061                                         break
1062                                 }
1063                                 unmarshalErr := err.(*xdr.UnmarshalError)
1064                                 if os.IsTimeout(unmarshalErr.Err) {
1065                                         continue
1066                                 }
1067                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1068                                         break
1069                                 }
1070                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1071                                 break
1072                         }
1073                         logMsg = func(les LEs) string {
1074                                 return fmt.Sprintf(
1075                                         "SP with %s (nice %s): payload (%s)",
1076                                         state.Node.Name, NicenessFmt(state.Nice),
1077                                         humanize.IBytes(uint64(len(payload))),
1078                                 )
1079                         }
1080                         state.Ctx.LogD(
1081                                 "sp-recv-got",
1082                                 append(les, LE{"Size", int64(len(payload))}),
1083                                 func(les LEs) string { return logMsg(les) + ": got" },
1084                         )
1085                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1086                         if err != nil {
1087                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1088                                         return logMsg(les) + ": got"
1089                                 })
1090                                 break
1091                         }
1092                         state.Ctx.LogD(
1093                                 "sp-recv-process",
1094                                 append(les, LE{"Size", int64(len(payload))}),
1095                                 func(les LEs) string {
1096                                         return logMsg(les) + ": processing"
1097                                 },
1098                         )
1099                         replies, err := state.ProcessSP(payload)
1100                         if err != nil {
1101                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1102                                         return logMsg(les) + ": processing"
1103                                 })
1104                                 break
1105                         }
1106                         state.wg.Add(1)
1107                         go func() {
1108                                 for _, reply := range replies {
1109                                         state.Ctx.LogD(
1110                                                 "sp-recv-reply",
1111                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1112                                                 func(les LEs) string {
1113                                                         return fmt.Sprintf(
1114                                                                 "SP with %s (nice %s): queuing reply (%s)",
1115                                                                 state.Node.Name, NicenessFmt(state.Nice),
1116                                                                 humanize.IBytes(uint64(len(reply))),
1117                                                         )
1118                                                 },
1119                                         )
1120                                         state.payloads <- reply
1121                                 }
1122                                 state.wg.Done()
1123                         }()
1124                         if state.rxRate > 0 {
1125                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1126                         }
1127                 }
1128                 state.SetDead()
1129                 state.wg.Done()
1130                 state.SetDead()
1131                 conn.Close() // #nosec G104
1132         }()
1133
1134         return nil
1135 }
1136
1137 func (state *SPState) Wait() {
1138         state.wg.Wait()
1139         close(state.payloads)
1140         close(state.pings)
1141         state.Duration = time.Now().Sub(state.started)
1142         SPCheckersWg.Wait()
1143         state.dirUnlock()
1144         state.RxSpeed = state.RxBytes
1145         state.TxSpeed = state.TxBytes
1146         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1147         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1148         if rxDuration > 0 {
1149                 state.RxSpeed = state.RxBytes / rxDuration
1150         }
1151         if txDuration > 0 {
1152                 state.TxSpeed = state.TxBytes / txDuration
1153         }
1154         for _, s := range state.fds {
1155                 s.fd.Close()
1156         }
1157         for pktName := range state.progressBars {
1158                 ProgressKill(pktName)
1159         }
1160 }
1161
1162 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1163         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1164         r := bytes.NewReader(payload)
1165         var err error
1166         var replies [][]byte
1167         var infosGot bool
1168         for r.Len() > 0 {
1169                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1170                         return fmt.Sprintf(
1171                                 "SP with %s (nice %s): unmarshaling header",
1172                                 state.Node.Name, NicenessFmt(state.Nice),
1173                         )
1174                 })
1175                 var head SPHead
1176                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1177                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1178                                 return fmt.Sprintf(
1179                                         "SP with %s (nice %s): unmarshaling header",
1180                                         state.Node.Name, NicenessFmt(state.Nice),
1181                                 )
1182                         })
1183                         return nil, err
1184                 }
1185                 if head.Type != SPTypePing {
1186                         state.RxLastNonPing = state.RxLastSeen
1187                 }
1188                 switch head.Type {
1189                 case SPTypeHalt:
1190                         state.Ctx.LogD(
1191                                 "sp-process-halt",
1192                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1193                                         return fmt.Sprintf(
1194                                                 "SP with %s (nice %s): got HALT",
1195                                                 state.Node.Name, NicenessFmt(state.Nice),
1196                                         )
1197                                 },
1198                         )
1199                         state.Lock()
1200                         state.queueTheir = nil
1201                         state.Unlock()
1202
1203                 case SPTypePing:
1204                         state.Ctx.LogD(
1205                                 "sp-process-ping",
1206                                 append(les, LE{"Type", "ping"}),
1207                                 func(les LEs) string {
1208                                         return fmt.Sprintf(
1209                                                 "SP with %s (nice %s): got PING",
1210                                                 state.Node.Name, NicenessFmt(state.Nice),
1211                                         )
1212                                 },
1213                         )
1214
1215                 case SPTypeInfo:
1216                         infosGot = true
1217                         lesp := append(les, LE{"Type", "info"})
1218                         state.Ctx.LogD(
1219                                 "sp-process-info-unmarshal", lesp,
1220                                 func(les LEs) string {
1221                                         return fmt.Sprintf(
1222                                                 "SP with %s (nice %s): unmarshaling INFO",
1223                                                 state.Node.Name, NicenessFmt(state.Nice),
1224                                         )
1225                                 },
1226                         )
1227                         var info SPInfo
1228                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1229                                 state.Ctx.LogE(
1230                                         "sp-process-info-unmarshal", lesp, err,
1231                                         func(les LEs) string {
1232                                                 return fmt.Sprintf(
1233                                                         "SP with %s (nice %s): unmarshaling INFO",
1234                                                         state.Node.Name, NicenessFmt(state.Nice),
1235                                                 )
1236                                         },
1237                                 )
1238                                 return nil, err
1239                         }
1240                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1241                         lesp = append(
1242                                 lesp,
1243                                 LE{"Pkt", pktName},
1244                                 LE{"Size", int64(info.Size)},
1245                                 LE{"PktNice", int(info.Nice)},
1246                         )
1247                         logMsg := func(les LEs) string {
1248                                 return fmt.Sprintf(
1249                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1250                                         state.Node.Name, NicenessFmt(state.Nice),
1251                                         pktName,
1252                                         humanize.IBytes(info.Size),
1253                                         NicenessFmt(info.Nice),
1254                                 )
1255                         }
1256                         if !state.listOnly && info.Nice > state.Nice {
1257                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1258                                         return logMsg(les) + ": too nice"
1259                                 })
1260                                 continue
1261                         }
1262                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1263                                 return logMsg(les) + ": received"
1264                         })
1265                         if !state.listOnly && state.xxOnly == TTx {
1266                                 continue
1267                         }
1268                         state.Lock()
1269                         state.infosTheir[*info.Hash] = &info
1270                         state.Unlock()
1271                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1272                                 return logMsg(les) + ": stating part"
1273                         })
1274                         pktPath := filepath.Join(
1275                                 state.Ctx.Spool,
1276                                 state.Node.Id.String(),
1277                                 string(TRx),
1278                                 Base32Codec.EncodeToString(info.Hash[:]),
1279                         )
1280                         logMsg = func(les LEs) string {
1281                                 return fmt.Sprintf(
1282                                         "Packet %s (%s) (nice %s)",
1283                                         pktName,
1284                                         humanize.IBytes(info.Size),
1285                                         NicenessFmt(info.Nice),
1286                                 )
1287                         }
1288                         if _, err = os.Stat(pktPath); err == nil {
1289                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1290                                         return logMsg(les) + ": already done"
1291                                 })
1292                                 if !state.listOnly {
1293                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1294                                 }
1295                                 continue
1296                         }
1297                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1298                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1299                                         return logMsg(les) + ": already seen"
1300                                 })
1301                                 if !state.listOnly {
1302                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1303                                 }
1304                                 continue
1305                         }
1306                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1307                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1308                                         return logMsg(les) + ": still not checksummed"
1309                                 })
1310                                 continue
1311                         }
1312                         fi, err := os.Stat(pktPath + PartSuffix)
1313                         var offset int64
1314                         if err == nil {
1315                                 offset = fi.Size()
1316                         }
1317                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1318                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1319                                         return logMsg(les) + ": not enough space"
1320                                 })
1321                                 continue
1322                         }
1323                         state.Ctx.LogI(
1324                                 "sp-info",
1325                                 append(lesp, LE{"Offset", offset}),
1326                                 func(les LEs) string {
1327                                         return fmt.Sprintf(
1328                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1329                                         )
1330                                 },
1331                         )
1332                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1333                                 replies = append(replies, MarshalSP(
1334                                         SPTypeFreq,
1335                                         SPFreq{info.Hash, uint64(offset)},
1336                                 ))
1337                         }
1338
1339                 case SPTypeFile:
1340                         lesp := append(les, LE{"Type", "file"})
1341                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1342                                 return fmt.Sprintf(
1343                                         "SP with %s (nice %s): unmarshaling FILE",
1344                                         state.Node.Name, NicenessFmt(state.Nice),
1345                                 )
1346                         })
1347                         var file SPFile
1348                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1349                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1350                                         return fmt.Sprintf(
1351                                                 "SP with %s (nice %s): unmarshaling FILE",
1352                                                 state.Node.Name, NicenessFmt(state.Nice),
1353                                         )
1354                                 })
1355                                 return nil, err
1356                         }
1357                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1358                         lesp = append(
1359                                 lesp,
1360                                 LE{"XX", string(TRx)},
1361                                 LE{"Pkt", pktName},
1362                                 LE{"Size", int64(len(file.Payload))},
1363                         )
1364                         logMsg := func(les LEs) string {
1365                                 return fmt.Sprintf(
1366                                         "Got packet %s (%s)",
1367                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1368                                 )
1369                         }
1370                         dirToSync := filepath.Join(
1371                                 state.Ctx.Spool,
1372                                 state.Node.Id.String(),
1373                                 string(TRx),
1374                         )
1375                         filePath := filepath.Join(dirToSync, pktName)
1376                         filePathPart := filePath + PartSuffix
1377                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1378                                 return logMsg(les) + ": opening part"
1379                         })
1380                         state.fdsLock.RLock()
1381                         fdAndFullSize, exists := state.fds[filePathPart]
1382                         state.fdsLock.RUnlock()
1383                         var fd *os.File
1384                         if exists {
1385                                 fd = fdAndFullSize.fd
1386                         } else {
1387                                 fd, err = os.OpenFile(
1388                                         filePathPart,
1389                                         os.O_RDWR|os.O_CREATE,
1390                                         os.FileMode(0666),
1391                                 )
1392                                 if err != nil {
1393                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1394                                                 return logMsg(les) + ": opening part"
1395                                         })
1396                                         return nil, err
1397                                 }
1398                                 state.fdsLock.Lock()
1399                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1400                                 state.fdsLock.Unlock()
1401                                 if file.Offset == 0 {
1402                                         h, err := blake2b.New256(nil)
1403                                         if err != nil {
1404                                                 panic(err)
1405                                         }
1406                                         state.fileHashers[filePath] = &HasherAndOffset{h: h}
1407                                 }
1408                         }
1409                         state.Ctx.LogD(
1410                                 "sp-file-seek",
1411                                 append(lesp, LE{"Offset", file.Offset}),
1412                                 func(les LEs) string {
1413                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1414                                 })
1415                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1416                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1417                                         return logMsg(les) + ": seeking"
1418                                 })
1419                                 state.closeFd(filePathPart)
1420                                 return nil, err
1421                         }
1422                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1423                                 return logMsg(les) + ": writing"
1424                         })
1425                         if _, err = fd.Write(file.Payload); err != nil {
1426                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1427                                         return logMsg(les) + ": writing"
1428                                 })
1429                                 state.closeFd(filePathPart)
1430                                 return nil, err
1431                         }
1432                         hasherAndOffset, hasherExists := state.fileHashers[filePath]
1433                         if hasherExists {
1434                                 if hasherAndOffset.offset == file.Offset {
1435                                         if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1436                                                 panic(err)
1437                                         }
1438                                         hasherAndOffset.offset += uint64(len(file.Payload))
1439                                 } else {
1440                                         state.Ctx.LogD(
1441                                                 "sp-file-offset-differs", lesp,
1442                                                 func(les LEs) string {
1443                                                         return logMsg(les) + ": offset differs, deleting hasher"
1444                                                 },
1445                                         )
1446                                         delete(state.fileHashers, filePath)
1447                                         hasherExists = false
1448                                 }
1449                         }
1450                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1451                         lesp[len(lesp)-1].V = ourSize
1452                         fullsize := int64(0)
1453                         state.RLock()
1454                         infoTheir, ok := state.infosTheir[*file.Hash]
1455                         state.RUnlock()
1456                         if ok {
1457                                 fullsize = int64(infoTheir.Size)
1458                         }
1459                         lesp = append(lesp, LE{"FullSize", fullsize})
1460                         if state.Ctx.ShowPrgrs {
1461                                 state.progressBars[pktName] = struct{}{}
1462                                 Progress("Rx", lesp)
1463                         }
1464                         if fullsize != ourSize {
1465                                 continue
1466                         }
1467                         if state.Ctx.ShowPrgrs {
1468                                 delete(state.progressBars, pktName)
1469                         }
1470                         logMsg = func(les LEs) string {
1471                                 return fmt.Sprintf(
1472                                         "Got packet %s %d%% (%s / %s)",
1473                                         pktName, 100*ourSize/fullsize,
1474                                         humanize.IBytes(uint64(ourSize)),
1475                                         humanize.IBytes(uint64(fullsize)),
1476                                 )
1477                         }
1478                         err = fd.Sync()
1479                         if err != nil {
1480                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1481                                         return logMsg(les) + ": syncing"
1482                                 })
1483                                 state.closeFd(filePathPart)
1484                                 continue
1485                         }
1486                         if hasherExists {
1487                                 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1488                                         state.Ctx.LogE(
1489                                                 "sp-file-bad-checksum", lesp,
1490                                                 errors.New("checksum mismatch"),
1491                                                 logMsg,
1492                                         )
1493                                         continue
1494                                 }
1495                                 if err = os.Rename(filePathPart, filePath); err != nil {
1496                                         state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1497                                                 return logMsg(les) + ": renaming"
1498                                         })
1499                                         continue
1500                                 }
1501                                 if err = DirSync(dirToSync); err != nil {
1502                                         state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1503                                                 return logMsg(les) + ": dirsyncing"
1504                                         })
1505                                         continue
1506                                 }
1507                                 state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1508                                         return logMsg(les) + ": done"
1509                                 })
1510                                 state.wg.Add(1)
1511                                 go func() {
1512                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1513                                         state.wg.Done()
1514                                 }()
1515                                 state.Lock()
1516                                 delete(state.infosTheir, *file.Hash)
1517                                 state.Unlock()
1518                                 if !state.Ctx.HdrUsage {
1519                                         state.closeFd(filePathPart)
1520                                         continue
1521                                 }
1522                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1523                                         state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1524                                                 return logMsg(les) + ": seeking"
1525                                         })
1526                                         state.closeFd(filePathPart)
1527                                         continue
1528                                 }
1529                                 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1530                                 state.closeFd(filePathPart)
1531                                 if err != nil {
1532                                         state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1533                                                 return logMsg(les) + ": HdrReading"
1534                                         })
1535                                         continue
1536                                 }
1537                                 state.Ctx.HdrWrite(pktEncRaw, filePath)
1538                                 continue
1539                         }
1540                         state.closeFd(filePathPart)
1541                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1542                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1543                                         return logMsg(les) + ": renaming"
1544                                 })
1545                                 continue
1546                         }
1547                         if err = DirSync(dirToSync); err != nil {
1548                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1549                                         return logMsg(les) + ": dirsyncing"
1550                                 })
1551                                 continue
1552                         }
1553                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1554                                 return logMsg(les) + ": downloaded"
1555                         })
1556                         state.Lock()
1557                         delete(state.infosTheir, *file.Hash)
1558                         state.Unlock()
1559                         if !state.NoCK {
1560                                 state.checkerQueues.appeared <- file.Hash
1561                         }
1562
1563                 case SPTypeDone:
1564                         lesp := append(les, LE{"Type", "done"})
1565                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1566                                 return fmt.Sprintf(
1567                                         "SP with %s (nice %s): unmarshaling DONE",
1568                                         state.Node.Name, NicenessFmt(state.Nice),
1569                                 )
1570                         })
1571                         var done SPDone
1572                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1573                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1574                                         return fmt.Sprintf(
1575                                                 "SP with %s (nice %s): unmarshaling DONE",
1576                                                 state.Node.Name, NicenessFmt(state.Nice),
1577                                         )
1578                                 })
1579                                 return nil, err
1580                         }
1581                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1582                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1583                         logMsg := func(les LEs) string {
1584                                 return fmt.Sprintf(
1585                                         "SP with %s (nice %s): DONE: removing %s",
1586                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1587                                 )
1588                         }
1589                         state.Ctx.LogD("sp-done", lesp, logMsg)
1590                         pth := filepath.Join(
1591                                 state.Ctx.Spool,
1592                                 state.Node.Id.String(),
1593                                 string(TTx),
1594                                 pktName,
1595                         )
1596                         if err = os.Remove(pth); err == nil {
1597                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1598                                         return fmt.Sprintf("Packet %s is sent", pktName)
1599                                 })
1600                                 if state.Ctx.HdrUsage {
1601                                         os.Remove(pth + HdrSuffix)
1602                                 }
1603                         } else {
1604                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1605                         }
1606
1607                 case SPTypeFreq:
1608                         lesp := append(les, LE{"Type", "freq"})
1609                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1610                                 return fmt.Sprintf(
1611                                         "SP with %s (nice %s): unmarshaling FREQ",
1612                                         state.Node.Name, NicenessFmt(state.Nice),
1613                                 )
1614                         })
1615                         var freq SPFreq
1616                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1617                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1618                                         return fmt.Sprintf(
1619                                                 "SP with %s (nice %s): unmarshaling FREQ",
1620                                                 state.Node.Name, NicenessFmt(state.Nice),
1621                                         )
1622                                 })
1623                                 return nil, err
1624                         }
1625                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1626                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1627                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1628                                 return fmt.Sprintf(
1629                                         "SP with %s (nice %s): FREQ %s: queuing",
1630                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1631                                 )
1632                         })
1633                         nice, exists := state.infosOurSeen[*freq.Hash]
1634                         if exists {
1635                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1636                                         state.Lock()
1637                                         insertIdx := 0
1638                                         var freqWithNice *FreqWithNice
1639                                         for insertIdx, freqWithNice = range state.queueTheir {
1640                                                 if freqWithNice.nice > nice {
1641                                                         break
1642                                                 }
1643                                         }
1644                                         state.queueTheir = append(state.queueTheir, nil)
1645                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1646                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1647                                         state.Unlock()
1648                                 } else {
1649                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1650                                                 return fmt.Sprintf(
1651                                                         "SP with %s (nice %s): FREQ %s: skipping",
1652                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1653                                                 )
1654                                         })
1655                                 }
1656                         } else {
1657                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1658                                         return fmt.Sprintf(
1659                                                 "SP with %s (nice %s): FREQ %s: unknown",
1660                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1661                                         )
1662                                 })
1663                         }
1664
1665                 default:
1666                         state.Ctx.LogE(
1667                                 "sp-process-type-unknown",
1668                                 append(les, LE{"Type", head.Type}),
1669                                 errors.New("unknown type"),
1670                                 func(les LEs) string {
1671                                         return fmt.Sprintf(
1672                                                 "SP with %s (nice %s): %d",
1673                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1674                                         )
1675                                 },
1676                         )
1677                         return nil, BadPktType
1678                 }
1679         }
1680
1681         if infosGot {
1682                 var pkts int
1683                 var size uint64
1684                 state.RLock()
1685                 for _, info := range state.infosTheir {
1686                         pkts++
1687                         size += info.Size
1688                 }
1689                 state.RUnlock()
1690                 state.Ctx.LogI("sp-infos-rx", LEs{
1691                         {"XX", string(TRx)},
1692                         {"Node", state.Node.Id},
1693                         {"Pkts", pkts},
1694                         {"Size", int64(size)},
1695                 }, func(les LEs) string {
1696                         return fmt.Sprintf(
1697                                 "%s has got for us: %d packets, %s",
1698                                 state.Node.Name, pkts, humanize.IBytes(size),
1699                         )
1700                 })
1701         }
1702         return payloadsSplit(replies), nil
1703 }