]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Really close fd at upload completion
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "io"
26         "os"
27         "path/filepath"
28         "sort"
29         "sync"
30         "time"
31
32         xdr "github.com/davecgh/go-xdr/xdr2"
33         "github.com/dustin/go-humanize"
34         "github.com/flynn/noise"
35 )
36
// Sizing and naming constants of the sync protocol (SP).
//
// MaxSPSize is the upper bound used when batching marshalled packets into
// a single outbound payload and when padding the first handshake payload.
// PartSuffix is a ".part" filename suffix (its users are outside this
// section; presumably it marks partially received packets -- confirm).
// SPHeadOverhead is the size in bytes of the buffers holding a marshalled
// SPHead.
const (
	MaxSPSize      = 1<<16 - 256
	PartSuffix     = ".part"
	SPHeadOverhead = 4
)
42
// MTHAndOffset pairs an MTH instance with a byte offset. SPState keeps
// one per string key in its fileHashers map.
// NOTE(review): offset is presumably the position up to which data has
// been fed to the hasher -- confirm against the code using fileHashers.
type MTHAndOffset struct {
	mth    MTH
	offset uint64
}
47
// SPCheckerTask is the unit of work sent over the spCheckerTasks channel.
// It names a packet by node and hash; done is a channel of byte slices
// (StartWorkers passes the session's payloads channel there).
// NOTE(review): mth is left unset by the producer visible in this section;
// presumably it carries an already computed hasher state -- confirm.
type SPCheckerTask struct {
	nodeId *NodeId
	hsh    *[MTHSize]byte
	mth    MTH
	done   chan []byte
}
54
// Package-level SP state.
//
// SPInfoOverhead, SPFreqOverhead and SPFileOverhead are filled in by init
// with the XDR-encoded sizes of sample SPInfo, SPFreq and (payload-less)
// SPFile values. SPHaltMarshalized and SPPingMarshalized hold the encoded
// SPHead of type Halt and Ping respectively.
//
// NoiseCipherSuite is the Noise suite used for the handshake: Curve25519
// DH, ChaCha20-Poly1305 cipher, BLAKE2b hash.
//
// DefaultDeadline is the read/write deadline applied to the handshake
// messages; PingTimeout drives the ping ticker and the "peer is silent"
// check in the workers.
//
// spCheckerTasks is the (unbuffered) task queue created in init, and
// spCheckerOnce ensures SPChecker is started only once.
// NOTE(review): SPCheckerWg is not used in this section; presumably it
// tracks in-flight checker work -- confirm.
var (
	SPInfoOverhead    int
	SPFreqOverhead    int
	SPFileOverhead    int
	SPHaltMarshalized []byte
	SPPingMarshalized []byte

	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	DefaultDeadline = 10 * time.Second
	PingTimeout     = time.Minute

	spCheckerTasks chan SPCheckerTask
	SPCheckerWg    sync.WaitGroup
	spCheckerOnce  sync.Once
)
75
// FdAndFullSize is the value type of SPState.fds: an open file together
// with a size.
// NOTE(review): fullSize is presumably the complete size of the file being
// transferred -- confirm against the code that fills the map.
type FdAndFullSize struct {
	fd       *os.File
	fullSize int64
}
80
// SPType is the one-byte tag carried in SPHead that tells which kind of
// SP packet follows.
type SPType uint8

// Known packet kinds. The numeric values are assigned sequentially from
// zero in declaration order.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
91
// SPHead is the header marshalled in front of every SP packet (see
// MarshalSP); it carries only the packet type.
type SPHead struct {
	Type SPType
}
95
// SPInfo announces a packet: its niceness, its size and its hash.
// infosOur builds one per outbound job.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[MTHSize]byte
}
101
// SPFreq names a packet by hash together with an offset.
// NOTE(review): presumably a request to send packet Hash starting at
// Offset; its handling is outside this section -- confirm.
type SPFreq struct {
	Hash   *[MTHSize]byte
	Offset uint64
}
106
// SPFile carries a piece of packet data: the packet hash, an offset and
// the payload bytes.
// NOTE(review): Offset is presumably the position of Payload inside the
// packet -- confirm against the sender/receiver code.
type SPFile struct {
	Hash    *[MTHSize]byte
	Offset  uint64
	Payload []byte
}
112
// SPDone references a packet by hash.
// NOTE(review): presumably signals that the packet was completely
// received; its handling is outside this section -- confirm.
type SPDone struct {
	Hash *[MTHSize]byte
}
116
// SPRaw is the on-wire frame written by WriteSP and read by ReadSP:
// an 8-byte magic followed by an opaque payload.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
121
// FreqWithNice pairs an SPFreq with a niceness value; SPState.queueTheir
// is a slice of these.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
126
// ConnDeadlined is the connection abstraction the SP code works with:
// a read/write/close stream that also supports read and write deadlines.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
132
133 func init() {
134         var buf bytes.Buffer
135         spHead := SPHead{Type: SPTypeHalt}
136         if _, err := xdr.Marshal(&buf, spHead); err != nil {
137                 panic(err)
138         }
139         SPHaltMarshalized = make([]byte, SPHeadOverhead)
140         copy(SPHaltMarshalized, buf.Bytes())
141         buf.Reset()
142
143         spHead = SPHead{Type: SPTypePing}
144         if _, err := xdr.Marshal(&buf, spHead); err != nil {
145                 panic(err)
146         }
147         SPPingMarshalized = make([]byte, SPHeadOverhead)
148         copy(SPPingMarshalized, buf.Bytes())
149         buf.Reset()
150
151         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
152         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
153                 panic(err)
154         }
155         SPInfoOverhead = buf.Len()
156         buf.Reset()
157
158         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
159         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
160                 panic(err)
161         }
162         SPFreqOverhead = buf.Len()
163         buf.Reset()
164
165         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
166         if _, err := xdr.Marshal(&buf, spFile); err != nil {
167                 panic(err)
168         }
169         SPFileOverhead = buf.Len()
170         spCheckerTasks = make(chan SPCheckerTask)
171 }
172
173 func MarshalSP(typ SPType, sp interface{}) []byte {
174         var buf bytes.Buffer
175         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
176                 panic(err)
177         }
178         if _, err := xdr.Marshal(&buf, sp); err != nil {
179                 panic(err)
180         }
181         return buf.Bytes()
182 }
183
184 func payloadsSplit(payloads [][]byte) [][]byte {
185         var outbounds [][]byte
186         outbound := make([]byte, 0, MaxSPSize)
187         for i, payload := range payloads {
188                 outbound = append(outbound, payload...)
189                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
190                         outbounds = append(outbounds, outbound)
191                         outbound = make([]byte, 0, MaxSPSize)
192                 }
193         }
194         if len(outbound) > 0 {
195                 outbounds = append(outbounds, outbound)
196         }
197         return outbounds
198 }
199
// SPState is the state of a single sync protocol session with one node.
//
// It groups: the owning Ctx and peer Node with the session niceness and
// flags; the Noise handshake state and the two cipher states obtained
// from it; the payloads and pings channels consumed by the sender worker;
// maps of packet infos seen from either side; traffic counters and
// timestamps updated by WriteSP/ReadSP; the rx/tx directory locks released
// by dirUnlock; the isDead channel closed by SetDead; and the open file
// descriptors (guarded by fdsLock) closed via closeFd.
//
// The embedded RWMutex is taken by SetDead.
// NOTE(review): which other fields it is meant to guard is not visible in
// this section -- confirm before relying on it.
type SPState struct {
	Ctx            *Ctx
	Node           *Node
	Nice           uint8
	NoCK           bool
	onlineDeadline time.Duration
	maxOnlineTime  time.Duration
	hs             *noise.HandshakeState
	csOur          *noise.CipherState
	csTheir        *noise.CipherState
	payloads       chan []byte
	pings          chan struct{}
	infosTheir     map[[MTHSize]byte]*SPInfo
	infosOurSeen   map[[MTHSize]byte]uint8
	queueTheir     []*FreqWithNice
	wg             sync.WaitGroup
	RxBytes        int64
	RxLastSeen     time.Time
	RxLastNonPing  time.Time
	TxBytes        int64
	TxLastSeen     time.Time
	TxLastNonPing  time.Time
	started        time.Time
	mustFinishAt   time.Time
	Duration       time.Duration
	RxSpeed        int64
	TxSpeed        int64
	rxLock         *os.File
	txLock         *os.File
	xxOnly         TRxTx
	rxRate         int
	txRate         int
	isDead         chan struct{}
	listOnly       bool
	onlyPkts       map[[MTHSize]byte]bool
	writeSPBuf     bytes.Buffer
	fds            map[string]FdAndFullSize
	fdsLock        sync.RWMutex
	fileHashers    map[string]*MTHAndOffset
	progressBars   map[string]struct{}
	sync.RWMutex
}
242
243 func (state *SPState) SetDead() {
244         state.Lock()
245         defer state.Unlock()
246         select {
247         case <-state.isDead:
248                 // Already closed channel, dead
249                 return
250         default:
251         }
252         close(state.isDead)
253         go func() {
254                 for range state.payloads {
255                 }
256         }()
257         go func() {
258                 for range state.pings {
259                 }
260         }()
261 }
262
263 func (state *SPState) NotAlive() bool {
264         select {
265         case <-state.isDead:
266                 return true
267         default:
268         }
269         return false
270 }
271
272 func (state *SPState) dirUnlock() {
273         state.Ctx.UnlockDir(state.rxLock)
274         state.Ctx.UnlockDir(state.txLock)
275 }
276
277 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
278         state.writeSPBuf.Reset()
279         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
280                 Magic:   MagicNNCPSv1.B,
281                 Payload: payload,
282         })
283         if err != nil {
284                 return err
285         }
286         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
287                 state.TxLastSeen = time.Now()
288                 state.TxBytes += int64(n)
289                 if !ping {
290                         state.TxLastNonPing = state.TxLastSeen
291                 }
292         }
293         return err
294 }
295
296 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
297         var sp SPRaw
298         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
299         if err != nil {
300                 ue := err.(*xdr.UnmarshalError)
301                 if ue.Err == io.EOF {
302                         return nil, ue.Err
303                 }
304                 return nil, err
305         }
306         state.RxLastSeen = time.Now()
307         state.RxBytes += int64(n)
308         if sp.Magic != MagicNNCPSv1.B {
309                 return nil, BadMagic
310         }
311         return sp.Payload, nil
312 }
313
314 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
315         var infos []*SPInfo
316         var totalSize int64
317         for job := range ctx.Jobs(nodeId, TTx) {
318                 if job.PktEnc.Nice > nice {
319                         continue
320                 }
321                 if _, known := (*seen)[*job.HshValue]; known {
322                         continue
323                 }
324                 totalSize += job.Size
325                 infos = append(infos, &SPInfo{
326                         Nice: job.PktEnc.Nice,
327                         Size: uint64(job.Size),
328                         Hash: job.HshValue,
329                 })
330                 (*seen)[*job.HshValue] = job.PktEnc.Nice
331         }
332         sort.Sort(ByNice(infos))
333         var payloads [][]byte
334         for _, info := range infos {
335                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
336                 pktName := Base32Codec.EncodeToString(info.Hash[:])
337                 ctx.LogD("sp-info-our", LEs{
338                         {"Node", nodeId},
339                         {"Name", pktName},
340                         {"Size", info.Size},
341                 }, func(les LEs) string {
342                         return fmt.Sprintf(
343                                 "Our info: %s/tx/%s (%s)",
344                                 ctx.NodeName(nodeId),
345                                 pktName,
346                                 humanize.IBytes(info.Size),
347                         )
348                 })
349         }
350         if totalSize > 0 {
351                 ctx.LogI("sp-infos-tx", LEs{
352                         {"XX", string(TTx)},
353                         {"Node", nodeId},
354                         {"Pkts", len(payloads)},
355                         {"Size", totalSize},
356                 }, func(les LEs) string {
357                         return fmt.Sprintf(
358                                 "We have got for %s: %d packets, %s",
359                                 ctx.NodeName(nodeId),
360                                 len(payloads),
361                                 humanize.IBytes(uint64(totalSize)),
362                         )
363                 })
364         }
365         return payloadsSplit(payloads)
366 }
367
368 func (state *SPState) StartI(conn ConnDeadlined) error {
369         nodeId := state.Node.Id
370         err := state.Ctx.ensureRxDir(nodeId)
371         if err != nil {
372                 return err
373         }
374         var rxLock *os.File
375         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
376                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
377                 if err != nil {
378                         return err
379                 }
380         }
381         var txLock *os.File
382         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
383                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
384                 if err != nil {
385                         return err
386                 }
387         }
388         started := time.Now()
389         conf := noise.Config{
390                 CipherSuite: NoiseCipherSuite,
391                 Pattern:     noise.HandshakeIK,
392                 Initiator:   true,
393                 StaticKeypair: noise.DHKey{
394                         Private: state.Ctx.Self.NoisePrv[:],
395                         Public:  state.Ctx.Self.NoisePub[:],
396                 },
397                 PeerStatic: state.Node.NoisePub[:],
398         }
399         hs, err := noise.NewHandshakeState(conf)
400         if err != nil {
401                 return err
402         }
403         state.hs = hs
404         state.payloads = make(chan []byte)
405         state.pings = make(chan struct{})
406         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
407         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
408         state.progressBars = make(map[string]struct{})
409         state.started = started
410         state.rxLock = rxLock
411         state.txLock = txLock
412
413         var infosPayloads [][]byte
414         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
415                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
416         }
417         var firstPayload []byte
418         if len(infosPayloads) > 0 {
419                 firstPayload = infosPayloads[0]
420         }
421         // Pad first payload, to hide actual number of existing files
422         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
423                 firstPayload = append(firstPayload, SPHaltMarshalized...)
424         }
425
426         var buf []byte
427         var payload []byte
428         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
429         if err != nil {
430                 state.dirUnlock()
431                 return err
432         }
433         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
434         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
435                 return fmt.Sprintf(
436                         "SP with %s (nice %s): sending first message",
437                         state.Node.Name,
438                         NicenessFmt(state.Nice),
439                 )
440         })
441         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
442         if err = state.WriteSP(conn, buf, false); err != nil {
443                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
444                         return fmt.Sprintf(
445                                 "SP with %s (nice %s): writing",
446                                 state.Node.Name,
447                                 NicenessFmt(state.Nice),
448                         )
449                 })
450                 state.dirUnlock()
451                 return err
452         }
453         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
454                 return fmt.Sprintf(
455                         "SP with %s (nice %s): waiting for first message",
456                         state.Node.Name,
457                         NicenessFmt(state.Nice),
458                 )
459         })
460         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
461         if buf, err = state.ReadSP(conn); err != nil {
462                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
463                         return fmt.Sprintf(
464                                 "SP with %s (nice %s): reading",
465                                 state.Node.Name,
466                                 NicenessFmt(state.Nice),
467                         )
468                 })
469                 state.dirUnlock()
470                 return err
471         }
472         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
473         if err != nil {
474                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
475                         return fmt.Sprintf(
476                                 "SP with %s (nice %s): reading Noise message",
477                                 state.Node.Name,
478                                 NicenessFmt(state.Nice),
479                         )
480                 })
481                 state.dirUnlock()
482                 return err
483         }
484         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
485                 return fmt.Sprintf(
486                         "SP with %s (nice %s): starting workers",
487                         state.Node.Name,
488                         NicenessFmt(state.Nice),
489                 )
490         })
491         err = state.StartWorkers(conn, infosPayloads, payload)
492         if err != nil {
493                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
494                         return fmt.Sprintf(
495                                 "SP with %s (nice %s): starting workers",
496                                 state.Node.Name,
497                                 NicenessFmt(state.Nice),
498                         )
499                 })
500                 state.dirUnlock()
501         }
502         return err
503 }
504
505 func (state *SPState) StartR(conn ConnDeadlined) error {
506         started := time.Now()
507         conf := noise.Config{
508                 CipherSuite: NoiseCipherSuite,
509                 Pattern:     noise.HandshakeIK,
510                 Initiator:   false,
511                 StaticKeypair: noise.DHKey{
512                         Private: state.Ctx.Self.NoisePrv[:],
513                         Public:  state.Ctx.Self.NoisePub[:],
514                 },
515         }
516         hs, err := noise.NewHandshakeState(conf)
517         if err != nil {
518                 return err
519         }
520         xxOnly := TRxTx("")
521         state.hs = hs
522         state.payloads = make(chan []byte)
523         state.pings = make(chan struct{})
524         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
525         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
526         state.progressBars = make(map[string]struct{})
527         state.started = started
528         state.xxOnly = xxOnly
529
530         var buf []byte
531         var payload []byte
532         logMsg := func(les LEs) string {
533                 return fmt.Sprintf(
534                         "SP nice %s: waiting for first message",
535                         NicenessFmt(state.Nice),
536                 )
537         }
538         les := LEs{{"Nice", int(state.Nice)}}
539         state.Ctx.LogD("sp-startR", les, logMsg)
540         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
541         if buf, err = state.ReadSP(conn); err != nil {
542                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
543                 return err
544         }
545         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
546                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
547                 return err
548         }
549
550         var node *Node
551         for _, n := range state.Ctx.Neigh {
552                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
553                         node = n
554                         break
555                 }
556         }
557         if node == nil {
558                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
559                 err = errors.New("unknown peer: " + peerId)
560                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
561                 return err
562         }
563         state.Node = node
564         state.rxRate = node.RxRate
565         state.txRate = node.TxRate
566         state.onlineDeadline = node.OnlineDeadline
567         state.maxOnlineTime = node.MaxOnlineTime
568         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
569
570         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
571                 return err
572         }
573         var rxLock *os.File
574         if xxOnly == "" || xxOnly == TRx {
575                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
576                 if err != nil {
577                         return err
578                 }
579         }
580         state.rxLock = rxLock
581         var txLock *os.File
582         if xxOnly == "" || xxOnly == TTx {
583                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
584                 if err != nil {
585                         return err
586                 }
587         }
588         state.txLock = txLock
589
590         var infosPayloads [][]byte
591         if xxOnly == "" || xxOnly == TTx {
592                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
593         }
594         var firstPayload []byte
595         if len(infosPayloads) > 0 {
596                 firstPayload = infosPayloads[0]
597         }
598         // Pad first payload, to hide actual number of existing files
599         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
600                 firstPayload = append(firstPayload, SPHaltMarshalized...)
601         }
602
603         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
604                 return fmt.Sprintf(
605                         "SP with %s (nice %s): sending first message",
606                         node.Name, NicenessFmt(state.Nice),
607                 )
608         })
609         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
610         if err != nil {
611                 state.dirUnlock()
612                 return err
613         }
614         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
615         if err = state.WriteSP(conn, buf, false); err != nil {
616                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
617                         return fmt.Sprintf(
618                                 "SP with %s (nice %s): writing",
619                                 node.Name, NicenessFmt(state.Nice),
620                         )
621                 })
622                 state.dirUnlock()
623                 return err
624         }
625         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
626                 return fmt.Sprintf(
627                         "SP with %s (nice %s): starting workers",
628                         node.Name, NicenessFmt(state.Nice),
629                 )
630         })
631         err = state.StartWorkers(conn, infosPayloads, payload)
632         if err != nil {
633                 state.dirUnlock()
634         }
635         return err
636 }
637
638 func (state *SPState) closeFd(pth string) {
639         state.fdsLock.Lock()
640         if s, exists := state.fds[pth]; exists {
641                 delete(state.fds, pth)
642                 s.fd.Close()
643         }
644         state.fdsLock.Unlock()
645 }
646
647 func (state *SPState) StartWorkers(
648         conn ConnDeadlined,
649         infosPayloads [][]byte,
650         payload []byte,
651 ) error {
652         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
653         state.fds = make(map[string]FdAndFullSize)
654         state.fileHashers = make(map[string]*MTHAndOffset)
655         state.isDead = make(chan struct{})
656         if state.maxOnlineTime > 0 {
657                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
658         }
659         if !state.NoCK {
660                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
661                 go func() {
662                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
663                                 if job.PktEnc.Nice <= state.Nice {
664                                         spCheckerTasks <- SPCheckerTask{
665                                                 nodeId: state.Node.Id,
666                                                 hsh:    job.HshValue,
667                                                 done:   state.payloads,
668                                         }
669                                 }
670                         }
671                 }()
672         }
673
674         // Remaining handshake payload sending
675         if len(infosPayloads) > 1 {
676                 state.wg.Add(1)
677                 go func() {
678                         for _, payload := range infosPayloads[1:] {
679                                 state.Ctx.LogD(
680                                         "sp-queue-remaining",
681                                         append(les, LE{"Size", int64(len(payload))}),
682                                         func(les LEs) string {
683                                                 return fmt.Sprintf(
684                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
685                                                         state.Node.Name, NicenessFmt(state.Nice),
686                                                         humanize.IBytes(uint64(len(payload))),
687                                                 )
688                                         },
689                                 )
690                                 state.payloads <- payload
691                         }
692                         state.wg.Done()
693                 }()
694         }
695
696         // Processing of first payload and queueing its responses
697         logMsg := func(les LEs) string {
698                 return fmt.Sprintf(
699                         "SP with %s (nice %s): processing first payload (%s)",
700                         state.Node.Name, NicenessFmt(state.Nice),
701                         humanize.IBytes(uint64(len(payload))),
702                 )
703         }
704         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
705         replies, err := state.ProcessSP(payload)
706         if err != nil {
707                 state.Ctx.LogE("sp-process", les, err, logMsg)
708                 return err
709         }
710         state.wg.Add(1)
711         go func() {
712                 for _, reply := range replies {
713                         state.Ctx.LogD(
714                                 "sp-queue-reply",
715                                 append(les, LE{"Size", int64(len(reply))}),
716                                 func(les LEs) string {
717                                         return fmt.Sprintf(
718                                                 "SP with %s (nice %s): queuing reply (%s)",
719                                                 state.Node.Name, NicenessFmt(state.Nice),
720                                                 humanize.IBytes(uint64(len(payload))),
721                                         )
722                                 },
723                         )
724                         state.payloads <- reply
725                 }
726                 state.wg.Done()
727         }()
728
729         // Periodic jobs
730         state.wg.Add(1)
731         go func() {
732                 deadlineTicker := time.NewTicker(time.Second)
733                 pingTicker := time.NewTicker(PingTimeout)
734                 for {
735                         select {
736                         case <-state.isDead:
737                                 state.wg.Done()
738                                 deadlineTicker.Stop()
739                                 pingTicker.Stop()
740                                 return
741                         case now := <-deadlineTicker.C:
742                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
743                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
744                                         goto Deadlined
745                                 }
746                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
747                                         goto Deadlined
748                                 }
749                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
750                                         goto Deadlined
751                                 }
752                                 break
753                         Deadlined:
754                                 state.SetDead()
755                                 conn.Close() // #nosec G104
756                         case now := <-pingTicker.C:
757                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
758                                         state.wg.Add(1)
759                                         go func() {
760                                                 state.pings <- struct{}{}
761                                                 state.wg.Done()
762                                         }()
763                                 }
764                         }
765                 }
766         }()
767
768         // Spool checker and INFOs sender of appearing files
769         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
770                 state.wg.Add(1)
771                 go func() {
772                         ticker := time.NewTicker(time.Second)
773                         for {
774                                 select {
775                                 case <-state.isDead:
776                                         state.wg.Done()
777                                         ticker.Stop()
778                                         return
779                                 case <-ticker.C:
780                                         for _, payload := range state.Ctx.infosOur(
781                                                 state.Node.Id,
782                                                 state.Nice,
783                                                 &state.infosOurSeen,
784                                         ) {
785                                                 state.Ctx.LogD(
786                                                         "sp-queue-info",
787                                                         append(les, LE{"Size", int64(len(payload))}),
788                                                         func(les LEs) string {
789                                                                 return fmt.Sprintf(
790                                                                         "SP with %s (nice %s): queuing new info (%s)",
791                                                                         state.Node.Name, NicenessFmt(state.Nice),
792                                                                         humanize.IBytes(uint64(len(payload))),
793                                                                 )
794                                                         },
795                                                 )
796                                                 state.payloads <- payload
797                                         }
798                                 }
799                         }
800                 }()
801         }
802
803         // Sender
804         state.wg.Add(1)
805         go func() {
806                 defer conn.Close()
807                 defer state.SetDead()
808                 defer state.wg.Done()
809                 buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
810                 for {
811                         if state.NotAlive() {
812                                 return
813                         }
814                         var payload []byte
815                         var ping bool
816                         select {
817                         case <-state.pings:
818                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
819                                         return fmt.Sprintf(
820                                                 "SP with %s (nice %s): got ping",
821                                                 state.Node.Name, NicenessFmt(state.Nice),
822                                         )
823                                 })
824                                 payload = SPPingMarshalized
825                                 ping = true
826                         case payload = <-state.payloads:
827                                 state.Ctx.LogD(
828                                         "sp-got-payload",
829                                         append(les, LE{"Size", int64(len(payload))}),
830                                         func(les LEs) string {
831                                                 return fmt.Sprintf(
832                                                         "SP with %s (nice %s): got payload (%s)",
833                                                         state.Node.Name, NicenessFmt(state.Nice),
834                                                         humanize.IBytes(uint64(len(payload))),
835                                                 )
836                                         },
837                                 )
838                         default:
839                                 state.RLock()
840                                 if len(state.queueTheir) == 0 {
841                                         state.RUnlock()
842                                         time.Sleep(100 * time.Millisecond)
843                                         continue
844                                 }
845                                 freq := state.queueTheir[0].freq
846                                 state.RUnlock()
847                                 if state.txRate > 0 {
848                                         time.Sleep(time.Second / time.Duration(state.txRate))
849                                 }
850                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
851                                 lesp := append(
852                                         les,
853                                         LE{"XX", string(TTx)},
854                                         LE{"Pkt", pktName},
855                                         LE{"Size", int64(freq.Offset)},
856                                 )
857                                 logMsg := func(les LEs) string {
858                                         return fmt.Sprintf(
859                                                 "SP with %s (nice %s): tx/%s (%s)",
860                                                 state.Node.Name, NicenessFmt(state.Nice),
861                                                 pktName,
862                                                 humanize.IBytes(freq.Offset),
863                                         )
864                                 }
865                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
866                                         return logMsg(les) + ": queueing"
867                                 })
868                                 pth := filepath.Join(
869                                         state.Ctx.Spool,
870                                         state.Node.Id.String(),
871                                         string(TTx),
872                                         Base32Codec.EncodeToString(freq.Hash[:]),
873                                 )
874                                 state.fdsLock.RLock()
875                                 fdAndFullSize, exists := state.fds[pth]
876                                 state.fdsLock.RUnlock()
877                                 if !exists {
878                                         state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
879                                                 return logMsg(les) + ": opening"
880                                         })
881                                         fd, err := os.Open(pth)
882                                         if err != nil {
883                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
884                                                         return logMsg(les) + ": opening"
885                                                 })
886                                                 return
887                                         }
888                                         fi, err := fd.Stat()
889                                         if err != nil {
890                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
891                                                         return logMsg(les) + ": stating"
892                                                 })
893                                                 return
894                                         }
895                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
896                                         state.fdsLock.Lock()
897                                         state.fds[pth] = fdAndFullSize
898                                         state.fdsLock.Unlock()
899                                 }
900                                 fd := fdAndFullSize.fd
901                                 fullSize := fdAndFullSize.fullSize
902                                 lesp = append(lesp, LE{"FullSize", fullSize})
903                                 var bufRead []byte
904                                 if freq.Offset < uint64(fullSize) {
905                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
906                                                 return logMsg(les) + ": seeking"
907                                         })
908                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
909                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
910                                                         return logMsg(les) + ": seeking"
911                                                 })
912                                                 return
913                                         }
914                                         n, err := fd.Read(buf)
915                                         if err != nil {
916                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
917                                                         return logMsg(les) + ": reading"
918                                                 })
919                                                 return
920                                         }
921                                         bufRead = buf[:n]
922                                         lesp = append(
923                                                 les,
924                                                 LE{"XX", string(TTx)},
925                                                 LE{"Pkt", pktName},
926                                                 LE{"Size", int64(n)},
927                                                 LE{"FullSize", fullSize},
928                                         )
929                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
930                                                 return fmt.Sprintf(
931                                                         "%s: read %s",
932                                                         logMsg(les), humanize.IBytes(uint64(n)),
933                                                 )
934                                         })
935                                 } else {
936                                         state.closeFd(pth)
937                                 }
938                                 payload = MarshalSP(SPTypeFile, SPFile{
939                                         Hash:    freq.Hash,
940                                         Offset:  freq.Offset,
941                                         Payload: bufRead,
942                                 })
943                                 ourSize := freq.Offset + uint64(len(bufRead))
944                                 lesp = append(
945                                         les,
946                                         LE{"XX", string(TTx)},
947                                         LE{"Pkt", pktName},
948                                         LE{"Size", int64(ourSize)},
949                                         LE{"FullSize", fullSize},
950                                 )
951                                 if state.Ctx.ShowPrgrs {
952                                         state.progressBars[pktName] = struct{}{}
953                                         Progress("Tx", lesp)
954                                 }
955                                 if ourSize == uint64(fullSize) {
956                                         state.closeFd(pth)
957                                         state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
958                                                 return logMsg(les) + ": finished"
959                                         })
960                                         if state.Ctx.ShowPrgrs {
961                                                 delete(state.progressBars, pktName)
962                                         }
963                                 }
964                                 state.Lock()
965                                 for i, q := range state.queueTheir {
966                                         if *q.freq.Hash != *freq.Hash {
967                                                 continue
968                                         }
969                                         if ourSize == uint64(fullSize) {
970                                                 state.queueTheir = append(
971                                                         state.queueTheir[:i],
972                                                         state.queueTheir[i+1:]...,
973                                                 )
974                                         } else {
975                                                 q.freq.Offset = ourSize
976                                         }
977                                         break
978                                 }
979                                 state.Unlock()
980                         }
981                         logMsg := func(les LEs) string {
982                                 return fmt.Sprintf(
983                                         "SP with %s (nice %s): sending %s",
984                                         state.Node.Name, NicenessFmt(state.Nice),
985                                         humanize.IBytes(uint64(len(payload))),
986                                 )
987                         }
988                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
989                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
990                         ct, err := state.csOur.Encrypt(nil, nil, payload)
991                         if err != nil {
992                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
993                                 return
994                         }
995                         if err := state.WriteSP(conn, ct, ping); err != nil {
996                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
997                                 return
998                         }
999                 }
1000         }()
1001
1002         // Receiver
1003         state.wg.Add(1)
1004         go func() {
1005                 for {
1006                         if state.NotAlive() {
1007                                 break
1008                         }
1009                         logMsg := func(les LEs) string {
1010                                 return fmt.Sprintf(
1011                                         "SP with %s (nice %s): waiting for payload",
1012                                         state.Node.Name, NicenessFmt(state.Nice),
1013                                 )
1014                         }
1015                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1016                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline)) // #nosec G104
1017                         payload, err := state.ReadSP(conn)
1018                         if err != nil {
1019                                 if err == io.EOF {
1020                                         break
1021                                 }
1022                                 unmarshalErr := err.(*xdr.UnmarshalError)
1023                                 if os.IsTimeout(unmarshalErr.Err) {
1024                                         continue
1025                                 }
1026                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1027                                         break
1028                                 }
1029                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1030                                 break
1031                         }
1032                         logMsg = func(les LEs) string {
1033                                 return fmt.Sprintf(
1034                                         "SP with %s (nice %s): payload (%s)",
1035                                         state.Node.Name, NicenessFmt(state.Nice),
1036                                         humanize.IBytes(uint64(len(payload))),
1037                                 )
1038                         }
1039                         state.Ctx.LogD(
1040                                 "sp-recv-got",
1041                                 append(les, LE{"Size", int64(len(payload))}),
1042                                 func(les LEs) string { return logMsg(les) + ": got" },
1043                         )
1044                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1045                         if err != nil {
1046                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1047                                         return logMsg(les) + ": got"
1048                                 })
1049                                 break
1050                         }
1051                         state.Ctx.LogD(
1052                                 "sp-recv-process",
1053                                 append(les, LE{"Size", int64(len(payload))}),
1054                                 func(les LEs) string {
1055                                         return logMsg(les) + ": processing"
1056                                 },
1057                         )
1058                         replies, err := state.ProcessSP(payload)
1059                         if err != nil {
1060                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1061                                         return logMsg(les) + ": processing"
1062                                 })
1063                                 break
1064                         }
1065                         state.wg.Add(1)
1066                         go func() {
1067                                 for _, reply := range replies {
1068                                         state.Ctx.LogD(
1069                                                 "sp-recv-reply",
1070                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1071                                                 func(les LEs) string {
1072                                                         return fmt.Sprintf(
1073                                                                 "SP with %s (nice %s): queuing reply (%s)",
1074                                                                 state.Node.Name, NicenessFmt(state.Nice),
1075                                                                 humanize.IBytes(uint64(len(reply))),
1076                                                         )
1077                                                 },
1078                                         )
1079                                         state.payloads <- reply
1080                                 }
1081                                 state.wg.Done()
1082                         }()
1083                         if state.rxRate > 0 {
1084                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1085                         }
1086                 }
1087                 state.SetDead()
1088                 state.wg.Done()
1089                 state.SetDead()
1090                 conn.Close() // #nosec G104
1091         }()
1092
1093         return nil
1094 }
1095
1096 func (state *SPState) Wait() {
1097         state.wg.Wait()
1098         close(state.payloads)
1099         close(state.pings)
1100         state.Duration = time.Now().Sub(state.started)
1101         state.dirUnlock()
1102         state.RxSpeed = state.RxBytes
1103         state.TxSpeed = state.TxBytes
1104         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1105         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1106         if rxDuration > 0 {
1107                 state.RxSpeed = state.RxBytes / rxDuration
1108         }
1109         if txDuration > 0 {
1110                 state.TxSpeed = state.TxBytes / txDuration
1111         }
1112         for _, s := range state.fds {
1113                 s.fd.Close()
1114         }
1115         for pktName := range state.progressBars {
1116                 ProgressKill(pktName)
1117         }
1118 }
1119
1120 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1121         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1122         r := bytes.NewReader(payload)
1123         var err error
1124         var replies [][]byte
1125         var infosGot bool
1126         for r.Len() > 0 {
1127                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1128                         return fmt.Sprintf(
1129                                 "SP with %s (nice %s): unmarshaling header",
1130                                 state.Node.Name, NicenessFmt(state.Nice),
1131                         )
1132                 })
1133                 var head SPHead
1134                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1135                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1136                                 return fmt.Sprintf(
1137                                         "SP with %s (nice %s): unmarshaling header",
1138                                         state.Node.Name, NicenessFmt(state.Nice),
1139                                 )
1140                         })
1141                         return nil, err
1142                 }
1143                 if head.Type != SPTypePing {
1144                         state.RxLastNonPing = state.RxLastSeen
1145                 }
1146                 switch head.Type {
1147                 case SPTypeHalt:
1148                         state.Ctx.LogD(
1149                                 "sp-process-halt",
1150                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1151                                         return fmt.Sprintf(
1152                                                 "SP with %s (nice %s): got HALT",
1153                                                 state.Node.Name, NicenessFmt(state.Nice),
1154                                         )
1155                                 },
1156                         )
1157                         state.Lock()
1158                         state.queueTheir = nil
1159                         state.Unlock()
1160
1161                 case SPTypePing:
1162                         state.Ctx.LogD(
1163                                 "sp-process-ping",
1164                                 append(les, LE{"Type", "ping"}),
1165                                 func(les LEs) string {
1166                                         return fmt.Sprintf(
1167                                                 "SP with %s (nice %s): got PING",
1168                                                 state.Node.Name, NicenessFmt(state.Nice),
1169                                         )
1170                                 },
1171                         )
1172
1173                 case SPTypeInfo:
1174                         infosGot = true
1175                         lesp := append(les, LE{"Type", "info"})
1176                         state.Ctx.LogD(
1177                                 "sp-process-info-unmarshal", lesp,
1178                                 func(les LEs) string {
1179                                         return fmt.Sprintf(
1180                                                 "SP with %s (nice %s): unmarshaling INFO",
1181                                                 state.Node.Name, NicenessFmt(state.Nice),
1182                                         )
1183                                 },
1184                         )
1185                         var info SPInfo
1186                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1187                                 state.Ctx.LogE(
1188                                         "sp-process-info-unmarshal", lesp, err,
1189                                         func(les LEs) string {
1190                                                 return fmt.Sprintf(
1191                                                         "SP with %s (nice %s): unmarshaling INFO",
1192                                                         state.Node.Name, NicenessFmt(state.Nice),
1193                                                 )
1194                                         },
1195                                 )
1196                                 return nil, err
1197                         }
1198                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1199                         lesp = append(
1200                                 lesp,
1201                                 LE{"Pkt", pktName},
1202                                 LE{"Size", int64(info.Size)},
1203                                 LE{"PktNice", int(info.Nice)},
1204                         )
1205                         logMsg := func(les LEs) string {
1206                                 return fmt.Sprintf(
1207                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1208                                         state.Node.Name, NicenessFmt(state.Nice),
1209                                         pktName,
1210                                         humanize.IBytes(info.Size),
1211                                         NicenessFmt(info.Nice),
1212                                 )
1213                         }
1214                         if !state.listOnly && info.Nice > state.Nice {
1215                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1216                                         return logMsg(les) + ": too nice"
1217                                 })
1218                                 continue
1219                         }
1220                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1221                                 return logMsg(les) + ": received"
1222                         })
1223                         if !state.listOnly && state.xxOnly == TTx {
1224                                 continue
1225                         }
1226                         state.Lock()
1227                         state.infosTheir[*info.Hash] = &info
1228                         state.Unlock()
1229                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1230                                 return logMsg(les) + ": stating part"
1231                         })
1232                         pktPath := filepath.Join(
1233                                 state.Ctx.Spool,
1234                                 state.Node.Id.String(),
1235                                 string(TRx),
1236                                 Base32Codec.EncodeToString(info.Hash[:]),
1237                         )
1238                         logMsg = func(les LEs) string {
1239                                 return fmt.Sprintf(
1240                                         "Packet %s (%s) (nice %s)",
1241                                         pktName,
1242                                         humanize.IBytes(info.Size),
1243                                         NicenessFmt(info.Nice),
1244                                 )
1245                         }
1246                         if _, err = os.Stat(pktPath); err == nil {
1247                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1248                                         return logMsg(les) + ": already done"
1249                                 })
1250                                 if !state.listOnly {
1251                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1252                                 }
1253                                 continue
1254                         }
1255                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1256                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1257                                         return logMsg(les) + ": already seen"
1258                                 })
1259                                 if !state.listOnly {
1260                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1261                                 }
1262                                 continue
1263                         }
1264                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1265                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1266                                         return logMsg(les) + ": still not checksummed"
1267                                 })
1268                                 continue
1269                         }
1270                         fi, err := os.Stat(pktPath + PartSuffix)
1271                         var offset int64
1272                         if err == nil {
1273                                 offset = fi.Size()
1274                         }
1275                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1276                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1277                                         return logMsg(les) + ": not enough space"
1278                                 })
1279                                 continue
1280                         }
1281                         state.Ctx.LogI(
1282                                 "sp-info",
1283                                 append(lesp, LE{"Offset", offset}),
1284                                 func(les LEs) string {
1285                                         return fmt.Sprintf(
1286                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1287                                         )
1288                                 },
1289                         )
1290                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1291                                 replies = append(replies, MarshalSP(
1292                                         SPTypeFreq,
1293                                         SPFreq{info.Hash, uint64(offset)},
1294                                 ))
1295                         }
1296
1297                 case SPTypeFile:
1298                         lesp := append(les, LE{"Type", "file"})
1299                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1300                                 return fmt.Sprintf(
1301                                         "SP with %s (nice %s): unmarshaling FILE",
1302                                         state.Node.Name, NicenessFmt(state.Nice),
1303                                 )
1304                         })
1305                         var file SPFile
1306                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1307                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1308                                         return fmt.Sprintf(
1309                                                 "SP with %s (nice %s): unmarshaling FILE",
1310                                                 state.Node.Name, NicenessFmt(state.Nice),
1311                                         )
1312                                 })
1313                                 return nil, err
1314                         }
1315                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1316                         lesp = append(
1317                                 lesp,
1318                                 LE{"XX", string(TRx)},
1319                                 LE{"Pkt", pktName},
1320                                 LE{"Size", int64(len(file.Payload))},
1321                         )
1322                         logMsg := func(les LEs) string {
1323                                 return fmt.Sprintf(
1324                                         "Got packet %s (%s)",
1325                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1326                                 )
1327                         }
1328                         fullsize := int64(0)
1329                         state.RLock()
1330                         infoTheir := state.infosTheir[*file.Hash]
1331                         state.RUnlock()
1332                         if infoTheir == nil {
1333                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1334                                         return logMsg(les) + ": unknown file"
1335                                 })
1336                                 continue
1337                         }
1338                         fullsize = int64(infoTheir.Size)
1339                         lesp = append(lesp, LE{"FullSize", fullsize})
1340                         dirToSync := filepath.Join(
1341                                 state.Ctx.Spool,
1342                                 state.Node.Id.String(),
1343                                 string(TRx),
1344                         )
1345                         filePath := filepath.Join(dirToSync, pktName)
1346                         filePathPart := filePath + PartSuffix
1347                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1348                                 return logMsg(les) + ": opening part"
1349                         })
1350                         state.fdsLock.RLock()
1351                         fdAndFullSize, exists := state.fds[filePathPart]
1352                         state.fdsLock.RUnlock()
1353                         hasherAndOffset := state.fileHashers[filePath]
1354                         var fd *os.File
1355                         if exists {
1356                                 fd = fdAndFullSize.fd
1357                         } else {
1358                                 fd, err = os.OpenFile(
1359                                         filePathPart,
1360                                         os.O_RDWR|os.O_CREATE,
1361                                         os.FileMode(0666),
1362                                 )
1363                                 if err != nil {
1364                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1365                                                 return logMsg(les) + ": opening part"
1366                                         })
1367                                         return nil, err
1368                                 }
1369                                 state.fdsLock.Lock()
1370                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1371                                 state.fdsLock.Unlock()
1372                                 if !state.NoCK {
1373                                         hasherAndOffset = &MTHAndOffset{
1374                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1375                                                 offset: file.Offset,
1376                                         }
1377                                         state.fileHashers[filePath] = hasherAndOffset
1378                                 }
1379                         }
1380                         state.Ctx.LogD(
1381                                 "sp-file-seek",
1382                                 append(lesp, LE{"Offset", file.Offset}),
1383                                 func(les LEs) string {
1384                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1385                                 })
1386                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1387                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1388                                         return logMsg(les) + ": seeking"
1389                                 })
1390                                 state.closeFd(filePathPart)
1391                                 return nil, err
1392                         }
1393                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1394                                 return logMsg(les) + ": writing"
1395                         })
1396                         if _, err = fd.Write(file.Payload); err != nil {
1397                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1398                                         return logMsg(les) + ": writing"
1399                                 })
1400                                 state.closeFd(filePathPart)
1401                                 return nil, err
1402                         }
1403                         if hasherAndOffset != nil {
1404                                 if hasherAndOffset.offset == file.Offset {
1405                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1406                                                 panic(err)
1407                                         }
1408                                         hasherAndOffset.offset += uint64(len(file.Payload))
1409                                 } else {
1410                                         state.Ctx.LogE(
1411                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1412                                                 func(les LEs) string {
1413                                                         return logMsg(les) + ": deleting hasher"
1414                                                 },
1415                                         )
1416                                         delete(state.fileHashers, filePath)
1417                                         hasherAndOffset = nil
1418                                 }
1419                         }
1420                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1421                         lesp[len(lesp)-2].V = ourSize
1422                         if state.Ctx.ShowPrgrs {
1423                                 state.progressBars[pktName] = struct{}{}
1424                                 Progress("Rx", lesp)
1425                         }
1426                         if fullsize != ourSize {
1427                                 continue
1428                         }
1429                         if state.Ctx.ShowPrgrs {
1430                                 delete(state.progressBars, pktName)
1431                         }
1432                         logMsg = func(les LEs) string {
1433                                 return fmt.Sprintf(
1434                                         "Got packet %s %d%% (%s / %s)",
1435                                         pktName, 100*ourSize/fullsize,
1436                                         humanize.IBytes(uint64(ourSize)),
1437                                         humanize.IBytes(uint64(fullsize)),
1438                                 )
1439                         }
1440                         err = fd.Sync()
1441                         if err != nil {
1442                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1443                                         return logMsg(les) + ": syncing"
1444                                 })
1445                                 state.closeFd(filePathPart)
1446                                 continue
1447                         }
1448                         if hasherAndOffset != nil {
1449                                 delete(state.fileHashers, filePath)
1450                                 if hasherAndOffset.mth.PrependSize() == 0 {
1451                                         if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
1452                                                 state.Ctx.LogE(
1453                                                         "sp-file-bad-checksum", lesp,
1454                                                         errors.New("checksum mismatch"),
1455                                                         logMsg,
1456                                                 )
1457                                                 state.closeFd(filePathPart)
1458                                                 continue
1459                                         }
1460                                         if err = os.Rename(filePathPart, filePath); err != nil {
1461                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1462                                                         return logMsg(les) + ": renaming"
1463                                                 })
1464                                                 state.closeFd(filePathPart)
1465                                                 continue
1466                                         }
1467                                         if err = DirSync(dirToSync); err != nil {
1468                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1469                                                         return logMsg(les) + ": dirsyncing"
1470                                                 })
1471                                                 state.closeFd(filePathPart)
1472                                                 continue
1473                                         }
1474                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1475                                                 return logMsg(les) + ": done"
1476                                         })
1477                                         state.wg.Add(1)
1478                                         go func() {
1479                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1480                                                 state.wg.Done()
1481                                         }()
1482                                         state.Lock()
1483                                         delete(state.infosTheir, *file.Hash)
1484                                         state.Unlock()
1485                                         if !state.Ctx.HdrUsage {
1486                                                 continue
1487                                         }
1488                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1489                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1490                                                         return logMsg(les) + ": seeking"
1491                                                 })
1492                                                 state.closeFd(filePathPart)
1493                                                 continue
1494                                         }
1495                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1496                                         state.closeFd(filePathPart)
1497                                         if err != nil {
1498                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1499                                                         return logMsg(les) + ": HdrReading"
1500                                                 })
1501                                                 continue
1502                                         }
1503                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1504                                         continue
1505                                 }
1506                         }
1507                         state.closeFd(filePathPart)
1508                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1509                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1510                                         return logMsg(les) + ": renaming"
1511                                 })
1512                                 continue
1513                         }
1514                         if err = DirSync(dirToSync); err != nil {
1515                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1516                                         return logMsg(les) + ": dirsyncing"
1517                                 })
1518                                 continue
1519                         }
1520                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1521                                 return logMsg(les) + ": downloaded"
1522                         })
1523                         state.Lock()
1524                         delete(state.infosTheir, *file.Hash)
1525                         state.Unlock()
1526                         go func() {
1527                                 t := SPCheckerTask{
1528                                         nodeId: state.Node.Id,
1529                                         hsh:    file.Hash,
1530                                         done:   state.payloads,
1531                                 }
1532                                 if hasherAndOffset != nil {
1533                                         t.mth = hasherAndOffset.mth
1534                                 }
1535                                 spCheckerTasks <- t
1536                         }()
1537
1538                 case SPTypeDone:
1539                         lesp := append(les, LE{"Type", "done"})
1540                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1541                                 return fmt.Sprintf(
1542                                         "SP with %s (nice %s): unmarshaling DONE",
1543                                         state.Node.Name, NicenessFmt(state.Nice),
1544                                 )
1545                         })
1546                         var done SPDone
1547                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1548                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1549                                         return fmt.Sprintf(
1550                                                 "SP with %s (nice %s): unmarshaling DONE",
1551                                                 state.Node.Name, NicenessFmt(state.Nice),
1552                                         )
1553                                 })
1554                                 return nil, err
1555                         }
1556                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1557                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1558                         logMsg := func(les LEs) string {
1559                                 return fmt.Sprintf(
1560                                         "SP with %s (nice %s): DONE: removing %s",
1561                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1562                                 )
1563                         }
1564                         state.Ctx.LogD("sp-done", lesp, logMsg)
1565                         pth := filepath.Join(
1566                                 state.Ctx.Spool,
1567                                 state.Node.Id.String(),
1568                                 string(TTx),
1569                                 pktName,
1570                         )
1571                         if err = os.Remove(pth); err == nil {
1572                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1573                                         return fmt.Sprintf("Packet %s is sent", pktName)
1574                                 })
1575                                 if state.Ctx.HdrUsage {
1576                                         os.Remove(pth + HdrSuffix)
1577                                 }
1578                         } else {
1579                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1580                         }
1581
1582                 case SPTypeFreq:
1583                         lesp := append(les, LE{"Type", "freq"})
1584                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1585                                 return fmt.Sprintf(
1586                                         "SP with %s (nice %s): unmarshaling FREQ",
1587                                         state.Node.Name, NicenessFmt(state.Nice),
1588                                 )
1589                         })
1590                         var freq SPFreq
1591                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1592                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1593                                         return fmt.Sprintf(
1594                                                 "SP with %s (nice %s): unmarshaling FREQ",
1595                                                 state.Node.Name, NicenessFmt(state.Nice),
1596                                         )
1597                                 })
1598                                 return nil, err
1599                         }
1600                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1601                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1602                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1603                                 return fmt.Sprintf(
1604                                         "SP with %s (nice %s): FREQ %s: queuing",
1605                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1606                                 )
1607                         })
1608                         nice, exists := state.infosOurSeen[*freq.Hash]
1609                         if exists {
1610                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1611                                         state.Lock()
1612                                         insertIdx := 0
1613                                         var freqWithNice *FreqWithNice
1614                                         for insertIdx, freqWithNice = range state.queueTheir {
1615                                                 if freqWithNice.nice > nice {
1616                                                         break
1617                                                 }
1618                                         }
1619                                         state.queueTheir = append(state.queueTheir, nil)
1620                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1621                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1622                                         state.Unlock()
1623                                 } else {
1624                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1625                                                 return fmt.Sprintf(
1626                                                         "SP with %s (nice %s): FREQ %s: skipping",
1627                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1628                                                 )
1629                                         })
1630                                 }
1631                         } else {
1632                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1633                                         return fmt.Sprintf(
1634                                                 "SP with %s (nice %s): FREQ %s: unknown",
1635                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1636                                         )
1637                                 })
1638                         }
1639
1640                 default:
1641                         state.Ctx.LogE(
1642                                 "sp-process-type-unknown",
1643                                 append(les, LE{"Type", head.Type}),
1644                                 errors.New("unknown type"),
1645                                 func(les LEs) string {
1646                                         return fmt.Sprintf(
1647                                                 "SP with %s (nice %s): %d",
1648                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1649                                         )
1650                                 },
1651                         )
1652                         return nil, BadPktType
1653                 }
1654         }
1655
1656         if infosGot {
1657                 var pkts int
1658                 var size uint64
1659                 state.RLock()
1660                 for _, info := range state.infosTheir {
1661                         pkts++
1662                         size += info.Size
1663                 }
1664                 state.RUnlock()
1665                 state.Ctx.LogI("sp-infos-rx", LEs{
1666                         {"XX", string(TRx)},
1667                         {"Node", state.Node.Id},
1668                         {"Pkts", pkts},
1669                         {"Size", int64(size)},
1670                 }, func(les LEs) string {
1671                         return fmt.Sprintf(
1672                                 "%s has got for us: %d packets, %s",
1673                                 state.Node.Name, pkts, humanize.IBytes(size),
1674                         )
1675                 })
1676         }
1677         return payloadsSplit(replies), nil
1678 }
1679
1680 func SPChecker(ctx *Ctx) {
1681         for t := range spCheckerTasks {
1682                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1683                 les := LEs{
1684                         {"XX", string(TRx)},
1685                         {"Node", t.nodeId},
1686                         {"Pkt", pktName},
1687                 }
1688                 SPCheckerWg.Add(1)
1689                 ctx.LogD("sp-checker", les, func(les LEs) string {
1690                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1691                 })
1692                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1693                 les = append(les, LE{"Size", size})
1694                 if err != nil {
1695                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1696                                 return fmt.Sprintf(
1697                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1698                                         humanize.IBytes(uint64(size)),
1699                                 )
1700                         })
1701                         SPCheckerWg.Done()
1702                         continue
1703                 }
1704                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1705                         return fmt.Sprintf(
1706                                 "Packet %s is retreived (%s)",
1707                                 pktName, humanize.IBytes(uint64(size)),
1708                         )
1709                 })
1710                 SPCheckerWg.Done()
1711                 go func(t SPCheckerTask) {
1712                         defer func() { recover() }()
1713                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1714                 }(t)
1715         }
1716 }