]> Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
Raise copyright years
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2022 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "io"
26         "log"
27         "os"
28         "path/filepath"
29         "sort"
30         "sync"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/flynn/noise"
36 )
37
38 const (
39         MaxSPSize      = 1<<16 - 256
40         PartSuffix     = ".part"
41         SPHeadOverhead = 4
42 )
43
44 type MTHAndOffset struct {
45         mth    MTH
46         offset uint64
47 }
48
49 type SPCheckerTask struct {
50         nodeId *NodeId
51         hsh    *[MTHSize]byte
52         mth    MTH
53         done   chan []byte
54 }
55
56 var (
57         SPInfoOverhead    int
58         SPFreqOverhead    int
59         SPFileOverhead    int
60         SPHaltMarshalized []byte
61         SPPingMarshalized []byte
62
63         NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
64                 noise.DH25519,
65                 noise.CipherChaChaPoly,
66                 noise.HashBLAKE2b,
67         )
68
69         DefaultDeadline = 10 * time.Second
70         PingTimeout     = time.Minute
71
72         spCheckerTasks chan SPCheckerTask
73         SPCheckerWg    sync.WaitGroup
74         spCheckerOnce  sync.Once
75 )
76
77 type FdAndFullSize struct {
78         fd       *os.File
79         fullSize int64
80 }
81
82 type SPType uint8
83
84 const (
85         SPTypeInfo SPType = iota
86         SPTypeFreq SPType = iota
87         SPTypeFile SPType = iota
88         SPTypeDone SPType = iota
89         SPTypeHalt SPType = iota
90         SPTypePing SPType = iota
91 )
92
93 type SPHead struct {
94         Type SPType
95 }
96
97 type SPInfo struct {
98         Nice uint8
99         Size uint64
100         Hash *[MTHSize]byte
101 }
102
103 type SPFreq struct {
104         Hash   *[MTHSize]byte
105         Offset uint64
106 }
107
108 type SPFile struct {
109         Hash    *[MTHSize]byte
110         Offset  uint64
111         Payload []byte
112 }
113
114 type SPDone struct {
115         Hash *[MTHSize]byte
116 }
117
118 type SPRaw struct {
119         Magic   [8]byte
120         Payload []byte
121 }
122
123 type FreqWithNice struct {
124         freq *SPFreq
125         nice uint8
126 }
127
128 type ConnDeadlined interface {
129         io.ReadWriteCloser
130         SetReadDeadline(t time.Time) error
131         SetWriteDeadline(t time.Time) error
132 }
133
134 func init() {
135         var buf bytes.Buffer
136         spHead := SPHead{Type: SPTypeHalt}
137         if _, err := xdr.Marshal(&buf, spHead); err != nil {
138                 panic(err)
139         }
140         SPHaltMarshalized = make([]byte, SPHeadOverhead)
141         copy(SPHaltMarshalized, buf.Bytes())
142         buf.Reset()
143
144         spHead = SPHead{Type: SPTypePing}
145         if _, err := xdr.Marshal(&buf, spHead); err != nil {
146                 panic(err)
147         }
148         SPPingMarshalized = make([]byte, SPHeadOverhead)
149         copy(SPPingMarshalized, buf.Bytes())
150         buf.Reset()
151
152         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
153         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
154                 panic(err)
155         }
156         SPInfoOverhead = buf.Len()
157         buf.Reset()
158
159         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
160         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
161                 panic(err)
162         }
163         SPFreqOverhead = buf.Len()
164         buf.Reset()
165
166         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
167         if _, err := xdr.Marshal(&buf, spFile); err != nil {
168                 panic(err)
169         }
170         SPFileOverhead = buf.Len()
171         spCheckerTasks = make(chan SPCheckerTask)
172 }
173
174 func MarshalSP(typ SPType, sp interface{}) []byte {
175         var buf bytes.Buffer
176         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
177                 panic(err)
178         }
179         if _, err := xdr.Marshal(&buf, sp); err != nil {
180                 panic(err)
181         }
182         return buf.Bytes()
183 }
184
185 func payloadsSplit(payloads [][]byte) [][]byte {
186         var outbounds [][]byte
187         outbound := make([]byte, 0, MaxSPSize)
188         for i, payload := range payloads {
189                 outbound = append(outbound, payload...)
190                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
191                         outbounds = append(outbounds, outbound)
192                         outbound = make([]byte, 0, MaxSPSize)
193                 }
194         }
195         if len(outbound) > 0 {
196                 outbounds = append(outbounds, outbound)
197         }
198         return outbounds
199 }
200
201 type SPState struct {
202         Ctx            *Ctx
203         Node           *Node
204         Nice           uint8
205         NoCK           bool
206         onlineDeadline time.Duration
207         maxOnlineTime  time.Duration
208         hs             *noise.HandshakeState
209         csOur          *noise.CipherState
210         csTheir        *noise.CipherState
211         payloads       chan []byte
212         pings          chan struct{}
213         infosTheir     map[[MTHSize]byte]*SPInfo
214         infosOurSeen   map[[MTHSize]byte]uint8
215         queueTheir     []*FreqWithNice
216         wg             sync.WaitGroup
217         RxBytes        int64
218         RxLastSeen     time.Time
219         RxLastNonPing  time.Time
220         TxBytes        int64
221         TxLastSeen     time.Time
222         TxLastNonPing  time.Time
223         started        time.Time
224         mustFinishAt   time.Time
225         Duration       time.Duration
226         RxSpeed        int64
227         TxSpeed        int64
228         rxLock         *os.File
229         txLock         *os.File
230         xxOnly         TRxTx
231         rxRate         int
232         txRate         int
233         isDead         chan struct{}
234         listOnly       bool
235         onlyPkts       map[[MTHSize]byte]bool
236         writeSPBuf     bytes.Buffer
237         fds            map[string]FdAndFullSize
238         fdsLock        sync.RWMutex
239         fileHashers    map[string]*MTHAndOffset
240         progressBars   map[string]struct{}
241         sync.RWMutex
242 }
243
244 func (state *SPState) SetDead() {
245         state.Lock()
246         defer state.Unlock()
247         select {
248         case <-state.isDead:
249                 // Already closed channel, dead
250                 return
251         default:
252         }
253         close(state.isDead)
254         go func() {
255                 for range state.payloads {
256                 }
257         }()
258         go func() {
259                 for range state.pings {
260                 }
261         }()
262 }
263
264 func (state *SPState) NotAlive() bool {
265         select {
266         case <-state.isDead:
267                 return true
268         default:
269         }
270         return false
271 }
272
273 func (state *SPState) dirUnlock() {
274         state.Ctx.UnlockDir(state.rxLock)
275         state.Ctx.UnlockDir(state.txLock)
276 }
277
278 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
279         state.writeSPBuf.Reset()
280         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
281                 Magic:   MagicNNCPSv1.B,
282                 Payload: payload,
283         })
284         if err != nil {
285                 return err
286         }
287         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
288                 state.TxLastSeen = time.Now()
289                 state.TxBytes += int64(n)
290                 if !ping {
291                         state.TxLastNonPing = state.TxLastSeen
292                 }
293         }
294         return err
295 }
296
297 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
298         var sp SPRaw
299         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
300         if err != nil {
301                 ue := err.(*xdr.UnmarshalError)
302                 if ue.Err == io.EOF {
303                         return nil, ue.Err
304                 }
305                 return nil, err
306         }
307         state.RxLastSeen = time.Now()
308         state.RxBytes += int64(n)
309         if sp.Magic != MagicNNCPSv1.B {
310                 return nil, BadMagic
311         }
312         return sp.Payload, nil
313 }
314
315 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
316         var infos []*SPInfo
317         var totalSize int64
318         for job := range ctx.Jobs(nodeId, TTx) {
319                 if job.PktEnc.Nice > nice {
320                         continue
321                 }
322                 if _, known := (*seen)[*job.HshValue]; known {
323                         continue
324                 }
325                 totalSize += job.Size
326                 infos = append(infos, &SPInfo{
327                         Nice: job.PktEnc.Nice,
328                         Size: uint64(job.Size),
329                         Hash: job.HshValue,
330                 })
331                 (*seen)[*job.HshValue] = job.PktEnc.Nice
332         }
333         sort.Sort(ByNice(infos))
334         var payloads [][]byte
335         for _, info := range infos {
336                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
337                 pktName := Base32Codec.EncodeToString(info.Hash[:])
338                 ctx.LogD("sp-info-our", LEs{
339                         {"Node", nodeId},
340                         {"Name", pktName},
341                         {"Size", info.Size},
342                 }, func(les LEs) string {
343                         return fmt.Sprintf(
344                                 "Our info: %s/tx/%s (%s)",
345                                 ctx.NodeName(nodeId),
346                                 pktName,
347                                 humanize.IBytes(info.Size),
348                         )
349                 })
350         }
351         if totalSize > 0 {
352                 ctx.LogI("sp-infos-tx", LEs{
353                         {"XX", string(TTx)},
354                         {"Node", nodeId},
355                         {"Pkts", len(payloads)},
356                         {"Size", totalSize},
357                 }, func(les LEs) string {
358                         return fmt.Sprintf(
359                                 "We have got for %s: %d packets, %s",
360                                 ctx.NodeName(nodeId),
361                                 len(payloads),
362                                 humanize.IBytes(uint64(totalSize)),
363                         )
364                 })
365         }
366         return payloadsSplit(payloads)
367 }
368
369 func (state *SPState) StartI(conn ConnDeadlined) error {
370         nodeId := state.Node.Id
371         err := state.Ctx.ensureRxDir(nodeId)
372         if err != nil {
373                 return err
374         }
375         var rxLock *os.File
376         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
377                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
378                 if err != nil {
379                         return err
380                 }
381         }
382         var txLock *os.File
383         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
384                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
385                 if err != nil {
386                         return err
387                 }
388         }
389         started := time.Now()
390         conf := noise.Config{
391                 CipherSuite: NoiseCipherSuite,
392                 Pattern:     noise.HandshakeIK,
393                 Initiator:   true,
394                 StaticKeypair: noise.DHKey{
395                         Private: state.Ctx.Self.NoisePrv[:],
396                         Public:  state.Ctx.Self.NoisePub[:],
397                 },
398                 PeerStatic: state.Node.NoisePub[:],
399         }
400         hs, err := noise.NewHandshakeState(conf)
401         if err != nil {
402                 return err
403         }
404         state.hs = hs
405         state.payloads = make(chan []byte)
406         state.pings = make(chan struct{})
407         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
408         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
409         state.progressBars = make(map[string]struct{})
410         state.started = started
411         state.rxLock = rxLock
412         state.txLock = txLock
413
414         var infosPayloads [][]byte
415         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
416                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
417         }
418         var firstPayload []byte
419         if len(infosPayloads) > 0 {
420                 firstPayload = infosPayloads[0]
421         }
422         // Pad first payload, to hide actual number of existing files
423         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
424                 firstPayload = append(firstPayload, SPHaltMarshalized...)
425         }
426
427         var buf []byte
428         var payload []byte
429         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
430         if err != nil {
431                 state.dirUnlock()
432                 return err
433         }
434         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
435         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
436                 return fmt.Sprintf(
437                         "SP with %s (nice %s): sending first message",
438                         state.Node.Name,
439                         NicenessFmt(state.Nice),
440                 )
441         })
442         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
443         if err = state.WriteSP(conn, buf, false); err != nil {
444                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
445                         return fmt.Sprintf(
446                                 "SP with %s (nice %s): writing",
447                                 state.Node.Name,
448                                 NicenessFmt(state.Nice),
449                         )
450                 })
451                 state.dirUnlock()
452                 return err
453         }
454         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
455                 return fmt.Sprintf(
456                         "SP with %s (nice %s): waiting for first message",
457                         state.Node.Name,
458                         NicenessFmt(state.Nice),
459                 )
460         })
461         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
462         if buf, err = state.ReadSP(conn); err != nil {
463                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
464                         return fmt.Sprintf(
465                                 "SP with %s (nice %s): reading",
466                                 state.Node.Name,
467                                 NicenessFmt(state.Nice),
468                         )
469                 })
470                 state.dirUnlock()
471                 return err
472         }
473         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
474         if err != nil {
475                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
476                         return fmt.Sprintf(
477                                 "SP with %s (nice %s): reading Noise message",
478                                 state.Node.Name,
479                                 NicenessFmt(state.Nice),
480                         )
481                 })
482                 state.dirUnlock()
483                 return err
484         }
485         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
486                 return fmt.Sprintf(
487                         "SP with %s (nice %s): starting workers",
488                         state.Node.Name,
489                         NicenessFmt(state.Nice),
490                 )
491         })
492         err = state.StartWorkers(conn, infosPayloads, payload)
493         if err != nil {
494                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
495                         return fmt.Sprintf(
496                                 "SP with %s (nice %s): starting workers",
497                                 state.Node.Name,
498                                 NicenessFmt(state.Nice),
499                         )
500                 })
501                 state.dirUnlock()
502         }
503         return err
504 }
505
506 func (state *SPState) StartR(conn ConnDeadlined) error {
507         started := time.Now()
508         conf := noise.Config{
509                 CipherSuite: NoiseCipherSuite,
510                 Pattern:     noise.HandshakeIK,
511                 Initiator:   false,
512                 StaticKeypair: noise.DHKey{
513                         Private: state.Ctx.Self.NoisePrv[:],
514                         Public:  state.Ctx.Self.NoisePub[:],
515                 },
516         }
517         hs, err := noise.NewHandshakeState(conf)
518         if err != nil {
519                 return err
520         }
521         xxOnly := TRxTx("")
522         state.hs = hs
523         state.payloads = make(chan []byte)
524         state.pings = make(chan struct{})
525         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
526         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
527         state.progressBars = make(map[string]struct{})
528         state.started = started
529         state.xxOnly = xxOnly
530
531         var buf []byte
532         var payload []byte
533         logMsg := func(les LEs) string {
534                 return fmt.Sprintf(
535                         "SP nice %s: waiting for first message",
536                         NicenessFmt(state.Nice),
537                 )
538         }
539         les := LEs{{"Nice", int(state.Nice)}}
540         state.Ctx.LogD("sp-startR", les, logMsg)
541         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
542         if buf, err = state.ReadSP(conn); err != nil {
543                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
544                 return err
545         }
546         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
547                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
548                 return err
549         }
550
551         var node *Node
552         for _, n := range state.Ctx.Neigh {
553                 if n.NoisePub == nil {
554                         continue
555                 }
556                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
557                         node = n
558                         break
559                 }
560         }
561         if node == nil {
562                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
563                 err = errors.New("unknown peer: " + peerId)
564                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
565                 return err
566         }
567         state.Node = node
568         state.rxRate = node.RxRate
569         state.txRate = node.TxRate
570         state.onlineDeadline = node.OnlineDeadline
571         state.maxOnlineTime = node.MaxOnlineTime
572         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
573
574         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
575                 return err
576         }
577         var rxLock *os.File
578         if xxOnly == "" || xxOnly == TRx {
579                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
580                 if err != nil {
581                         return err
582                 }
583         }
584         state.rxLock = rxLock
585         var txLock *os.File
586         if xxOnly == "" || xxOnly == TTx {
587                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
588                 if err != nil {
589                         return err
590                 }
591         }
592         state.txLock = txLock
593
594         var infosPayloads [][]byte
595         if xxOnly == "" || xxOnly == TTx {
596                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
597         }
598         var firstPayload []byte
599         if len(infosPayloads) > 0 {
600                 firstPayload = infosPayloads[0]
601         }
602         // Pad first payload, to hide actual number of existing files
603         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
604                 firstPayload = append(firstPayload, SPHaltMarshalized...)
605         }
606
607         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
608                 return fmt.Sprintf(
609                         "SP with %s (nice %s): sending first message",
610                         node.Name, NicenessFmt(state.Nice),
611                 )
612         })
613         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
614         if err != nil {
615                 state.dirUnlock()
616                 return err
617         }
618         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
619         if err = state.WriteSP(conn, buf, false); err != nil {
620                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
621                         return fmt.Sprintf(
622                                 "SP with %s (nice %s): writing",
623                                 node.Name, NicenessFmt(state.Nice),
624                         )
625                 })
626                 state.dirUnlock()
627                 return err
628         }
629         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
630                 return fmt.Sprintf(
631                         "SP with %s (nice %s): starting workers",
632                         node.Name, NicenessFmt(state.Nice),
633                 )
634         })
635         err = state.StartWorkers(conn, infosPayloads, payload)
636         if err != nil {
637                 state.dirUnlock()
638         }
639         return err
640 }
641
642 func (state *SPState) closeFd(pth string) {
643         state.fdsLock.Lock()
644         if s, exists := state.fds[pth]; exists {
645                 delete(state.fds, pth)
646                 s.fd.Close()
647         }
648         state.fdsLock.Unlock()
649 }
650
651 func (state *SPState) StartWorkers(
652         conn ConnDeadlined,
653         infosPayloads [][]byte,
654         payload []byte,
655 ) error {
656         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
657         state.fds = make(map[string]FdAndFullSize)
658         state.fileHashers = make(map[string]*MTHAndOffset)
659         state.isDead = make(chan struct{})
660         if state.maxOnlineTime > 0 {
661                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
662         }
663         if !state.NoCK {
664                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
665                 go func() {
666                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
667                                 if job.PktEnc.Nice <= state.Nice {
668                                         spCheckerTasks <- SPCheckerTask{
669                                                 nodeId: state.Node.Id,
670                                                 hsh:    job.HshValue,
671                                                 done:   state.payloads,
672                                         }
673                                 }
674                         }
675                 }()
676         }
677
678         // Remaining handshake payload sending
679         if len(infosPayloads) > 1 {
680                 state.wg.Add(1)
681                 go func() {
682                         for _, payload := range infosPayloads[1:] {
683                                 state.Ctx.LogD(
684                                         "sp-queue-remaining",
685                                         append(les, LE{"Size", int64(len(payload))}),
686                                         func(les LEs) string {
687                                                 return fmt.Sprintf(
688                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
689                                                         state.Node.Name, NicenessFmt(state.Nice),
690                                                         humanize.IBytes(uint64(len(payload))),
691                                                 )
692                                         },
693                                 )
694                                 state.payloads <- payload
695                         }
696                         state.wg.Done()
697                 }()
698         }
699
700         // Processing of first payload and queueing its responses
701         logMsg := func(les LEs) string {
702                 return fmt.Sprintf(
703                         "SP with %s (nice %s): processing first payload (%s)",
704                         state.Node.Name, NicenessFmt(state.Nice),
705                         humanize.IBytes(uint64(len(payload))),
706                 )
707         }
708         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
709         replies, err := state.ProcessSP(payload)
710         if err != nil {
711                 state.Ctx.LogE("sp-process", les, err, logMsg)
712                 return err
713         }
714         state.wg.Add(1)
715         go func() {
716                 for _, reply := range replies {
717                         state.Ctx.LogD(
718                                 "sp-queue-reply",
719                                 append(les, LE{"Size", int64(len(reply))}),
720                                 func(les LEs) string {
721                                         return fmt.Sprintf(
722                                                 "SP with %s (nice %s): queuing reply (%s)",
723                                                 state.Node.Name, NicenessFmt(state.Nice),
724                                                 humanize.IBytes(uint64(len(payload))),
725                                         )
726                                 },
727                         )
728                         state.payloads <- reply
729                 }
730                 state.wg.Done()
731         }()
732
733         // Periodic jobs
734         state.wg.Add(1)
735         go func() {
736                 deadlineTicker := time.NewTicker(time.Second)
737                 pingTicker := time.NewTicker(PingTimeout)
738                 for {
739                         select {
740                         case <-state.isDead:
741                                 state.wg.Done()
742                                 deadlineTicker.Stop()
743                                 pingTicker.Stop()
744                                 return
745                         case now := <-deadlineTicker.C:
746                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
747                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
748                                         goto Deadlined
749                                 }
750                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
751                                         goto Deadlined
752                                 }
753                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
754                                         goto Deadlined
755                                 }
756                                 break
757                         Deadlined:
758                                 state.SetDead()
759                                 conn.Close()
760                         case now := <-pingTicker.C:
761                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
762                                         state.wg.Add(1)
763                                         go func() {
764                                                 state.pings <- struct{}{}
765                                                 state.wg.Done()
766                                         }()
767                                 }
768                         }
769                 }
770         }()
771
772         // Spool checker and INFOs sender of appearing files
773         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
774                 state.wg.Add(1)
775                 go func() {
776                         dw, err := state.Ctx.NewDirWatcher(
777                                 filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)),
778                                 time.Second,
779                         )
780                         if err != nil {
781                                 state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg)
782                                 log.Fatalln(err)
783                         }
784                         for {
785                                 select {
786                                 case <-state.isDead:
787                                         dw.Close()
788                                         state.wg.Done()
789                                         return
790                                 case <-dw.C:
791                                         for _, payload := range state.Ctx.infosOur(
792                                                 state.Node.Id,
793                                                 state.Nice,
794                                                 &state.infosOurSeen,
795                                         ) {
796                                                 state.Ctx.LogD(
797                                                         "sp-queue-info",
798                                                         append(les, LE{"Size", int64(len(payload))}),
799                                                         func(les LEs) string {
800                                                                 return fmt.Sprintf(
801                                                                         "SP with %s (nice %s): queuing new info (%s)",
802                                                                         state.Node.Name, NicenessFmt(state.Nice),
803                                                                         humanize.IBytes(uint64(len(payload))),
804                                                                 )
805                                                         },
806                                                 )
807                                                 state.payloads <- payload
808                                         }
809                                 }
810                         }
811                 }()
812         }
813
814         // Sender
815         state.wg.Add(1)
816         go func() {
817                 defer conn.Close()
818                 defer state.SetDead()
819                 defer state.wg.Done()
820                 buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
821                 for {
822                         if state.NotAlive() {
823                                 return
824                         }
825                         var payload []byte
826                         var ping bool
827                         select {
828                         case <-state.pings:
829                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
830                                         return fmt.Sprintf(
831                                                 "SP with %s (nice %s): got ping",
832                                                 state.Node.Name, NicenessFmt(state.Nice),
833                                         )
834                                 })
835                                 payload = SPPingMarshalized
836                                 ping = true
837                         case payload = <-state.payloads:
838                                 state.Ctx.LogD(
839                                         "sp-got-payload",
840                                         append(les, LE{"Size", int64(len(payload))}),
841                                         func(les LEs) string {
842                                                 return fmt.Sprintf(
843                                                         "SP with %s (nice %s): got payload (%s)",
844                                                         state.Node.Name, NicenessFmt(state.Nice),
845                                                         humanize.IBytes(uint64(len(payload))),
846                                                 )
847                                         },
848                                 )
849                         default:
850                                 state.RLock()
851                                 if len(state.queueTheir) == 0 {
852                                         state.RUnlock()
853                                         time.Sleep(100 * time.Millisecond)
854                                         continue
855                                 }
856                                 freq := state.queueTheir[0].freq
857                                 state.RUnlock()
858                                 if state.txRate > 0 {
859                                         time.Sleep(time.Second / time.Duration(state.txRate))
860                                 }
861                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
862                                 lesp := append(
863                                         les,
864                                         LE{"XX", string(TTx)},
865                                         LE{"Pkt", pktName},
866                                         LE{"Size", int64(freq.Offset)},
867                                 )
868                                 logMsg := func(les LEs) string {
869                                         return fmt.Sprintf(
870                                                 "SP with %s (nice %s): tx/%s (%s)",
871                                                 state.Node.Name, NicenessFmt(state.Nice),
872                                                 pktName,
873                                                 humanize.IBytes(freq.Offset),
874                                         )
875                                 }
876                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
877                                         return logMsg(les) + ": queueing"
878                                 })
879                                 pth := filepath.Join(
880                                         state.Ctx.Spool,
881                                         state.Node.Id.String(),
882                                         string(TTx),
883                                         Base32Codec.EncodeToString(freq.Hash[:]),
884                                 )
885                                 state.fdsLock.RLock()
886                                 fdAndFullSize, exists := state.fds[pth]
887                                 state.fdsLock.RUnlock()
888                                 if !exists {
889                                         state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
890                                                 return logMsg(les) + ": opening"
891                                         })
892                                         fd, err := os.Open(pth)
893                                         if err != nil {
894                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
895                                                         return logMsg(les) + ": opening"
896                                                 })
897                                                 return
898                                         }
899                                         fi, err := fd.Stat()
900                                         if err != nil {
901                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
902                                                         return logMsg(les) + ": stating"
903                                                 })
904                                                 return
905                                         }
906                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
907                                         state.fdsLock.Lock()
908                                         state.fds[pth] = fdAndFullSize
909                                         state.fdsLock.Unlock()
910                                 }
911                                 fd := fdAndFullSize.fd
912                                 fullSize := fdAndFullSize.fullSize
913                                 lesp = append(lesp, LE{"FullSize", fullSize})
914                                 var bufRead []byte
915                                 if freq.Offset < uint64(fullSize) {
916                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
917                                                 return logMsg(les) + ": seeking"
918                                         })
919                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
920                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
921                                                         return logMsg(les) + ": seeking"
922                                                 })
923                                                 return
924                                         }
925                                         n, err := fd.Read(buf)
926                                         if err != nil {
927                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
928                                                         return logMsg(les) + ": reading"
929                                                 })
930                                                 return
931                                         }
932                                         bufRead = buf[:n]
933                                         lesp = append(
934                                                 les,
935                                                 LE{"XX", string(TTx)},
936                                                 LE{"Pkt", pktName},
937                                                 LE{"Size", int64(n)},
938                                                 LE{"FullSize", fullSize},
939                                         )
940                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
941                                                 return fmt.Sprintf(
942                                                         "%s: read %s",
943                                                         logMsg(les), humanize.IBytes(uint64(n)),
944                                                 )
945                                         })
946                                 } else {
947                                         state.closeFd(pth)
948                                 }
949                                 payload = MarshalSP(SPTypeFile, SPFile{
950                                         Hash:    freq.Hash,
951                                         Offset:  freq.Offset,
952                                         Payload: bufRead,
953                                 })
954                                 ourSize := freq.Offset + uint64(len(bufRead))
955                                 lesp = append(
956                                         les,
957                                         LE{"XX", string(TTx)},
958                                         LE{"Pkt", pktName},
959                                         LE{"Size", int64(ourSize)},
960                                         LE{"FullSize", fullSize},
961                                 )
962                                 if state.Ctx.ShowPrgrs {
963                                         state.progressBars[pktName] = struct{}{}
964                                         Progress("Tx", lesp)
965                                 }
966                                 if ourSize == uint64(fullSize) {
967                                         state.closeFd(pth)
968                                         state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
969                                                 return logMsg(les) + ": finished"
970                                         })
971                                         if state.Ctx.ShowPrgrs {
972                                                 delete(state.progressBars, pktName)
973                                         }
974                                 }
975                                 state.Lock()
976                                 for i, q := range state.queueTheir {
977                                         if *q.freq.Hash != *freq.Hash {
978                                                 continue
979                                         }
980                                         if ourSize == uint64(fullSize) {
981                                                 state.queueTheir = append(
982                                                         state.queueTheir[:i],
983                                                         state.queueTheir[i+1:]...,
984                                                 )
985                                         } else {
986                                                 q.freq.Offset = ourSize
987                                         }
988                                         break
989                                 }
990                                 state.Unlock()
991                         }
992                         logMsg := func(les LEs) string {
993                                 return fmt.Sprintf(
994                                         "SP with %s (nice %s): sending %s",
995                                         state.Node.Name, NicenessFmt(state.Nice),
996                                         humanize.IBytes(uint64(len(payload))),
997                                 )
998                         }
999                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1000                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
1001                         ct, err := state.csOur.Encrypt(nil, nil, payload)
1002                         if err != nil {
1003                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
1004                                 return
1005                         }
1006                         if err := state.WriteSP(conn, ct, ping); err != nil {
1007                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1008                                 return
1009                         }
1010                 }
1011         }()
1012
1013         // Receiver
1014         state.wg.Add(1)
1015         go func() {
1016                 for {
1017                         if state.NotAlive() {
1018                                 break
1019                         }
1020                         logMsg := func(les LEs) string {
1021                                 return fmt.Sprintf(
1022                                         "SP with %s (nice %s): waiting for payload",
1023                                         state.Node.Name, NicenessFmt(state.Nice),
1024                                 )
1025                         }
1026                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1027                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
1028                         payload, err := state.ReadSP(conn)
1029                         if err != nil {
1030                                 if err == io.EOF {
1031                                         break
1032                                 }
1033                                 unmarshalErr := err.(*xdr.UnmarshalError)
1034                                 if os.IsTimeout(unmarshalErr.Err) {
1035                                         continue
1036                                 }
1037                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1038                                         break
1039                                 }
1040                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1041                                 break
1042                         }
1043                         logMsg = func(les LEs) string {
1044                                 return fmt.Sprintf(
1045                                         "SP with %s (nice %s): payload (%s)",
1046                                         state.Node.Name, NicenessFmt(state.Nice),
1047                                         humanize.IBytes(uint64(len(payload))),
1048                                 )
1049                         }
1050                         state.Ctx.LogD(
1051                                 "sp-recv-got",
1052                                 append(les, LE{"Size", int64(len(payload))}),
1053                                 func(les LEs) string { return logMsg(les) + ": got" },
1054                         )
1055                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1056                         if err != nil {
1057                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1058                                         return logMsg(les) + ": got"
1059                                 })
1060                                 break
1061                         }
1062                         state.Ctx.LogD(
1063                                 "sp-recv-process",
1064                                 append(les, LE{"Size", int64(len(payload))}),
1065                                 func(les LEs) string {
1066                                         return logMsg(les) + ": processing"
1067                                 },
1068                         )
1069                         replies, err := state.ProcessSP(payload)
1070                         if err != nil {
1071                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1072                                         return logMsg(les) + ": processing"
1073                                 })
1074                                 break
1075                         }
1076                         state.wg.Add(1)
1077                         go func() {
1078                                 for _, reply := range replies {
1079                                         state.Ctx.LogD(
1080                                                 "sp-recv-reply",
1081                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1082                                                 func(les LEs) string {
1083                                                         return fmt.Sprintf(
1084                                                                 "SP with %s (nice %s): queuing reply (%s)",
1085                                                                 state.Node.Name, NicenessFmt(state.Nice),
1086                                                                 humanize.IBytes(uint64(len(reply))),
1087                                                         )
1088                                                 },
1089                                         )
1090                                         state.payloads <- reply
1091                                 }
1092                                 state.wg.Done()
1093                         }()
1094                         if state.rxRate > 0 {
1095                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1096                         }
1097                 }
1098                 state.SetDead()
1099                 state.wg.Done()
1100                 state.SetDead()
1101                 conn.Close()
1102         }()
1103
1104         return nil
1105 }
1106
1107 func (state *SPState) Wait() {
1108         state.wg.Wait()
1109         close(state.payloads)
1110         close(state.pings)
1111         state.Duration = time.Now().Sub(state.started)
1112         state.dirUnlock()
1113         state.RxSpeed = state.RxBytes
1114         state.TxSpeed = state.TxBytes
1115         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1116         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1117         if rxDuration > 0 {
1118                 state.RxSpeed = state.RxBytes / rxDuration
1119         }
1120         if txDuration > 0 {
1121                 state.TxSpeed = state.TxBytes / txDuration
1122         }
1123         for _, s := range state.fds {
1124                 s.fd.Close()
1125         }
1126         for pktName := range state.progressBars {
1127                 ProgressKill(pktName)
1128         }
1129 }
1130
1131 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1132         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1133         r := bytes.NewReader(payload)
1134         var err error
1135         var replies [][]byte
1136         var infosGot bool
1137         for r.Len() > 0 {
1138                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1139                         return fmt.Sprintf(
1140                                 "SP with %s (nice %s): unmarshaling header",
1141                                 state.Node.Name, NicenessFmt(state.Nice),
1142                         )
1143                 })
1144                 var head SPHead
1145                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1146                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1147                                 return fmt.Sprintf(
1148                                         "SP with %s (nice %s): unmarshaling header",
1149                                         state.Node.Name, NicenessFmt(state.Nice),
1150                                 )
1151                         })
1152                         return nil, err
1153                 }
1154                 if head.Type != SPTypePing {
1155                         state.RxLastNonPing = state.RxLastSeen
1156                 }
1157                 switch head.Type {
1158                 case SPTypeHalt:
1159                         state.Ctx.LogD(
1160                                 "sp-process-halt",
1161                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1162                                         return fmt.Sprintf(
1163                                                 "SP with %s (nice %s): got HALT",
1164                                                 state.Node.Name, NicenessFmt(state.Nice),
1165                                         )
1166                                 },
1167                         )
1168                         state.Lock()
1169                         state.queueTheir = nil
1170                         state.Unlock()
1171
1172                 case SPTypePing:
1173                         state.Ctx.LogD(
1174                                 "sp-process-ping",
1175                                 append(les, LE{"Type", "ping"}),
1176                                 func(les LEs) string {
1177                                         return fmt.Sprintf(
1178                                                 "SP with %s (nice %s): got PING",
1179                                                 state.Node.Name, NicenessFmt(state.Nice),
1180                                         )
1181                                 },
1182                         )
1183
1184                 case SPTypeInfo:
1185                         infosGot = true
1186                         lesp := append(les, LE{"Type", "info"})
1187                         state.Ctx.LogD(
1188                                 "sp-process-info-unmarshal", lesp,
1189                                 func(les LEs) string {
1190                                         return fmt.Sprintf(
1191                                                 "SP with %s (nice %s): unmarshaling INFO",
1192                                                 state.Node.Name, NicenessFmt(state.Nice),
1193                                         )
1194                                 },
1195                         )
1196                         var info SPInfo
1197                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1198                                 state.Ctx.LogE(
1199                                         "sp-process-info-unmarshal", lesp, err,
1200                                         func(les LEs) string {
1201                                                 return fmt.Sprintf(
1202                                                         "SP with %s (nice %s): unmarshaling INFO",
1203                                                         state.Node.Name, NicenessFmt(state.Nice),
1204                                                 )
1205                                         },
1206                                 )
1207                                 return nil, err
1208                         }
1209                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1210                         lesp = append(
1211                                 lesp,
1212                                 LE{"Pkt", pktName},
1213                                 LE{"Size", int64(info.Size)},
1214                                 LE{"PktNice", int(info.Nice)},
1215                         )
1216                         logMsg := func(les LEs) string {
1217                                 return fmt.Sprintf(
1218                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1219                                         state.Node.Name, NicenessFmt(state.Nice),
1220                                         pktName,
1221                                         humanize.IBytes(info.Size),
1222                                         NicenessFmt(info.Nice),
1223                                 )
1224                         }
1225                         if !state.listOnly && info.Nice > state.Nice {
1226                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1227                                         return logMsg(les) + ": too nice"
1228                                 })
1229                                 continue
1230                         }
1231                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1232                                 return logMsg(les) + ": received"
1233                         })
1234                         if !state.listOnly && state.xxOnly == TTx {
1235                                 continue
1236                         }
1237                         state.Lock()
1238                         state.infosTheir[*info.Hash] = &info
1239                         state.Unlock()
1240                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1241                                 return logMsg(les) + ": stating part"
1242                         })
1243                         pktPath := filepath.Join(
1244                                 state.Ctx.Spool,
1245                                 state.Node.Id.String(),
1246                                 string(TRx),
1247                                 Base32Codec.EncodeToString(info.Hash[:]),
1248                         )
1249                         logMsg = func(les LEs) string {
1250                                 return fmt.Sprintf(
1251                                         "Packet %s (%s) (nice %s)",
1252                                         pktName,
1253                                         humanize.IBytes(info.Size),
1254                                         NicenessFmt(info.Nice),
1255                                 )
1256                         }
1257                         if _, err = os.Stat(pktPath); err == nil {
1258                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1259                                         return logMsg(les) + ": already done"
1260                                 })
1261                                 if !state.listOnly {
1262                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1263                                 }
1264                                 continue
1265                         }
1266                         if _, err = os.Stat(filepath.Join(
1267                                 state.Ctx.Spool, state.Node.Id.String(), string(TRx),
1268                                 SeenDir, Base32Codec.EncodeToString(info.Hash[:]),
1269                         )); err == nil {
1270                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1271                                         return logMsg(les) + ": already seen"
1272                                 })
1273                                 if !state.listOnly {
1274                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1275                                 }
1276                                 continue
1277                         }
1278                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1279                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1280                                         return logMsg(les) + ": still not checksummed"
1281                                 })
1282                                 continue
1283                         }
1284                         fi, err := os.Stat(pktPath + PartSuffix)
1285                         var offset int64
1286                         if err == nil {
1287                                 offset = fi.Size()
1288                         }
1289                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1290                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1291                                         return logMsg(les) + ": not enough space"
1292                                 })
1293                                 continue
1294                         }
1295                         state.Ctx.LogI(
1296                                 "sp-info",
1297                                 append(lesp, LE{"Offset", offset}),
1298                                 func(les LEs) string {
1299                                         return fmt.Sprintf(
1300                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1301                                         )
1302                                 },
1303                         )
1304                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1305                                 replies = append(replies, MarshalSP(
1306                                         SPTypeFreq,
1307                                         SPFreq{info.Hash, uint64(offset)},
1308                                 ))
1309                         }
1310
1311                 case SPTypeFile:
1312                         lesp := append(les, LE{"Type", "file"})
1313                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1314                                 return fmt.Sprintf(
1315                                         "SP with %s (nice %s): unmarshaling FILE",
1316                                         state.Node.Name, NicenessFmt(state.Nice),
1317                                 )
1318                         })
1319                         var file SPFile
1320                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1321                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1322                                         return fmt.Sprintf(
1323                                                 "SP with %s (nice %s): unmarshaling FILE",
1324                                                 state.Node.Name, NicenessFmt(state.Nice),
1325                                         )
1326                                 })
1327                                 return nil, err
1328                         }
1329                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1330                         lesp = append(
1331                                 lesp,
1332                                 LE{"XX", string(TRx)},
1333                                 LE{"Pkt", pktName},
1334                                 LE{"Size", int64(len(file.Payload))},
1335                         )
1336                         logMsg := func(les LEs) string {
1337                                 return fmt.Sprintf(
1338                                         "Got packet %s (%s)",
1339                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1340                                 )
1341                         }
1342                         fullsize := int64(0)
1343                         state.RLock()
1344                         infoTheir := state.infosTheir[*file.Hash]
1345                         state.RUnlock()
1346                         if infoTheir == nil {
1347                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1348                                         return logMsg(les) + ": unknown file"
1349                                 })
1350                                 continue
1351                         }
1352                         fullsize = int64(infoTheir.Size)
1353                         lesp = append(lesp, LE{"FullSize", fullsize})
1354                         dirToSync := filepath.Join(
1355                                 state.Ctx.Spool,
1356                                 state.Node.Id.String(),
1357                                 string(TRx),
1358                         )
1359                         filePath := filepath.Join(dirToSync, pktName)
1360                         filePathPart := filePath + PartSuffix
1361                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1362                                 return logMsg(les) + ": opening part"
1363                         })
1364                         state.fdsLock.RLock()
1365                         fdAndFullSize, exists := state.fds[filePathPart]
1366                         state.fdsLock.RUnlock()
1367                         hasherAndOffset := state.fileHashers[filePath]
1368                         var fd *os.File
1369                         if exists {
1370                                 fd = fdAndFullSize.fd
1371                         } else {
1372                                 fd, err = os.OpenFile(
1373                                         filePathPart,
1374                                         os.O_RDWR|os.O_CREATE,
1375                                         os.FileMode(0666),
1376                                 )
1377                                 if err != nil {
1378                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1379                                                 return logMsg(les) + ": opening part"
1380                                         })
1381                                         return nil, err
1382                                 }
1383                                 state.fdsLock.Lock()
1384                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1385                                 state.fdsLock.Unlock()
1386                                 if !state.NoCK {
1387                                         hasherAndOffset = &MTHAndOffset{
1388                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1389                                                 offset: file.Offset,
1390                                         }
1391                                         state.fileHashers[filePath] = hasherAndOffset
1392                                 }
1393                         }
1394                         state.Ctx.LogD(
1395                                 "sp-file-seek",
1396                                 append(lesp, LE{"Offset", file.Offset}),
1397                                 func(les LEs) string {
1398                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1399                                 })
1400                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1401                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1402                                         return logMsg(les) + ": seeking"
1403                                 })
1404                                 state.closeFd(filePathPart)
1405                                 return nil, err
1406                         }
1407                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1408                                 return logMsg(les) + ": writing"
1409                         })
1410                         if _, err = fd.Write(file.Payload); err != nil {
1411                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1412                                         return logMsg(les) + ": writing"
1413                                 })
1414                                 state.closeFd(filePathPart)
1415                                 return nil, err
1416                         }
1417                         if hasherAndOffset != nil {
1418                                 if hasherAndOffset.offset == file.Offset {
1419                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1420                                                 panic(err)
1421                                         }
1422                                         hasherAndOffset.offset += uint64(len(file.Payload))
1423                                 } else {
1424                                         state.Ctx.LogE(
1425                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1426                                                 func(les LEs) string {
1427                                                         return logMsg(les) + ": deleting hasher"
1428                                                 },
1429                                         )
1430                                         delete(state.fileHashers, filePath)
1431                                         hasherAndOffset = nil
1432                                 }
1433                         }
1434                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1435                         lesp[len(lesp)-2].V = ourSize
1436                         if state.Ctx.ShowPrgrs {
1437                                 state.progressBars[pktName] = struct{}{}
1438                                 Progress("Rx", lesp)
1439                         }
1440                         if fullsize != ourSize {
1441                                 continue
1442                         }
1443                         if state.Ctx.ShowPrgrs {
1444                                 delete(state.progressBars, pktName)
1445                         }
1446                         logMsg = func(les LEs) string {
1447                                 return fmt.Sprintf(
1448                                         "Got packet %s %d%% (%s / %s)",
1449                                         pktName, 100*ourSize/fullsize,
1450                                         humanize.IBytes(uint64(ourSize)),
1451                                         humanize.IBytes(uint64(fullsize)),
1452                                 )
1453                         }
1454                         err = fd.Sync()
1455                         if err != nil {
1456                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1457                                         return logMsg(les) + ": syncing"
1458                                 })
1459                                 state.closeFd(filePathPart)
1460                                 continue
1461                         }
1462                         if hasherAndOffset != nil {
1463                                 delete(state.fileHashers, filePath)
1464                                 if hasherAndOffset.mth.PreaddSize() == 0 {
1465                                         if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
1466                                                 state.Ctx.LogE(
1467                                                         "sp-file-bad-checksum", lesp,
1468                                                         errors.New("checksum mismatch"),
1469                                                         logMsg,
1470                                                 )
1471                                                 state.closeFd(filePathPart)
1472                                                 continue
1473                                         }
1474                                         if err = os.Rename(filePathPart, filePath); err != nil {
1475                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1476                                                         return logMsg(les) + ": renaming"
1477                                                 })
1478                                                 state.closeFd(filePathPart)
1479                                                 continue
1480                                         }
1481                                         if err = DirSync(dirToSync); err != nil {
1482                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1483                                                         return logMsg(les) + ": dirsyncing"
1484                                                 })
1485                                                 state.closeFd(filePathPart)
1486                                                 continue
1487                                         }
1488                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1489                                                 return logMsg(les) + ": done"
1490                                         })
1491                                         state.wg.Add(1)
1492                                         go func() {
1493                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1494                                                 state.wg.Done()
1495                                         }()
1496                                         state.Lock()
1497                                         delete(state.infosTheir, *file.Hash)
1498                                         state.Unlock()
1499                                         if !state.Ctx.HdrUsage {
1500                                                 continue
1501                                         }
1502                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1503                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1504                                                         return logMsg(les) + ": seeking"
1505                                                 })
1506                                                 state.closeFd(filePathPart)
1507                                                 continue
1508                                         }
1509                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1510                                         state.closeFd(filePathPart)
1511                                         if err != nil {
1512                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1513                                                         return logMsg(les) + ": HdrReading"
1514                                                 })
1515                                                 continue
1516                                         }
1517                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1518                                         continue
1519                                 }
1520                         }
1521                         state.closeFd(filePathPart)
1522                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1523                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1524                                         return logMsg(les) + ": renaming"
1525                                 })
1526                                 continue
1527                         }
1528                         if err = DirSync(dirToSync); err != nil {
1529                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1530                                         return logMsg(les) + ": dirsyncing"
1531                                 })
1532                                 continue
1533                         }
1534                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1535                                 return logMsg(les) + ": downloaded"
1536                         })
1537                         state.Lock()
1538                         delete(state.infosTheir, *file.Hash)
1539                         state.Unlock()
1540                         go func() {
1541                                 t := SPCheckerTask{
1542                                         nodeId: state.Node.Id,
1543                                         hsh:    file.Hash,
1544                                         done:   state.payloads,
1545                                 }
1546                                 if hasherAndOffset != nil {
1547                                         t.mth = hasherAndOffset.mth
1548                                 }
1549                                 spCheckerTasks <- t
1550                         }()
1551
1552                 case SPTypeDone:
1553                         lesp := append(les, LE{"Type", "done"})
1554                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1555                                 return fmt.Sprintf(
1556                                         "SP with %s (nice %s): unmarshaling DONE",
1557                                         state.Node.Name, NicenessFmt(state.Nice),
1558                                 )
1559                         })
1560                         var done SPDone
1561                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1562                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1563                                         return fmt.Sprintf(
1564                                                 "SP with %s (nice %s): unmarshaling DONE",
1565                                                 state.Node.Name, NicenessFmt(state.Nice),
1566                                         )
1567                                 })
1568                                 return nil, err
1569                         }
1570                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1571                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1572                         logMsg := func(les LEs) string {
1573                                 return fmt.Sprintf(
1574                                         "SP with %s (nice %s): DONE: removing %s",
1575                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1576                                 )
1577                         }
1578                         state.Ctx.LogD("sp-done", lesp, logMsg)
1579                         pth := filepath.Join(
1580                                 state.Ctx.Spool,
1581                                 state.Node.Id.String(),
1582                                 string(TTx),
1583                                 pktName,
1584                         )
1585                         if err = os.Remove(pth); err == nil {
1586                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1587                                         return fmt.Sprintf("Packet %s is sent", pktName)
1588                                 })
1589                                 if state.Ctx.HdrUsage {
1590                                         os.Remove(JobPath2Hdr(pth))
1591                                 }
1592                         } else {
1593                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1594                         }
1595
1596                 case SPTypeFreq:
1597                         lesp := append(les, LE{"Type", "freq"})
1598                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1599                                 return fmt.Sprintf(
1600                                         "SP with %s (nice %s): unmarshaling FREQ",
1601                                         state.Node.Name, NicenessFmt(state.Nice),
1602                                 )
1603                         })
1604                         var freq SPFreq
1605                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1606                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1607                                         return fmt.Sprintf(
1608                                                 "SP with %s (nice %s): unmarshaling FREQ",
1609                                                 state.Node.Name, NicenessFmt(state.Nice),
1610                                         )
1611                                 })
1612                                 return nil, err
1613                         }
1614                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1615                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1616                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1617                                 return fmt.Sprintf(
1618                                         "SP with %s (nice %s): FREQ %s: queuing",
1619                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1620                                 )
1621                         })
1622                         nice, exists := state.infosOurSeen[*freq.Hash]
1623                         if exists {
1624                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1625                                         state.Lock()
1626                                         insertIdx := 0
1627                                         var freqWithNice *FreqWithNice
1628                                         for insertIdx, freqWithNice = range state.queueTheir {
1629                                                 if freqWithNice.nice > nice {
1630                                                         break
1631                                                 }
1632                                         }
1633                                         state.queueTheir = append(state.queueTheir, nil)
1634                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1635                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1636                                         state.Unlock()
1637                                 } else {
1638                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1639                                                 return fmt.Sprintf(
1640                                                         "SP with %s (nice %s): FREQ %s: skipping",
1641                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1642                                                 )
1643                                         })
1644                                 }
1645                         } else {
1646                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1647                                         return fmt.Sprintf(
1648                                                 "SP with %s (nice %s): FREQ %s: unknown",
1649                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1650                                         )
1651                                 })
1652                         }
1653
1654                 default:
1655                         state.Ctx.LogE(
1656                                 "sp-process-type-unknown",
1657                                 append(les, LE{"Type", head.Type}),
1658                                 errors.New("unknown type"),
1659                                 func(les LEs) string {
1660                                         return fmt.Sprintf(
1661                                                 "SP with %s (nice %s): %d",
1662                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1663                                         )
1664                                 },
1665                         )
1666                         return nil, BadPktType
1667                 }
1668         }
1669
1670         if infosGot {
1671                 var pkts int
1672                 var size uint64
1673                 state.RLock()
1674                 for _, info := range state.infosTheir {
1675                         pkts++
1676                         size += info.Size
1677                 }
1678                 state.RUnlock()
1679                 state.Ctx.LogI("sp-infos-rx", LEs{
1680                         {"XX", string(TRx)},
1681                         {"Node", state.Node.Id},
1682                         {"Pkts", pkts},
1683                         {"Size", int64(size)},
1684                 }, func(les LEs) string {
1685                         return fmt.Sprintf(
1686                                 "%s has got for us: %d packets, %s",
1687                                 state.Node.Name, pkts, humanize.IBytes(size),
1688                         )
1689                 })
1690         }
1691         return payloadsSplit(replies), nil
1692 }
1693
1694 func SPChecker(ctx *Ctx) {
1695         for t := range spCheckerTasks {
1696                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1697                 les := LEs{
1698                         {"XX", string(TRx)},
1699                         {"Node", t.nodeId},
1700                         {"Pkt", pktName},
1701                 }
1702                 SPCheckerWg.Add(1)
1703                 ctx.LogD("sp-checker", les, func(les LEs) string {
1704                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1705                 })
1706                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1707                 les = append(les, LE{"Size", size})
1708                 if err != nil {
1709                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1710                                 return fmt.Sprintf(
1711                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1712                                         humanize.IBytes(uint64(size)),
1713                                 )
1714                         })
1715                         SPCheckerWg.Done()
1716                         continue
1717                 }
1718                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1719                         return fmt.Sprintf(
1720                                 "Packet %s is retreived (%s)",
1721                                 pktName, humanize.IBytes(uint64(size)),
1722                         )
1723                 })
1724                 SPCheckerWg.Done()
1725                 go func(t SPCheckerTask) {
1726                         defer func() { recover() }()
1727                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1728                 }(t)
1729         }
1730 }