]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Kill progress bars when call is finished
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "hash"
26         "io"
27         "os"
28         "path/filepath"
29         "sort"
30         "sync"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/flynn/noise"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 const (
40         MaxSPSize      = 1<<16 - 256
41         PartSuffix     = ".part"
42         SPHeadOverhead = 4
43 )
44
45 type SPCheckerQueues struct {
46         appeared chan *[32]byte
47         checked  chan *[32]byte
48 }
49
50 var (
51         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
52
53         SPInfoOverhead    int
54         SPFreqOverhead    int
55         SPFileOverhead    int
56         SPHaltMarshalized []byte
57         SPPingMarshalized []byte
58
59         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
60                 noise.DH25519,
61                 noise.CipherChaChaPoly,
62                 noise.HashBLAKE2b,
63         )
64
65         DefaultDeadline = 10 * time.Second
66         PingTimeout     = time.Minute
67
68         spCheckers = make(map[NodeId]*SPCheckerQueues)
69 )
70
71 type FdAndFullSize struct {
72         fd       *os.File
73         fullSize int64
74 }
75
76 type HasherAndOffset struct {
77         h      hash.Hash
78         offset uint64
79 }
80
81 type SPType uint8
82
83 const (
84         SPTypeInfo SPType = iota
85         SPTypeFreq SPType = iota
86         SPTypeFile SPType = iota
87         SPTypeDone SPType = iota
88         SPTypeHalt SPType = iota
89         SPTypePing SPType = iota
90 )
91
92 type SPHead struct {
93         Type SPType
94 }
95
96 type SPInfo struct {
97         Nice uint8
98         Size uint64
99         Hash *[32]byte
100 }
101
102 type SPFreq struct {
103         Hash   *[32]byte
104         Offset uint64
105 }
106
107 type SPFile struct {
108         Hash    *[32]byte
109         Offset  uint64
110         Payload []byte
111 }
112
113 type SPDone struct {
114         Hash *[32]byte
115 }
116
117 type SPRaw struct {
118         Magic   [8]byte
119         Payload []byte
120 }
121
122 type FreqWithNice struct {
123         freq *SPFreq
124         nice uint8
125 }
126
127 type ConnDeadlined interface {
128         io.ReadWriteCloser
129         SetReadDeadline(t time.Time) error
130         SetWriteDeadline(t time.Time) error
131 }
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([32]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170 }
171
172 func MarshalSP(typ SPType, sp interface{}) []byte {
173         var buf bytes.Buffer
174         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
175                 panic(err)
176         }
177         if _, err := xdr.Marshal(&buf, sp); err != nil {
178                 panic(err)
179         }
180         return buf.Bytes()
181 }
182
183 func payloadsSplit(payloads [][]byte) [][]byte {
184         var outbounds [][]byte
185         outbound := make([]byte, 0, MaxSPSize)
186         for i, payload := range payloads {
187                 outbound = append(outbound, payload...)
188                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
189                         outbounds = append(outbounds, outbound)
190                         outbound = make([]byte, 0, MaxSPSize)
191                 }
192         }
193         if len(outbound) > 0 {
194                 outbounds = append(outbounds, outbound)
195         }
196         return outbounds
197 }
198
199 type SPState struct {
200         Ctx            *Ctx
201         Node           *Node
202         Nice           uint8
203         NoCK           bool
204         onlineDeadline time.Duration
205         maxOnlineTime  time.Duration
206         hs             *noise.HandshakeState
207         csOur          *noise.CipherState
208         csTheir        *noise.CipherState
209         payloads       chan []byte
210         pings          chan struct{}
211         infosTheir     map[[32]byte]*SPInfo
212         infosOurSeen   map[[32]byte]uint8
213         queueTheir     []*FreqWithNice
214         wg             sync.WaitGroup
215         RxBytes        int64
216         RxLastSeen     time.Time
217         RxLastNonPing  time.Time
218         TxBytes        int64
219         TxLastSeen     time.Time
220         TxLastNonPing  time.Time
221         started        time.Time
222         mustFinishAt   time.Time
223         Duration       time.Duration
224         RxSpeed        int64
225         TxSpeed        int64
226         rxLock         *os.File
227         txLock         *os.File
228         xxOnly         TRxTx
229         rxRate         int
230         txRate         int
231         isDead         chan struct{}
232         listOnly       bool
233         onlyPkts       map[[32]byte]bool
234         writeSPBuf     bytes.Buffer
235         fds            map[string]FdAndFullSize
236         fdsLock        sync.RWMutex
237         fileHashers    map[string]*HasherAndOffset
238         checkerQueues  SPCheckerQueues
239         progressBars   map[string]struct{}
240         sync.RWMutex
241 }
242
243 func (state *SPState) SetDead() {
244         state.Lock()
245         defer state.Unlock()
246         select {
247         case <-state.isDead:
248                 // Already closed channel, dead
249                 return
250         default:
251         }
252         close(state.isDead)
253         go func() {
254                 for range state.payloads {
255                 }
256         }()
257         go func() {
258                 for range state.pings {
259                 }
260         }()
261         go func() {
262                 for _, s := range state.fds {
263                         s.fd.Close()
264                 }
265         }()
266 }
267
268 func (state *SPState) NotAlive() bool {
269         select {
270         case <-state.isDead:
271                 return true
272         default:
273         }
274         return false
275 }
276
277 func (state *SPState) dirUnlock() {
278         state.Ctx.UnlockDir(state.rxLock)
279         state.Ctx.UnlockDir(state.txLock)
280 }
281
282 func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
283         for hshValue := range appeared {
284                 pktName := Base32Codec.EncodeToString(hshValue[:])
285                 les := LEs{
286                         {"XX", string(TRx)},
287                         {"Node", nodeId},
288                         {"Pkt", pktName},
289                 }
290                 ctx.LogD("sp-checker", les, func(les LEs) string {
291                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
292                 })
293                 size, err := ctx.CheckNoCK(nodeId, hshValue)
294                 les = append(les, LE{"Size", size})
295                 if err != nil {
296                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
297                                 return fmt.Sprintf(
298                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
299                                         humanize.IBytes(uint64(size)),
300                                 )
301                         })
302                         continue
303                 }
304                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
305                         return fmt.Sprintf(
306                                 "Packet %s is retreived (%s)",
307                                 pktName, humanize.IBytes(uint64(size)),
308                         )
309                 })
310                 go func(hsh *[32]byte) { checked <- hsh }(hshValue)
311         }
312 }
313
314 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
315         state.writeSPBuf.Reset()
316         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
317                 Magic:   MagicNNCPLv1,
318                 Payload: payload,
319         })
320         if err != nil {
321                 return err
322         }
323         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
324                 state.TxLastSeen = time.Now()
325                 state.TxBytes += int64(n)
326                 if !ping {
327                         state.TxLastNonPing = state.TxLastSeen
328                 }
329         }
330         return err
331 }
332
333 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
334         var sp SPRaw
335         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
336         if err != nil {
337                 ue := err.(*xdr.UnmarshalError)
338                 if ue.Err == io.EOF {
339                         return nil, ue.Err
340                 }
341                 return nil, err
342         }
343         state.RxLastSeen = time.Now()
344         state.RxBytes += int64(n)
345         if sp.Magic != MagicNNCPLv1 {
346                 return nil, BadMagic
347         }
348         return sp.Payload, nil
349 }
350
351 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
352         var infos []*SPInfo
353         var totalSize int64
354         for job := range ctx.Jobs(nodeId, TTx) {
355                 if job.PktEnc.Nice > nice {
356                         continue
357                 }
358                 if _, known := (*seen)[*job.HshValue]; known {
359                         continue
360                 }
361                 totalSize += job.Size
362                 infos = append(infos, &SPInfo{
363                         Nice: job.PktEnc.Nice,
364                         Size: uint64(job.Size),
365                         Hash: job.HshValue,
366                 })
367                 (*seen)[*job.HshValue] = job.PktEnc.Nice
368         }
369         sort.Sort(ByNice(infos))
370         var payloads [][]byte
371         for _, info := range infos {
372                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
373                 pktName := Base32Codec.EncodeToString(info.Hash[:])
374                 ctx.LogD("sp-info-our", LEs{
375                         {"Node", nodeId},
376                         {"Name", pktName},
377                         {"Size", info.Size},
378                 }, func(les LEs) string {
379                         return fmt.Sprintf(
380                                 "Our info: %s/tx/%s (%s)",
381                                 ctx.NodeName(nodeId),
382                                 pktName,
383                                 humanize.IBytes(info.Size),
384                         )
385                 })
386         }
387         if totalSize > 0 {
388                 ctx.LogI("sp-infos-tx", LEs{
389                         {"XX", string(TTx)},
390                         {"Node", nodeId},
391                         {"Pkts", len(payloads)},
392                         {"Size", totalSize},
393                 }, func(les LEs) string {
394                         return fmt.Sprintf(
395                                 "We have got for %s: %d packets, %s",
396                                 ctx.NodeName(nodeId),
397                                 len(payloads),
398                                 humanize.IBytes(uint64(totalSize)),
399                         )
400                 })
401         }
402         return payloadsSplit(payloads)
403 }
404
405 func (state *SPState) StartI(conn ConnDeadlined) error {
406         nodeId := state.Node.Id
407         err := state.Ctx.ensureRxDir(nodeId)
408         if err != nil {
409                 return err
410         }
411         var rxLock *os.File
412         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
413                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
414                 if err != nil {
415                         return err
416                 }
417         }
418         var txLock *os.File
419         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
420                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
421                 if err != nil {
422                         return err
423                 }
424         }
425         started := time.Now()
426         conf := noise.Config{
427                 CipherSuite: NoiseCipherSuite,
428                 Pattern:     noise.HandshakeIK,
429                 Initiator:   true,
430                 StaticKeypair: noise.DHKey{
431                         Private: state.Ctx.Self.NoisePrv[:],
432                         Public:  state.Ctx.Self.NoisePub[:],
433                 },
434                 PeerStatic: state.Node.NoisePub[:],
435         }
436         hs, err := noise.NewHandshakeState(conf)
437         if err != nil {
438                 return err
439         }
440         state.hs = hs
441         state.payloads = make(chan []byte)
442         state.pings = make(chan struct{})
443         state.infosTheir = make(map[[32]byte]*SPInfo)
444         state.infosOurSeen = make(map[[32]byte]uint8)
445         state.progressBars = make(map[string]struct{})
446         state.started = started
447         state.rxLock = rxLock
448         state.txLock = txLock
449
450         var infosPayloads [][]byte
451         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
452                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
453         }
454         var firstPayload []byte
455         if len(infosPayloads) > 0 {
456                 firstPayload = infosPayloads[0]
457         }
458         // Pad first payload, to hide actual number of existing files
459         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
460                 firstPayload = append(firstPayload, SPHaltMarshalized...)
461         }
462
463         var buf []byte
464         var payload []byte
465         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
466         if err != nil {
467                 state.dirUnlock()
468                 return err
469         }
470         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
471         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
472                 return fmt.Sprintf(
473                         "SP with %s (nice %s): sending first message",
474                         state.Node.Name,
475                         NicenessFmt(state.Nice),
476                 )
477         })
478         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
479         if err = state.WriteSP(conn, buf, false); err != nil {
480                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
481                         return fmt.Sprintf(
482                                 "SP with %s (nice %s): writing",
483                                 state.Node.Name,
484                                 NicenessFmt(state.Nice),
485                         )
486                 })
487                 state.dirUnlock()
488                 return err
489         }
490         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
491                 return fmt.Sprintf(
492                         "SP with %s (nice %s): waiting for first message",
493                         state.Node.Name,
494                         NicenessFmt(state.Nice),
495                 )
496         })
497         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
498         if buf, err = state.ReadSP(conn); err != nil {
499                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
500                         return fmt.Sprintf(
501                                 "SP with %s (nice %s): reading",
502                                 state.Node.Name,
503                                 NicenessFmt(state.Nice),
504                         )
505                 })
506                 state.dirUnlock()
507                 return err
508         }
509         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
510         if err != nil {
511                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
512                         return fmt.Sprintf(
513                                 "SP with %s (nice %s): reading Noise message",
514                                 state.Node.Name,
515                                 NicenessFmt(state.Nice),
516                         )
517                 })
518                 state.dirUnlock()
519                 return err
520         }
521         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
522                 return fmt.Sprintf(
523                         "SP with %s (nice %s): starting workers",
524                         state.Node.Name,
525                         NicenessFmt(state.Nice),
526                 )
527         })
528         err = state.StartWorkers(conn, infosPayloads, payload)
529         if err != nil {
530                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
531                         return fmt.Sprintf(
532                                 "SP with %s (nice %s): starting workers",
533                                 state.Node.Name,
534                                 NicenessFmt(state.Nice),
535                         )
536                 })
537                 state.dirUnlock()
538         }
539         return err
540 }
541
542 func (state *SPState) StartR(conn ConnDeadlined) error {
543         started := time.Now()
544         conf := noise.Config{
545                 CipherSuite: NoiseCipherSuite,
546                 Pattern:     noise.HandshakeIK,
547                 Initiator:   false,
548                 StaticKeypair: noise.DHKey{
549                         Private: state.Ctx.Self.NoisePrv[:],
550                         Public:  state.Ctx.Self.NoisePub[:],
551                 },
552         }
553         hs, err := noise.NewHandshakeState(conf)
554         if err != nil {
555                 return err
556         }
557         xxOnly := TRxTx("")
558         state.hs = hs
559         state.payloads = make(chan []byte)
560         state.pings = make(chan struct{})
561         state.infosOurSeen = make(map[[32]byte]uint8)
562         state.infosTheir = make(map[[32]byte]*SPInfo)
563         state.progressBars = make(map[string]struct{})
564         state.started = started
565         state.xxOnly = xxOnly
566
567         var buf []byte
568         var payload []byte
569         logMsg := func(les LEs) string {
570                 return fmt.Sprintf(
571                         "SP nice %s: waiting for first message",
572                         NicenessFmt(state.Nice),
573                 )
574         }
575         les := LEs{{"Nice", int(state.Nice)}}
576         state.Ctx.LogD("sp-startR", les, logMsg)
577         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
578         if buf, err = state.ReadSP(conn); err != nil {
579                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
580                 return err
581         }
582         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
583                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
584                 return err
585         }
586
587         var node *Node
588         for _, n := range state.Ctx.Neigh {
589                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
590                         node = n
591                         break
592                 }
593         }
594         if node == nil {
595                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
596                 err = errors.New("unknown peer: " + peerId)
597                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
598                 return err
599         }
600         state.Node = node
601         state.rxRate = node.RxRate
602         state.txRate = node.TxRate
603         state.onlineDeadline = node.OnlineDeadline
604         state.maxOnlineTime = node.MaxOnlineTime
605         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
606
607         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
608                 return err
609         }
610         var rxLock *os.File
611         if xxOnly == "" || xxOnly == TRx {
612                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
613                 if err != nil {
614                         return err
615                 }
616         }
617         state.rxLock = rxLock
618         var txLock *os.File
619         if xxOnly == "" || xxOnly == TTx {
620                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
621                 if err != nil {
622                         return err
623                 }
624         }
625         state.txLock = txLock
626
627         var infosPayloads [][]byte
628         if xxOnly == "" || xxOnly == TTx {
629                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
630         }
631         var firstPayload []byte
632         if len(infosPayloads) > 0 {
633                 firstPayload = infosPayloads[0]
634         }
635         // Pad first payload, to hide actual number of existing files
636         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
637                 firstPayload = append(firstPayload, SPHaltMarshalized...)
638         }
639
640         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
641                 return fmt.Sprintf(
642                         "SP with %s (nice %s): sending first message",
643                         node.Name, NicenessFmt(state.Nice),
644                 )
645         })
646         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
647         if err != nil {
648                 state.dirUnlock()
649                 return err
650         }
651         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
652         if err = state.WriteSP(conn, buf, false); err != nil {
653                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
654                         return fmt.Sprintf(
655                                 "SP with %s (nice %s): writing",
656                                 node.Name, NicenessFmt(state.Nice),
657                         )
658                 })
659                 state.dirUnlock()
660                 return err
661         }
662         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
663                 return fmt.Sprintf(
664                         "SP with %s (nice %s): starting workers",
665                         node.Name, NicenessFmt(state.Nice),
666                 )
667         })
668         err = state.StartWorkers(conn, infosPayloads, payload)
669         if err != nil {
670                 state.dirUnlock()
671         }
672         return err
673 }
674
675 func (state *SPState) closeFd(pth string) {
676         state.fdsLock.Lock()
677         if s, exists := state.fds[pth]; exists {
678                 delete(state.fds, pth)
679                 s.fd.Close()
680         }
681         state.fdsLock.Unlock()
682 }
683
684 func (state *SPState) StartWorkers(
685         conn ConnDeadlined,
686         infosPayloads [][]byte,
687         payload []byte,
688 ) error {
689         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
690         state.fds = make(map[string]FdAndFullSize)
691         state.fileHashers = make(map[string]*HasherAndOffset)
692         state.isDead = make(chan struct{})
693         if state.maxOnlineTime > 0 {
694                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
695         }
696
697         // Checker
698         if !state.NoCK {
699                 queues := spCheckers[*state.Node.Id]
700                 if queues == nil {
701                         queues = &SPCheckerQueues{
702                                 appeared: make(chan *[32]byte),
703                                 checked:  make(chan *[32]byte),
704                         }
705                         spCheckers[*state.Node.Id] = queues
706                         go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
707                 }
708                 state.checkerQueues = *queues
709                 go func() {
710                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
711                                 if job.PktEnc.Nice <= state.Nice {
712                                         state.checkerQueues.appeared <- job.HshValue
713                                 }
714                         }
715                 }()
716                 state.wg.Add(1)
717                 go func() {
718                         defer state.wg.Done()
719                         for {
720                                 select {
721                                 case <-state.isDead:
722                                         return
723                                 case hsh := <-state.checkerQueues.checked:
724                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
725                                 }
726                         }
727                 }()
728         }
729
730         // Remaining handshake payload sending
731         if len(infosPayloads) > 1 {
732                 state.wg.Add(1)
733                 go func() {
734                         for _, payload := range infosPayloads[1:] {
735                                 state.Ctx.LogD(
736                                         "sp-queue-remaining",
737                                         append(les, LE{"Size", int64(len(payload))}),
738                                         func(les LEs) string {
739                                                 return fmt.Sprintf(
740                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
741                                                         state.Node.Name, NicenessFmt(state.Nice),
742                                                         humanize.IBytes(uint64(len(payload))),
743                                                 )
744                                         },
745                                 )
746                                 state.payloads <- payload
747                         }
748                         state.wg.Done()
749                 }()
750         }
751
752         // Processing of first payload and queueing its responses
753         logMsg := func(les LEs) string {
754                 return fmt.Sprintf(
755                         "SP with %s (nice %s): processing first payload (%s)",
756                         state.Node.Name, NicenessFmt(state.Nice),
757                         humanize.IBytes(uint64(len(payload))),
758                 )
759         }
760         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
761         replies, err := state.ProcessSP(payload)
762         if err != nil {
763                 state.Ctx.LogE("sp-process", les, err, logMsg)
764                 return err
765         }
766         state.wg.Add(1)
767         go func() {
768                 for _, reply := range replies {
769                         state.Ctx.LogD(
770                                 "sp-queue-reply",
771                                 append(les, LE{"Size", int64(len(reply))}),
772                                 func(les LEs) string {
773                                         return fmt.Sprintf(
774                                                 "SP with %s (nice %s): queuing reply (%s)",
775                                                 state.Node.Name, NicenessFmt(state.Nice),
776                                                 humanize.IBytes(uint64(len(payload))),
777                                         )
778                                 },
779                         )
780                         state.payloads <- reply
781                 }
782                 state.wg.Done()
783         }()
784
785         // Periodic jobs
786         state.wg.Add(1)
787         go func() {
788                 deadlineTicker := time.NewTicker(time.Second)
789                 pingTicker := time.NewTicker(PingTimeout)
790                 for {
791                         select {
792                         case <-state.isDead:
793                                 state.wg.Done()
794                                 deadlineTicker.Stop()
795                                 pingTicker.Stop()
796                                 return
797                         case now := <-deadlineTicker.C:
798                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
799                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
800                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
801                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
802                                         state.SetDead()
803                                         conn.Close() // #nosec G104
804                                 }
805                         case now := <-pingTicker.C:
806                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
807                                         state.wg.Add(1)
808                                         go func() {
809                                                 state.pings <- struct{}{}
810                                                 state.wg.Done()
811                                         }()
812                                 }
813                         }
814                 }
815         }()
816
817         // Spool checker and INFOs sender of appearing files
818         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
819                 state.wg.Add(1)
820                 go func() {
821                         ticker := time.NewTicker(time.Second)
822                         for {
823                                 select {
824                                 case <-state.isDead:
825                                         state.wg.Done()
826                                         ticker.Stop()
827                                         return
828                                 case <-ticker.C:
829                                         for _, payload := range state.Ctx.infosOur(
830                                                 state.Node.Id,
831                                                 state.Nice,
832                                                 &state.infosOurSeen,
833                                         ) {
834                                                 state.Ctx.LogD(
835                                                         "sp-queue-info",
836                                                         append(les, LE{"Size", int64(len(payload))}),
837                                                         func(les LEs) string {
838                                                                 return fmt.Sprintf(
839                                                                         "SP with %s (nice %s): queuing new info (%s)",
840                                                                         state.Node.Name, NicenessFmt(state.Nice),
841                                                                         humanize.IBytes(uint64(len(payload))),
842                                                                 )
843                                                         },
844                                                 )
845                                                 state.payloads <- payload
846                                         }
847                                 }
848                         }
849                 }()
850         }
851
852         // Sender
853         state.wg.Add(1)
854         go func() {
855                 defer conn.Close()
856                 defer state.SetDead()
857                 defer state.wg.Done()
858                 for {
859                         if state.NotAlive() {
860                                 return
861                         }
862                         var payload []byte
863                         var ping bool
864                         select {
865                         case <-state.pings:
866                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
867                                         return fmt.Sprintf(
868                                                 "SP with %s (nice %s): got ping",
869                                                 state.Node.Name, NicenessFmt(state.Nice),
870                                         )
871                                 })
872                                 payload = SPPingMarshalized
873                                 ping = true
874                         case payload = <-state.payloads:
875                                 state.Ctx.LogD(
876                                         "sp-got-payload",
877                                         append(les, LE{"Size", int64(len(payload))}),
878                                         func(les LEs) string {
879                                                 return fmt.Sprintf(
880                                                         "SP with %s (nice %s): got payload (%s)",
881                                                         state.Node.Name, NicenessFmt(state.Nice),
882                                                         humanize.IBytes(uint64(len(payload))),
883                                                 )
884                                         },
885                                 )
886                         default:
887                                 state.RLock()
888                                 if len(state.queueTheir) == 0 {
889                                         state.RUnlock()
890                                         time.Sleep(100 * time.Millisecond)
891                                         continue
892                                 }
893                                 freq := state.queueTheir[0].freq
894                                 state.RUnlock()
895                                 if state.txRate > 0 {
896                                         time.Sleep(time.Second / time.Duration(state.txRate))
897                                 }
898                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
899                                 lesp := append(
900                                         les,
901                                         LE{"XX", string(TTx)},
902                                         LE{"Pkt", pktName},
903                                         LE{"Size", int64(freq.Offset)},
904                                 )
905                                 logMsg := func(les LEs) string {
906                                         return fmt.Sprintf(
907                                                 "SP with %s (nice %s): tx/%s (%s)",
908                                                 state.Node.Name, NicenessFmt(state.Nice),
909                                                 pktName,
910                                                 humanize.IBytes(freq.Offset),
911                                         )
912                                 }
913                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
914                                         return logMsg(les) + ": queueing"
915                                 })
916                                 pth := filepath.Join(
917                                         state.Ctx.Spool,
918                                         state.Node.Id.String(),
919                                         string(TTx),
920                                         Base32Codec.EncodeToString(freq.Hash[:]),
921                                 )
922                                 state.fdsLock.RLock()
923                                 fdAndFullSize, exists := state.fds[pth]
924                                 state.fdsLock.RUnlock()
925                                 if !exists {
926                                         fd, err := os.Open(pth)
927                                         if err != nil {
928                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
929                                                         return logMsg(les) + ": opening"
930                                                 })
931                                                 return
932                                         }
933                                         fi, err := fd.Stat()
934                                         if err != nil {
935                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
936                                                         return logMsg(les) + ": stating"
937                                                 })
938                                                 return
939                                         }
940                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
941                                         state.fdsLock.Lock()
942                                         state.fds[pth] = fdAndFullSize
943                                         state.fdsLock.Unlock()
944                                 }
945                                 fd := fdAndFullSize.fd
946                                 fullSize := fdAndFullSize.fullSize
947                                 var buf []byte
948                                 if freq.Offset < uint64(fullSize) {
949                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
950                                                 return logMsg(les) + ": seeking"
951                                         })
952                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
953                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
954                                                         return logMsg(les) + ": seeking"
955                                                 })
956                                                 return
957                                         }
958                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
959                                         n, err := fd.Read(buf)
960                                         if err != nil {
961                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
962                                                         return logMsg(les) + ": reading"
963                                                 })
964                                                 return
965                                         }
966                                         buf = buf[:n]
967                                         lesp = append(
968                                                 les,
969                                                 LE{"XX", string(TTx)},
970                                                 LE{"Pkt", pktName},
971                                                 LE{"Size", int64(n)},
972                                         )
973                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
974                                                 return fmt.Sprintf(
975                                                         "%s: read %s",
976                                                         logMsg(les), humanize.IBytes(uint64(n)),
977                                                 )
978                                         })
979                                 }
980                                 state.closeFd(pth)
981                                 payload = MarshalSP(SPTypeFile, SPFile{
982                                         Hash:    freq.Hash,
983                                         Offset:  freq.Offset,
984                                         Payload: buf,
985                                 })
986                                 ourSize := freq.Offset + uint64(len(buf))
987                                 lesp = append(
988                                         les,
989                                         LE{"XX", string(TTx)},
990                                         LE{"Pkt", pktName},
991                                         LE{"Size", int64(ourSize)},
992                                         LE{"FullSize", fullSize},
993                                 )
994                                 if state.Ctx.ShowPrgrs {
995                                         state.progressBars[pktName] = struct{}{}
996                                         Progress("Tx", lesp)
997                                 }
998                                 state.Lock()
999                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
1000                                         if ourSize == uint64(fullSize) {
1001                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
1002                                                         return logMsg(les) + ": finished"
1003                                                 })
1004                                                 if len(state.queueTheir) > 1 {
1005                                                         state.queueTheir = state.queueTheir[1:]
1006                                                 } else {
1007                                                         state.queueTheir = state.queueTheir[:0]
1008                                                 }
1009                                                 if state.Ctx.ShowPrgrs {
1010                                                         delete(state.progressBars, pktName)
1011                                                 }
1012                                         } else {
1013                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
1014                                         }
1015                                 } else {
1016                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
1017                                                 return logMsg(les) + ": queue disappeared"
1018                                         })
1019                                 }
1020                                 state.Unlock()
1021                         }
1022                         logMsg := func(les LEs) string {
1023                                 return fmt.Sprintf(
1024                                         "SP with %s (nice %s): sending %s",
1025                                         state.Node.Name, NicenessFmt(state.Nice),
1026                                         humanize.IBytes(uint64(len(payload))),
1027                                 )
1028                         }
1029                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1030                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1031                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
1032                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1033                                 return
1034                         }
1035                 }
1036         }()
1037
1038         // Receiver
1039         state.wg.Add(1)
1040         go func() {
1041                 for {
1042                         if state.NotAlive() {
1043                                 break
1044                         }
1045                         logMsg := func(les LEs) string {
1046                                 return fmt.Sprintf(
1047                                         "SP with %s (nice %s): waiting for payload",
1048                                         state.Node.Name, NicenessFmt(state.Nice),
1049                                 )
1050                         }
1051                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1052                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1053                         payload, err := state.ReadSP(conn)
1054                         if err != nil {
1055                                 if err == io.EOF {
1056                                         break
1057                                 }
1058                                 unmarshalErr := err.(*xdr.UnmarshalError)
1059                                 if os.IsTimeout(unmarshalErr.Err) {
1060                                         continue
1061                                 }
1062                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1063                                         break
1064                                 }
1065                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1066                                 break
1067                         }
1068                         logMsg = func(les LEs) string {
1069                                 return fmt.Sprintf(
1070                                         "SP with %s (nice %s): payload (%s)",
1071                                         state.Node.Name, NicenessFmt(state.Nice),
1072                                         humanize.IBytes(uint64(len(payload))),
1073                                 )
1074                         }
1075                         state.Ctx.LogD(
1076                                 "sp-recv-got",
1077                                 append(les, LE{"Size", int64(len(payload))}),
1078                                 func(les LEs) string { return logMsg(les) + ": got" },
1079                         )
1080                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1081                         if err != nil {
1082                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1083                                         return logMsg(les) + ": got"
1084                                 })
1085                                 break
1086                         }
1087                         state.Ctx.LogD(
1088                                 "sp-recv-process",
1089                                 append(les, LE{"Size", int64(len(payload))}),
1090                                 func(les LEs) string {
1091                                         return logMsg(les) + ": processing"
1092                                 },
1093                         )
1094                         replies, err := state.ProcessSP(payload)
1095                         if err != nil {
1096                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1097                                         return logMsg(les) + ": processing"
1098                                 })
1099                                 break
1100                         }
1101                         state.wg.Add(1)
1102                         go func() {
1103                                 for _, reply := range replies {
1104                                         state.Ctx.LogD(
1105                                                 "sp-recv-reply",
1106                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1107                                                 func(les LEs) string {
1108                                                         return fmt.Sprintf(
1109                                                                 "SP with %s (nice %s): queuing reply (%s)",
1110                                                                 state.Node.Name, NicenessFmt(state.Nice),
1111                                                                 humanize.IBytes(uint64(len(reply))),
1112                                                         )
1113                                                 },
1114                                         )
1115                                         state.payloads <- reply
1116                                 }
1117                                 state.wg.Done()
1118                         }()
1119                         if state.rxRate > 0 {
1120                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1121                         }
1122                 }
1123                 state.SetDead()
1124                 state.wg.Done()
1125                 state.SetDead()
1126                 conn.Close() // #nosec G104
1127         }()
1128
1129         return nil
1130 }
1131
1132 func (state *SPState) Wait() {
1133         state.wg.Wait()
1134         close(state.payloads)
1135         close(state.pings)
1136         state.dirUnlock()
1137         state.Duration = time.Now().Sub(state.started)
1138         state.RxSpeed = state.RxBytes
1139         state.TxSpeed = state.TxBytes
1140         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1141         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1142         if rxDuration > 0 {
1143                 state.RxSpeed = state.RxBytes / rxDuration
1144         }
1145         if txDuration > 0 {
1146                 state.TxSpeed = state.TxBytes / txDuration
1147         }
1148         for pktName := range state.progressBars {
1149                 ProgressKill(pktName)
1150         }
1151 }
1152
1153 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1154         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1155         r := bytes.NewReader(payload)
1156         var err error
1157         var replies [][]byte
1158         var infosGot bool
1159         for r.Len() > 0 {
1160                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1161                         return fmt.Sprintf(
1162                                 "SP with %s (nice %s): unmarshaling header",
1163                                 state.Node.Name, NicenessFmt(state.Nice),
1164                         )
1165                 })
1166                 var head SPHead
1167                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1168                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1169                                 return fmt.Sprintf(
1170                                         "SP with %s (nice %s): unmarshaling header",
1171                                         state.Node.Name, NicenessFmt(state.Nice),
1172                                 )
1173                         })
1174                         return nil, err
1175                 }
1176                 if head.Type != SPTypePing {
1177                         state.RxLastNonPing = state.RxLastSeen
1178                 }
1179                 switch head.Type {
1180                 case SPTypeHalt:
1181                         state.Ctx.LogD(
1182                                 "sp-process-halt",
1183                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1184                                         return fmt.Sprintf(
1185                                                 "SP with %s (nice %s): got HALT",
1186                                                 state.Node.Name, NicenessFmt(state.Nice),
1187                                         )
1188                                 },
1189                         )
1190                         state.Lock()
1191                         state.queueTheir = nil
1192                         state.Unlock()
1193
1194                 case SPTypePing:
1195                         state.Ctx.LogD(
1196                                 "sp-process-ping",
1197                                 append(les, LE{"Type", "ping"}),
1198                                 func(les LEs) string {
1199                                         return fmt.Sprintf(
1200                                                 "SP with %s (nice %s): got PING",
1201                                                 state.Node.Name, NicenessFmt(state.Nice),
1202                                         )
1203                                 },
1204                         )
1205
1206                 case SPTypeInfo:
1207                         infosGot = true
1208                         lesp := append(les, LE{"Type", "info"})
1209                         state.Ctx.LogD(
1210                                 "sp-process-info-unmarshal", lesp,
1211                                 func(les LEs) string {
1212                                         return fmt.Sprintf(
1213                                                 "SP with %s (nice %s): unmarshaling INFO",
1214                                                 state.Node.Name, NicenessFmt(state.Nice),
1215                                         )
1216                                 },
1217                         )
1218                         var info SPInfo
1219                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1220                                 state.Ctx.LogE(
1221                                         "sp-process-info-unmarshal", lesp, err,
1222                                         func(les LEs) string {
1223                                                 return fmt.Sprintf(
1224                                                         "SP with %s (nice %s): unmarshaling INFO",
1225                                                         state.Node.Name, NicenessFmt(state.Nice),
1226                                                 )
1227                                         },
1228                                 )
1229                                 return nil, err
1230                         }
1231                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1232                         lesp = append(
1233                                 lesp,
1234                                 LE{"Pkt", pktName},
1235                                 LE{"Size", int64(info.Size)},
1236                                 LE{"PktNice", int(info.Nice)},
1237                         )
1238                         logMsg := func(les LEs) string {
1239                                 return fmt.Sprintf(
1240                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1241                                         state.Node.Name, NicenessFmt(state.Nice),
1242                                         pktName,
1243                                         humanize.IBytes(info.Size),
1244                                         NicenessFmt(info.Nice),
1245                                 )
1246                         }
1247                         if !state.listOnly && info.Nice > state.Nice {
1248                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1249                                         return logMsg(les) + ": too nice"
1250                                 })
1251                                 continue
1252                         }
1253                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1254                                 return logMsg(les) + ": received"
1255                         })
1256                         if !state.listOnly && state.xxOnly == TTx {
1257                                 continue
1258                         }
1259                         state.Lock()
1260                         state.infosTheir[*info.Hash] = &info
1261                         state.Unlock()
1262                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1263                                 return logMsg(les) + ": stating part"
1264                         })
1265                         pktPath := filepath.Join(
1266                                 state.Ctx.Spool,
1267                                 state.Node.Id.String(),
1268                                 string(TRx),
1269                                 Base32Codec.EncodeToString(info.Hash[:]),
1270                         )
1271                         logMsg = func(les LEs) string {
1272                                 return fmt.Sprintf(
1273                                         "Packet %s (%s) (nice %s)",
1274                                         pktName,
1275                                         humanize.IBytes(info.Size),
1276                                         NicenessFmt(info.Nice),
1277                                 )
1278                         }
1279                         if _, err = os.Stat(pktPath); err == nil {
1280                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1281                                         return logMsg(les) + ": already done"
1282                                 })
1283                                 if !state.listOnly {
1284                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1285                                 }
1286                                 continue
1287                         }
1288                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1289                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1290                                         return logMsg(les) + ": already seen"
1291                                 })
1292                                 if !state.listOnly {
1293                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1294                                 }
1295                                 continue
1296                         }
1297                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1298                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1299                                         return logMsg(les) + ": still not checksummed"
1300                                 })
1301                                 continue
1302                         }
1303                         fi, err := os.Stat(pktPath + PartSuffix)
1304                         var offset int64
1305                         if err == nil {
1306                                 offset = fi.Size()
1307                         }
1308                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1309                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1310                                         return logMsg(les) + ": not enough space"
1311                                 })
1312                                 continue
1313                         }
1314                         state.Ctx.LogI(
1315                                 "sp-info",
1316                                 append(lesp, LE{"Offset", offset}),
1317                                 func(les LEs) string {
1318                                         return fmt.Sprintf(
1319                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1320                                         )
1321                                 },
1322                         )
1323                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1324                                 replies = append(replies, MarshalSP(
1325                                         SPTypeFreq,
1326                                         SPFreq{info.Hash, uint64(offset)},
1327                                 ))
1328                         }
1329
1330                 case SPTypeFile:
1331                         lesp := append(les, LE{"Type", "file"})
1332                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1333                                 return fmt.Sprintf(
1334                                         "SP with %s (nice %s): unmarshaling FILE",
1335                                         state.Node.Name, NicenessFmt(state.Nice),
1336                                 )
1337                         })
1338                         var file SPFile
1339                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1340                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1341                                         return fmt.Sprintf(
1342                                                 "SP with %s (nice %s): unmarshaling FILE",
1343                                                 state.Node.Name, NicenessFmt(state.Nice),
1344                                         )
1345                                 })
1346                                 return nil, err
1347                         }
1348                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1349                         lesp = append(
1350                                 lesp,
1351                                 LE{"XX", string(TRx)},
1352                                 LE{"Pkt", pktName},
1353                                 LE{"Size", int64(len(file.Payload))},
1354                         )
1355                         logMsg := func(les LEs) string {
1356                                 return fmt.Sprintf(
1357                                         "Got packet %s (%s)",
1358                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1359                                 )
1360                         }
1361                         dirToSync := filepath.Join(
1362                                 state.Ctx.Spool,
1363                                 state.Node.Id.String(),
1364                                 string(TRx),
1365                         )
1366                         filePath := filepath.Join(dirToSync, pktName)
1367                         filePathPart := filePath + PartSuffix
1368                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1369                                 return logMsg(les) + ": opening part"
1370                         })
1371                         state.fdsLock.RLock()
1372                         fdAndFullSize, exists := state.fds[filePathPart]
1373                         state.fdsLock.RUnlock()
1374                         var fd *os.File
1375                         if exists {
1376                                 fd = fdAndFullSize.fd
1377                         } else {
1378                                 fd, err = os.OpenFile(
1379                                         filePathPart,
1380                                         os.O_RDWR|os.O_CREATE,
1381                                         os.FileMode(0666),
1382                                 )
1383                                 if err != nil {
1384                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1385                                                 return logMsg(les) + ": opening part"
1386                                         })
1387                                         return nil, err
1388                                 }
1389                                 state.fdsLock.Lock()
1390                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1391                                 state.fdsLock.Unlock()
1392                                 if file.Offset == 0 {
1393                                         h, err := blake2b.New256(nil)
1394                                         if err != nil {
1395                                                 panic(err)
1396                                         }
1397                                         state.fileHashers[filePath] = &HasherAndOffset{h: h}
1398                                 }
1399                         }
1400                         state.Ctx.LogD(
1401                                 "sp-file-seek",
1402                                 append(lesp, LE{"Offset", file.Offset}),
1403                                 func(les LEs) string {
1404                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1405                                 })
1406                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1407                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1408                                         return logMsg(les) + ": seeking"
1409                                 })
1410                                 state.closeFd(filePathPart)
1411                                 return nil, err
1412                         }
1413                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1414                                 return logMsg(les) + ": writing"
1415                         })
1416                         if _, err = fd.Write(file.Payload); err != nil {
1417                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1418                                         return logMsg(les) + ": writing"
1419                                 })
1420                                 state.closeFd(filePathPart)
1421                                 return nil, err
1422                         }
1423                         hasherAndOffset, hasherExists := state.fileHashers[filePath]
1424                         if hasherExists {
1425                                 if hasherAndOffset.offset == file.Offset {
1426                                         if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1427                                                 panic(err)
1428                                         }
1429                                         hasherAndOffset.offset += uint64(len(file.Payload))
1430                                 } else {
1431                                         state.Ctx.LogD(
1432                                                 "sp-file-offset-differs", lesp,
1433                                                 func(les LEs) string {
1434                                                         return logMsg(les) + ": offset differs, deleting hasher"
1435                                                 },
1436                                         )
1437                                         delete(state.fileHashers, filePath)
1438                                         hasherExists = false
1439                                 }
1440                         }
1441                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1442                         lesp[len(lesp)-1].V = ourSize
1443                         fullsize := int64(0)
1444                         state.RLock()
1445                         infoTheir, ok := state.infosTheir[*file.Hash]
1446                         state.RUnlock()
1447                         if ok {
1448                                 fullsize = int64(infoTheir.Size)
1449                         }
1450                         lesp = append(lesp, LE{"FullSize", fullsize})
1451                         if state.Ctx.ShowPrgrs {
1452                                 state.progressBars[pktName] = struct{}{}
1453                                 Progress("Rx", lesp)
1454                         }
1455                         if fullsize != ourSize {
1456                                 continue
1457                         }
1458                         if state.Ctx.ShowPrgrs {
1459                                 delete(state.progressBars, pktName)
1460                         }
1461                         logMsg = func(les LEs) string {
1462                                 return fmt.Sprintf(
1463                                         "Got packet %s %d%% (%s / %s)",
1464                                         pktName, 100*ourSize/fullsize,
1465                                         humanize.IBytes(uint64(ourSize)),
1466                                         humanize.IBytes(uint64(fullsize)),
1467                                 )
1468                         }
1469                         err = fd.Sync()
1470                         if err != nil {
1471                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1472                                         return logMsg(les) + ": syncing"
1473                                 })
1474                                 state.closeFd(filePathPart)
1475                                 continue
1476                         }
1477                         if hasherExists {
1478                                 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1479                                         state.Ctx.LogE(
1480                                                 "sp-file-bad-checksum", lesp,
1481                                                 errors.New("checksum mismatch"),
1482                                                 logMsg,
1483                                         )
1484                                         continue
1485                                 }
1486                                 if err = os.Rename(filePathPart, filePath); err != nil {
1487                                         state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1488                                                 return logMsg(les) + ": renaming"
1489                                         })
1490                                         continue
1491                                 }
1492                                 if err = DirSync(dirToSync); err != nil {
1493                                         state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1494                                                 return logMsg(les) + ": dirsyncing"
1495                                         })
1496                                         continue
1497                                 }
1498                                 state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1499                                         return logMsg(les) + ": done"
1500                                 })
1501                                 state.wg.Add(1)
1502                                 go func() {
1503                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1504                                         state.wg.Done()
1505                                 }()
1506                                 state.Lock()
1507                                 delete(state.infosTheir, *file.Hash)
1508                                 state.Unlock()
1509                                 if !state.Ctx.HdrUsage {
1510                                         state.closeFd(filePathPart)
1511                                         continue
1512                                 }
1513                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1514                                         state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1515                                                 return logMsg(les) + ": seeking"
1516                                         })
1517                                         state.closeFd(filePathPart)
1518                                         continue
1519                                 }
1520                                 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1521                                 state.closeFd(filePathPart)
1522                                 if err != nil {
1523                                         state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1524                                                 return logMsg(les) + ": HdrReading"
1525                                         })
1526                                         continue
1527                                 }
1528                                 state.Ctx.HdrWrite(pktEncRaw, filePath)
1529                                 continue
1530                         }
1531                         state.closeFd(filePathPart)
1532                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1533                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1534                                         return logMsg(les) + ": renaming"
1535                                 })
1536                                 continue
1537                         }
1538                         if err = DirSync(dirToSync); err != nil {
1539                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1540                                         return logMsg(les) + ": dirsyncing"
1541                                 })
1542                                 continue
1543                         }
1544                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1545                                 return logMsg(les) + ": downloaded"
1546                         })
1547                         state.Lock()
1548                         delete(state.infosTheir, *file.Hash)
1549                         state.Unlock()
1550                         if !state.NoCK {
1551                                 state.checkerQueues.appeared <- file.Hash
1552                         }
1553
1554                 case SPTypeDone:
1555                         lesp := append(les, LE{"Type", "done"})
1556                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1557                                 return fmt.Sprintf(
1558                                         "SP with %s (nice %s): unmarshaling DONE",
1559                                         state.Node.Name, NicenessFmt(state.Nice),
1560                                 )
1561                         })
1562                         var done SPDone
1563                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1564                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1565                                         return fmt.Sprintf(
1566                                                 "SP with %s (nice %s): unmarshaling DONE",
1567                                                 state.Node.Name, NicenessFmt(state.Nice),
1568                                         )
1569                                 })
1570                                 return nil, err
1571                         }
1572                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1573                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1574                         logMsg := func(les LEs) string {
1575                                 return fmt.Sprintf(
1576                                         "SP with %s (nice %s): DONE: removing %s",
1577                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1578                                 )
1579                         }
1580                         state.Ctx.LogD("sp-done", lesp, logMsg)
1581                         pth := filepath.Join(
1582                                 state.Ctx.Spool,
1583                                 state.Node.Id.String(),
1584                                 string(TTx),
1585                                 pktName,
1586                         )
1587                         if err = os.Remove(pth); err == nil {
1588                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1589                                         return fmt.Sprintf("Packet %s is sent", pktName)
1590                                 })
1591                                 if state.Ctx.HdrUsage {
1592                                         os.Remove(pth + HdrSuffix)
1593                                 }
1594                         } else {
1595                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1596                         }
1597
1598                 case SPTypeFreq:
1599                         lesp := append(les, LE{"Type", "freq"})
1600                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1601                                 return fmt.Sprintf(
1602                                         "SP with %s (nice %s): unmarshaling FREQ",
1603                                         state.Node.Name, NicenessFmt(state.Nice),
1604                                 )
1605                         })
1606                         var freq SPFreq
1607                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1608                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1609                                         return fmt.Sprintf(
1610                                                 "SP with %s (nice %s): unmarshaling FREQ",
1611                                                 state.Node.Name, NicenessFmt(state.Nice),
1612                                         )
1613                                 })
1614                                 return nil, err
1615                         }
1616                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1617                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1618                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1619                                 return fmt.Sprintf(
1620                                         "SP with %s (nice %s): FREQ %s: queuing",
1621                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1622                                 )
1623                         })
1624                         nice, exists := state.infosOurSeen[*freq.Hash]
1625                         if exists {
1626                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1627                                         state.Lock()
1628                                         insertIdx := 0
1629                                         var freqWithNice *FreqWithNice
1630                                         for insertIdx, freqWithNice = range state.queueTheir {
1631                                                 if freqWithNice.nice > nice {
1632                                                         break
1633                                                 }
1634                                         }
1635                                         state.queueTheir = append(state.queueTheir, nil)
1636                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1637                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1638                                         state.Unlock()
1639                                 } else {
1640                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1641                                                 return fmt.Sprintf(
1642                                                         "SP with %s (nice %s): FREQ %s: skipping",
1643                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1644                                                 )
1645                                         })
1646                                 }
1647                         } else {
1648                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1649                                         return fmt.Sprintf(
1650                                                 "SP with %s (nice %s): FREQ %s: unknown",
1651                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1652                                         )
1653                                 })
1654                         }
1655
1656                 default:
1657                         state.Ctx.LogE(
1658                                 "sp-process-type-unknown",
1659                                 append(les, LE{"Type", head.Type}),
1660                                 errors.New("unknown type"),
1661                                 func(les LEs) string {
1662                                         return fmt.Sprintf(
1663                                                 "SP with %s (nice %s): %d",
1664                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1665                                         )
1666                                 },
1667                         )
1668                         return nil, BadPktType
1669                 }
1670         }
1671
1672         if infosGot {
1673                 var pkts int
1674                 var size uint64
1675                 state.RLock()
1676                 for _, info := range state.infosTheir {
1677                         pkts++
1678                         size += info.Size
1679                 }
1680                 state.RUnlock()
1681                 state.Ctx.LogI("sp-infos-rx", LEs{
1682                         {"XX", string(TRx)},
1683                         {"Node", state.Node.Id},
1684                         {"Pkts", pkts},
1685                         {"Size", int64(size)},
1686                 }, func(les LEs) string {
1687                         return fmt.Sprintf(
1688                                 "%s has got for us: %d packets, %s",
1689                                 state.Node.Name, pkts, humanize.IBytes(size),
1690                         )
1691                 })
1692         }
1693         return payloadsSplit(replies), nil
1694 }