]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
f2c707f1029bd15bd8585bcb974b3da017097cf7
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "hash"
26         "io"
27         "os"
28         "path/filepath"
29         "sort"
30         "sync"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/flynn/noise"
36         "golang.org/x/crypto/blake2b"
37 )
38
39 const (
40         MaxSPSize      = 1<<16 - 256
41         PartSuffix     = ".part"
42         SPHeadOverhead = 4
43 )
44
45 type SPCheckerQueues struct {
46         appeared chan *[32]byte
47         checked  chan *[32]byte
48 }
49
50 var (
51         MagicNNCPLv1 [8]byte = [8]byte{'N', 'N', 'C', 'P', 'S', 0, 0, 1}
52
53         SPInfoOverhead    int
54         SPFreqOverhead    int
55         SPFileOverhead    int
56         SPHaltMarshalized []byte
57         SPPingMarshalized []byte
58
59         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
60                 noise.DH25519,
61                 noise.CipherChaChaPoly,
62                 noise.HashBLAKE2b,
63         )
64
65         DefaultDeadline = 10 * time.Second
66         PingTimeout     = time.Minute
67
68         spCheckers = make(map[NodeId]*SPCheckerQueues)
69 )
70
71 type FdAndFullSize struct {
72         fd       *os.File
73         fullSize int64
74 }
75
76 type HasherAndOffset struct {
77         h      hash.Hash
78         offset uint64
79 }
80
81 type SPType uint8
82
83 const (
84         SPTypeInfo SPType = iota
85         SPTypeFreq SPType = iota
86         SPTypeFile SPType = iota
87         SPTypeDone SPType = iota
88         SPTypeHalt SPType = iota
89         SPTypePing SPType = iota
90 )
91
92 type SPHead struct {
93         Type SPType
94 }
95
96 type SPInfo struct {
97         Nice uint8
98         Size uint64
99         Hash *[32]byte
100 }
101
102 type SPFreq struct {
103         Hash   *[32]byte
104         Offset uint64
105 }
106
107 type SPFile struct {
108         Hash    *[32]byte
109         Offset  uint64
110         Payload []byte
111 }
112
113 type SPDone struct {
114         Hash *[32]byte
115 }
116
117 type SPRaw struct {
118         Magic   [8]byte
119         Payload []byte
120 }
121
122 type FreqWithNice struct {
123         freq *SPFreq
124         nice uint8
125 }
126
127 type ConnDeadlined interface {
128         io.ReadWriteCloser
129         SetReadDeadline(t time.Time) error
130         SetWriteDeadline(t time.Time) error
131 }
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([32]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([32]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([32]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170 }
171
172 func MarshalSP(typ SPType, sp interface{}) []byte {
173         var buf bytes.Buffer
174         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
175                 panic(err)
176         }
177         if _, err := xdr.Marshal(&buf, sp); err != nil {
178                 panic(err)
179         }
180         return buf.Bytes()
181 }
182
183 func payloadsSplit(payloads [][]byte) [][]byte {
184         var outbounds [][]byte
185         outbound := make([]byte, 0, MaxSPSize)
186         for i, payload := range payloads {
187                 outbound = append(outbound, payload...)
188                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
189                         outbounds = append(outbounds, outbound)
190                         outbound = make([]byte, 0, MaxSPSize)
191                 }
192         }
193         if len(outbound) > 0 {
194                 outbounds = append(outbounds, outbound)
195         }
196         return outbounds
197 }
198
199 type SPState struct {
200         Ctx            *Ctx
201         Node           *Node
202         Nice           uint8
203         NoCK           bool
204         onlineDeadline time.Duration
205         maxOnlineTime  time.Duration
206         hs             *noise.HandshakeState
207         csOur          *noise.CipherState
208         csTheir        *noise.CipherState
209         payloads       chan []byte
210         pings          chan struct{}
211         infosTheir     map[[32]byte]*SPInfo
212         infosOurSeen   map[[32]byte]uint8
213         queueTheir     []*FreqWithNice
214         wg             sync.WaitGroup
215         RxBytes        int64
216         RxLastSeen     time.Time
217         RxLastNonPing  time.Time
218         TxBytes        int64
219         TxLastSeen     time.Time
220         TxLastNonPing  time.Time
221         started        time.Time
222         mustFinishAt   time.Time
223         Duration       time.Duration
224         RxSpeed        int64
225         TxSpeed        int64
226         rxLock         *os.File
227         txLock         *os.File
228         xxOnly         TRxTx
229         rxRate         int
230         txRate         int
231         isDead         chan struct{}
232         listOnly       bool
233         onlyPkts       map[[32]byte]bool
234         writeSPBuf     bytes.Buffer
235         fds            map[string]FdAndFullSize
236         fileHashers    map[string]*HasherAndOffset
237         checkerQueues  SPCheckerQueues
238         sync.RWMutex
239 }
240
241 func (state *SPState) SetDead() {
242         state.Lock()
243         defer state.Unlock()
244         select {
245         case <-state.isDead:
246                 // Already closed channel, dead
247                 return
248         default:
249         }
250         close(state.isDead)
251         go func() {
252                 for range state.payloads {
253                 }
254         }()
255         go func() {
256                 for range state.pings {
257                 }
258         }()
259         go func() {
260                 for _, s := range state.fds {
261                         s.fd.Close()
262                 }
263         }()
264 }
265
266 func (state *SPState) NotAlive() bool {
267         select {
268         case <-state.isDead:
269                 return true
270         default:
271         }
272         return false
273 }
274
275 func (state *SPState) dirUnlock() {
276         state.Ctx.UnlockDir(state.rxLock)
277         state.Ctx.UnlockDir(state.txLock)
278 }
279
280 func SPChecker(ctx *Ctx, nodeId *NodeId, appeared, checked chan *[32]byte) {
281         for hshValue := range appeared {
282                 pktName := Base32Codec.EncodeToString(hshValue[:])
283                 les := LEs{
284                         {"XX", string(TRx)},
285                         {"Node", nodeId},
286                         {"Pkt", pktName},
287                 }
288                 ctx.LogD("sp-checker", les, func(les LEs) string {
289                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(nodeId), pktName)
290                 })
291                 size, err := ctx.CheckNoCK(nodeId, hshValue)
292                 les = append(les, LE{"Size", size})
293                 if err != nil {
294                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
295                                 return fmt.Sprintf(
296                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(nodeId), pktName,
297                                         humanize.IBytes(uint64(size)),
298                                 )
299                         })
300                         continue
301                 }
302                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
303                         return fmt.Sprintf(
304                                 "Packet %s is retreived (%s)",
305                                 pktName, humanize.IBytes(uint64(size)),
306                         )
307                 })
308                 go func(hsh *[32]byte) { checked <- hsh }(hshValue)
309         }
310 }
311
312 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
313         state.writeSPBuf.Reset()
314         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
315                 Magic:   MagicNNCPLv1,
316                 Payload: payload,
317         })
318         if err != nil {
319                 return err
320         }
321         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
322                 state.TxLastSeen = time.Now()
323                 state.TxBytes += int64(n)
324                 if !ping {
325                         state.TxLastNonPing = state.TxLastSeen
326                 }
327         }
328         return err
329 }
330
331 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
332         var sp SPRaw
333         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
334         if err != nil {
335                 ue := err.(*xdr.UnmarshalError)
336                 if ue.Err == io.EOF {
337                         return nil, ue.Err
338                 }
339                 return nil, err
340         }
341         state.RxLastSeen = time.Now()
342         state.RxBytes += int64(n)
343         if sp.Magic != MagicNNCPLv1 {
344                 return nil, BadMagic
345         }
346         return sp.Payload, nil
347 }
348
349 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[32]byte]uint8) [][]byte {
350         var infos []*SPInfo
351         var totalSize int64
352         for job := range ctx.Jobs(nodeId, TTx) {
353                 if job.PktEnc.Nice > nice {
354                         continue
355                 }
356                 if _, known := (*seen)[*job.HshValue]; known {
357                         continue
358                 }
359                 totalSize += job.Size
360                 infos = append(infos, &SPInfo{
361                         Nice: job.PktEnc.Nice,
362                         Size: uint64(job.Size),
363                         Hash: job.HshValue,
364                 })
365                 (*seen)[*job.HshValue] = job.PktEnc.Nice
366         }
367         sort.Sort(ByNice(infos))
368         var payloads [][]byte
369         for _, info := range infos {
370                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
371                 pktName := Base32Codec.EncodeToString(info.Hash[:])
372                 ctx.LogD("sp-info-our", LEs{
373                         {"Node", nodeId},
374                         {"Name", pktName},
375                         {"Size", info.Size},
376                 }, func(les LEs) string {
377                         return fmt.Sprintf(
378                                 "Our info: %s/tx/%s (%s)",
379                                 ctx.NodeName(nodeId),
380                                 pktName,
381                                 humanize.IBytes(info.Size),
382                         )
383                 })
384         }
385         if totalSize > 0 {
386                 ctx.LogI("sp-infos-tx", LEs{
387                         {"XX", string(TTx)},
388                         {"Node", nodeId},
389                         {"Pkts", len(payloads)},
390                         {"Size", totalSize},
391                 }, func(les LEs) string {
392                         return fmt.Sprintf(
393                                 "We have got for %s: %d packets, %s",
394                                 ctx.NodeName(nodeId),
395                                 len(payloads),
396                                 humanize.IBytes(uint64(totalSize)),
397                         )
398                 })
399         }
400         return payloadsSplit(payloads)
401 }
402
403 func (state *SPState) StartI(conn ConnDeadlined) error {
404         nodeId := state.Node.Id
405         err := state.Ctx.ensureRxDir(nodeId)
406         if err != nil {
407                 return err
408         }
409         var rxLock *os.File
410         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
411                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
412                 if err != nil {
413                         return err
414                 }
415         }
416         var txLock *os.File
417         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
418                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
419                 if err != nil {
420                         return err
421                 }
422         }
423         started := time.Now()
424         conf := noise.Config{
425                 CipherSuite: NoiseCipherSuite,
426                 Pattern:     noise.HandshakeIK,
427                 Initiator:   true,
428                 StaticKeypair: noise.DHKey{
429                         Private: state.Ctx.Self.NoisePrv[:],
430                         Public:  state.Ctx.Self.NoisePub[:],
431                 },
432                 PeerStatic: state.Node.NoisePub[:],
433         }
434         hs, err := noise.NewHandshakeState(conf)
435         if err != nil {
436                 return err
437         }
438         state.hs = hs
439         state.payloads = make(chan []byte)
440         state.pings = make(chan struct{})
441         state.infosTheir = make(map[[32]byte]*SPInfo)
442         state.infosOurSeen = make(map[[32]byte]uint8)
443         state.started = started
444         state.rxLock = rxLock
445         state.txLock = txLock
446
447         var infosPayloads [][]byte
448         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
449                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
450         }
451         var firstPayload []byte
452         if len(infosPayloads) > 0 {
453                 firstPayload = infosPayloads[0]
454         }
455         // Pad first payload, to hide actual number of existing files
456         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
457                 firstPayload = append(firstPayload, SPHaltMarshalized...)
458         }
459
460         var buf []byte
461         var payload []byte
462         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
463         if err != nil {
464                 state.dirUnlock()
465                 return err
466         }
467         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
468         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
469                 return fmt.Sprintf(
470                         "SP with %s (nice %s): sending first message",
471                         state.Node.Name,
472                         NicenessFmt(state.Nice),
473                 )
474         })
475         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
476         if err = state.WriteSP(conn, buf, false); err != nil {
477                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
478                         return fmt.Sprintf(
479                                 "SP with %s (nice %s): writing",
480                                 state.Node.Name,
481                                 NicenessFmt(state.Nice),
482                         )
483                 })
484                 state.dirUnlock()
485                 return err
486         }
487         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
488                 return fmt.Sprintf(
489                         "SP with %s (nice %s): waiting for first message",
490                         state.Node.Name,
491                         NicenessFmt(state.Nice),
492                 )
493         })
494         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
495         if buf, err = state.ReadSP(conn); err != nil {
496                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
497                         return fmt.Sprintf(
498                                 "SP with %s (nice %s): reading",
499                                 state.Node.Name,
500                                 NicenessFmt(state.Nice),
501                         )
502                 })
503                 state.dirUnlock()
504                 return err
505         }
506         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
507         if err != nil {
508                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
509                         return fmt.Sprintf(
510                                 "SP with %s (nice %s): reading Noise message",
511                                 state.Node.Name,
512                                 NicenessFmt(state.Nice),
513                         )
514                 })
515                 state.dirUnlock()
516                 return err
517         }
518         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
519                 return fmt.Sprintf(
520                         "SP with %s (nice %s): starting workers",
521                         state.Node.Name,
522                         NicenessFmt(state.Nice),
523                 )
524         })
525         err = state.StartWorkers(conn, infosPayloads, payload)
526         if err != nil {
527                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
528                         return fmt.Sprintf(
529                                 "SP with %s (nice %s): starting workers",
530                                 state.Node.Name,
531                                 NicenessFmt(state.Nice),
532                         )
533                 })
534                 state.dirUnlock()
535         }
536         return err
537 }
538
539 func (state *SPState) StartR(conn ConnDeadlined) error {
540         started := time.Now()
541         conf := noise.Config{
542                 CipherSuite: NoiseCipherSuite,
543                 Pattern:     noise.HandshakeIK,
544                 Initiator:   false,
545                 StaticKeypair: noise.DHKey{
546                         Private: state.Ctx.Self.NoisePrv[:],
547                         Public:  state.Ctx.Self.NoisePub[:],
548                 },
549         }
550         hs, err := noise.NewHandshakeState(conf)
551         if err != nil {
552                 return err
553         }
554         xxOnly := TRxTx("")
555         state.hs = hs
556         state.payloads = make(chan []byte)
557         state.pings = make(chan struct{})
558         state.infosOurSeen = make(map[[32]byte]uint8)
559         state.infosTheir = make(map[[32]byte]*SPInfo)
560         state.started = started
561         state.xxOnly = xxOnly
562
563         var buf []byte
564         var payload []byte
565         logMsg := func(les LEs) string {
566                 return fmt.Sprintf(
567                         "SP nice %s: waiting for first message",
568                         NicenessFmt(state.Nice),
569                 )
570         }
571         les := LEs{{"Nice", int(state.Nice)}}
572         state.Ctx.LogD("sp-startR", les, logMsg)
573         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
574         if buf, err = state.ReadSP(conn); err != nil {
575                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
576                 return err
577         }
578         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
579                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
580                 return err
581         }
582
583         var node *Node
584         for _, n := range state.Ctx.Neigh {
585                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
586                         node = n
587                         break
588                 }
589         }
590         if node == nil {
591                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
592                 err = errors.New("unknown peer: " + peerId)
593                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
594                 return err
595         }
596         state.Node = node
597         state.rxRate = node.RxRate
598         state.txRate = node.TxRate
599         state.onlineDeadline = node.OnlineDeadline
600         state.maxOnlineTime = node.MaxOnlineTime
601         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
602
603         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
604                 return err
605         }
606         var rxLock *os.File
607         if xxOnly == "" || xxOnly == TRx {
608                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
609                 if err != nil {
610                         return err
611                 }
612         }
613         state.rxLock = rxLock
614         var txLock *os.File
615         if xxOnly == "" || xxOnly == TTx {
616                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
617                 if err != nil {
618                         return err
619                 }
620         }
621         state.txLock = txLock
622
623         var infosPayloads [][]byte
624         if xxOnly == "" || xxOnly == TTx {
625                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
626         }
627         var firstPayload []byte
628         if len(infosPayloads) > 0 {
629                 firstPayload = infosPayloads[0]
630         }
631         // Pad first payload, to hide actual number of existing files
632         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
633                 firstPayload = append(firstPayload, SPHaltMarshalized...)
634         }
635
636         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
637                 return fmt.Sprintf(
638                         "SP with %s (nice %s): sending first message",
639                         node.Name, NicenessFmt(state.Nice),
640                 )
641         })
642         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
643         if err != nil {
644                 state.dirUnlock()
645                 return err
646         }
647         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
648         if err = state.WriteSP(conn, buf, false); err != nil {
649                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
650                         return fmt.Sprintf(
651                                 "SP with %s (nice %s): writing",
652                                 node.Name, NicenessFmt(state.Nice),
653                         )
654                 })
655                 state.dirUnlock()
656                 return err
657         }
658         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
659                 return fmt.Sprintf(
660                         "SP with %s (nice %s): starting workers",
661                         node.Name, NicenessFmt(state.Nice),
662                 )
663         })
664         err = state.StartWorkers(conn, infosPayloads, payload)
665         if err != nil {
666                 state.dirUnlock()
667         }
668         return err
669 }
670
671 func (state *SPState) closeFd(pth string) {
672         s, exists := state.fds[pth]
673         delete(state.fds, pth)
674         if exists {
675                 s.fd.Close()
676         }
677 }
678
679 func (state *SPState) StartWorkers(
680         conn ConnDeadlined,
681         infosPayloads [][]byte,
682         payload []byte,
683 ) error {
684         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
685         state.fds = make(map[string]FdAndFullSize)
686         state.fileHashers = make(map[string]*HasherAndOffset)
687         state.isDead = make(chan struct{})
688         if state.maxOnlineTime > 0 {
689                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
690         }
691
692         // Checker
693         if !state.NoCK {
694                 queues := spCheckers[*state.Node.Id]
695                 if queues == nil {
696                         queues = &SPCheckerQueues{
697                                 appeared: make(chan *[32]byte),
698                                 checked:  make(chan *[32]byte),
699                         }
700                         spCheckers[*state.Node.Id] = queues
701                         go SPChecker(state.Ctx, state.Node.Id, queues.appeared, queues.checked)
702                 }
703                 state.checkerQueues = *queues
704                 go func() {
705                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
706                                 if job.PktEnc.Nice <= state.Nice {
707                                         state.checkerQueues.appeared <- job.HshValue
708                                 }
709                         }
710                 }()
711                 state.wg.Add(1)
712                 go func() {
713                         defer state.wg.Done()
714                         for {
715                                 select {
716                                 case <-state.isDead:
717                                         return
718                                 case hsh := <-state.checkerQueues.checked:
719                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{hsh})
720                                 }
721                         }
722                 }()
723         }
724
725         // Remaining handshake payload sending
726         if len(infosPayloads) > 1 {
727                 state.wg.Add(1)
728                 go func() {
729                         for _, payload := range infosPayloads[1:] {
730                                 state.Ctx.LogD(
731                                         "sp-queue-remaining",
732                                         append(les, LE{"Size", len(payload)}),
733                                         func(les LEs) string {
734                                                 return fmt.Sprintf(
735                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
736                                                         state.Node.Name, NicenessFmt(state.Nice),
737                                                         humanize.IBytes(uint64(len(payload))),
738                                                 )
739                                         },
740                                 )
741                                 state.payloads <- payload
742                         }
743                         state.wg.Done()
744                 }()
745         }
746
747         // Processing of first payload and queueing its responses
748         logMsg := func(les LEs) string {
749                 return fmt.Sprintf(
750                         "SP with %s (nice %s): processing first payload (%s)",
751                         state.Node.Name, NicenessFmt(state.Nice),
752                         humanize.IBytes(uint64(len(payload))),
753                 )
754         }
755         state.Ctx.LogD("sp-process", append(les, LE{"Size", len(payload)}), logMsg)
756         replies, err := state.ProcessSP(payload)
757         if err != nil {
758                 state.Ctx.LogE("sp-process", les, err, logMsg)
759                 return err
760         }
761         state.wg.Add(1)
762         go func() {
763                 for _, reply := range replies {
764                         state.Ctx.LogD(
765                                 "sp-queue-reply",
766                                 append(les, LE{"Size", len(reply)}),
767                                 func(les LEs) string {
768                                         return fmt.Sprintf(
769                                                 "SP with %s (nice %s): queuing reply (%s)",
770                                                 state.Node.Name, NicenessFmt(state.Nice),
771                                                 humanize.IBytes(uint64(len(payload))),
772                                         )
773                                 },
774                         )
775                         state.payloads <- reply
776                 }
777                 state.wg.Done()
778         }()
779
780         // Periodic jobs
781         state.wg.Add(1)
782         go func() {
783                 deadlineTicker := time.NewTicker(time.Second)
784                 pingTicker := time.NewTicker(PingTimeout)
785                 for {
786                         select {
787                         case <-state.isDead:
788                                 state.wg.Done()
789                                 deadlineTicker.Stop()
790                                 pingTicker.Stop()
791                                 return
792                         case now := <-deadlineTicker.C:
793                                 if (now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
794                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline) ||
795                                         (state.maxOnlineTime > 0 && state.mustFinishAt.Before(now)) ||
796                                         (now.Sub(state.RxLastSeen) >= 2*PingTimeout) {
797                                         state.SetDead()
798                                         conn.Close() // #nosec G104
799                                 }
800                         case now := <-pingTicker.C:
801                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
802                                         state.wg.Add(1)
803                                         go func() {
804                                                 state.pings <- struct{}{}
805                                                 state.wg.Done()
806                                         }()
807                                 }
808                         }
809                 }
810         }()
811
812         // Spool checker and INFOs sender of appearing files
813         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
814                 state.wg.Add(1)
815                 go func() {
816                         ticker := time.NewTicker(time.Second)
817                         for {
818                                 select {
819                                 case <-state.isDead:
820                                         state.wg.Done()
821                                         ticker.Stop()
822                                         return
823                                 case <-ticker.C:
824                                         for _, payload := range state.Ctx.infosOur(
825                                                 state.Node.Id,
826                                                 state.Nice,
827                                                 &state.infosOurSeen,
828                                         ) {
829                                                 state.Ctx.LogD(
830                                                         "sp-queue-info",
831                                                         append(les, LE{"Size", len(payload)}),
832                                                         func(les LEs) string {
833                                                                 return fmt.Sprintf(
834                                                                         "SP with %s (nice %s): queuing new info (%s)",
835                                                                         state.Node.Name, NicenessFmt(state.Nice),
836                                                                         humanize.IBytes(uint64(len(payload))),
837                                                                 )
838                                                         },
839                                                 )
840                                                 state.payloads <- payload
841                                         }
842                                 }
843                         }
844                 }()
845         }
846
847         // Sender
848         state.wg.Add(1)
849         go func() {
850                 defer conn.Close()
851                 defer state.SetDead()
852                 defer state.wg.Done()
853                 for {
854                         if state.NotAlive() {
855                                 return
856                         }
857                         var payload []byte
858                         var ping bool
859                         select {
860                         case <-state.pings:
861                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
862                                         return fmt.Sprintf(
863                                                 "SP with %s (nice %s): got ping",
864                                                 state.Node.Name, NicenessFmt(state.Nice),
865                                         )
866                                 })
867                                 payload = SPPingMarshalized
868                                 ping = true
869                         case payload = <-state.payloads:
870                                 state.Ctx.LogD(
871                                         "sp-got-payload",
872                                         append(les, LE{"Size", len(payload)}),
873                                         func(les LEs) string {
874                                                 return fmt.Sprintf(
875                                                         "SP with %s (nice %s): got payload (%s)",
876                                                         state.Node.Name, NicenessFmt(state.Nice),
877                                                         humanize.IBytes(uint64(len(payload))),
878                                                 )
879                                         },
880                                 )
881                         default:
882                                 state.RLock()
883                                 if len(state.queueTheir) == 0 {
884                                         state.RUnlock()
885                                         time.Sleep(100 * time.Millisecond)
886                                         continue
887                                 }
888                                 freq := state.queueTheir[0].freq
889                                 state.RUnlock()
890                                 if state.txRate > 0 {
891                                         time.Sleep(time.Second / time.Duration(state.txRate))
892                                 }
893                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
894                                 lesp := append(
895                                         les,
896                                         LE{"XX", string(TTx)},
897                                         LE{"Pkt", pktName},
898                                         LE{"Size", int64(freq.Offset)},
899                                 )
900                                 logMsg := func(les LEs) string {
901                                         return fmt.Sprintf(
902                                                 "SP with %s (nice %s): tx/%s (%s)",
903                                                 state.Node.Name, NicenessFmt(state.Nice),
904                                                 pktName,
905                                                 humanize.IBytes(freq.Offset),
906                                         )
907                                 }
908                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
909                                         return logMsg(les) + ": queueing"
910                                 })
911                                 pth := filepath.Join(
912                                         state.Ctx.Spool,
913                                         state.Node.Id.String(),
914                                         string(TTx),
915                                         Base32Codec.EncodeToString(freq.Hash[:]),
916                                 )
917                                 fdAndFullSize, exists := state.fds[pth]
918                                 if !exists {
919                                         fd, err := os.Open(pth)
920                                         if err != nil {
921                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
922                                                         return logMsg(les) + ": opening"
923                                                 })
924                                                 return
925                                         }
926                                         fi, err := fd.Stat()
927                                         if err != nil {
928                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
929                                                         return logMsg(les) + ": stating"
930                                                 })
931                                                 return
932                                         }
933                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
934                                         state.fds[pth] = fdAndFullSize
935                                 }
936                                 fd := fdAndFullSize.fd
937                                 fullSize := fdAndFullSize.fullSize
938                                 var buf []byte
939                                 if freq.Offset < uint64(fullSize) {
940                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
941                                                 return logMsg(les) + ": seeking"
942                                         })
943                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
944                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
945                                                         return logMsg(les) + ": seeking"
946                                                 })
947                                                 return
948                                         }
949                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
950                                         n, err := fd.Read(buf)
951                                         if err != nil {
952                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
953                                                         return logMsg(les) + ": reading"
954                                                 })
955                                                 return
956                                         }
957                                         buf = buf[:n]
958                                         state.Ctx.LogD(
959                                                 "sp-file-read",
960                                                 append(lesp, LE{"Size", n}),
961                                                 func(les LEs) string {
962                                                         return fmt.Sprintf(
963                                                                 "%s: read %s",
964                                                                 logMsg(les), humanize.IBytes(uint64(n)),
965                                                         )
966                                                 },
967                                         )
968                                 }
969                                 state.closeFd(pth)
970                                 payload = MarshalSP(SPTypeFile, SPFile{
971                                         Hash:    freq.Hash,
972                                         Offset:  freq.Offset,
973                                         Payload: buf,
974                                 })
975                                 ourSize := freq.Offset + uint64(len(buf))
976                                 lesp = append(lesp, LE{"Size", int64(ourSize)}, LE{"FullSize", fullSize})
977                                 if state.Ctx.ShowPrgrs {
978                                         Progress("Tx", lesp)
979                                 }
980                                 state.Lock()
981                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
982                                         if ourSize == uint64(fullSize) {
983                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
984                                                         return logMsg(les) + ": finished"
985                                                 })
986                                                 if len(state.queueTheir) > 1 {
987                                                         state.queueTheir = state.queueTheir[1:]
988                                                 } else {
989                                                         state.queueTheir = state.queueTheir[:0]
990                                                 }
991                                         } else {
992                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
993                                         }
994                                 } else {
995                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
996                                                 return logMsg(les) + ": queue disappeared"
997                                         })
998                                 }
999                                 state.Unlock()
1000                         }
1001                         logMsg := func(les LEs) string {
1002                                 return fmt.Sprintf(
1003                                         "SP with %s (nice %s): sending %s",
1004                                         state.Node.Name, NicenessFmt(state.Nice),
1005                                         humanize.IBytes(uint64(len(payload))),
1006                                 )
1007                         }
1008                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", len(payload)}), logMsg)
1009                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1010                         if err := state.WriteSP(conn, state.csOur.Encrypt(nil, nil, payload), ping); err != nil {
1011                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1012                                 return
1013                         }
1014                 }
1015         }()
1016
1017         // Receiver
1018         state.wg.Add(1)
1019         go func() {
1020                 for {
1021                         if state.NotAlive() {
1022                                 break
1023                         }
1024                         logMsg := func(les LEs) string {
1025                                 return fmt.Sprintf(
1026                                         "SP with %s (nice %s): waiting for payload",
1027                                         state.Node.Name, NicenessFmt(state.Nice),
1028                                 )
1029                         }
1030                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1031                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1032                         payload, err := state.ReadSP(conn)
1033                         if err != nil {
1034                                 if err == io.EOF {
1035                                         break
1036                                 }
1037                                 unmarshalErr := err.(*xdr.UnmarshalError)
1038                                 if os.IsTimeout(unmarshalErr.Err) {
1039                                         continue
1040                                 }
1041                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1042                                         break
1043                                 }
1044                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1045                                 break
1046                         }
1047                         logMsg = func(les LEs) string {
1048                                 return fmt.Sprintf(
1049                                         "SP with %s (nice %s): payload (%s)",
1050                                         state.Node.Name, NicenessFmt(state.Nice),
1051                                         humanize.IBytes(uint64(len(payload))),
1052                                 )
1053                         }
1054                         state.Ctx.LogD(
1055                                 "sp-recv-got",
1056                                 append(les, LE{"Size", len(payload)}),
1057                                 func(les LEs) string { return logMsg(les) + ": got" },
1058                         )
1059                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1060                         if err != nil {
1061                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1062                                         return logMsg(les) + ": got"
1063                                 })
1064                                 break
1065                         }
1066                         state.Ctx.LogD(
1067                                 "sp-recv-process",
1068                                 append(les, LE{"Size", len(payload)}),
1069                                 func(les LEs) string {
1070                                         return logMsg(les) + ": processing"
1071                                 },
1072                         )
1073                         replies, err := state.ProcessSP(payload)
1074                         if err != nil {
1075                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1076                                         return logMsg(les) + ": processing"
1077                                 })
1078                                 break
1079                         }
1080                         state.wg.Add(1)
1081                         go func() {
1082                                 for _, reply := range replies {
1083                                         state.Ctx.LogD(
1084                                                 "sp-recv-reply",
1085                                                 append(les[:len(les)-1], LE{"Size", len(reply)}),
1086                                                 func(les LEs) string {
1087                                                         return fmt.Sprintf(
1088                                                                 "SP with %s (nice %s): queuing reply (%s)",
1089                                                                 state.Node.Name, NicenessFmt(state.Nice),
1090                                                                 humanize.IBytes(uint64(len(reply))),
1091                                                         )
1092                                                 },
1093                                         )
1094                                         state.payloads <- reply
1095                                 }
1096                                 state.wg.Done()
1097                         }()
1098                         if state.rxRate > 0 {
1099                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1100                         }
1101                 }
1102                 state.SetDead()
1103                 state.wg.Done()
1104                 state.SetDead()
1105                 conn.Close() // #nosec G104
1106         }()
1107
1108         return nil
1109 }
1110
1111 func (state *SPState) Wait() {
1112         state.wg.Wait()
1113         close(state.payloads)
1114         close(state.pings)
1115         state.dirUnlock()
1116         state.Duration = time.Now().Sub(state.started)
1117         state.RxSpeed = state.RxBytes
1118         state.TxSpeed = state.TxBytes
1119         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1120         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1121         if rxDuration > 0 {
1122                 state.RxSpeed = state.RxBytes / rxDuration
1123         }
1124         if txDuration > 0 {
1125                 state.TxSpeed = state.TxBytes / txDuration
1126         }
1127 }
1128
1129 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1130         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1131         r := bytes.NewReader(payload)
1132         var err error
1133         var replies [][]byte
1134         var infosGot bool
1135         for r.Len() > 0 {
1136                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1137                         return fmt.Sprintf(
1138                                 "SP with %s (nice %s): unmarshaling header",
1139                                 state.Node.Name, NicenessFmt(state.Nice),
1140                         )
1141                 })
1142                 var head SPHead
1143                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1144                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1145                                 return fmt.Sprintf(
1146                                         "SP with %s (nice %s): unmarshaling header",
1147                                         state.Node.Name, NicenessFmt(state.Nice),
1148                                 )
1149                         })
1150                         return nil, err
1151                 }
1152                 if head.Type != SPTypePing {
1153                         state.RxLastNonPing = state.RxLastSeen
1154                 }
1155                 switch head.Type {
1156                 case SPTypeHalt:
1157                         state.Ctx.LogD(
1158                                 "sp-process-halt",
1159                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1160                                         return fmt.Sprintf(
1161                                                 "SP with %s (nice %s): got HALT",
1162                                                 state.Node.Name, NicenessFmt(state.Nice),
1163                                         )
1164                                 },
1165                         )
1166                         state.Lock()
1167                         state.queueTheir = nil
1168                         state.Unlock()
1169
1170                 case SPTypePing:
1171                         state.Ctx.LogD(
1172                                 "sp-process-ping",
1173                                 append(les, LE{"Type", "ping"}),
1174                                 func(les LEs) string {
1175                                         return fmt.Sprintf(
1176                                                 "SP with %s (nice %s): got PING",
1177                                                 state.Node.Name, NicenessFmt(state.Nice),
1178                                         )
1179                                 },
1180                         )
1181
1182                 case SPTypeInfo:
1183                         infosGot = true
1184                         lesp := append(les, LE{"Type", "info"})
1185                         state.Ctx.LogD(
1186                                 "sp-process-info-unmarshal", lesp,
1187                                 func(les LEs) string {
1188                                         return fmt.Sprintf(
1189                                                 "SP with %s (nice %s): unmarshaling INFO",
1190                                                 state.Node.Name, NicenessFmt(state.Nice),
1191                                         )
1192                                 },
1193                         )
1194                         var info SPInfo
1195                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1196                                 state.Ctx.LogE(
1197                                         "sp-process-info-unmarshal", lesp, err,
1198                                         func(les LEs) string {
1199                                                 return fmt.Sprintf(
1200                                                         "SP with %s (nice %s): unmarshaling INFO",
1201                                                         state.Node.Name, NicenessFmt(state.Nice),
1202                                                 )
1203                                         },
1204                                 )
1205                                 return nil, err
1206                         }
1207                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1208                         lesp = append(
1209                                 lesp,
1210                                 LE{"Pkt", pktName},
1211                                 LE{"Size", int64(info.Size)},
1212                                 LE{"PktNice", int(info.Nice)},
1213                         )
1214                         logMsg := func(les LEs) string {
1215                                 return fmt.Sprintf(
1216                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1217                                         state.Node.Name, NicenessFmt(state.Nice),
1218                                         pktName,
1219                                         humanize.IBytes(info.Size),
1220                                         NicenessFmt(info.Nice),
1221                                 )
1222                         }
1223                         if !state.listOnly && info.Nice > state.Nice {
1224                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1225                                         return logMsg(les) + ": too nice"
1226                                 })
1227                                 continue
1228                         }
1229                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1230                                 return logMsg(les) + ": received"
1231                         })
1232                         if !state.listOnly && state.xxOnly == TTx {
1233                                 continue
1234                         }
1235                         state.Lock()
1236                         state.infosTheir[*info.Hash] = &info
1237                         state.Unlock()
1238                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1239                                 return logMsg(les) + ": stating part"
1240                         })
1241                         pktPath := filepath.Join(
1242                                 state.Ctx.Spool,
1243                                 state.Node.Id.String(),
1244                                 string(TRx),
1245                                 Base32Codec.EncodeToString(info.Hash[:]),
1246                         )
1247                         logMsg = func(les LEs) string {
1248                                 return fmt.Sprintf(
1249                                         "Packet %s (%s) (nice %s)",
1250                                         pktName,
1251                                         humanize.IBytes(info.Size),
1252                                         NicenessFmt(info.Nice),
1253                                 )
1254                         }
1255                         if _, err = os.Stat(pktPath); err == nil {
1256                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1257                                         return logMsg(les) + ": already done"
1258                                 })
1259                                 if !state.listOnly {
1260                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1261                                 }
1262                                 continue
1263                         }
1264                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1265                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1266                                         return logMsg(les) + ": already seen"
1267                                 })
1268                                 if !state.listOnly {
1269                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1270                                 }
1271                                 continue
1272                         }
1273                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1274                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1275                                         return logMsg(les) + ": still not checksummed"
1276                                 })
1277                                 continue
1278                         }
1279                         fi, err := os.Stat(pktPath + PartSuffix)
1280                         var offset int64
1281                         if err == nil {
1282                                 offset = fi.Size()
1283                         }
1284                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1285                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1286                                         return logMsg(les) + ": not enough space"
1287                                 })
1288                                 continue
1289                         }
1290                         state.Ctx.LogI(
1291                                 "sp-info",
1292                                 append(lesp, LE{"Offset", offset}),
1293                                 func(les LEs) string {
1294                                         return fmt.Sprintf(
1295                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1296                                         )
1297                                 },
1298                         )
1299                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1300                                 replies = append(replies, MarshalSP(
1301                                         SPTypeFreq,
1302                                         SPFreq{info.Hash, uint64(offset)},
1303                                 ))
1304                         }
1305
1306                 case SPTypeFile:
1307                         lesp := append(les, LE{"Type", "file"})
1308                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1309                                 return fmt.Sprintf(
1310                                         "SP with %s (nice %s): unmarshaling FILE",
1311                                         state.Node.Name, NicenessFmt(state.Nice),
1312                                 )
1313                         })
1314                         var file SPFile
1315                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1316                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1317                                         return fmt.Sprintf(
1318                                                 "SP with %s (nice %s): unmarshaling FILE",
1319                                                 state.Node.Name, NicenessFmt(state.Nice),
1320                                         )
1321                                 })
1322                                 return nil, err
1323                         }
1324                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1325                         lesp = append(
1326                                 lesp,
1327                                 LE{"XX", string(TRx)},
1328                                 LE{"Pkt", pktName},
1329                                 LE{"Size", len(file.Payload)},
1330                         )
1331                         logMsg := func(les LEs) string {
1332                                 return fmt.Sprintf(
1333                                         "Got packet %s (%s)",
1334                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1335                                 )
1336                         }
1337                         dirToSync := filepath.Join(
1338                                 state.Ctx.Spool,
1339                                 state.Node.Id.String(),
1340                                 string(TRx),
1341                         )
1342                         filePath := filepath.Join(dirToSync, pktName)
1343                         filePathPart := filePath + PartSuffix
1344                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1345                                 return logMsg(les) + ": opening part"
1346                         })
1347                         fdAndFullSize, exists := state.fds[filePathPart]
1348                         var fd *os.File
1349                         if exists {
1350                                 fd = fdAndFullSize.fd
1351                         } else {
1352                                 fd, err = os.OpenFile(
1353                                         filePathPart,
1354                                         os.O_RDWR|os.O_CREATE,
1355                                         os.FileMode(0666),
1356                                 )
1357                                 if err != nil {
1358                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1359                                                 return logMsg(les) + ": opening part"
1360                                         })
1361                                         return nil, err
1362                                 }
1363                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1364                                 if file.Offset == 0 {
1365                                         h, err := blake2b.New256(nil)
1366                                         if err != nil {
1367                                                 panic(err)
1368                                         }
1369                                         state.fileHashers[filePath] = &HasherAndOffset{h: h}
1370                                 }
1371                         }
1372                         state.Ctx.LogD(
1373                                 "sp-file-seek",
1374                                 append(lesp, LE{"Offset", file.Offset}),
1375                                 func(les LEs) string {
1376                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1377                                 })
1378                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1379                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1380                                         return logMsg(les) + ": seeking"
1381                                 })
1382                                 state.closeFd(filePathPart)
1383                                 return nil, err
1384                         }
1385                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1386                                 return logMsg(les) + ": writing"
1387                         })
1388                         if _, err = fd.Write(file.Payload); err != nil {
1389                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1390                                         return logMsg(les) + ": writing"
1391                                 })
1392                                 state.closeFd(filePathPart)
1393                                 return nil, err
1394                         }
1395                         hasherAndOffset, hasherExists := state.fileHashers[filePath]
1396                         if hasherExists {
1397                                 if hasherAndOffset.offset == file.Offset {
1398                                         if _, err = hasherAndOffset.h.Write(file.Payload); err != nil {
1399                                                 panic(err)
1400                                         }
1401                                         hasherAndOffset.offset += uint64(len(file.Payload))
1402                                 } else {
1403                                         state.Ctx.LogD(
1404                                                 "sp-file-offset-differs", lesp,
1405                                                 func(les LEs) string {
1406                                                         return logMsg(les) + ": offset differs, deleting hasher"
1407                                                 },
1408                                         )
1409                                         delete(state.fileHashers, filePath)
1410                                         hasherExists = false
1411                                 }
1412                         }
1413                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1414                         lesp[len(lesp)-1].V = ourSize
1415                         fullsize := int64(0)
1416                         state.RLock()
1417                         infoTheir, ok := state.infosTheir[*file.Hash]
1418                         state.RUnlock()
1419                         if ok {
1420                                 fullsize = int64(infoTheir.Size)
1421                         }
1422                         lesp = append(lesp, LE{"FullSize", fullsize})
1423                         if state.Ctx.ShowPrgrs {
1424                                 Progress("Rx", lesp)
1425                         }
1426                         if fullsize != ourSize {
1427                                 continue
1428                         }
1429                         logMsg = func(les LEs) string {
1430                                 return fmt.Sprintf(
1431                                         "Got packet %s %d%% (%s / %s)",
1432                                         pktName, 100*ourSize/fullsize,
1433                                         humanize.IBytes(uint64(ourSize)),
1434                                         humanize.IBytes(uint64(fullsize)),
1435                                 )
1436                         }
1437                         err = fd.Sync()
1438                         if err != nil {
1439                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1440                                         return logMsg(les) + ": syncing"
1441                                 })
1442                                 state.closeFd(filePathPart)
1443                                 continue
1444                         }
1445                         if hasherExists {
1446                                 if bytes.Compare(hasherAndOffset.h.Sum(nil), file.Hash[:]) != 0 {
1447                                         state.Ctx.LogE(
1448                                                 "sp-file-bad-checksum", lesp,
1449                                                 errors.New("checksum mismatch"),
1450                                                 logMsg,
1451                                         )
1452                                         continue
1453                                 }
1454                                 if err = os.Rename(filePathPart, filePath); err != nil {
1455                                         state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1456                                                 return logMsg(les) + ": renaming"
1457                                         })
1458                                         continue
1459                                 }
1460                                 if err = DirSync(dirToSync); err != nil {
1461                                         state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1462                                                 return logMsg(les) + ": dirsyncing"
1463                                         })
1464                                         continue
1465                                 }
1466                                 state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1467                                         return logMsg(les) + ": done"
1468                                 })
1469                                 state.wg.Add(1)
1470                                 go func() {
1471                                         state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1472                                         state.wg.Done()
1473                                 }()
1474                                 state.Lock()
1475                                 delete(state.infosTheir, *file.Hash)
1476                                 state.Unlock()
1477                                 if !state.Ctx.HdrUsage {
1478                                         state.closeFd(filePathPart)
1479                                         continue
1480                                 }
1481                                 if _, err = fd.Seek(0, io.SeekStart); err != nil {
1482                                         state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1483                                                 return logMsg(les) + ": seeking"
1484                                         })
1485                                         state.closeFd(filePathPart)
1486                                         continue
1487                                 }
1488                                 _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1489                                 state.closeFd(filePathPart)
1490                                 if err != nil {
1491                                         state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1492                                                 return logMsg(les) + ": HdrReading"
1493                                         })
1494                                         continue
1495                                 }
1496                                 state.Ctx.HdrWrite(pktEncRaw, filePath)
1497                                 continue
1498                         }
1499                         state.closeFd(filePathPart)
1500                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1501                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1502                                         return logMsg(les) + ": renaming"
1503                                 })
1504                                 continue
1505                         }
1506                         if err = DirSync(dirToSync); err != nil {
1507                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1508                                         return logMsg(les) + ": dirsyncing"
1509                                 })
1510                                 continue
1511                         }
1512                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1513                                 return logMsg(les) + ": downloaded"
1514                         })
1515                         state.Lock()
1516                         delete(state.infosTheir, *file.Hash)
1517                         state.Unlock()
1518                         if !state.NoCK {
1519                                 state.checkerQueues.appeared <- file.Hash
1520                         }
1521
1522                 case SPTypeDone:
1523                         lesp := append(les, LE{"Type", "done"})
1524                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1525                                 return fmt.Sprintf(
1526                                         "SP with %s (nice %s): unmarshaling DONE",
1527                                         state.Node.Name, NicenessFmt(state.Nice),
1528                                 )
1529                         })
1530                         var done SPDone
1531                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1532                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1533                                         return fmt.Sprintf(
1534                                                 "SP with %s (nice %s): unmarshaling DONE",
1535                                                 state.Node.Name, NicenessFmt(state.Nice),
1536                                         )
1537                                 })
1538                                 return nil, err
1539                         }
1540                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1541                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1542                         logMsg := func(les LEs) string {
1543                                 return fmt.Sprintf(
1544                                         "SP with %s (nice %s): DONE: removing %s",
1545                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1546                                 )
1547                         }
1548                         state.Ctx.LogD("sp-done", lesp, logMsg)
1549                         pth := filepath.Join(
1550                                 state.Ctx.Spool,
1551                                 state.Node.Id.String(),
1552                                 string(TTx),
1553                                 pktName,
1554                         )
1555                         if err = os.Remove(pth); err == nil {
1556                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1557                                         return fmt.Sprintf("Packet %s is sent", pktName)
1558                                 })
1559                                 if state.Ctx.HdrUsage {
1560                                         os.Remove(pth + HdrSuffix)
1561                                 }
1562                         } else {
1563                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1564                         }
1565
1566                 case SPTypeFreq:
1567                         lesp := append(les, LE{"Type", "freq"})
1568                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1569                                 return fmt.Sprintf(
1570                                         "SP with %s (nice %s): unmarshaling FREQ",
1571                                         state.Node.Name, NicenessFmt(state.Nice),
1572                                 )
1573                         })
1574                         var freq SPFreq
1575                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1576                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1577                                         return fmt.Sprintf(
1578                                                 "SP with %s (nice %s): unmarshaling FREQ",
1579                                                 state.Node.Name, NicenessFmt(state.Nice),
1580                                         )
1581                                 })
1582                                 return nil, err
1583                         }
1584                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1585                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1586                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1587                                 return fmt.Sprintf(
1588                                         "SP with %s (nice %s): FREQ %s: queuing",
1589                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1590                                 )
1591                         })
1592                         nice, exists := state.infosOurSeen[*freq.Hash]
1593                         if exists {
1594                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1595                                         state.Lock()
1596                                         insertIdx := 0
1597                                         var freqWithNice *FreqWithNice
1598                                         for insertIdx, freqWithNice = range state.queueTheir {
1599                                                 if freqWithNice.nice > nice {
1600                                                         break
1601                                                 }
1602                                         }
1603                                         state.queueTheir = append(state.queueTheir, nil)
1604                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1605                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1606                                         state.Unlock()
1607                                 } else {
1608                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1609                                                 return fmt.Sprintf(
1610                                                         "SP with %s (nice %s): FREQ %s: skipping",
1611                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1612                                                 )
1613                                         })
1614                                 }
1615                         } else {
1616                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1617                                         return fmt.Sprintf(
1618                                                 "SP with %s (nice %s): FREQ %s: unknown",
1619                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1620                                         )
1621                                 })
1622                         }
1623
1624                 default:
1625                         state.Ctx.LogE(
1626                                 "sp-process-type-unknown",
1627                                 append(les, LE{"Type", head.Type}),
1628                                 errors.New("unknown type"),
1629                                 func(les LEs) string {
1630                                         return fmt.Sprintf(
1631                                                 "SP with %s (nice %s): %d",
1632                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1633                                         )
1634                                 },
1635                         )
1636                         return nil, BadPktType
1637                 }
1638         }
1639
1640         if infosGot {
1641                 var pkts int
1642                 var size uint64
1643                 state.RLock()
1644                 for _, info := range state.infosTheir {
1645                         pkts++
1646                         size += info.Size
1647                 }
1648                 state.RUnlock()
1649                 state.Ctx.LogI("sp-infos-rx", LEs{
1650                         {"XX", string(TRx)},
1651                         {"Node", state.Node.Id},
1652                         {"Pkts", pkts},
1653                         {"Size", int64(size)},
1654                 }, func(les LEs) string {
1655                         return fmt.Sprintf(
1656                                 "%s has got for us: %d packets, %s",
1657                                 state.Node.Name, pkts, humanize.IBytes(size),
1658                         )
1659                 })
1660         }
1661         return payloadsSplit(replies), nil
1662 }