]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Merge branch 'develop'
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "io"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/dustin/go-humanize"
34         "github.com/flynn/noise"
35 )
36
37 const (
38         MaxSPSize      = 1<<16 - 256
39         PartSuffix     = ".part"
40         SPHeadOverhead = 4
41 )
42
43 type MTHAndOffset struct {
44         mth    *MTH
45         offset uint64
46 }
47
48 type SPCheckerTask struct {
49         nodeId *NodeId
50         hsh    *[MTHSize]byte
51         mth    *MTH
52         done   chan []byte
53 }
54
55 var (
56         SPInfoOverhead    int
57         SPFreqOverhead    int
58         SPFileOverhead    int
59         SPHaltMarshalized []byte
60         SPPingMarshalized []byte
61
62         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
63                 noise.DH25519,
64                 noise.CipherChaChaPoly,
65                 noise.HashBLAKE2b,
66         )
67
68         DefaultDeadline = 10 * time.Second
69         PingTimeout     = time.Minute
70
71         spCheckerTasks chan SPCheckerTask
72         SPCheckerWg    sync.WaitGroup
73         spCheckerOnce  sync.Once
74 )
75
76 type FdAndFullSize struct {
77         fd       *os.File
78         fullSize int64
79 }
80
81 type SPType uint8
82
83 const (
84         SPTypeInfo SPType = iota
85         SPTypeFreq SPType = iota
86         SPTypeFile SPType = iota
87         SPTypeDone SPType = iota
88         SPTypeHalt SPType = iota
89         SPTypePing SPType = iota
90 )
91
92 type SPHead struct {
93         Type SPType
94 }
95
96 type SPInfo struct {
97         Nice uint8
98         Size uint64
99         Hash *[MTHSize]byte
100 }
101
102 type SPFreq struct {
103         Hash   *[MTHSize]byte
104         Offset uint64
105 }
106
107 type SPFile struct {
108         Hash    *[MTHSize]byte
109         Offset  uint64
110         Payload []byte
111 }
112
113 type SPDone struct {
114         Hash *[MTHSize]byte
115 }
116
117 type SPRaw struct {
118         Magic   [8]byte
119         Payload []byte
120 }
121
122 type FreqWithNice struct {
123         freq *SPFreq
124         nice uint8
125 }
126
127 type ConnDeadlined interface {
128         io.ReadWriteCloser
129         SetReadDeadline(t time.Time) error
130         SetWriteDeadline(t time.Time) error
131 }
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170         spCheckerTasks = make(chan SPCheckerTask)
171 }
172
173 func MarshalSP(typ SPType, sp interface{}) []byte {
174         var buf bytes.Buffer
175         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
176                 panic(err)
177         }
178         if _, err := xdr.Marshal(&buf, sp); err != nil {
179                 panic(err)
180         }
181         return buf.Bytes()
182 }
183
184 func payloadsSplit(payloads [][]byte) [][]byte {
185         var outbounds [][]byte
186         outbound := make([]byte, 0, MaxSPSize)
187         for i, payload := range payloads {
188                 outbound = append(outbound, payload...)
189                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
190                         outbounds = append(outbounds, outbound)
191                         outbound = make([]byte, 0, MaxSPSize)
192                 }
193         }
194         if len(outbound) > 0 {
195                 outbounds = append(outbounds, outbound)
196         }
197         return outbounds
198 }
199
200 type SPState struct {
201         Ctx            *Ctx
202         Node           *Node
203         Nice           uint8
204         NoCK           bool
205         onlineDeadline time.Duration
206         maxOnlineTime  time.Duration
207         hs             *noise.HandshakeState
208         csOur          *noise.CipherState
209         csTheir        *noise.CipherState
210         payloads       chan []byte
211         pings          chan struct{}
212         infosTheir     map[[MTHSize]byte]*SPInfo
213         infosOurSeen   map[[MTHSize]byte]uint8
214         queueTheir     []*FreqWithNice
215         wg             sync.WaitGroup
216         RxBytes        int64
217         RxLastSeen     time.Time
218         RxLastNonPing  time.Time
219         TxBytes        int64
220         TxLastSeen     time.Time
221         TxLastNonPing  time.Time
222         started        time.Time
223         mustFinishAt   time.Time
224         Duration       time.Duration
225         RxSpeed        int64
226         TxSpeed        int64
227         rxLock         *os.File
228         txLock         *os.File
229         xxOnly         TRxTx
230         rxRate         int
231         txRate         int
232         isDead         chan struct{}
233         listOnly       bool
234         onlyPkts       map[[MTHSize]byte]bool
235         writeSPBuf     bytes.Buffer
236         fds            map[string]FdAndFullSize
237         fdsLock        sync.RWMutex
238         fileHashers    map[string]*MTHAndOffset
239         progressBars   map[string]struct{}
240         sync.RWMutex
241 }
242
243 func (state *SPState) SetDead() {
244         state.Lock()
245         defer state.Unlock()
246         select {
247         case <-state.isDead:
248                 // Already closed channel, dead
249                 return
250         default:
251         }
252         close(state.isDead)
253         go func() {
254                 for range state.payloads {
255                 }
256         }()
257         go func() {
258                 for range state.pings {
259                 }
260         }()
261 }
262
263 func (state *SPState) NotAlive() bool {
264         select {
265         case <-state.isDead:
266                 return true
267         default:
268         }
269         return false
270 }
271
272 func (state *SPState) dirUnlock() {
273         state.Ctx.UnlockDir(state.rxLock)
274         state.Ctx.UnlockDir(state.txLock)
275 }
276
277 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
278         state.writeSPBuf.Reset()
279         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
280                 Magic:   MagicNNCPSv1.B,
281                 Payload: payload,
282         })
283         if err != nil {
284                 return err
285         }
286         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
287                 state.TxLastSeen = time.Now()
288                 state.TxBytes += int64(n)
289                 if !ping {
290                         state.TxLastNonPing = state.TxLastSeen
291                 }
292         }
293         return err
294 }
295
296 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
297         var sp SPRaw
298         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
299         if err != nil {
300                 ue := err.(*xdr.UnmarshalError)
301                 if ue.Err == io.EOF {
302                         return nil, ue.Err
303                 }
304                 return nil, err
305         }
306         state.RxLastSeen = time.Now()
307         state.RxBytes += int64(n)
308         if sp.Magic != MagicNNCPSv1.B {
309                 return nil, BadMagic
310         }
311         return sp.Payload, nil
312 }
313
314 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
315         var infos []*SPInfo
316         var totalSize int64
317         for job := range ctx.Jobs(nodeId, TTx) {
318                 if job.PktEnc.Nice > nice {
319                         continue
320                 }
321                 if _, known := (*seen)[*job.HshValue]; known {
322                         continue
323                 }
324                 totalSize += job.Size
325                 infos = append(infos, &SPInfo{
326                         Nice: job.PktEnc.Nice,
327                         Size: uint64(job.Size),
328                         Hash: job.HshValue,
329                 })
330                 (*seen)[*job.HshValue] = job.PktEnc.Nice
331         }
332         sort.Sort(ByNice(infos))
333         var payloads [][]byte
334         for _, info := range infos {
335                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
336                 pktName := Base32Codec.EncodeToString(info.Hash[:])
337                 ctx.LogD("sp-info-our", LEs{
338                         {"Node", nodeId},
339                         {"Name", pktName},
340                         {"Size", info.Size},
341                 }, func(les LEs) string {
342                         return fmt.Sprintf(
343                                 "Our info: %s/tx/%s (%s)",
344                                 ctx.NodeName(nodeId),
345                                 pktName,
346                                 humanize.IBytes(info.Size),
347                         )
348                 })
349         }
350         if totalSize > 0 {
351                 ctx.LogI("sp-infos-tx", LEs{
352                         {"XX", string(TTx)},
353                         {"Node", nodeId},
354                         {"Pkts", len(payloads)},
355                         {"Size", totalSize},
356                 }, func(les LEs) string {
357                         return fmt.Sprintf(
358                                 "We have got for %s: %d packets, %s",
359                                 ctx.NodeName(nodeId),
360                                 len(payloads),
361                                 humanize.IBytes(uint64(totalSize)),
362                         )
363                 })
364         }
365         return payloadsSplit(payloads)
366 }
367
368 func (state *SPState) StartI(conn ConnDeadlined) error {
369         nodeId := state.Node.Id
370         err := state.Ctx.ensureRxDir(nodeId)
371         if err != nil {
372                 return err
373         }
374         var rxLock *os.File
375         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
376                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
377                 if err != nil {
378                         return err
379                 }
380         }
381         var txLock *os.File
382         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
383                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
384                 if err != nil {
385                         return err
386                 }
387         }
388         started := time.Now()
389         conf := noise.Config{
390                 CipherSuite: NoiseCipherSuite,
391                 Pattern:     noise.HandshakeIK,
392                 Initiator:   true,
393                 StaticKeypair: noise.DHKey{
394                         Private: state.Ctx.Self.NoisePrv[:],
395                         Public:  state.Ctx.Self.NoisePub[:],
396                 },
397                 PeerStatic: state.Node.NoisePub[:],
398         }
399         hs, err := noise.NewHandshakeState(conf)
400         if err != nil {
401                 return err
402         }
403         state.hs = hs
404         state.payloads = make(chan []byte)
405         state.pings = make(chan struct{})
406         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
407         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
408         state.progressBars = make(map[string]struct{})
409         state.started = started
410         state.rxLock = rxLock
411         state.txLock = txLock
412
413         var infosPayloads [][]byte
414         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
415                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
416         }
417         var firstPayload []byte
418         if len(infosPayloads) > 0 {
419                 firstPayload = infosPayloads[0]
420         }
421         // Pad first payload, to hide actual number of existing files
422         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
423                 firstPayload = append(firstPayload, SPHaltMarshalized...)
424         }
425
426         var buf []byte
427         var payload []byte
428         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
429         if err != nil {
430                 state.dirUnlock()
431                 return err
432         }
433         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
434         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
435                 return fmt.Sprintf(
436                         "SP with %s (nice %s): sending first message",
437                         state.Node.Name,
438                         NicenessFmt(state.Nice),
439                 )
440         })
441         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
442         if err = state.WriteSP(conn, buf, false); err != nil {
443                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
444                         return fmt.Sprintf(
445                                 "SP with %s (nice %s): writing",
446                                 state.Node.Name,
447                                 NicenessFmt(state.Nice),
448                         )
449                 })
450                 state.dirUnlock()
451                 return err
452         }
453         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
454                 return fmt.Sprintf(
455                         "SP with %s (nice %s): waiting for first message",
456                         state.Node.Name,
457                         NicenessFmt(state.Nice),
458                 )
459         })
460         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
461         if buf, err = state.ReadSP(conn); err != nil {
462                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
463                         return fmt.Sprintf(
464                                 "SP with %s (nice %s): reading",
465                                 state.Node.Name,
466                                 NicenessFmt(state.Nice),
467                         )
468                 })
469                 state.dirUnlock()
470                 return err
471         }
472         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
473         if err != nil {
474                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
475                         return fmt.Sprintf(
476                                 "SP with %s (nice %s): reading Noise message",
477                                 state.Node.Name,
478                                 NicenessFmt(state.Nice),
479                         )
480                 })
481                 state.dirUnlock()
482                 return err
483         }
484         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
485                 return fmt.Sprintf(
486                         "SP with %s (nice %s): starting workers",
487                         state.Node.Name,
488                         NicenessFmt(state.Nice),
489                 )
490         })
491         err = state.StartWorkers(conn, infosPayloads, payload)
492         if err != nil {
493                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
494                         return fmt.Sprintf(
495                                 "SP with %s (nice %s): starting workers",
496                                 state.Node.Name,
497                                 NicenessFmt(state.Nice),
498                         )
499                 })
500                 state.dirUnlock()
501         }
502         return err
503 }
504
505 func (state *SPState) StartR(conn ConnDeadlined) error {
506         started := time.Now()
507         conf := noise.Config{
508                 CipherSuite: NoiseCipherSuite,
509                 Pattern:     noise.HandshakeIK,
510                 Initiator:   false,
511                 StaticKeypair: noise.DHKey{
512                         Private: state.Ctx.Self.NoisePrv[:],
513                         Public:  state.Ctx.Self.NoisePub[:],
514                 },
515         }
516         hs, err := noise.NewHandshakeState(conf)
517         if err != nil {
518                 return err
519         }
520         xxOnly := TRxTx("")
521         state.hs = hs
522         state.payloads = make(chan []byte)
523         state.pings = make(chan struct{})
524         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
525         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
526         state.progressBars = make(map[string]struct{})
527         state.started = started
528         state.xxOnly = xxOnly
529
530         var buf []byte
531         var payload []byte
532         logMsg := func(les LEs) string {
533                 return fmt.Sprintf(
534                         "SP nice %s: waiting for first message",
535                         NicenessFmt(state.Nice),
536                 )
537         }
538         les := LEs{{"Nice", int(state.Nice)}}
539         state.Ctx.LogD("sp-startR", les, logMsg)
540         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
541         if buf, err = state.ReadSP(conn); err != nil {
542                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
543                 return err
544         }
545         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
546                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
547                 return err
548         }
549
550         var node *Node
551         for _, n := range state.Ctx.Neigh {
552                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
553                         node = n
554                         break
555                 }
556         }
557         if node == nil {
558                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
559                 err = errors.New("unknown peer: " + peerId)
560                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
561                 return err
562         }
563         state.Node = node
564         state.rxRate = node.RxRate
565         state.txRate = node.TxRate
566         state.onlineDeadline = node.OnlineDeadline
567         state.maxOnlineTime = node.MaxOnlineTime
568         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
569
570         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
571                 return err
572         }
573         var rxLock *os.File
574         if xxOnly == "" || xxOnly == TRx {
575                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
576                 if err != nil {
577                         return err
578                 }
579         }
580         state.rxLock = rxLock
581         var txLock *os.File
582         if xxOnly == "" || xxOnly == TTx {
583                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
584                 if err != nil {
585                         return err
586                 }
587         }
588         state.txLock = txLock
589
590         var infosPayloads [][]byte
591         if xxOnly == "" || xxOnly == TTx {
592                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
593         }
594         var firstPayload []byte
595         if len(infosPayloads) > 0 {
596                 firstPayload = infosPayloads[0]
597         }
598         // Pad first payload, to hide actual number of existing files
599         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
600                 firstPayload = append(firstPayload, SPHaltMarshalized...)
601         }
602
603         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
604                 return fmt.Sprintf(
605                         "SP with %s (nice %s): sending first message",
606                         node.Name, NicenessFmt(state.Nice),
607                 )
608         })
609         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
610         if err != nil {
611                 state.dirUnlock()
612                 return err
613         }
614         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
615         if err = state.WriteSP(conn, buf, false); err != nil {
616                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
617                         return fmt.Sprintf(
618                                 "SP with %s (nice %s): writing",
619                                 node.Name, NicenessFmt(state.Nice),
620                         )
621                 })
622                 state.dirUnlock()
623                 return err
624         }
625         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
626                 return fmt.Sprintf(
627                         "SP with %s (nice %s): starting workers",
628                         node.Name, NicenessFmt(state.Nice),
629                 )
630         })
631         err = state.StartWorkers(conn, infosPayloads, payload)
632         if err != nil {
633                 state.dirUnlock()
634         }
635         return err
636 }
637
638 func (state *SPState) closeFd(pth string) {
639         state.fdsLock.Lock()
640         if s, exists := state.fds[pth]; exists {
641                 delete(state.fds, pth)
642                 s.fd.Close()
643         }
644         state.fdsLock.Unlock()
645 }
646
647 func (state *SPState) StartWorkers(
648         conn ConnDeadlined,
649         infosPayloads [][]byte,
650         payload []byte,
651 ) error {
652         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
653         state.fds = make(map[string]FdAndFullSize)
654         state.fileHashers = make(map[string]*MTHAndOffset)
655         state.isDead = make(chan struct{})
656         if state.maxOnlineTime > 0 {
657                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
658         }
659         if !state.NoCK {
660                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
661                 go func() {
662                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
663                                 if job.PktEnc.Nice <= state.Nice {
664                                         spCheckerTasks <- SPCheckerTask{
665                                                 nodeId: state.Node.Id,
666                                                 hsh:    job.HshValue,
667                                                 done:   state.payloads,
668                                         }
669                                 }
670                         }
671                 }()
672         }
673
674         // Remaining handshake payload sending
675         if len(infosPayloads) > 1 {
676                 state.wg.Add(1)
677                 go func() {
678                         for _, payload := range infosPayloads[1:] {
679                                 state.Ctx.LogD(
680                                         "sp-queue-remaining",
681                                         append(les, LE{"Size", int64(len(payload))}),
682                                         func(les LEs) string {
683                                                 return fmt.Sprintf(
684                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
685                                                         state.Node.Name, NicenessFmt(state.Nice),
686                                                         humanize.IBytes(uint64(len(payload))),
687                                                 )
688                                         },
689                                 )
690                                 state.payloads <- payload
691                         }
692                         state.wg.Done()
693                 }()
694         }
695
696         // Processing of first payload and queueing its responses
697         logMsg := func(les LEs) string {
698                 return fmt.Sprintf(
699                         "SP with %s (nice %s): processing first payload (%s)",
700                         state.Node.Name, NicenessFmt(state.Nice),
701                         humanize.IBytes(uint64(len(payload))),
702                 )
703         }
704         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
705         replies, err := state.ProcessSP(payload)
706         if err != nil {
707                 state.Ctx.LogE("sp-process", les, err, logMsg)
708                 return err
709         }
710         state.wg.Add(1)
711         go func() {
712                 for _, reply := range replies {
713                         state.Ctx.LogD(
714                                 "sp-queue-reply",
715                                 append(les, LE{"Size", int64(len(reply))}),
716                                 func(les LEs) string {
717                                         return fmt.Sprintf(
718                                                 "SP with %s (nice %s): queuing reply (%s)",
719                                                 state.Node.Name, NicenessFmt(state.Nice),
720                                                 humanize.IBytes(uint64(len(payload))),
721                                         )
722                                 },
723                         )
724                         state.payloads <- reply
725                 }
726                 state.wg.Done()
727         }()
728
729         // Periodic jobs
730         state.wg.Add(1)
731         go func() {
732                 deadlineTicker := time.NewTicker(time.Second)
733                 pingTicker := time.NewTicker(PingTimeout)
734                 for {
735                         select {
736                         case <-state.isDead:
737                                 state.wg.Done()
738                                 deadlineTicker.Stop()
739                                 pingTicker.Stop()
740                                 return
741                         case now := <-deadlineTicker.C:
742                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
743                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
744                                         goto Deadlined
745                                 }
746                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
747                                         goto Deadlined
748                                 }
749                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
750                                         goto Deadlined
751                                 }
752                                 break
753                         Deadlined:
754                                 state.SetDead()
755                                 conn.Close() // #nosec G104
756                         case now := <-pingTicker.C:
757                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
758                                         state.wg.Add(1)
759                                         go func() {
760                                                 state.pings <- struct{}{}
761                                                 state.wg.Done()
762                                         }()
763                                 }
764                         }
765                 }
766         }()
767
768         // Spool checker and INFOs sender of appearing files
769         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
770                 state.wg.Add(1)
771                 go func() {
772                         ticker := time.NewTicker(time.Second)
773                         for {
774                                 select {
775                                 case <-state.isDead:
776                                         state.wg.Done()
777                                         ticker.Stop()
778                                         return
779                                 case <-ticker.C:
780                                         for _, payload := range state.Ctx.infosOur(
781                                                 state.Node.Id,
782                                                 state.Nice,
783                                                 &state.infosOurSeen,
784                                         ) {
785                                                 state.Ctx.LogD(
786                                                         "sp-queue-info",
787                                                         append(les, LE{"Size", int64(len(payload))}),
788                                                         func(les LEs) string {
789                                                                 return fmt.Sprintf(
790                                                                         "SP with %s (nice %s): queuing new info (%s)",
791                                                                         state.Node.Name, NicenessFmt(state.Nice),
792                                                                         humanize.IBytes(uint64(len(payload))),
793                                                                 )
794                                                         },
795                                                 )
796                                                 state.payloads <- payload
797                                         }
798                                 }
799                         }
800                 }()
801         }
802
803         // Sender
804         state.wg.Add(1)
805         go func() {
806                 defer conn.Close()
807                 defer state.SetDead()
808                 defer state.wg.Done()
809                 for {
810                         if state.NotAlive() {
811                                 return
812                         }
813                         var payload []byte
814                         var ping bool
815                         select {
816                         case <-state.pings:
817                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
818                                         return fmt.Sprintf(
819                                                 "SP with %s (nice %s): got ping",
820                                                 state.Node.Name, NicenessFmt(state.Nice),
821                                         )
822                                 })
823                                 payload = SPPingMarshalized
824                                 ping = true
825                         case payload = <-state.payloads:
826                                 state.Ctx.LogD(
827                                         "sp-got-payload",
828                                         append(les, LE{"Size", int64(len(payload))}),
829                                         func(les LEs) string {
830                                                 return fmt.Sprintf(
831                                                         "SP with %s (nice %s): got payload (%s)",
832                                                         state.Node.Name, NicenessFmt(state.Nice),
833                                                         humanize.IBytes(uint64(len(payload))),
834                                                 )
835                                         },
836                                 )
837                         default:
838                                 state.RLock()
839                                 if len(state.queueTheir) == 0 {
840                                         state.RUnlock()
841                                         time.Sleep(100 * time.Millisecond)
842                                         continue
843                                 }
844                                 freq := state.queueTheir[0].freq
845                                 state.RUnlock()
846                                 if state.txRate > 0 {
847                                         time.Sleep(time.Second / time.Duration(state.txRate))
848                                 }
849                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
850                                 lesp := append(
851                                         les,
852                                         LE{"XX", string(TTx)},
853                                         LE{"Pkt", pktName},
854                                         LE{"Size", int64(freq.Offset)},
855                                 )
856                                 logMsg := func(les LEs) string {
857                                         return fmt.Sprintf(
858                                                 "SP with %s (nice %s): tx/%s (%s)",
859                                                 state.Node.Name, NicenessFmt(state.Nice),
860                                                 pktName,
861                                                 humanize.IBytes(freq.Offset),
862                                         )
863                                 }
864                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
865                                         return logMsg(les) + ": queueing"
866                                 })
867                                 pth := filepath.Join(
868                                         state.Ctx.Spool,
869                                         state.Node.Id.String(),
870                                         string(TTx),
871                                         Base32Codec.EncodeToString(freq.Hash[:]),
872                                 )
873                                 state.fdsLock.RLock()
874                                 fdAndFullSize, exists := state.fds[pth]
875                                 state.fdsLock.RUnlock()
876                                 if !exists {
877                                         fd, err := os.Open(pth)
878                                         if err != nil {
879                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
880                                                         return logMsg(les) + ": opening"
881                                                 })
882                                                 return
883                                         }
884                                         fi, err := fd.Stat()
885                                         if err != nil {
886                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
887                                                         return logMsg(les) + ": stating"
888                                                 })
889                                                 return
890                                         }
891                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
892                                         state.fdsLock.Lock()
893                                         state.fds[pth] = fdAndFullSize
894                                         state.fdsLock.Unlock()
895                                 }
896                                 fd := fdAndFullSize.fd
897                                 fullSize := fdAndFullSize.fullSize
898                                 var buf []byte
899                                 if freq.Offset < uint64(fullSize) {
900                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
901                                                 return logMsg(les) + ": seeking"
902                                         })
903                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
904                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
905                                                         return logMsg(les) + ": seeking"
906                                                 })
907                                                 return
908                                         }
909                                         buf = make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
910                                         n, err := fd.Read(buf)
911                                         if err != nil {
912                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
913                                                         return logMsg(les) + ": reading"
914                                                 })
915                                                 return
916                                         }
917                                         buf = buf[:n]
918                                         lesp = append(
919                                                 les,
920                                                 LE{"XX", string(TTx)},
921                                                 LE{"Pkt", pktName},
922                                                 LE{"Size", int64(n)},
923                                         )
924                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
925                                                 return fmt.Sprintf(
926                                                         "%s: read %s",
927                                                         logMsg(les), humanize.IBytes(uint64(n)),
928                                                 )
929                                         })
930                                 }
931                                 state.closeFd(pth)
932                                 payload = MarshalSP(SPTypeFile, SPFile{
933                                         Hash:    freq.Hash,
934                                         Offset:  freq.Offset,
935                                         Payload: buf,
936                                 })
937                                 ourSize := freq.Offset + uint64(len(buf))
938                                 lesp = append(
939                                         les,
940                                         LE{"XX", string(TTx)},
941                                         LE{"Pkt", pktName},
942                                         LE{"Size", int64(ourSize)},
943                                         LE{"FullSize", fullSize},
944                                 )
945                                 if state.Ctx.ShowPrgrs {
946                                         state.progressBars[pktName] = struct{}{}
947                                         Progress("Tx", lesp)
948                                 }
949                                 state.Lock()
950                                 if len(state.queueTheir) > 0 && *state.queueTheir[0].freq.Hash == *freq.Hash {
951                                         if ourSize == uint64(fullSize) {
952                                                 state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
953                                                         return logMsg(les) + ": finished"
954                                                 })
955                                                 if len(state.queueTheir) > 1 {
956                                                         state.queueTheir = state.queueTheir[1:]
957                                                 } else {
958                                                         state.queueTheir = state.queueTheir[:0]
959                                                 }
960                                                 if state.Ctx.ShowPrgrs {
961                                                         delete(state.progressBars, pktName)
962                                                 }
963                                         } else {
964                                                 state.queueTheir[0].freq.Offset += uint64(len(buf))
965                                         }
966                                 } else {
967                                         state.Ctx.LogD("sp-file-disappeared", lesp, func(les LEs) string {
968                                                 return logMsg(les) + ": queue disappeared"
969                                         })
970                                 }
971                                 state.Unlock()
972                         }
973                         logMsg := func(les LEs) string {
974                                 return fmt.Sprintf(
975                                         "SP with %s (nice %s): sending %s",
976                                         state.Node.Name, NicenessFmt(state.Nice),
977                                         humanize.IBytes(uint64(len(payload))),
978                                 )
979                         }
980                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
981                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
982                         ct, err := state.csOur.Encrypt(nil, nil, payload)
983                         if err != nil {
984                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
985                                 return
986                         }
987                         if err := state.WriteSP(conn, ct, ping); err != nil {
988                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
989                                 return
990                         }
991                 }
992         }()
993
994         // Receiver
995         state.wg.Add(1)
996         go func() {
997                 for {
998                         if state.NotAlive() {
999                                 break
1000                         }
1001                         logMsg := func(les LEs) string {
1002                                 return fmt.Sprintf(
1003                                         "SP with %s (nice %s): waiting for payload",
1004                                         state.Node.Name, NicenessFmt(state.Nice),
1005                                 )
1006                         }
1007                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1008                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1009                         payload, err := state.ReadSP(conn)
1010                         if err != nil {
1011                                 if err == io.EOF {
1012                                         break
1013                                 }
1014                                 unmarshalErr := err.(*xdr.UnmarshalError)
1015                                 if os.IsTimeout(unmarshalErr.Err) {
1016                                         continue
1017                                 }
1018                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1019                                         break
1020                                 }
1021                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1022                                 break
1023                         }
1024                         logMsg = func(les LEs) string {
1025                                 return fmt.Sprintf(
1026                                         "SP with %s (nice %s): payload (%s)",
1027                                         state.Node.Name, NicenessFmt(state.Nice),
1028                                         humanize.IBytes(uint64(len(payload))),
1029                                 )
1030                         }
1031                         state.Ctx.LogD(
1032                                 "sp-recv-got",
1033                                 append(les, LE{"Size", int64(len(payload))}),
1034                                 func(les LEs) string { return logMsg(les) + ": got" },
1035                         )
1036                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1037                         if err != nil {
1038                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1039                                         return logMsg(les) + ": got"
1040                                 })
1041                                 break
1042                         }
1043                         state.Ctx.LogD(
1044                                 "sp-recv-process",
1045                                 append(les, LE{"Size", int64(len(payload))}),
1046                                 func(les LEs) string {
1047                                         return logMsg(les) + ": processing"
1048                                 },
1049                         )
1050                         replies, err := state.ProcessSP(payload)
1051                         if err != nil {
1052                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1053                                         return logMsg(les) + ": processing"
1054                                 })
1055                                 break
1056                         }
1057                         state.wg.Add(1)
1058                         go func() {
1059                                 for _, reply := range replies {
1060                                         state.Ctx.LogD(
1061                                                 "sp-recv-reply",
1062                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1063                                                 func(les LEs) string {
1064                                                         return fmt.Sprintf(
1065                                                                 "SP with %s (nice %s): queuing reply (%s)",
1066                                                                 state.Node.Name, NicenessFmt(state.Nice),
1067                                                                 humanize.IBytes(uint64(len(reply))),
1068                                                         )
1069                                                 },
1070                                         )
1071                                         state.payloads <- reply
1072                                 }
1073                                 state.wg.Done()
1074                         }()
1075                         if state.rxRate > 0 {
1076                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1077                         }
1078                 }
1079                 state.SetDead()
1080                 state.wg.Done()
1081                 state.SetDead()
1082                 conn.Close() // #nosec G104
1083         }()
1084
1085         return nil
1086 }
1087
1088 func (state *SPState) Wait() {
1089         state.wg.Wait()
1090         close(state.payloads)
1091         close(state.pings)
1092         state.Duration = time.Now().Sub(state.started)
1093         state.dirUnlock()
1094         state.RxSpeed = state.RxBytes
1095         state.TxSpeed = state.TxBytes
1096         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1097         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1098         if rxDuration > 0 {
1099                 state.RxSpeed = state.RxBytes / rxDuration
1100         }
1101         if txDuration > 0 {
1102                 state.TxSpeed = state.TxBytes / txDuration
1103         }
1104         for _, s := range state.fds {
1105                 s.fd.Close()
1106         }
1107         for pktName := range state.progressBars {
1108                 ProgressKill(pktName)
1109         }
1110 }
1111
1112 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1113         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1114         r := bytes.NewReader(payload)
1115         var err error
1116         var replies [][]byte
1117         var infosGot bool
1118         for r.Len() > 0 {
1119                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1120                         return fmt.Sprintf(
1121                                 "SP with %s (nice %s): unmarshaling header",
1122                                 state.Node.Name, NicenessFmt(state.Nice),
1123                         )
1124                 })
1125                 var head SPHead
1126                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1127                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1128                                 return fmt.Sprintf(
1129                                         "SP with %s (nice %s): unmarshaling header",
1130                                         state.Node.Name, NicenessFmt(state.Nice),
1131                                 )
1132                         })
1133                         return nil, err
1134                 }
1135                 if head.Type != SPTypePing {
1136                         state.RxLastNonPing = state.RxLastSeen
1137                 }
1138                 switch head.Type {
1139                 case SPTypeHalt:
1140                         state.Ctx.LogD(
1141                                 "sp-process-halt",
1142                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1143                                         return fmt.Sprintf(
1144                                                 "SP with %s (nice %s): got HALT",
1145                                                 state.Node.Name, NicenessFmt(state.Nice),
1146                                         )
1147                                 },
1148                         )
1149                         state.Lock()
1150                         state.queueTheir = nil
1151                         state.Unlock()
1152
1153                 case SPTypePing:
1154                         state.Ctx.LogD(
1155                                 "sp-process-ping",
1156                                 append(les, LE{"Type", "ping"}),
1157                                 func(les LEs) string {
1158                                         return fmt.Sprintf(
1159                                                 "SP with %s (nice %s): got PING",
1160                                                 state.Node.Name, NicenessFmt(state.Nice),
1161                                         )
1162                                 },
1163                         )
1164
1165                 case SPTypeInfo:
1166                         infosGot = true
1167                         lesp := append(les, LE{"Type", "info"})
1168                         state.Ctx.LogD(
1169                                 "sp-process-info-unmarshal", lesp,
1170                                 func(les LEs) string {
1171                                         return fmt.Sprintf(
1172                                                 "SP with %s (nice %s): unmarshaling INFO",
1173                                                 state.Node.Name, NicenessFmt(state.Nice),
1174                                         )
1175                                 },
1176                         )
1177                         var info SPInfo
1178                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1179                                 state.Ctx.LogE(
1180                                         "sp-process-info-unmarshal", lesp, err,
1181                                         func(les LEs) string {
1182                                                 return fmt.Sprintf(
1183                                                         "SP with %s (nice %s): unmarshaling INFO",
1184                                                         state.Node.Name, NicenessFmt(state.Nice),
1185                                                 )
1186                                         },
1187                                 )
1188                                 return nil, err
1189                         }
1190                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1191                         lesp = append(
1192                                 lesp,
1193                                 LE{"Pkt", pktName},
1194                                 LE{"Size", int64(info.Size)},
1195                                 LE{"PktNice", int(info.Nice)},
1196                         )
1197                         logMsg := func(les LEs) string {
1198                                 return fmt.Sprintf(
1199                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1200                                         state.Node.Name, NicenessFmt(state.Nice),
1201                                         pktName,
1202                                         humanize.IBytes(info.Size),
1203                                         NicenessFmt(info.Nice),
1204                                 )
1205                         }
1206                         if !state.listOnly && info.Nice > state.Nice {
1207                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1208                                         return logMsg(les) + ": too nice"
1209                                 })
1210                                 continue
1211                         }
1212                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1213                                 return logMsg(les) + ": received"
1214                         })
1215                         if !state.listOnly && state.xxOnly == TTx {
1216                                 continue
1217                         }
1218                         state.Lock()
1219                         state.infosTheir[*info.Hash] = &info
1220                         state.Unlock()
1221                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1222                                 return logMsg(les) + ": stating part"
1223                         })
1224                         pktPath := filepath.Join(
1225                                 state.Ctx.Spool,
1226                                 state.Node.Id.String(),
1227                                 string(TRx),
1228                                 Base32Codec.EncodeToString(info.Hash[:]),
1229                         )
1230                         logMsg = func(les LEs) string {
1231                                 return fmt.Sprintf(
1232                                         "Packet %s (%s) (nice %s)",
1233                                         pktName,
1234                                         humanize.IBytes(info.Size),
1235                                         NicenessFmt(info.Nice),
1236                                 )
1237                         }
1238                         if _, err = os.Stat(pktPath); err == nil {
1239                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1240                                         return logMsg(les) + ": already done"
1241                                 })
1242                                 if !state.listOnly {
1243                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1244                                 }
1245                                 continue
1246                         }
1247                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1248                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1249                                         return logMsg(les) + ": already seen"
1250                                 })
1251                                 if !state.listOnly {
1252                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1253                                 }
1254                                 continue
1255                         }
1256                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1257                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1258                                         return logMsg(les) + ": still not checksummed"
1259                                 })
1260                                 continue
1261                         }
1262                         fi, err := os.Stat(pktPath + PartSuffix)
1263                         var offset int64
1264                         if err == nil {
1265                                 offset = fi.Size()
1266                         }
1267                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1268                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1269                                         return logMsg(les) + ": not enough space"
1270                                 })
1271                                 continue
1272                         }
1273                         state.Ctx.LogI(
1274                                 "sp-info",
1275                                 append(lesp, LE{"Offset", offset}),
1276                                 func(les LEs) string {
1277                                         return fmt.Sprintf(
1278                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1279                                         )
1280                                 },
1281                         )
1282                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1283                                 replies = append(replies, MarshalSP(
1284                                         SPTypeFreq,
1285                                         SPFreq{info.Hash, uint64(offset)},
1286                                 ))
1287                         }
1288
1289                 case SPTypeFile:
1290                         lesp := append(les, LE{"Type", "file"})
1291                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1292                                 return fmt.Sprintf(
1293                                         "SP with %s (nice %s): unmarshaling FILE",
1294                                         state.Node.Name, NicenessFmt(state.Nice),
1295                                 )
1296                         })
1297                         var file SPFile
1298                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1299                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1300                                         return fmt.Sprintf(
1301                                                 "SP with %s (nice %s): unmarshaling FILE",
1302                                                 state.Node.Name, NicenessFmt(state.Nice),
1303                                         )
1304                                 })
1305                                 return nil, err
1306                         }
1307                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1308                         lesp = append(
1309                                 lesp,
1310                                 LE{"XX", string(TRx)},
1311                                 LE{"Pkt", pktName},
1312                                 LE{"Size", int64(len(file.Payload))},
1313                         )
1314                         logMsg := func(les LEs) string {
1315                                 return fmt.Sprintf(
1316                                         "Got packet %s (%s)",
1317                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1318                                 )
1319                         }
1320                         fullsize := int64(0)
1321                         state.RLock()
1322                         infoTheir, ok := state.infosTheir[*file.Hash]
1323                         state.RUnlock()
1324                         if !ok {
1325                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1326                                         return logMsg(les) + ": unknown file"
1327                                 })
1328                                 continue
1329                         }
1330                         fullsize = int64(infoTheir.Size)
1331                         lesp = append(lesp, LE{"FullSize", fullsize})
1332                         dirToSync := filepath.Join(
1333                                 state.Ctx.Spool,
1334                                 state.Node.Id.String(),
1335                                 string(TRx),
1336                         )
1337                         filePath := filepath.Join(dirToSync, pktName)
1338                         filePathPart := filePath + PartSuffix
1339                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1340                                 return logMsg(les) + ": opening part"
1341                         })
1342                         state.fdsLock.RLock()
1343                         fdAndFullSize, exists := state.fds[filePathPart]
1344                         state.fdsLock.RUnlock()
1345                         hasherAndOffset := state.fileHashers[filePath]
1346                         var fd *os.File
1347                         if exists {
1348                                 fd = fdAndFullSize.fd
1349                         } else {
1350                                 fd, err = os.OpenFile(
1351                                         filePathPart,
1352                                         os.O_RDWR|os.O_CREATE,
1353                                         os.FileMode(0666),
1354                                 )
1355                                 if err != nil {
1356                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1357                                                 return logMsg(les) + ": opening part"
1358                                         })
1359                                         return nil, err
1360                                 }
1361                                 state.fdsLock.Lock()
1362                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1363                                 state.fdsLock.Unlock()
1364                                 if !state.NoCK {
1365                                         hasherAndOffset = &MTHAndOffset{
1366                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1367                                                 offset: file.Offset,
1368                                         }
1369                                         state.fileHashers[filePath] = hasherAndOffset
1370                                 }
1371                         }
1372                         state.Ctx.LogD(
1373                                 "sp-file-seek",
1374                                 append(lesp, LE{"Offset", file.Offset}),
1375                                 func(les LEs) string {
1376                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1377                                 })
1378                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1379                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1380                                         return logMsg(les) + ": seeking"
1381                                 })
1382                                 state.closeFd(filePathPart)
1383                                 return nil, err
1384                         }
1385                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1386                                 return logMsg(les) + ": writing"
1387                         })
1388                         if _, err = fd.Write(file.Payload); err != nil {
1389                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1390                                         return logMsg(les) + ": writing"
1391                                 })
1392                                 state.closeFd(filePathPart)
1393                                 return nil, err
1394                         }
1395                         if hasherAndOffset != nil {
1396                                 if hasherAndOffset.offset == file.Offset {
1397                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1398                                                 panic(err)
1399                                         }
1400                                         hasherAndOffset.offset += uint64(len(file.Payload))
1401                                 } else {
1402                                         state.Ctx.LogE(
1403                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1404                                                 func(les LEs) string {
1405                                                         return logMsg(les) + ": deleting hasher"
1406                                                 },
1407                                         )
1408                                         delete(state.fileHashers, filePath)
1409                                         hasherAndOffset = nil
1410                                 }
1411                         }
1412                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1413                         lesp[len(lesp)-2].V = ourSize
1414                         if state.Ctx.ShowPrgrs {
1415                                 state.progressBars[pktName] = struct{}{}
1416                                 Progress("Rx", lesp)
1417                         }
1418                         if fullsize != ourSize {
1419                                 continue
1420                         }
1421                         if state.Ctx.ShowPrgrs {
1422                                 delete(state.progressBars, pktName)
1423                         }
1424                         logMsg = func(les LEs) string {
1425                                 return fmt.Sprintf(
1426                                         "Got packet %s %d%% (%s / %s)",
1427                                         pktName, 100*ourSize/fullsize,
1428                                         humanize.IBytes(uint64(ourSize)),
1429                                         humanize.IBytes(uint64(fullsize)),
1430                                 )
1431                         }
1432                         err = fd.Sync()
1433                         if err != nil {
1434                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1435                                         return logMsg(les) + ": syncing"
1436                                 })
1437                                 state.closeFd(filePathPart)
1438                                 continue
1439                         }
1440                         if hasherAndOffset != nil {
1441                                 delete(state.fileHashers, filePath)
1442                                 if hasherAndOffset.mth.PrependSize == 0 {
1443                                         if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
1444                                                 state.Ctx.LogE(
1445                                                         "sp-file-bad-checksum", lesp,
1446                                                         errors.New("checksum mismatch"),
1447                                                         logMsg,
1448                                                 )
1449                                                 state.closeFd(filePathPart)
1450                                                 continue
1451                                         }
1452                                         if err = os.Rename(filePathPart, filePath); err != nil {
1453                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1454                                                         return logMsg(les) + ": renaming"
1455                                                 })
1456                                                 state.closeFd(filePathPart)
1457                                                 continue
1458                                         }
1459                                         if err = DirSync(dirToSync); err != nil {
1460                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1461                                                         return logMsg(les) + ": dirsyncing"
1462                                                 })
1463                                                 state.closeFd(filePathPart)
1464                                                 continue
1465                                         }
1466                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1467                                                 return logMsg(les) + ": done"
1468                                         })
1469                                         state.wg.Add(1)
1470                                         go func() {
1471                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1472                                                 state.wg.Done()
1473                                         }()
1474                                         state.Lock()
1475                                         delete(state.infosTheir, *file.Hash)
1476                                         state.Unlock()
1477                                         if !state.Ctx.HdrUsage {
1478                                                 continue
1479                                         }
1480                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1481                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1482                                                         return logMsg(les) + ": seeking"
1483                                                 })
1484                                                 state.closeFd(filePathPart)
1485                                                 continue
1486                                         }
1487                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1488                                         state.closeFd(filePathPart)
1489                                         if err != nil {
1490                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1491                                                         return logMsg(les) + ": HdrReading"
1492                                                 })
1493                                                 continue
1494                                         }
1495                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1496                                         continue
1497                                 }
1498                         }
1499                         state.closeFd(filePathPart)
1500                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1501                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1502                                         return logMsg(les) + ": renaming"
1503                                 })
1504                                 continue
1505                         }
1506                         if err = DirSync(dirToSync); err != nil {
1507                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1508                                         return logMsg(les) + ": dirsyncing"
1509                                 })
1510                                 continue
1511                         }
1512                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1513                                 return logMsg(les) + ": downloaded"
1514                         })
1515                         state.Lock()
1516                         delete(state.infosTheir, *file.Hash)
1517                         state.Unlock()
1518                         if hasherAndOffset != nil {
1519                                 go func() {
1520                                         spCheckerTasks <- SPCheckerTask{
1521                                                 nodeId: state.Node.Id,
1522                                                 hsh:    file.Hash,
1523                                                 mth:    hasherAndOffset.mth,
1524                                                 done:   state.payloads,
1525                                         }
1526                                 }()
1527                         }
1528
1529                 case SPTypeDone:
1530                         lesp := append(les, LE{"Type", "done"})
1531                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1532                                 return fmt.Sprintf(
1533                                         "SP with %s (nice %s): unmarshaling DONE",
1534                                         state.Node.Name, NicenessFmt(state.Nice),
1535                                 )
1536                         })
1537                         var done SPDone
1538                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1539                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1540                                         return fmt.Sprintf(
1541                                                 "SP with %s (nice %s): unmarshaling DONE",
1542                                                 state.Node.Name, NicenessFmt(state.Nice),
1543                                         )
1544                                 })
1545                                 return nil, err
1546                         }
1547                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1548                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1549                         logMsg := func(les LEs) string {
1550                                 return fmt.Sprintf(
1551                                         "SP with %s (nice %s): DONE: removing %s",
1552                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1553                                 )
1554                         }
1555                         state.Ctx.LogD("sp-done", lesp, logMsg)
1556                         pth := filepath.Join(
1557                                 state.Ctx.Spool,
1558                                 state.Node.Id.String(),
1559                                 string(TTx),
1560                                 pktName,
1561                         )
1562                         if err = os.Remove(pth); err == nil {
1563                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1564                                         return fmt.Sprintf("Packet %s is sent", pktName)
1565                                 })
1566                                 if state.Ctx.HdrUsage {
1567                                         os.Remove(pth + HdrSuffix)
1568                                 }
1569                         } else {
1570                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1571                         }
1572
1573                 case SPTypeFreq:
1574                         lesp := append(les, LE{"Type", "freq"})
1575                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1576                                 return fmt.Sprintf(
1577                                         "SP with %s (nice %s): unmarshaling FREQ",
1578                                         state.Node.Name, NicenessFmt(state.Nice),
1579                                 )
1580                         })
1581                         var freq SPFreq
1582                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1583                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1584                                         return fmt.Sprintf(
1585                                                 "SP with %s (nice %s): unmarshaling FREQ",
1586                                                 state.Node.Name, NicenessFmt(state.Nice),
1587                                         )
1588                                 })
1589                                 return nil, err
1590                         }
1591                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1592                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1593                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1594                                 return fmt.Sprintf(
1595                                         "SP with %s (nice %s): FREQ %s: queuing",
1596                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1597                                 )
1598                         })
1599                         nice, exists := state.infosOurSeen[*freq.Hash]
1600                         if exists {
1601                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1602                                         state.Lock()
1603                                         insertIdx := 0
1604                                         var freqWithNice *FreqWithNice
1605                                         for insertIdx, freqWithNice = range state.queueTheir {
1606                                                 if freqWithNice.nice > nice {
1607                                                         break
1608                                                 }
1609                                         }
1610                                         state.queueTheir = append(state.queueTheir, nil)
1611                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1612                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1613                                         state.Unlock()
1614                                 } else {
1615                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1616                                                 return fmt.Sprintf(
1617                                                         "SP with %s (nice %s): FREQ %s: skipping",
1618                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1619                                                 )
1620                                         })
1621                                 }
1622                         } else {
1623                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1624                                         return fmt.Sprintf(
1625                                                 "SP with %s (nice %s): FREQ %s: unknown",
1626                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1627                                         )
1628                                 })
1629                         }
1630
1631                 default:
1632                         state.Ctx.LogE(
1633                                 "sp-process-type-unknown",
1634                                 append(les, LE{"Type", head.Type}),
1635                                 errors.New("unknown type"),
1636                                 func(les LEs) string {
1637                                         return fmt.Sprintf(
1638                                                 "SP with %s (nice %s): %d",
1639                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1640                                         )
1641                                 },
1642                         )
1643                         return nil, BadPktType
1644                 }
1645         }
1646
1647         if infosGot {
1648                 var pkts int
1649                 var size uint64
1650                 state.RLock()
1651                 for _, info := range state.infosTheir {
1652                         pkts++
1653                         size += info.Size
1654                 }
1655                 state.RUnlock()
1656                 state.Ctx.LogI("sp-infos-rx", LEs{
1657                         {"XX", string(TRx)},
1658                         {"Node", state.Node.Id},
1659                         {"Pkts", pkts},
1660                         {"Size", int64(size)},
1661                 }, func(les LEs) string {
1662                         return fmt.Sprintf(
1663                                 "%s has got for us: %d packets, %s",
1664                                 state.Node.Name, pkts, humanize.IBytes(size),
1665                         )
1666                 })
1667         }
1668         return payloadsSplit(replies), nil
1669 }
1670
1671 func SPChecker(ctx *Ctx) {
1672         for t := range spCheckerTasks {
1673                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1674                 les := LEs{
1675                         {"XX", string(TRx)},
1676                         {"Node", t.nodeId},
1677                         {"Pkt", pktName},
1678                 }
1679                 SPCheckerWg.Add(1)
1680                 ctx.LogD("sp-checker", les, func(les LEs) string {
1681                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1682                 })
1683                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1684                 les = append(les, LE{"Size", size})
1685                 if err != nil {
1686                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1687                                 return fmt.Sprintf(
1688                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1689                                         humanize.IBytes(uint64(size)),
1690                                 )
1691                         })
1692                         SPCheckerWg.Done()
1693                         continue
1694                 }
1695                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1696                         return fmt.Sprintf(
1697                                 "Packet %s is retreived (%s)",
1698                                 pktName, humanize.IBytes(uint64(size)),
1699                         )
1700                 })
1701                 SPCheckerWg.Done()
1702                 go func(t SPCheckerTask) {
1703                         defer func() { recover() }()
1704                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1705                 }(t)
1706         }
1707 }