Cypherpunks.ru repositories - nncp.git/blob - src/sp.go
fsnotify usage
[nncp.git] / src / sp.go
1 /*
2 NNCP -- Node to Node copy, utilities for store-and-forward data exchange
3 Copyright (C) 2016-2021 Sergey Matveev <stargrave@stargrave.org>
4
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, version 3 of the License.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 package nncp
19
20 import (
21         "bytes"
22         "crypto/subtle"
23         "errors"
24         "fmt"
25         "io"
26         "log"
27         "os"
28         "path/filepath"
29         "sort"
30         "sync"
31         "time"
32
33         xdr "github.com/davecgh/go-xdr/xdr2"
34         "github.com/dustin/go-humanize"
35         "github.com/flynn/noise"
36 )
37
const (
	// MaxSPSize is the size limit payloadsSplit applies when packing
	// payloads into one outbound message; the first handshake payload is
	// padded up towards this size as well.
	MaxSPSize      = 1<<16 - 256
	// PartSuffix is not referenced in this part of the file.
	// NOTE(review): presumably the filename suffix of partially received
	// packets -- confirm against its users.
	PartSuffix     = ".part"
	// SPHeadOverhead is the byte length allotted to a marshalled SPHead
	// (see SPHaltMarshalized and SPPingMarshalized).
	SPHeadOverhead = 4
)
43
// MTHAndOffset pairs an MTH value with a byte offset. SPState.fileHashers
// keeps one per string key.
// NOTE(review): offset is presumably how far the file was already hashed;
// its users are outside this part of the file -- confirm.
type MTHAndOffset struct {
	mth    MTH
	offset uint64
}
48
// SPCheckerTask is the unit of work sent over spCheckerTasks.
// StartWorkers fills nodeId, hsh (the job's hash) and done (the session's
// payloads channel); mth is left unset there.
type SPCheckerTask struct {
	nodeId *NodeId
	hsh    *[MTHSize]byte
	mth    MTH
	done   chan []byte
}
55
var (
	// Marshalled sizes of sample SPInfo, SPFreq and SPFile values (zero
	// hash, and no payload for SPFile); measured once in init.
	SPInfoOverhead    int
	SPFreqOverhead    int
	SPFileOverhead    int
	// Pre-marshalled SPHead packets of the Halt and Ping types, built in
	// init and SPHeadOverhead bytes long.
	SPHaltMarshalized []byte
	SPPingMarshalized []byte

	// NoiseCipherSuite is the Noise cipher suite used by the SP handshake
	// (DH25519, ChaChaPoly, BLAKE2b).
	NoiseCipherSuite noise.CipherSuite = noise.NewCipherSuite(
		noise.DH25519,
		noise.CipherChaChaPoly,
		noise.HashBLAKE2b,
	)

	// DefaultDeadline is the read/write deadline set on the connection for
	// each of the initial handshake messages.
	DefaultDeadline = 10 * time.Second
	// PingTimeout is the period of the ping ticker; a session is considered
	// dead after twice that long without receiving anything.
	PingTimeout     = time.Minute

	// spCheckerTasks carries the tasks queued by StartWorkers; created in init.
	spCheckerTasks chan SPCheckerTask
	// SPCheckerWg is not used in this part of the file.
	SPCheckerWg    sync.WaitGroup
	// spCheckerOnce guards the one-time start of the SPChecker goroutine.
	spCheckerOnce  sync.Once
)
76
// FdAndFullSize pairs an open file with a size. SPState.fds keeps one per
// path, and closeFd closes the fd when the entry is dropped.
// NOTE(review): fullSize is presumably the expected final size of the file
// -- confirm against the code that fills it.
type FdAndFullSize struct {
	fd       *os.File
	fullSize int64
}
81
// SPType is the tag carried in SPHead.Type that tells SP packets apart.
type SPType uint8

// Known packet types. The numeric values (0 through 5, in declaration
// order) are what gets marshalled, so the order must not be changed.
const (
	SPTypeInfo SPType = iota
	SPTypeFreq
	SPTypeFile
	SPTypeDone
	SPTypeHalt
	SPTypePing
)
92
// SPHead is the header that MarshalSP writes in front of every packet body;
// it carries only the packet type.
type SPHead struct {
	Type SPType
}
96
// SPInfo announces a packet: its niceness, size and hash. infosOur builds
// one for every outbound job that has not been announced yet.
type SPInfo struct {
	Nice uint8
	Size uint64
	Hash *[MTHSize]byte
}
102
// SPFreq names a packet by hash together with an offset.
// NOTE(review): presumably a request to send that packet starting at
// Offset; the handling code is outside this part of the file -- confirm.
type SPFreq struct {
	Hash   *[MTHSize]byte
	Offset uint64
}
107
// SPFile carries a payload for the packet named by Hash, tagged with an
// offset.
// NOTE(review): presumably Payload is the packet's data at Offset; the
// handling code is outside this part of the file -- confirm.
type SPFile struct {
	Hash    *[MTHSize]byte
	Offset  uint64
	Payload []byte
}
113
// SPDone names a packet by hash.
// NOTE(review): presumably signals that the packet was fully received;
// the handling code is outside this part of the file -- confirm.
type SPDone struct {
	Hash *[MTHSize]byte
}
117
// SPRaw is the outer frame put on the wire by WriteSP and parsed by ReadSP:
// a magic number (checked against MagicNNCPSv1.B on read) followed by an
// opaque payload.
type SPRaw struct {
	Magic   [8]byte
	Payload []byte
}
122
// FreqWithNice pairs an SPFreq with a niceness value; SPState.queueTheir
// is a slice of these.
type FreqWithNice struct {
	freq *SPFreq
	nice uint8
}
127
// ConnDeadlined is the connection abstraction SP sessions run over: a
// read-write-closer whose read and write deadlines can be set separately.
type ConnDeadlined interface {
	io.ReadWriteCloser
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}
133
134 func init() {
135         var buf bytes.Buffer
136         spHead := SPHead{Type: SPTypeHalt}
137         if _, err := xdr.Marshal(&buf, spHead); err != nil {
138                 panic(err)
139         }
140         SPHaltMarshalized = make([]byte, SPHeadOverhead)
141         copy(SPHaltMarshalized, buf.Bytes())
142         buf.Reset()
143
144         spHead = SPHead{Type: SPTypePing}
145         if _, err := xdr.Marshal(&buf, spHead); err != nil {
146                 panic(err)
147         }
148         SPPingMarshalized = make([]byte, SPHeadOverhead)
149         copy(SPPingMarshalized, buf.Bytes())
150         buf.Reset()
151
152         spInfo := SPInfo{Nice: 123, Size: 123, Hash: new([MTHSize]byte)}
153         if _, err := xdr.Marshal(&buf, spInfo); err != nil {
154                 panic(err)
155         }
156         SPInfoOverhead = buf.Len()
157         buf.Reset()
158
159         spFreq := SPFreq{Hash: new([MTHSize]byte), Offset: 123}
160         if _, err := xdr.Marshal(&buf, spFreq); err != nil {
161                 panic(err)
162         }
163         SPFreqOverhead = buf.Len()
164         buf.Reset()
165
166         spFile := SPFile{Hash: new([MTHSize]byte), Offset: 123}
167         if _, err := xdr.Marshal(&buf, spFile); err != nil {
168                 panic(err)
169         }
170         SPFileOverhead = buf.Len()
171         spCheckerTasks = make(chan SPCheckerTask)
172 }
173
174 func MarshalSP(typ SPType, sp interface{}) []byte {
175         var buf bytes.Buffer
176         if _, err := xdr.Marshal(&buf, SPHead{typ}); err != nil {
177                 panic(err)
178         }
179         if _, err := xdr.Marshal(&buf, sp); err != nil {
180                 panic(err)
181         }
182         return buf.Bytes()
183 }
184
185 func payloadsSplit(payloads [][]byte) [][]byte {
186         var outbounds [][]byte
187         outbound := make([]byte, 0, MaxSPSize)
188         for i, payload := range payloads {
189                 outbound = append(outbound, payload...)
190                 if i+1 < len(payloads) && len(outbound)+len(payloads[i+1]) > MaxSPSize {
191                         outbounds = append(outbounds, outbound)
192                         outbound = make([]byte, 0, MaxSPSize)
193                 }
194         }
195         if len(outbound) > 0 {
196                 outbounds = append(outbounds, outbound)
197         }
198         return outbounds
199 }
200
// SPState holds everything belonging to one SP session with a single
// node: configuration, Noise handshake and cipher states, traffic
// counters, directory locks and the channels its worker goroutines use.
// StartI or StartR initialises it for the initiator or responder role.
type SPState struct {
	Ctx            *Ctx
	Node           *Node // peer; StartR sets it once the peer's static key is recognised
	Nice           uint8 // jobs with a greater niceness are not announced
	NoCK           bool  // when false, StartWorkers queues SPChecker tasks
	onlineDeadline time.Duration
	maxOnlineTime  time.Duration
	hs             *noise.HandshakeState
	csOur          *noise.CipherState
	csTheir        *noise.CipherState
	payloads       chan []byte   // outbound payloads consumed by the sender goroutine
	pings          chan struct{} // ping requests consumed by the sender goroutine
	infosTheir     map[[MTHSize]byte]*SPInfo
	infosOurSeen   map[[MTHSize]byte]uint8 // hashes infosOur already announced, with their niceness
	queueTheir     []*FreqWithNice
	wg             sync.WaitGroup
	RxBytes        int64     // bytes read by ReadSP
	RxLastSeen     time.Time // time of the last successful ReadSP
	RxLastNonPing  time.Time
	TxBytes        int64     // bytes written by WriteSP
	TxLastSeen     time.Time // time of the last successful WriteSP
	TxLastNonPing  time.Time // same, ignoring pings
	started        time.Time
	mustFinishAt   time.Time // started + maxOnlineTime, when the latter is set
	Duration       time.Duration
	RxSpeed        int64
	TxSpeed        int64
	rxLock         *os.File // rx directory lock, nil when not taken
	txLock         *os.File // tx directory lock, nil when not taken
	xxOnly         TRxTx    // restricts the session to one direction when not empty
	rxRate         int
	txRate         int
	isDead         chan struct{} // closed by SetDead
	listOnly       bool          // when set, StartI takes no locks and announces nothing
	onlyPkts       map[[MTHSize]byte]bool
	writeSPBuf     bytes.Buffer // scratch buffer reused by WriteSP
	fds            map[string]FdAndFullSize
	fdsLock        sync.RWMutex // guards fds
	fileHashers    map[string]*MTHAndOffset
	progressBars   map[string]struct{}
	sync.RWMutex
}
243
244 func (state *SPState) SetDead() {
245         state.Lock()
246         defer state.Unlock()
247         select {
248         case <-state.isDead:
249                 // Already closed channel, dead
250                 return
251         default:
252         }
253         close(state.isDead)
254         go func() {
255                 for range state.payloads {
256                 }
257         }()
258         go func() {
259                 for range state.pings {
260                 }
261         }()
262 }
263
264 func (state *SPState) NotAlive() bool {
265         select {
266         case <-state.isDead:
267                 return true
268         default:
269         }
270         return false
271 }
272
273 func (state *SPState) dirUnlock() {
274         state.Ctx.UnlockDir(state.rxLock)
275         state.Ctx.UnlockDir(state.txLock)
276 }
277
278 func (state *SPState) WriteSP(dst io.Writer, payload []byte, ping bool) error {
279         state.writeSPBuf.Reset()
280         n, err := xdr.Marshal(&state.writeSPBuf, SPRaw{
281                 Magic:   MagicNNCPSv1.B,
282                 Payload: payload,
283         })
284         if err != nil {
285                 return err
286         }
287         if n, err = dst.Write(state.writeSPBuf.Bytes()); err == nil {
288                 state.TxLastSeen = time.Now()
289                 state.TxBytes += int64(n)
290                 if !ping {
291                         state.TxLastNonPing = state.TxLastSeen
292                 }
293         }
294         return err
295 }
296
297 func (state *SPState) ReadSP(src io.Reader) ([]byte, error) {
298         var sp SPRaw
299         n, err := xdr.UnmarshalLimited(src, &sp, 1<<17)
300         if err != nil {
301                 ue := err.(*xdr.UnmarshalError)
302                 if ue.Err == io.EOF {
303                         return nil, ue.Err
304                 }
305                 return nil, err
306         }
307         state.RxLastSeen = time.Now()
308         state.RxBytes += int64(n)
309         if sp.Magic != MagicNNCPSv1.B {
310                 return nil, BadMagic
311         }
312         return sp.Payload, nil
313 }
314
315 func (ctx *Ctx) infosOur(nodeId *NodeId, nice uint8, seen *map[[MTHSize]byte]uint8) [][]byte {
316         var infos []*SPInfo
317         var totalSize int64
318         for job := range ctx.Jobs(nodeId, TTx) {
319                 if job.PktEnc.Nice > nice {
320                         continue
321                 }
322                 if _, known := (*seen)[*job.HshValue]; known {
323                         continue
324                 }
325                 totalSize += job.Size
326                 infos = append(infos, &SPInfo{
327                         Nice: job.PktEnc.Nice,
328                         Size: uint64(job.Size),
329                         Hash: job.HshValue,
330                 })
331                 (*seen)[*job.HshValue] = job.PktEnc.Nice
332         }
333         sort.Sort(ByNice(infos))
334         var payloads [][]byte
335         for _, info := range infos {
336                 payloads = append(payloads, MarshalSP(SPTypeInfo, info))
337                 pktName := Base32Codec.EncodeToString(info.Hash[:])
338                 ctx.LogD("sp-info-our", LEs{
339                         {"Node", nodeId},
340                         {"Name", pktName},
341                         {"Size", info.Size},
342                 }, func(les LEs) string {
343                         return fmt.Sprintf(
344                                 "Our info: %s/tx/%s (%s)",
345                                 ctx.NodeName(nodeId),
346                                 pktName,
347                                 humanize.IBytes(info.Size),
348                         )
349                 })
350         }
351         if totalSize > 0 {
352                 ctx.LogI("sp-infos-tx", LEs{
353                         {"XX", string(TTx)},
354                         {"Node", nodeId},
355                         {"Pkts", len(payloads)},
356                         {"Size", totalSize},
357                 }, func(les LEs) string {
358                         return fmt.Sprintf(
359                                 "We have got for %s: %d packets, %s",
360                                 ctx.NodeName(nodeId),
361                                 len(payloads),
362                                 humanize.IBytes(uint64(totalSize)),
363                         )
364                 })
365         }
366         return payloadsSplit(payloads)
367 }
368
369 func (state *SPState) StartI(conn ConnDeadlined) error {
370         nodeId := state.Node.Id
371         err := state.Ctx.ensureRxDir(nodeId)
372         if err != nil {
373                 return err
374         }
375         var rxLock *os.File
376         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TRx) {
377                 rxLock, err = state.Ctx.LockDir(nodeId, string(TRx))
378                 if err != nil {
379                         return err
380                 }
381         }
382         var txLock *os.File
383         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
384                 txLock, err = state.Ctx.LockDir(nodeId, string(TTx))
385                 if err != nil {
386                         return err
387                 }
388         }
389         started := time.Now()
390         conf := noise.Config{
391                 CipherSuite: NoiseCipherSuite,
392                 Pattern:     noise.HandshakeIK,
393                 Initiator:   true,
394                 StaticKeypair: noise.DHKey{
395                         Private: state.Ctx.Self.NoisePrv[:],
396                         Public:  state.Ctx.Self.NoisePub[:],
397                 },
398                 PeerStatic: state.Node.NoisePub[:],
399         }
400         hs, err := noise.NewHandshakeState(conf)
401         if err != nil {
402                 return err
403         }
404         state.hs = hs
405         state.payloads = make(chan []byte)
406         state.pings = make(chan struct{})
407         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
408         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
409         state.progressBars = make(map[string]struct{})
410         state.started = started
411         state.rxLock = rxLock
412         state.txLock = txLock
413
414         var infosPayloads [][]byte
415         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
416                 infosPayloads = state.Ctx.infosOur(nodeId, state.Nice, &state.infosOurSeen)
417         }
418         var firstPayload []byte
419         if len(infosPayloads) > 0 {
420                 firstPayload = infosPayloads[0]
421         }
422         // Pad first payload, to hide actual number of existing files
423         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
424                 firstPayload = append(firstPayload, SPHaltMarshalized...)
425         }
426
427         var buf []byte
428         var payload []byte
429         buf, _, _, err = state.hs.WriteMessage(nil, firstPayload)
430         if err != nil {
431                 state.dirUnlock()
432                 return err
433         }
434         les := LEs{{"Node", nodeId}, {"Nice", int(state.Nice)}}
435         state.Ctx.LogD("sp-startI", les, func(les LEs) string {
436                 return fmt.Sprintf(
437                         "SP with %s (nice %s): sending first message",
438                         state.Node.Name,
439                         NicenessFmt(state.Nice),
440                 )
441         })
442         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
443         if err = state.WriteSP(conn, buf, false); err != nil {
444                 state.Ctx.LogE("sp-startI", les, err, func(les LEs) string {
445                         return fmt.Sprintf(
446                                 "SP with %s (nice %s): writing",
447                                 state.Node.Name,
448                                 NicenessFmt(state.Nice),
449                         )
450                 })
451                 state.dirUnlock()
452                 return err
453         }
454         state.Ctx.LogD("sp-startI-wait", les, func(les LEs) string {
455                 return fmt.Sprintf(
456                         "SP with %s (nice %s): waiting for first message",
457                         state.Node.Name,
458                         NicenessFmt(state.Nice),
459                 )
460         })
461         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
462         if buf, err = state.ReadSP(conn); err != nil {
463                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
464                         return fmt.Sprintf(
465                                 "SP with %s (nice %s): reading",
466                                 state.Node.Name,
467                                 NicenessFmt(state.Nice),
468                         )
469                 })
470                 state.dirUnlock()
471                 return err
472         }
473         payload, state.csOur, state.csTheir, err = state.hs.ReadMessage(nil, buf)
474         if err != nil {
475                 state.Ctx.LogE("sp-startI-read", les, err, func(les LEs) string {
476                         return fmt.Sprintf(
477                                 "SP with %s (nice %s): reading Noise message",
478                                 state.Node.Name,
479                                 NicenessFmt(state.Nice),
480                         )
481                 })
482                 state.dirUnlock()
483                 return err
484         }
485         state.Ctx.LogD("sp-startI-workers", les, func(les LEs) string {
486                 return fmt.Sprintf(
487                         "SP with %s (nice %s): starting workers",
488                         state.Node.Name,
489                         NicenessFmt(state.Nice),
490                 )
491         })
492         err = state.StartWorkers(conn, infosPayloads, payload)
493         if err != nil {
494                 state.Ctx.LogE("sp-startI-workers", les, err, func(les LEs) string {
495                         return fmt.Sprintf(
496                                 "SP with %s (nice %s): starting workers",
497                                 state.Node.Name,
498                                 NicenessFmt(state.Nice),
499                         )
500                 })
501                 state.dirUnlock()
502         }
503         return err
504 }
505
506 func (state *SPState) StartR(conn ConnDeadlined) error {
507         started := time.Now()
508         conf := noise.Config{
509                 CipherSuite: NoiseCipherSuite,
510                 Pattern:     noise.HandshakeIK,
511                 Initiator:   false,
512                 StaticKeypair: noise.DHKey{
513                         Private: state.Ctx.Self.NoisePrv[:],
514                         Public:  state.Ctx.Self.NoisePub[:],
515                 },
516         }
517         hs, err := noise.NewHandshakeState(conf)
518         if err != nil {
519                 return err
520         }
521         xxOnly := TRxTx("")
522         state.hs = hs
523         state.payloads = make(chan []byte)
524         state.pings = make(chan struct{})
525         state.infosOurSeen = make(map[[MTHSize]byte]uint8)
526         state.infosTheir = make(map[[MTHSize]byte]*SPInfo)
527         state.progressBars = make(map[string]struct{})
528         state.started = started
529         state.xxOnly = xxOnly
530
531         var buf []byte
532         var payload []byte
533         logMsg := func(les LEs) string {
534                 return fmt.Sprintf(
535                         "SP nice %s: waiting for first message",
536                         NicenessFmt(state.Nice),
537                 )
538         }
539         les := LEs{{"Nice", int(state.Nice)}}
540         state.Ctx.LogD("sp-startR", les, logMsg)
541         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
542         if buf, err = state.ReadSP(conn); err != nil {
543                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
544                 return err
545         }
546         if payload, _, _, err = state.hs.ReadMessage(nil, buf); err != nil {
547                 state.Ctx.LogE("sp-startR-read", les, err, logMsg)
548                 return err
549         }
550
551         var node *Node
552         for _, n := range state.Ctx.Neigh {
553                 if n.NoisePub == nil {
554                         continue
555                 }
556                 if subtle.ConstantTimeCompare(state.hs.PeerStatic(), n.NoisePub[:]) == 1 {
557                         node = n
558                         break
559                 }
560         }
561         if node == nil {
562                 peerId := Base32Codec.EncodeToString(state.hs.PeerStatic())
563                 err = errors.New("unknown peer: " + peerId)
564                 state.Ctx.LogE("sp-startR-unknown", append(les, LE{"Peer", peerId}), err, logMsg)
565                 return err
566         }
567         state.Node = node
568         state.rxRate = node.RxRate
569         state.txRate = node.TxRate
570         state.onlineDeadline = node.OnlineDeadline
571         state.maxOnlineTime = node.MaxOnlineTime
572         les = LEs{{"Node", node.Id}, {"Nice", int(state.Nice)}}
573
574         if err = state.Ctx.ensureRxDir(node.Id); err != nil {
575                 return err
576         }
577         var rxLock *os.File
578         if xxOnly == "" || xxOnly == TRx {
579                 rxLock, err = state.Ctx.LockDir(node.Id, string(TRx))
580                 if err != nil {
581                         return err
582                 }
583         }
584         state.rxLock = rxLock
585         var txLock *os.File
586         if xxOnly == "" || xxOnly == TTx {
587                 txLock, err = state.Ctx.LockDir(node.Id, string(TTx))
588                 if err != nil {
589                         return err
590                 }
591         }
592         state.txLock = txLock
593
594         var infosPayloads [][]byte
595         if xxOnly == "" || xxOnly == TTx {
596                 infosPayloads = state.Ctx.infosOur(node.Id, state.Nice, &state.infosOurSeen)
597         }
598         var firstPayload []byte
599         if len(infosPayloads) > 0 {
600                 firstPayload = infosPayloads[0]
601         }
602         // Pad first payload, to hide actual number of existing files
603         for i := 0; i < (MaxSPSize-len(firstPayload))/SPHeadOverhead; i++ {
604                 firstPayload = append(firstPayload, SPHaltMarshalized...)
605         }
606
607         state.Ctx.LogD("sp-startR-write", les, func(les LEs) string {
608                 return fmt.Sprintf(
609                         "SP with %s (nice %s): sending first message",
610                         node.Name, NicenessFmt(state.Nice),
611                 )
612         })
613         buf, state.csTheir, state.csOur, err = state.hs.WriteMessage(nil, firstPayload)
614         if err != nil {
615                 state.dirUnlock()
616                 return err
617         }
618         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
619         if err = state.WriteSP(conn, buf, false); err != nil {
620                 state.Ctx.LogE("sp-startR-write", les, err, func(les LEs) string {
621                         return fmt.Sprintf(
622                                 "SP with %s (nice %s): writing",
623                                 node.Name, NicenessFmt(state.Nice),
624                         )
625                 })
626                 state.dirUnlock()
627                 return err
628         }
629         state.Ctx.LogD("sp-startR-workers", les, func(les LEs) string {
630                 return fmt.Sprintf(
631                         "SP with %s (nice %s): starting workers",
632                         node.Name, NicenessFmt(state.Nice),
633                 )
634         })
635         err = state.StartWorkers(conn, infosPayloads, payload)
636         if err != nil {
637                 state.dirUnlock()
638         }
639         return err
640 }
641
642 func (state *SPState) closeFd(pth string) {
643         state.fdsLock.Lock()
644         if s, exists := state.fds[pth]; exists {
645                 delete(state.fds, pth)
646                 s.fd.Close()
647         }
648         state.fdsLock.Unlock()
649 }
650
651 func (state *SPState) StartWorkers(
652         conn ConnDeadlined,
653         infosPayloads [][]byte,
654         payload []byte,
655 ) error {
656         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
657         state.fds = make(map[string]FdAndFullSize)
658         state.fileHashers = make(map[string]*MTHAndOffset)
659         state.isDead = make(chan struct{})
660         if state.maxOnlineTime > 0 {
661                 state.mustFinishAt = state.started.Add(state.maxOnlineTime)
662         }
663         if !state.NoCK {
664                 spCheckerOnce.Do(func() { go SPChecker(state.Ctx) })
665                 go func() {
666                         for job := range state.Ctx.JobsNoCK(state.Node.Id) {
667                                 if job.PktEnc.Nice <= state.Nice {
668                                         spCheckerTasks <- SPCheckerTask{
669                                                 nodeId: state.Node.Id,
670                                                 hsh:    job.HshValue,
671                                                 done:   state.payloads,
672                                         }
673                                 }
674                         }
675                 }()
676         }
677
678         // Remaining handshake payload sending
679         if len(infosPayloads) > 1 {
680                 state.wg.Add(1)
681                 go func() {
682                         for _, payload := range infosPayloads[1:] {
683                                 state.Ctx.LogD(
684                                         "sp-queue-remaining",
685                                         append(les, LE{"Size", int64(len(payload))}),
686                                         func(les LEs) string {
687                                                 return fmt.Sprintf(
688                                                         "SP with %s (nice %s): queuing remaining payload (%s)",
689                                                         state.Node.Name, NicenessFmt(state.Nice),
690                                                         humanize.IBytes(uint64(len(payload))),
691                                                 )
692                                         },
693                                 )
694                                 state.payloads <- payload
695                         }
696                         state.wg.Done()
697                 }()
698         }
699
700         // Processing of first payload and queueing its responses
701         logMsg := func(les LEs) string {
702                 return fmt.Sprintf(
703                         "SP with %s (nice %s): processing first payload (%s)",
704                         state.Node.Name, NicenessFmt(state.Nice),
705                         humanize.IBytes(uint64(len(payload))),
706                 )
707         }
708         state.Ctx.LogD("sp-process", append(les, LE{"Size", int64(len(payload))}), logMsg)
709         replies, err := state.ProcessSP(payload)
710         if err != nil {
711                 state.Ctx.LogE("sp-process", les, err, logMsg)
712                 return err
713         }
714         state.wg.Add(1)
715         go func() {
716                 for _, reply := range replies {
717                         state.Ctx.LogD(
718                                 "sp-queue-reply",
719                                 append(les, LE{"Size", int64(len(reply))}),
720                                 func(les LEs) string {
721                                         return fmt.Sprintf(
722                                                 "SP with %s (nice %s): queuing reply (%s)",
723                                                 state.Node.Name, NicenessFmt(state.Nice),
724                                                 humanize.IBytes(uint64(len(payload))),
725                                         )
726                                 },
727                         )
728                         state.payloads <- reply
729                 }
730                 state.wg.Done()
731         }()
732
733         // Periodic jobs
734         state.wg.Add(1)
735         go func() {
736                 deadlineTicker := time.NewTicker(time.Second)
737                 pingTicker := time.NewTicker(PingTimeout)
738                 for {
739                         select {
740                         case <-state.isDead:
741                                 state.wg.Done()
742                                 deadlineTicker.Stop()
743                                 pingTicker.Stop()
744                                 return
745                         case now := <-deadlineTicker.C:
746                                 if now.Sub(state.RxLastNonPing) >= state.onlineDeadline &&
747                                         now.Sub(state.TxLastNonPing) >= state.onlineDeadline {
748                                         goto Deadlined
749                                 }
750                                 if state.maxOnlineTime > 0 && state.mustFinishAt.Before(now) {
751                                         goto Deadlined
752                                 }
753                                 if now.Sub(state.RxLastSeen) >= 2*PingTimeout {
754                                         goto Deadlined
755                                 }
756                                 break
757                         Deadlined:
758                                 state.SetDead()
759                                 conn.Close()
760                         case now := <-pingTicker.C:
761                                 if now.After(state.TxLastSeen.Add(PingTimeout)) {
762                                         state.wg.Add(1)
763                                         go func() {
764                                                 state.pings <- struct{}{}
765                                                 state.wg.Done()
766                                         }()
767                                 }
768                         }
769                 }
770         }()
771
772         // Spool checker and INFOs sender of appearing files
773         if !state.listOnly && (state.xxOnly == "" || state.xxOnly == TTx) {
774                 state.wg.Add(1)
775                 go func() {
776                         dw, err := state.Ctx.NewDirWatcher(
777                                 filepath.Join(state.Ctx.Spool, state.Node.Id.String(), string(TTx)),
778                                 time.Second,
779                         )
780                         if err != nil {
781                                 state.Ctx.LogE("sp-queue-dir-watch", les, err, logMsg)
782                                 log.Fatalln(err)
783                         }
784                         for {
785                                 select {
786                                 case <-state.isDead:
787                                         dw.Close()
788                                         state.wg.Done()
789                                         return
790                                 case <-dw.C:
791                                         for _, payload := range state.Ctx.infosOur(
792                                                 state.Node.Id,
793                                                 state.Nice,
794                                                 &state.infosOurSeen,
795                                         ) {
796                                                 state.Ctx.LogD(
797                                                         "sp-queue-info",
798                                                         append(les, LE{"Size", int64(len(payload))}),
799                                                         func(les LEs) string {
800                                                                 return fmt.Sprintf(
801                                                                         "SP with %s (nice %s): queuing new info (%s)",
802                                                                         state.Node.Name, NicenessFmt(state.Nice),
803                                                                         humanize.IBytes(uint64(len(payload))),
804                                                                 )
805                                                         },
806                                                 )
807                                                 state.payloads <- payload
808                                         }
809                                 }
810                         }
811                 }()
812         }
813
814         // Sender
815         state.wg.Add(1)
816         go func() {
817                 defer conn.Close()
818                 defer state.SetDead()
819                 defer state.wg.Done()
820                 buf := make([]byte, MaxSPSize-SPHeadOverhead-SPFileOverhead)
821                 for {
822                         if state.NotAlive() {
823                                 return
824                         }
825                         var payload []byte
826                         var ping bool
827                         select {
828                         case <-state.pings:
829                                 state.Ctx.LogD("sp-got-ping", les, func(les LEs) string {
830                                         return fmt.Sprintf(
831                                                 "SP with %s (nice %s): got ping",
832                                                 state.Node.Name, NicenessFmt(state.Nice),
833                                         )
834                                 })
835                                 payload = SPPingMarshalized
836                                 ping = true
837                         case payload = <-state.payloads:
838                                 state.Ctx.LogD(
839                                         "sp-got-payload",
840                                         append(les, LE{"Size", int64(len(payload))}),
841                                         func(les LEs) string {
842                                                 return fmt.Sprintf(
843                                                         "SP with %s (nice %s): got payload (%s)",
844                                                         state.Node.Name, NicenessFmt(state.Nice),
845                                                         humanize.IBytes(uint64(len(payload))),
846                                                 )
847                                         },
848                                 )
849                         default:
850                                 state.RLock()
851                                 if len(state.queueTheir) == 0 {
852                                         state.RUnlock()
853                                         time.Sleep(100 * time.Millisecond)
854                                         continue
855                                 }
856                                 freq := state.queueTheir[0].freq
857                                 state.RUnlock()
858                                 if state.txRate > 0 {
859                                         time.Sleep(time.Second / time.Duration(state.txRate))
860                                 }
861                                 pktName := Base32Codec.EncodeToString(freq.Hash[:])
862                                 lesp := append(
863                                         les,
864                                         LE{"XX", string(TTx)},
865                                         LE{"Pkt", pktName},
866                                         LE{"Size", int64(freq.Offset)},
867                                 )
868                                 logMsg := func(les LEs) string {
869                                         return fmt.Sprintf(
870                                                 "SP with %s (nice %s): tx/%s (%s)",
871                                                 state.Node.Name, NicenessFmt(state.Nice),
872                                                 pktName,
873                                                 humanize.IBytes(freq.Offset),
874                                         )
875                                 }
876                                 state.Ctx.LogD("sp-queue", lesp, func(les LEs) string {
877                                         return logMsg(les) + ": queueing"
878                                 })
879                                 pth := filepath.Join(
880                                         state.Ctx.Spool,
881                                         state.Node.Id.String(),
882                                         string(TTx),
883                                         Base32Codec.EncodeToString(freq.Hash[:]),
884                                 )
885                                 state.fdsLock.RLock()
886                                 fdAndFullSize, exists := state.fds[pth]
887                                 state.fdsLock.RUnlock()
888                                 if !exists {
889                                         state.Ctx.LogD("sp-queue-open", lesp, func(les LEs) string {
890                                                 return logMsg(les) + ": opening"
891                                         })
892                                         fd, err := os.Open(pth)
893                                         if err != nil {
894                                                 state.Ctx.LogE("sp-queue-open", lesp, err, func(les LEs) string {
895                                                         return logMsg(les) + ": opening"
896                                                 })
897                                                 return
898                                         }
899                                         fi, err := fd.Stat()
900                                         if err != nil {
901                                                 state.Ctx.LogE("sp-queue-stat", lesp, err, func(les LEs) string {
902                                                         return logMsg(les) + ": stating"
903                                                 })
904                                                 return
905                                         }
906                                         fdAndFullSize = FdAndFullSize{fd: fd, fullSize: fi.Size()}
907                                         state.fdsLock.Lock()
908                                         state.fds[pth] = fdAndFullSize
909                                         state.fdsLock.Unlock()
910                                 }
911                                 fd := fdAndFullSize.fd
912                                 fullSize := fdAndFullSize.fullSize
913                                 lesp = append(lesp, LE{"FullSize", fullSize})
914                                 var bufRead []byte
915                                 if freq.Offset < uint64(fullSize) {
916                                         state.Ctx.LogD("sp-file-seek", lesp, func(les LEs) string {
917                                                 return logMsg(les) + ": seeking"
918                                         })
919                                         if _, err = fd.Seek(int64(freq.Offset), io.SeekStart); err != nil {
920                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
921                                                         return logMsg(les) + ": seeking"
922                                                 })
923                                                 return
924                                         }
925                                         n, err := fd.Read(buf)
926                                         if err != nil {
927                                                 state.Ctx.LogE("sp-file-read", lesp, err, func(les LEs) string {
928                                                         return logMsg(les) + ": reading"
929                                                 })
930                                                 return
931                                         }
932                                         bufRead = buf[:n]
933                                         lesp = append(
934                                                 les,
935                                                 LE{"XX", string(TTx)},
936                                                 LE{"Pkt", pktName},
937                                                 LE{"Size", int64(n)},
938                                                 LE{"FullSize", fullSize},
939                                         )
940                                         state.Ctx.LogD("sp-file-read", lesp, func(les LEs) string {
941                                                 return fmt.Sprintf(
942                                                         "%s: read %s",
943                                                         logMsg(les), humanize.IBytes(uint64(n)),
944                                                 )
945                                         })
946                                 } else {
947                                         state.closeFd(pth)
948                                 }
949                                 payload = MarshalSP(SPTypeFile, SPFile{
950                                         Hash:    freq.Hash,
951                                         Offset:  freq.Offset,
952                                         Payload: bufRead,
953                                 })
954                                 ourSize := freq.Offset + uint64(len(bufRead))
955                                 lesp = append(
956                                         les,
957                                         LE{"XX", string(TTx)},
958                                         LE{"Pkt", pktName},
959                                         LE{"Size", int64(ourSize)},
960                                         LE{"FullSize", fullSize},
961                                 )
962                                 if state.Ctx.ShowPrgrs {
963                                         state.progressBars[pktName] = struct{}{}
964                                         Progress("Tx", lesp)
965                                 }
966                                 if ourSize == uint64(fullSize) {
967                                         state.closeFd(pth)
968                                         state.Ctx.LogD("sp-file-finished", lesp, func(les LEs) string {
969                                                 return logMsg(les) + ": finished"
970                                         })
971                                         if state.Ctx.ShowPrgrs {
972                                                 delete(state.progressBars, pktName)
973                                         }
974                                 }
975                                 state.Lock()
976                                 for i, q := range state.queueTheir {
977                                         if *q.freq.Hash != *freq.Hash {
978                                                 continue
979                                         }
980                                         if ourSize == uint64(fullSize) {
981                                                 state.queueTheir = append(
982                                                         state.queueTheir[:i],
983                                                         state.queueTheir[i+1:]...,
984                                                 )
985                                         } else {
986                                                 q.freq.Offset = ourSize
987                                         }
988                                         break
989                                 }
990                                 state.Unlock()
991                         }
992                         logMsg := func(les LEs) string {
993                                 return fmt.Sprintf(
994                                         "SP with %s (nice %s): sending %s",
995                                         state.Node.Name, NicenessFmt(state.Nice),
996                                         humanize.IBytes(uint64(len(payload))),
997                                 )
998                         }
999                         state.Ctx.LogD("sp-sending", append(les, LE{"Size", int64(len(payload))}), logMsg)
1000                         conn.SetWriteDeadline(time.Now().Add(DefaultDeadline))
1001                         ct, err := state.csOur.Encrypt(nil, nil, payload)
1002                         if err != nil {
1003                                 state.Ctx.LogE("sp-encrypting", les, err, logMsg)
1004                                 return
1005                         }
1006                         if err := state.WriteSP(conn, ct, ping); err != nil {
1007                                 state.Ctx.LogE("sp-sending", les, err, logMsg)
1008                                 return
1009                         }
1010                 }
1011         }()
1012
1013         // Receiver
1014         state.wg.Add(1)
1015         go func() {
1016                 for {
1017                         if state.NotAlive() {
1018                                 break
1019                         }
1020                         logMsg := func(les LEs) string {
1021                                 return fmt.Sprintf(
1022                                         "SP with %s (nice %s): waiting for payload",
1023                                         state.Node.Name, NicenessFmt(state.Nice),
1024                                 )
1025                         }
1026                         state.Ctx.LogD("sp-recv-wait", les, logMsg)
1027                         conn.SetReadDeadline(time.Now().Add(DefaultDeadline))
1028                         payload, err := state.ReadSP(conn)
1029                         if err != nil {
1030                                 if err == io.EOF {
1031                                         break
1032                                 }
1033                                 unmarshalErr := err.(*xdr.UnmarshalError)
1034                                 if os.IsTimeout(unmarshalErr.Err) {
1035                                         continue
1036                                 }
1037                                 if unmarshalErr.ErrorCode == xdr.ErrIO {
1038                                         break
1039                                 }
1040                                 state.Ctx.LogE("sp-recv-wait", les, err, logMsg)
1041                                 break
1042                         }
1043                         logMsg = func(les LEs) string {
1044                                 return fmt.Sprintf(
1045                                         "SP with %s (nice %s): payload (%s)",
1046                                         state.Node.Name, NicenessFmt(state.Nice),
1047                                         humanize.IBytes(uint64(len(payload))),
1048                                 )
1049                         }
1050                         state.Ctx.LogD(
1051                                 "sp-recv-got",
1052                                 append(les, LE{"Size", int64(len(payload))}),
1053                                 func(les LEs) string { return logMsg(les) + ": got" },
1054                         )
1055                         payload, err = state.csTheir.Decrypt(nil, nil, payload)
1056                         if err != nil {
1057                                 state.Ctx.LogE("sp-recv-got", les, err, func(les LEs) string {
1058                                         return logMsg(les) + ": got"
1059                                 })
1060                                 break
1061                         }
1062                         state.Ctx.LogD(
1063                                 "sp-recv-process",
1064                                 append(les, LE{"Size", int64(len(payload))}),
1065                                 func(les LEs) string {
1066                                         return logMsg(les) + ": processing"
1067                                 },
1068                         )
1069                         replies, err := state.ProcessSP(payload)
1070                         if err != nil {
1071                                 state.Ctx.LogE("sp-recv-process", les, err, func(les LEs) string {
1072                                         return logMsg(les) + ": processing"
1073                                 })
1074                                 break
1075                         }
1076                         state.wg.Add(1)
1077                         go func() {
1078                                 for _, reply := range replies {
1079                                         state.Ctx.LogD(
1080                                                 "sp-recv-reply",
1081                                                 append(les[:len(les)-1], LE{"Size", int64(len(reply))}),
1082                                                 func(les LEs) string {
1083                                                         return fmt.Sprintf(
1084                                                                 "SP with %s (nice %s): queuing reply (%s)",
1085                                                                 state.Node.Name, NicenessFmt(state.Nice),
1086                                                                 humanize.IBytes(uint64(len(reply))),
1087                                                         )
1088                                                 },
1089                                         )
1090                                         state.payloads <- reply
1091                                 }
1092                                 state.wg.Done()
1093                         }()
1094                         if state.rxRate > 0 {
1095                                 time.Sleep(time.Second / time.Duration(state.rxRate))
1096                         }
1097                 }
1098                 state.SetDead()
1099                 state.wg.Done()
1100                 state.SetDead()
1101                 conn.Close()
1102         }()
1103
1104         return nil
1105 }
1106
1107 func (state *SPState) Wait() {
1108         state.wg.Wait()
1109         close(state.payloads)
1110         close(state.pings)
1111         state.Duration = time.Now().Sub(state.started)
1112         state.dirUnlock()
1113         state.RxSpeed = state.RxBytes
1114         state.TxSpeed = state.TxBytes
1115         rxDuration := int64(state.RxLastSeen.Sub(state.started).Seconds())
1116         txDuration := int64(state.TxLastSeen.Sub(state.started).Seconds())
1117         if rxDuration > 0 {
1118                 state.RxSpeed = state.RxBytes / rxDuration
1119         }
1120         if txDuration > 0 {
1121                 state.TxSpeed = state.TxBytes / txDuration
1122         }
1123         for _, s := range state.fds {
1124                 s.fd.Close()
1125         }
1126         for pktName := range state.progressBars {
1127                 ProgressKill(pktName)
1128         }
1129 }
1130
1131 func (state *SPState) ProcessSP(payload []byte) ([][]byte, error) {
1132         les := LEs{{"Node", state.Node.Id}, {"Nice", int(state.Nice)}}
1133         r := bytes.NewReader(payload)
1134         var err error
1135         var replies [][]byte
1136         var infosGot bool
1137         for r.Len() > 0 {
1138                 state.Ctx.LogD("sp-process-unmarshal", les, func(les LEs) string {
1139                         return fmt.Sprintf(
1140                                 "SP with %s (nice %s): unmarshaling header",
1141                                 state.Node.Name, NicenessFmt(state.Nice),
1142                         )
1143                 })
1144                 var head SPHead
1145                 if _, err = xdr.Unmarshal(r, &head); err != nil {
1146                         state.Ctx.LogE("sp-process-unmarshal", les, err, func(les LEs) string {
1147                                 return fmt.Sprintf(
1148                                         "SP with %s (nice %s): unmarshaling header",
1149                                         state.Node.Name, NicenessFmt(state.Nice),
1150                                 )
1151                         })
1152                         return nil, err
1153                 }
1154                 if head.Type != SPTypePing {
1155                         state.RxLastNonPing = state.RxLastSeen
1156                 }
1157                 switch head.Type {
1158                 case SPTypeHalt:
1159                         state.Ctx.LogD(
1160                                 "sp-process-halt",
1161                                 append(les, LE{"Type", "halt"}), func(les LEs) string {
1162                                         return fmt.Sprintf(
1163                                                 "SP with %s (nice %s): got HALT",
1164                                                 state.Node.Name, NicenessFmt(state.Nice),
1165                                         )
1166                                 },
1167                         )
1168                         state.Lock()
1169                         state.queueTheir = nil
1170                         state.Unlock()
1171
1172                 case SPTypePing:
1173                         state.Ctx.LogD(
1174                                 "sp-process-ping",
1175                                 append(les, LE{"Type", "ping"}),
1176                                 func(les LEs) string {
1177                                         return fmt.Sprintf(
1178                                                 "SP with %s (nice %s): got PING",
1179                                                 state.Node.Name, NicenessFmt(state.Nice),
1180                                         )
1181                                 },
1182                         )
1183
1184                 case SPTypeInfo:
1185                         infosGot = true
1186                         lesp := append(les, LE{"Type", "info"})
1187                         state.Ctx.LogD(
1188                                 "sp-process-info-unmarshal", lesp,
1189                                 func(les LEs) string {
1190                                         return fmt.Sprintf(
1191                                                 "SP with %s (nice %s): unmarshaling INFO",
1192                                                 state.Node.Name, NicenessFmt(state.Nice),
1193                                         )
1194                                 },
1195                         )
1196                         var info SPInfo
1197                         if _, err = xdr.Unmarshal(r, &info); err != nil {
1198                                 state.Ctx.LogE(
1199                                         "sp-process-info-unmarshal", lesp, err,
1200                                         func(les LEs) string {
1201                                                 return fmt.Sprintf(
1202                                                         "SP with %s (nice %s): unmarshaling INFO",
1203                                                         state.Node.Name, NicenessFmt(state.Nice),
1204                                                 )
1205                                         },
1206                                 )
1207                                 return nil, err
1208                         }
1209                         pktName := Base32Codec.EncodeToString(info.Hash[:])
1210                         lesp = append(
1211                                 lesp,
1212                                 LE{"Pkt", pktName},
1213                                 LE{"Size", int64(info.Size)},
1214                                 LE{"PktNice", int(info.Nice)},
1215                         )
1216                         logMsg := func(les LEs) string {
1217                                 return fmt.Sprintf(
1218                                         "SP with %s (nice %s): INFO %s (%s) nice %s",
1219                                         state.Node.Name, NicenessFmt(state.Nice),
1220                                         pktName,
1221                                         humanize.IBytes(info.Size),
1222                                         NicenessFmt(info.Nice),
1223                                 )
1224                         }
1225                         if !state.listOnly && info.Nice > state.Nice {
1226                                 state.Ctx.LogD("sp-process-info-too-nice", lesp, func(les LEs) string {
1227                                         return logMsg(les) + ": too nice"
1228                                 })
1229                                 continue
1230                         }
1231                         state.Ctx.LogD("sp-process-info-got", lesp, func(les LEs) string {
1232                                 return logMsg(les) + ": received"
1233                         })
1234                         if !state.listOnly && state.xxOnly == TTx {
1235                                 continue
1236                         }
1237                         state.Lock()
1238                         state.infosTheir[*info.Hash] = &info
1239                         state.Unlock()
1240                         state.Ctx.LogD("sp-process-info-stat", lesp, func(les LEs) string {
1241                                 return logMsg(les) + ": stating part"
1242                         })
1243                         pktPath := filepath.Join(
1244                                 state.Ctx.Spool,
1245                                 state.Node.Id.String(),
1246                                 string(TRx),
1247                                 Base32Codec.EncodeToString(info.Hash[:]),
1248                         )
1249                         logMsg = func(les LEs) string {
1250                                 return fmt.Sprintf(
1251                                         "Packet %s (%s) (nice %s)",
1252                                         pktName,
1253                                         humanize.IBytes(info.Size),
1254                                         NicenessFmt(info.Nice),
1255                                 )
1256                         }
1257                         if _, err = os.Stat(pktPath); err == nil {
1258                                 state.Ctx.LogI("sp-info-done", lesp, func(les LEs) string {
1259                                         return logMsg(les) + ": already done"
1260                                 })
1261                                 if !state.listOnly {
1262                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1263                                 }
1264                                 continue
1265                         }
1266                         if _, err = os.Stat(pktPath + SeenSuffix); err == nil {
1267                                 state.Ctx.LogI("sp-info-seen", lesp, func(les LEs) string {
1268                                         return logMsg(les) + ": already seen"
1269                                 })
1270                                 if !state.listOnly {
1271                                         replies = append(replies, MarshalSP(SPTypeDone, SPDone{info.Hash}))
1272                                 }
1273                                 continue
1274                         }
1275                         if _, err = os.Stat(pktPath + NoCKSuffix); err == nil {
1276                                 state.Ctx.LogI("sp-info-nock", lesp, func(les LEs) string {
1277                                         return logMsg(les) + ": still not checksummed"
1278                                 })
1279                                 continue
1280                         }
1281                         fi, err := os.Stat(pktPath + PartSuffix)
1282                         var offset int64
1283                         if err == nil {
1284                                 offset = fi.Size()
1285                         }
1286                         if !state.Ctx.IsEnoughSpace(int64(info.Size) - offset) {
1287                                 state.Ctx.LogI("sp-info-no-space", lesp, func(les LEs) string {
1288                                         return logMsg(les) + ": not enough space"
1289                                 })
1290                                 continue
1291                         }
1292                         state.Ctx.LogI(
1293                                 "sp-info",
1294                                 append(lesp, LE{"Offset", offset}),
1295                                 func(les LEs) string {
1296                                         return fmt.Sprintf(
1297                                                 "%s: %d%%", logMsg(les), 100*uint64(offset)/info.Size,
1298                                         )
1299                                 },
1300                         )
1301                         if !state.listOnly && (state.onlyPkts == nil || state.onlyPkts[*info.Hash]) {
1302                                 replies = append(replies, MarshalSP(
1303                                         SPTypeFreq,
1304                                         SPFreq{info.Hash, uint64(offset)},
1305                                 ))
1306                         }
1307
1308                 case SPTypeFile:
1309                         lesp := append(les, LE{"Type", "file"})
1310                         state.Ctx.LogD("sp-process-file", lesp, func(les LEs) string {
1311                                 return fmt.Sprintf(
1312                                         "SP with %s (nice %s): unmarshaling FILE",
1313                                         state.Node.Name, NicenessFmt(state.Nice),
1314                                 )
1315                         })
1316                         var file SPFile
1317                         if _, err = xdr.Unmarshal(r, &file); err != nil {
1318                                 state.Ctx.LogE("sp-process-file", lesp, err, func(les LEs) string {
1319                                         return fmt.Sprintf(
1320                                                 "SP with %s (nice %s): unmarshaling FILE",
1321                                                 state.Node.Name, NicenessFmt(state.Nice),
1322                                         )
1323                                 })
1324                                 return nil, err
1325                         }
1326                         pktName := Base32Codec.EncodeToString(file.Hash[:])
1327                         lesp = append(
1328                                 lesp,
1329                                 LE{"XX", string(TRx)},
1330                                 LE{"Pkt", pktName},
1331                                 LE{"Size", int64(len(file.Payload))},
1332                         )
1333                         logMsg := func(les LEs) string {
1334                                 return fmt.Sprintf(
1335                                         "Got packet %s (%s)",
1336                                         pktName, humanize.IBytes(uint64(len(file.Payload))),
1337                                 )
1338                         }
1339                         fullsize := int64(0)
1340                         state.RLock()
1341                         infoTheir := state.infosTheir[*file.Hash]
1342                         state.RUnlock()
1343                         if infoTheir == nil {
1344                                 state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1345                                         return logMsg(les) + ": unknown file"
1346                                 })
1347                                 continue
1348                         }
1349                         fullsize = int64(infoTheir.Size)
1350                         lesp = append(lesp, LE{"FullSize", fullsize})
1351                         dirToSync := filepath.Join(
1352                                 state.Ctx.Spool,
1353                                 state.Node.Id.String(),
1354                                 string(TRx),
1355                         )
1356                         filePath := filepath.Join(dirToSync, pktName)
1357                         filePathPart := filePath + PartSuffix
1358                         state.Ctx.LogD("sp-file-open", lesp, func(les LEs) string {
1359                                 return logMsg(les) + ": opening part"
1360                         })
1361                         state.fdsLock.RLock()
1362                         fdAndFullSize, exists := state.fds[filePathPart]
1363                         state.fdsLock.RUnlock()
1364                         hasherAndOffset := state.fileHashers[filePath]
1365                         var fd *os.File
1366                         if exists {
1367                                 fd = fdAndFullSize.fd
1368                         } else {
1369                                 fd, err = os.OpenFile(
1370                                         filePathPart,
1371                                         os.O_RDWR|os.O_CREATE,
1372                                         os.FileMode(0666),
1373                                 )
1374                                 if err != nil {
1375                                         state.Ctx.LogE("sp-file-open", lesp, err, func(les LEs) string {
1376                                                 return logMsg(les) + ": opening part"
1377                                         })
1378                                         return nil, err
1379                                 }
1380                                 state.fdsLock.Lock()
1381                                 state.fds[filePathPart] = FdAndFullSize{fd: fd}
1382                                 state.fdsLock.Unlock()
1383                                 if !state.NoCK {
1384                                         hasherAndOffset = &MTHAndOffset{
1385                                                 mth:    MTHNew(fullsize, int64(file.Offset)),
1386                                                 offset: file.Offset,
1387                                         }
1388                                         state.fileHashers[filePath] = hasherAndOffset
1389                                 }
1390                         }
1391                         state.Ctx.LogD(
1392                                 "sp-file-seek",
1393                                 append(lesp, LE{"Offset", file.Offset}),
1394                                 func(les LEs) string {
1395                                         return fmt.Sprintf("%s: seeking %d", logMsg(les), file.Offset)
1396                                 })
1397                         if _, err = fd.Seek(int64(file.Offset), io.SeekStart); err != nil {
1398                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1399                                         return logMsg(les) + ": seeking"
1400                                 })
1401                                 state.closeFd(filePathPart)
1402                                 return nil, err
1403                         }
1404                         state.Ctx.LogD("sp-file-write", lesp, func(les LEs) string {
1405                                 return logMsg(les) + ": writing"
1406                         })
1407                         if _, err = fd.Write(file.Payload); err != nil {
1408                                 state.Ctx.LogE("sp-file-write", lesp, err, func(les LEs) string {
1409                                         return logMsg(les) + ": writing"
1410                                 })
1411                                 state.closeFd(filePathPart)
1412                                 return nil, err
1413                         }
1414                         if hasherAndOffset != nil {
1415                                 if hasherAndOffset.offset == file.Offset {
1416                                         if _, err = hasherAndOffset.mth.Write(file.Payload); err != nil {
1417                                                 panic(err)
1418                                         }
1419                                         hasherAndOffset.offset += uint64(len(file.Payload))
1420                                 } else {
1421                                         state.Ctx.LogE(
1422                                                 "sp-file-offset-differs", lesp, errors.New("offset differs"),
1423                                                 func(les LEs) string {
1424                                                         return logMsg(les) + ": deleting hasher"
1425                                                 },
1426                                         )
1427                                         delete(state.fileHashers, filePath)
1428                                         hasherAndOffset = nil
1429                                 }
1430                         }
1431                         ourSize := int64(file.Offset + uint64(len(file.Payload)))
1432                         lesp[len(lesp)-2].V = ourSize
1433                         if state.Ctx.ShowPrgrs {
1434                                 state.progressBars[pktName] = struct{}{}
1435                                 Progress("Rx", lesp)
1436                         }
1437                         if fullsize != ourSize {
1438                                 continue
1439                         }
1440                         if state.Ctx.ShowPrgrs {
1441                                 delete(state.progressBars, pktName)
1442                         }
1443                         logMsg = func(les LEs) string {
1444                                 return fmt.Sprintf(
1445                                         "Got packet %s %d%% (%s / %s)",
1446                                         pktName, 100*ourSize/fullsize,
1447                                         humanize.IBytes(uint64(ourSize)),
1448                                         humanize.IBytes(uint64(fullsize)),
1449                                 )
1450                         }
1451                         err = fd.Sync()
1452                         if err != nil {
1453                                 state.Ctx.LogE("sp-file-sync", lesp, err, func(les LEs) string {
1454                                         return logMsg(les) + ": syncing"
1455                                 })
1456                                 state.closeFd(filePathPart)
1457                                 continue
1458                         }
1459                         if hasherAndOffset != nil {
1460                                 delete(state.fileHashers, filePath)
1461                                 if hasherAndOffset.mth.PreaddSize() == 0 {
1462                                         if bytes.Compare(hasherAndOffset.mth.Sum(nil), file.Hash[:]) != 0 {
1463                                                 state.Ctx.LogE(
1464                                                         "sp-file-bad-checksum", lesp,
1465                                                         errors.New("checksum mismatch"),
1466                                                         logMsg,
1467                                                 )
1468                                                 state.closeFd(filePathPart)
1469                                                 continue
1470                                         }
1471                                         if err = os.Rename(filePathPart, filePath); err != nil {
1472                                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1473                                                         return logMsg(les) + ": renaming"
1474                                                 })
1475                                                 state.closeFd(filePathPart)
1476                                                 continue
1477                                         }
1478                                         if err = DirSync(dirToSync); err != nil {
1479                                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1480                                                         return logMsg(les) + ": dirsyncing"
1481                                                 })
1482                                                 state.closeFd(filePathPart)
1483                                                 continue
1484                                         }
1485                                         state.Ctx.LogI("sp-file-done", lesp, func(les LEs) string {
1486                                                 return logMsg(les) + ": done"
1487                                         })
1488                                         state.wg.Add(1)
1489                                         go func() {
1490                                                 state.payloads <- MarshalSP(SPTypeDone, SPDone{file.Hash})
1491                                                 state.wg.Done()
1492                                         }()
1493                                         state.Lock()
1494                                         delete(state.infosTheir, *file.Hash)
1495                                         state.Unlock()
1496                                         if !state.Ctx.HdrUsage {
1497                                                 continue
1498                                         }
1499                                         if _, err = fd.Seek(0, io.SeekStart); err != nil {
1500                                                 state.Ctx.LogE("sp-file-seek", lesp, err, func(les LEs) string {
1501                                                         return logMsg(les) + ": seeking"
1502                                                 })
1503                                                 state.closeFd(filePathPart)
1504                                                 continue
1505                                         }
1506                                         _, pktEncRaw, err := state.Ctx.HdrRead(fd)
1507                                         state.closeFd(filePathPart)
1508                                         if err != nil {
1509                                                 state.Ctx.LogE("sp-file-hdr-read", lesp, err, func(les LEs) string {
1510                                                         return logMsg(les) + ": HdrReading"
1511                                                 })
1512                                                 continue
1513                                         }
1514                                         state.Ctx.HdrWrite(pktEncRaw, filePath)
1515                                         continue
1516                                 }
1517                         }
1518                         state.closeFd(filePathPart)
1519                         if err = os.Rename(filePathPart, filePath+NoCKSuffix); err != nil {
1520                                 state.Ctx.LogE("sp-file-rename", lesp, err, func(les LEs) string {
1521                                         return logMsg(les) + ": renaming"
1522                                 })
1523                                 continue
1524                         }
1525                         if err = DirSync(dirToSync); err != nil {
1526                                 state.Ctx.LogE("sp-file-dirsync", lesp, err, func(les LEs) string {
1527                                         return logMsg(les) + ": dirsyncing"
1528                                 })
1529                                 continue
1530                         }
1531                         state.Ctx.LogI("sp-file-downloaded", lesp, func(les LEs) string {
1532                                 return logMsg(les) + ": downloaded"
1533                         })
1534                         state.Lock()
1535                         delete(state.infosTheir, *file.Hash)
1536                         state.Unlock()
1537                         go func() {
1538                                 t := SPCheckerTask{
1539                                         nodeId: state.Node.Id,
1540                                         hsh:    file.Hash,
1541                                         done:   state.payloads,
1542                                 }
1543                                 if hasherAndOffset != nil {
1544                                         t.mth = hasherAndOffset.mth
1545                                 }
1546                                 spCheckerTasks <- t
1547                         }()
1548
1549                 case SPTypeDone:
1550                         lesp := append(les, LE{"Type", "done"})
1551                         state.Ctx.LogD("sp-process-done-unmarshal", lesp, func(les LEs) string {
1552                                 return fmt.Sprintf(
1553                                         "SP with %s (nice %s): unmarshaling DONE",
1554                                         state.Node.Name, NicenessFmt(state.Nice),
1555                                 )
1556                         })
1557                         var done SPDone
1558                         if _, err = xdr.Unmarshal(r, &done); err != nil {
1559                                 state.Ctx.LogE("sp-process-done-unmarshal", lesp, err, func(les LEs) string {
1560                                         return fmt.Sprintf(
1561                                                 "SP with %s (nice %s): unmarshaling DONE",
1562                                                 state.Node.Name, NicenessFmt(state.Nice),
1563                                         )
1564                                 })
1565                                 return nil, err
1566                         }
1567                         pktName := Base32Codec.EncodeToString(done.Hash[:])
1568                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"XX", string(TTx)})
1569                         logMsg := func(les LEs) string {
1570                                 return fmt.Sprintf(
1571                                         "SP with %s (nice %s): DONE: removing %s",
1572                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1573                                 )
1574                         }
1575                         state.Ctx.LogD("sp-done", lesp, logMsg)
1576                         pth := filepath.Join(
1577                                 state.Ctx.Spool,
1578                                 state.Node.Id.String(),
1579                                 string(TTx),
1580                                 pktName,
1581                         )
1582                         if err = os.Remove(pth); err == nil {
1583                                 state.Ctx.LogI("sp-done", lesp, func(les LEs) string {
1584                                         return fmt.Sprintf("Packet %s is sent", pktName)
1585                                 })
1586                                 if state.Ctx.HdrUsage {
1587                                         os.Remove(pth + HdrSuffix)
1588                                 }
1589                         } else {
1590                                 state.Ctx.LogE("sp-done", lesp, err, logMsg)
1591                         }
1592
1593                 case SPTypeFreq:
1594                         lesp := append(les, LE{"Type", "freq"})
1595                         state.Ctx.LogD("sp-process-freq", lesp, func(les LEs) string {
1596                                 return fmt.Sprintf(
1597                                         "SP with %s (nice %s): unmarshaling FREQ",
1598                                         state.Node.Name, NicenessFmt(state.Nice),
1599                                 )
1600                         })
1601                         var freq SPFreq
1602                         if _, err = xdr.Unmarshal(r, &freq); err != nil {
1603                                 state.Ctx.LogE("sp-process-freq", lesp, err, func(les LEs) string {
1604                                         return fmt.Sprintf(
1605                                                 "SP with %s (nice %s): unmarshaling FREQ",
1606                                                 state.Node.Name, NicenessFmt(state.Nice),
1607                                         )
1608                                 })
1609                                 return nil, err
1610                         }
1611                         pktName := Base32Codec.EncodeToString(freq.Hash[:])
1612                         lesp = append(lesp, LE{"Pkt", pktName}, LE{"Offset", freq.Offset})
1613                         state.Ctx.LogD("sp-process-freq-queueing", lesp, func(les LEs) string {
1614                                 return fmt.Sprintf(
1615                                         "SP with %s (nice %s): FREQ %s: queuing",
1616                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1617                                 )
1618                         })
1619                         nice, exists := state.infosOurSeen[*freq.Hash]
1620                         if exists {
1621                                 if state.onlyPkts == nil || !state.onlyPkts[*freq.Hash] {
1622                                         state.Lock()
1623                                         insertIdx := 0
1624                                         var freqWithNice *FreqWithNice
1625                                         for insertIdx, freqWithNice = range state.queueTheir {
1626                                                 if freqWithNice.nice > nice {
1627                                                         break
1628                                                 }
1629                                         }
1630                                         state.queueTheir = append(state.queueTheir, nil)
1631                                         copy(state.queueTheir[insertIdx+1:], state.queueTheir[insertIdx:])
1632                                         state.queueTheir[insertIdx] = &FreqWithNice{&freq, nice}
1633                                         state.Unlock()
1634                                 } else {
1635                                         state.Ctx.LogD("sp-process-freq-skip", lesp, func(les LEs) string {
1636                                                 return fmt.Sprintf(
1637                                                         "SP with %s (nice %s): FREQ %s: skipping",
1638                                                         state.Node.Name, NicenessFmt(state.Nice), pktName,
1639                                                 )
1640                                         })
1641                                 }
1642                         } else {
1643                                 state.Ctx.LogD("sp-process-freq-unknown", lesp, func(les LEs) string {
1644                                         return fmt.Sprintf(
1645                                                 "SP with %s (nice %s): FREQ %s: unknown",
1646                                                 state.Node.Name, NicenessFmt(state.Nice), pktName,
1647                                         )
1648                                 })
1649                         }
1650
1651                 default:
1652                         state.Ctx.LogE(
1653                                 "sp-process-type-unknown",
1654                                 append(les, LE{"Type", head.Type}),
1655                                 errors.New("unknown type"),
1656                                 func(les LEs) string {
1657                                         return fmt.Sprintf(
1658                                                 "SP with %s (nice %s): %d",
1659                                                 state.Node.Name, NicenessFmt(state.Nice), head.Type,
1660                                         )
1661                                 },
1662                         )
1663                         return nil, BadPktType
1664                 }
1665         }
1666
1667         if infosGot {
1668                 var pkts int
1669                 var size uint64
1670                 state.RLock()
1671                 for _, info := range state.infosTheir {
1672                         pkts++
1673                         size += info.Size
1674                 }
1675                 state.RUnlock()
1676                 state.Ctx.LogI("sp-infos-rx", LEs{
1677                         {"XX", string(TRx)},
1678                         {"Node", state.Node.Id},
1679                         {"Pkts", pkts},
1680                         {"Size", int64(size)},
1681                 }, func(les LEs) string {
1682                         return fmt.Sprintf(
1683                                 "%s has got for us: %d packets, %s",
1684                                 state.Node.Name, pkts, humanize.IBytes(size),
1685                         )
1686                 })
1687         }
1688         return payloadsSplit(replies), nil
1689 }
1690
1691 func SPChecker(ctx *Ctx) {
1692         for t := range spCheckerTasks {
1693                 pktName := Base32Codec.EncodeToString(t.hsh[:])
1694                 les := LEs{
1695                         {"XX", string(TRx)},
1696                         {"Node", t.nodeId},
1697                         {"Pkt", pktName},
1698                 }
1699                 SPCheckerWg.Add(1)
1700                 ctx.LogD("sp-checker", les, func(les LEs) string {
1701                         return fmt.Sprintf("Checksumming %s/rx/%s", ctx.NodeName(t.nodeId), pktName)
1702                 })
1703                 size, err := ctx.CheckNoCK(t.nodeId, t.hsh, t.mth)
1704                 les = append(les, LE{"Size", size})
1705                 if err != nil {
1706                         ctx.LogE("sp-checker", les, err, func(les LEs) string {
1707                                 return fmt.Sprintf(
1708                                         "Checksumming %s/rx/%s (%s)", ctx.NodeName(t.nodeId), pktName,
1709                                         humanize.IBytes(uint64(size)),
1710                                 )
1711                         })
1712                         SPCheckerWg.Done()
1713                         continue
1714                 }
1715                 ctx.LogI("sp-checker-done", les, func(les LEs) string {
1716                         return fmt.Sprintf(
1717                                 "Packet %s is retreived (%s)",
1718                                 pktName, humanize.IBytes(uint64(size)),
1719                         )
1720                 })
1721                 SPCheckerWg.Done()
1722                 go func(t SPCheckerTask) {
1723                         defer func() { recover() }()
1724                         t.done <- MarshalSP(SPTypeDone, SPDone{t.hsh})
1725                 }(t)
1726         }
1727 }