]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Merge branch 'develop'
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "io"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/dustin/go-humanize"
34         "github.com/flynn/noise"
35 )
36
// Sizing constants of the SP protocol.
//
// MaxSPSize bounds how many marshalled bytes are batched into one outbound
// payload and is the size up to which the first handshake payload is padded.
// PartSuffix is not referenced in this part of the file; presumably it is
// the filename suffix of partially received files -- TODO confirm at its
// call sites. SPHeadOverhead is the length of the buffers that hold a
// marshalled SPHead.
const (
	MaxSPSize      = 1<<16 - 256
	PartSuffix     = ".part"
	SPHeadOverhead = 4
)
42
// MTHAndOffset pairs an MTH hasher with an offset. It is the value type of
// SPState.fileHashers.
// NOTE(review): offset presumably records how far into the file the hasher
// has already been fed -- confirm against the code that fills fileHashers.
type MTHAndOffset struct {
	mth    *MTH
	offset uint64
}
47
// SPCheckerTask is the element type of the spCheckerTasks channel.
// NOTE(review): presumably it describes one received packet (node, expected
// hash, hasher) to be verified by SPChecker, with the outcome delivered on
// done -- confirm against SPChecker, which is not visible here.
type SPCheckerTask struct {
	nodeId *NodeId
	hsh    *[MTHSize]byte
	mth    *MTH
	done   chan []byte
}
54
// Package-level SP state, mostly filled in by init.
var (
	// Marshalled sizes of sample SPInfo, SPFreq and (payload-less) SPFile
	// values, measured once in init.
	SPInfoOverhead    int
	SPFreqOverhead    int
	SPFileOverhead    int
	// Pre-marshalled SPHead packets of type Halt and Ping, built in init.
	SPHaltMarshalized []byte
	SPPingMarshalized []byte

	// NoiseCipherSuite is the Noise suite used for SP handshakes:
	// Curve25519 DH, ChaCha20-Poly1305 cipher, BLAKE2b hash.
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// DefaultDeadline is the read/write deadline applied to the connection
	// for each handshake message.
	DefaultDeadline = 10 * time.Second
	// PingTimeout is the period of the ping ticker in StartWorkers.
	PingTimeout     = time.Minute

	// spCheckerTasks is created (unbuffered) in init; spCheckerOnce makes
	// sure the SPChecker goroutine is started at most once.
	spCheckerTasks chan SPCheckerTask
	SPCheckerWg    sync.WaitGroup
	spCheckerOnce  sync.Once
)
75
// FdAndFullSize bundles an open file with a size value. It is the value type
// of SPState.fds, keyed by path.
// NOTE(review): fullSize is presumably the total size of the file being
// transferred -- confirm where fds entries are created.
type FdAndFullSize struct {
	fd       *os.File
	fullSize int64
}
80
// SPType identifies the kind of an SP packet; it is the only field of SPHead.
type SPType uint8

// SP packet kinds, numbered consecutively from zero in declaration order.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
91
// SPHead is the header marshalled in front of every SP packet body (see
// MarshalSP); it carries only the packet type.
type SPHead struct {
	Type SPType
}
95
// SPInfo announces one packet: its niceness, its size and its hash. infosOur
// builds these from the outbound (TTx) jobs of a node.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[MTHSize]byte
}
101
// SPFreq names a packet by hash together with an offset.
// NOTE(review): presumably a request to send that packet starting at Offset
// -- confirm against the code that processes SPTypeFreq.
type SPFreq struct {
	Hash   *[MTHSize]byte
	Offset uint64
}
106
// SPFile carries a chunk of bytes for the packet identified by Hash.
// NOTE(review): Offset is presumably the position of Payload inside the
// packet -- confirm against the code that processes SPTypeFile.
type SPFile struct {
	Hash    *[MTHSize]byte
	Offset  uint64
	Payload []byte
}
112
// SPDone references a packet by hash.
// NOTE(review): presumably sent once that packet has been fully received --
// confirm against the code that processes SPTypeDone.
type SPDone struct {
	Hash *[MTHSize]byte
}
116
// SPRaw is the outermost frame exchanged on the connection: WriteSP marshals
// it with Magic set to MagicNNCPSv1.B, and ReadSP rejects frames whose Magic
// differs from that value.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
121
// FreqWithNice couples an SPFreq with a niceness value. It is the element
// type of SPState.queueTheir.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
126
// ConnDeadlined is the connection abstraction SP runs over: a readable,
// writable, closable stream that also supports read and write deadlines.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170         spCheckerTasks = make(chan SPCheckerTask)
171 }
172
173 func MarshalSP(typ SPType, sp interface{}) []byte {
174         var buf bytes.Buffer
175         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
176                 panic(err)
177         }
178         if _, err := xdr.Marshal(&buf, sp); err != nil {
179                 panic(err)
180         }
181         return buf.Bytes()
182 }
183
184 func payloadsSplit(payloads [][]byte) [][]byte {
185         var outbounds [][]byte
186         outbound := make([]byte, 0, MaxSPSize)
187         for i, payload := range payloads {
188                 outbound = append(outbound, payload...)
189                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
190                         outbounds = append(outbounds, outbound)
191                         outbound = make([]byte, 0, MaxSPSize)
192                 }
193         }
194         if len(outbound) > 0 {
195                 outbounds = append(outbounds, outbound)
196         }
197         return outbounds
198 }
199
// SPState holds everything belonging to one SP session with a single node.
//
// StartI and StartR fill in the Noise handshake state (hs), the resulting
// cipher states (csOur, csTheir), the payloads and pings channels, the info
// maps and the spool directory locks (rxLock, txLock). StartWorkers creates
// fds, fileHashers and isDead. When NoCK is false, StartWorkers also launches
// the package-wide SPChecker goroutine.
//
// isDead is closed by SetDead and tested by NotAlive. writeSPBuf is a scratch
// buffer reused by every WriteSP call. fds is guarded by fdsLock. The
// embedded RWMutex is taken by SetDead and read-locked around queueTheir
// accesses in the sender loop.
type SPState struct {
	Ctx            *Ctx
	Node           *Node
	Nice           uint8
	NoCK           bool
	onlineDeadline time.Duration
	maxOnlineTime  time.Duration
	hs             *noise.HandshakeState
	csOur          *noise.CipherState
	csTheir        *noise.CipherState
	payloads       chan []byte
	pings          chan struct{}
	infosTheir     map[[MTHSize]byte]*SPInfo
	infosOurSeen   map[[MTHSize]byte]uint8
	queueTheir     []*FreqWithNice
	wg             sync.WaitGroup
	RxBytes        int64
	RxLastSeen     time.Time
	RxLastNonPing  time.Time
	TxBytes        int64
	TxLastSeen     time.Time
	TxLastNonPing  time.Time
	started        time.Time
	mustFinishAt   time.Time
	Duration       time.Duration
	RxSpeed        int64
	TxSpeed        int64
	rxLock         *os.File
	txLock         *os.File
	xxOnly         TRxTx
	rxRate         int
	txRate         int
	isDead         chan struct{}
	listOnly       bool
	onlyPkts       map[[MTHSize]byte]bool
	writeSPBuf     bytes.Buffer
	fds            map[string]FdAndFullSize
	fdsLock        sync.RWMutex
	fileHashers    map[string]*MTHAndOffset
	progressBars   map[string]struct{}
	sync.RWMutex
}
242
243 func (state *SPState) SetDead() {
244         state.Lock()
245         defer state.Unlock()
246         select {
247         case <-state.isDead:
248                 // Already closed channel, dead
249                 return
250         default:
251         }
252         close(state.isDead)
253         go func() {
254                 for range state.payloads {
255                 }
256         }()
257         go func() {
258                 for range state.pings {
259                 }
260         }()
261 }
262
263 func (state *SPState) NotAlive() bool {
264         select {
265         case <-state.isDead:
266                 return true
267         default:
268         }
269         return false
270 }
271
272 func (state *SPState) dirUnlock() {
273         state.Ctx.UnlockDir(state.rxLock)
274         state.Ctx.UnlockDir(state.txLock)
275 }
276
277 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
278         state.writeSPBuf.Reset()
279         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
280                 Magic:   MagicNNCPSv1.B,
281                 Payload: payload,
282         })
283         if err != nil {
284                 return err
285         }
286         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
287                 state.TxLastSeen = time.Now()
288                 state.TxBytes += int64(n)
289                 if !ping {
290                         state.TxLastNonPing = state.TxLastSeen
291                 }
292         }
293         return err
294 }
295
296 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
297         var sp SPRaw
298         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
299         if err != nil {
300                 ue := err.(*xdr.UnmarshalError)
301                 if ue.Err == io.EOF {
302                         return nil, ue.Err
303                 }
304                 return nil, err
305         }
306         state.RxLastSeen = time.Now()
307         state.RxBytes += int64(n)
308         if sp.Magic != MagicNNCPSv1.B {
309                 return nil, BadMagic
310         }
311         return sp.Payload, nil
312 }
313
314 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
315         var infos []*SPInfo
316         var totalSize int64
317         for job := range ctx.Jobs(nodeId, TTx) {
318                 if job.PktEnc.Nice > nice {
319                         continue
320                 }
321                 if _, known := (*seen)[*job.HshValue]; known {
322                         continue
323                 }
324                 totalSize += job.Size
325                 infos = append(infos, &SPInfo{
326                         Nice: job.PktEnc.Nice,
327                         Size: uint64(job.Size),
328                         Hash: job.HshValue,
329                 })
330                 (*seen)[*job.HshValue] = job.PktEnc.Nice
331         }
332         sort.Sort(ByNice(infos))
333         var payloads [][]byte
334         for _, info := range infos {
335                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
336                 pktName := Base32Codec.EncodeToString(info.Hash[:])
337                 ctx.LogD("sp-info-our", LEs{
338                         {"Node", nodeId},
339                         {"Name", pktName},
340                         {"Size", info.Size},
341                 }, func(les LEs) string {
342                         return fmt.Sprintf(
343                                 "Our info: %s/tx/%s (%s)",
344                                 ctx.NodeName(nodeId),
345                                 pktName,
346                                 humanize.IBytes(info.Size),
347                         )
348                 })
349         }
350         if totalSize > 0 {
351                 ctx.LogI("sp-infos-tx", LEs{
352                         {"XX", string(TTx)},
353                         {"Node", nodeId},
354                         {"Pkts", len(payloads)},
355                         {"Size", totalSize},
356                 }, func(les LEs) string {
357                         return fmt.Sprintf(
358                                 "We have got for %s: %d packets, %s",
359                                 ctx.NodeName(nodeId),
360                                 len(payloads),
361                                 humanize.IBytes(uint64(totalSize)),
362                         )
363                 })
364         }
365         return payloadsSplit(payloads)
366 }
367
368 func (state *SPState) StartI(conn ConnDeadlined) error {
369         nodeId := state.Node.Id
370         err := state.Ctx.ensureRxDir(nodeId)
371         if err != nil {
372                 return err
373         }
374         var rxLock *os.File
375         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
376                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
377                 if err != nil {
378                         return err
379                 }
380         }
381         var txLock *os.File
382         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
383                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
384                 if err != nil {
385                         return err
386                 }
387         }
388         started := time.Now()
389         conf := noise.Config{
390                 CipherSuite: NoiseCipherSuite,
391                 Pattern:     noise.HandshakeIK,
392                 Initiator:   true,
393                 StaticKeypair: noise.DHKey{
394                         Private: state.Ctx.Self.NoisePrv[:],
395                         Public:  state.Ctx.Self.NoisePub[:],
396                 },
397                 PeerStatic: state.Node.NoisePub[:],
398         }
399         hs, err := noise.NewHandshakeState(conf)
400         if err != nil {
401                 return err
402         }
403         state.hs = hs
404         state.payloads = make(chan []byte)
405         state.pings = make(chan struct{})
406         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
407         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
408         state.progressBars = make(map[string]struct{})
409         state.started = started
410         state.rxLock = rxLock
411         state.txLock = txLock
412
413         var infosPayloads [][]byte
414         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
415                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
416         }
417         var firstPayload []byte
418         if len(infosPayloads) > 0 {
419                 firstPayload = infosPayloads[0]
420         }
421         // Pad first payload, to hide actual number of existing files
422         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
423                 firstPayload = append(firstPayload, SPHaltMarshalized...)
424         }
425
426         var buf []byte
427         var payload []byte
428         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
429         if err != nil {
430                 state.dirUnlock()
431                 return err
432         }
433         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
434         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
435                 return fmt.Sprintf(
436                         "SP with %s (nice %s): sending first message",
437                         state.Node.Name,
438                         NicenessFmt(state.Nice),
439                 )
440         })
441         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
442         if err = state.WriteSP(conn, buf, false); err != nil {
443                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
444                         return fmt.Sprintf(
445                                 "SP with %s (nice %s): writing",
446                                 state.Node.Name,
447                                 NicenessFmt(state.Nice),
448                         )
449                 })
450                 state.dirUnlock()
451                 return err
452         }
453         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
454                 return fmt.Sprintf(
455                         "SP with %s (nice %s): waiting for first message",
456                         state.Node.Name,
457                         NicenessFmt(state.Nice),
458                 )
459         })
460         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
461         if buf, err = state.ReadSP(conn); err != nil {
462                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
463                         return fmt.Sprintf(
464                                 "SP with %s (nice %s): reading",
465                                 state.Node.Name,
466                                 NicenessFmt(state.Nice),
467                         )
468                 })
469                 state.dirUnlock()
470                 return err
471         }
472         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
473         if err != nil {
474                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
475                         return fmt.Sprintf(
476                                 "SP with %s (nice %s): reading Noise message",
477                                 state.Node.Name,
478                                 NicenessFmt(state.Nice),
479                         )
480                 })
481                 state.dirUnlock()
482                 return err
483         }
484         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
485                 return fmt.Sprintf(
486                         "SP with %s (nice %s): starting workers",
487                         state.Node.Name,
488                         NicenessFmt(state.Nice),
489                 )
490         })
491         err = state.StartWorkers(conn, infosPayloads, payload)
492         if err != nil {
493                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
494                         return fmt.Sprintf(
495                                 "SP with %s (nice %s): starting workers",
496                                 state.Node.Name,
497                                 NicenessFmt(state.Nice),
498                         )
499                 })
500                 state.dirUnlock()
501         }
502         return err
503 }
504
505 func (state *SPState) StartR(conn ConnDeadlined) error {
506         started := time.Now()
507         conf := noise.Config{
508                 CipherSuite: NoiseCipherSuite,
509                 Pattern:     noise.HandshakeIK,
510                 Initiator:   false,
511                 StaticKeypair: noise.DHKey{
512                         Private: state.Ctx.Self.NoisePrv[:],
513                         Public:  state.Ctx.Self.NoisePub[:],
514                 },
515         }
516         hs, err := noise.NewHandshakeState(conf)
517         if err != nil {
518                 return err
519         }
520         xxOnly := TRxTx("")
521         state.hs = hs
522         state.payloads = make(chan []byte)
523         state.pings = make(chan struct{})
524         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
525         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
526         state.progressBars = make(map[string]struct{})
527         state.started = started
528         state.xxOnly = xxOnly
529
530         var buf []byte
531         var payload []byte
532         logMsg := func(les LEs) string {
533                 return fmt.Sprintf(
534                         "SP nice %s: waiting for first message",
535                         NicenessFmt(state.Nice),
536                 )
537         }
538         les := LEs{{"Nice", int(state.Nice)}}
539         state.Ctx.LogD("sp-startR", les, logMsg)
540         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
541         if buf, err = state.ReadSP(conn); err != nil {
542                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
543                 return err
544         }
545         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
546                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
547                 return err
548         }
549
550         var node *Node
551         for _, n := range state.Ctx.Neigh {
552                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
553                         node = n
554                         break
555                 }
556         }
557         if node == nil {
558                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
559                 err = errors.New("unknown peer: " + peerId)
560                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
561                 return err
562         }
563         state.Node = node
564         state.rxRate = node.RxRate
565         state.txRate = node.TxRate
566         state.onlineDeadline = node.OnlineDeadline
567         state.maxOnlineTime = node.MaxOnlineTime
568         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
569
570         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
571                 return err
572         }
573         var rxLock *os.File
574         if xxOnly == "" || xxOnly == TRx {
575                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
576                 if err != nil {
577                         return err
578                 }
579         }
580         state.rxLock = rxLock
581         var txLock *os.File
582         if xxOnly == "" || xxOnly == TTx {
583                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
584                 if err != nil {
585                         return err
586                 }
587         }
588         state.txLock = txLock
589
590         var infosPayloads [][]byte
591         if xxOnly == "" || xxOnly == TTx {
592                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
593         }
594         var firstPayload []byte
595         if len(infosPayloads) > 0 {
596                 firstPayload = infosPayloads[0]
597         }
598         // Pad first payload, to hide actual number of existing files
599         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
600                 firstPayload = append(firstPayload, SPHaltMarshalized...)
601         }
602
603         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
604                 return fmt.Sprintf(
605                         "SP with %s (nice %s): sending first message",
606                         node.Name, NicenessFmt(state.Nice),
607                 )
608         })
609         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
610         if err != nil {
611                 state.dirUnlock()
612                 return err
613         }
614         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
615         if err = state.WriteSP(conn, buf, false); err != nil {
616                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
617                         return fmt.Sprintf(
618                                 "SP with %s (nice %s): writing",
619                                 node.Name, NicenessFmt(state.Nice),
620                         )
621                 })
622                 state.dirUnlock()
623                 return err
624         }
625         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
626                 return fmt.Sprintf(
627                         "SP with %s (nice %s): starting workers",
628                         node.Name, NicenessFmt(state.Nice),
629                 )
630         })
631         err = state.StartWorkers(conn, infosPayloads, payload)
632         if err != nil {
633                 state.dirUnlock()
634         }
635         return err
636 }
637
638 func (state *SPState) closeFd(pth string) {
639         state.fdsLock.Lock()
640         if s, exists := state.fds[pth]; exists {
641                 delete(state.fds, pth)
642                 s.fd.Close()
643         }
644         state.fdsLock.Unlock()
645 }
646
647 func (state *SPState) StartWorkers(
648         conn ConnDeadlined,
649         infosPayloads [][]byte,
650         payload []byte,
651 ) error {
652         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
653         state.fds = make(map[string]FdAndFullSize)
654         state.fileHashers = make(map[string]*MTHAndOffset)
655         state.isDead = make(chan struct{})
656         if state.maxOnlineTime > 0 {
657                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
658         }
659         if !state.NoCK {
660                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
661         }
662
663         // Remaining handshake payload sending
664         if len(infosPayloads) > 1 {
665                 state.wg.Add(1)
666                 go func() {
667                         for _, payload := range infosPayloads[1:] {
668                                 state.Ctx.LogD(
669                                         "sp-queue-remaining",
670                                         append(les, LE{"Size", int64(len(payload))}),
671                                         func(les LEs) string {
672                                                 return fmt.Sprintf(
673                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
674                                                         state.Node.Name, NicenessFmt(state.Nice),
675                                                         humanize.IBytes(uint64(len(payload))),
676                                                 )
677                                         },
678                                 )
679                                 state.payloads <- payload
680                         }
681                         state.wg.Done()
682                 }()
683         }
684
685         // Processing of first payload and queueing its responses
686         logMsg := func(les LEs) string {
687                 return fmt.Sprintf(
688                         "SP with %s (nice %s): processing first payload (%s)",
689                         state.Node.Name, NicenessFmt(state.Nice),
690                         humanize.IBytes(uint64(len(payload))),
691                 )
692         }
693         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
694         replies, err := state.ProcessSP(payload)
695         if err != nil {
696                 state.Ctx.LogE("sp-process", les, err, logMsg)
697                 return err
698         }
699         state.wg.Add(1)
700         go func() {
701                 for _, reply := range replies {
702                         state.Ctx.LogD(
703                                 "sp-queue-reply",
704                                 append(les, LE{"Size", int64(len(reply))}),
705                                 func(les LEs) string {
706                                         return fmt.Sprintf(
707                                                 "SP with %s (nice %s): queuing reply (%s)",
708                                                 state.Node.Name, NicenessFmt(state.Nice),
709                                                 humanize.IBytes(uint64(len(payload))),
710                                         )
711                                 },
712                         )
713                         state.payloads <- reply
714                 }
715                 state.wg.Done()
716         }()
717
718         // Periodic jobs
719         state.wg.Add(1)
720         go func() {
721                 deadlineTicker := time.NewTicker(time.Second)
722                 pingTicker := time.NewTicker(PingTimeout)
723                 for {
724                         select {
725                         case <-state.isDead:
726                                 state.wg.Done()
727                                 deadlineTicker.Stop()
728                                 pingTicker.Stop()
729                                 return
730                         case now := <-deadlineTicker.C:
731                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
732                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
733                                         goto Deadlined
734                                 }
735                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
736                                         goto Deadlined
737                                 }
738                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
739                                         goto Deadlined
740                                 }
741                                 break
742                         Deadlined:
743                                 state.SetDead()
744                                 conn.Close() // #nosec G104
745                         case now := <-pingTicker.C:
746                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
747                                         state.wg.Add(1)
748                                         go func() {
749                                                 state.pings <- struct{}{}
750                                                 state.wg.Done()
751                                         }()
752                                 }
753                         }
754                 }
755         }()
756
757         // Spool checker and INFOs sender of appearing files
758         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
759                 state.wg.Add(1)
760                 go func() {
761                         ticker := time.NewTicker(time.Second)
762                         for {
763                                 select {
764                                 case <-state.isDead:
765                                         state.wg.Done()
766                                         ticker.Stop()
767                                         return
768                                 case <-ticker.C:
769                                         for _, payload := range state.Ctx.infosOur(
770                                                 state.Node.Id,
771                                                 state.Nice,
772                                                 &state.infosOurSeen,
773                                         ) {
774                                                 state.Ctx.LogD(
775                                                         "sp-queue-info",
776                                                         append(les, LE{"Size", int64(len(payload))}),
777                                                         func(les LEs) string {
778                                                                 return fmt.Sprintf(
779                                                                         "SP with %s (nice %s): queuing new info (%s)",
780                                                                         state.Node.Name, NicenessFmt(state.Nice),
781                                                                         humanize.IBytes(uint64(len(payload))),
782                                                                 )
783                                                         },
784                                                 )
785                                                 state.payloads <- payload
786                                         }
787                                 }
788                         }
789                 }()
790         }
791
792         // Sender
793         state.wg.Add(1)
794         go func() {
795                 defer conn.Close()
796                 defer state.SetDead()
797                 defer state.wg.Done()
798                 for {
799                         if state.NotAlive() {
800                                 return
801                         }
802                         var payload []byte
803                         var ping bool
804                         select {
805                         case <-state.pings:
806                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
807                                         return fmt.Sprintf(
808                                                 "SP with %s (nice %s): got ping",
809                                                 state.Node.Name, NicenessFmt(state.Nice),
810                                         )
811                                 })
812                                 payload = SPPingMarshalized
813                                 ping = true
814                         case payload = <-state.payloads:
815                                 state.Ctx.LogD(
816                                         "sp-got-payload",
817                                         append(les, LE{"Size", int64(len(payload))}),
818                                         func(les LEs) string {
819                                                 return fmt.Sprintf(
820                                                         "SP with %s (nice %s): got payload (%s)",
821                                                         state.Node.Name, NicenessFmt(state.Nice),
822                                                         humanize.IBytes(uint64(len(payload))),
823                                                 )
824                                         },
825                                 )
826                         default:
827                                 state.RLock()
828                                 if len(state.queueTheir) == 0 {
829                                         state.RUnlock()
830                                         time.Sleep(100 * time.Millisecond)
831                                         continue
832                                 }
833                                 freq := state.queueTheir[0].freq
834                                 state.RUnlock()
835                                 if state.txRate > 0 {
836                                         time.Sleep(time.Second / time.Duration(state.txRate))
837                                 }
838                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
839                                 lesp := append(
840                                         les,
841                                         LE{"XX", string(TTx)},
842                                         LE{"Pkt", pktName},
843                                         LE{"Size", int64(freq.Offset)},
844                                 )
845                                 logMsg := func(les LEs) string {
846                                         return fmt.Sprintf(
847                                                 "SP with %s (nice %s): tx/%s (%s)",
848                                                 state.Node.Name, NicenessFmt(state.Nice),
849                                                 pktName,
850                                                 humanize.IBytes(freq.Offset),
851                                         )
852                                 }
853                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
854                                         return logMsg(les) + ": queueing"
855                                 })
856                                 pth := filepath.Join(
857                                         state.Ctx.Spool,
858                                         state.Node.Id.String(),
859                                         string(TTx),
860                                         Base32Codec.EncodeToString(freq.Hash[:]),
861                                 )
862                                 state.fdsLock.RLock()
863                                 fdAndFullSize, exists := state.fds[pth]
864                                 state.fdsLock.RUnlock()
865                                 if !exists {
866                                         fd, err := os.Open(pth)
867                                         if err != nil {
868                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
869                                                         return logMsg(les) + ": opening"
870                                                 })
871                                                 return
872                                         }
873                                         fi, err := fd.Stat()
874                                         if err != nil {
875                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
876                                                         return logMsg(les) + ": stating"
877                                                 })
878                                                 return
879                                         }
880                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
881                                         state.fdsLock.Lock()
882                                         state.fds[pth] = fdAndFullSize
883                                         state.fdsLock.Unlock()
884                                 }
885                                 fd := fdAndFullSize.fd
886                                 fullSize := fdAndFullSize.fullSize
887                                 var buf []byte
888                                 if freq.Offset < uint64(fullSize) {
889                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
890                                                 return logMsg(les) + ": seeking"
891                                         })
892                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
893                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
894                                                         return logMsg(les) + ": seeking"
895                                                 })
896                                                 return
897                                         }
898                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
899                                         n, err := fd.Read(buf)
900                                         if err != nil {
901                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
902                                                         return logMsg(les) + ": reading"
903                                                 })
904                                                 return
905                                         }
906                                         buf = buf[:n]
907                                         lesp = append(
908                                                 les,
909                                                 LE{"XX", string(TTx)},
910                                                 LE{"Pkt", pktName},
911                                                 LE{"Size", int64(n)},
912                                         )
913                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
914                                                 return fmt.Sprintf(
915                                                         "%s: read %s",
916                                                         logMsg(les), humanize.IBytes(uint64(n)),
917                                                 )
918                                         })
919                                 }
920                                 state.closeFd(pth)
921                                 payload = MarshalSP(SPTypeFile, SPFile{
922                                         Hash:    freq.Hash,
923                                         Offset:  freq.Offset,
924                                         Payload: buf,
925                                 })
926                                 ourSize := freq.Offset + uint64(len(buf))
927                                 lesp = append(
928                                         les,
929                                         LE{"XX", string(TTx)},
930                                         LE{"Pkt", pktName},
931                                         LE{"Size", int64(ourSize)},
932                                         LE{"FullSize", fullSize},
933                                 )
934                                 if state.Ctx.ShowPrgrs {
935                                         state.progressBars[pktName] = struct{}{}
936                                         Progress("Tx", lesp)
937                                 }
938                                 state.Lock()
939                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
940                                         if ourSize == uint64(fullSize) {
941                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
942                                                         return logMsg(les) + ": finished"
943                                                 })
944                                                 if len(state.queueTheir) > 1 {
945                                                         state.queueTheir = state.queueTheir[1:]
946                                                 } else {
947                                                         state.queueTheir = state.queueTheir[:0]
948                                                 }
949                                                 if state.Ctx.ShowPrgrs {
950                                                         delete(state.progressBars, pktName)
951                                                 }
952                                         } else {
953                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
954                                         }
955                                 } else {
956                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
957                                                 return logMsg(les) + ": queue disappeared"
958                                         })
959                                 }
960                                 state.Unlock()
961                         }
962                         logMsg := func(les LEs) string {
963                                 return fmt.Sprintf(
964                                         "SP with %s (nice %s): sending %s",
965                                         state.Node.Name, NicenessFmt(state.Nice),
966                                         humanize.IBytes(uint64(len(payload))),
967                                 )
968                         }
969                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
970                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
971                         ct, err := state.csOur.Encrypt(nil, nil, payload)
972                         if err != nil {
973                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
974                                 return
975                         }
976                         if err := state.WriteSP(conn, ct, ping); err != nil {
977                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
978                                 return
979                         }
980                 }
981         }()
982
983         // Receiver
984         state.wg.Add(1)
985         go func() {
986                 for {
987                         if state.NotAlive() {
988                                 break
989                         }
990                         logMsg := func(les LEs) string {
991                                 return fmt.Sprintf(
992                                         "SP with %s (nice %s): waiting for payload",
993                                         state.Node.Name, NicenessFmt(state.Nice),
994                                 )
995                         }
996                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
997                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
998                         payload, err := state.ReadSP(conn)
999                         if err != nil {
1000                                 if err == io.EOF {
1001                                         break
1002                                 }
1003                                 unmarshalErr := err.(*xdr.UnmarshalError)
1004                                 if os.IsTimeout(unmarshalErr.Err) {
1005                                         continue
1006                                 }
1007                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1008                                         break
1009                                 }
1010                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1011                                 break
1012                         }
1013                         logMsg = func(les LEs) string {
1014                                 return fmt.Sprintf(
1015                                         "SP with %s (nice %s): payload (%s)",
1016                                         state.Node.Name, NicenessFmt(state.Nice),
1017                                         humanize.IBytes(uint64(len(payload))),
1018                                 )
1019                         }
1020                         state.Ctx.LogD(
1021                                 "sp-recv-got",
1022                                 append(les, LE{"Size", int64(len(payload))}),
1023                                 func(les LEs) string { return logMsg(les) + ": got" },
1024                         )
1025                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1026                         if err != nil {
1027                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1028                                         return logMsg(les) + ": got"
1029                                 })
1030                                 break
1031                         }
1032                         state.Ctx.LogD(
1033                                 "sp-recv-process",
1034                                 append(les, LE{"Size", int64(len(payload))}),
1035                                 func(les LEs) string {
1036                                         return logMsg(les) + ": processing"
1037                                 },
1038                         )
1039                         replies, err := state.ProcessSP(payload)
1040                         if err != nil {
1041                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1042                                         return logMsg(les) + ": processing"
1043                                 })
1044                                 break
1045                         }
1046                         state.wg.Add(1)
1047                         go func() {
1048                                 for _, reply := range replies {
1049                                         state.Ctx.LogD(
1050                                                 "sp-recv-reply",
1051                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1052                                                 func(les LEs) string {
1053                                                         return fmt.Sprintf(
1054                                                                 "SP with %s (nice %s): queuing reply (%s)",
1055                                                                 state.Node.Name, NicenessFmt(state.Nice),
1056                                                                 humanize.IBytes(uint64(len(reply))),
1057                                                         )
1058                                                 },
1059                                         )
1060                                         state.payloads <- reply
1061                                 }
1062                                 state.wg.Done()
1063                         }()
1064                         if state.rxRate > 0 {
1065                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1066                         }
1067                 }
1068                 state.SetDead()
1069                 state.wg.Done()
1070                 state.SetDead()
1071                 conn.Close() // #nosec G104
1072         }()
1073
1074         return nil
1075 }
1076
1077 func (state *SPState) Wait() {
1078         state.wg.Wait()
1079         close(state.payloads)
1080         close(state.pings)
1081         state.Duration = time.Now().Sub(state.started)
1082         state.dirUnlock()
1083         state.RxSpeed = state.RxBytes
1084         state.TxSpeed = state.TxBytes
1085         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1086         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1087         if rxDuration > 0 {
1088                 state.RxSpeed = state.RxBytes / rxDuration
1089         }
1090         if txDuration > 0 {
1091                 state.TxSpeed = state.TxBytes / txDuration
1092         }
1093         for _, s := range state.fds {
1094                 s.fd.Close()
1095         }
1096         for pktName := range state.progressBars {
1097                 ProgressKill(pktName)
1098         }
1099 }
1100
1101 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1102         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1103         r := bytes.NewReader(payload)
1104         var err error
1105         var replies [][]byte
1106         var infosGot bool
1107         for r.Len() > 0 {
1108                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1109                         return fmt.Sprintf(
1110                                 "SP with %s (nice %s): unmarshaling header",
1111                                 state.Node.Name, NicenessFmt(state.Nice),
1112                         )
1113                 })
1114                 var head SPHead
1115                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1116                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1117                                 return fmt.Sprintf(
1118                                         "SP with %s (nice %s): unmarshaling header",
1119                                         state.Node.Name, NicenessFmt(state.Nice),
1120                                 )
1121                         })
1122                         return nil, err
1123                 }
1124                 if head.Type != SPTypePing {
1125                         state.RxLastNonPing = state.RxLastSeen
1126                 }
1127                 switch head.Type {
1128                 case SPTypeHalt:
1129                         state.Ctx.LogD(
1130                                 "sp-process-halt",
1131                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1132                                         return fmt.Sprintf(
1133                                                 "SP with %s (nice %s): got HALT",
1134                                                 state.Node.Name, NicenessFmt(state.Nice),
1135                                         )
1136                                 },
1137                         )
1138                         state.Lock()
1139                         state.queueTheir = nil
1140                         state.Unlock()
1141
1142                 case SPTypePing:
1143                         state.Ctx.LogD(
1144                                 "sp-process-ping",
1145                                 append(les, LE{"Type", "ping"}),
1146                                 func(les LEs) string {
1147                                         return fmt.Sprintf(
1148                                                 "SP with %s (nice %s): got PING",
1149                                                 state.Node.Name, NicenessFmt(state.Nice),
1150                                         )
1151                                 },
1152                         )
1153
1154                 case SPTypeInfo:
1155                         infosGot = true
1156                         lesp := append(les, LE{"Type", "info"})
1157                         state.Ctx.LogD(
1158                                 "sp-process-info-unmarshal", lesp,
1159                                 func(les LEs) string {
1160                                         return fmt.Sprintf(
1161                                                 "SP with %s (nice %s): unmarshaling INFO",
1162                                                 state.Node.Name, NicenessFmt(state.Nice),
1163                                         )
1164                                 },
1165                         )
1166                         var info SPInfo
1167                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1168                                 state.Ctx.LogE(
1169                                         "sp-process-info-unmarshal", lesp, err,
1170                                         func(les LEs) string {
1171                                                 return fmt.Sprintf(
1172                                                         "SP with %s (nice %s): unmarshaling INFO",
1173                                                         state.Node.Name, NicenessFmt(state.Nice),
1174                                                 )
1175                                         },
1176                                 )
1177                                 return nil, err
1178                         }
1179                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1180                         lesp = append(
1181                                 lesp,
1182                                 LE{"Pkt", pktName},
1183                                 LE{"Size", int64(info.Size)},
1184                                 LE{"PktNice", int(info.Nice)},
1185                         )
1186                         logMsg := func(les LEs) string {
1187                                 return fmt.Sprintf(
1188                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1189                                         state.Node.Name, NicenessFmt(state.Nice),
1190                                         pktName,
1191                                         humanize.IBytes(info.Size),
1192                                         NicenessFmt(info.Nice),
1193                                 )
1194                         }
1195                         if !state.listOnly && info.Nice > state.Nice {
1196                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1197                                         return logMsg(les) + ": too nice"
1198                                 })
1199                                 continue
1200                         }
1201                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1202                                 return logMsg(les) + ": received"
1203                         })
1204                         if !state.listOnly && state.xxOnly == TTx {
1205                                 continue
1206                         }
1207                         state.Lock()
1208                         state.infosTheir[*info.Hash] = &info
1209                         state.Unlock()
1210                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1211                                 return logMsg(les) + ": stating part"
1212                         })
1213                         pktPath := filepath.Join(
1214                                 state.Ctx.Spool,
1215                                 state.Node.Id.String(),
1216                                 string(TRx),
1217                                 Base32Codec.EncodeToString(info.Hash[:]),
1218                         )
1219                         logMsg = func(les LEs) string {
1220                                 return fmt.Sprintf(
1221                                         "Packet %s (%s) (nice %s)",
1222                                         pktName,
1223                                         humanize.IBytes(info.Size),
1224                                         NicenessFmt(info.Nice),
1225                                 )
1226                         }
1227                         if _, err = os.Stat(pktPath); err == nil {
1228                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1229                                         return logMsg(les) + ": already done"
1230                                 })
1231                                 if !state.listOnly {
1232                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1233                                 }
1234                                 continue
1235                         }
1236                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1237                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1238                                         return logMsg(les) + ": already seen"
1239                                 })
1240                                 if !state.listOnly {
1241                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1242                                 }
1243                                 continue
1244                         }
1245                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1246                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1247                                         return logMsg(les) + ": still not checksummed"
1248                                 })
1249                                 continue
1250                         }
1251                         fi, err := os.Stat(pktPath + PartSuffix)
1252                         var offset int64
1253                         if err == nil {
1254                                 offset = fi.Size()
1255                         }
1256                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1257                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1258                                         return logMsg(les) + ": not enough space"
1259                                 })
1260                                 continue
1261                         }
1262                         state.Ctx.LogI(
1263                                 "sp-info",
1264                                 append(lesp, LE{"Offset", offset}),
1265                                 func(les LEs) string {
1266                                         return fmt.Sprintf(
1267                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1268                                         )
1269                                 },
1270                         )
1271                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1272                                 replies = append(replies, MarshalSP(
1273                                         SPTypeFreq,
1274                                         SPFreq{info.Hash, uint64(offset)},
1275                                 ))
1276                         }
1277
1278                 case SPTypeFile:
1279                         lesp := append(les, LE{"Type", "file"})
1280                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1281                                 return fmt.Sprintf(
1282                                         "SP with %s (nice %s): unmarshaling FILE",
1283                                         state.Node.Name, NicenessFmt(state.Nice),
1284                                 )
1285                         })
1286                         var file SPFile
1287                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1288                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1289                                         return fmt.Sprintf(
1290                                                 "SP with %s (nice %s): unmarshaling FILE",
1291                                                 state.Node.Name, NicenessFmt(state.Nice),
1292                                         )
1293                                 })
1294                                 return nil, err
1295                         }
1296                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1297                         lesp = append(
1298                                 lesp,
1299                                 LE{"XX", string(TRx)},
1300                                 LE{"Pkt", pktName},
1301                                 LE{"Size", int64(len(file.Payload))},
1302                         )
1303                         logMsg := func(les LEs) string {
1304                                 return fmt.Sprintf(
1305                                         "Got packet %s (%s)",
1306                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1307                                 )
1308                         }
1309                         fullsize := int64(0)
1310                         state.RLock()
1311                         infoTheir, ok := state.infosTheir[*file.Hash]
1312                         state.RUnlock()
1313                         if !ok {
1314                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1315                                         return logMsg(les) + ": unknown file"
1316                                 })
1317                                 continue
1318                         }
1319                         fullsize = int64(infoTheir.Size)
1320                         lesp = append(lesp, LE{"FullSize", fullsize})
1321                         dirToSync := filepath.Join(
1322                                 state.Ctx.Spool,
1323                                 state.Node.Id.String(),
1324                                 string(TRx),
1325                         )
1326                         filePath := filepath.Join(dirToSync, pktName)
1327                         filePathPart := filePath + PartSuffix
1328                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1329                                 return logMsg(les) + ": opening part"
1330                         })
1331                         state.fdsLock.RLock()
1332                         fdAndFullSize, exists := state.fds[filePathPart]
1333                         state.fdsLock.RUnlock()
1334                         hasherAndOffset := state.fileHashers[filePath]
1335                         var fd *os.File
1336                         if exists {
1337                                 fd = fdAndFullSize.fd
1338                         } else {
1339                                 fd, err = os.OpenFile(
1340                                         filePathPart,
1341                                         os.O_RDWR|os.O_CREATE,
1342                                         os.FileMode(0666),
1343                                 )
1344                                 if err != nil {
1345                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1346                                                 return logMsg(les) + ": opening part"
1347                                         })
1348                                         return nil, err
1349                                 }
1350                                 state.fdsLock.Lock()
1351                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1352                                 state.fdsLock.Unlock()
1353                                 if !state.NoCK {
1354                                         hasherAndOffset = &MTHAndOffset{
1355                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1356                                                 offset: file.Offset,
1357                                         }
1358                                         state.fileHashers[filePath] = hasherAndOffset
1359                                 }
1360                         }
1361                         state.Ctx.LogD(
1362                                 "sp-file-seek",
1363                                 append(lesp, LE{"Offset", file.Offset}),
1364                                 func(les LEs) string {
1365                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1366                                 })
1367                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1368                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1369                                         return logMsg(les) + ": seeking"
1370                                 })
1371                                 state.closeFd(filePathPart)
1372                                 return nil, err
1373                         }
1374                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1375                                 return logMsg(les) + ": writing"
1376                         })
1377                         if _, err = fd.Write(file.Payload); err != nil {
1378                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1379                                         return logMsg(les) + ": writing"
1380                                 })
1381                                 state.closeFd(filePathPart)
1382                                 return nil, err
1383                         }
1384                         if hasherAndOffset != nil {
1385                                 if hasherAndOffset.offset == file.Offset {
1386                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1387                                                 panic(err)
1388                                         }
1389                                         hasherAndOffset.offset += uint64(len(file.Payload))
1390                                 } else {
1391                                         state.Ctx.LogE(
1392                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1393                                                 func(les LEs) string {
1394                                                         return logMsg(les) + ": deleting hasher"
1395                                                 },
1396                                         )
1397                                         delete(state.fileHashers, filePath)
1398                                         hasherAndOffset = nil
1399                                 }
1400                         }
1401                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1402                         lesp[len(lesp)-2].V = ourSize
1403                         if state.Ctx.ShowPrgrs {
1404                                 state.progressBars[pktName] = struct{}{}
1405                                 Progress("Rx", lesp)
1406                         }
1407                         if fullsize != ourSize {
1408                                 continue
1409                         }
1410                         if state.Ctx.ShowPrgrs {
1411                                 delete(state.progressBars, pktName)
1412                         }
1413                         logMsg = func(les LEs) string {
1414                                 return fmt.Sprintf(
1415                                         "Got packet %s %d%% (%s / %s)",
1416                                         pktName, 100*ourSize/fullsize,
1417                                         humanize.IBytes(uint64(ourSize)),
1418                                         humanize.IBytes(uint64(fullsize)),
1419                                 )
1420                         }
1421                         err = fd.Sync()
1422                         if err != nil {
1423                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1424                                         return logMsg(les) + ": syncing"
1425                                 })
1426                                 state.closeFd(filePathPart)
1427                                 continue
1428                         }
1429                         if hasherAndOffset != nil {
1430                                 delete(state.fileHashers, filePath)
1431                                 if hasherAndOffset.mth.PrependSize == 0 {
1432                                         if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
1433                                                 state.Ctx.LogE(
1434                                                         "sp-file-bad-checksum", lesp,
1435                                                         errors.New("checksum mismatch"),
1436                                                         logMsg,
1437                                                 )
1438                                                 state.closeFd(filePathPart)
1439                                                 continue
1440                                         }
1441                                         if err = os.Rename(filePathPart, filePath); err != nil {
1442                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1443                                                         return logMsg(les) + ": renaming"
1444                                                 })
1445                                                 state.closeFd(filePathPart)
1446                                                 continue
1447                                         }
1448                                         if err = DirSync(dirToSync); err != nil {
1449                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1450                                                         return logMsg(les) + ": dirsyncing"
1451                                                 })
1452                                                 state.closeFd(filePathPart)
1453                                                 continue
1454                                         }
1455                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1456                                                 return logMsg(les) + ": done"
1457                                         })
1458                                         state.wg.Add(1)
1459                                         go func() {
1460                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1461                                                 state.wg.Done()
1462                                         }()
1463                                         state.Lock()
1464                                         delete(state.infosTheir, *file.Hash)
1465                                         state.Unlock()
1466                                         if !state.Ctx.HdrUsage {
1467                                                 continue
1468                                         }
1469                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1470                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1471                                                         return logMsg(les) + ": seeking"
1472                                                 })
1473                                                 state.closeFd(filePathPart)
1474                                                 continue
1475                                         }
1476                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1477                                         state.closeFd(filePathPart)
1478                                         if err != nil {
1479                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1480                                                         return logMsg(les) + ": HdrReading"
1481                                                 })
1482                                                 continue
1483                                         }
1484                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1485                                         continue
1486                                 }
1487                         }
1488                         state.closeFd(filePathPart)
1489                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1490                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1491                                         return logMsg(les) + ": renaming"
1492                                 })
1493                                 continue
1494                         }
1495                         if err = DirSync(dirToSync); err != nil {
1496                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1497                                         return logMsg(les) + ": dirsyncing"
1498                                 })
1499                                 continue
1500                         }
1501                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1502                                 return logMsg(les) + ": downloaded"
1503                         })
1504                         state.Lock()
1505                         delete(state.infosTheir, *file.Hash)
1506                         state.Unlock()
1507                         if hasherAndOffset != nil {
1508                                 go func() {
1509                                         spCheckerTasks <- SPCheckerTask{
1510                                                 nodeId: state.Node.Id,
1511                                                 hsh:    file.Hash,
1512                                                 mth:    hasherAndOffset.mth,
1513                                                 done:   state.payloads,
1514                                         }
1515                                 }()
1516                         }
1517
1518                 case SPTypeDone:
1519                         lesp := append(les, LE{"Type", "done"})
1520                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1521                                 return fmt.Sprintf(
1522                                         "SP with %s (nice %s): unmarshaling DONE",
1523                                         state.Node.Name, NicenessFmt(state.Nice),
1524                                 )
1525                         })
1526                         var done SPDone
1527                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1528                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1529                                         return fmt.Sprintf(
1530                                                 "SP with %s (nice %s): unmarshaling DONE",
1531                                                 state.Node.Name, NicenessFmt(state.Nice),
1532                                         )
1533                                 })
1534                                 return nil, err
1535                         }
1536                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1537                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1538                         logMsg := func(les LEs) string {
1539                                 return fmt.Sprintf(
1540                                         "SP with %s (nice %s): DONE: removing %s",
1541                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1542                                 )
1543                         }
1544                         state.Ctx.LogD("sp-done", lesp, logMsg)
1545                         pth := filepath.Join(
1546                                 state.Ctx.Spool,
1547                                 state.Node.Id.String(),
1548                                 string(TTx),
1549                                 pktName,
1550                         )
1551                         if err = os.Remove(pth); err == nil {
1552                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1553                                         return fmt.Sprintf("Packet %s is sent", pktName)
1554                                 })
1555                                 if state.Ctx.HdrUsage {
1556                                         os.Remove(pth + HdrSuffix)
1557                                 }
1558                         } else {
1559                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1560                         }
1561
1562                 case SPTypeFreq:
1563                         lesp := append(les, LE{"Type", "freq"})
1564                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1565                                 return fmt.Sprintf(
1566                                         "SP with %s (nice %s): unmarshaling FREQ",
1567                                         state.Node.Name, NicenessFmt(state.Nice),
1568                                 )
1569                         })
1570                         var freq SPFreq
1571                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1572                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1573                                         return fmt.Sprintf(
1574                                                 "SP with %s (nice %s): unmarshaling FREQ",
1575                                                 state.Node.Name, NicenessFmt(state.Nice),
1576                                         )
1577                                 })
1578                                 return nil, err
1579                         }
1580                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1581                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1582                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1583                                 return fmt.Sprintf(
1584                                         "SP with %s (nice %s): FREQ %s: queuing",
1585                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1586                                 )
1587                         })
1588                         nice, exists := state.infosOurSeen[*freq.Hash]
1589                         if exists {
1590                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1591                                         state.Lock()
1592                                         insertIdx := 0
1593                                         var freqWithNice *FreqWithNice
1594                                         for insertIdx, freqWithNice = range state.queueTheir {
1595                                                 if freqWithNice.nice > nice {
1596                                                         break
1597                                                 }
1598                                         }
1599                                         state.queueTheir = append(state.queueTheir, nil)
1600                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1601                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1602                                         state.Unlock()
1603                                 } else {
1604                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1605                                                 return fmt.Sprintf(
1606                                                         "SP with %s (nice %s): FREQ %s: skipping",
1607                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1608                                                 )
1609                                         })
1610                                 }
1611                         } else {
1612                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1613                                         return fmt.Sprintf(
1614                                                 "SP with %s (nice %s): FREQ %s: unknown",
1615                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1616                                         )
1617                                 })
1618                         }
1619
1620                 default:
1621                         state.Ctx.LogE(
1622                                 "sp-process-type-unknown",
1623                                 append(les, LE{"Type", head.Type}),
1624                                 errors.New("unknown type"),
1625                                 func(les LEs) string {
1626                                         return fmt.Sprintf(
1627                                                 "SP with %s (nice %s): %d",
1628                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1629                                         )
1630                                 },
1631                         )
1632                         return nil, BadPktType
1633                 }
1634         }
1635
1636         if infosGot {
1637                 var pkts int
1638                 var size uint64
1639                 state.RLock()
1640                 for _, info := range state.infosTheir {
1641                         pkts++
1642                         size += info.Size
1643                 }
1644                 state.RUnlock()
1645                 state.Ctx.LogI("sp-infos-rx", LEs{
1646                         {"XX", string(TRx)},
1647                         {"Node", state.Node.Id},
1648                         {"Pkts", pkts},
1649                         {"Size", int64(size)},
1650                 }, func(les LEs) string {
1651                         return fmt.Sprintf(
1652                                 "%s has got for us: %d packets, %s",
1653                                 state.Node.Name, pkts, humanize.IBytes(size),
1654                         )
1655                 })
1656         }
1657         return payloadsSplit(replies), nil
1658 }
1659
1660 func SPChecker(ctx *Ctx) {
1661         for t := range spCheckerTasks {
1662                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1663                 les := LEs{
1664                         {"XX", string(TRx)},
1665                         {"Node", t.nodeId},
1666                         {"Pkt", pktName},
1667                 }
1668                 SPCheckerWg.Add(1)
1669                 ctx.LogD("sp-checker", les, func(les LEs) string {
1670                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1671                 })
1672                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1673                 les = append(les, LE{"Size", size})
1674                 if err != nil {
1675                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1676                                 return fmt.Sprintf(
1677                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1678                                         humanize.IBytes(uint64(size)),
1679                                 )
1680                         })
1681                         SPCheckerWg.Done()
1682                         continue
1683                 }
1684                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1685                         return fmt.Sprintf(
1686                                 "Packet %s is retreived (%s)",
1687                                 pktName, humanize.IBytes(uint64(size)),
1688                         )
1689                 })
1690                 SPCheckerWg.Done()
1691                 go func(t SPCheckerTask) {
1692                         defer func() { recover() }()
1693                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1694                 }(t)
1695         }
1696 }