]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Fixed possible data race
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "hash"
26         "io"
27         "os"
28         "path/filepath"
29         "sort"
30         "sync"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/flynn/noise"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 const (
40         MaxSPSize      = 1<<16 - 256
41         PartSuffix     = ".part"
42         SPHeadOverhead = 4
43 )
44
45 type SPCheckerQueues struct {
46         appeared chan *[32]byte
47         checked  chan *[32]byte
48 }
49
50 var (
51         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
52
53         SPInfoOverhead    int
54         SPFreqOverhead    int
55         SPFileOverhead    int
56         SPHaltMarshalized []byte
57         SPPingMarshalized []byte
58
59         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
60                 noise.DH25519,
61                 noise.CipherChaChaPoly,
62                 noise.HashBLAKE2b,
63         )
64
65         DefaultDeadline = 10 * time.Second
66         PingTimeout     = time.Minute
67
68         spCheckers = make(map[NodeId]*SPCheckerQueues)
69 )
70
71 type FdAndFullSize struct {
72         fd       *os.File
73         fullSize int64
74 }
75
76 type HasherAndOffset struct {
77         h      hash.Hash
78         offset uint64
79 }
80
81 type SPType uint8
82
83 const (
84         SPTypeInfo SPType = iota
85         SPTypeFreq SPType = iota
86         SPTypeFile SPType = iota
87         SPTypeDone SPType = iota
88         SPTypeHalt SPType = iota
89         SPTypePing SPType = iota
90 )
91
92 type SPHead struct {
93         Type SPType
94 }
95
96 type SPInfo struct {
97         Nice uint8
98         Size uint64
99         Hash *[32]byte
100 }
101
102 type SPFreq struct {
103         Hash   *[32]byte
104         Offset uint64
105 }
106
107 type SPFile struct {
108         Hash    *[32]byte
109         Offset  uint64
110         Payload []byte
111 }
112
113 type SPDone struct {
114         Hash *[32]byte
115 }
116
117 type SPRaw struct {
118         Magic   [8]byte
119         Payload []byte
120 }
121
122 type FreqWithNice struct {
123         freq *SPFreq
124         nice uint8
125 }
126
127 type ConnDeadlined interface {
128         io.ReadWriteCloser
129         SetReadDeadline(t time.Time) error
130         SetWriteDeadline(t time.Time) error
131 }
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([32]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170 }
171
172 func MarshalSP(typ SPType, sp interface{}) []byte {
173         var buf bytes.Buffer
174         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
175                 panic(err)
176         }
177         if _, err := xdr.Marshal(&buf, sp); err != nil {
178                 panic(err)
179         }
180         return buf.Bytes()
181 }
182
183 func payloadsSplit(payloads [][]byte) [][]byte {
184         var outbounds [][]byte
185         outbound := make([]byte, 0, MaxSPSize)
186         for i, payload := range payloads {
187                 outbound = append(outbound, payload...)
188                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
189                         outbounds = append(outbounds, outbound)
190                         outbound = make([]byte, 0, MaxSPSize)
191                 }
192         }
193         if len(outbound) > 0 {
194                 outbounds = append(outbounds, outbound)
195         }
196         return outbounds
197 }
198
199 type SPState struct {
200         Ctx            *Ctx
201         Node           *Node
202         Nice           uint8
203         NoCK           bool
204         onlineDeadline time.Duration
205         maxOnlineTime  time.Duration
206         hs             *noise.HandshakeState
207         csOur          *noise.CipherState
208         csTheir        *noise.CipherState
209         payloads       chan []byte
210         pings          chan struct{}
211         infosTheir     map[[32]byte]*SPInfo
212         infosOurSeen   map[[32]byte]uint8
213         queueTheir     []*FreqWithNice
214         wg             sync.WaitGroup
215         RxBytes        int64
216         RxLastSeen     time.Time
217         RxLastNonPing  time.Time
218         TxBytes        int64
219         TxLastSeen     time.Time
220         TxLastNonPing  time.Time
221         started        time.Time
222         mustFinishAt   time.Time
223         Duration       time.Duration
224         RxSpeed        int64
225         TxSpeed        int64
226         rxLock         *os.File
227         txLock         *os.File
228         xxOnly         TRxTx
229         rxRate         int
230         txRate         int
231         isDead         chan struct{}
232         listOnly       bool
233         onlyPkts       map[[32]byte]bool
234         writeSPBuf     bytes.Buffer
235         fds            map[string]FdAndFullSize
236         fdsLock        sync.RWMutex
237         fileHashers    map[string]*HasherAndOffset
238         checkerQueues  SPCheckerQueues
239         sync.RWMutex
240 }
241
242 func (state *SPState) SetDead() {
243         state.Lock()
244         defer state.Unlock()
245         select {
246         case <-state.isDead:
247                 // Already closed channel, dead
248                 return
249         default:
250         }
251         close(state.isDead)
252         go func() {
253                 for range state.payloads {
254                 }
255         }()
256         go func() {
257                 for range state.pings {
258                 }
259         }()
260         go func() {
261                 for _, s := range state.fds {
262                         s.fd.Close()
263                 }
264         }()
265 }
266
267 func (state *SPState) NotAlive() bool {
268         select {
269         case <-state.isDead:
270                 return true
271         default:
272         }
273         return false
274 }
275
276 func (state *SPState) dirUnlock() {
277         state.Ctx.UnlockDir(state.rxLock)
278         state.Ctx.UnlockDir(state.txLock)
279 }
280
281 func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
282         for hshValue := range appeared {
283                 pktName := Base32Codec.EncodeToString(hshValue[:])
284                 les := LEs{
285                         {"XX", string(TRx)},
286                         {"Node", nodeId},
287                         {"Pkt", pktName},
288                 }
289                 ctx.LogD("sp-checker", les, func(les LEs) string {
290                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
291                 })
292                 size, err := ctx.CheckNoCK(nodeId, hshValue)
293                 les = append(les, LE{"Size", size})
294                 if err != nil {
295                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
296                                 return fmt.Sprintf(
297                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
298                                         humanize.IBytes(uint64(size)),
299                                 )
300                         })
301                         continue
302                 }
303                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
304                         return fmt.Sprintf(
305                                 "Packet %s is retreived (%s)",
306                                 pktName, humanize.IBytes(uint64(size)),
307                         )
308                 })
309                 go func(hsh *[32]byte) { checked <- hsh }(hshValue)
310         }
311 }
312
313 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
314         state.writeSPBuf.Reset()
315         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
316                 Magic:   MagicNNCPLv1,
317                 Payload: payload,
318         })
319         if err != nil {
320                 return err
321         }
322         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
323                 state.TxLastSeen = time.Now()
324                 state.TxBytes += int64(n)
325                 if !ping {
326                         state.TxLastNonPing = state.TxLastSeen
327                 }
328         }
329         return err
330 }
331
332 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
333         var sp SPRaw
334         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
335         if err != nil {
336                 ue := err.(*xdr.UnmarshalError)
337                 if ue.Err == io.EOF {
338                         return nil, ue.Err
339                 }
340                 return nil, err
341         }
342         state.RxLastSeen = time.Now()
343         state.RxBytes += int64(n)
344         if sp.Magic != MagicNNCPLv1 {
345                 return nil, BadMagic
346         }
347         return sp.Payload, nil
348 }
349
350 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
351         var infos []*SPInfo
352         var totalSize int64
353         for job := range ctx.Jobs(nodeId, TTx) {
354                 if job.PktEnc.Nice > nice {
355                         continue
356                 }
357                 if _, known := (*seen)[*job.HshValue]; known {
358                         continue
359                 }
360                 totalSize += job.Size
361                 infos = append(infos, &SPInfo{
362                         Nice: job.PktEnc.Nice,
363                         Size: uint64(job.Size),
364                         Hash: job.HshValue,
365                 })
366                 (*seen)[*job.HshValue] = job.PktEnc.Nice
367         }
368         sort.Sort(ByNice(infos))
369         var payloads [][]byte
370         for _, info := range infos {
371                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
372                 pktName := Base32Codec.EncodeToString(info.Hash[:])
373                 ctx.LogD("sp-info-our", LEs{
374                         {"Node", nodeId},
375                         {"Name", pktName},
376                         {"Size", info.Size},
377                 }, func(les LEs) string {
378                         return fmt.Sprintf(
379                                 "Our info: %s/tx/%s (%s)",
380                                 ctx.NodeName(nodeId),
381                                 pktName,
382                                 humanize.IBytes(info.Size),
383                         )
384                 })
385         }
386         if totalSize > 0 {
387                 ctx.LogI("sp-infos-tx", LEs{
388                         {"XX", string(TTx)},
389                         {"Node", nodeId},
390                         {"Pkts", len(payloads)},
391                         {"Size", totalSize},
392                 }, func(les LEs) string {
393                         return fmt.Sprintf(
394                                 "We have got for %s: %d packets, %s",
395                                 ctx.NodeName(nodeId),
396                                 len(payloads),
397                                 humanize.IBytes(uint64(totalSize)),
398                         )
399                 })
400         }
401         return payloadsSplit(payloads)
402 }
403
404 func (state *SPState) StartI(conn ConnDeadlined) error {
405         nodeId := state.Node.Id
406         err := state.Ctx.ensureRxDir(nodeId)
407         if err != nil {
408                 return err
409         }
410         var rxLock *os.File
411         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
412                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
413                 if err != nil {
414                         return err
415                 }
416         }
417         var txLock *os.File
418         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
419                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
420                 if err != nil {
421                         return err
422                 }
423         }
424         started := time.Now()
425         conf := noise.Config{
426                 CipherSuite: NoiseCipherSuite,
427                 Pattern:     noise.HandshakeIK,
428                 Initiator:   true,
429                 StaticKeypair: noise.DHKey{
430                         Private: state.Ctx.Self.NoisePrv[:],
431                         Public:  state.Ctx.Self.NoisePub[:],
432                 },
433                 PeerStatic: state.Node.NoisePub[:],
434         }
435         hs, err := noise.NewHandshakeState(conf)
436         if err != nil {
437                 return err
438         }
439         state.hs = hs
440         state.payloads = make(chan []byte)
441         state.pings = make(chan struct{})
442         state.infosTheir = make(map[[32]byte]*SPInfo)
443         state.infosOurSeen = make(map[[32]byte]uint8)
444         state.started = started
445         state.rxLock = rxLock
446         state.txLock = txLock
447
448         var infosPayloads [][]byte
449         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
450                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
451         }
452         var firstPayload []byte
453         if len(infosPayloads) > 0 {
454                 firstPayload = infosPayloads[0]
455         }
456         // Pad first payload, to hide actual number of existing files
457         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
458                 firstPayload = append(firstPayload, SPHaltMarshalized...)
459         }
460
461         var buf []byte
462         var payload []byte
463         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
464         if err != nil {
465                 state.dirUnlock()
466                 return err
467         }
468         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
469         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
470                 return fmt.Sprintf(
471                         "SP with %s (nice %s): sending first message",
472                         state.Node.Name,
473                         NicenessFmt(state.Nice),
474                 )
475         })
476         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
477         if err = state.WriteSP(conn, buf, false); err != nil {
478                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
479                         return fmt.Sprintf(
480                                 "SP with %s (nice %s): writing",
481                                 state.Node.Name,
482                                 NicenessFmt(state.Nice),
483                         )
484                 })
485                 state.dirUnlock()
486                 return err
487         }
488         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
489                 return fmt.Sprintf(
490                         "SP with %s (nice %s): waiting for first message",
491                         state.Node.Name,
492                         NicenessFmt(state.Nice),
493                 )
494         })
495         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
496         if buf, err = state.ReadSP(conn); err != nil {
497                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
498                         return fmt.Sprintf(
499                                 "SP with %s (nice %s): reading",
500                                 state.Node.Name,
501                                 NicenessFmt(state.Nice),
502                         )
503                 })
504                 state.dirUnlock()
505                 return err
506         }
507         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
508         if err != nil {
509                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
510                         return fmt.Sprintf(
511                                 "SP with %s (nice %s): reading Noise message",
512                                 state.Node.Name,
513                                 NicenessFmt(state.Nice),
514                         )
515                 })
516                 state.dirUnlock()
517                 return err
518         }
519         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
520                 return fmt.Sprintf(
521                         "SP with %s (nice %s): starting workers",
522                         state.Node.Name,
523                         NicenessFmt(state.Nice),
524                 )
525         })
526         err = state.StartWorkers(conn, infosPayloads, payload)
527         if err != nil {
528                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
529                         return fmt.Sprintf(
530                                 "SP with %s (nice %s): starting workers",
531                                 state.Node.Name,
532                                 NicenessFmt(state.Nice),
533                         )
534                 })
535                 state.dirUnlock()
536         }
537         return err
538 }
539
540 func (state *SPState) StartR(conn ConnDeadlined) error {
541         started := time.Now()
542         conf := noise.Config{
543                 CipherSuite: NoiseCipherSuite,
544                 Pattern:     noise.HandshakeIK,
545                 Initiator:   false,
546                 StaticKeypair: noise.DHKey{
547                         Private: state.Ctx.Self.NoisePrv[:],
548                         Public:  state.Ctx.Self.NoisePub[:],
549                 },
550         }
551         hs, err := noise.NewHandshakeState(conf)
552         if err != nil {
553                 return err
554         }
555         xxOnly := TRxTx("")
556         state.hs = hs
557         state.payloads = make(chan []byte)
558         state.pings = make(chan struct{})
559         state.infosOurSeen = make(map[[32]byte]uint8)
560         state.infosTheir = make(map[[32]byte]*SPInfo)
561         state.started = started
562         state.xxOnly = xxOnly
563
564         var buf []byte
565         var payload []byte
566         logMsg := func(les LEs) string {
567                 return fmt.Sprintf(
568                         "SP nice %s: waiting for first message",
569                         NicenessFmt(state.Nice),
570                 )
571         }
572         les := LEs{{"Nice", int(state.Nice)}}
573         state.Ctx.LogD("sp-startR", les, logMsg)
574         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
575         if buf, err = state.ReadSP(conn); err != nil {
576                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
577                 return err
578         }
579         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
580                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
581                 return err
582         }
583
584         var node *Node
585         for _, n := range state.Ctx.Neigh {
586                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
587                         node = n
588                         break
589                 }
590         }
591         if node == nil {
592                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
593                 err = errors.New("unknown peer: " + peerId)
594                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
595                 return err
596         }
597         state.Node = node
598         state.rxRate = node.RxRate
599         state.txRate = node.TxRate
600         state.onlineDeadline = node.OnlineDeadline
601         state.maxOnlineTime = node.MaxOnlineTime
602         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
603
604         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
605                 return err
606         }
607         var rxLock *os.File
608         if xxOnly == "" || xxOnly == TRx {
609                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
610                 if err != nil {
611                         return err
612                 }
613         }
614         state.rxLock = rxLock
615         var txLock *os.File
616         if xxOnly == "" || xxOnly == TTx {
617                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
618                 if err != nil {
619                         return err
620                 }
621         }
622         state.txLock = txLock
623
624         var infosPayloads [][]byte
625         if xxOnly == "" || xxOnly == TTx {
626                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
627         }
628         var firstPayload []byte
629         if len(infosPayloads) > 0 {
630                 firstPayload = infosPayloads[0]
631         }
632         // Pad first payload, to hide actual number of existing files
633         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
634                 firstPayload = append(firstPayload, SPHaltMarshalized...)
635         }
636
637         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
638                 return fmt.Sprintf(
639                         "SP with %s (nice %s): sending first message",
640                         node.Name, NicenessFmt(state.Nice),
641                 )
642         })
643         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
644         if err != nil {
645                 state.dirUnlock()
646                 return err
647         }
648         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
649         if err = state.WriteSP(conn, buf, false); err != nil {
650                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
651                         return fmt.Sprintf(
652                                 "SP with %s (nice %s): writing",
653                                 node.Name, NicenessFmt(state.Nice),
654                         )
655                 })
656                 state.dirUnlock()
657                 return err
658         }
659         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
660                 return fmt.Sprintf(
661                         "SP with %s (nice %s): starting workers",
662                         node.Name, NicenessFmt(state.Nice),
663                 )
664         })
665         err = state.StartWorkers(conn, infosPayloads, payload)
666         if err != nil {
667                 state.dirUnlock()
668         }
669         return err
670 }
671
672 func (state *SPState) closeFd(pth string) {
673         state.fdsLock.Lock()
674         if s, exists := state.fds[pth]; exists {
675                 delete(state.fds, pth)
676                 s.fd.Close()
677         }
678         state.fdsLock.Unlock()
679 }
680
681 func (state *SPState) StartWorkers(
682         conn ConnDeadlined,
683         infosPayloads [][]byte,
684         payload []byte,
685 ) error {
686         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
687         state.fds = make(map[string]FdAndFullSize)
688         state.fileHashers = make(map[string]*HasherAndOffset)
689         state.isDead = make(chan struct{})
690         if state.maxOnlineTime > 0 {
691                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
692         }
693
694         // Checker
695         if !state.NoCK {
696                 queues := spCheckers[*state.Node.Id]
697                 if queues == nil {
698                         queues = &SPCheckerQueues{
699                                 appeared: make(chan *[32]byte),
700                                 checked:  make(chan *[32]byte),
701                         }
702                         spCheckers[*state.Node.Id] = queues
703                         go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
704                 }
705                 state.checkerQueues = *queues
706                 go func() {
707                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
708                                 if job.PktEnc.Nice <= state.Nice {
709                                         state.checkerQueues.appeared <- job.HshValue
710                                 }
711                         }
712                 }()
713                 state.wg.Add(1)
714                 go func() {
715                         defer state.wg.Done()
716                         for {
717                                 select {
718                                 case <-state.isDead:
719                                         return
720                                 case hsh := <-state.checkerQueues.checked:
721                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
722                                 }
723                         }
724                 }()
725         }
726
727         // Remaining handshake payload sending
728         if len(infosPayloads) > 1 {
729                 state.wg.Add(1)
730                 go func() {
731                         for _, payload := range infosPayloads[1:] {
732                                 state.Ctx.LogD(
733                                         "sp-queue-remaining",
734                                         append(les, LE{"Size", int64(len(payload))}),
735                                         func(les LEs) string {
736                                                 return fmt.Sprintf(
737                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
738                                                         state.Node.Name, NicenessFmt(state.Nice),
739                                                         humanize.IBytes(uint64(len(payload))),
740                                                 )
741                                         },
742                                 )
743                                 state.payloads <- payload
744                         }
745                         state.wg.Done()
746                 }()
747         }
748
749         // Processing of first payload and queueing its responses
750         logMsg := func(les LEs) string {
751                 return fmt.Sprintf(
752                         "SP with %s (nice %s): processing first payload (%s)",
753                         state.Node.Name, NicenessFmt(state.Nice),
754                         humanize.IBytes(uint64(len(payload))),
755                 )
756         }
757         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
758         replies, err := state.ProcessSP(payload)
759         if err != nil {
760                 state.Ctx.LogE("sp-process", les, err, logMsg)
761                 return err
762         }
763         state.wg.Add(1)
764         go func() {
765                 for _, reply := range replies {
766                         state.Ctx.LogD(
767                                 "sp-queue-reply",
768                                 append(les, LE{"Size", int64(len(reply))}),
769                                 func(les LEs) string {
770                                         return fmt.Sprintf(
771                                                 "SP with %s (nice %s): queuing reply (%s)",
772                                                 state.Node.Name, NicenessFmt(state.Nice),
773                                                 humanize.IBytes(uint64(len(payload))),
774                                         )
775                                 },
776                         )
777                         state.payloads <- reply
778                 }
779                 state.wg.Done()
780         }()
781
782         // Periodic jobs
783         state.wg.Add(1)
784         go func() {
785                 deadlineTicker := time.NewTicker(time.Second)
786                 pingTicker := time.NewTicker(PingTimeout)
787                 for {
788                         select {
789                         case <-state.isDead:
790                                 state.wg.Done()
791                                 deadlineTicker.Stop()
792                                 pingTicker.Stop()
793                                 return
794                         case now := <-deadlineTicker.C:
795                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
796                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
797                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
798                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
799                                         state.SetDead()
800                                         conn.Close() // #nosec G104
801                                 }
802                         case now := <-pingTicker.C:
803                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
804                                         state.wg.Add(1)
805                                         go func() {
806                                                 state.pings <- struct{}{}
807                                                 state.wg.Done()
808                                         }()
809                                 }
810                         }
811                 }
812         }()
813
814         // Spool checker and INFOs sender of appearing files
815         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
816                 state.wg.Add(1)
817                 go func() {
818                         ticker := time.NewTicker(time.Second)
819                         for {
820                                 select {
821                                 case <-state.isDead:
822                                         state.wg.Done()
823                                         ticker.Stop()
824                                         return
825                                 case <-ticker.C:
826                                         for _, payload := range state.Ctx.infosOur(
827                                                 state.Node.Id,
828                                                 state.Nice,
829                                                 &state.infosOurSeen,
830                                         ) {
831                                                 state.Ctx.LogD(
832                                                         "sp-queue-info",
833                                                         append(les, LE{"Size", int64(len(payload))}),
834                                                         func(les LEs) string {
835                                                                 return fmt.Sprintf(
836                                                                         "SP with %s (nice %s): queuing new info (%s)",
837                                                                         state.Node.Name, NicenessFmt(state.Nice),
838                                                                         humanize.IBytes(uint64(len(payload))),
839                                                                 )
840                                                         },
841                                                 )
842                                                 state.payloads <- payload
843                                         }
844                                 }
845                         }
846                 }()
847         }
848
849         // Sender
850         state.wg.Add(1)
851         go func() {
852                 defer conn.Close()
853                 defer state.SetDead()
854                 defer state.wg.Done()
855                 for {
856                         if state.NotAlive() {
857                                 return
858                         }
859                         var payload []byte
860                         var ping bool
861                         select {
862                         case <-state.pings:
863                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
864                                         return fmt.Sprintf(
865                                                 "SP with %s (nice %s): got ping",
866                                                 state.Node.Name, NicenessFmt(state.Nice),
867                                         )
868                                 })
869                                 payload = SPPingMarshalized
870                                 ping = true
871                         case payload = <-state.payloads:
872                                 state.Ctx.LogD(
873                                         "sp-got-payload",
874                                         append(les, LE{"Size", int64(len(payload))}),
875                                         func(les LEs) string {
876                                                 return fmt.Sprintf(
877                                                         "SP with %s (nice %s): got payload (%s)",
878                                                         state.Node.Name, NicenessFmt(state.Nice),
879                                                         humanize.IBytes(uint64(len(payload))),
880                                                 )
881                                         },
882                                 )
883                         default:
884                                 state.RLock()
885                                 if len(state.queueTheir) == 0 {
886                                         state.RUnlock()
887                                         time.Sleep(100 * time.Millisecond)
888                                         continue
889                                 }
890                                 freq := state.queueTheir[0].freq
891                                 state.RUnlock()
892                                 if state.txRate > 0 {
893                                         time.Sleep(time.Second / time.Duration(state.txRate))
894                                 }
895                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
896                                 lesp := append(
897                                         les,
898                                         LE{"XX", string(TTx)},
899                                         LE{"Pkt", pktName},
900                                         LE{"Size", int64(freq.Offset)},
901                                 )
902                                 logMsg := func(les LEs) string {
903                                         return fmt.Sprintf(
904                                                 "SP with %s (nice %s): tx/%s (%s)",
905                                                 state.Node.Name, NicenessFmt(state.Nice),
906                                                 pktName,
907                                                 humanize.IBytes(freq.Offset),
908                                         )
909                                 }
910                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
911                                         return logMsg(les) + ": queueing"
912                                 })
913                                 pth := filepath.Join(
914                                         state.Ctx.Spool,
915                                         state.Node.Id.String(),
916                                         string(TTx),
917                                         Base32Codec.EncodeToString(freq.Hash[:]),
918                                 )
919                                 state.fdsLock.RLock()
920                                 fdAndFullSize, exists := state.fds[pth]
921                                 state.fdsLock.RUnlock()
922                                 if !exists {
923                                         fd, err := os.Open(pth)
924                                         if err != nil {
925                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
926                                                         return logMsg(les) + ": opening"
927                                                 })
928                                                 return
929                                         }
930                                         fi, err := fd.Stat()
931                                         if err != nil {
932                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
933                                                         return logMsg(les) + ": stating"
934                                                 })
935                                                 return
936                                         }
937                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
938                                         state.fdsLock.Lock()
939                                         state.fds[pth] = fdAndFullSize
940                                         state.fdsLock.Unlock()
941                                 }
942                                 fd := fdAndFullSize.fd
943                                 fullSize := fdAndFullSize.fullSize
944                                 var buf []byte
945                                 if freq.Offset < uint64(fullSize) {
946                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
947                                                 return logMsg(les) + ": seeking"
948                                         })
949                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
950                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
951                                                         return logMsg(les) + ": seeking"
952                                                 })
953                                                 return
954                                         }
955                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
956                                         n, err := fd.Read(buf)
957                                         if err != nil {
958                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
959                                                         return logMsg(les) + ": reading"
960                                                 })
961                                                 return
962                                         }
963                                         buf = buf[:n]
964                                         lesp = append(
965                                                 les,
966                                                 LE{"XX", string(TTx)},
967                                                 LE{"Pkt", pktName},
968                                                 LE{"Size", int64(n)},
969                                         )
970                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
971                                                 return fmt.Sprintf(
972                                                         "%s: read %s",
973                                                         logMsg(les), humanize.IBytes(uint64(n)),
974                                                 )
975                                         })
976                                 }
977                                 state.closeFd(pth)
978                                 payload = MarshalSP(SPTypeFile, SPFile{
979                                         Hash:    freq.Hash,
980                                         Offset:  freq.Offset,
981                                         Payload: buf,
982                                 })
983                                 ourSize := freq.Offset + uint64(len(buf))
984                                 lesp = append(
985                                         les,
986                                         LE{"XX", string(TTx)},
987                                         LE{"Pkt", pktName},
988                                         LE{"Size", int64(ourSize)},
989                                         LE{"FullSize", fullSize},
990                                 )
991                                 if state.Ctx.ShowPrgrs {
992                                         Progress("Tx", lesp)
993                                 }
994                                 state.Lock()
995                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
996                                         if ourSize == uint64(fullSize) {
997                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
998                                                         return logMsg(les) + ": finished"
999                                                 })
1000                                                 if len(state.queueTheir) > 1 {
1001                                                         state.queueTheir = state.queueTheir[1:]
1002                                                 } else {
1003                                                         state.queueTheir = state.queueTheir[:0]
1004                                                 }
1005                                         } else {
1006                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
1007                                         }
1008                                 } else {
1009                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
1010                                                 return logMsg(les) + ": queue disappeared"
1011                                         })
1012                                 }
1013                                 state.Unlock()
1014                         }
1015                         logMsg := func(les LEs) string {
1016                                 return fmt.Sprintf(
1017                                         "SP with %s (nice %s): sending %s",
1018                                         state.Node.Name, NicenessFmt(state.Nice),
1019                                         humanize.IBytes(uint64(len(payload))),
1020                                 )
1021                         }
1022                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1023                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1024                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
1025                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1026                                 return
1027                         }
1028                 }
1029         }()
1030
1031         // Receiver
1032         state.wg.Add(1)
1033         go func() {
1034                 for {
1035                         if state.NotAlive() {
1036                                 break
1037                         }
1038                         logMsg := func(les LEs) string {
1039                                 return fmt.Sprintf(
1040                                         "SP with %s (nice %s): waiting for payload",
1041                                         state.Node.Name, NicenessFmt(state.Nice),
1042                                 )
1043                         }
1044                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1045                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1046                         payload, err := state.ReadSP(conn)
1047                         if err != nil {
1048                                 if err == io.EOF {
1049                                         break
1050                                 }
1051                                 unmarshalErr := err.(*xdr.UnmarshalError)
1052                                 if os.IsTimeout(unmarshalErr.Err) {
1053                                         continue
1054                                 }
1055                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1056                                         break
1057                                 }
1058                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1059                                 break
1060                         }
1061                         logMsg = func(les LEs) string {
1062                                 return fmt.Sprintf(
1063                                         "SP with %s (nice %s): payload (%s)",
1064                                         state.Node.Name, NicenessFmt(state.Nice),
1065                                         humanize.IBytes(uint64(len(payload))),
1066                                 )
1067                         }
1068                         state.Ctx.LogD(
1069                                 "sp-recv-got",
1070                                 append(les, LE{"Size", int64(len(payload))}),
1071                                 func(les LEs) string { return logMsg(les) + ": got" },
1072                         )
1073                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1074                         if err != nil {
1075                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1076                                         return logMsg(les) + ": got"
1077                                 })
1078                                 break
1079                         }
1080                         state.Ctx.LogD(
1081                                 "sp-recv-process",
1082                                 append(les, LE{"Size", int64(len(payload))}),
1083                                 func(les LEs) string {
1084                                         return logMsg(les) + ": processing"
1085                                 },
1086                         )
1087                         replies, err := state.ProcessSP(payload)
1088                         if err != nil {
1089                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1090                                         return logMsg(les) + ": processing"
1091                                 })
1092                                 break
1093                         }
1094                         state.wg.Add(1)
1095                         go func() {
1096                                 for _, reply := range replies {
1097                                         state.Ctx.LogD(
1098                                                 "sp-recv-reply",
1099                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1100                                                 func(les LEs) string {
1101                                                         return fmt.Sprintf(
1102                                                                 "SP with %s (nice %s): queuing reply (%s)",
1103                                                                 state.Node.Name, NicenessFmt(state.Nice),
1104                                                                 humanize.IBytes(uint64(len(reply))),
1105                                                         )
1106                                                 },
1107                                         )
1108                                         state.payloads <- reply
1109                                 }
1110                                 state.wg.Done()
1111                         }()
1112                         if state.rxRate > 0 {
1113                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1114                         }
1115                 }
1116                 state.SetDead()
1117                 state.wg.Done()
1118                 state.SetDead()
1119                 conn.Close() // #nosec G104
1120         }()
1121
1122         return nil
1123 }
1124
1125 func (state *SPState) Wait() {
1126         state.wg.Wait()
1127         close(state.payloads)
1128         close(state.pings)
1129         state.dirUnlock()
1130         state.Duration = time.Now().Sub(state.started)
1131         state.RxSpeed = state.RxBytes
1132         state.TxSpeed = state.TxBytes
1133         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1134         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1135         if rxDuration > 0 {
1136                 state.RxSpeed = state.RxBytes / rxDuration
1137         }
1138         if txDuration > 0 {
1139                 state.TxSpeed = state.TxBytes / txDuration
1140         }
1141 }
1142
1143 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1144         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1145         r := bytes.NewReader(payload)
1146         var err error
1147         var replies [][]byte
1148         var infosGot bool
1149         for r.Len() > 0 {
1150                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1151                         return fmt.Sprintf(
1152                                 "SP with %s (nice %s): unmarshaling header",
1153                                 state.Node.Name, NicenessFmt(state.Nice),
1154                         )
1155                 })
1156                 var head SPHead
1157                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1158                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1159                                 return fmt.Sprintf(
1160                                         "SP with %s (nice %s): unmarshaling header",
1161                                         state.Node.Name, NicenessFmt(state.Nice),
1162                                 )
1163                         })
1164                         return nil, err
1165                 }
1166                 if head.Type != SPTypePing {
1167                         state.RxLastNonPing = state.RxLastSeen
1168                 }
1169                 switch head.Type {
1170                 case SPTypeHalt:
1171                         state.Ctx.LogD(
1172                                 "sp-process-halt",
1173                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1174                                         return fmt.Sprintf(
1175                                                 "SP with %s (nice %s): got HALT",
1176                                                 state.Node.Name, NicenessFmt(state.Nice),
1177                                         )
1178                                 },
1179                         )
1180                         state.Lock()
1181                         state.queueTheir = nil
1182                         state.Unlock()
1183
1184                 case SPTypePing:
1185                         state.Ctx.LogD(
1186                                 "sp-process-ping",
1187                                 append(les, LE{"Type", "ping"}),
1188                                 func(les LEs) string {
1189                                         return fmt.Sprintf(
1190                                                 "SP with %s (nice %s): got PING",
1191                                                 state.Node.Name, NicenessFmt(state.Nice),
1192                                         )
1193                                 },
1194                         )
1195
1196                 case SPTypeInfo:
1197                         infosGot = true
1198                         lesp := append(les, LE{"Type", "info"})
1199                         state.Ctx.LogD(
1200                                 "sp-process-info-unmarshal", lesp,
1201                                 func(les LEs) string {
1202                                         return fmt.Sprintf(
1203                                                 "SP with %s (nice %s): unmarshaling INFO",
1204                                                 state.Node.Name, NicenessFmt(state.Nice),
1205                                         )
1206                                 },
1207                         )
1208                         var info SPInfo
1209                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1210                                 state.Ctx.LogE(
1211                                         "sp-process-info-unmarshal", lesp, err,
1212                                         func(les LEs) string {
1213                                                 return fmt.Sprintf(
1214                                                         "SP with %s (nice %s): unmarshaling INFO",
1215                                                         state.Node.Name, NicenessFmt(state.Nice),
1216                                                 )
1217                                         },
1218                                 )
1219                                 return nil, err
1220                         }
1221                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1222                         lesp = append(
1223                                 lesp,
1224                                 LE{"Pkt", pktName},
1225                                 LE{"Size", int64(info.Size)},
1226                                 LE{"PktNice", int(info.Nice)},
1227                         )
1228                         logMsg := func(les LEs) string {
1229                                 return fmt.Sprintf(
1230                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1231                                         state.Node.Name, NicenessFmt(state.Nice),
1232                                         pktName,
1233                                         humanize.IBytes(info.Size),
1234                                         NicenessFmt(info.Nice),
1235                                 )
1236                         }
1237                         if !state.listOnly && info.Nice > state.Nice {
1238                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1239                                         return logMsg(les) + ": too nice"
1240                                 })
1241                                 continue
1242                         }
1243                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1244                                 return logMsg(les) + ": received"
1245                         })
1246                         if !state.listOnly && state.xxOnly == TTx {
1247                                 continue
1248                         }
1249                         state.Lock()
1250                         state.infosTheir[*info.Hash] = &info
1251                         state.Unlock()
1252                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1253                                 return logMsg(les) + ": stating part"
1254                         })
1255                         pktPath := filepath.Join(
1256                                 state.Ctx.Spool,
1257                                 state.Node.Id.String(),
1258                                 string(TRx),
1259                                 Base32Codec.EncodeToString(info.Hash[:]),
1260                         )
1261                         logMsg = func(les LEs) string {
1262                                 return fmt.Sprintf(
1263                                         "Packet %s (%s) (nice %s)",
1264                                         pktName,
1265                                         humanize.IBytes(info.Size),
1266                                         NicenessFmt(info.Nice),
1267                                 )
1268                         }
1269                         if _, err = os.Stat(pktPath); err == nil {
1270                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1271                                         return logMsg(les) + ": already done"
1272                                 })
1273                                 if !state.listOnly {
1274                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1275                                 }
1276                                 continue
1277                         }
1278                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1279                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1280                                         return logMsg(les) + ": already seen"
1281                                 })
1282                                 if !state.listOnly {
1283                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1284                                 }
1285                                 continue
1286                         }
1287                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1288                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1289                                         return logMsg(les) + ": still not checksummed"
1290                                 })
1291                                 continue
1292                         }
1293                         fi, err := os.Stat(pktPath + PartSuffix)
1294                         var offset int64
1295                         if err == nil {
1296                                 offset = fi.Size()
1297                         }
1298                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1299                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1300                                         return logMsg(les) + ": not enough space"
1301                                 })
1302                                 continue
1303                         }
1304                         state.Ctx.LogI(
1305                                 "sp-info",
1306                                 append(lesp, LE{"Offset", offset}),
1307                                 func(les LEs) string {
1308                                         return fmt.Sprintf(
1309                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1310                                         )
1311                                 },
1312                         )
1313                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1314                                 replies = append(replies, MarshalSP(
1315                                         SPTypeFreq,
1316                                         SPFreq{info.Hash, uint64(offset)},
1317                                 ))
1318                         }
1319
1320                 case SPTypeFile:
1321                         lesp := append(les, LE{"Type", "file"})
1322                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1323                                 return fmt.Sprintf(
1324                                         "SP with %s (nice %s): unmarshaling FILE",
1325                                         state.Node.Name, NicenessFmt(state.Nice),
1326                                 )
1327                         })
1328                         var file SPFile
1329                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1330                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1331                                         return fmt.Sprintf(
1332                                                 "SP with %s (nice %s): unmarshaling FILE",
1333                                                 state.Node.Name, NicenessFmt(state.Nice),
1334                                         )
1335                                 })
1336                                 return nil, err
1337                         }
1338                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1339                         lesp = append(
1340                                 lesp,
1341                                 LE{"XX", string(TRx)},
1342                                 LE{"Pkt", pktName},
1343                                 LE{"Size", int64(len(file.Payload))},
1344                         )
1345                         logMsg := func(les LEs) string {
1346                                 return fmt.Sprintf(
1347                                         "Got packet %s (%s)",
1348                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1349                                 )
1350                         }
1351                         dirToSync := filepath.Join(
1352                                 state.Ctx.Spool,
1353                                 state.Node.Id.String(),
1354                                 string(TRx),
1355                         )
1356                         filePath := filepath.Join(dirToSync, pktName)
1357                         filePathPart := filePath + PartSuffix
1358                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1359                                 return logMsg(les) + ": opening part"
1360                         })
1361                         state.fdsLock.RLock()
1362                         fdAndFullSize, exists := state.fds[filePathPart]
1363                         state.fdsLock.RUnlock()
1364                         var fd *os.File
1365                         if exists {
1366                                 fd = fdAndFullSize.fd
1367                         } else {
1368                                 fd, err = os.OpenFile(
1369                                         filePathPart,
1370                                         os.O_RDWR|os.O_CREATE,
1371                                         os.FileMode(0666),
1372                                 )
1373                                 if err != nil {
1374                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1375                                                 return logMsg(les) + ": opening part"
1376                                         })
1377                                         return nil, err
1378                                 }
1379                                 state.fdsLock.Lock()
1380                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1381                                 state.fdsLock.Unlock()
1382                                 if file.Offset == 0 {
1383                                         h, err := blake2b.New256(nil)
1384                                         if err != nil {
1385                                                 panic(err)
1386                                         }
1387                                         state.fileHashers[filePath] = &HasherAndOffset{h: h}
1388                                 }
1389                         }
1390                         state.Ctx.LogD(
1391                                 "sp-file-seek",
1392                                 append(lesp, LE{"Offset", file.Offset}),
1393                                 func(les LEs) string {
1394                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1395                                 })
1396                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1397                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1398                                         return logMsg(les) + ": seeking"
1399                                 })
1400                                 state.closeFd(filePathPart)
1401                                 return nil, err
1402                         }
1403                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1404                                 return logMsg(les) + ": writing"
1405                         })
1406                         if _, err = fd.Write(file.Payload); err != nil {
1407                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1408                                         return logMsg(les) + ": writing"
1409                                 })
1410                                 state.closeFd(filePathPart)
1411                                 return nil, err
1412                         }
1413                         hasherAndOffset, hasherExists := state.fileHashers[filePath]
1414                         if hasherExists {
1415                                 if hasherAndOffset.offset == file.Offset {
1416                                         if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1417                                                 panic(err)
1418                                         }
1419                                         hasherAndOffset.offset += uint64(len(file.Payload))
1420                                 } else {
1421                                         state.Ctx.LogD(
1422                                                 "sp-file-offset-differs", lesp,
1423                                                 func(les LEs) string {
1424                                                         return logMsg(les) + ": offset differs, deleting hasher"
1425                                                 },
1426                                         )
1427                                         delete(state.fileHashers, filePath)
1428                                         hasherExists = false
1429                                 }
1430                         }
1431                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1432                         lesp[len(lesp)-1].V = ourSize
1433                         fullsize := int64(0)
1434                         state.RLock()
1435                         infoTheir, ok := state.infosTheir[*file.Hash]
1436                         state.RUnlock()
1437                         if ok {
1438                                 fullsize = int64(infoTheir.Size)
1439                         }
1440                         lesp = append(lesp, LE{"FullSize", fullsize})
1441                         if state.Ctx.ShowPrgrs {
1442                                 Progress("Rx", lesp)
1443                         }
1444                         if fullsize != ourSize {
1445                                 continue
1446                         }
1447                         logMsg = func(les LEs) string {
1448                                 return fmt.Sprintf(
1449                                         "Got packet %s %d%% (%s / %s)",
1450                                         pktName, 100*ourSize/fullsize,
1451                                         humanize.IBytes(uint64(ourSize)),
1452                                         humanize.IBytes(uint64(fullsize)),
1453                                 )
1454                         }
1455                         err = fd.Sync()
1456                         if err != nil {
1457                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1458                                         return logMsg(les) + ": syncing"
1459                                 })
1460                                 state.closeFd(filePathPart)
1461                                 continue
1462                         }
1463                         if hasherExists {
1464                                 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1465                                         state.Ctx.LogE(
1466                                                 "sp-file-bad-checksum", lesp,
1467                                                 errors.New("checksum mismatch"),
1468                                                 logMsg,
1469                                         )
1470                                         continue
1471                                 }
1472                                 if err = os.Rename(filePathPart, filePath); err != nil {
1473                                         state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1474                                                 return logMsg(les) + ": renaming"
1475                                         })
1476                                         continue
1477                                 }
1478                                 if err = DirSync(dirToSync); err != nil {
1479                                         state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1480                                                 return logMsg(les) + ": dirsyncing"
1481                                         })
1482                                         continue
1483                                 }
1484                                 state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1485                                         return logMsg(les) + ": done"
1486                                 })
1487                                 state.wg.Add(1)
1488                                 go func() {
1489                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1490                                         state.wg.Done()
1491                                 }()
1492                                 state.Lock()
1493                                 delete(state.infosTheir, *file.Hash)
1494                                 state.Unlock()
1495                                 if !state.Ctx.HdrUsage {
1496                                         state.closeFd(filePathPart)
1497                                         continue
1498                                 }
1499                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1500                                         state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1501                                                 return logMsg(les) + ": seeking"
1502                                         })
1503                                         state.closeFd(filePathPart)
1504                                         continue
1505                                 }
1506                                 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1507                                 state.closeFd(filePathPart)
1508                                 if err != nil {
1509                                         state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1510                                                 return logMsg(les) + ": HdrReading"
1511                                         })
1512                                         continue
1513                                 }
1514                                 state.Ctx.HdrWrite(pktEncRaw, filePath)
1515                                 continue
1516                         }
1517                         state.closeFd(filePathPart)
1518                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1519                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1520                                         return logMsg(les) + ": renaming"
1521                                 })
1522                                 continue
1523                         }
1524                         if err = DirSync(dirToSync); err != nil {
1525                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1526                                         return logMsg(les) + ": dirsyncing"
1527                                 })
1528                                 continue
1529                         }
1530                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1531                                 return logMsg(les) + ": downloaded"
1532                         })
1533                         state.Lock()
1534                         delete(state.infosTheir, *file.Hash)
1535                         state.Unlock()
1536                         if !state.NoCK {
1537                                 state.checkerQueues.appeared <- file.Hash
1538                         }
1539
1540                 case SPTypeDone:
1541                         lesp := append(les, LE{"Type", "done"})
1542                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1543                                 return fmt.Sprintf(
1544                                         "SP with %s (nice %s): unmarshaling DONE",
1545                                         state.Node.Name, NicenessFmt(state.Nice),
1546                                 )
1547                         })
1548                         var done SPDone
1549                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1550                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1551                                         return fmt.Sprintf(
1552                                                 "SP with %s (nice %s): unmarshaling DONE",
1553                                                 state.Node.Name, NicenessFmt(state.Nice),
1554                                         )
1555                                 })
1556                                 return nil, err
1557                         }
1558                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1559                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1560                         logMsg := func(les LEs) string {
1561                                 return fmt.Sprintf(
1562                                         "SP with %s (nice %s): DONE: removing %s",
1563                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1564                                 )
1565                         }
1566                         state.Ctx.LogD("sp-done", lesp, logMsg)
1567                         pth := filepath.Join(
1568                                 state.Ctx.Spool,
1569                                 state.Node.Id.String(),
1570                                 string(TTx),
1571                                 pktName,
1572                         )
1573                         if err = os.Remove(pth); err == nil {
1574                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1575                                         return fmt.Sprintf("Packet %s is sent", pktName)
1576                                 })
1577                                 if state.Ctx.HdrUsage {
1578                                         os.Remove(pth + HdrSuffix)
1579                                 }
1580                         } else {
1581                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1582                         }
1583
1584                 case SPTypeFreq:
1585                         lesp := append(les, LE{"Type", "freq"})
1586                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1587                                 return fmt.Sprintf(
1588                                         "SP with %s (nice %s): unmarshaling FREQ",
1589                                         state.Node.Name, NicenessFmt(state.Nice),
1590                                 )
1591                         })
1592                         var freq SPFreq
1593                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1594                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1595                                         return fmt.Sprintf(
1596                                                 "SP with %s (nice %s): unmarshaling FREQ",
1597                                                 state.Node.Name, NicenessFmt(state.Nice),
1598                                         )
1599                                 })
1600                                 return nil, err
1601                         }
1602                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1603                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1604                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1605                                 return fmt.Sprintf(
1606                                         "SP with %s (nice %s): FREQ %s: queuing",
1607                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1608                                 )
1609                         })
1610                         nice, exists := state.infosOurSeen[*freq.Hash]
1611                         if exists {
1612                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1613                                         state.Lock()
1614                                         insertIdx := 0
1615                                         var freqWithNice *FreqWithNice
1616                                         for insertIdx, freqWithNice = range state.queueTheir {
1617                                                 if freqWithNice.nice > nice {
1618                                                         break
1619                                                 }
1620                                         }
1621                                         state.queueTheir = append(state.queueTheir, nil)
1622                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1623                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1624                                         state.Unlock()
1625                                 } else {
1626                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1627                                                 return fmt.Sprintf(
1628                                                         "SP with %s (nice %s): FREQ %s: skipping",
1629                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1630                                                 )
1631                                         })
1632                                 }
1633                         } else {
1634                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1635                                         return fmt.Sprintf(
1636                                                 "SP with %s (nice %s): FREQ %s: unknown",
1637                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1638                                         )
1639                                 })
1640                         }
1641
1642                 default:
1643                         state.Ctx.LogE(
1644                                 "sp-process-type-unknown",
1645                                 append(les, LE{"Type", head.Type}),
1646                                 errors.New("unknown type"),
1647                                 func(les LEs) string {
1648                                         return fmt.Sprintf(
1649                                                 "SP with %s (nice %s): %d",
1650                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1651                                         )
1652                                 },
1653                         )
1654                         return nil, BadPktType
1655                 }
1656         }
1657
1658         if infosGot {
1659                 var pkts int
1660                 var size uint64
1661                 state.RLock()
1662                 for _, info := range state.infosTheir {
1663                         pkts++
1664                         size += info.Size
1665                 }
1666                 state.RUnlock()
1667                 state.Ctx.LogI("sp-infos-rx", LEs{
1668                         {"XX", string(TRx)},
1669                         {"Node", state.Node.Id},
1670                         {"Pkts", pkts},
1671                         {"Size", int64(size)},
1672                 }, func(les LEs) string {
1673                         return fmt.Sprintf(
1674                                 "%s has got for us: %d packets, %s",
1675                                 state.Node.Name, pkts, humanize.IBytes(size),
1676                         )
1677                 })
1678         }
1679         return payloadsSplit(replies), nil
1680 }