]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Merge branch 'develop'
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "hash"
26         "io"
27         "os"
28         "path/filepath"
29         "sort"
30         "sync"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/flynn/noise"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 const (
40         MaxSPSize      = 1<<16 - 256
41         PartSuffix     = ".part"
42         SPHeadOverhead = 4
43 )
44
45 type SPCheckerQueues struct {
46         appeared chan *[32]byte
47         checked  chan *[32]byte
48 }
49
50 var (
51         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
52
53         SPInfoOverhead    int
54         SPFreqOverhead    int
55         SPFileOverhead    int
56         SPHaltMarshalized []byte
57         SPPingMarshalized []byte
58
59         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
60                 noise.DH25519,
61                 noise.CipherChaChaPoly,
62                 noise.HashBLAKE2b,
63         )
64
65         DefaultDeadline = 10 * time.Second
66         PingTimeout     = time.Minute
67
68         spCheckers = make(map[NodeId]*SPCheckerQueues)
69 )
70
71 type FdAndFullSize struct {
72         fd       *os.File
73         fullSize int64
74 }
75
76 type HasherAndOffset struct {
77         h      hash.Hash
78         offset uint64
79 }
80
81 type SPType uint8
82
83 const (
84         SPTypeInfo SPType = iota
85         SPTypeFreq SPType = iota
86         SPTypeFile SPType = iota
87         SPTypeDone SPType = iota
88         SPTypeHalt SPType = iota
89         SPTypePing SPType = iota
90 )
91
92 type SPHead struct {
93         Type SPType
94 }
95
96 type SPInfo struct {
97         Nice uint8
98         Size uint64
99         Hash *[32]byte
100 }
101
102 type SPFreq struct {
103         Hash   *[32]byte
104         Offset uint64
105 }
106
107 type SPFile struct {
108         Hash    *[32]byte
109         Offset  uint64
110         Payload []byte
111 }
112
113 type SPDone struct {
114         Hash *[32]byte
115 }
116
117 type SPRaw struct {
118         Magic   [8]byte
119         Payload []byte
120 }
121
122 type FreqWithNice struct {
123         freq *SPFreq
124         nice uint8
125 }
126
127 type ConnDeadlined interface {
128         io.ReadWriteCloser
129         SetReadDeadline(t time.Time) error
130         SetWriteDeadline(t time.Time) error
131 }
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([32]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170 }
171
172 func MarshalSP(typ SPType, sp interface{}) []byte {
173         var buf bytes.Buffer
174         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
175                 panic(err)
176         }
177         if _, err := xdr.Marshal(&buf, sp); err != nil {
178                 panic(err)
179         }
180         return buf.Bytes()
181 }
182
183 func payloadsSplit(payloads [][]byte) [][]byte {
184         var outbounds [][]byte
185         outbound := make([]byte, 0, MaxSPSize)
186         for i, payload := range payloads {
187                 outbound = append(outbound, payload...)
188                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
189                         outbounds = append(outbounds, outbound)
190                         outbound = make([]byte, 0, MaxSPSize)
191                 }
192         }
193         if len(outbound) > 0 {
194                 outbounds = append(outbounds, outbound)
195         }
196         return outbounds
197 }
198
199 type SPState struct {
200         Ctx            *Ctx
201         Node           *Node
202         Nice           uint8
203         NoCK           bool
204         onlineDeadline time.Duration
205         maxOnlineTime  time.Duration
206         hs             *noise.HandshakeState
207         csOur          *noise.CipherState
208         csTheir        *noise.CipherState
209         payloads       chan []byte
210         pings          chan struct{}
211         infosTheir     map[[32]byte]*SPInfo
212         infosOurSeen   map[[32]byte]uint8
213         queueTheir     []*FreqWithNice
214         wg             sync.WaitGroup
215         RxBytes        int64
216         RxLastSeen     time.Time
217         RxLastNonPing  time.Time
218         TxBytes        int64
219         TxLastSeen     time.Time
220         TxLastNonPing  time.Time
221         started        time.Time
222         mustFinishAt   time.Time
223         Duration       time.Duration
224         RxSpeed        int64
225         TxSpeed        int64
226         rxLock         *os.File
227         txLock         *os.File
228         xxOnly         TRxTx
229         rxRate         int
230         txRate         int
231         isDead         chan struct{}
232         listOnly       bool
233         onlyPkts       map[[32]byte]bool
234         writeSPBuf     bytes.Buffer
235         fds            map[string]FdAndFullSize
236         fdsLock        sync.RWMutex
237         fileHashers    map[string]*HasherAndOffset
238         checkerQueues  SPCheckerQueues
239         progressBars   map[string]struct{}
240         sync.RWMutex
241 }
242
243 func (state *SPState) SetDead() {
244         state.Lock()
245         defer state.Unlock()
246         select {
247         case <-state.isDead:
248                 // Already closed channel, dead
249                 return
250         default:
251         }
252         close(state.isDead)
253         go func() {
254                 for range state.payloads {
255                 }
256         }()
257         go func() {
258                 for range state.pings {
259                 }
260         }()
261 }
262
263 func (state *SPState) NotAlive() bool {
264         select {
265         case <-state.isDead:
266                 return true
267         default:
268         }
269         return false
270 }
271
272 func (state *SPState) dirUnlock() {
273         state.Ctx.UnlockDir(state.rxLock)
274         state.Ctx.UnlockDir(state.txLock)
275 }
276
277 func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
278         for hshValue := range appeared {
279                 pktName := Base32Codec.EncodeToString(hshValue[:])
280                 les := LEs{
281                         {"XX", string(TRx)},
282                         {"Node", nodeId},
283                         {"Pkt", pktName},
284                 }
285                 ctx.LogD("sp-checker", les, func(les LEs) string {
286                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
287                 })
288                 size, err := ctx.CheckNoCK(nodeId, hshValue)
289                 les = append(les, LE{"Size", size})
290                 if err != nil {
291                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
292                                 return fmt.Sprintf(
293                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
294                                         humanize.IBytes(uint64(size)),
295                                 )
296                         })
297                         continue
298                 }
299                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
300                         return fmt.Sprintf(
301                                 "Packet %s is retreived (%s)",
302                                 pktName, humanize.IBytes(uint64(size)),
303                         )
304                 })
305                 go func(hsh *[32]byte) { checked <- hsh }(hshValue)
306         }
307 }
308
309 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
310         state.writeSPBuf.Reset()
311         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
312                 Magic:   MagicNNCPLv1,
313                 Payload: payload,
314         })
315         if err != nil {
316                 return err
317         }
318         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
319                 state.TxLastSeen = time.Now()
320                 state.TxBytes += int64(n)
321                 if !ping {
322                         state.TxLastNonPing = state.TxLastSeen
323                 }
324         }
325         return err
326 }
327
328 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
329         var sp SPRaw
330         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
331         if err != nil {
332                 ue := err.(*xdr.UnmarshalError)
333                 if ue.Err == io.EOF {
334                         return nil, ue.Err
335                 }
336                 return nil, err
337         }
338         state.RxLastSeen = time.Now()
339         state.RxBytes += int64(n)
340         if sp.Magic != MagicNNCPLv1 {
341                 return nil, BadMagic
342         }
343         return sp.Payload, nil
344 }
345
346 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
347         var infos []*SPInfo
348         var totalSize int64
349         for job := range ctx.Jobs(nodeId, TTx) {
350                 if job.PktEnc.Nice > nice {
351                         continue
352                 }
353                 if _, known := (*seen)[*job.HshValue]; known {
354                         continue
355                 }
356                 totalSize += job.Size
357                 infos = append(infos, &SPInfo{
358                         Nice: job.PktEnc.Nice,
359                         Size: uint64(job.Size),
360                         Hash: job.HshValue,
361                 })
362                 (*seen)[*job.HshValue] = job.PktEnc.Nice
363         }
364         sort.Sort(ByNice(infos))
365         var payloads [][]byte
366         for _, info := range infos {
367                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
368                 pktName := Base32Codec.EncodeToString(info.Hash[:])
369                 ctx.LogD("sp-info-our", LEs{
370                         {"Node", nodeId},
371                         {"Name", pktName},
372                         {"Size", info.Size},
373                 }, func(les LEs) string {
374                         return fmt.Sprintf(
375                                 "Our info: %s/tx/%s (%s)",
376                                 ctx.NodeName(nodeId),
377                                 pktName,
378                                 humanize.IBytes(info.Size),
379                         )
380                 })
381         }
382         if totalSize > 0 {
383                 ctx.LogI("sp-infos-tx", LEs{
384                         {"XX", string(TTx)},
385                         {"Node", nodeId},
386                         {"Pkts", len(payloads)},
387                         {"Size", totalSize},
388                 }, func(les LEs) string {
389                         return fmt.Sprintf(
390                                 "We have got for %s: %d packets, %s",
391                                 ctx.NodeName(nodeId),
392                                 len(payloads),
393                                 humanize.IBytes(uint64(totalSize)),
394                         )
395                 })
396         }
397         return payloadsSplit(payloads)
398 }
399
400 func (state *SPState) StartI(conn ConnDeadlined) error {
401         nodeId := state.Node.Id
402         err := state.Ctx.ensureRxDir(nodeId)
403         if err != nil {
404                 return err
405         }
406         var rxLock *os.File
407         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
408                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
409                 if err != nil {
410                         return err
411                 }
412         }
413         var txLock *os.File
414         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
415                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
416                 if err != nil {
417                         return err
418                 }
419         }
420         started := time.Now()
421         conf := noise.Config{
422                 CipherSuite: NoiseCipherSuite,
423                 Pattern:     noise.HandshakeIK,
424                 Initiator:   true,
425                 StaticKeypair: noise.DHKey{
426                         Private: state.Ctx.Self.NoisePrv[:],
427                         Public:  state.Ctx.Self.NoisePub[:],
428                 },
429                 PeerStatic: state.Node.NoisePub[:],
430         }
431         hs, err := noise.NewHandshakeState(conf)
432         if err != nil {
433                 return err
434         }
435         state.hs = hs
436         state.payloads = make(chan []byte)
437         state.pings = make(chan struct{})
438         state.infosTheir = make(map[[32]byte]*SPInfo)
439         state.infosOurSeen = make(map[[32]byte]uint8)
440         state.progressBars = make(map[string]struct{})
441         state.started = started
442         state.rxLock = rxLock
443         state.txLock = txLock
444
445         var infosPayloads [][]byte
446         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
447                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
448         }
449         var firstPayload []byte
450         if len(infosPayloads) > 0 {
451                 firstPayload = infosPayloads[0]
452         }
453         // Pad first payload, to hide actual number of existing files
454         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
455                 firstPayload = append(firstPayload, SPHaltMarshalized...)
456         }
457
458         var buf []byte
459         var payload []byte
460         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
461         if err != nil {
462                 state.dirUnlock()
463                 return err
464         }
465         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
466         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
467                 return fmt.Sprintf(
468                         "SP with %s (nice %s): sending first message",
469                         state.Node.Name,
470                         NicenessFmt(state.Nice),
471                 )
472         })
473         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
474         if err = state.WriteSP(conn, buf, false); err != nil {
475                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
476                         return fmt.Sprintf(
477                                 "SP with %s (nice %s): writing",
478                                 state.Node.Name,
479                                 NicenessFmt(state.Nice),
480                         )
481                 })
482                 state.dirUnlock()
483                 return err
484         }
485         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
486                 return fmt.Sprintf(
487                         "SP with %s (nice %s): waiting for first message",
488                         state.Node.Name,
489                         NicenessFmt(state.Nice),
490                 )
491         })
492         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
493         if buf, err = state.ReadSP(conn); err != nil {
494                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
495                         return fmt.Sprintf(
496                                 "SP with %s (nice %s): reading",
497                                 state.Node.Name,
498                                 NicenessFmt(state.Nice),
499                         )
500                 })
501                 state.dirUnlock()
502                 return err
503         }
504         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
505         if err != nil {
506                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
507                         return fmt.Sprintf(
508                                 "SP with %s (nice %s): reading Noise message",
509                                 state.Node.Name,
510                                 NicenessFmt(state.Nice),
511                         )
512                 })
513                 state.dirUnlock()
514                 return err
515         }
516         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
517                 return fmt.Sprintf(
518                         "SP with %s (nice %s): starting workers",
519                         state.Node.Name,
520                         NicenessFmt(state.Nice),
521                 )
522         })
523         err = state.StartWorkers(conn, infosPayloads, payload)
524         if err != nil {
525                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
526                         return fmt.Sprintf(
527                                 "SP with %s (nice %s): starting workers",
528                                 state.Node.Name,
529                                 NicenessFmt(state.Nice),
530                         )
531                 })
532                 state.dirUnlock()
533         }
534         return err
535 }
536
537 func (state *SPState) StartR(conn ConnDeadlined) error {
538         started := time.Now()
539         conf := noise.Config{
540                 CipherSuite: NoiseCipherSuite,
541                 Pattern:     noise.HandshakeIK,
542                 Initiator:   false,
543                 StaticKeypair: noise.DHKey{
544                         Private: state.Ctx.Self.NoisePrv[:],
545                         Public:  state.Ctx.Self.NoisePub[:],
546                 },
547         }
548         hs, err := noise.NewHandshakeState(conf)
549         if err != nil {
550                 return err
551         }
552         xxOnly := TRxTx("")
553         state.hs = hs
554         state.payloads = make(chan []byte)
555         state.pings = make(chan struct{})
556         state.infosOurSeen = make(map[[32]byte]uint8)
557         state.infosTheir = make(map[[32]byte]*SPInfo)
558         state.progressBars = make(map[string]struct{})
559         state.started = started
560         state.xxOnly = xxOnly
561
562         var buf []byte
563         var payload []byte
564         logMsg := func(les LEs) string {
565                 return fmt.Sprintf(
566                         "SP nice %s: waiting for first message",
567                         NicenessFmt(state.Nice),
568                 )
569         }
570         les := LEs{{"Nice", int(state.Nice)}}
571         state.Ctx.LogD("sp-startR", les, logMsg)
572         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
573         if buf, err = state.ReadSP(conn); err != nil {
574                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
575                 return err
576         }
577         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
578                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
579                 return err
580         }
581
582         var node *Node
583         for _, n := range state.Ctx.Neigh {
584                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
585                         node = n
586                         break
587                 }
588         }
589         if node == nil {
590                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
591                 err = errors.New("unknown peer: " + peerId)
592                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
593                 return err
594         }
595         state.Node = node
596         state.rxRate = node.RxRate
597         state.txRate = node.TxRate
598         state.onlineDeadline = node.OnlineDeadline
599         state.maxOnlineTime = node.MaxOnlineTime
600         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
601
602         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
603                 return err
604         }
605         var rxLock *os.File
606         if xxOnly == "" || xxOnly == TRx {
607                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
608                 if err != nil {
609                         return err
610                 }
611         }
612         state.rxLock = rxLock
613         var txLock *os.File
614         if xxOnly == "" || xxOnly == TTx {
615                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
616                 if err != nil {
617                         return err
618                 }
619         }
620         state.txLock = txLock
621
622         var infosPayloads [][]byte
623         if xxOnly == "" || xxOnly == TTx {
624                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
625         }
626         var firstPayload []byte
627         if len(infosPayloads) > 0 {
628                 firstPayload = infosPayloads[0]
629         }
630         // Pad first payload, to hide actual number of existing files
631         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
632                 firstPayload = append(firstPayload, SPHaltMarshalized...)
633         }
634
635         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
636                 return fmt.Sprintf(
637                         "SP with %s (nice %s): sending first message",
638                         node.Name, NicenessFmt(state.Nice),
639                 )
640         })
641         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
642         if err != nil {
643                 state.dirUnlock()
644                 return err
645         }
646         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
647         if err = state.WriteSP(conn, buf, false); err != nil {
648                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
649                         return fmt.Sprintf(
650                                 "SP with %s (nice %s): writing",
651                                 node.Name, NicenessFmt(state.Nice),
652                         )
653                 })
654                 state.dirUnlock()
655                 return err
656         }
657         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
658                 return fmt.Sprintf(
659                         "SP with %s (nice %s): starting workers",
660                         node.Name, NicenessFmt(state.Nice),
661                 )
662         })
663         err = state.StartWorkers(conn, infosPayloads, payload)
664         if err != nil {
665                 state.dirUnlock()
666         }
667         return err
668 }
669
670 func (state *SPState) closeFd(pth string) {
671         state.fdsLock.Lock()
672         if s, exists := state.fds[pth]; exists {
673                 delete(state.fds, pth)
674                 s.fd.Close()
675         }
676         state.fdsLock.Unlock()
677 }
678
679 func (state *SPState) StartWorkers(
680         conn ConnDeadlined,
681         infosPayloads [][]byte,
682         payload []byte,
683 ) error {
684         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
685         state.fds = make(map[string]FdAndFullSize)
686         state.fileHashers = make(map[string]*HasherAndOffset)
687         state.isDead = make(chan struct{})
688         if state.maxOnlineTime > 0 {
689                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
690         }
691
692         // Checker
693         if !state.NoCK {
694                 queues := spCheckers[*state.Node.Id]
695                 if queues == nil {
696                         queues = &SPCheckerQueues{
697                                 appeared: make(chan *[32]byte),
698                                 checked:  make(chan *[32]byte),
699                         }
700                         spCheckers[*state.Node.Id] = queues
701                         go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
702                 }
703                 state.checkerQueues = *queues
704                 go func() {
705                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
706                                 if job.PktEnc.Nice <= state.Nice {
707                                         state.checkerQueues.appeared <- job.HshValue
708                                 }
709                         }
710                 }()
711                 state.wg.Add(1)
712                 go func() {
713                         defer state.wg.Done()
714                         for {
715                                 select {
716                                 case <-state.isDead:
717                                         return
718                                 case hsh := <-state.checkerQueues.checked:
719                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
720                                 }
721                         }
722                 }()
723         }
724
725         // Remaining handshake payload sending
726         if len(infosPayloads) > 1 {
727                 state.wg.Add(1)
728                 go func() {
729                         for _, payload := range infosPayloads[1:] {
730                                 state.Ctx.LogD(
731                                         "sp-queue-remaining",
732                                         append(les, LE{"Size", int64(len(payload))}),
733                                         func(les LEs) string {
734                                                 return fmt.Sprintf(
735                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
736                                                         state.Node.Name, NicenessFmt(state.Nice),
737                                                         humanize.IBytes(uint64(len(payload))),
738                                                 )
739                                         },
740                                 )
741                                 state.payloads <- payload
742                         }
743                         state.wg.Done()
744                 }()
745         }
746
747         // Processing of first payload and queueing its responses
748         logMsg := func(les LEs) string {
749                 return fmt.Sprintf(
750                         "SP with %s (nice %s): processing first payload (%s)",
751                         state.Node.Name, NicenessFmt(state.Nice),
752                         humanize.IBytes(uint64(len(payload))),
753                 )
754         }
755         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
756         replies, err := state.ProcessSP(payload)
757         if err != nil {
758                 state.Ctx.LogE("sp-process", les, err, logMsg)
759                 return err
760         }
761         state.wg.Add(1)
762         go func() {
763                 for _, reply := range replies {
764                         state.Ctx.LogD(
765                                 "sp-queue-reply",
766                                 append(les, LE{"Size", int64(len(reply))}),
767                                 func(les LEs) string {
768                                         return fmt.Sprintf(
769                                                 "SP with %s (nice %s): queuing reply (%s)",
770                                                 state.Node.Name, NicenessFmt(state.Nice),
771                                                 humanize.IBytes(uint64(len(payload))),
772                                         )
773                                 },
774                         )
775                         state.payloads <- reply
776                 }
777                 state.wg.Done()
778         }()
779
780         // Periodic jobs
781         state.wg.Add(1)
782         go func() {
783                 deadlineTicker := time.NewTicker(time.Second)
784                 pingTicker := time.NewTicker(PingTimeout)
785                 for {
786                         select {
787                         case <-state.isDead:
788                                 state.wg.Done()
789                                 deadlineTicker.Stop()
790                                 pingTicker.Stop()
791                                 return
792                         case now := <-deadlineTicker.C:
793                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
794                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
795                                         goto Deadlined
796                                 }
797                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
798                                         goto Deadlined
799                                 }
800                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
801                                         goto Deadlined
802                                 }
803                                 break
804                         Deadlined:
805                                 state.SetDead()
806                                 conn.Close() // #nosec G104
807                         case now := <-pingTicker.C:
808                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
809                                         state.wg.Add(1)
810                                         go func() {
811                                                 state.pings <- struct{}{}
812                                                 state.wg.Done()
813                                         }()
814                                 }
815                         }
816                 }
817         }()
818
819         // Spool checker and INFOs sender of appearing files
820         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
821                 state.wg.Add(1)
822                 go func() {
823                         ticker := time.NewTicker(time.Second)
824                         for {
825                                 select {
826                                 case <-state.isDead:
827                                         state.wg.Done()
828                                         ticker.Stop()
829                                         return
830                                 case <-ticker.C:
831                                         for _, payload := range state.Ctx.infosOur(
832                                                 state.Node.Id,
833                                                 state.Nice,
834                                                 &state.infosOurSeen,
835                                         ) {
836                                                 state.Ctx.LogD(
837                                                         "sp-queue-info",
838                                                         append(les, LE{"Size", int64(len(payload))}),
839                                                         func(les LEs) string {
840                                                                 return fmt.Sprintf(
841                                                                         "SP with %s (nice %s): queuing new info (%s)",
842                                                                         state.Node.Name, NicenessFmt(state.Nice),
843                                                                         humanize.IBytes(uint64(len(payload))),
844                                                                 )
845                                                         },
846                                                 )
847                                                 state.payloads <- payload
848                                         }
849                                 }
850                         }
851                 }()
852         }
853
854         // Sender
855         state.wg.Add(1)
856         go func() {
857                 defer conn.Close()
858                 defer state.SetDead()
859                 defer state.wg.Done()
860                 for {
861                         if state.NotAlive() {
862                                 return
863                         }
864                         var payload []byte
865                         var ping bool
866                         select {
867                         case <-state.pings:
868                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
869                                         return fmt.Sprintf(
870                                                 "SP with %s (nice %s): got ping",
871                                                 state.Node.Name, NicenessFmt(state.Nice),
872                                         )
873                                 })
874                                 payload = SPPingMarshalized
875                                 ping = true
876                         case payload = <-state.payloads:
877                                 state.Ctx.LogD(
878                                         "sp-got-payload",
879                                         append(les, LE{"Size", int64(len(payload))}),
880                                         func(les LEs) string {
881                                                 return fmt.Sprintf(
882                                                         "SP with %s (nice %s): got payload (%s)",
883                                                         state.Node.Name, NicenessFmt(state.Nice),
884                                                         humanize.IBytes(uint64(len(payload))),
885                                                 )
886                                         },
887                                 )
888                         default:
889                                 state.RLock()
890                                 if len(state.queueTheir) == 0 {
891                                         state.RUnlock()
892                                         time.Sleep(100 * time.Millisecond)
893                                         continue
894                                 }
895                                 freq := state.queueTheir[0].freq
896                                 state.RUnlock()
897                                 if state.txRate > 0 {
898                                         time.Sleep(time.Second / time.Duration(state.txRate))
899                                 }
900                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
901                                 lesp := append(
902                                         les,
903                                         LE{"XX", string(TTx)},
904                                         LE{"Pkt", pktName},
905                                         LE{"Size", int64(freq.Offset)},
906                                 )
907                                 logMsg := func(les LEs) string {
908                                         return fmt.Sprintf(
909                                                 "SP with %s (nice %s): tx/%s (%s)",
910                                                 state.Node.Name, NicenessFmt(state.Nice),
911                                                 pktName,
912                                                 humanize.IBytes(freq.Offset),
913                                         )
914                                 }
915                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
916                                         return logMsg(les) + ": queueing"
917                                 })
918                                 pth := filepath.Join(
919                                         state.Ctx.Spool,
920                                         state.Node.Id.String(),
921                                         string(TTx),
922                                         Base32Codec.EncodeToString(freq.Hash[:]),
923                                 )
924                                 state.fdsLock.RLock()
925                                 fdAndFullSize, exists := state.fds[pth]
926                                 state.fdsLock.RUnlock()
927                                 if !exists {
928                                         fd, err := os.Open(pth)
929                                         if err != nil {
930                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
931                                                         return logMsg(les) + ": opening"
932                                                 })
933                                                 return
934                                         }
935                                         fi, err := fd.Stat()
936                                         if err != nil {
937                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
938                                                         return logMsg(les) + ": stating"
939                                                 })
940                                                 return
941                                         }
942                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
943                                         state.fdsLock.Lock()
944                                         state.fds[pth] = fdAndFullSize
945                                         state.fdsLock.Unlock()
946                                 }
947                                 fd := fdAndFullSize.fd
948                                 fullSize := fdAndFullSize.fullSize
949                                 var buf []byte
950                                 if freq.Offset < uint64(fullSize) {
951                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
952                                                 return logMsg(les) + ": seeking"
953                                         })
954                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
955                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
956                                                         return logMsg(les) + ": seeking"
957                                                 })
958                                                 return
959                                         }
960                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
961                                         n, err := fd.Read(buf)
962                                         if err != nil {
963                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
964                                                         return logMsg(les) + ": reading"
965                                                 })
966                                                 return
967                                         }
968                                         buf = buf[:n]
969                                         lesp = append(
970                                                 les,
971                                                 LE{"XX", string(TTx)},
972                                                 LE{"Pkt", pktName},
973                                                 LE{"Size", int64(n)},
974                                         )
975                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
976                                                 return fmt.Sprintf(
977                                                         "%s: read %s",
978                                                         logMsg(les), humanize.IBytes(uint64(n)),
979                                                 )
980                                         })
981                                 }
982                                 state.closeFd(pth)
983                                 payload = MarshalSP(SPTypeFile, SPFile{
984                                         Hash:    freq.Hash,
985                                         Offset:  freq.Offset,
986                                         Payload: buf,
987                                 })
988                                 ourSize := freq.Offset + uint64(len(buf))
989                                 lesp = append(
990                                         les,
991                                         LE{"XX", string(TTx)},
992                                         LE{"Pkt", pktName},
993                                         LE{"Size", int64(ourSize)},
994                                         LE{"FullSize", fullSize},
995                                 )
996                                 if state.Ctx.ShowPrgrs {
997                                         state.progressBars[pktName] = struct{}{}
998                                         Progress("Tx", lesp)
999                                 }
1000                                 state.Lock()
1001                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
1002                                         if ourSize == uint64(fullSize) {
1003                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
1004                                                         return logMsg(les) + ": finished"
1005                                                 })
1006                                                 if len(state.queueTheir) > 1 {
1007                                                         state.queueTheir = state.queueTheir[1:]
1008                                                 } else {
1009                                                         state.queueTheir = state.queueTheir[:0]
1010                                                 }
1011                                                 if state.Ctx.ShowPrgrs {
1012                                                         delete(state.progressBars, pktName)
1013                                                 }
1014                                         } else {
1015                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
1016                                         }
1017                                 } else {
1018                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
1019                                                 return logMsg(les) + ": queue disappeared"
1020                                         })
1021                                 }
1022                                 state.Unlock()
1023                         }
1024                         logMsg := func(les LEs) string {
1025                                 return fmt.Sprintf(
1026                                         "SP with %s (nice %s): sending %s",
1027                                         state.Node.Name, NicenessFmt(state.Nice),
1028                                         humanize.IBytes(uint64(len(payload))),
1029                                 )
1030                         }
1031                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1032                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1033                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
1034                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1035                                 return
1036                         }
1037                 }
1038         }()
1039
1040         // Receiver
1041         state.wg.Add(1)
1042         go func() {
1043                 for {
1044                         if state.NotAlive() {
1045                                 break
1046                         }
1047                         logMsg := func(les LEs) string {
1048                                 return fmt.Sprintf(
1049                                         "SP with %s (nice %s): waiting for payload",
1050                                         state.Node.Name, NicenessFmt(state.Nice),
1051                                 )
1052                         }
1053                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1054                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1055                         payload, err := state.ReadSP(conn)
1056                         if err != nil {
1057                                 if err == io.EOF {
1058                                         break
1059                                 }
1060                                 unmarshalErr := err.(*xdr.UnmarshalError)
1061                                 if os.IsTimeout(unmarshalErr.Err) {
1062                                         continue
1063                                 }
1064                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1065                                         break
1066                                 }
1067                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1068                                 break
1069                         }
1070                         logMsg = func(les LEs) string {
1071                                 return fmt.Sprintf(
1072                                         "SP with %s (nice %s): payload (%s)",
1073                                         state.Node.Name, NicenessFmt(state.Nice),
1074                                         humanize.IBytes(uint64(len(payload))),
1075                                 )
1076                         }
1077                         state.Ctx.LogD(
1078                                 "sp-recv-got",
1079                                 append(les, LE{"Size", int64(len(payload))}),
1080                                 func(les LEs) string { return logMsg(les) + ": got" },
1081                         )
1082                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1083                         if err != nil {
1084                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1085                                         return logMsg(les) + ": got"
1086                                 })
1087                                 break
1088                         }
1089                         state.Ctx.LogD(
1090                                 "sp-recv-process",
1091                                 append(les, LE{"Size", int64(len(payload))}),
1092                                 func(les LEs) string {
1093                                         return logMsg(les) + ": processing"
1094                                 },
1095                         )
1096                         replies, err := state.ProcessSP(payload)
1097                         if err != nil {
1098                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1099                                         return logMsg(les) + ": processing"
1100                                 })
1101                                 break
1102                         }
1103                         state.wg.Add(1)
1104                         go func() {
1105                                 for _, reply := range replies {
1106                                         state.Ctx.LogD(
1107                                                 "sp-recv-reply",
1108                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1109                                                 func(les LEs) string {
1110                                                         return fmt.Sprintf(
1111                                                                 "SP with %s (nice %s): queuing reply (%s)",
1112                                                                 state.Node.Name, NicenessFmt(state.Nice),
1113                                                                 humanize.IBytes(uint64(len(reply))),
1114                                                         )
1115                                                 },
1116                                         )
1117                                         state.payloads <- reply
1118                                 }
1119                                 state.wg.Done()
1120                         }()
1121                         if state.rxRate > 0 {
1122                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1123                         }
1124                 }
1125                 state.SetDead()
1126                 state.wg.Done()
1127                 state.SetDead()
1128                 conn.Close() // #nosec G104
1129         }()
1130
1131         return nil
1132 }
1133
1134 func (state *SPState) Wait() {
1135         state.wg.Wait()
1136         close(state.payloads)
1137         close(state.pings)
1138         state.dirUnlock()
1139         state.Duration = time.Now().Sub(state.started)
1140         state.RxSpeed = state.RxBytes
1141         state.TxSpeed = state.TxBytes
1142         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1143         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1144         if rxDuration > 0 {
1145                 state.RxSpeed = state.RxBytes / rxDuration
1146         }
1147         if txDuration > 0 {
1148                 state.TxSpeed = state.TxBytes / txDuration
1149         }
1150         for _, s := range state.fds {
1151                 s.fd.Close()
1152         }
1153         for pktName := range state.progressBars {
1154                 ProgressKill(pktName)
1155         }
1156 }
1157
1158 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1159         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1160         r := bytes.NewReader(payload)
1161         var err error
1162         var replies [][]byte
1163         var infosGot bool
1164         for r.Len() > 0 {
1165                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1166                         return fmt.Sprintf(
1167                                 "SP with %s (nice %s): unmarshaling header",
1168                                 state.Node.Name, NicenessFmt(state.Nice),
1169                         )
1170                 })
1171                 var head SPHead
1172                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1173                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1174                                 return fmt.Sprintf(
1175                                         "SP with %s (nice %s): unmarshaling header",
1176                                         state.Node.Name, NicenessFmt(state.Nice),
1177                                 )
1178                         })
1179                         return nil, err
1180                 }
1181                 if head.Type != SPTypePing {
1182                         state.RxLastNonPing = state.RxLastSeen
1183                 }
1184                 switch head.Type {
1185                 case SPTypeHalt:
1186                         state.Ctx.LogD(
1187                                 "sp-process-halt",
1188                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1189                                         return fmt.Sprintf(
1190                                                 "SP with %s (nice %s): got HALT",
1191                                                 state.Node.Name, NicenessFmt(state.Nice),
1192                                         )
1193                                 },
1194                         )
1195                         state.Lock()
1196                         state.queueTheir = nil
1197                         state.Unlock()
1198
1199                 case SPTypePing:
1200                         state.Ctx.LogD(
1201                                 "sp-process-ping",
1202                                 append(les, LE{"Type", "ping"}),
1203                                 func(les LEs) string {
1204                                         return fmt.Sprintf(
1205                                                 "SP with %s (nice %s): got PING",
1206                                                 state.Node.Name, NicenessFmt(state.Nice),
1207                                         )
1208                                 },
1209                         )
1210
1211                 case SPTypeInfo:
1212                         infosGot = true
1213                         lesp := append(les, LE{"Type", "info"})
1214                         state.Ctx.LogD(
1215                                 "sp-process-info-unmarshal", lesp,
1216                                 func(les LEs) string {
1217                                         return fmt.Sprintf(
1218                                                 "SP with %s (nice %s): unmarshaling INFO",
1219                                                 state.Node.Name, NicenessFmt(state.Nice),
1220                                         )
1221                                 },
1222                         )
1223                         var info SPInfo
1224                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1225                                 state.Ctx.LogE(
1226                                         "sp-process-info-unmarshal", lesp, err,
1227                                         func(les LEs) string {
1228                                                 return fmt.Sprintf(
1229                                                         "SP with %s (nice %s): unmarshaling INFO",
1230                                                         state.Node.Name, NicenessFmt(state.Nice),
1231                                                 )
1232                                         },
1233                                 )
1234                                 return nil, err
1235                         }
1236                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1237                         lesp = append(
1238                                 lesp,
1239                                 LE{"Pkt", pktName},
1240                                 LE{"Size", int64(info.Size)},
1241                                 LE{"PktNice", int(info.Nice)},
1242                         )
1243                         logMsg := func(les LEs) string {
1244                                 return fmt.Sprintf(
1245                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1246                                         state.Node.Name, NicenessFmt(state.Nice),
1247                                         pktName,
1248                                         humanize.IBytes(info.Size),
1249                                         NicenessFmt(info.Nice),
1250                                 )
1251                         }
1252                         if !state.listOnly && info.Nice > state.Nice {
1253                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1254                                         return logMsg(les) + ": too nice"
1255                                 })
1256                                 continue
1257                         }
1258                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1259                                 return logMsg(les) + ": received"
1260                         })
1261                         if !state.listOnly && state.xxOnly == TTx {
1262                                 continue
1263                         }
1264                         state.Lock()
1265                         state.infosTheir[*info.Hash] = &info
1266                         state.Unlock()
1267                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1268                                 return logMsg(les) + ": stating part"
1269                         })
1270                         pktPath := filepath.Join(
1271                                 state.Ctx.Spool,
1272                                 state.Node.Id.String(),
1273                                 string(TRx),
1274                                 Base32Codec.EncodeToString(info.Hash[:]),
1275                         )
1276                         logMsg = func(les LEs) string {
1277                                 return fmt.Sprintf(
1278                                         "Packet %s (%s) (nice %s)",
1279                                         pktName,
1280                                         humanize.IBytes(info.Size),
1281                                         NicenessFmt(info.Nice),
1282                                 )
1283                         }
1284                         if _, err = os.Stat(pktPath); err == nil {
1285                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1286                                         return logMsg(les) + ": already done"
1287                                 })
1288                                 if !state.listOnly {
1289                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1290                                 }
1291                                 continue
1292                         }
1293                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1294                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1295                                         return logMsg(les) + ": already seen"
1296                                 })
1297                                 if !state.listOnly {
1298                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1299                                 }
1300                                 continue
1301                         }
1302                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1303                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1304                                         return logMsg(les) + ": still not checksummed"
1305                                 })
1306                                 continue
1307                         }
1308                         fi, err := os.Stat(pktPath + PartSuffix)
1309                         var offset int64
1310                         if err == nil {
1311                                 offset = fi.Size()
1312                         }
1313                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1314                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1315                                         return logMsg(les) + ": not enough space"
1316                                 })
1317                                 continue
1318                         }
1319                         state.Ctx.LogI(
1320                                 "sp-info",
1321                                 append(lesp, LE{"Offset", offset}),
1322                                 func(les LEs) string {
1323                                         return fmt.Sprintf(
1324                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1325                                         )
1326                                 },
1327                         )
1328                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1329                                 replies = append(replies, MarshalSP(
1330                                         SPTypeFreq,
1331                                         SPFreq{info.Hash, uint64(offset)},
1332                                 ))
1333                         }
1334
1335                 case SPTypeFile:
1336                         lesp := append(les, LE{"Type", "file"})
1337                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1338                                 return fmt.Sprintf(
1339                                         "SP with %s (nice %s): unmarshaling FILE",
1340                                         state.Node.Name, NicenessFmt(state.Nice),
1341                                 )
1342                         })
1343                         var file SPFile
1344                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1345                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1346                                         return fmt.Sprintf(
1347                                                 "SP with %s (nice %s): unmarshaling FILE",
1348                                                 state.Node.Name, NicenessFmt(state.Nice),
1349                                         )
1350                                 })
1351                                 return nil, err
1352                         }
1353                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1354                         lesp = append(
1355                                 lesp,
1356                                 LE{"XX", string(TRx)},
1357                                 LE{"Pkt", pktName},
1358                                 LE{"Size", int64(len(file.Payload))},
1359                         )
1360                         logMsg := func(les LEs) string {
1361                                 return fmt.Sprintf(
1362                                         "Got packet %s (%s)",
1363                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1364                                 )
1365                         }
1366                         dirToSync := filepath.Join(
1367                                 state.Ctx.Spool,
1368                                 state.Node.Id.String(),
1369                                 string(TRx),
1370                         )
1371                         filePath := filepath.Join(dirToSync, pktName)
1372                         filePathPart := filePath + PartSuffix
1373                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1374                                 return logMsg(les) + ": opening part"
1375                         })
1376                         state.fdsLock.RLock()
1377                         fdAndFullSize, exists := state.fds[filePathPart]
1378                         state.fdsLock.RUnlock()
1379                         var fd *os.File
1380                         if exists {
1381                                 fd = fdAndFullSize.fd
1382                         } else {
1383                                 fd, err = os.OpenFile(
1384                                         filePathPart,
1385                                         os.O_RDWR|os.O_CREATE,
1386                                         os.FileMode(0666),
1387                                 )
1388                                 if err != nil {
1389                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1390                                                 return logMsg(les) + ": opening part"
1391                                         })
1392                                         return nil, err
1393                                 }
1394                                 state.fdsLock.Lock()
1395                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1396                                 state.fdsLock.Unlock()
1397                                 if file.Offset == 0 {
1398                                         h, err := blake2b.New256(nil)
1399                                         if err != nil {
1400                                                 panic(err)
1401                                         }
1402                                         state.fileHashers[filePath] = &HasherAndOffset{h: h}
1403                                 }
1404                         }
1405                         state.Ctx.LogD(
1406                                 "sp-file-seek",
1407                                 append(lesp, LE{"Offset", file.Offset}),
1408                                 func(les LEs) string {
1409                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1410                                 })
1411                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1412                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1413                                         return logMsg(les) + ": seeking"
1414                                 })
1415                                 state.closeFd(filePathPart)
1416                                 return nil, err
1417                         }
1418                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1419                                 return logMsg(les) + ": writing"
1420                         })
1421                         if _, err = fd.Write(file.Payload); err != nil {
1422                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1423                                         return logMsg(les) + ": writing"
1424                                 })
1425                                 state.closeFd(filePathPart)
1426                                 return nil, err
1427                         }
1428                         hasherAndOffset, hasherExists := state.fileHashers[filePath]
1429                         if hasherExists {
1430                                 if hasherAndOffset.offset == file.Offset {
1431                                         if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1432                                                 panic(err)
1433                                         }
1434                                         hasherAndOffset.offset += uint64(len(file.Payload))
1435                                 } else {
1436                                         state.Ctx.LogD(
1437                                                 "sp-file-offset-differs", lesp,
1438                                                 func(les LEs) string {
1439                                                         return logMsg(les) + ": offset differs, deleting hasher"
1440                                                 },
1441                                         )
1442                                         delete(state.fileHashers, filePath)
1443                                         hasherExists = false
1444                                 }
1445                         }
1446                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1447                         lesp[len(lesp)-1].V = ourSize
1448                         fullsize := int64(0)
1449                         state.RLock()
1450                         infoTheir, ok := state.infosTheir[*file.Hash]
1451                         state.RUnlock()
1452                         if ok {
1453                                 fullsize = int64(infoTheir.Size)
1454                         }
1455                         lesp = append(lesp, LE{"FullSize", fullsize})
1456                         if state.Ctx.ShowPrgrs {
1457                                 state.progressBars[pktName] = struct{}{}
1458                                 Progress("Rx", lesp)
1459                         }
1460                         if fullsize != ourSize {
1461                                 continue
1462                         }
1463                         if state.Ctx.ShowPrgrs {
1464                                 delete(state.progressBars, pktName)
1465                         }
1466                         logMsg = func(les LEs) string {
1467                                 return fmt.Sprintf(
1468                                         "Got packet %s %d%% (%s / %s)",
1469                                         pktName, 100*ourSize/fullsize,
1470                                         humanize.IBytes(uint64(ourSize)),
1471                                         humanize.IBytes(uint64(fullsize)),
1472                                 )
1473                         }
1474                         err = fd.Sync()
1475                         if err != nil {
1476                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1477                                         return logMsg(les) + ": syncing"
1478                                 })
1479                                 state.closeFd(filePathPart)
1480                                 continue
1481                         }
1482                         if hasherExists {
1483                                 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1484                                         state.Ctx.LogE(
1485                                                 "sp-file-bad-checksum", lesp,
1486                                                 errors.New("checksum mismatch"),
1487                                                 logMsg,
1488                                         )
1489                                         continue
1490                                 }
1491                                 if err = os.Rename(filePathPart, filePath); err != nil {
1492                                         state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1493                                                 return logMsg(les) + ": renaming"
1494                                         })
1495                                         continue
1496                                 }
1497                                 if err = DirSync(dirToSync); err != nil {
1498                                         state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1499                                                 return logMsg(les) + ": dirsyncing"
1500                                         })
1501                                         continue
1502                                 }
1503                                 state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1504                                         return logMsg(les) + ": done"
1505                                 })
1506                                 state.wg.Add(1)
1507                                 go func() {
1508                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1509                                         state.wg.Done()
1510                                 }()
1511                                 state.Lock()
1512                                 delete(state.infosTheir, *file.Hash)
1513                                 state.Unlock()
1514                                 if !state.Ctx.HdrUsage {
1515                                         state.closeFd(filePathPart)
1516                                         continue
1517                                 }
1518                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1519                                         state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1520                                                 return logMsg(les) + ": seeking"
1521                                         })
1522                                         state.closeFd(filePathPart)
1523                                         continue
1524                                 }
1525                                 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1526                                 state.closeFd(filePathPart)
1527                                 if err != nil {
1528                                         state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1529                                                 return logMsg(les) + ": HdrReading"
1530                                         })
1531                                         continue
1532                                 }
1533                                 state.Ctx.HdrWrite(pktEncRaw, filePath)
1534                                 continue
1535                         }
1536                         state.closeFd(filePathPart)
1537                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1538                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1539                                         return logMsg(les) + ": renaming"
1540                                 })
1541                                 continue
1542                         }
1543                         if err = DirSync(dirToSync); err != nil {
1544                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1545                                         return logMsg(les) + ": dirsyncing"
1546                                 })
1547                                 continue
1548                         }
1549                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1550                                 return logMsg(les) + ": downloaded"
1551                         })
1552                         state.Lock()
1553                         delete(state.infosTheir, *file.Hash)
1554                         state.Unlock()
1555                         if !state.NoCK {
1556                                 state.checkerQueues.appeared <- file.Hash
1557                         }
1558
1559                 case SPTypeDone:
1560                         lesp := append(les, LE{"Type", "done"})
1561                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1562                                 return fmt.Sprintf(
1563                                         "SP with %s (nice %s): unmarshaling DONE",
1564                                         state.Node.Name, NicenessFmt(state.Nice),
1565                                 )
1566                         })
1567                         var done SPDone
1568                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1569                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1570                                         return fmt.Sprintf(
1571                                                 "SP with %s (nice %s): unmarshaling DONE",
1572                                                 state.Node.Name, NicenessFmt(state.Nice),
1573                                         )
1574                                 })
1575                                 return nil, err
1576                         }
1577                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1578                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1579                         logMsg := func(les LEs) string {
1580                                 return fmt.Sprintf(
1581                                         "SP with %s (nice %s): DONE: removing %s",
1582                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1583                                 )
1584                         }
1585                         state.Ctx.LogD("sp-done", lesp, logMsg)
1586                         pth := filepath.Join(
1587                                 state.Ctx.Spool,
1588                                 state.Node.Id.String(),
1589                                 string(TTx),
1590                                 pktName,
1591                         )
1592                         if err = os.Remove(pth); err == nil {
1593                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1594                                         return fmt.Sprintf("Packet %s is sent", pktName)
1595                                 })
1596                                 if state.Ctx.HdrUsage {
1597                                         os.Remove(pth + HdrSuffix)
1598                                 }
1599                         } else {
1600                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1601                         }
1602
1603                 case SPTypeFreq:
1604                         lesp := append(les, LE{"Type", "freq"})
1605                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1606                                 return fmt.Sprintf(
1607                                         "SP with %s (nice %s): unmarshaling FREQ",
1608                                         state.Node.Name, NicenessFmt(state.Nice),
1609                                 )
1610                         })
1611                         var freq SPFreq
1612                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1613                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1614                                         return fmt.Sprintf(
1615                                                 "SP with %s (nice %s): unmarshaling FREQ",
1616                                                 state.Node.Name, NicenessFmt(state.Nice),
1617                                         )
1618                                 })
1619                                 return nil, err
1620                         }
1621                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1622                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1623                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1624                                 return fmt.Sprintf(
1625                                         "SP with %s (nice %s): FREQ %s: queuing",
1626                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1627                                 )
1628                         })
1629                         nice, exists := state.infosOurSeen[*freq.Hash]
1630                         if exists {
1631                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1632                                         state.Lock()
1633                                         insertIdx := 0
1634                                         var freqWithNice *FreqWithNice
1635                                         for insertIdx, freqWithNice = range state.queueTheir {
1636                                                 if freqWithNice.nice > nice {
1637                                                         break
1638                                                 }
1639                                         }
1640                                         state.queueTheir = append(state.queueTheir, nil)
1641                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1642                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1643                                         state.Unlock()
1644                                 } else {
1645                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1646                                                 return fmt.Sprintf(
1647                                                         "SP with %s (nice %s): FREQ %s: skipping",
1648                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1649                                                 )
1650                                         })
1651                                 }
1652                         } else {
1653                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1654                                         return fmt.Sprintf(
1655                                                 "SP with %s (nice %s): FREQ %s: unknown",
1656                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1657                                         )
1658                                 })
1659                         }
1660
1661                 default:
1662                         state.Ctx.LogE(
1663                                 "sp-process-type-unknown",
1664                                 append(les, LE{"Type", head.Type}),
1665                                 errors.New("unknown type"),
1666                                 func(les LEs) string {
1667                                         return fmt.Sprintf(
1668                                                 "SP with %s (nice %s): %d",
1669                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1670                                         )
1671                                 },
1672                         )
1673                         return nil, BadPktType
1674                 }
1675         }
1676
1677         if infosGot {
1678                 var pkts int
1679                 var size uint64
1680                 state.RLock()
1681                 for _, info := range state.infosTheir {
1682                         pkts++
1683                         size += info.Size
1684                 }
1685                 state.RUnlock()
1686                 state.Ctx.LogI("sp-infos-rx", LEs{
1687                         {"XX", string(TRx)},
1688                         {"Node", state.Node.Id},
1689                         {"Pkts", pkts},
1690                         {"Size", int64(size)},
1691                 }, func(les LEs) string {
1692                         return fmt.Sprintf(
1693                                 "%s has got for us: %d packets, %s",
1694                                 state.Node.Name, pkts, humanize.IBytes(size),
1695                         )
1696                 })
1697         }
1698         return payloadsSplit(replies), nil
1699 }