]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Fixed invalid "Size" type, leading to panic during Progress
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "hash"
26         "io"
27         "os"
28         "path/filepath"
29         "sort"
30         "sync"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/flynn/noise"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 const (
40         MaxSPSize      = 1<<16 - 256
41         PartSuffix     = ".part"
42         SPHeadOverhead = 4
43 )
44
45 type SPCheckerQueues struct {
46         appeared chan *[32]byte
47         checked  chan *[32]byte
48 }
49
50 var (
51         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
52
53         SPInfoOverhead    int
54         SPFreqOverhead    int
55         SPFileOverhead    int
56         SPHaltMarshalized []byte
57         SPPingMarshalized []byte
58
59         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
60                 noise.DH25519,
61                 noise.CipherChaChaPoly,
62                 noise.HashBLAKE2b,
63         )
64
65         DefaultDeadline = 10 * time.Second
66         PingTimeout     = time.Minute
67
68         spCheckers = make(map[NodeId]*SPCheckerQueues)
69 )
70
71 type FdAndFullSize struct {
72         fd       *os.File
73         fullSize int64
74 }
75
76 type HasherAndOffset struct {
77         h      hash.Hash
78         offset uint64
79 }
80
81 type SPType uint8
82
83 const (
84         SPTypeInfo SPType = iota
85         SPTypeFreq SPType = iota
86         SPTypeFile SPType = iota
87         SPTypeDone SPType = iota
88         SPTypeHalt SPType = iota
89         SPTypePing SPType = iota
90 )
91
92 type SPHead struct {
93         Type SPType
94 }
95
96 type SPInfo struct {
97         Nice uint8
98         Size uint64
99         Hash *[32]byte
100 }
101
102 type SPFreq struct {
103         Hash   *[32]byte
104         Offset uint64
105 }
106
107 type SPFile struct {
108         Hash    *[32]byte
109         Offset  uint64
110         Payload []byte
111 }
112
113 type SPDone struct {
114         Hash *[32]byte
115 }
116
117 type SPRaw struct {
118         Magic   [8]byte
119         Payload []byte
120 }
121
122 type FreqWithNice struct {
123         freq *SPFreq
124         nice uint8
125 }
126
127 type ConnDeadlined interface {
128         io.ReadWriteCloser
129         SetReadDeadline(t time.Time) error
130         SetWriteDeadline(t time.Time) error
131 }
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([32]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170 }
171
172 func MarshalSP(typ SPType, sp interface{}) []byte {
173         var buf bytes.Buffer
174         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
175                 panic(err)
176         }
177         if _, err := xdr.Marshal(&buf, sp); err != nil {
178                 panic(err)
179         }
180         return buf.Bytes()
181 }
182
183 func payloadsSplit(payloads [][]byte) [][]byte {
184         var outbounds [][]byte
185         outbound := make([]byte, 0, MaxSPSize)
186         for i, payload := range payloads {
187                 outbound = append(outbound, payload...)
188                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
189                         outbounds = append(outbounds, outbound)
190                         outbound = make([]byte, 0, MaxSPSize)
191                 }
192         }
193         if len(outbound) > 0 {
194                 outbounds = append(outbounds, outbound)
195         }
196         return outbounds
197 }
198
199 type SPState struct {
200         Ctx            *Ctx
201         Node           *Node
202         Nice           uint8
203         NoCK           bool
204         onlineDeadline time.Duration
205         maxOnlineTime  time.Duration
206         hs             *noise.HandshakeState
207         csOur          *noise.CipherState
208         csTheir        *noise.CipherState
209         payloads       chan []byte
210         pings          chan struct{}
211         infosTheir     map[[32]byte]*SPInfo
212         infosOurSeen   map[[32]byte]uint8
213         queueTheir     []*FreqWithNice
214         wg             sync.WaitGroup
215         RxBytes        int64
216         RxLastSeen     time.Time
217         RxLastNonPing  time.Time
218         TxBytes        int64
219         TxLastSeen     time.Time
220         TxLastNonPing  time.Time
221         started        time.Time
222         mustFinishAt   time.Time
223         Duration       time.Duration
224         RxSpeed        int64
225         TxSpeed        int64
226         rxLock         *os.File
227         txLock         *os.File
228         xxOnly         TRxTx
229         rxRate         int
230         txRate         int
231         isDead         chan struct{}
232         listOnly       bool
233         onlyPkts       map[[32]byte]bool
234         writeSPBuf     bytes.Buffer
235         fds            map[string]FdAndFullSize
236         fileHashers    map[string]*HasherAndOffset
237         checkerQueues  SPCheckerQueues
238         sync.RWMutex
239 }
240
241 func (state *SPState) SetDead() {
242         state.Lock()
243         defer state.Unlock()
244         select {
245         case <-state.isDead:
246                 // Already closed channel, dead
247                 return
248         default:
249         }
250         close(state.isDead)
251         go func() {
252                 for range state.payloads {
253                 }
254         }()
255         go func() {
256                 for range state.pings {
257                 }
258         }()
259         go func() {
260                 for _, s := range state.fds {
261                         s.fd.Close()
262                 }
263         }()
264 }
265
266 func (state *SPState) NotAlive() bool {
267         select {
268         case <-state.isDead:
269                 return true
270         default:
271         }
272         return false
273 }
274
275 func (state *SPState) dirUnlock() {
276         state.Ctx.UnlockDir(state.rxLock)
277         state.Ctx.UnlockDir(state.txLock)
278 }
279
280 func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
281         for hshValue := range appeared {
282                 pktName := Base32Codec.EncodeToString(hshValue[:])
283                 les := LEs{
284                         {"XX", string(TRx)},
285                         {"Node", nodeId},
286                         {"Pkt", pktName},
287                 }
288                 ctx.LogD("sp-checker", les, func(les LEs) string {
289                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
290                 })
291                 size, err := ctx.CheckNoCK(nodeId, hshValue)
292                 les = append(les, LE{"Size", size})
293                 if err != nil {
294                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
295                                 return fmt.Sprintf(
296                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
297                                         humanize.IBytes(uint64(size)),
298                                 )
299                         })
300                         continue
301                 }
302                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
303                         return fmt.Sprintf(
304                                 "Packet %s is retreived (%s)",
305                                 pktName, humanize.IBytes(uint64(size)),
306                         )
307                 })
308                 go func(hsh *[32]byte) { checked <- hsh }(hshValue)
309         }
310 }
311
312 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
313         state.writeSPBuf.Reset()
314         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
315                 Magic:   MagicNNCPLv1,
316                 Payload: payload,
317         })
318         if err != nil {
319                 return err
320         }
321         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
322                 state.TxLastSeen = time.Now()
323                 state.TxBytes += int64(n)
324                 if !ping {
325                         state.TxLastNonPing = state.TxLastSeen
326                 }
327         }
328         return err
329 }
330
331 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
332         var sp SPRaw
333         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
334         if err != nil {
335                 ue := err.(*xdr.UnmarshalError)
336                 if ue.Err == io.EOF {
337                         return nil, ue.Err
338                 }
339                 return nil, err
340         }
341         state.RxLastSeen = time.Now()
342         state.RxBytes += int64(n)
343         if sp.Magic != MagicNNCPLv1 {
344                 return nil, BadMagic
345         }
346         return sp.Payload, nil
347 }
348
349 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
350         var infos []*SPInfo
351         var totalSize int64
352         for job := range ctx.Jobs(nodeId, TTx) {
353                 if job.PktEnc.Nice > nice {
354                         continue
355                 }
356                 if _, known := (*seen)[*job.HshValue]; known {
357                         continue
358                 }
359                 totalSize += job.Size
360                 infos = append(infos, &SPInfo{
361                         Nice: job.PktEnc.Nice,
362                         Size: uint64(job.Size),
363                         Hash: job.HshValue,
364                 })
365                 (*seen)[*job.HshValue] = job.PktEnc.Nice
366         }
367         sort.Sort(ByNice(infos))
368         var payloads [][]byte
369         for _, info := range infos {
370                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
371                 pktName := Base32Codec.EncodeToString(info.Hash[:])
372                 ctx.LogD("sp-info-our", LEs{
373                         {"Node", nodeId},
374                         {"Name", pktName},
375                         {"Size", info.Size},
376                 }, func(les LEs) string {
377                         return fmt.Sprintf(
378                                 "Our info: %s/tx/%s (%s)",
379                                 ctx.NodeName(nodeId),
380                                 pktName,
381                                 humanize.IBytes(info.Size),
382                         )
383                 })
384         }
385         if totalSize > 0 {
386                 ctx.LogI("sp-infos-tx", LEs{
387                         {"XX", string(TTx)},
388                         {"Node", nodeId},
389                         {"Pkts", len(payloads)},
390                         {"Size", totalSize},
391                 }, func(les LEs) string {
392                         return fmt.Sprintf(
393                                 "We have got for %s: %d packets, %s",
394                                 ctx.NodeName(nodeId),
395                                 len(payloads),
396                                 humanize.IBytes(uint64(totalSize)),
397                         )
398                 })
399         }
400         return payloadsSplit(payloads)
401 }
402
403 func (state *SPState) StartI(conn ConnDeadlined) error {
404         nodeId := state.Node.Id
405         err := state.Ctx.ensureRxDir(nodeId)
406         if err != nil {
407                 return err
408         }
409         var rxLock *os.File
410         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
411                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
412                 if err != nil {
413                         return err
414                 }
415         }
416         var txLock *os.File
417         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
418                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
419                 if err != nil {
420                         return err
421                 }
422         }
423         started := time.Now()
424         conf := noise.Config{
425                 CipherSuite: NoiseCipherSuite,
426                 Pattern:     noise.HandshakeIK,
427                 Initiator:   true,
428                 StaticKeypair: noise.DHKey{
429                         Private: state.Ctx.Self.NoisePrv[:],
430                         Public:  state.Ctx.Self.NoisePub[:],
431                 },
432                 PeerStatic: state.Node.NoisePub[:],
433         }
434         hs, err := noise.NewHandshakeState(conf)
435         if err != nil {
436                 return err
437         }
438         state.hs = hs
439         state.payloads = make(chan []byte)
440         state.pings = make(chan struct{})
441         state.infosTheir = make(map[[32]byte]*SPInfo)
442         state.infosOurSeen = make(map[[32]byte]uint8)
443         state.started = started
444         state.rxLock = rxLock
445         state.txLock = txLock
446
447         var infosPayloads [][]byte
448         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
449                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
450         }
451         var firstPayload []byte
452         if len(infosPayloads) > 0 {
453                 firstPayload = infosPayloads[0]
454         }
455         // Pad first payload, to hide actual number of existing files
456         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
457                 firstPayload = append(firstPayload, SPHaltMarshalized...)
458         }
459
460         var buf []byte
461         var payload []byte
462         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
463         if err != nil {
464                 state.dirUnlock()
465                 return err
466         }
467         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
468         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
469                 return fmt.Sprintf(
470                         "SP with %s (nice %s): sending first message",
471                         state.Node.Name,
472                         NicenessFmt(state.Nice),
473                 )
474         })
475         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
476         if err = state.WriteSP(conn, buf, false); err != nil {
477                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
478                         return fmt.Sprintf(
479                                 "SP with %s (nice %s): writing",
480                                 state.Node.Name,
481                                 NicenessFmt(state.Nice),
482                         )
483                 })
484                 state.dirUnlock()
485                 return err
486         }
487         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
488                 return fmt.Sprintf(
489                         "SP with %s (nice %s): waiting for first message",
490                         state.Node.Name,
491                         NicenessFmt(state.Nice),
492                 )
493         })
494         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
495         if buf, err = state.ReadSP(conn); err != nil {
496                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
497                         return fmt.Sprintf(
498                                 "SP with %s (nice %s): reading",
499                                 state.Node.Name,
500                                 NicenessFmt(state.Nice),
501                         )
502                 })
503                 state.dirUnlock()
504                 return err
505         }
506         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
507         if err != nil {
508                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
509                         return fmt.Sprintf(
510                                 "SP with %s (nice %s): reading Noise message",
511                                 state.Node.Name,
512                                 NicenessFmt(state.Nice),
513                         )
514                 })
515                 state.dirUnlock()
516                 return err
517         }
518         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
519                 return fmt.Sprintf(
520                         "SP with %s (nice %s): starting workers",
521                         state.Node.Name,
522                         NicenessFmt(state.Nice),
523                 )
524         })
525         err = state.StartWorkers(conn, infosPayloads, payload)
526         if err != nil {
527                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
528                         return fmt.Sprintf(
529                                 "SP with %s (nice %s): starting workers",
530                                 state.Node.Name,
531                                 NicenessFmt(state.Nice),
532                         )
533                 })
534                 state.dirUnlock()
535         }
536         return err
537 }
538
539 func (state *SPState) StartR(conn ConnDeadlined) error {
540         started := time.Now()
541         conf := noise.Config{
542                 CipherSuite: NoiseCipherSuite,
543                 Pattern:     noise.HandshakeIK,
544                 Initiator:   false,
545                 StaticKeypair: noise.DHKey{
546                         Private: state.Ctx.Self.NoisePrv[:],
547                         Public:  state.Ctx.Self.NoisePub[:],
548                 },
549         }
550         hs, err := noise.NewHandshakeState(conf)
551         if err != nil {
552                 return err
553         }
554         xxOnly := TRxTx("")
555         state.hs = hs
556         state.payloads = make(chan []byte)
557         state.pings = make(chan struct{})
558         state.infosOurSeen = make(map[[32]byte]uint8)
559         state.infosTheir = make(map[[32]byte]*SPInfo)
560         state.started = started
561         state.xxOnly = xxOnly
562
563         var buf []byte
564         var payload []byte
565         logMsg := func(les LEs) string {
566                 return fmt.Sprintf(
567                         "SP nice %s: waiting for first message",
568                         NicenessFmt(state.Nice),
569                 )
570         }
571         les := LEs{{"Nice", int(state.Nice)}}
572         state.Ctx.LogD("sp-startR", les, logMsg)
573         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
574         if buf, err = state.ReadSP(conn); err != nil {
575                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
576                 return err
577         }
578         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
579                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
580                 return err
581         }
582
583         var node *Node
584         for _, n := range state.Ctx.Neigh {
585                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
586                         node = n
587                         break
588                 }
589         }
590         if node == nil {
591                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
592                 err = errors.New("unknown peer: " + peerId)
593                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
594                 return err
595         }
596         state.Node = node
597         state.rxRate = node.RxRate
598         state.txRate = node.TxRate
599         state.onlineDeadline = node.OnlineDeadline
600         state.maxOnlineTime = node.MaxOnlineTime
601         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
602
603         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
604                 return err
605         }
606         var rxLock *os.File
607         if xxOnly == "" || xxOnly == TRx {
608                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
609                 if err != nil {
610                         return err
611                 }
612         }
613         state.rxLock = rxLock
614         var txLock *os.File
615         if xxOnly == "" || xxOnly == TTx {
616                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
617                 if err != nil {
618                         return err
619                 }
620         }
621         state.txLock = txLock
622
623         var infosPayloads [][]byte
624         if xxOnly == "" || xxOnly == TTx {
625                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
626         }
627         var firstPayload []byte
628         if len(infosPayloads) > 0 {
629                 firstPayload = infosPayloads[0]
630         }
631         // Pad first payload, to hide actual number of existing files
632         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
633                 firstPayload = append(firstPayload, SPHaltMarshalized...)
634         }
635
636         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
637                 return fmt.Sprintf(
638                         "SP with %s (nice %s): sending first message",
639                         node.Name, NicenessFmt(state.Nice),
640                 )
641         })
642         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
643         if err != nil {
644                 state.dirUnlock()
645                 return err
646         }
647         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
648         if err = state.WriteSP(conn, buf, false); err != nil {
649                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
650                         return fmt.Sprintf(
651                                 "SP with %s (nice %s): writing",
652                                 node.Name, NicenessFmt(state.Nice),
653                         )
654                 })
655                 state.dirUnlock()
656                 return err
657         }
658         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
659                 return fmt.Sprintf(
660                         "SP with %s (nice %s): starting workers",
661                         node.Name, NicenessFmt(state.Nice),
662                 )
663         })
664         err = state.StartWorkers(conn, infosPayloads, payload)
665         if err != nil {
666                 state.dirUnlock()
667         }
668         return err
669 }
670
671 func (state *SPState) closeFd(pth string) {
672         s, exists := state.fds[pth]
673         delete(state.fds, pth)
674         if exists {
675                 s.fd.Close()
676         }
677 }
678
679 func (state *SPState) StartWorkers(
680         conn ConnDeadlined,
681         infosPayloads [][]byte,
682         payload []byte,
683 ) error {
684         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
685         state.fds = make(map[string]FdAndFullSize)
686         state.fileHashers = make(map[string]*HasherAndOffset)
687         state.isDead = make(chan struct{})
688         if state.maxOnlineTime > 0 {
689                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
690         }
691
692         // Checker
693         if !state.NoCK {
694                 queues := spCheckers[*state.Node.Id]
695                 if queues == nil {
696                         queues = &SPCheckerQueues{
697                                 appeared: make(chan *[32]byte),
698                                 checked:  make(chan *[32]byte),
699                         }
700                         spCheckers[*state.Node.Id] = queues
701                         go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
702                 }
703                 state.checkerQueues = *queues
704                 go func() {
705                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
706                                 if job.PktEnc.Nice <= state.Nice {
707                                         state.checkerQueues.appeared <- job.HshValue
708                                 }
709                         }
710                 }()
711                 state.wg.Add(1)
712                 go func() {
713                         defer state.wg.Done()
714                         for {
715                                 select {
716                                 case <-state.isDead:
717                                         return
718                                 case hsh := <-state.checkerQueues.checked:
719                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
720                                 }
721                         }
722                 }()
723         }
724
725         // Remaining handshake payload sending
726         if len(infosPayloads) > 1 {
727                 state.wg.Add(1)
728                 go func() {
729                         for _, payload := range infosPayloads[1:] {
730                                 state.Ctx.LogD(
731                                         "sp-queue-remaining",
732                                         append(les, LE{"Size", int64(len(payload))}),
733                                         func(les LEs) string {
734                                                 return fmt.Sprintf(
735                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
736                                                         state.Node.Name, NicenessFmt(state.Nice),
737                                                         humanize.IBytes(uint64(len(payload))),
738                                                 )
739                                         },
740                                 )
741                                 state.payloads <- payload
742                         }
743                         state.wg.Done()
744                 }()
745         }
746
747         // Processing of first payload and queueing its responses
748         logMsg := func(les LEs) string {
749                 return fmt.Sprintf(
750                         "SP with %s (nice %s): processing first payload (%s)",
751                         state.Node.Name, NicenessFmt(state.Nice),
752                         humanize.IBytes(uint64(len(payload))),
753                 )
754         }
755         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
756         replies, err := state.ProcessSP(payload)
757         if err != nil {
758                 state.Ctx.LogE("sp-process", les, err, logMsg)
759                 return err
760         }
761         state.wg.Add(1)
762         go func() {
763                 for _, reply := range replies {
764                         state.Ctx.LogD(
765                                 "sp-queue-reply",
766                                 append(les, LE{"Size", int64(len(reply))}),
767                                 func(les LEs) string {
768                                         return fmt.Sprintf(
769                                                 "SP with %s (nice %s): queuing reply (%s)",
770                                                 state.Node.Name, NicenessFmt(state.Nice),
771                                                 humanize.IBytes(uint64(len(payload))),
772                                         )
773                                 },
774                         )
775                         state.payloads <- reply
776                 }
777                 state.wg.Done()
778         }()
779
780         // Periodic jobs
781         state.wg.Add(1)
782         go func() {
783                 deadlineTicker := time.NewTicker(time.Second)
784                 pingTicker := time.NewTicker(PingTimeout)
785                 for {
786                         select {
787                         case <-state.isDead:
788                                 state.wg.Done()
789                                 deadlineTicker.Stop()
790                                 pingTicker.Stop()
791                                 return
792                         case now := <-deadlineTicker.C:
793                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
794                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
795                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
796                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
797                                         state.SetDead()
798                                         conn.Close() // #nosec G104
799                                 }
800                         case now := <-pingTicker.C:
801                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
802                                         state.wg.Add(1)
803                                         go func() {
804                                                 state.pings <- struct{}{}
805                                                 state.wg.Done()
806                                         }()
807                                 }
808                         }
809                 }
810         }()
811
812         // Spool checker and INFOs sender of appearing files
813         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
814                 state.wg.Add(1)
815                 go func() {
816                         ticker := time.NewTicker(time.Second)
817                         for {
818                                 select {
819                                 case <-state.isDead:
820                                         state.wg.Done()
821                                         ticker.Stop()
822                                         return
823                                 case <-ticker.C:
824                                         for _, payload := range state.Ctx.infosOur(
825                                                 state.Node.Id,
826                                                 state.Nice,
827                                                 &state.infosOurSeen,
828                                         ) {
829                                                 state.Ctx.LogD(
830                                                         "sp-queue-info",
831                                                         append(les, LE{"Size", int64(len(payload))}),
832                                                         func(les LEs) string {
833                                                                 return fmt.Sprintf(
834                                                                         "SP with %s (nice %s): queuing new info (%s)",
835                                                                         state.Node.Name, NicenessFmt(state.Nice),
836                                                                         humanize.IBytes(uint64(len(payload))),
837                                                                 )
838                                                         },
839                                                 )
840                                                 state.payloads <- payload
841                                         }
842                                 }
843                         }
844                 }()
845         }
846
847         // Sender
848         state.wg.Add(1)
849         go func() {
850                 defer conn.Close()
851                 defer state.SetDead()
852                 defer state.wg.Done()
853                 for {
854                         if state.NotAlive() {
855                                 return
856                         }
857                         var payload []byte
858                         var ping bool
859                         select {
860                         case <-state.pings:
861                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
862                                         return fmt.Sprintf(
863                                                 "SP with %s (nice %s): got ping",
864                                                 state.Node.Name, NicenessFmt(state.Nice),
865                                         )
866                                 })
867                                 payload = SPPingMarshalized
868                                 ping = true
869                         case payload = <-state.payloads:
870                                 state.Ctx.LogD(
871                                         "sp-got-payload",
872                                         append(les, LE{"Size", int64(len(payload))}),
873                                         func(les LEs) string {
874                                                 return fmt.Sprintf(
875                                                         "SP with %s (nice %s): got payload (%s)",
876                                                         state.Node.Name, NicenessFmt(state.Nice),
877                                                         humanize.IBytes(uint64(len(payload))),
878                                                 )
879                                         },
880                                 )
881                         default:
882                                 state.RLock()
883                                 if len(state.queueTheir) == 0 {
884                                         state.RUnlock()
885                                         time.Sleep(100 * time.Millisecond)
886                                         continue
887                                 }
888                                 freq := state.queueTheir[0].freq
889                                 state.RUnlock()
890                                 if state.txRate > 0 {
891                                         time.Sleep(time.Second / time.Duration(state.txRate))
892                                 }
893                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
894                                 lesp := append(
895                                         les,
896                                         LE{"XX", string(TTx)},
897                                         LE{"Pkt", pktName},
898                                         LE{"Size", int64(freq.Offset)},
899                                 )
900                                 logMsg := func(les LEs) string {
901                                         return fmt.Sprintf(
902                                                 "SP with %s (nice %s): tx/%s (%s)",
903                                                 state.Node.Name, NicenessFmt(state.Nice),
904                                                 pktName,
905                                                 humanize.IBytes(freq.Offset),
906                                         )
907                                 }
908                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
909                                         return logMsg(les) + ": queueing"
910                                 })
911                                 pth := filepath.Join(
912                                         state.Ctx.Spool,
913                                         state.Node.Id.String(),
914                                         string(TTx),
915                                         Base32Codec.EncodeToString(freq.Hash[:]),
916                                 )
917                                 fdAndFullSize, exists := state.fds[pth]
918                                 if !exists {
919                                         fd, err := os.Open(pth)
920                                         if err != nil {
921                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
922                                                         return logMsg(les) + ": opening"
923                                                 })
924                                                 return
925                                         }
926                                         fi, err := fd.Stat()
927                                         if err != nil {
928                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
929                                                         return logMsg(les) + ": stating"
930                                                 })
931                                                 return
932                                         }
933                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
934                                         state.fds[pth] = fdAndFullSize
935                                 }
936                                 fd := fdAndFullSize.fd
937                                 fullSize := fdAndFullSize.fullSize
938                                 var buf []byte
939                                 if freq.Offset < uint64(fullSize) {
940                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
941                                                 return logMsg(les) + ": seeking"
942                                         })
943                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
944                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
945                                                         return logMsg(les) + ": seeking"
946                                                 })
947                                                 return
948                                         }
949                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
950                                         n, err := fd.Read(buf)
951                                         if err != nil {
952                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
953                                                         return logMsg(les) + ": reading"
954                                                 })
955                                                 return
956                                         }
957                                         buf = buf[:n]
958                                         lesp = append(
959                                                 les,
960                                                 LE{"XX", string(TTx)},
961                                                 LE{"Pkt", pktName},
962                                                 LE{"Size", int64(n)},
963                                         )
964                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
965                                                 return fmt.Sprintf(
966                                                         "%s: read %s",
967                                                         logMsg(les), humanize.IBytes(uint64(n)),
968                                                 )
969                                         })
970                                 }
971                                 state.closeFd(pth)
972                                 payload = MarshalSP(SPTypeFile, SPFile{
973                                         Hash:    freq.Hash,
974                                         Offset:  freq.Offset,
975                                         Payload: buf,
976                                 })
977                                 ourSize := freq.Offset + uint64(len(buf))
978                                 lesp = append(
979                                         les,
980                                         LE{"XX", string(TTx)},
981                                         LE{"Pkt", pktName},
982                                         LE{"Size", int64(ourSize)},
983                                         LE{"FullSize", fullSize},
984                                 )
985                                 if state.Ctx.ShowPrgrs {
986                                         Progress("Tx", lesp)
987                                 }
988                                 state.Lock()
989                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
990                                         if ourSize == uint64(fullSize) {
991                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
992                                                         return logMsg(les) + ": finished"
993                                                 })
994                                                 if len(state.queueTheir) > 1 {
995                                                         state.queueTheir = state.queueTheir[1:]
996                                                 } else {
997                                                         state.queueTheir = state.queueTheir[:0]
998                                                 }
999                                         } else {
1000                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
1001                                         }
1002                                 } else {
1003                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
1004                                                 return logMsg(les) + ": queue disappeared"
1005                                         })
1006                                 }
1007                                 state.Unlock()
1008                         }
1009                         logMsg := func(les LEs) string {
1010                                 return fmt.Sprintf(
1011                                         "SP with %s (nice %s): sending %s",
1012                                         state.Node.Name, NicenessFmt(state.Nice),
1013                                         humanize.IBytes(uint64(len(payload))),
1014                                 )
1015                         }
1016                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1017                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1018                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
1019                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1020                                 return
1021                         }
1022                 }
1023         }()
1024
1025         // Receiver
1026         state.wg.Add(1)
1027         go func() {
1028                 for {
1029                         if state.NotAlive() {
1030                                 break
1031                         }
1032                         logMsg := func(les LEs) string {
1033                                 return fmt.Sprintf(
1034                                         "SP with %s (nice %s): waiting for payload",
1035                                         state.Node.Name, NicenessFmt(state.Nice),
1036                                 )
1037                         }
1038                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1039                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1040                         payload, err := state.ReadSP(conn)
1041                         if err != nil {
1042                                 if err == io.EOF {
1043                                         break
1044                                 }
1045                                 unmarshalErr := err.(*xdr.UnmarshalError)
1046                                 if os.IsTimeout(unmarshalErr.Err) {
1047                                         continue
1048                                 }
1049                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1050                                         break
1051                                 }
1052                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1053                                 break
1054                         }
1055                         logMsg = func(les LEs) string {
1056                                 return fmt.Sprintf(
1057                                         "SP with %s (nice %s): payload (%s)",
1058                                         state.Node.Name, NicenessFmt(state.Nice),
1059                                         humanize.IBytes(uint64(len(payload))),
1060                                 )
1061                         }
1062                         state.Ctx.LogD(
1063                                 "sp-recv-got",
1064                                 append(les, LE{"Size", int64(len(payload))}),
1065                                 func(les LEs) string { return logMsg(les) + ": got" },
1066                         )
1067                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1068                         if err != nil {
1069                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1070                                         return logMsg(les) + ": got"
1071                                 })
1072                                 break
1073                         }
1074                         state.Ctx.LogD(
1075                                 "sp-recv-process",
1076                                 append(les, LE{"Size", int64(len(payload))}),
1077                                 func(les LEs) string {
1078                                         return logMsg(les) + ": processing"
1079                                 },
1080                         )
1081                         replies, err := state.ProcessSP(payload)
1082                         if err != nil {
1083                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1084                                         return logMsg(les) + ": processing"
1085                                 })
1086                                 break
1087                         }
1088                         state.wg.Add(1)
1089                         go func() {
1090                                 for _, reply := range replies {
1091                                         state.Ctx.LogD(
1092                                                 "sp-recv-reply",
1093                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1094                                                 func(les LEs) string {
1095                                                         return fmt.Sprintf(
1096                                                                 "SP with %s (nice %s): queuing reply (%s)",
1097                                                                 state.Node.Name, NicenessFmt(state.Nice),
1098                                                                 humanize.IBytes(uint64(len(reply))),
1099                                                         )
1100                                                 },
1101                                         )
1102                                         state.payloads <- reply
1103                                 }
1104                                 state.wg.Done()
1105                         }()
1106                         if state.rxRate > 0 {
1107                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1108                         }
1109                 }
1110                 state.SetDead()
1111                 state.wg.Done()
1112                 state.SetDead()
1113                 conn.Close() // #nosec G104
1114         }()
1115
1116         return nil
1117 }
1118
1119 func (state *SPState) Wait() {
1120         state.wg.Wait()
1121         close(state.payloads)
1122         close(state.pings)
1123         state.dirUnlock()
1124         state.Duration = time.Now().Sub(state.started)
1125         state.RxSpeed = state.RxBytes
1126         state.TxSpeed = state.TxBytes
1127         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1128         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1129         if rxDuration > 0 {
1130                 state.RxSpeed = state.RxBytes / rxDuration
1131         }
1132         if txDuration > 0 {
1133                 state.TxSpeed = state.TxBytes / txDuration
1134         }
1135 }
1136
1137 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1138         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1139         r := bytes.NewReader(payload)
1140         var err error
1141         var replies [][]byte
1142         var infosGot bool
1143         for r.Len() > 0 {
1144                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1145                         return fmt.Sprintf(
1146                                 "SP with %s (nice %s): unmarshaling header",
1147                                 state.Node.Name, NicenessFmt(state.Nice),
1148                         )
1149                 })
1150                 var head SPHead
1151                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1152                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1153                                 return fmt.Sprintf(
1154                                         "SP with %s (nice %s): unmarshaling header",
1155                                         state.Node.Name, NicenessFmt(state.Nice),
1156                                 )
1157                         })
1158                         return nil, err
1159                 }
1160                 if head.Type != SPTypePing {
1161                         state.RxLastNonPing = state.RxLastSeen
1162                 }
1163                 switch head.Type {
1164                 case SPTypeHalt:
1165                         state.Ctx.LogD(
1166                                 "sp-process-halt",
1167                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1168                                         return fmt.Sprintf(
1169                                                 "SP with %s (nice %s): got HALT",
1170                                                 state.Node.Name, NicenessFmt(state.Nice),
1171                                         )
1172                                 },
1173                         )
1174                         state.Lock()
1175                         state.queueTheir = nil
1176                         state.Unlock()
1177
1178                 case SPTypePing:
1179                         state.Ctx.LogD(
1180                                 "sp-process-ping",
1181                                 append(les, LE{"Type", "ping"}),
1182                                 func(les LEs) string {
1183                                         return fmt.Sprintf(
1184                                                 "SP with %s (nice %s): got PING",
1185                                                 state.Node.Name, NicenessFmt(state.Nice),
1186                                         )
1187                                 },
1188                         )
1189
1190                 case SPTypeInfo:
1191                         infosGot = true
1192                         lesp := append(les, LE{"Type", "info"})
1193                         state.Ctx.LogD(
1194                                 "sp-process-info-unmarshal", lesp,
1195                                 func(les LEs) string {
1196                                         return fmt.Sprintf(
1197                                                 "SP with %s (nice %s): unmarshaling INFO",
1198                                                 state.Node.Name, NicenessFmt(state.Nice),
1199                                         )
1200                                 },
1201                         )
1202                         var info SPInfo
1203                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1204                                 state.Ctx.LogE(
1205                                         "sp-process-info-unmarshal", lesp, err,
1206                                         func(les LEs) string {
1207                                                 return fmt.Sprintf(
1208                                                         "SP with %s (nice %s): unmarshaling INFO",
1209                                                         state.Node.Name, NicenessFmt(state.Nice),
1210                                                 )
1211                                         },
1212                                 )
1213                                 return nil, err
1214                         }
1215                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1216                         lesp = append(
1217                                 lesp,
1218                                 LE{"Pkt", pktName},
1219                                 LE{"Size", int64(info.Size)},
1220                                 LE{"PktNice", int(info.Nice)},
1221                         )
1222                         logMsg := func(les LEs) string {
1223                                 return fmt.Sprintf(
1224                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1225                                         state.Node.Name, NicenessFmt(state.Nice),
1226                                         pktName,
1227                                         humanize.IBytes(info.Size),
1228                                         NicenessFmt(info.Nice),
1229                                 )
1230                         }
1231                         if !state.listOnly && info.Nice > state.Nice {
1232                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1233                                         return logMsg(les) + ": too nice"
1234                                 })
1235                                 continue
1236                         }
1237                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1238                                 return logMsg(les) + ": received"
1239                         })
1240                         if !state.listOnly && state.xxOnly == TTx {
1241                                 continue
1242                         }
1243                         state.Lock()
1244                         state.infosTheir[*info.Hash] = &info
1245                         state.Unlock()
1246                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1247                                 return logMsg(les) + ": stating part"
1248                         })
1249                         pktPath := filepath.Join(
1250                                 state.Ctx.Spool,
1251                                 state.Node.Id.String(),
1252                                 string(TRx),
1253                                 Base32Codec.EncodeToString(info.Hash[:]),
1254                         )
1255                         logMsg = func(les LEs) string {
1256                                 return fmt.Sprintf(
1257                                         "Packet %s (%s) (nice %s)",
1258                                         pktName,
1259                                         humanize.IBytes(info.Size),
1260                                         NicenessFmt(info.Nice),
1261                                 )
1262                         }
1263                         if _, err = os.Stat(pktPath); err == nil {
1264                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1265                                         return logMsg(les) + ": already done"
1266                                 })
1267                                 if !state.listOnly {
1268                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1269                                 }
1270                                 continue
1271                         }
1272                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1273                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1274                                         return logMsg(les) + ": already seen"
1275                                 })
1276                                 if !state.listOnly {
1277                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1278                                 }
1279                                 continue
1280                         }
1281                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1282                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1283                                         return logMsg(les) + ": still not checksummed"
1284                                 })
1285                                 continue
1286                         }
1287                         fi, err := os.Stat(pktPath + PartSuffix)
1288                         var offset int64
1289                         if err == nil {
1290                                 offset = fi.Size()
1291                         }
1292                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1293                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1294                                         return logMsg(les) + ": not enough space"
1295                                 })
1296                                 continue
1297                         }
1298                         state.Ctx.LogI(
1299                                 "sp-info",
1300                                 append(lesp, LE{"Offset", offset}),
1301                                 func(les LEs) string {
1302                                         return fmt.Sprintf(
1303                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1304                                         )
1305                                 },
1306                         )
1307                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1308                                 replies = append(replies, MarshalSP(
1309                                         SPTypeFreq,
1310                                         SPFreq{info.Hash, uint64(offset)},
1311                                 ))
1312                         }
1313
1314                 case SPTypeFile:
1315                         lesp := append(les, LE{"Type", "file"})
1316                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1317                                 return fmt.Sprintf(
1318                                         "SP with %s (nice %s): unmarshaling FILE",
1319                                         state.Node.Name, NicenessFmt(state.Nice),
1320                                 )
1321                         })
1322                         var file SPFile
1323                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1324                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1325                                         return fmt.Sprintf(
1326                                                 "SP with %s (nice %s): unmarshaling FILE",
1327                                                 state.Node.Name, NicenessFmt(state.Nice),
1328                                         )
1329                                 })
1330                                 return nil, err
1331                         }
1332                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1333                         lesp = append(
1334                                 lesp,
1335                                 LE{"XX", string(TRx)},
1336                                 LE{"Pkt", pktName},
1337                                 LE{"Size", int64(len(file.Payload))},
1338                         )
1339                         logMsg := func(les LEs) string {
1340                                 return fmt.Sprintf(
1341                                         "Got packet %s (%s)",
1342                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1343                                 )
1344                         }
1345                         dirToSync := filepath.Join(
1346                                 state.Ctx.Spool,
1347                                 state.Node.Id.String(),
1348                                 string(TRx),
1349                         )
1350                         filePath := filepath.Join(dirToSync, pktName)
1351                         filePathPart := filePath + PartSuffix
1352                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1353                                 return logMsg(les) + ": opening part"
1354                         })
1355                         fdAndFullSize, exists := state.fds[filePathPart]
1356                         var fd *os.File
1357                         if exists {
1358                                 fd = fdAndFullSize.fd
1359                         } else {
1360                                 fd, err = os.OpenFile(
1361                                         filePathPart,
1362                                         os.O_RDWR|os.O_CREATE,
1363                                         os.FileMode(0666),
1364                                 )
1365                                 if err != nil {
1366                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1367                                                 return logMsg(les) + ": opening part"
1368                                         })
1369                                         return nil, err
1370                                 }
1371                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1372                                 if file.Offset == 0 {
1373                                         h, err := blake2b.New256(nil)
1374                                         if err != nil {
1375                                                 panic(err)
1376                                         }
1377                                         state.fileHashers[filePath] = &HasherAndOffset{h: h}
1378                                 }
1379                         }
1380                         state.Ctx.LogD(
1381                                 "sp-file-seek",
1382                                 append(lesp, LE{"Offset", file.Offset}),
1383                                 func(les LEs) string {
1384                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1385                                 })
1386                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1387                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1388                                         return logMsg(les) + ": seeking"
1389                                 })
1390                                 state.closeFd(filePathPart)
1391                                 return nil, err
1392                         }
1393                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1394                                 return logMsg(les) + ": writing"
1395                         })
1396                         if _, err = fd.Write(file.Payload); err != nil {
1397                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1398                                         return logMsg(les) + ": writing"
1399                                 })
1400                                 state.closeFd(filePathPart)
1401                                 return nil, err
1402                         }
1403                         hasherAndOffset, hasherExists := state.fileHashers[filePath]
1404                         if hasherExists {
1405                                 if hasherAndOffset.offset == file.Offset {
1406                                         if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1407                                                 panic(err)
1408                                         }
1409                                         hasherAndOffset.offset += uint64(len(file.Payload))
1410                                 } else {
1411                                         state.Ctx.LogD(
1412                                                 "sp-file-offset-differs", lesp,
1413                                                 func(les LEs) string {
1414                                                         return logMsg(les) + ": offset differs, deleting hasher"
1415                                                 },
1416                                         )
1417                                         delete(state.fileHashers, filePath)
1418                                         hasherExists = false
1419                                 }
1420                         }
1421                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1422                         lesp[len(lesp)-1].V = ourSize
1423                         fullsize := int64(0)
1424                         state.RLock()
1425                         infoTheir, ok := state.infosTheir[*file.Hash]
1426                         state.RUnlock()
1427                         if ok {
1428                                 fullsize = int64(infoTheir.Size)
1429                         }
1430                         lesp = append(lesp, LE{"FullSize", fullsize})
1431                         if state.Ctx.ShowPrgrs {
1432                                 Progress("Rx", lesp)
1433                         }
1434                         if fullsize != ourSize {
1435                                 continue
1436                         }
1437                         logMsg = func(les LEs) string {
1438                                 return fmt.Sprintf(
1439                                         "Got packet %s %d%% (%s / %s)",
1440                                         pktName, 100*ourSize/fullsize,
1441                                         humanize.IBytes(uint64(ourSize)),
1442                                         humanize.IBytes(uint64(fullsize)),
1443                                 )
1444                         }
1445                         err = fd.Sync()
1446                         if err != nil {
1447                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1448                                         return logMsg(les) + ": syncing"
1449                                 })
1450                                 state.closeFd(filePathPart)
1451                                 continue
1452                         }
1453                         if hasherExists {
1454                                 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1455                                         state.Ctx.LogE(
1456                                                 "sp-file-bad-checksum", lesp,
1457                                                 errors.New("checksum mismatch"),
1458                                                 logMsg,
1459                                         )
1460                                         continue
1461                                 }
1462                                 if err = os.Rename(filePathPart, filePath); err != nil {
1463                                         state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1464                                                 return logMsg(les) + ": renaming"
1465                                         })
1466                                         continue
1467                                 }
1468                                 if err = DirSync(dirToSync); err != nil {
1469                                         state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1470                                                 return logMsg(les) + ": dirsyncing"
1471                                         })
1472                                         continue
1473                                 }
1474                                 state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1475                                         return logMsg(les) + ": done"
1476                                 })
1477                                 state.wg.Add(1)
1478                                 go func() {
1479                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1480                                         state.wg.Done()
1481                                 }()
1482                                 state.Lock()
1483                                 delete(state.infosTheir, *file.Hash)
1484                                 state.Unlock()
1485                                 if !state.Ctx.HdrUsage {
1486                                         state.closeFd(filePathPart)
1487                                         continue
1488                                 }
1489                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1490                                         state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1491                                                 return logMsg(les) + ": seeking"
1492                                         })
1493                                         state.closeFd(filePathPart)
1494                                         continue
1495                                 }
1496                                 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1497                                 state.closeFd(filePathPart)
1498                                 if err != nil {
1499                                         state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1500                                                 return logMsg(les) + ": HdrReading"
1501                                         })
1502                                         continue
1503                                 }
1504                                 state.Ctx.HdrWrite(pktEncRaw, filePath)
1505                                 continue
1506                         }
1507                         state.closeFd(filePathPart)
1508                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1509                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1510                                         return logMsg(les) + ": renaming"
1511                                 })
1512                                 continue
1513                         }
1514                         if err = DirSync(dirToSync); err != nil {
1515                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1516                                         return logMsg(les) + ": dirsyncing"
1517                                 })
1518                                 continue
1519                         }
1520                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1521                                 return logMsg(les) + ": downloaded"
1522                         })
1523                         state.Lock()
1524                         delete(state.infosTheir, *file.Hash)
1525                         state.Unlock()
1526                         if !state.NoCK {
1527                                 state.checkerQueues.appeared <- file.Hash
1528                         }
1529
1530                 case SPTypeDone:
1531                         lesp := append(les, LE{"Type", "done"})
1532                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1533                                 return fmt.Sprintf(
1534                                         "SP with %s (nice %s): unmarshaling DONE",
1535                                         state.Node.Name, NicenessFmt(state.Nice),
1536                                 )
1537                         })
1538                         var done SPDone
1539                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1540                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1541                                         return fmt.Sprintf(
1542                                                 "SP with %s (nice %s): unmarshaling DONE",
1543                                                 state.Node.Name, NicenessFmt(state.Nice),
1544                                         )
1545                                 })
1546                                 return nil, err
1547                         }
1548                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1549                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1550                         logMsg := func(les LEs) string {
1551                                 return fmt.Sprintf(
1552                                         "SP with %s (nice %s): DONE: removing %s",
1553                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1554                                 )
1555                         }
1556                         state.Ctx.LogD("sp-done", lesp, logMsg)
1557                         pth := filepath.Join(
1558                                 state.Ctx.Spool,
1559                                 state.Node.Id.String(),
1560                                 string(TTx),
1561                                 pktName,
1562                         )
1563                         if err = os.Remove(pth); err == nil {
1564                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1565                                         return fmt.Sprintf("Packet %s is sent", pktName)
1566                                 })
1567                                 if state.Ctx.HdrUsage {
1568                                         os.Remove(pth + HdrSuffix)
1569                                 }
1570                         } else {
1571                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1572                         }
1573
1574                 case SPTypeFreq:
1575                         lesp := append(les, LE{"Type", "freq"})
1576                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1577                                 return fmt.Sprintf(
1578                                         "SP with %s (nice %s): unmarshaling FREQ",
1579                                         state.Node.Name, NicenessFmt(state.Nice),
1580                                 )
1581                         })
1582                         var freq SPFreq
1583                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1584                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1585                                         return fmt.Sprintf(
1586                                                 "SP with %s (nice %s): unmarshaling FREQ",
1587                                                 state.Node.Name, NicenessFmt(state.Nice),
1588                                         )
1589                                 })
1590                                 return nil, err
1591                         }
1592                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1593                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1594                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1595                                 return fmt.Sprintf(
1596                                         "SP with %s (nice %s): FREQ %s: queuing",
1597                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1598                                 )
1599                         })
1600                         nice, exists := state.infosOurSeen[*freq.Hash]
1601                         if exists {
1602                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1603                                         state.Lock()
1604                                         insertIdx := 0
1605                                         var freqWithNice *FreqWithNice
1606                                         for insertIdx, freqWithNice = range state.queueTheir {
1607                                                 if freqWithNice.nice > nice {
1608                                                         break
1609                                                 }
1610                                         }
1611                                         state.queueTheir = append(state.queueTheir, nil)
1612                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1613                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1614                                         state.Unlock()
1615                                 } else {
1616                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1617                                                 return fmt.Sprintf(
1618                                                         "SP with %s (nice %s): FREQ %s: skipping",
1619                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1620                                                 )
1621                                         })
1622                                 }
1623                         } else {
1624                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1625                                         return fmt.Sprintf(
1626                                                 "SP with %s (nice %s): FREQ %s: unknown",
1627                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1628                                         )
1629                                 })
1630                         }
1631
1632                 default:
1633                         state.Ctx.LogE(
1634                                 "sp-process-type-unknown",
1635                                 append(les, LE{"Type", head.Type}),
1636                                 errors.New("unknown type"),
1637                                 func(les LEs) string {
1638                                         return fmt.Sprintf(
1639                                                 "SP with %s (nice %s): %d",
1640                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1641                                         )
1642                                 },
1643                         )
1644                         return nil, BadPktType
1645                 }
1646         }
1647
1648         if infosGot {
1649                 var pkts int
1650                 var size uint64
1651                 state.RLock()
1652                 for _, info := range state.infosTheir {
1653                         pkts++
1654                         size += info.Size
1655                 }
1656                 state.RUnlock()
1657                 state.Ctx.LogI("sp-infos-rx", LEs{
1658                         {"XX", string(TRx)},
1659                         {"Node", state.Node.Id},
1660                         {"Pkts", pkts},
1661                         {"Size", int64(size)},
1662                 }, func(les LEs) string {
1663                         return fmt.Sprintf(
1664                                 "%s has got for us: %d packets, %s",
1665                                 state.Node.Name, pkts, humanize.IBytes(size),
1666                         )
1667                 })
1668         }
1669         return payloadsSplit(replies), nil
1670 }