Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Debug friendly if conditions
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "hash"
26         "io"
27         "os"
28         "path/filepath"
29         "sort"
30         "sync"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/flynn/noise"
36         "golang.org/x/crypto/blake2b"
37 )
38
const (
        // MaxSPSize is the byte size payloadsSplit tries to keep each batched
        // outbound payload within; StartI/StartR also pad the first handshake
        // payload with HALT heads relative to it.
        MaxSPSize      = 1<<16 - 256
        // PartSuffix is presumably the filename suffix of partially received
        // packets -- it is not used in this part of the file, confirm against
        // its users.
        PartSuffix     = ".part"
        // SPHeadOverhead is the length in bytes of the buffers holding a
        // marshalized SPHead (SPHaltMarshalized, SPPingMarshalized).
        SPHeadOverhead = 4
)
44
// SPCheckerQueues is the pair of channels connecting SP sessions with the
// per-node SPChecker goroutine.
type SPCheckerQueues struct {
        // appeared carries hashes of packets that SPChecker has to checksum.
        appeared chan *[32]byte
        // checked carries hashes of packets SPChecker checksummed without error.
        checked  chan *[32]byte
}
49
var (
        // MagicNNCPLv1 is the magic WriteSP puts in front of every SPRaw and
        // ReadSP verifies on every received one.
        MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}

        // Marshalized sizes of sample SPInfo/SPFreq/SPFile values (the latter
        // with an empty payload), computed once in init.
        SPInfoOverhead    int
        SPFreqOverhead    int
        SPFileOverhead    int
        // Pre-marshalized SPHead values of HALT and PING types, filled in init.
        SPHaltMarshalized []byte
        SPPingMarshalized []byte

        // NoiseCipherSuite is the Noise cipher suite used for SP handshakes.
        NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
                noise.DH25519,
                noise.CipherChaChaPoly,
                noise.HashBLAKE2b,
        )

        // DefaultDeadline is the read/write deadline applied to each
        // handshake message in StartI/StartR.
        DefaultDeadline = 10 * time.Second
        // PingTimeout is the period of the ping ticker in StartWorkers; a
        // session with nothing received for twice that long is deadlined.
        PingTimeout     = time.Minute

        // spCheckers holds the checker queues already created per node, so
        // that a single SPChecker goroutine is started for each of them.
        spCheckers = make(map[NodeId]*SPCheckerQueues)
)
70
// FdAndFullSize is the value type of SPState.fds: an opened file together
// with a size.
type FdAndFullSize struct {
        fd       *os.File
        // presumably the complete size of the packet the file belongs to --
        // TODO confirm against the code filling SPState.fds
        fullSize int64
}
75
// HasherAndOffset is the value type of SPState.fileHashers: a running hash
// together with an offset.
type HasherAndOffset struct {
        h      hash.Hash
        // presumably the number of bytes already fed into h -- TODO confirm
        // against the code filling SPState.fileHashers
        offset uint64
}
80
// SPType tells which kind of SP packet follows; it is the only field of
// SPHead.
type SPType uint8

// Known SP packet types, numbered consecutively starting from zero.
const (
        SPTypeInfo SPType = iota
        SPTypeFreq
        SPTypeFile
        SPTypeDone
        SPTypeHalt
        SPTypePing
)
91
// SPHead is the header MarshalSP writes in front of every SP packet body.
type SPHead struct {
        Type SPType
}
95
// SPInfo announces a single packet: its niceness, size and hash.
// infosOur builds one for every outbound (tx) job not announced yet.
type SPInfo struct {
        Nice uint8
        Size uint64
        Hash *[32]byte
}
101
// SPFreq names a packet by hash together with an offset.
// NOTE(review): presumably a request to send that packet starting from the
// given offset -- confirm against the packet processing code.
type SPFreq struct {
        Hash   *[32]byte
        Offset uint64
}
106
// SPFile carries a piece of data belonging to the packet with given hash.
// NOTE(review): presumably Payload is the packet's content located at
// Offset -- confirm against the packet processing code.
type SPFile struct {
        Hash    *[32]byte
        Offset  uint64
        Payload []byte
}
112
// SPDone names a packet by its hash. StartWorkers queues one for every hash
// coming out of the checker's "checked" channel.
type SPDone struct {
        Hash *[32]byte
}
116
// SPRaw is the outermost frame exchanged over the connection: WriteSP
// marshals it with MagicNNCPLv1 as Magic, ReadSP unmarshals it and rejects
// any other magic.
type SPRaw struct {
        Magic   [8]byte
        Payload []byte
}
121
// FreqWithNice couples an SPFreq with a niceness value; it is the element
// type of SPState.queueTheir.
type FreqWithNice struct {
        freq *SPFreq
        nice uint8
}
126
// ConnDeadlined is the connection an SP session runs over: a byte stream
// that can be closed and allows setting read and write deadlines, which
// StartI/StartR do before every handshake message.
type ConnDeadlined interface {
        io.ReadWriteCloser
        SetReadDeadline(t time.Time) error
        SetWriteDeadline(t time.Time) error
}
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([32]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170 }
171
172 func MarshalSP(typ SPType, sp interface{}) []byte {
173         var buf bytes.Buffer
174         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
175                 panic(err)
176         }
177         if _, err := xdr.Marshal(&buf, sp); err != nil {
178                 panic(err)
179         }
180         return buf.Bytes()
181 }
182
183 func payloadsSplit(payloads [][]byte) [][]byte {
184         var outbounds [][]byte
185         outbound := make([]byte, 0, MaxSPSize)
186         for i, payload := range payloads {
187                 outbound = append(outbound, payload...)
188                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
189                         outbounds = append(outbounds, outbound)
190                         outbound = make([]byte, 0, MaxSPSize)
191                 }
192         }
193         if len(outbound) > 0 {
194                 outbounds = append(outbounds, outbound)
195         }
196         return outbounds
197 }
198
// SPState holds everything belonging to a single SP session with a node:
// configuration, Noise handshake/cipher states, queues, traffic counters,
// directory locks and opened files. It is set up by StartI (initiator) or
// StartR (responder) and then used by the goroutines StartWorkers starts.
type SPState struct {
        Ctx            *Ctx
        Node           *Node
        Nice           uint8
        // NoCK being false makes StartWorkers feed the node's SPChecker.
        NoCK           bool
        // Limits checked once a second by StartWorkers' periodic job.
        onlineDeadline time.Duration
        maxOnlineTime  time.Duration
        // Noise handshake state and the two cipher states it yields.
        hs             *noise.HandshakeState
        csOur          *noise.CipherState
        csTheir        *noise.CipherState
        // Queues of outbound payloads and of ping requests; both are drained
        // by SetDead once the session is dead.
        payloads       chan []byte
        pings          chan struct{}
        infosTheir     map[[32]byte]*SPInfo
        // Hashes (with niceness) infosOur has already produced INFOs for.
        infosOurSeen   map[[32]byte]uint8
        queueTheir     []*FreqWithNice
        wg             sync.WaitGroup
        // Receive counters: RxBytes and RxLastSeen are updated by ReadSP.
        RxBytes        int64
        RxLastSeen     time.Time
        RxLastNonPing  time.Time
        // Transmit counters, updated by WriteSP.
        TxBytes        int64
        TxLastSeen     time.Time
        TxLastNonPing  time.Time
        started        time.Time
        // started + maxOnlineTime, set by StartWorkers when the latter is > 0.
        mustFinishAt   time.Time
        Duration       time.Duration
        RxSpeed        int64
        TxSpeed        int64
        // Directory locks taken in StartI/StartR, released by dirUnlock.
        rxLock         *os.File
        txLock         *os.File
        // Restricts the session to a single direction when not empty.
        xxOnly         TRxTx
        rxRate         int
        txRate         int
        // Closed by SetDead; NotAlive reports whether that has happened.
        isDead         chan struct{}
        listOnly       bool
        onlyPkts       map[[32]byte]bool
        // Scratch buffer reused by WriteSP for every outgoing frame.
        writeSPBuf     bytes.Buffer
        // Opened files by path; closeFd modifies it holding fdsLock.
        fds            map[string]FdAndFullSize
        fdsLock        sync.RWMutex
        fileHashers    map[string]*HasherAndOffset
        checkerQueues  SPCheckerQueues
        progressBars   map[string]struct{}
        // Embedded lock; taken by SetDead.
        sync.RWMutex
}
242
243 func (state *SPState) SetDead() {
244         state.Lock()
245         defer state.Unlock()
246         select {
247         case <-state.isDead:
248                 // Already closed channel, dead
249                 return
250         default:
251         }
252         close(state.isDead)
253         go func() {
254                 for range state.payloads {
255                 }
256         }()
257         go func() {
258                 for range state.pings {
259                 }
260         }()
261         go func() {
262                 for _, s := range state.fds {
263                         s.fd.Close()
264                 }
265         }()
266 }
267
268 func (state *SPState) NotAlive() bool {
269         select {
270         case <-state.isDead:
271                 return true
272         default:
273         }
274         return false
275 }
276
277 func (state *SPState) dirUnlock() {
278         state.Ctx.UnlockDir(state.rxLock)
279         state.Ctx.UnlockDir(state.txLock)
280 }
281
282 func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
283         for hshValue := range appeared {
284                 pktName := Base32Codec.EncodeToString(hshValue[:])
285                 les := LEs{
286                         {"XX", string(TRx)},
287                         {"Node", nodeId},
288                         {"Pkt", pktName},
289                 }
290                 ctx.LogD("sp-checker", les, func(les LEs) string {
291                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
292                 })
293                 size, err := ctx.CheckNoCK(nodeId, hshValue)
294                 les = append(les, LE{"Size", size})
295                 if err != nil {
296                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
297                                 return fmt.Sprintf(
298                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
299                                         humanize.IBytes(uint64(size)),
300                                 )
301                         })
302                         continue
303                 }
304                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
305                         return fmt.Sprintf(
306                                 "Packet %s is retreived (%s)",
307                                 pktName, humanize.IBytes(uint64(size)),
308                         )
309                 })
310                 go func(hsh *[32]byte) { checked <- hsh }(hshValue)
311         }
312 }
313
314 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
315         state.writeSPBuf.Reset()
316         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
317                 Magic:   MagicNNCPLv1,
318                 Payload: payload,
319         })
320         if err != nil {
321                 return err
322         }
323         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
324                 state.TxLastSeen = time.Now()
325                 state.TxBytes += int64(n)
326                 if !ping {
327                         state.TxLastNonPing = state.TxLastSeen
328                 }
329         }
330         return err
331 }
332
333 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
334         var sp SPRaw
335         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
336         if err != nil {
337                 ue := err.(*xdr.UnmarshalError)
338                 if ue.Err == io.EOF {
339                         return nil, ue.Err
340                 }
341                 return nil, err
342         }
343         state.RxLastSeen = time.Now()
344         state.RxBytes += int64(n)
345         if sp.Magic != MagicNNCPLv1 {
346                 return nil, BadMagic
347         }
348         return sp.Payload, nil
349 }
350
351 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
352         var infos []*SPInfo
353         var totalSize int64
354         for job := range ctx.Jobs(nodeId, TTx) {
355                 if job.PktEnc.Nice > nice {
356                         continue
357                 }
358                 if _, known := (*seen)[*job.HshValue]; known {
359                         continue
360                 }
361                 totalSize += job.Size
362                 infos = append(infos, &SPInfo{
363                         Nice: job.PktEnc.Nice,
364                         Size: uint64(job.Size),
365                         Hash: job.HshValue,
366                 })
367                 (*seen)[*job.HshValue] = job.PktEnc.Nice
368         }
369         sort.Sort(ByNice(infos))
370         var payloads [][]byte
371         for _, info := range infos {
372                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
373                 pktName := Base32Codec.EncodeToString(info.Hash[:])
374                 ctx.LogD("sp-info-our", LEs{
375                         {"Node", nodeId},
376                         {"Name", pktName},
377                         {"Size", info.Size},
378                 }, func(les LEs) string {
379                         return fmt.Sprintf(
380                                 "Our info: %s/tx/%s (%s)",
381                                 ctx.NodeName(nodeId),
382                                 pktName,
383                                 humanize.IBytes(info.Size),
384                         )
385                 })
386         }
387         if totalSize > 0 {
388                 ctx.LogI("sp-infos-tx", LEs{
389                         {"XX", string(TTx)},
390                         {"Node", nodeId},
391                         {"Pkts", len(payloads)},
392                         {"Size", totalSize},
393                 }, func(les LEs) string {
394                         return fmt.Sprintf(
395                                 "We have got for %s: %d packets, %s",
396                                 ctx.NodeName(nodeId),
397                                 len(payloads),
398                                 humanize.IBytes(uint64(totalSize)),
399                         )
400                 })
401         }
402         return payloadsSplit(payloads)
403 }
404
405 func (state *SPState) StartI(conn ConnDeadlined) error {
406         nodeId := state.Node.Id
407         err := state.Ctx.ensureRxDir(nodeId)
408         if err != nil {
409                 return err
410         }
411         var rxLock *os.File
412         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
413                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
414                 if err != nil {
415                         return err
416                 }
417         }
418         var txLock *os.File
419         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
420                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
421                 if err != nil {
422                         return err
423                 }
424         }
425         started := time.Now()
426         conf := noise.Config{
427                 CipherSuite: NoiseCipherSuite,
428                 Pattern:     noise.HandshakeIK,
429                 Initiator:   true,
430                 StaticKeypair: noise.DHKey{
431                         Private: state.Ctx.Self.NoisePrv[:],
432                         Public:  state.Ctx.Self.NoisePub[:],
433                 },
434                 PeerStatic: state.Node.NoisePub[:],
435         }
436         hs, err := noise.NewHandshakeState(conf)
437         if err != nil {
438                 return err
439         }
440         state.hs = hs
441         state.payloads = make(chan []byte)
442         state.pings = make(chan struct{})
443         state.infosTheir = make(map[[32]byte]*SPInfo)
444         state.infosOurSeen = make(map[[32]byte]uint8)
445         state.progressBars = make(map[string]struct{})
446         state.started = started
447         state.rxLock = rxLock
448         state.txLock = txLock
449
450         var infosPayloads [][]byte
451         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
452                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
453         }
454         var firstPayload []byte
455         if len(infosPayloads) > 0 {
456                 firstPayload = infosPayloads[0]
457         }
458         // Pad first payload, to hide actual number of existing files
459         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
460                 firstPayload = append(firstPayload, SPHaltMarshalized...)
461         }
462
463         var buf []byte
464         var payload []byte
465         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
466         if err != nil {
467                 state.dirUnlock()
468                 return err
469         }
470         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
471         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
472                 return fmt.Sprintf(
473                         "SP with %s (nice %s): sending first message",
474                         state.Node.Name,
475                         NicenessFmt(state.Nice),
476                 )
477         })
478         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
479         if err = state.WriteSP(conn, buf, false); err != nil {
480                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
481                         return fmt.Sprintf(
482                                 "SP with %s (nice %s): writing",
483                                 state.Node.Name,
484                                 NicenessFmt(state.Nice),
485                         )
486                 })
487                 state.dirUnlock()
488                 return err
489         }
490         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
491                 return fmt.Sprintf(
492                         "SP with %s (nice %s): waiting for first message",
493                         state.Node.Name,
494                         NicenessFmt(state.Nice),
495                 )
496         })
497         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
498         if buf, err = state.ReadSP(conn); err != nil {
499                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
500                         return fmt.Sprintf(
501                                 "SP with %s (nice %s): reading",
502                                 state.Node.Name,
503                                 NicenessFmt(state.Nice),
504                         )
505                 })
506                 state.dirUnlock()
507                 return err
508         }
509         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
510         if err != nil {
511                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
512                         return fmt.Sprintf(
513                                 "SP with %s (nice %s): reading Noise message",
514                                 state.Node.Name,
515                                 NicenessFmt(state.Nice),
516                         )
517                 })
518                 state.dirUnlock()
519                 return err
520         }
521         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
522                 return fmt.Sprintf(
523                         "SP with %s (nice %s): starting workers",
524                         state.Node.Name,
525                         NicenessFmt(state.Nice),
526                 )
527         })
528         err = state.StartWorkers(conn, infosPayloads, payload)
529         if err != nil {
530                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
531                         return fmt.Sprintf(
532                                 "SP with %s (nice %s): starting workers",
533                                 state.Node.Name,
534                                 NicenessFmt(state.Nice),
535                         )
536                 })
537                 state.dirUnlock()
538         }
539         return err
540 }
541
542 func (state *SPState) StartR(conn ConnDeadlined) error {
543         started := time.Now()
544         conf := noise.Config{
545                 CipherSuite: NoiseCipherSuite,
546                 Pattern:     noise.HandshakeIK,
547                 Initiator:   false,
548                 StaticKeypair: noise.DHKey{
549                         Private: state.Ctx.Self.NoisePrv[:],
550                         Public:  state.Ctx.Self.NoisePub[:],
551                 },
552         }
553         hs, err := noise.NewHandshakeState(conf)
554         if err != nil {
555                 return err
556         }
557         xxOnly := TRxTx("")
558         state.hs = hs
559         state.payloads = make(chan []byte)
560         state.pings = make(chan struct{})
561         state.infosOurSeen = make(map[[32]byte]uint8)
562         state.infosTheir = make(map[[32]byte]*SPInfo)
563         state.progressBars = make(map[string]struct{})
564         state.started = started
565         state.xxOnly = xxOnly
566
567         var buf []byte
568         var payload []byte
569         logMsg := func(les LEs) string {
570                 return fmt.Sprintf(
571                         "SP nice %s: waiting for first message",
572                         NicenessFmt(state.Nice),
573                 )
574         }
575         les := LEs{{"Nice", int(state.Nice)}}
576         state.Ctx.LogD("sp-startR", les, logMsg)
577         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
578         if buf, err = state.ReadSP(conn); err != nil {
579                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
580                 return err
581         }
582         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
583                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
584                 return err
585         }
586
587         var node *Node
588         for _, n := range state.Ctx.Neigh {
589                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
590                         node = n
591                         break
592                 }
593         }
594         if node == nil {
595                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
596                 err = errors.New("unknown peer: " + peerId)
597                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
598                 return err
599         }
600         state.Node = node
601         state.rxRate = node.RxRate
602         state.txRate = node.TxRate
603         state.onlineDeadline = node.OnlineDeadline
604         state.maxOnlineTime = node.MaxOnlineTime
605         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
606
607         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
608                 return err
609         }
610         var rxLock *os.File
611         if xxOnly == "" || xxOnly == TRx {
612                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
613                 if err != nil {
614                         return err
615                 }
616         }
617         state.rxLock = rxLock
618         var txLock *os.File
619         if xxOnly == "" || xxOnly == TTx {
620                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
621                 if err != nil {
622                         return err
623                 }
624         }
625         state.txLock = txLock
626
627         var infosPayloads [][]byte
628         if xxOnly == "" || xxOnly == TTx {
629                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
630         }
631         var firstPayload []byte
632         if len(infosPayloads) > 0 {
633                 firstPayload = infosPayloads[0]
634         }
635         // Pad first payload, to hide actual number of existing files
636         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
637                 firstPayload = append(firstPayload, SPHaltMarshalized...)
638         }
639
640         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
641                 return fmt.Sprintf(
642                         "SP with %s (nice %s): sending first message",
643                         node.Name, NicenessFmt(state.Nice),
644                 )
645         })
646         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
647         if err != nil {
648                 state.dirUnlock()
649                 return err
650         }
651         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
652         if err = state.WriteSP(conn, buf, false); err != nil {
653                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
654                         return fmt.Sprintf(
655                                 "SP with %s (nice %s): writing",
656                                 node.Name, NicenessFmt(state.Nice),
657                         )
658                 })
659                 state.dirUnlock()
660                 return err
661         }
662         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
663                 return fmt.Sprintf(
664                         "SP with %s (nice %s): starting workers",
665                         node.Name, NicenessFmt(state.Nice),
666                 )
667         })
668         err = state.StartWorkers(conn, infosPayloads, payload)
669         if err != nil {
670                 state.dirUnlock()
671         }
672         return err
673 }
674
675 func (state *SPState) closeFd(pth string) {
676         state.fdsLock.Lock()
677         if s, exists := state.fds[pth]; exists {
678                 delete(state.fds, pth)
679                 s.fd.Close()
680         }
681         state.fdsLock.Unlock()
682 }
683
684 func (state *SPState) StartWorkers(
685         conn ConnDeadlined,
686         infosPayloads [][]byte,
687         payload []byte,
688 ) error {
689         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
690         state.fds = make(map[string]FdAndFullSize)
691         state.fileHashers = make(map[string]*HasherAndOffset)
692         state.isDead = make(chan struct{})
693         if state.maxOnlineTime > 0 {
694                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
695         }
696
697         // Checker
698         if !state.NoCK {
699                 queues := spCheckers[*state.Node.Id]
700                 if queues == nil {
701                         queues = &SPCheckerQueues{
702                                 appeared: make(chan *[32]byte),
703                                 checked:  make(chan *[32]byte),
704                         }
705                         spCheckers[*state.Node.Id] = queues
706                         go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
707                 }
708                 state.checkerQueues = *queues
709                 go func() {
710                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
711                                 if job.PktEnc.Nice <= state.Nice {
712                                         state.checkerQueues.appeared <- job.HshValue
713                                 }
714                         }
715                 }()
716                 state.wg.Add(1)
717                 go func() {
718                         defer state.wg.Done()
719                         for {
720                                 select {
721                                 case <-state.isDead:
722                                         return
723                                 case hsh := <-state.checkerQueues.checked:
724                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
725                                 }
726                         }
727                 }()
728         }
729
730         // Remaining handshake payload sending
731         if len(infosPayloads) > 1 {
732                 state.wg.Add(1)
733                 go func() {
734                         for _, payload := range infosPayloads[1:] {
735                                 state.Ctx.LogD(
736                                         "sp-queue-remaining",
737                                         append(les, LE{"Size", int64(len(payload))}),
738                                         func(les LEs) string {
739                                                 return fmt.Sprintf(
740                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
741                                                         state.Node.Name, NicenessFmt(state.Nice),
742                                                         humanize.IBytes(uint64(len(payload))),
743                                                 )
744                                         },
745                                 )
746                                 state.payloads <- payload
747                         }
748                         state.wg.Done()
749                 }()
750         }
751
752         // Processing of first payload and queueing its responses
753         logMsg := func(les LEs) string {
754                 return fmt.Sprintf(
755                         "SP with %s (nice %s): processing first payload (%s)",
756                         state.Node.Name, NicenessFmt(state.Nice),
757                         humanize.IBytes(uint64(len(payload))),
758                 )
759         }
760         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
761         replies, err := state.ProcessSP(payload)
762         if err != nil {
763                 state.Ctx.LogE("sp-process", les, err, logMsg)
764                 return err
765         }
766         state.wg.Add(1)
767         go func() {
768                 for _, reply := range replies {
769                         state.Ctx.LogD(
770                                 "sp-queue-reply",
771                                 append(les, LE{"Size", int64(len(reply))}),
772                                 func(les LEs) string {
773                                         return fmt.Sprintf(
774                                                 "SP with %s (nice %s): queuing reply (%s)",
775                                                 state.Node.Name, NicenessFmt(state.Nice),
776                                                 humanize.IBytes(uint64(len(payload))),
777                                         )
778                                 },
779                         )
780                         state.payloads <- reply
781                 }
782                 state.wg.Done()
783         }()
784
785         // Periodic jobs
786         state.wg.Add(1)
787         go func() {
788                 deadlineTicker := time.NewTicker(time.Second)
789                 pingTicker := time.NewTicker(PingTimeout)
790                 for {
791                         select {
792                         case <-state.isDead:
793                                 state.wg.Done()
794                                 deadlineTicker.Stop()
795                                 pingTicker.Stop()
796                                 return
797                         case now := <-deadlineTicker.C:
798                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
799                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
800                                         goto Deadlined
801                                 }
802                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
803                                         goto Deadlined
804                                 }
805                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
806                                         goto Deadlined
807                                 }
808                                 break
809                         Deadlined:
810                                 state.SetDead()
811                                 conn.Close() // #nosec G104
812                         case now := <-pingTicker.C:
813                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
814                                         state.wg.Add(1)
815                                         go func() {
816                                                 state.pings <- struct{}{}
817                                                 state.wg.Done()
818                                         }()
819                                 }
820                         }
821                 }
822         }()
823
824         // Spool checker and INFOs sender of appearing files
825         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
826                 state.wg.Add(1)
827                 go func() {
828                         ticker := time.NewTicker(time.Second)
829                         for {
830                                 select {
831                                 case <-state.isDead:
832                                         state.wg.Done()
833                                         ticker.Stop()
834                                         return
835                                 case <-ticker.C:
836                                         for _, payload := range state.Ctx.infosOur(
837                                                 state.Node.Id,
838                                                 state.Nice,
839                                                 &state.infosOurSeen,
840                                         ) {
841                                                 state.Ctx.LogD(
842                                                         "sp-queue-info",
843                                                         append(les, LE{"Size", int64(len(payload))}),
844                                                         func(les LEs) string {
845                                                                 return fmt.Sprintf(
846                                                                         "SP with %s (nice %s): queuing new info (%s)",
847                                                                         state.Node.Name, NicenessFmt(state.Nice),
848                                                                         humanize.IBytes(uint64(len(payload))),
849                                                                 )
850                                                         },
851                                                 )
852                                                 state.payloads <- payload
853                                         }
854                                 }
855                         }
856                 }()
857         }
858
859         // Sender
860         state.wg.Add(1)
861         go func() {
862                 defer conn.Close()
863                 defer state.SetDead()
864                 defer state.wg.Done()
865                 for {
866                         if state.NotAlive() {
867                                 return
868                         }
869                         var payload []byte
870                         var ping bool
871                         select {
872                         case <-state.pings:
873                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
874                                         return fmt.Sprintf(
875                                                 "SP with %s (nice %s): got ping",
876                                                 state.Node.Name, NicenessFmt(state.Nice),
877                                         )
878                                 })
879                                 payload = SPPingMarshalized
880                                 ping = true
881                         case payload = <-state.payloads:
882                                 state.Ctx.LogD(
883                                         "sp-got-payload",
884                                         append(les, LE{"Size", int64(len(payload))}),
885                                         func(les LEs) string {
886                                                 return fmt.Sprintf(
887                                                         "SP with %s (nice %s): got payload (%s)",
888                                                         state.Node.Name, NicenessFmt(state.Nice),
889                                                         humanize.IBytes(uint64(len(payload))),
890                                                 )
891                                         },
892                                 )
893                         default:
894                                 state.RLock()
895                                 if len(state.queueTheir) == 0 {
896                                         state.RUnlock()
897                                         time.Sleep(100 * time.Millisecond)
898                                         continue
899                                 }
900                                 freq := state.queueTheir[0].freq
901                                 state.RUnlock()
902                                 if state.txRate > 0 {
903                                         time.Sleep(time.Second / time.Duration(state.txRate))
904                                 }
905                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
906                                 lesp := append(
907                                         les,
908                                         LE{"XX", string(TTx)},
909                                         LE{"Pkt", pktName},
910                                         LE{"Size", int64(freq.Offset)},
911                                 )
912                                 logMsg := func(les LEs) string {
913                                         return fmt.Sprintf(
914                                                 "SP with %s (nice %s): tx/%s (%s)",
915                                                 state.Node.Name, NicenessFmt(state.Nice),
916                                                 pktName,
917                                                 humanize.IBytes(freq.Offset),
918                                         )
919                                 }
920                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
921                                         return logMsg(les) + ": queueing"
922                                 })
923                                 pth := filepath.Join(
924                                         state.Ctx.Spool,
925                                         state.Node.Id.String(),
926                                         string(TTx),
927                                         Base32Codec.EncodeToString(freq.Hash[:]),
928                                 )
929                                 state.fdsLock.RLock()
930                                 fdAndFullSize, exists := state.fds[pth]
931                                 state.fdsLock.RUnlock()
932                                 if !exists {
933                                         fd, err := os.Open(pth)
934                                         if err != nil {
935                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
936                                                         return logMsg(les) + ": opening"
937                                                 })
938                                                 return
939                                         }
940                                         fi, err := fd.Stat()
941                                         if err != nil {
942                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
943                                                         return logMsg(les) + ": stating"
944                                                 })
945                                                 return
946                                         }
947                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
948                                         state.fdsLock.Lock()
949                                         state.fds[pth] = fdAndFullSize
950                                         state.fdsLock.Unlock()
951                                 }
952                                 fd := fdAndFullSize.fd
953                                 fullSize := fdAndFullSize.fullSize
954                                 var buf []byte
955                                 if freq.Offset < uint64(fullSize) {
956                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
957                                                 return logMsg(les) + ": seeking"
958                                         })
959                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
960                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
961                                                         return logMsg(les) + ": seeking"
962                                                 })
963                                                 return
964                                         }
965                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
966                                         n, err := fd.Read(buf)
967                                         if err != nil {
968                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
969                                                         return logMsg(les) + ": reading"
970                                                 })
971                                                 return
972                                         }
973                                         buf = buf[:n]
974                                         lesp = append(
975                                                 les,
976                                                 LE{"XX", string(TTx)},
977                                                 LE{"Pkt", pktName},
978                                                 LE{"Size", int64(n)},
979                                         )
980                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
981                                                 return fmt.Sprintf(
982                                                         "%s: read %s",
983                                                         logMsg(les), humanize.IBytes(uint64(n)),
984                                                 )
985                                         })
986                                 }
987                                 state.closeFd(pth)
988                                 payload = MarshalSP(SPTypeFile, SPFile{
989                                         Hash:    freq.Hash,
990                                         Offset:  freq.Offset,
991                                         Payload: buf,
992                                 })
993                                 ourSize := freq.Offset + uint64(len(buf))
994                                 lesp = append(
995                                         les,
996                                         LE{"XX", string(TTx)},
997                                         LE{"Pkt", pktName},
998                                         LE{"Size", int64(ourSize)},
999                                         LE{"FullSize", fullSize},
1000                                 )
1001                                 if state.Ctx.ShowPrgrs {
1002                                         state.progressBars[pktName] = struct{}{}
1003                                         Progress("Tx", lesp)
1004                                 }
1005                                 state.Lock()
1006                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
1007                                         if ourSize == uint64(fullSize) {
1008                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
1009                                                         return logMsg(les) + ": finished"
1010                                                 })
1011                                                 if len(state.queueTheir) > 1 {
1012                                                         state.queueTheir = state.queueTheir[1:]
1013                                                 } else {
1014                                                         state.queueTheir = state.queueTheir[:0]
1015                                                 }
1016                                                 if state.Ctx.ShowPrgrs {
1017                                                         delete(state.progressBars, pktName)
1018                                                 }
1019                                         } else {
1020                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
1021                                         }
1022                                 } else {
1023                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
1024                                                 return logMsg(les) + ": queue disappeared"
1025                                         })
1026                                 }
1027                                 state.Unlock()
1028                         }
1029                         logMsg := func(les LEs) string {
1030                                 return fmt.Sprintf(
1031                                         "SP with %s (nice %s): sending %s",
1032                                         state.Node.Name, NicenessFmt(state.Nice),
1033                                         humanize.IBytes(uint64(len(payload))),
1034                                 )
1035                         }
1036                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1037                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1038                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
1039                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1040                                 return
1041                         }
1042                 }
1043         }()
1044
1045         // Receiver
1046         state.wg.Add(1)
1047         go func() {
1048                 for {
1049                         if state.NotAlive() {
1050                                 break
1051                         }
1052                         logMsg := func(les LEs) string {
1053                                 return fmt.Sprintf(
1054                                         "SP with %s (nice %s): waiting for payload",
1055                                         state.Node.Name, NicenessFmt(state.Nice),
1056                                 )
1057                         }
1058                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1059                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1060                         payload, err := state.ReadSP(conn)
1061                         if err != nil {
1062                                 if err == io.EOF {
1063                                         break
1064                                 }
1065                                 unmarshalErr := err.(*xdr.UnmarshalError)
1066                                 if os.IsTimeout(unmarshalErr.Err) {
1067                                         continue
1068                                 }
1069                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1070                                         break
1071                                 }
1072                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1073                                 break
1074                         }
1075                         logMsg = func(les LEs) string {
1076                                 return fmt.Sprintf(
1077                                         "SP with %s (nice %s): payload (%s)",
1078                                         state.Node.Name, NicenessFmt(state.Nice),
1079                                         humanize.IBytes(uint64(len(payload))),
1080                                 )
1081                         }
1082                         state.Ctx.LogD(
1083                                 "sp-recv-got",
1084                                 append(les, LE{"Size", int64(len(payload))}),
1085                                 func(les LEs) string { return logMsg(les) + ": got" },
1086                         )
1087                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1088                         if err != nil {
1089                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1090                                         return logMsg(les) + ": got"
1091                                 })
1092                                 break
1093                         }
1094                         state.Ctx.LogD(
1095                                 "sp-recv-process",
1096                                 append(les, LE{"Size", int64(len(payload))}),
1097                                 func(les LEs) string {
1098                                         return logMsg(les) + ": processing"
1099                                 },
1100                         )
1101                         replies, err := state.ProcessSP(payload)
1102                         if err != nil {
1103                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1104                                         return logMsg(les) + ": processing"
1105                                 })
1106                                 break
1107                         }
1108                         state.wg.Add(1)
1109                         go func() {
1110                                 for _, reply := range replies {
1111                                         state.Ctx.LogD(
1112                                                 "sp-recv-reply",
1113                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1114                                                 func(les LEs) string {
1115                                                         return fmt.Sprintf(
1116                                                                 "SP with %s (nice %s): queuing reply (%s)",
1117                                                                 state.Node.Name, NicenessFmt(state.Nice),
1118                                                                 humanize.IBytes(uint64(len(reply))),
1119                                                         )
1120                                                 },
1121                                         )
1122                                         state.payloads <- reply
1123                                 }
1124                                 state.wg.Done()
1125                         }()
1126                         if state.rxRate > 0 {
1127                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1128                         }
1129                 }
1130                 state.SetDead()
1131                 state.wg.Done()
1132                 state.SetDead()
1133                 conn.Close() // #nosec G104
1134         }()
1135
1136         return nil
1137 }
1138
1139 func (state *SPState) Wait() {
1140         state.wg.Wait()
1141         close(state.payloads)
1142         close(state.pings)
1143         state.dirUnlock()
1144         state.Duration = time.Now().Sub(state.started)
1145         state.RxSpeed = state.RxBytes
1146         state.TxSpeed = state.TxBytes
1147         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1148         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1149         if rxDuration > 0 {
1150                 state.RxSpeed = state.RxBytes / rxDuration
1151         }
1152         if txDuration > 0 {
1153                 state.TxSpeed = state.TxBytes / txDuration
1154         }
1155         for pktName := range state.progressBars {
1156                 ProgressKill(pktName)
1157         }
1158 }
1159
1160 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1161         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1162         r := bytes.NewReader(payload)
1163         var err error
1164         var replies [][]byte
1165         var infosGot bool
1166         for r.Len() > 0 {
1167                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1168                         return fmt.Sprintf(
1169                                 "SP with %s (nice %s): unmarshaling header",
1170                                 state.Node.Name, NicenessFmt(state.Nice),
1171                         )
1172                 })
1173                 var head SPHead
1174                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1175                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1176                                 return fmt.Sprintf(
1177                                         "SP with %s (nice %s): unmarshaling header",
1178                                         state.Node.Name, NicenessFmt(state.Nice),
1179                                 )
1180                         })
1181                         return nil, err
1182                 }
1183                 if head.Type != SPTypePing {
1184                         state.RxLastNonPing = state.RxLastSeen
1185                 }
1186                 switch head.Type {
1187                 case SPTypeHalt:
1188                         state.Ctx.LogD(
1189                                 "sp-process-halt",
1190                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1191                                         return fmt.Sprintf(
1192                                                 "SP with %s (nice %s): got HALT",
1193                                                 state.Node.Name, NicenessFmt(state.Nice),
1194                                         )
1195                                 },
1196                         )
1197                         state.Lock()
1198                         state.queueTheir = nil
1199                         state.Unlock()
1200
1201                 case SPTypePing:
1202                         state.Ctx.LogD(
1203                                 "sp-process-ping",
1204                                 append(les, LE{"Type", "ping"}),
1205                                 func(les LEs) string {
1206                                         return fmt.Sprintf(
1207                                                 "SP with %s (nice %s): got PING",
1208                                                 state.Node.Name, NicenessFmt(state.Nice),
1209                                         )
1210                                 },
1211                         )
1212
1213                 case SPTypeInfo:
1214                         infosGot = true
1215                         lesp := append(les, LE{"Type", "info"})
1216                         state.Ctx.LogD(
1217                                 "sp-process-info-unmarshal", lesp,
1218                                 func(les LEs) string {
1219                                         return fmt.Sprintf(
1220                                                 "SP with %s (nice %s): unmarshaling INFO",
1221                                                 state.Node.Name, NicenessFmt(state.Nice),
1222                                         )
1223                                 },
1224                         )
1225                         var info SPInfo
1226                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1227                                 state.Ctx.LogE(
1228                                         "sp-process-info-unmarshal", lesp, err,
1229                                         func(les LEs) string {
1230                                                 return fmt.Sprintf(
1231                                                         "SP with %s (nice %s): unmarshaling INFO",
1232                                                         state.Node.Name, NicenessFmt(state.Nice),
1233                                                 )
1234                                         },
1235                                 )
1236                                 return nil, err
1237                         }
1238                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1239                         lesp = append(
1240                                 lesp,
1241                                 LE{"Pkt", pktName},
1242                                 LE{"Size", int64(info.Size)},
1243                                 LE{"PktNice", int(info.Nice)},
1244                         )
1245                         logMsg := func(les LEs) string {
1246                                 return fmt.Sprintf(
1247                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1248                                         state.Node.Name, NicenessFmt(state.Nice),
1249                                         pktName,
1250                                         humanize.IBytes(info.Size),
1251                                         NicenessFmt(info.Nice),
1252                                 )
1253                         }
1254                         if !state.listOnly && info.Nice > state.Nice {
1255                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1256                                         return logMsg(les) + ": too nice"
1257                                 })
1258                                 continue
1259                         }
1260                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1261                                 return logMsg(les) + ": received"
1262                         })
1263                         if !state.listOnly && state.xxOnly == TTx {
1264                                 continue
1265                         }
1266                         state.Lock()
1267                         state.infosTheir[*info.Hash] = &info
1268                         state.Unlock()
1269                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1270                                 return logMsg(les) + ": stating part"
1271                         })
1272                         pktPath := filepath.Join(
1273                                 state.Ctx.Spool,
1274                                 state.Node.Id.String(),
1275                                 string(TRx),
1276                                 Base32Codec.EncodeToString(info.Hash[:]),
1277                         )
1278                         logMsg = func(les LEs) string {
1279                                 return fmt.Sprintf(
1280                                         "Packet %s (%s) (nice %s)",
1281                                         pktName,
1282                                         humanize.IBytes(info.Size),
1283                                         NicenessFmt(info.Nice),
1284                                 )
1285                         }
1286                         if _, err = os.Stat(pktPath); err == nil {
1287                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1288                                         return logMsg(les) + ": already done"
1289                                 })
1290                                 if !state.listOnly {
1291                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1292                                 }
1293                                 continue
1294                         }
1295                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1296                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1297                                         return logMsg(les) + ": already seen"
1298                                 })
1299                                 if !state.listOnly {
1300                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1301                                 }
1302                                 continue
1303                         }
1304                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1305                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1306                                         return logMsg(les) + ": still not checksummed"
1307                                 })
1308                                 continue
1309                         }
1310                         fi, err := os.Stat(pktPath + PartSuffix)
1311                         var offset int64
1312                         if err == nil {
1313                                 offset = fi.Size()
1314                         }
1315                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1316                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1317                                         return logMsg(les) + ": not enough space"
1318                                 })
1319                                 continue
1320                         }
1321                         state.Ctx.LogI(
1322                                 "sp-info",
1323                                 append(lesp, LE{"Offset", offset}),
1324                                 func(les LEs) string {
1325                                         return fmt.Sprintf(
1326                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1327                                         )
1328                                 },
1329                         )
1330                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1331                                 replies = append(replies, MarshalSP(
1332                                         SPTypeFreq,
1333                                         SPFreq{info.Hash, uint64(offset)},
1334                                 ))
1335                         }
1336
1337                 case SPTypeFile:
1338                         lesp := append(les, LE{"Type", "file"})
1339                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1340                                 return fmt.Sprintf(
1341                                         "SP with %s (nice %s): unmarshaling FILE",
1342                                         state.Node.Name, NicenessFmt(state.Nice),
1343                                 )
1344                         })
1345                         var file SPFile
1346                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1347                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1348                                         return fmt.Sprintf(
1349                                                 "SP with %s (nice %s): unmarshaling FILE",
1350                                                 state.Node.Name, NicenessFmt(state.Nice),
1351                                         )
1352                                 })
1353                                 return nil, err
1354                         }
1355                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1356                         lesp = append(
1357                                 lesp,
1358                                 LE{"XX", string(TRx)},
1359                                 LE{"Pkt", pktName},
1360                                 LE{"Size", int64(len(file.Payload))},
1361                         )
1362                         logMsg := func(les LEs) string {
1363                                 return fmt.Sprintf(
1364                                         "Got packet %s (%s)",
1365                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1366                                 )
1367                         }
1368                         dirToSync := filepath.Join(
1369                                 state.Ctx.Spool,
1370                                 state.Node.Id.String(),
1371                                 string(TRx),
1372                         )
1373                         filePath := filepath.Join(dirToSync, pktName)
1374                         filePathPart := filePath + PartSuffix
1375                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1376                                 return logMsg(les) + ": opening part"
1377                         })
1378                         state.fdsLock.RLock()
1379                         fdAndFullSize, exists := state.fds[filePathPart]
1380                         state.fdsLock.RUnlock()
1381                         var fd *os.File
1382                         if exists {
1383                                 fd = fdAndFullSize.fd
1384                         } else {
1385                                 fd, err = os.OpenFile(
1386                                         filePathPart,
1387                                         os.O_RDWR|os.O_CREATE,
1388                                         os.FileMode(0666),
1389                                 )
1390                                 if err != nil {
1391                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1392                                                 return logMsg(les) + ": opening part"
1393                                         })
1394                                         return nil, err
1395                                 }
1396                                 state.fdsLock.Lock()
1397                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1398                                 state.fdsLock.Unlock()
1399                                 if file.Offset == 0 {
1400                                         h, err := blake2b.New256(nil)
1401                                         if err != nil {
1402                                                 panic(err)
1403                                         }
1404                                         state.fileHashers[filePath] = &HasherAndOffset{h: h}
1405                                 }
1406                         }
1407                         state.Ctx.LogD(
1408                                 "sp-file-seek",
1409                                 append(lesp, LE{"Offset", file.Offset}),
1410                                 func(les LEs) string {
1411                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1412                                 })
1413                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1414                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1415                                         return logMsg(les) + ": seeking"
1416                                 })
1417                                 state.closeFd(filePathPart)
1418                                 return nil, err
1419                         }
1420                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1421                                 return logMsg(les) + ": writing"
1422                         })
1423                         if _, err = fd.Write(file.Payload); err != nil {
1424                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1425                                         return logMsg(les) + ": writing"
1426                                 })
1427                                 state.closeFd(filePathPart)
1428                                 return nil, err
1429                         }
1430                         hasherAndOffset, hasherExists := state.fileHashers[filePath]
1431                         if hasherExists {
1432                                 if hasherAndOffset.offset == file.Offset {
1433                                         if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1434                                                 panic(err)
1435                                         }
1436                                         hasherAndOffset.offset += uint64(len(file.Payload))
1437                                 } else {
1438                                         state.Ctx.LogD(
1439                                                 "sp-file-offset-differs", lesp,
1440                                                 func(les LEs) string {
1441                                                         return logMsg(les) + ": offset differs, deleting hasher"
1442                                                 },
1443                                         )
1444                                         delete(state.fileHashers, filePath)
1445                                         hasherExists = false
1446                                 }
1447                         }
1448                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1449                         lesp[len(lesp)-1].V = ourSize
1450                         fullsize := int64(0)
1451                         state.RLock()
1452                         infoTheir, ok := state.infosTheir[*file.Hash]
1453                         state.RUnlock()
1454                         if ok {
1455                                 fullsize = int64(infoTheir.Size)
1456                         }
1457                         lesp = append(lesp, LE{"FullSize", fullsize})
1458                         if state.Ctx.ShowPrgrs {
1459                                 state.progressBars[pktName] = struct{}{}
1460                                 Progress("Rx", lesp)
1461                         }
1462                         if fullsize != ourSize {
1463                                 continue
1464                         }
1465                         if state.Ctx.ShowPrgrs {
1466                                 delete(state.progressBars, pktName)
1467                         }
1468                         logMsg = func(les LEs) string {
1469                                 return fmt.Sprintf(
1470                                         "Got packet %s %d%% (%s / %s)",
1471                                         pktName, 100*ourSize/fullsize,
1472                                         humanize.IBytes(uint64(ourSize)),
1473                                         humanize.IBytes(uint64(fullsize)),
1474                                 )
1475                         }
1476                         err = fd.Sync()
1477                         if err != nil {
1478                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1479                                         return logMsg(les) + ": syncing"
1480                                 })
1481                                 state.closeFd(filePathPart)
1482                                 continue
1483                         }
1484                         if hasherExists {
1485                                 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1486                                         state.Ctx.LogE(
1487                                                 "sp-file-bad-checksum", lesp,
1488                                                 errors.New("checksum mismatch"),
1489                                                 logMsg,
1490                                         )
1491                                         continue
1492                                 }
1493                                 if err = os.Rename(filePathPart, filePath); err != nil {
1494                                         state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1495                                                 return logMsg(les) + ": renaming"
1496                                         })
1497                                         continue
1498                                 }
1499                                 if err = DirSync(dirToSync); err != nil {
1500                                         state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1501                                                 return logMsg(les) + ": dirsyncing"
1502                                         })
1503                                         continue
1504                                 }
1505                                 state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1506                                         return logMsg(les) + ": done"
1507                                 })
1508                                 state.wg.Add(1)
1509                                 go func() {
1510                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1511                                         state.wg.Done()
1512                                 }()
1513                                 state.Lock()
1514                                 delete(state.infosTheir, *file.Hash)
1515                                 state.Unlock()
1516                                 if !state.Ctx.HdrUsage {
1517                                         state.closeFd(filePathPart)
1518                                         continue
1519                                 }
1520                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1521                                         state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1522                                                 return logMsg(les) + ": seeking"
1523                                         })
1524                                         state.closeFd(filePathPart)
1525                                         continue
1526                                 }
1527                                 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1528                                 state.closeFd(filePathPart)
1529                                 if err != nil {
1530                                         state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1531                                                 return logMsg(les) + ": HdrReading"
1532                                         })
1533                                         continue
1534                                 }
1535                                 state.Ctx.HdrWrite(pktEncRaw, filePath)
1536                                 continue
1537                         }
1538                         state.closeFd(filePathPart)
1539                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1540                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1541                                         return logMsg(les) + ": renaming"
1542                                 })
1543                                 continue
1544                         }
1545                         if err = DirSync(dirToSync); err != nil {
1546                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1547                                         return logMsg(les) + ": dirsyncing"
1548                                 })
1549                                 continue
1550                         }
1551                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1552                                 return logMsg(les) + ": downloaded"
1553                         })
1554                         state.Lock()
1555                         delete(state.infosTheir, *file.Hash)
1556                         state.Unlock()
1557                         if !state.NoCK {
1558                                 state.checkerQueues.appeared <- file.Hash
1559                         }
1560
1561                 case SPTypeDone:
1562                         lesp := append(les, LE{"Type", "done"})
1563                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1564                                 return fmt.Sprintf(
1565                                         "SP with %s (nice %s): unmarshaling DONE",
1566                                         state.Node.Name, NicenessFmt(state.Nice),
1567                                 )
1568                         })
1569                         var done SPDone
1570                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1571                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1572                                         return fmt.Sprintf(
1573                                                 "SP with %s (nice %s): unmarshaling DONE",
1574                                                 state.Node.Name, NicenessFmt(state.Nice),
1575                                         )
1576                                 })
1577                                 return nil, err
1578                         }
1579                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1580                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1581                         logMsg := func(les LEs) string {
1582                                 return fmt.Sprintf(
1583                                         "SP with %s (nice %s): DONE: removing %s",
1584                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1585                                 )
1586                         }
1587                         state.Ctx.LogD("sp-done", lesp, logMsg)
1588                         pth := filepath.Join(
1589                                 state.Ctx.Spool,
1590                                 state.Node.Id.String(),
1591                                 string(TTx),
1592                                 pktName,
1593                         )
1594                         if err = os.Remove(pth); err == nil {
1595                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1596                                         return fmt.Sprintf("Packet %s is sent", pktName)
1597                                 })
1598                                 if state.Ctx.HdrUsage {
1599                                         os.Remove(pth + HdrSuffix)
1600                                 }
1601                         } else {
1602                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1603                         }
1604
1605                 case SPTypeFreq:
1606                         lesp := append(les, LE{"Type", "freq"})
1607                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1608                                 return fmt.Sprintf(
1609                                         "SP with %s (nice %s): unmarshaling FREQ",
1610                                         state.Node.Name, NicenessFmt(state.Nice),
1611                                 )
1612                         })
1613                         var freq SPFreq
1614                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1615                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1616                                         return fmt.Sprintf(
1617                                                 "SP with %s (nice %s): unmarshaling FREQ",
1618                                                 state.Node.Name, NicenessFmt(state.Nice),
1619                                         )
1620                                 })
1621                                 return nil, err
1622                         }
1623                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1624                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1625                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1626                                 return fmt.Sprintf(
1627                                         "SP with %s (nice %s): FREQ %s: queuing",
1628                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1629                                 )
1630                         })
1631                         nice, exists := state.infosOurSeen[*freq.Hash]
1632                         if exists {
1633                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1634                                         state.Lock()
1635                                         insertIdx := 0
1636                                         var freqWithNice *FreqWithNice
1637                                         for insertIdx, freqWithNice = range state.queueTheir {
1638                                                 if freqWithNice.nice > nice {
1639                                                         break
1640                                                 }
1641                                         }
1642                                         state.queueTheir = append(state.queueTheir, nil)
1643                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1644                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1645                                         state.Unlock()
1646                                 } else {
1647                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1648                                                 return fmt.Sprintf(
1649                                                         "SP with %s (nice %s): FREQ %s: skipping",
1650                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1651                                                 )
1652                                         })
1653                                 }
1654                         } else {
1655                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1656                                         return fmt.Sprintf(
1657                                                 "SP with %s (nice %s): FREQ %s: unknown",
1658                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1659                                         )
1660                                 })
1661                         }
1662
1663                 default:
1664                         state.Ctx.LogE(
1665                                 "sp-process-type-unknown",
1666                                 append(les, LE{"Type", head.Type}),
1667                                 errors.New("unknown type"),
1668                                 func(les LEs) string {
1669                                         return fmt.Sprintf(
1670                                                 "SP with %s (nice %s): %d",
1671                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1672                                         )
1673                                 },
1674                         )
1675                         return nil, BadPktType
1676                 }
1677         }
1678
1679         if infosGot {
1680                 var pkts int
1681                 var size uint64
1682                 state.RLock()
1683                 for _, info := range state.infosTheir {
1684                         pkts++
1685                         size += info.Size
1686                 }
1687                 state.RUnlock()
1688                 state.Ctx.LogI("sp-infos-rx", LEs{
1689                         {"XX", string(TRx)},
1690                         {"Node", state.Node.Id},
1691                         {"Pkts", pkts},
1692                         {"Size", int64(size)},
1693                 }, func(les LEs) string {
1694                         return fmt.Sprintf(
1695                                 "%s has got for us: %d packets, %s",
1696                                 state.Node.Name, pkts, humanize.IBytes(size),
1697                         )
1698                 })
1699         }
1700         return payloadsSplit(replies), nil
1701 }